本文共 2772 字,大约阅读时间需要 9 分钟。
Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。
什么是view(视图):
视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。视图可以包含表中的所有列,或者仅包含选定的列。视图可以创建自一个或者多个表,这取决于创建该视图的 SQL 语句的写法。视图,一种虚拟的表,允许用户执行以下操作:(引自:)
Flink SQL兼容标准SQL,view的作用与标准SQL相同,有几个特点:
Flink SQL中,视图的语法非常简单,可参考:。接下来我们通过一些例子来实际感受一下视图的作用。
假设在IoT场景中,要过滤出两个厂房中的传感器的异常数据。两个厂房的数据分别发到了datahub的两个不同topic,需要将两个datahub topic中异常数据过滤出来,再汇总。原始数据结构如下:DDL -- 定义输入输出数据的数据结构,具体语法请参见 ,维表相关语法详见:
-- source1 定义厂房1的topic的数据结构create table fab1( `date` int, hour int, ip varchar, event_id BIGINT) with ( type='datahub', endPoint='xxxxxxxxx', project='xxxxxxxxxx', topic='topic1', accessId='xXXXXXXXX', accessKey='XXXXXXXXX'); -- source2 定义厂房2的topic的数据结构 create table fab2( `date` int, hour int, ip varchar, event_id BIGINT) with ( type='datahub', endPoint='xxxxxxxxx', project='xxxxxxxxxx', topic='topic2', accessId='xXXXXXXXX', accessKey='XXXXXXXXX'); -- 定义结果表1的数据结构 create table sink( `date` int, hour int, event_id bigint, event_cnt bigint ) with ( type='datahub', endPoint='xxxxxxxxx', project='xxxxxxxxxx', topic='topic2', accessId='xXXXXXXXX', accessKey='XXXXXXXXX'); -- 定义结果表2的数据结构 create table sink( `date` int, hour int, event_id bigint, event_cnt bigint ) with ( type='rds', url='xxxxxx', tableName='xxxxxx', userName='xxxxxx', password='xxxxxx'); -- 维表 CREATE TABLE device_whitelist ( ip varchar, category varchar, PRIMARY KEY (ip), -- 用作维表时,必须有声明的主键。 PERIOD FOR SYSTEM_TIME -- 定义维表的变化周期) with ( type = 'rds', ...)
写法一,按照批处理系统/数据库的思维来看,这个需求非常简单:
insert into sinkselect e.`ip`,e.`hour`,e.`date`,e.`event_id` from ( select * from fab1 where event_id='00001' union select * from fab2 where event_id='00001') eJOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS dON e.`ip` = d.`ip`
写法二,使用view,将各个复杂SQL模块拆开:
-- CREATE VIEW view1(`date`,`hour`,`ip`,`event_id`) ASSELECT * FROM fab1WHERE event_id='00001'UNION SELECT * FROM fab2WHERE event_id='00001'-- CREATE VIEW view2(`date`,`hour`,`ip`,`event_id`) ASSELECT e.`date`,e.`hour`,e.`ip`,e.`event_id` FROM view1 eJOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS dON e.`ip` = d.`ip`-- INSERT INTO sink1INSERT INTO sink1SELECT * FROM view2-- INSERT INTO sink2INSERT INTO sink2SELECT * FROM view1
Flink中SQL的数据是不断动态变化的,特别是涉及到一些特殊语法(如window级连/嵌套等),需要分步调试每个SQL模块的结果。如果用写法一,会大大增加SQL调试难度。因此,使用Flink SQL,建议使用第二种写法,用view将各个语法块串联,方便调试和排查问题。写法一和写法二最终生成的作业DAG图都是一样的,没有任何区别。一个Flink SQL作业可以同时定义多个输出表,结果可同时被输出到多种数据源中。
如果在使用实时计算产品过程中有任何问题,欢迎在博客下方回复交流。
转载地址:http://claxa.baihongyu.com/