这些初始化脚本通常可以在不同团队/部署之间共享。它可以用来加载常用的 catalog,应用通用的配置或者定义标准的视图。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
更多的配置项
通过增加配置项,优化 SET / RESET 命令,用户可以更方便的在 SQL Client 和 SQL 脚本内部来控制执行的流程。
通过语句集合来支持多查询
多查询允许用户在一个 Flink 作业中执行多个 SQL 查询(或者语句)。这对于长期运行的流式 SQL 查询非常有用。
语句集可以用来将一组查询合并为一组同时执行。
以下是一个可以通过 SQL Client 来执行的 SQL 脚本的例子。它初始化和配置了执行多查询的环境。这一脚本包括了所有的查询和所有的环境初始化和配置的工作,从而使它可以作为一个自包含的部署组件。
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '...',
'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode=streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;
-- set the job's parallelism
SET parallism.default=10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
Hive 查询语法兼容性
用户现在在 Flink 上也可以使用 Hive SQL 语法。除了 Hive DDL 方言之外,Flink现在也支持常用的 Hive DML 和 DQL 方言。
为了使用 Hive SQL 方言,需要设置 table.sql-dialect 为 hive 并且加载 HiveModule。后者非常重要,因为必须要加载 Hive 的内置函数后才能正确实现对 Hive 语法和语义的兼容性。例子如下:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是, Hive 方言中不再支持 Flink 语法的 DML 和 DQL 语句。如果要使用 Flink 语法,需要切换回 default 的方言配置。
优化的 SQL 时间函数