博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink SQL 开发的代码结构
阅读量:4070 次
发布时间:2019-05-25

本文共 1785 字,大约阅读时间需要 5 分钟。

最近一直都在搞 Flink 相关的开发,清晰的代码结构有利于开发规范的统一和业务逻辑的梳理。

Scala 代码结构,官网推荐的如下,

// create a TableEnvironment for specific planner batch or streamingval tableEnv = ... // see "Create a TableEnvironment" section// create an input TabletableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )")// register an output TabletableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )")// create a Table from a Table API queryval table2 = tableEnv.from("table1").select(...)// create a Table from a SQL queryval table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL resultval tableResult = table2.executeInsert("outputTable")tableResult...

但是,实践下来,感觉不是很实用,我们就自己微调了下,还是喜欢下面的代码结构体。

1. 创建表执行环境(table env)

2. 创建输入表,读取数据流,处理为数据流,转化为输入表。

3. 创建输出表,将输入表转化为输出表。

4. 任务执行。

def main(args: Array[String]): Unit = {    // 1. create table env (创建表执行环境)    val env = StreamExecutionEnvironment.getExecutionEnvironment    val settings = EnvironmentSettings      .newInstance()      .useOldPlanner()      .inStreamingMode()      .build()    val tableEnv = StreamTableEnvironment.create(env, settings)    // 2. create inputTable (inputStream => dataStream => inputTable)(读取输入流,处理数为数据流,转化为输入表)    // 2.1 inputStream => dataStream    val inputStream: DataStream[String] = env.readTextFile(filePath)    val dataStream: DataStream[(String, Int)] = inputStream.flatMap(_.split(" ")).map((_, 1))    // 2.2 dataStream => inputTable    val inputTable = tableEnv.fromDataStream[(String, Int)](dataStream, $"word", $"count")      .groupBy($"word")      .select($"word", $"count".sum)    // 3. create outputTable (inputTable => outputTable)(将输入表转化为输出表)    // 4. env exec (任务执行)    env.execute("TableApiTemplate Job")  }

 

转载地址:http://fulji.baihongyu.com/

你可能感兴趣的文章
项目导入时报错:The import javax.servlet.http.HttpServletRequest cannot be resolved
查看>>
linux对于没有写权限的文件如何保存退出vim
查看>>
Windows下安装ElasticSearch6.3.1以及ElasticSearch6.3.1的Head插件
查看>>
IntelliJ IDEA 下的svn配置及使用的非常详细的图文总结
查看>>
【IntelliJ IDEA】idea导入项目只显示项目中的文件,不显示项目结构
查看>>
ssh 如何方便的切换到其他节点??
查看>>
JSP中文乱码总结
查看>>
Java-IO-File类
查看>>
Java-IO-java的IO流
查看>>
Java-IO-输入/输出流体系
查看>>
Java实现DES加密解密
查看>>
HTML基础
查看>>
Java IO
查看>>
Java NIO
查看>>
Java大数据:Hbase分布式存储入门
查看>>
Java大数据:全文搜索引擎Elasticsearch入门
查看>>
大数据学习:Hadoop入门学习书单
查看>>
大数据学习:Spark SQL入门简介
查看>>
大数据学习:Spark RDD操作入门
查看>>
大数据框架:Spark 生态实时流计算
查看>>