wangshumin/FlinkSQL


I. Background

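While working at Alibaba, I used Blink for stream data processing and computation. Blink jobs are written in SQL, which keeps development simple and efficient and makes the product easy to use. This project is an attempt to productize Flink in the same way as Blink, with SQL as the unified development standard. The advantages of SQL are that it is declarative, easy to understand, stable and reliable, and automatically optimized. With API-based development, the biggest problem is that job tuning depends on the programmer's experience and is therefore difficult; API development is also too invasive (data security, cluster security, and so on). SQL can be tuned automatically, which avoids these problems.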

II. Implementation approach:

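User submits SQL (DDL, query, DML)  -> DDL maps to Flink sources and sinks

                            -> query / DML (INSERT INTO) performs the data processing and computation

--> wrapped as the corresponding Flink job: env.sqlQuery / env.sqlUpdate

--> JobGraph generation and job submission through ClusterClient.submitJob or ClusterDescriptor.deployJobCluster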
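
The first step of the flow above, sorting submitted statements into DDL, query, and DML, can be sketched in plain Java. This is only an illustration under stated assumptions: the names `StatementRouter`, `Kind`, `split`, and `classify` are hypothetical and do not come from this repository, and matching on the leading keyword is a simplification of real SQL parsing (it also assumes no `;` appears inside a string literal).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical sketch: split a SQL script and classify each statement. */
final class StatementRouter {

    /** The three statement kinds named in the flow above. */
    enum Kind { DDL, QUERY, DML }

    /** Splits a script on ';'. Assumes no ';' occurs inside a string literal. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }

    /** Classifies a statement by its leading keyword (a simplification). */
    static Kind classify(String statement) {
        String head = statement.trim().toUpperCase(Locale.ROOT);
        if (head.startsWith("CREATE")) {
            return Kind.DDL;   // CREATE TABLE / VIEW / FUNCTION definitions
        }
        if (head.startsWith("INSERT")) {
            return Kind.DML;   // would be handed to env.sqlUpdate
        }
        if (head.startsWith("SELECT")) {
            return Kind.QUERY; // would be handed to env.sqlQuery
        }
        throw new IllegalArgumentException("Unsupported statement: " + statement);
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE t (id int) with (type=source);"
                + "INSERT INTO s SELECT id FROM t;";
        for (String statement : split(script)) {
            System.out.println(classify(statement) + " <- " + statement);
        }
    }
}
```

A real implementation would rely on a SQL parser rather than keyword prefixes, since comments, leading parentheses, or `WITH` queries would defeat this check.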

III. Releases:

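v3.0.0, January 2020

     1. Uses Flink 1.10
           The SQL parsing built into versions before 1.10 was incomplete (for example, parsing of function and watermark definitions), so it was of little practical use and not worth swapping in for the parsing layer this project had already developed.
     2. Submits jobs through the new ClusterClient.submitJob interface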

New features

      1. Flink's built-in SQL parsing
      2. The new job submission interface
      3. Unified stream and batch processing
      4. DingTalk / WeChat alert notifications

v2.0.1
v2.0.0, April 2019

       blink-client    interface definitions
       blink-sql/calcite   SQL parsing for stream and batch tables
       blink-libraries   custom source, sink, and side (dimension table) development
       blink-batch   BatchTableSource and BatchTableSink
       blink-stream  StreamTableSource and StreamTableSink
       blink-job   batch/stream job submission

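v2.0.1 new features
v2.0.0 new features

    1. Extracted the SQL layer so that stream and batch share it; the SQL syntax follows the Flink issues and the docs they provide
    2. Added batch job development
    3. Added dimension table support
    4. Upgraded Flink to 1.7.x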

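v1.0.0, July 2018

      blink-client interface definitions
      blink-sqlserver  SQL parsing for stream tables
      blink-job  wraps the SQL as a stream job

New features

      1. Implemented CREATE FUNCTION
      2. Implemented stream processing jobs written in SQL
      3. Modified the source code to implement SQL CEP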

IV. Examples

v3.0.0 example of developing a stream job in SQL:

v2.0.1 example of developing a stream job in SQL:

Batch SQL example:

CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE csv_source (
    id int, 
    name varchar, 
    `date` date , 
    age int
) 
with (
    type=source,
    connect.type=json,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE TABLE csv_sink (
    `date` date, 
    age int, 
    PRIMARY KEY (`date`)
) 
with (
    type=sink,
    connect.type=csv,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

create view view_select as  
    SELECT `date`, age FROM csv_source group by `date`,age
;

INSERT INTO csv_sink 
    SELECT `date`, sum(age) FROM view_select group by `date`
;
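
To make the two-step aggregation in this example concrete: the view's GROUP BY on (`date`, age) collapses duplicate pairs, and the INSERT then sums age per date. The sketch below reproduces that logic in plain Java on made-up rows. The class name, the helper methods, and the sample data are all hypothetical and are not part of this repository; dates are kept as strings for brevity.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of view_select followed by the INSERT. */
final class TwoStepAggregation {

    /** Builds one (date, age) row; the other source columns are ignored here. */
    static Map.Entry<String, Integer> row(String date, int age) {
        return new SimpleEntry<>(date, age);
    }

    /** Made-up sample data: the first two rows are duplicates. */
    static List<Map.Entry<String, Integer>> sampleRows() {
        return Arrays.asList(
                row("2020-01-01", 20),
                row("2020-01-01", 20),
                row("2020-01-01", 30),
                row("2020-01-02", 40));
    }

    /** SELECT `date`, age ... GROUP BY `date`, age: keeps one row per distinct pair. */
    static Set<Map.Entry<String, Integer>> viewSelect(List<Map.Entry<String, Integer>> source) {
        return new LinkedHashSet<>(source);
    }

    /** SELECT `date`, sum(age) ... GROUP BY `date`: totals age for each date. */
    static Map<String, Integer> sumAgeByDate(Set<Map.Entry<String, Integer>> view) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> r : view) {
            totals.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // prints {2020-01-01=50, 2020-01-02=40}
        System.out.println(sumAgeByDate(viewSelect(sampleRows())));
    }
}
```

Without the intermediate view, the duplicate row would be counted twice and the total for 2020-01-01 would be 70 instead of 50.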

Stream SQL example:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
      
CREATE TABLE kafka_source (
      `date` varchar,
      amount float, 
      proctime timestamp
      ) 
with (
      type=source,
      'connect.type'=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
      `date` varchar, 
      total_amount float, 
      PRIMARY KEY (`date`)
      ) 
with (
      type=sink,
      'connect.type'=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=10,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);

CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;

INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) as total_amount
       FROM view_select 
       GROUP BY 
          `date`
      ;
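
The WITH clauses in these examples mix quoted and unquoted keys and values (`type=source` next to `'kafka.bootstrap.servers'='localhost:9092'`). As a rough illustration of how such a property list could be normalized into a map, here is a minimal Java sketch. It is not the repository's parser: `WithClauseProperties`, `parse`, and `unquote` are hypothetical names, and the code assumes that no value contains a comma or an equals sign.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: turn the body of a WITH (...) clause into a map. */
final class WithClauseProperties {

    /** Removes one pair of surrounding single quotes, if present. */
    static String unquote(String token) {
        String t = token.trim();
        if (t.length() >= 2 && t.startsWith("'") && t.endsWith("'")) {
            return t.substring(1, t.length() - 1);
        }
        return t;
    }

    /**
     * Parses the text between "with (" and ")".
     * Assumes no ',' or '=' appears inside a key or value.
     */
    static Map<String, String> parse(String body) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate a trailing comma or blank segment
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Expected key=value: " + pair.trim());
            }
            properties.put(unquote(pair.substring(0, eq)), unquote(pair.substring(eq + 1)));
        }
        return properties;
    }

    public static void main(String[] args) {
        String body = "type=source,\n"
                + "'connect.type'=kafka,\n"
                + "'kafka.bootstrap.servers'='localhost:9092'";
        System.out.println(parse(body));
    }
}
```

After parsing, quoted and unquoted spellings of the same key resolve to the same map entry, so `connect.type` can be looked up regardless of how it was written.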

v2.0.0 example of developing a stream job in SQL:

Batch SQL example:

CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE json_source (
      id int, 
      name varchar, 
      `date` date , 
      age int
) 
with (
      type=json,
      'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE SINK TABLE csv_sink (
      `date` date, 
      total_age int
) 
with (
      type=csv,
      'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select as  
      SELECT `date`, age 
      FROM json_source 
      GROUP BY `date`,age;
  
INSERT INTO csv_sink 
      SELECT `date`, sum(age) as total_age
      FROM view_select 
      GROUP BY `date`;

Stream SQL example:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
      
CREATE SOURCE TABLE kafka_source (
      `date` varchar,
      amount float, 
      proctime timestamp
      ) 
with (
      type=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);

CREATE SINK TABLE mysql_sink (
      `date` varchar, 
      total_amount float, 
      PRIMARY KEY (`date`)
      ) 
with (
      type=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=10,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);

CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;


INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) as total_amount
       FROM view_select 
       GROUP BY 
          `date`
      ;

v1.0.0 example of developing a stream job in SQL:

CREATE FUNCTION demouf AS 
      'ambition.api.sql.function.DemoUDF' 
USING JAR 'hdfs://flink/udf/jedis.jar',
      JAR 'hdfs://flink/udf/customudf.jar';
      
      
CREATE TABLE kafka_source (
      `date` string,
      amount float, 
      proctime timestamp
      ) 
with (
      type=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);
CREATE TABLE mysql_sink (
      `date` string, 
      amount float, 
      PRIMARY KEY (`date`,amount)
      ) 
with (
      type=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=0,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);
CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafka_source 
      GROUP BY 
            `date`,
            amount
      ;
INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) 
       FROM view_select 
       GROUP BY 
          `date`
      ;

V. Related projects

apache flink

apache calcite

uber AthenaX

DTStack flinkStreamSQL

About

Develop real-time Flink programs in SQL, modeled on Alibaba's Blink
