- 例 1:一个新引入的 CUMULATE 窗口函数,它可以支持按特定步长扩展的窗口,直到达到最大窗口大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
- 例 2:用户在 table-valued 窗口函数中可以访问窗口的起始和终止时间,从而使用户可以实现新的功能。例如,除了常规的基于窗口的聚合和 Join 之外,用户现在也可以实现基于窗口的 Top-K 聚合:
SELECT window_time, ...
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
as rank
FROM t
) WHERE rank <= 100;
提高 DataStream API 与 Table API / SQL 的互操作能力
这一版本极大的简化了 DataStream API 与 Table API 混合的程序。
Table API 是一种非常方便的应用开发接口,因为这经支持表达式的程序编写并提供了大量的内置函数。但是有时候用户也需要切换回 DataStream,例如当用户存在表达能力、灵活性或者 State 访问的需求时。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以将一个 DataStream API 声明的 Source 或者 Sink 当作 Table 的 Source 或者 Sink 来使用。主要的优化包括:
- DataStream 与 Table API 类型系统的自动转换。
- Event Time 配置的无缝集成,Watermark 行为的高度一致性。
- Row 类型(即 Table API 中数据的表示)有了极大的增强,包括 toString() / hashCode() 和 equals() 方法的优化,按名称访问字段值的支持与稀疏表示的支持。
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
DataStream<Row> dataStream = tableEnv.toDataStream(table)
.keyBy(r -> r.getField("user"))
.window(...);
SQL Client: 初始化脚本和语句集合 (Statement Sets)
SQL Client 是一种直接运行和部署 SQL 流或批作业的简便方式,用户不需要编写代码就可以从命令行调用 SQL,或者作为 CI / CD 流程的一部分。
这个版本极大的提高了 SQL Client 的功能。现在基于所有通过 Java 编程(即通过编程的方式调用 TableEnvironment 来发起查询)可以支持的语法,现在 SQL Client 和 SQL 脚本都可以支持。这意味着 SQL 用户不再需要添加胶水代码来部署他们的SQL作业。
配置简化和代码共享
Flink 后续将不再支持通过 Yaml 的方式来配置 SQL Client(注:目前还在支持,但是已经被标记为废弃)。作为替代,SQL Client 现在支持使用一个初始化脚本在主 SQL 脚本执行前来配置环境。