Flink-Introduction

Introduction to Flink

1. Start a local cluster

1
2
3
4
5
6
7
8
9
10
# Start locally; open localhost:8081 to reach the monitoring dashboard
$ ~/opt/flink/bin/start-cluster.sh

$ jps
7216 StandaloneSessionClusterEntrypoint
4450 RemoteMavenServer
3701 Main
7670 TaskManagerRunner
10489 Jps
7801 NailgunRunner
2. Run Scala code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Small Flink streaming demo that exercises `ValueState` and `MapState`.
 *
 * The job reads a fixed in-memory list of `(key, value)` pairs, keys the stream by
 * the first tuple element, and runs a stateful `RichFlatMapFunction` that tracks the
 * smallest value seen so far. Every input record is forwarded unchanged, formatted
 * as `key-->>value`, and written as text to a timestamped path under
 * `/home/quantux`.
 *
 * NOTE(review): both states are obtained from the runtime context of a function
 * applied after `keyBy`, which in Flink makes them keyed state scoped to the current
 * key. Despite its name, `globalLeastValueState` therefore holds a per-key minimum,
 * not a minimum across all keys — confirm this is the intended demonstration.
 */
object LocalFlinkTest extends App with Serializable {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val list = List(("a", 1), ("b", 2), ("a", 0), ("b", 5), ("c", -2), ("a", 1), ("b", 20), ("a", 11), ("b", -5), ("c", 5))
  val inputs: DataStream[(String, Int)] = env.fromCollection(list)

  inputs.keyBy(_._1).flatMap {
    new RichFlatMapFunction[(String, Int), (String, Int)] {
      // Stored as a boxed java.lang.Integer on purpose: with a Scala `Int` type
      // parameter an unset state value is unboxed to 0, so "no value yet" cannot be
      // told apart from a real 0 and a `== null` test is always false.
      private var globalLeastValueState: ValueState[java.lang.Integer] = _
      private var keyLeastValueState: MapState[String, Int] = _

      /** Registers the two state handles with the runtime context. */
      override def open(parameters: Configuration): Unit = {
        val globalLeastValueDescriptor =
          new ValueStateDescriptor[java.lang.Integer]("globalLeast", classOf[java.lang.Integer])
        val keyLeastValueStateDescriptor =
          new MapStateDescriptor[String, Int]("keyLeast", classOf[String], classOf[Int])

        globalLeastValueState = getRuntimeContext.getState(globalLeastValueDescriptor)
        keyLeastValueState = getRuntimeContext.getMapState(keyLeastValueStateDescriptor)
      }

      /**
       * Updates both minimum trackers with the incoming record and forwards the
       * record unchanged.
       *
       * @param in        the `(key, value)` record being processed
       * @param collector sink for the forwarded record
       */
      override def flatMap(in: (String, Int), collector: Collector[(String, Int)]): Unit = {
        println("************************flat map*****************************")

        // Read the state once; null means nothing has been stored yet.
        val currentLeast = globalLeastValueState.value()
        if (currentLeast == null) {
          println("global init............")
          globalLeastValueState.update(in._2)
        } else if (in._2 < currentLeast.intValue) {
          globalLeastValueState.update(in._2)
        }

        if (!keyLeastValueState.contains(in._1)) {
          keyLeastValueState.put(in._1, in._2)
          println(s"keyed ${in._1} init............")
        } else if (keyLeastValueState.get(in._1) > in._2) {
          // The key is known to be present here, so only the comparison is needed.
          keyLeastValueState.put(in._1, in._2)
        }
        collector.collect(in)

        println(s"global=${globalLeastValueState.value()}")
        println(s"keyed=${keyLeastValueState.entries()}")
      }
    }
  }.map(x => s"${x._1}-->>${x._2}").writeAsText(s"file:///home/quantux/test_${System.currentTimeMillis()}")

  env.execute("local flink")
}
1
2
3
4
5
$ mvn clean package

$ ~/opt/flink/bin/flink run -c com.quantux.text.MainClass target/my-flink-test.jar

#localhost:8081