Flink 并行流中 watermark 机制无法触发窗口计算的原因分析
Kafka Source 接收并处理来自 Kafka 的行流析点击数据(指定事件时间),开一个滚动窗口(Tumble Windows) 每 10 秒统计一次 pv 并将结果输出到 Print Sink 中。机制计算
CREATE TABLE sourceTable (
message STRING,无法
time_ltz AS TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(JSON_VALUE(message, $.request_body),$.clickTime) AS INTEGER),0),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL 3 SECOND
) WITH (
connector = kafka,
topic = matrix_json_click_log_test,
properties.bootstrap.servers = xxxxxxxxx:9527,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE sinkTable (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
click_pv BIGINT
)
WITH (
connector = print
);
INSERT INTO sinkTable(
window_start,
window_end,
click_pv
)
select window_start,window_end,COUNT(*)
FROM TABLE (
TUMBLE( TABLE sourceTable, DESCRIPTOR(time_ltz) , INTERVAL 10 SECOND))
GROUP BY window_start, window_end;运行以上的 FlinkSQL 后观察发现数据流正常,但是触发窗口一直没能触发窗口计算,没有结果输出。因分
watermark 是行流析用于处理乱序事件的。流处理从事件产生,机制计算到流经source,无法再到operator,触发窗口中间是因分有一个过程和时间的。虽然大部分情况下流到 operator 的行流析数据都是源码下载按照事件产生的时间顺序来的,但是机制计算也不排除由于网络等原因导致部分数据延迟到达,产生乱序。无法对于迟到的触发窗口数据我们又不能无限期地等待下去,因此需要有个衡量事件时间进度的因分机制来保证一个特定的时间后必须触发 window 进行计算,这个特别的机制就是 watermark。
并行流中的 watermark在 多并行度的情况下,source 的云服务器每个 sub task 通常独立生成水印。watermark 通过 operator 时会推进 operators 处的当前 event time,同时 operators 会为下游生成一个新的 watermark。多并行度的情况下 watermark对齐会取所有 channel 最小的 watermark。
并行流中的 watermark
原因分析由于目标 Topic 只有一个分区而 source 并行度设置为 2 ,这导致了只有一个线程可以处理该分区而另一个线程获取不到数据,因此一直没能获取最小的 watermark。最终导致一直无法触发窗口计算。
解决方案可通过手动设置并行度来解决,保证 source 并行度 <= 目标 Topic Partition 分区数。这里将 source 并行度设置为 1 之后便可正常输出结果。
并行度设置为 1
正常输出
高防服务器