![Flink设计与实现:核心原理与源码解析](https://wfqqreader-1252317822.image.myqcloud.com/cover/437/39980437/b_39980437.jpg)
2.1.1 DataStream API应用实例
我们以DataStream API构建WordCount作业为例,简单介绍一下DataStream构建作业的主要组成部分,如代码清单2-1所示。
代码清单2-1 基于DataStream API构建的WordCount作业实例
public class WordCount { public static void main(String[] args) throws Exception { // StreamExecutionEnvironment初始化 final StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); // 业务逻辑转换代码 DataStream<String> text = env.readTextFile("the_path_for_input"); DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()) .keyBy(0).sum(1); counts.writeAsText("the_path_for_output"); // 执行应用程序 env.execute("Streaming WordCount"); } }
从WordCount代码实例可以看出,DataStream程序主要包括以下3个部分。
·StreamExecutionEnvironment初始化:该部分主要创建和初始化StreamExecutionEnvironment,提供通过DataStream API构建Flink作业需要的执行环境,包括设定ExecutionConfig、CheckpointConfig等配置信息以及StateBackend和TimeCharacteristic等变量。
·业务逻辑转换代码:该模块是用户编写转换逻辑的区域,在treamExecutionEnvironment中提供了创建DataStream的方法,例如通过StreamExecutionEnvironment.readTextFile()方法读取文本数据并构建DataStreamSource数据集,之后所有的DataStream转换操作都会以DataStreamSource为头部节点。同时,DataStream API中提供了各种转换操作,例如map、reduce、join等算子,用户可以通过这些转换操作构建完整的Flink计算逻辑。
·执行应用程序:编写完Flink应用后,必须调用ExecutionEnvironment.execute()方法执行整个应用程序,在execute()方法中会基于DataStream之间的转换操作生成StreamGraph,并将StreamGraph结构转换为JobGraph,最终将JobGraph提交到指定的Session集群中运行。
1.DataStream的主要成员
DataStream代表一系列同类型数据的集合,可以通过转换操作生成新的DataStream。DataStream用于表达业务转换逻辑,实际上并没有存储真实数据。
DataStream数据结构包含两个主要成员:StreamExecutionEnvironment和Transformation< T > transformation。其中transformation是当前DataStream对应的上一次的转换操作,换句话讲,就是通过transformation生成当前的DataStream。
当用户通过DataStream API构建Flink作业时,StreamExecutionEnvironment会将DataStream之间的转换操作存储至StreamExecutionEnvironment的List<Transformation<?>> transformations集合,然后基于这些转换操作构建作业Pipeline拓扑,用于描述整个作业的计算逻辑。其中流式作业对应的Pipeline实现类为StreamGraph,批作业对应的Pipeline实现类为Plan。
如图2-1所示,DataStream之间的转换操作都是通过StreamTransformation结构展示的,例如当用户执行DataStream.map()方法转换时,底层对应的便是OneInputTransformation转换操作。
![](https://epubservercos.yuewen.com/98D1A5/20818201201955806/epubprivate/OEBPS/Images/2-1.jpg?sign=1738923331-vGZo0tcXVVjGL8vmvEtxkvrR4sXWXKvT-0-417cd90e3586fbc3049fed7a13320e02)
图2-1 DataStream API的主要组成
每个StreamTransformation都包含相应的StreamOperator,例如执行DataStream.map-(new MapFunction(...))转换之后,内部生成的便是StreamMap算子。StreamOperator涵盖了用户自定义函数的信息,如图2-1所示,StreamMap算子包含了MapFunction。MapFunction就是用户自定义的map转换函数。当然还有其他类型的函数,例如ProcessFunction、SourceFunction和SinkFunction等,不同的转换操作,对应的函数也有所不同。
通常情况下,用户是不直接参与定义StreamOperator的,而是由Flink根据用户执行的DataStream转换操作以及函数共同生成StreamOperator,之后Task运行时会运行定义的StreamOperator。
2.StreamMap实例
我们以DataStream中的map转换操作为例,对DataStream底层源码实现进行说明。如代码清单2-2所示,首先自定义MapFunction实现数据处理逻辑,然后调用DataStream.map()方法将MapFunction作为参数应用在map转换操作中。在DataStream.map()方法中可以看出,实际调用了transform()方法进行后续的转换处理,且调用过程会基于MapFunction参数创建StreamMap实例,StreamMap实际上就是StreamOperator的实现子类。
代码清单2-2 DataStream.map()方法定义
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) { return transform("Map", outputType, new StreamMap<>(clean(mapper))); }
接下来在DataStream.transform()方法中调用doTransform()方法继续进行转换操作。如代码清单2-3所示,DataStream.doTransform()方法主要包含如下逻辑。
·从上一次转换操作中获取TypeInformation信息,确定没有出现MissingTypeInfo错误,以确保下游算子转换不会出现问题。
·基于operatorName、outTypeInfo和operatorFactory等参数创建OneInputTransformation实例,注意OneInputTransformation也会包含当前DataStream对应的上一次转换操作。
·基于OneInputTransformation实例创建SingleOutputStreamOperator。SingleOutputStreamOperator继承了DataStream类,属于特殊的DataStream,主要用于每次转换操作后返回给用户继续操作的数据结构。SingleOutputStreamOperator额外提供了returns()、disableChaining()等方法供用户使用。
·调用getExecutionEnvironment().addOperator(resultTransform)方法,将创建好的OneInputTransformation添加到StreamExecutionEnvironment的Transformation集合中,用于生成StreamGraph对象。
·将returnStream返回给用户,继续执行后续的转换操作。基于这样连续的转换操作,将所有DataStream之间的转换按顺序存储在StreamExecutionEnvironment中。
代码清单2-3 DataStream.doTransform()方法定义
protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // 获取上一次转换操作输出的TypeInformation信息 transformation.getOutputType(); // 创建OneInputTransformation OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator (environment, resultTransform); // 添加Transformation getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
在DataStream转换的过程中,不管是哪种类型的转换操作,都是按照同样的方式进行的:首先将用户自定义的函数封装到Operator中,然后将Operator封装到Transformation转换操作结构中,最后将Transformation写入StreamExecutionEnvironment提供的Transformation集合。通过DataStream之间的转换操作形成Pipeline拓扑,即StreamGraph数据结构,最终通过StreamGraph生成JobGraph并提交到集群上运行。
接下来我们抽丝剥茧,分别从Transformation、Operator和Function三个层面,介绍DataStream API中每个层面对应的主要设计与实现。