文章目录
-
- 1. flink 三种time简介
-
- 1.1 Processing Time:
- 1.2 Event time:
- 1.3 Ingestion time:
- 2. flink中使用event-time
-
- 2.1 在stream-source中直接放入event-time
- 2.2 使用Timestamp Assigners / Watermark Generators
-
- 2.2.1 AssignerWithPeriodicWatermarks
- 2.2.2 AssignerWithPunctuatedWatermarks
- 3. window的两种使用方式
-
- 3.1 evn设置TimeCharacteristic
- 3.2 创建window的时候指定window的类型
- 4. 使用kafka record 的timestamp作为event的timestamp
- 5. watermark的注意事项
-
- 5.1 . watermark要满足递增特性
- 5.2. 多个输入流需要特别注意
- 6. 连续多个window的计算
1. flink 三种time简介
flink中有三种时间: processing-time, event-time, ingestion-time
1.1 Processing Time:
Processing Time是指执行程序时对应的物理机系统时间。
当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。例如:一个每小时处理的时间窗口将包括所有在系统指定的一个小时内到达指定操作的所有记录。
处理时间是最简单的时间概念,不需要流和物理机之间的协调,它有最好的性能和最低的延迟。然而,在分布式或者异步环境中,因为受到记录到达系统时间的影响,处理时间不能够决定系统内操作之间记录流的速度。
对于简单的总量统计模型,则可以采用processing-time 比如统计用户的发帖总量,实际上对数据是否有序并不敏感。
1.2 Event time:
Event Time是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink之前就已经嵌入到事件中了,并且事件的timestamp是可以从每一个record中抽取出来的。
事件时间可以通过备份或者持久化日志获取无序数据、延迟事件或者重试数据的正确结果。在事件时间中,时间进度依赖于数据而不是其他形式的时钟。事件时间程序必须要指定如何产生事件时间水印(Event Time Watermarks),这是事件时间处理进度的信号机制,这个机制在下面描述。对于很多有明确时间以来的数据比较有用,比如统计用户过去5分钟内的发帖量,则是有必要使用event-time的,否则如果处理已经堆积的数据,使用proccessing-time则明显会填进去很多不是对应时间段的数据。
使用event-time,需要我们做两个工作,
1.为每个record提取他的timestamp,每个事件都要有一个timestamp
2.产生watermark,这个watermark是在整个流当中都会起作用的
1.3 Ingestion time:
摄入时间(Ingestion Time)是事件进入Flink的时间,在源操作中每个记录都会获得源的当前时间作为时间戳,后续基于时间的操作(如: time window)会依赖这个时间戳
摄入时间从概念上来讲是在event-time和processing-time之间,与处理时间相比,成本可能会高一点,但是会提供更加可预测的结果。因为摄入时间使用的是固定的时间戳(都是在源处指定的),记录中的不同窗口操作依赖同一个时间戳,而在处理时间中每个窗口操作可能将记录赋给不同的窗口(根据本地的系统时钟和传输时延)。
与事件时间相比,摄入时间程序不能处理任何无序事件或者延迟事件,但是程序无需指定如何产生水印。
2. flink中使用event-time
flink提供了两种抽取event-time的方式
- 在stream-source中直接放入event-time并且产生watermark
- 使用Timestamp Assigners / Watermark Generators 来产生event-time和water-mark
2.1 在stream-source中直接放入event-time
DataStreamSource<MyEvent> dataStreamSource = env.addSource(new SourceFunction<MyEvent>() {@Override
public void run(SourceContext<MyType> ctx) throws Exception {while (/* condition */) {MyType next = getNext();ctx.collectWithTimestamp(next, next.getEventTimestamp());if (next.hasWatermarkTime()) {ctx.emitWatermark(new Watermark(next.getWatermarkTime()));}}
}}
这种需要自己对data-source进行封装,管理起来可能比较麻烦。
2.2 使用Timestamp Assigners / Watermark Generators
这种方式也有两种,
- AssignerWithPeriodicWatermarks 按照一定的时间产生waterMark
- AssignerWithPunctuatedWatermarks 按照特定的事件来产生waterMark
2.2.1 AssignerWithPeriodicWatermarks
实现这个类需要实现两个方法
/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/
//这个产生water-mark 对应的情况是相同event-time产生的多个event,
//最晚的到达的event只会比最早到达的晚几毫秒的情况
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;// 实现了给每个event抽取timestamp@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp = element.getCreationTime();currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}// 实现了获取watermark@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}/*** This generator generates watermarks that are lagging behind processing time by a fixed amount.* It assumes that elements arrive in Flink after a bounded delay.*/
// 这个water-mark允许元素对应当前系统时间有一定的延迟,
//感觉不好用,这种是用processing-time来度量延迟的
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);}
}
在这里 对于每个kafka元素都会调用 extractTimestamp 方法来产生 timestamp
然后再固定的时间片段会调用 getCurrentWatermark ,就像定时任务一样
具体的调用周期可以使用
ExecutionConfig.setAutoWatermarkInterval(...)
进行设置,一般都是几十毫秒就行了。
2.2.2 AssignerWithPunctuatedWatermarks
这个是针对特定的元素触发water-mark
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;}
}
3. window的两种使用方式
这个实际上说的是具体的api的使用
- 在env初始化的时候设置TimeCharacteristic,后面使用创建window的时候直接使用time
- env不指定TimeCharacteristic,创建window的时候指定window的类型
使用这两个的前提是先要设置env的时间线类型
3.1 evn设置TimeCharacteristic
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).timeWindow(Time.seconds(10)).reduce( (a, b) -> a.add(b) ).addSink(...);
3.2 创建window的时候指定window的类型
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> input =env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<Integer> resultsPerKey = input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new Summer());
4. 使用kafka record 的timestamp作为event的timestamp
flink 对kafka consumer做了一些特殊的处理
当你在代码中进行了如下设置
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
对于每个kafka元素,在获取的时候会将kafka record的timestamp作为当前元素的timestamp
所以,如果你选择使用kafka record的timestamp作为 event-time的话,在实现AssignerWithPeriodicWatermarks 只需要重点关注产生watermark的方法就行了。
下面这个是一个使用kafka record的timestamp作为 event-time的AssignerWithPeriodicWatermarks样例
public class KafkaMark implements AssignerWithPeriodicWatermarks<ObjectNode> {private long waterMark;private long allowLate;@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(waterMark-allowLate);}/**在这里,previousElementTimestamp 是当前element 之前产生的timestamp,默认是0,但是当使用 kafak-consumer并且开启了event-time时间线的时候,这里就是kafka-record的时间,所以你就不要再进行解析了,直接使用即可*/@Overridepublic long extractTimestamp(ObjectNode element, long previousElementTimestamp) {this.waterMark=Math.max(waterMark,previousElementTimestamp);return previousElementTimestamp;}}
5. watermark的注意事项
5.1 . watermark要满足递增特性
对于任何一个AssignerWithPeriodicWatermarks ,extractTimestamp产生的是每一个元素具体的timestamp,但是getCurrentWatermark 产生的一系列watermark必须满足递增特性,因为window是根据watermark创建和触发计算以及销毁的。如果watermark不能满足递增特性的话,同一个时间段的window可能会被反复创建,导致数据统计失真。所以在实现AssignerWithPeriodicWatermarks 的extractTimestamp 的时候一定需要注意,要满足watermark的递增特性。
下面的代码保证了watermark是递增的(严格的说是非递减的)
@Overridepublic long extractTimestamp(ObjectNode element, long previousElementTimestamp) {this.waterMark=Math.max(waterMark,previousElementTimestamp);return previousElementTimestamp;}
5.2. 多个输入流需要特别注意
并行度的引入可能导致可能有些窗口无法被触发,需要注意,在union的时候,会引入多个并行度,然后window会取每个并行度的最小值来作为窗口的最小warter-mark,这样有可能会导致water-mark一直没有办法触发。
比如,统计用户最近30天发帖量,如果使用了两个kafka-topic,一个是init-topic(存放存量数据),一个是binlog-topic(存放增量数据),使用event-time作为时间线的话,有可能导致window无法触发,因为init-topic对应的数据是存量,没有增量数据,但是window中的watermark是取两个流当中的最小值作为他的watermark,这样的话会导致window无法触发计算。
6. 连续多个window的计算
watermark会在产生的地方持续往下游流过去,下游的多个window都会接收到这些watermark,会按照规则触发计算,当一个大于等于(end-timestamp – 1)的watermark到来的时候,会触发所有的endTime 小于等于end-timestamp的窗口
可以进行两个连续的窗口计算,从第一个窗口出来的元素的timestamp都是这个window的 endTime-1.
比如下面的样例
DataStream<Integer> input = ...;DataStream<Integer> resultsPerKey = input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(60),Time.seconds(6))).reduce(new Summer());DataStream<Integer> globalResults = resultsPerKey.windowAll(TumblingEventTimeWindows.of(Time.seconds(6))).process(new TopKWindowFunction());
上面的方式可以在第一个window window01中每隔6s计算一下过去60s的各个key的统计数据,然后再后面第二个window window02会拿到window01中每隔6s输出的数据,做一次计算,求出topk。