本工程使用的环境：
- JDK 1.8
- Scala 2.11.12
package com.zhangblue.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
/**
 * Batch word count.
 *
 * Reads a text file, splits every line on single spaces, drops empty tokens,
 * and prints the number of occurrences of each word.
 *
 * Arguments (optional, `--key value` style): `--input <path>` overrides the
 * default input file.
 */
object WordCount {

  /** Input file used when `--input` is not supplied (developer-local path). */
  private val DefaultInputPath =
    "D:\\software\\workspace\\FlinkTutorial\\src\\main\\resources\\hello.txt"

  def main(args: Array[String]): Unit = {
    // Read the input path from "--input <path>", falling back to the default.
    val parameter = ParameterTool.fromArgs(args)
    val inputPath: String = parameter.get("input", DefaultInputPath)

    // Create a batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the input file line by line.
    val inputDataSet = env.readTextFile(inputPath)

    // Split into words and count them. split(" ") yields "" for blank lines and
    // for runs of spaces, so empty tokens are filtered out before counting.
    val wordCountDataSet = inputDataSet
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0) // position key: DataSet aggregations do not accept key-selector groupings
      .sum(1)

    // Print the result; in the DataSet API print() also triggers job execution,
    // so no explicit env.execute() is needed here.
    wordCountDataSet.print()
  }
}
package com.zhangblue.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
 * Streaming word count.
 *
 * Reads lines of text from a socket, splits them on single spaces, drops empty
 * tokens, and prints a running count per word.
 *
 * Arguments (optional, `--key value` style): `--host <name>` (default
 * `localhost`) and `--port <number>` (default `7777`), e.g.
 * `--host localhost --port 7777`.
 */
object StreamWordCount {

  /** Host used when `--host` is not supplied. */
  private val DefaultHost = "localhost"

  /** Port used when `--port` is not supplied. */
  private val DefaultPort = 7777

  def main(args: Array[String]): Unit = {
    // Read connection settings from the program arguments. Explicit defaults
    // avoid passing a null host to the socket source (get without a default
    // returns null) and avoid getInt throwing when --port is absent.
    val parameter = ParameterTool.fromArgs(args)
    val host: String = parameter.get("host", DefaultHost)
    val port: Int = parameter.getInt("port", DefaultPort)

    // Create a stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive lines of text from the socket.
    val textDataStream = env.socketTextStream(host, port)

    // Split each line into words, drop empty tokens, and keep a running count
    // per word. keyBy hash-partitions by the word, so all records for one word
    // are aggregated in the same partition.
    val wordCountDataStream = textDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Print the running counts.
    wordCountDataStream.print()

    // Streaming pipelines are only built above; execute() actually runs the job.
    env.execute("Stream WordCount Example")
  }
}
keyBy：根据 key 的 hash code 对数据进行重分区。
同一个 key 的数据只会在同一个分区内处理，而一个分区内可以包含多个不同 key 的数据。