DataStream API

介绍

DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。

对于批处理和流处理,我们都可以用这同一套 API 来实现。

其用法上,有些类似于常规的 Java 集合,但又有所不同。

我们在代码中往往并 不关心集合中具体的数据,而只是用 API 定义出一连串的操作来处理它们;这就叫作数据流 的“转换”(transformations)。

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分组成:

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute)

其中,获取环境和触发执行,都可以认为是针对执行环境的操作。

Flink 程序的构成

执行环境

Flink 程序可以在各种上下文环境中运行

  • 本地 JVM
  • 远程集群

不同的环境,代码的提交运行的过程会有所不同。

这就要求我们在提交作业执行计算时, 首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。

只有获取了环境 上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

创建执行环境

getExecutionEnvironment

这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式

1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。

1
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。

1
2
3
4
5
6
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号
"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。

比如可以全局设置程 序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

关于时间语义和容错 机制,会在后续的章节介绍。

执行模式

上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment,顾名思义它应该是 做流处理的。

那对于批处理,又应该怎么获取执行环境呢?

在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment 的静态方法,返回它的对象:

1
2
3
4
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。

DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。

这样一来,DataSet API 也就没有存在的必要了。

流执行模式(默认)

这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。

批执行模式

专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。

对于不会持续计算的有界数据,我们用这种模式处理会更方便。

自动模式

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

配置方法

由于 Flink 程序默认是 STREAMING 模式,

我们这里重点介绍一下 BATCH 模式的配置。 主要有两种方式:

  • 命令行配置
1
bin/flink run -Dexecution.runtime-mode=BATCH
  • 代码配置
1
2
3
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。

建议:

不要在代码中配置,而是使用命令行。

这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。

而在代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。

创建程序执行


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!