Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

fulbee/FlinkSQL

Open more actions menu
 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

51 Commits
51 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

阿里工作的时候是使用Blink进行流数据处理和计算,通过编写sql实现Blink的计算job,开发简单高效,产品易用。目前想尝试实现Flink产品化,类似Blink这种产品。

1.使用SQL为统一开发规范,SQL语言的好处是:声明式,易理解,稳定可靠,自动优化。如果采用API开发的话,最大的问题是对于job调优依赖程序员经验,比较困难,同时API开发方式侵入性太强(数据安全,集群安装等)。而sql可以自动调优,避免这两个问题的产生。

2.目前实现思路:

用户输入sql(ddl,dml,query)  -> ddl对应为Flink的source和sink


                            -> dml的insert into对应将对应数据加载到 sink 
                       
                       
                            -> query数据处理和计算
                       
                       
--> 封装为api对应Flink的Job:env.sqlQuery/env.sqlUpdate;table.writeToSink;


--> JobGraph和对应job提交; 

二.开发过程

1.SqlConvertService是将用户sql语句解析为不同类型,source,sink,view,dml(目前只支持insert into)

SqlParserImpl SQL的解析

Validator 验证器

Planner 计划解析

2.FlinkJobImpl是实现Flink的Source和Sink,以及JobGraph

3.JobGraph的提交和执行,如StandaloneClusterClient.submitJob或者YarnClusterClient.runDetached

三.代码关注

apache Flink sql

apache calcite

apache hive

四.样例

CREATE FUNCTION demouf AS 
      'pingle.wang.api.sql.function.DemoUDF' 
USING JAR 'hdfs://flink/udf/jedis.jar',
      JAR 'hdfs://flink/udf/customudf.jar';
      
      
CREATE TABLE kafak_source (
      `date` string,
      amount float, 
      proctime timestamp
      ) 
with (
      type=kafka,
      'flink.parallelism'=1,
      'kafka.topic'=topic,
      'kafka.group.id'=flinks,
      'kafka.enable.auto.commit'=true,
      'kafka.bootstrap.servers'='localhost:9092'
);


CREATE TABLE mysql_sink (
      `date` string, 
      amount float, 
      PRIMARY KEY (`date`,amount)
      ) 
with (
      type=mysql,
      'mysql.connection'='localhost:3306',
      'mysql.db.name'=flink,
      'mysql.batch.size'=0,
      'mysql.table.name'=flink_table,
      'mysql.user'=root,
      'mysql.pass'=root
);


CREATE VIEW view_select AS 
      SELECT `date`, 
              amount 
      FROM kafak_source 
      GROUP BY 
            `date`,
            amount
      ;


INSERT INTO mysql_sink 
       SELECT 
          `date`, 
          sum(amount) 
       FROM view_select 
       GROUP BY 
          `date`
      ;

About

基于Flink的产品化,类似阿里Blink,使用SQL语言开发流计算任务

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages

  • Java 70.5%
  • Scala 29.5%
Morty Proxy This is a proxified and sanitized view of the page, visit original site.