1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。
本文依赖flink、kafka集群能正常使用。
本文分为3个部分,即集成概述、三个入门示例、集成说明。
本文的示例是在Flink 1.17版本中运行。
在定义数据处理管道时,Table API和DataStream API同样重要。
DataStream API在一个相对低级的命令式编程API中提供流处理的原语(即时间、状态和数据流管理)。Table API抽象了许多内部构件,并提供了结构化和声明性API。
这两个API都可以处理有界和无界流。
在处理历史数据时,需要管理有界流。无界流发生在实时处理场景中,这些场景可能先使用历史数据进行初始化。
为了有效执行,这两个API都以优化的批处理执行模式提供处理有界流。然而,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。
一个API中的管道可以端到端定义,而不依赖于另一个API。然而,出于各种原因,混合这两种API可能是有用的:
在DataStream API中实现主管道(main pipeline)之前,使用表生态系统(table ecosystem)轻松访问目录(catalogs )或连接到外部系统。在DataStream API中实现主管道之前,访问一些SQL函数以进行无状态数据规范化和清理。如果table API中不存在更低级的操作(例如自定义计时器处理),则不时切换到DataStream API。Flink提供了特殊的桥接功能,以使与DataStream API的集成尽可能顺利。
在DataStream 和Table API之间切换会增加一些转换开销。例如,部分处理二进制数据的表运行时(即RowData)的内部数据结构需要转换为更用户友好的数据结构(即Row)。通常,这个开销可以忽略。
maven依赖本篇文章,如果没有特殊说明,将使用如下maven依赖
Flink提供了专门的StreamTableEnvironment,用于与DataStream API集成。这些环境使用其他方法扩展常规TableEnvironment,并将DataStream API中使用的StreamExecutionEnvironments作为参数。
1)、示例1 - toDataStream下面的代码展示了如何在两个API之间来回切换的示例。表的列名和类型自动从DataStream的TypeInformation派生。由于DataStream API本机不支持变更日志处理,因此代码假设在流到表和表到流转换期间仅附加/仅插入语义。
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * @author alanchan * */ public class ConvertingDataStreamAndTableDemo { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、创建输入流 DataStreamfromDataStream和toDataStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它还包括使用事件时间和水印。
根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表转换为数据流时产生仅插入的更改,而且还产生收回和其他类型的更新。在表到流转换期间,这可能会导致类似于以下内容的异常
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].在这种情况下,需要再次修改查询或切换到ChangelogStream。
2)、示例2 - toChangelogStream下面的示例显示如何转换更新表。
每个结果行表示更改日志中的一个条目,该条目具有更改标志,可以通过对其调用row.getKind()来查询。在本例中,alan的第二个分数在更改之前(-U)创建更新,在更改之后(+U)创建更新。
本示例仅仅以一个方法来展示,避免没有必要的代码,运行框架参考上述示例。
public static void test2() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、创建输入流 DataStreamfromChangelogStream和toChangelogStream的完整语义可以在下面的部分中找到。特别是,本节讨论了如何使用更复杂的嵌套类型来影响模式派生。它包括使用事件时间和水印。它讨论了如何为输入和输出流声明主键和变更日志模式。
上面的示例显示了如何通过为每个传入记录连续发出逐行更新来增量计算最终结果。然而,在输入流有限(即有界)的情况下,通过利用批处理原理可以更有效地计算结果。
在批处理中,可以在连续的阶段中执行运算符,这些阶段在发出结果之前使用整个输入表。例如,连接操作符可以在执行实际连接之前对两个有界输入进行排序(即排序合并连接算法),或者在使用另一个输入之前从一个输入构建哈希表(即哈希连接算法的构建/探测阶段)。
DataStream API和Table API都提供专门的批处理运行时模式。
3)、示例3 - 通过仅切换标志来处理批处理和流数据下面的示例说明了统一管道能够通过仅切换标志来处理批处理和流数据。
public static void test3() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、创建输入流 DataStream注意比较和示例2的输出区别
+I[alanchan, 19] +I[alan, 38] +I[alanchanchn, 20]一旦将changelog 应用于外部系统(例如键值存储),可以看到两种模式都能够产生完全相同的输出表。通过在发出结果之前使用所有输入数据,批处理模式的更改日志仅由仅插入的更改组成。有关更多细节,请参阅下面的专用批处理模式部分。
3、集成说明将Table API与DataStream API相结合的项目需要添加以下桥接模块之一。
它们包括对 flink-table-api-java或flink-table-api-scala的可传递依赖性,以及相应的特定于语言的DataStream api模块。
使用DataStream API和Table API的Java或Scala版本声明公共管道需要以下导入。
// imports for Java DataStream API import org.apache.flink.streaming.api.*; import org.apache.flink.streaming.api.environment.*; // imports for Table API with bridging to Java DataStream API import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.*; 3)、ConfigurationTableEnvironment将采用传递的StreamExecutionEnvironment.中的所有配置选项。然而,不能保证对StreamExecutionEnvironment配置的进一步更改在实例化后传播到StreamTableEnvironment。在规划期间,将选项从Table API传播到DataStream API。
我们建议在切换到Table API之前尽早在DataStream API中设置所有配置选项。
import java.time.ZoneId; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; // create Java DataStream API StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set various configuration early env.setMaxParallelism(256); env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // then switch to Java Table API StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // set configuration early tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin")); // start defining your pipelines in both APIs... 4)、执行行为这两个API都提供了执行管道的方法。换句话说:如果被请求,它们将编译一个作业图( job graph),该作业图将提交到集群并触发以执行。结果将流式传输到声明的sinks。
通常,这两个API都在方法名称中使用术语“执行”来标记这种行为。然而,Table API和DataStream API之间的执行行为略有不同。
1、DataStream APIDataStream API的StreamExecutionEnvironment使用生成器模式(builder pattern)来构造复杂的管道。管道可能会拆分为多个分支,这些分支可能以sink结尾,也可能不以sink结尾。环境缓冲(environment buffers)所有这些定义的分支,直到提交作业。
StreamExecutionEnvironment.execute()提交整个构建的管道,然后清除构建器。换句话说:不再声明sources 和sinks ,并且可以向生成器中添加新的管道。因此,每个DataStream程序通常以对StreamExecutionEnvironment.execute()的调用结束。或者,DataStream.executeAndCollect()隐式定义了一个sink,用于将结果流式传输到本地客户端。
2、Table API在Table API中,分支管道仅在StatementSet中受支持,其中每个分支必须声明一个最终sink。TableEnvironment和StreamTableEnvironment都不提供专用的通用execute()方法。相反,它们提供了提交单个source-to-sink管道或语句集的方法:
final static String sinkSQL ="CREATE TABLE OutputTable (\n"+"userId INT,\r\n"+"age INT,\r\n"+"balance DOUBLE,\r\n"+"userName STRING,\r\n"+"t_insert_time TIMESTAMP(3)\r\n"+") WITH (\n"+"'connector' = 'print'\n"+")"; final static String sinkSQL2 ="CREATE TABLE OutputTable2 (\n"+"userId INT,\r\n"+"age INT,\r\n"+"balance DOUBLE,\r\n"+"userName STRING,\r\n"+"t_insert_time TIMESTAMP(3)\r\n"+") WITH (\n"+"'connector' = 'print'\n"+")"; final static String sourceSQL ="CREATE TABLE InputTable (\r\n"+"userId INT,\r\n"+"age INT,\r\n"+"balance DOUBLE,\r\n"+"userName STRING,\r\n"+"t_insert_time AS localtimestamp,\r\n"+"WATERMARK FOR t_insert_time AS t_insert_time\r\n"+") WITH (\r\n"+"'connector' = 'datagen',\r\n"+"'rows-per-second'='10',\r\n"+"'fields.userId.kind'='sequence',\r\n"+"'fields.userId.start'='1',\r\n"+"'fields.userId.end'='20',\r\n"+"'fields.balance.kind'='random',\r\n"+"'fields.balance.min'='1',\r\n"+"'fields.balance.max'='100',\r\n"+"'fields.age.min'='1',\r\n"+"'fields.age.max'='100',\r\n"+"'fields.userName.length'='6'\r\n"+");"; public static void test4() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); //sinkSQL //sourceSQL // 建表 tenv.executeSql(sourceSQL); // tenv.executeSql(sinkSQL); tenv.executeSql(sinkSQL2); //插入表数据,方式一 tenv.from("InputTable").insertInto("OutputTable").execute(); tenv.executeSql("select * from OutputTable"); tenv.from("InputTable").execute().print(); //插入表数据,方式二 tenv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable"); tenv.executeSql("select * from OutputTable"); //插入表数据,方式三 tenv.createStatementSet() .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable") .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable") .execute(); // 输出 tenv.from("InputTable").execute().print(); tenv.executeSql("SELECT * FROM InputTable").print(); env.execute(); } 输出结果 3> +I[3, 99, 36.20987556045243, d23888, 2023-11-13T14:49:58.812] 15> +I[15, 39, 68.30743253178122, 43bec8, 2023-11-13T14:49:58.812] 2> +I[2, 62, 47.280395949976885, 7bae4e, 2023-11-13T14:49:58.812] 16> +I[16, 52, 42.10205629532836, 6baf0e, 2023-11-13T14:49:58.812] 10> +I[10, 25, 58.008035887440094, d43dea, 2023-11-13T14:49:58.812] 13> +I[13, 36, 70.9215559827798, 01bb28, 2023-11-13T14:49:58.812] 12> +I[12, 38, 30.31004698340413, 322ba8, 2023-11-13T14:49:58.812] 6> +I[6, 17, 32.28909358733212, 13bf88, 2023-11-13T14:49:58.812] 9> +I[9, 49, 44.52802246768357, e8280c, 2023-11-13T14:49:58.812] 8> +I[8, 80, 18.03487847824154, 803b2a, 2023-11-13T14:49:58.812] 5> +I[5, 61, 54.43695775227862, 063f08, 2023-11-13T14:49:58.812] 7> +I[7, 64, 33.886576642098404, 443dea, 2023-11-13T14:49:58.812] 14> +I[14, 92, 63.71527772015468, 123848, 2023-11-13T14:49:58.812] 11> +I[11, 22, 30.745102844313315, e62848, 2023-11-13T14:49:58.812] 4> +I[4, 78, 88.60724929598506, 55bca8, 2023-11-13T14:49:58.812] 1> +I[1, 82, 62.50149215989057, 0bba0c, 2023-11-13T14:49:58.812] 3> +I[19, 67, 14.244993215937432, e6c911, 2023-11-13T14:49:59.806] 1> +I[17, 67, 91.05078612782468, 560b6c, 2023-11-13T14:49:59.807] 4> +I[20, 95, 82.12047947156385, 1ac5b2, 2023-11-13T14:49:59.807] 2> +I[18, 81, 25.384055001988084, fe98d1, 2023-11-13T14:49:59.806] +----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+ | op | userId | age | balance | userName | t_insert_time | +----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+ | +I | 1 | 91 | 22.629318048042723 | 923e08 | 2023-11-13 14:49:59.800 | | +I | 2 | 67 | 75.26915785038814 | 342baa | 2023-11-13 14:49:59.803 | | +I | 3 | 68 | 74.06076023217011 | 1dbbce | 2023-11-13 14:49:59.803 | | +I | 4 | 26 | 79.47471729272772 | 083e2e | 2023-11-13 14:49:59.802 | | +I | 5 | 97 | 82.56249330491859 | 4a3c6e | 2023-11-13 14:49:59.804 | | +I | 6 | 32 | 81.74903214944425 | fdac4e | 2023-11-13 14:49:59.800 | | +I | 7 | 67 | 94.80154136831771 | f7acea | 2023-11-13 14:49:59.800 | | +I | 8 | 53 | 50.85073238739004 | cfbd0c | 2023-11-13 14:49:59.800 | | +I | 9 | 69 | 93.64054547476522 | 7fa9ec | 2023-11-13 14:49:59.801 | | +I | 10 | 66 | 61.92366658766452 | 05b86a | 2023-11-13 14:49:59.803 | | +I | 11 | 81 | 95.61717698776191 | efa8ce | 2023-11-13 14:49:59.797 | | +I | 12 | 8 | 63.573174957723076 | 0fbfec | 2023-11-13 14:49:59.802 | | +I | 13 | 85 | 52.938510850778734 | 43bfa8 | 2023-11-13 14:49:59.803 | | +I | 14 | 26 | 5.130287258770441 | 083c6c | 2023-11-13 14:49:59.797 | | +I | 15 | 35 | 73.3318749510538 | 0e3b4c | 2023-11-13 14:49:59.802 | | +I | 16 | 84 | 16.24326410122912 | ac2d6e | 2023-11-13 14:49:59.802 | | +I | 18 | 41 | 32.38455189801736 | b07afb | 2023-11-13 14:50:00.804 | | +I | 19 | 24 | 77.6947569111452 | 7f72ac | 2023-11-13 14:50:00.803 | | +I | 20 | 92 | 82.53929937026987 | 051fb9 | 2023-11-13 14:50:00.802 | | +I | 17 | 93 | 12.784194121509948 | bce5d9 | 2023-11-13 14:50:00.801 | +----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+ 20 rows in set为了组合这两种执行行为,对StreamTableEnvironment.toDataStream或StreamTableEnviron.toChangelogStream的每次调用都将具体化(materialize )(即编译)Table API子管道(sub-pipeline),并将其插入DataStream API管道生成器(builder)中。这意味着之后必须调用StreamExecutionEnvironment.execute()或DataStream.executeAndCollect。Table API中的执行不会触发这些“外部部件(external parts)”。
// adds a branch with a printing sink to the StreamExecutionEnvironment tableEnv.toDataStream(table).print(); // (2) // executes a Table API end-to-end pipeline as a Flink job and prints locally, // thus (1) has still not been executed table.execute().print(); // executes the DataStream API pipeline with the sink defined in (1) as a // Flink job, (2) was already running before env.execute();上述示例中有具体应用。
以上,本文是Flink table api 与 datastream api的集成的第一篇,主要介绍了集成的概述、table api 与 datastream api相互转换的三个示例以及其集成的说明(即maven依赖、import、配置以及执行行为),并以具体的示例进行说明。