本文共 1733 字,大约阅读时间需要 5 分钟。
写代码过程中,IDE 的代码提示功能是程序员的最爱,但是在用 Scala 写 Flink 代码的过程中,经常会有不提示的情况。
蛋疼。。。
这个就是 Scala 的引入包的问题,常用的用下面这几个,mark 下,
import org.apache.flink.streaming.api.scala._import org.apache.flink.table.api._import org.apache.flink.table.api.bridge.scala._
注意后面的下划线,就是整包导入的意思,类似Java里面的星号 * 。
这样处理后,在我们用点提示的时候,对应的api方法就会出来了,可以选择自己需要的实现方法。
附加,使用 flink sql 版本的 wordcount(写wordcount还是api好用,用sql麻烦。。)
import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala._import org.apache.flink.table.api._import org.apache.flink.table.api.bridge.scala._/** * @Author: shipfei * @Date: 2021/3/4 13:26 * motto: Saying and doing are two different things. */object WcBySql { val filePath = "D:\\dev\\workspace\\eclipse-workspace\\bigdata-realtime\\my-demo\\input\\wc.txt" def main(args: Array[String]): Unit = { // 1. create table env (创建表执行环境) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, settings) // 2. inputStream => dataStream(流转化,输入流 map 转化成需要的实体类) val inputStream: DataStream[String] = env.readTextFile(filePath) val dataStream: DataStream[(String, Int)] = inputStream.flatMap(_.split(" ")).map((_, 1)) // 3. dataStream => inputTable(数据流转化成inputTable表) val inputTable = tableEnv.fromDataStream[(String, Int)](dataStream, $"word", $"count") val resultTable = inputTable.groupBy($"word").select($"word", $"count".sum) // 4. sink print resultTable.toRetractStream[(String, Int)].print("result") // 5. 任务执行 env.execute("WcBySql Job") }}
转载地址:http://yulji.baihongyu.com/