本文共 1785 字,大约阅读时间需要 5 分钟。
最近一直都在搞 Flink 相关的开发,清晰的代码结构有利于开发规范的统一和业务逻辑的梳理。
Scala 代码结构,官网推荐的如下,
// create a TableEnvironment for specific planner batch or streamingval tableEnv = ... // see "Create a TableEnvironment" section// create an input TabletableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )")// register an output TabletableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )")// create a Table from a Table API queryval table2 = tableEnv.from("table1").select(...)// create a Table from a SQL queryval table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL resultval tableResult = table2.executeInsert("outputTable")tableResult...
但是,实践下来,感觉不是很实用,我们就自己微调了下,还是喜欢下面的代码结构体。
1. 创建表执行环境(table env)
2. 创建输入表,读取数据流,处理为数据流,转化为输入表。
3. 创建输出表,将输入表转化为输出表。
4. 任务执行。
def main(args: Array[String]): Unit = { // 1. create table env (创建表执行环境) val env = StreamExecutionEnvironment.getExecutionEnvironment val settings = EnvironmentSettings .newInstance() .useOldPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create(env, settings) // 2. create inputTable (inputStream => dataStream => inputTable)(读取输入流,处理数为数据流,转化为输入表) // 2.1 inputStream => dataStream val inputStream: DataStream[String] = env.readTextFile(filePath) val dataStream: DataStream[(String, Int)] = inputStream.flatMap(_.split(" ")).map((_, 1)) // 2.2 dataStream => inputTable val inputTable = tableEnv.fromDataStream[(String, Int)](dataStream, $"word", $"count") .groupBy($"word") .select($"word", $"count".sum) // 3. create outputTable (inputTable => outputTable)(将输入表转化为输出表) // 4. env exec (任务执行) env.execute("TableApiTemplate Job") }
转载地址:http://fulji.baihongyu.com/