This article introduces the Source side of Flink's streaming (DataStream) API, covering:
1. Reading data from a collection
2. Reading data from a file
3. Reading data from Kafka
4. Writing a custom Source
A data-processing job can roughly be divided into three stages: where the data comes from, what business logic is applied to it, and where the results are written. In Flink these three parts are called Source, Transform, and Sink.
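To make the three stages concrete, here is a minimal, self-contained sketch (the object name and sample data are illustrative, not taken from this article): fromElements acts as the Source, map as the Transform, and print as the Sink.

import org.apache.flink.streaming.api.scala._

object SourceTransformSink {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = environment.fromElements("a,1", "b,2") // Source
    val transformed: DataStream[String] = source.map(_.toUpperCase)         // Transform
    transformed.print()                                                     // Sink (to stdout)
    environment.execute("sourceTransformSink")
  }
}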
Versions:
Scala: 2.11.12
Kafka: 0.8.2.2
Flink: 1.7.2
pom.xml dependencies (the logging dependencies must be included; otherwise, when Flink reads from Kafka 0.8 it fails with "Failed to instantiate SLF4J LoggerFactory Reported exception"). A minimal log4j.properties sketch follows the dependency block.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
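Because slf4j-log4j12 is used as the logging binding, a log4j configuration file must also be on the classpath. A minimal sketch of src/main/resources/log4j.properties (an assumed example; adjust levels and appenders to your own setup):

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n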
package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature (used as the data type in all later examples)
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}

/**
 * Read data from a collection
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val stream1: DataStream[SensorReading] = environment.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    stream1.print("Stream1:").setParallelism(1)
    environment.execute()
  }
}
package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}

/**
 * Read data from a file
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Note the escaped backslashes in the Windows path
    val stream2: DataStream[String] = environment.readTextFile(
      "D:\\Scala\\Code\\FlinkTest\\src\\main\\resources\\sensor.txt")
    stream2.print("Stream2:").setParallelism(1)
    environment.execute()
  }
}
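readTextFile only yields raw strings. Assuming each line of sensor.txt has the form id,timestamp,temperature (an assumption about the file format, not stated above), the stream can be mapped into the SensorReading case class defined earlier. A minimal sketch, with SensorReading assumed to be in the same package:

import org.apache.flink.streaming.api.scala._

object ReadSensorFile {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = environment.readTextFile(
      "D:\\Scala\\Code\\FlinkTest\\src\\main\\resources\\sensor.txt")
    // Split each comma-separated line and build a typed SensorReading
    val readings: DataStream[SensorReading] = lines.map(line => {
      val fields = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    })
    readings.print("Readings:").setParallelism(1)
    environment.execute("readSensorFile")
  }
}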
Kafka broker list: slave1:9092,slave2:9092,slave3:9092
ZooKeeper cluster: slave2:2181,slave3:2181,slave4:2181
package xxx

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

/**
 * Read data from Kafka
 */
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {
    // Kafka consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181,slave4:2181")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")   // key deserializer
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    properties.setProperty("auto.offset.reset", "latest") // where to start when no committed offset exists

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to Kafka and read data from the "sensor" topic
    val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor",
      new SimpleStringSchema(), properties))
    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafka")
  }
}
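Besides the auto.offset.reset property, the Flink Kafka consumer also lets you set the start position explicitly on the consumer object before it is added as a source. A minimal sketch, assuming the same "sensor" topic and cluster addresses as above:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

object ReadDataFromKafkaLatest {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181,slave4:2181")

    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val consumer = new FlinkKafkaConsumer08[String]("sensor", new SimpleStringSchema(), properties)
    consumer.setStartFromLatest()          // ignore committed offsets and start from the latest records
    // consumer.setStartFromEarliest()     // or start from the earliest available records
    // consumer.setStartFromGroupOffsets() // default: resume from the group's committed offsets

    val kafkaStream: DataStream[String] = environment.addSource(consumer)
    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafkaLatest")
  }
}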
package xxx

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * Custom Source
 */
object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.addSource(new MySource())
    dataStream.print().setParallelism(1)
    environment.execute("MySource")
  }
}

class MySource extends SourceFunction[String] {
  // Flag indicating whether the source is still running
  var running: Boolean = true

  // Emit data while the source is running
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    // 10 sensors with an initial value around 100
    var temp = 1.to(10).map(
      i => (i, 100 + random.nextGaussian() * 100)
    )
    while (running) {
      // Update the values with a small Gaussian step
      temp = temp.map(
        t => (t._1, t._2 + random.nextGaussian())
      )
      // Current timestamp
      val curTime = System.currentTimeMillis()
      temp.foreach(t => {
        sourceContext.collect(curTime + ": " + t._1 + "--> " + t._2)
      })
      Thread.sleep(500)
    }
  }

  // Stop generating data
  override def cancel(): Unit = {
    running = false
  }
}
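The custom source above emits pre-formatted strings. The same pattern can emit the SensorReading case class directly, so downstream operators work on typed fields instead of text. A minimal sketch (the class name is illustrative, and it assumes the SensorReading case class from the earlier examples is in scope):

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading] {
  // Flag indicating whether the source is still running
  var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()
    // 10 sensors with an initial temperature around 100, drifting by a Gaussian step
    var temps = 1.to(10).map(i => ("sensor_" + i, 100 + random.nextGaussian() * 100))
    while (running) {
      temps = temps.map(t => (t._1, t._2 + random.nextGaussian()))
      val curTime = System.currentTimeMillis()
      // Emit a typed SensorReading per sensor
      temps.foreach(t => ctx.collect(SensorReading(t._1, curTime, t._2)))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}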