- noWatermarks
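import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read raw text lines from a local socket (e.g. one opened with `nc -lk 8888`)
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        // noWatermarks(): this strategy never emits any watermark
        WatermarkStrategy<String> watermark = WatermarkStrategy.noWatermarks();
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        // A sink is required; without one, execute() fails because the topology has no operators
        source.print();
        env.execute();
    }
}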
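There is not much to say about using noWatermarks(). It creates a strategy that generates no watermarks at all, so on an unbounded stream such as this socket source event time never advances and event-time windows and timers never fire. It is mainly useful for pipelines that rely purely on processing time.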
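To make that effect concrete, here is a Flink-free sketch. It assumes, as Flink does, that an operator's watermark starts at Long.MIN_VALUE and that a tumbling event-time window [start, end) fires once the watermark reaches end - 1. The class NoWatermarksSketch and its method windowFired are hypothetical; they only model this firing rule and are not part of Flink's API.

```java
// Hypothetical, Flink-free sketch: not Flink's API, only a model of the window firing rule.
public class NoWatermarksSketch {
    // With noWatermarks() nothing is ever emitted, so the operator's watermark
    // stays at its initial value.
    static final long CURRENT_WATERMARK = Long.MIN_VALUE;

    // A tumbling window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFired(long windowEnd) {
        return CURRENT_WATERMARK >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] eventTimestamps = {1000L, 4000L, 9000L, 60000L};
        for (long ts : eventTimestamps) {
            // Records keep arriving, but none of them moves the watermark forward.
            System.out.println("event " + ts + ": window [0, 5000) fired? " + windowFired(5000L));
        }
    }
}
```

Every line prints `false`: even after an event at 60000 ms, the window [0, 5000) never closes because no watermark ever arrives.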
- forMonotonousTimestamps
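import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each input line is expected to start with an epoch-millisecond timestamp, e.g. "1000,a"
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        // Timestamps must be ascending; the watermark is the max timestamp seen minus 1 ms
        WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        // Use the first comma-separated field as the event time
                        String time = element.split(",")[0];
                        return Long.parseLong(time);
                    }
                });
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        source.print();
        env.execute();
    }
}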
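There is not much to say about forMonotonousTimestamps() either. If you choose forMonotonousTimestamps, you must guarantee that event timestamps arrive in ascending (non-decreasing) order. Any out-of-order record becomes late data, and event-time windows discard late data by default, so a large amount of data may be lost.
The source code shows that forMonotonousTimestamps is built on the same generator as forBoundedOutOfOrderness (BoundedOutOfOrdernessWatermarks), only with the out-of-orderness tolerance set to 0. The source is as follows: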
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}
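To see why out-of-order input hurts here, the generator logic can be re-implemented without Flink. This is a minimal sketch: the class AscendingWatermarksSketch and its helper droppedByWindow are hypothetical, not Flink APIs. The watermark formula (largest timestamp seen, minus the bound, minus 1 ms) mirrors BoundedOutOfOrdernessWatermarks, but checking each record against a watermark refreshed after every previous record is a simplifying assumption, since Flink emits watermarks periodically. The drop rule models a 1 s tumbling event-time window with no allowed lateness.

```java
import java.time.Duration;

// Hypothetical, Flink-free re-implementation of the BoundedOutOfOrdernessWatermarks logic.
public class AscendingWatermarksSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    AscendingWatermarksSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so the first watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every record: remember the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark a periodic emit would output right now.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // A tumbling window [start, start + size) with no allowed lateness drops a
    // record once the watermark has reached the window's last timestamp.
    static boolean droppedByWindow(long ts, long windowSize, long watermark) {
        long windowEnd = ts - (ts % windowSize) + windowSize;  // assumes ts >= 0
        return windowEnd - 1 <= watermark;
    }

    public static void main(String[] args) {
        // forMonotonousTimestamps() corresponds to a bound of 0 ms
        AscendingWatermarksSketch gen = new AscendingWatermarksSketch(Duration.ofMillis(0));
        long[] timestamps = {1000L, 2000L, 3000L, 1500L};  // the last record is out of order
        for (long ts : timestamps) {
            // Compare against the watermark as it stood before this record arrived.
            boolean dropped = droppedByWindow(ts, 1000L, gen.currentWatermark());
            gen.onEvent(ts);
            System.out.println(ts + (dropped ? " -> dropped" : " -> kept")
                    + ", watermark now " + gen.currentWatermark());
        }
    }
}
```

The first three records are kept; the last line prints `1500 -> dropped, watermark now 2999`, because a watermark of 2999 has already closed the window [1000, 2000) that the 1500 record belongs to.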
- forBoundedOutOfOrderness
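import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each input line is expected to start with an epoch-millisecond timestamp, e.g. "1000,a"
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        // Tolerate up to 2000 ms of out-of-orderness. The explicit <String> type witness is
        // required: without it Java infers WatermarkStrategy<Object> and the assigner does not compile.
        WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(2000))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        // Use the first comma-separated field as the event time
                        String time = element.split(",")[0];
                        return Long.parseLong(time);
                    }
                });
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        source.print();
        env.execute();
    }
}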
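The principle behind the bounded out-of-orderness strategy was covered in an earlier article. With the tolerance set to 2 s as in the code above, the watermark trails the largest timestamp seen so far by 2 s (precisely, watermark = max timestamp - 2000 ms - 1 ms). A record may therefore lag the newest record by at most 2 s. If the gap exceeds 2 s, the later-arriving record is late and, if the event-time window it belongs to has already fired, it is discarded by default.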
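The 2 s rule can be checked with a similar Flink-free sketch. The class BoundedOutOfOrdernessSketch and its methods are hypothetical, not Flink APIs. The watermark formula follows Flink's BoundedOutOfOrdernessWatermarks, while refreshing the watermark after every record and using a 1 s tumbling window with no allowed lateness are simplifying assumptions.

```java
import java.time.Duration;

// Hypothetical, Flink-free sketch of bounded out-of-orderness watermarks.
public class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.boundMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;  // first watermark = Long.MIN_VALUE
    }

    // Track the largest event timestamp seen so far.
    void onEvent(long ts) {
        maxTimestamp = Math.max(maxTimestamp, ts);
    }

    // watermark = largest timestamp seen - bound - 1
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // Late means the timestamp is not newer than the watermark,
    // i.e. the record trails the newest one by more than the bound.
    boolean isLate(long ts) {
        return ts <= currentWatermark();
    }

    // A tumbling window with no allowed lateness drops the record if the
    // watermark already covers the window's last timestamp (end - 1).
    boolean droppedByWindow(long ts, long windowSizeMillis) {
        long windowEnd = ts - (ts % windowSizeMillis) + windowSizeMillis;  // assumes ts >= 0
        return windowEnd - 1 <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(Duration.ofMillis(2000));
        long[] timestamps = {1000L, 4000L, 2500L, 1500L};
        for (long ts : timestamps) {
            // Simplification: the watermark is recomputed after every record.
            System.out.printf("ts=%d late=%b dropped=%b%n",
                    ts, gen.isLate(ts), gen.droppedByWindow(ts, 1000L));
            gen.onEvent(ts);
        }
    }
}
```

The record at 2500 trails the newest record (4000) by 1.5 s and is kept. The record at 1500 trails it by 2.5 s, so it is late and falls into the already-closed window [1000, 2000), and the last line prints `ts=1500 late=true dropped=true`. A late record whose window has not fired yet would still be accepted, so late and dropped are not always the same thing.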