Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

2.1.1 DataStream API应用实例

我们以DataStream API构建WordCount作业为例,简单介绍一下DataStream构建作业的主要组成部分,如代码清单2-1所示。

代码清单2-1 基于DataStream API构建的WordCount作业实例


public class WordCount {
   public static void main(String[] args) throws Exception {
      // StreamExecutionEnvironment初始化
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.
         getExecutionEnvironment();
      // 业务逻辑转换代码
      DataStream<String> text = env.readTextFile("the_path_for_input");
      DataStream<Tuple2<String, Integer>> counts =
         text.flatMap(new Tokenizer())
         .keyBy(0).sum(1);
      counts.writeAsText("the_path_for_output");
      // 执行应用程序
      env.execute("Streaming WordCount");
   }
}

从WordCount代码实例可以看出,DataStream程序主要包括以下3个部分。

·StreamExecutionEnvironment初始化:该部分主要创建和初始化StreamExecutionEnvironment,提供通过DataStream API构建Flink作业需要的执行环境,包括设定ExecutionConfig、CheckpointConfig等配置信息以及StateBackend和TimeCharacteristic等变量。

·业务逻辑转换代码:该模块是用户编写转换逻辑的区域,在treamExecutionEnvironment中提供了创建DataStream的方法,例如通过StreamExecutionEnvironment.readTextFile()方法读取文本数据并构建DataStreamSource数据集,之后所有的DataStream转换操作都会以DataStreamSource为头部节点。同时,DataStream API中提供了各种转换操作,例如map、reduce、join等算子,用户可以通过这些转换操作构建完整的Flink计算逻辑。

·执行应用程序:编写完Flink应用后,必须调用ExecutionEnvironment.execute()方法执行整个应用程序,在execute()方法中会基于DataStream之间的转换操作生成StreamGraph,并将StreamGraph结构转换为JobGraph,最终将JobGraph提交到指定的Session集群中运行。

1.DataStream的主要成员

DataStream代表一系列同类型数据的集合,可以通过转换操作生成新的DataStream。DataStream用于表达业务转换逻辑,实际上并没有存储真实数据。

DataStream数据结构包含两个主要成员:StreamExecutionEnvironment和Transformation< T > transformation。其中transformation是当前DataStream对应的上一次的转换操作,换句话讲,就是通过transformation生成当前的DataStream。

当用户通过DataStream API构建Flink作业时,StreamExecutionEnvironment会将DataStream之间的转换操作存储至StreamExecutionEnvironment的List<Transformation<?>> transformations集合,然后基于这些转换操作构建作业Pipeline拓扑,用于描述整个作业的计算逻辑。其中流式作业对应的Pipeline实现类为StreamGraph,批作业对应的Pipeline实现类为Plan。

如图2-1所示,DataStream之间的转换操作都是通过StreamTransformation结构展示的,例如当用户执行DataStream.map()方法转换时,底层对应的便是OneInputTransformation转换操作。

图2-1 DataStream API的主要组成

每个StreamTransformation都包含相应的StreamOperator,例如执行DataStream.map-(new MapFunction(...))转换之后,内部生成的便是StreamMap算子。StreamOperator涵盖了用户自定义函数的信息,如图2-1所示,StreamMap算子包含了MapFunction。MapFunction就是用户自定义的map转换函数。当然还有其他类型的函数,例如ProcessFunction、SourceFunction和SinkFunction等,不同的转换操作,对应的函数也有所不同。

通常情况下,用户是不直接参与定义StreamOperator的,而是由Flink根据用户执行的DataStream转换操作以及函数共同生成StreamOperator,之后Task运行时会运行定义的StreamOperator。

2.StreamMap实例

我们以DataStream中的map转换操作为例,对DataStream底层源码实现进行说明。如代码清单2-2所示,首先自定义MapFunction实现数据处理逻辑,然后调用DataStream.map()方法将MapFunction作为参数应用在map转换操作中。在DataStream.map()方法中可以看出,实际调用了transform()方法进行后续的转换处理,且调用过程会基于MapFunction参数创建StreamMap实例,StreamMap实际上就是StreamOperator的实现子类。

代码清单2-2 DataStream.map()方法定义


public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, 
   TypeInformation<R> outputType) {
   return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

接下来在DataStream.transform()方法中调用doTransform()方法继续进行转换操作。如代码清单2-3所示,DataStream.doTransform()方法主要包含如下逻辑。

·从上一次转换操作中获取TypeInformation信息,确定没有出现MissingTypeInfo错误,以确保下游算子转换不会出现问题。

·基于operatorName、outTypeInfo和operatorFactory等参数创建OneInputTransformation实例,注意OneInputTransformation也会包含当前DataStream对应的上一次转换操作。

·基于OneInputTransformation实例创建SingleOutputStreamOperator。SingleOutputStreamOperator继承了DataStream类,属于特殊的DataStream,主要用于每次转换操作后返回给用户继续操作的数据结构。SingleOutputStreamOperator额外提供了returns()、disableChaining()等方法供用户使用。

·调用getExecutionEnvironment().addOperator(resultTransform)方法,将创建好的OneInputTransformation添加到StreamExecutionEnvironment的Transformation集合中,用于生成StreamGraph对象。

·将returnStream返回给用户,继续执行后续的转换操作。基于这样连续的转换操作,将所有DataStream之间的转换按顺序存储在StreamExecutionEnvironment中。

代码清单2-3 DataStream.doTransform()方法定义


protected <R> SingleOutputStreamOperator<R> doTransform(
      String operatorName,
      TypeInformation<R> outTypeInfo,
      StreamOperatorFactory<R> operatorFactory) {
   // 获取上一次转换操作输出的TypeInformation信息
   transformation.getOutputType();
   // 创建OneInputTransformation
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operatorFactory,
         outTypeInfo,
         environment.getParallelism());
   @SuppressWarnings({"unchecked", "rawtypes"})
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator
      (environment, resultTransform);
   // 添加Transformation
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

在DataStream转换的过程中,不管是哪种类型的转换操作,都是按照同样的方式进行的:首先将用户自定义的函数封装到Operator中,然后将Operator封装到Transformation转换操作结构中,最后将Transformation写入StreamExecutionEnvironment提供的Transformation集合。通过DataStream之间的转换操作形成Pipeline拓扑,即StreamGraph数据结构,最终通过StreamGraph生成JobGraph并提交到集群上运行。

接下来我们抽丝剥茧,分别从Transformation、Operator和Function三个层面,介绍DataStream API中每个层面对应的主要设计与实现。