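Process functions are the lowest-level functions in Flink, and in practice they are used for more complex business logic. This post summarizes Flink's process functions. There are several kinds, mainly the basic process function, the keyed process function and the window process functions. Each one is explained through its source code and tested with example code.
Process functions sit at the lowest layer of the DataStream API. They give direct access to elements, timestamps, timers and state, so knowing them well makes harder stream-processing requirements easier to handle.
Official Flink documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/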
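1. Basic process function (ProcessFunction)
First, look at the source of ProcessFunction. ProcessFunction is an abstract class that extends AbstractRichFunction, so a process function has all the features of a rich function, such as the open()/close() lifecycle methods and getRuntimeContext().
1. Methods
processElement: contains your processing logic and is called for every incoming element. It has three parameters: the input value, the Context, and the Collector used to emit output.
onTimer: the timer callback. Timers are registered through the TimerService, and onTimer runs when a registered timer's time is reached. Timers can only be used on a KeyedStream.
2. Inner abstract classes
Context: the context abstract class. It provides the timestamp of the current element and the TimerService, which is used to register timers and query the current time. It also provides output() for emitting records to a side output.
3. Source code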
// I: input type
// O: output type emitted through the Collector
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    public ProcessFunction() {
    }
    public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
    // Logic executed when a timer fires; timers are only supported on keyed streams
    public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }
    public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
        public OnTimerContext() {
            super();
        }
        public abstract TimeDomain timeDomain();
    }
    public abstract class Context {
        public Context() {
        }
        public abstract Long timestamp();
        public abstract TimerService timerService();
        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}
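Context.output() lets a process function write to a side output instead of the main Collector. The sketch below models that routing in plain Java. It is not Flink code: the classes SideOutputModel and Tag, and the rule "names longer than 5 characters go to the side output", are hypothetical. In a real job you would create an OutputTag (for example new OutputTag<String>("long-names"){}), call ctx.output(tag, value) inside processElement, and read the side stream with getSideOutput(tag) on the operator returned by process().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of ProcessFunction side outputs; this is NOT the Flink API.
final class SideOutputModel {

    // Hypothetical stand-in for Flink's OutputTag: a named output channel.
    static final class Tag {
        final String id;
        Tag(String id) { this.id = id; }
    }

    static final Tag LONG_NAMES = new Tag("long-names");

    // Main output, playing the role of Collector<String>.
    final List<String> mainOutput = new ArrayList<>();
    // Side outputs per tag, playing the role of Context.output(tag, value).
    private final Map<Tag, List<String>> sideOutputs = new HashMap<>();

    // Called once per element, like processElement.
    void processElement(String userName) {
        if (userName.length() > 5) {
            // Hypothetical rule: long names go to the side output instead of the collector.
            sideOutputs.computeIfAbsent(LONG_NAMES, t -> new ArrayList<>()).add(userName);
        } else {
            mainOutput.add(userName + "-----");
        }
    }

    // Like getSideOutput(tag) on the stream returned by process().
    List<String> getSideOutput(Tag tag) {
        return sideOutputs.getOrDefault(tag, new ArrayList<>());
    }
}
```

Every element still passes through processElement exactly once. Only the destination of the emitted record changes.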
4. Test code
Use the Linux nc utility to listen on port 9999 and send data to it. The Flink job reads from port 9999 and processes the data.
Install nc:
sudo yum install nc -y
Listen on port 9999:
nc -lk 9999
Code:
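import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Process function test
 */
public class ProcessFunctionTest {
    public static void main(String[] args) throws Exception {
        // todo create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo read the input stream from port 9999 on the hadoop110 server
        DataStreamSource<String> streamSource = env.socketTextStream("hadoop110", 9999);
        // todo wrap each line into the structure {"userName":"aa","time":xxxxx}
        SingleOutputStreamOperator<JSONObject> mapStream = streamSource.map(t -> {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", t);
            jsonObject.put("time", System.currentTimeMillis());
            return jsonObject;
        });
        // TODO apply the process function and print its output
        mapStream.process(new MyProcessFunction()).print("process function output");
        env.execute();
    }
}
// Custom process function
class MyProcessFunction extends ProcessFunction<JSONObject, String> {
    @Override
    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
        System.out.println("user name received by processElement: " + jsonObject.getString("userName"));
        collector.collect(jsonObject.getString("userName") + "-----");
    }
}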
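2. Keyed process function (KeyedProcessFunction)
The keyed process function is the key topic. It is used after keyBy to process a KeyedStream. keyBy partitions the stream by key and assigns the data of different keys to different parallel subtasks.
KeyedProcessFunction can use timers and the TimerService. The test code below uses them to inspect time information and to run a timed task.
KeyedProcessFunction: processes partitioned (keyed) data; processElement runs once per element.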
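The sketch below shows why keyBy always sends the same key to the same parallel subtask. It models the routing in two steps: key to key group, then key group to subtask index.

The second step follows the formula of Flink's KeyGroupRangeAssignment (keyGroup * parallelism / maxParallelism). The first step is simplified. Flink applies a murmur hash to key.hashCode(), whereas this sketch uses Math.floorMod directly, so the concrete subtask numbers will differ from a real job. KeyRouting is a hypothetical class name.

```java
// Simplified model of how keyBy routes a key to a parallel subtask (not Flink code).
final class KeyRouting {

    // Step 1: key -> key group. Flink murmur-hashes key.hashCode() first;
    // this sketch skips that, so concrete numbers differ from a real job.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: key group -> subtask index, the same formula shape as
    // KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full route: deterministic, so one key always lands on one subtask.
    static int route(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

For example, with maxParallelism 128 (the default lower bound Flink uses) and parallelism 4, key groups 0 to 31 map to subtask 0 and groups 32 to 63 map to subtask 1. Each subtask therefore owns a contiguous range of key groups. Keyed state and timers live with that range.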
1. KeyedProcessFunction source code
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    public KeyedProcessFunction() {
    }
    // Processing method, called for every element
    public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
    // Timer logic, executed when a timer fires
    public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }
    public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
        public OnTimerContext() {
            super();
        }
        public abstract TimeDomain timeDomain();
        public abstract K getCurrentKey();
    }
    public abstract class Context {
        public Context() {
        }
        public abstract Long timestamp();
        public abstract TimerService timerService();
        public abstract <X> void output(OutputTag<X> var1, X var2);
        public abstract K getCurrentKey();
    }
}
2. TimerService source code
public interface TimerService {
    // Error message used when registering a timer is not supported
    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
    // Error message used when deleting a timer is not supported
    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
    // Current processing time
    long currentProcessingTime();
    // Current watermark
    long currentWatermark();
    // Register a processing-time timer
    void registerProcessingTimeTimer(long var1);
    // Register an event-time timer
    void registerEventTimeTimer(long var1);
    // Delete a processing-time timer
    void deleteProcessingTimeTimer(long var1);
    // Delete an event-time timer
    void deleteEventTimeTimer(long var1);
}
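The following plain-Java model illustrates the TimerService methods above using keyed event-time timers. The class EventTimeTimerModel and its methods are hypothetical, not Flink internals. The model captures three behaviors:
- Timers are scoped to a key.
- Registering the same key and timestamp twice yields only one timer.
- Advancing the watermark fires every timer whose timestamp is less than or equal to the new watermark, in timestamp order.

The watermark starts at Long.MIN_VALUE, which is also what currentWatermark() returns in Flink before any watermark has arrived. Processing-time timers behave the same way, with the current processing time in place of the watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of keyed event-time timers (not Flink's implementation).
final class EventTimeTimerModel {

    // A timer belongs to one key and fires at one timestamp.
    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
        @Override
        public int compareTo(Timer o) {
            int c = Long.compare(timestamp, o.timestamp);
            return c != 0 ? c : key.compareTo(o.key);
        }
    }

    // TreeSet keeps timers ordered by time and drops duplicates of the same
    // (key, timestamp), mirroring "one timer per key and timestamp".
    private final TreeSet<Timer> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long timestamp) {
        timers.add(new Timer(key, timestamp));
    }

    void deleteEventTimeTimer(String key, long timestamp) {
        timers.remove(new Timer(key, timestamp));
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Advancing the watermark fires every timer with timestamp <= watermark, oldest first.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first().timestamp <= watermark) {
            Timer t = timers.pollFirst();
            onTimer(t.timestamp, t.key);
        }
    }

    // Stand-in for KeyedProcessFunction.onTimer.
    private void onTimer(long timestamp, String key) {
        fired.add(key + "@" + timestamp);
    }
}
```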
3. Test code
import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
 * @title: KeyedProcessFunctionTest
 * @Author Tian
 * @Date: 2023/3/21 22:59
 * @Version 1.0
 */
public class KeyedProcessFunctionTest {
    public static void main(String[] args) throws Exception {
        // todo create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo parse the raw data and add a timestamp (5 seconds in the past)
        SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", t);
            jsonObject.put("time", System.currentTimeMillis() - 5000);
            return jsonObject;
        });
        // todo assign watermarks and the event-time field; assignTimestampsAndWatermarks
        // returns a new stream, so keep the result and continue from it
        SingleOutputStreamOperator<JSONObject> withWatermarks = sourceStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("time");
                            }
                        }));
        // todo key every record by the constant true, so all data shares a single key,
        // then apply the keyed process function and print its output
        withWatermarks.keyBy(data -> true)
                .process(new MyKeyedProcessFunction())
                .print("result");
        env.execute();
    }
}
/**
 * Custom keyed process function
 */
class MyKeyedProcessFunction extends KeyedProcessFunction<Boolean, JSONObject, String> {
    @Override
    public void processElement(JSONObject jsonObject, Context context, Collector<String> collector) throws Exception {
        System.out.println("current key: " + context.getCurrentKey());
        System.out.println("element timestamp: " + context.timestamp());
        System.out.println("current processing time: " + context.timerService().currentProcessingTime());
        System.out.println("current watermark: " + context.timerService().currentWatermark());
        // todo register a processing-time timer 1 second from now
        context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + 1000);
        // todo emit the record as a JSON string
        collector.collect(jsonObject.toJSONString());
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // todo logic executed when the timer fires
        System.out.println("timer fired at: " + timestamp + ", key: " + ctx.getCurrentKey());
    }
}
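3. Window process function (ProcessWindowFunction)
Besides the keyed process function above, Flink provides process functions for windows, in two kinds: the window process function (ProcessWindowFunction) and the all-window process function (ProcessAllWindowFunction). A ProcessWindowFunction receives an Iterable containing all elements of the window, plus a Context object with access to time and state information. This makes it more flexible than the other window functions.

The flexibility comes at a cost in performance and resource consumption. Elements cannot be aggregated incrementally; they must be buffered internally until the window is considered ready for processing.
ProcessWindowFunction: processes keyed data; process runs once per window for each key.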
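The plain-Java model of a keyed tumbling window below makes that buffering cost concrete. It works in three steps:
- Window starts are computed with the same arithmetic as Flink's TimeWindow.getWindowStartWithOffset, with offset 0.
- A window fires once time reaches its last millisecond (end - 1, matching TimeWindow.maxTimestamp()).
- process() then runs once per key and window, with all buffered elements.

TumblingWindowModel and its methods are hypothetical. The timestamp is processing time for TumblingProcessingTimeWindows and event time for TumblingEventTimeWindows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a keyed tumbling window feeding a ProcessWindowFunction-style callback.
final class TumblingWindowModel {

    private final long size;
    // window start -> key -> buffered elements; everything is kept until the window fires,
    // which is why a ProcessWindowFunction cannot aggregate incrementally.
    private final TreeMap<Long, Map<String, List<String>>> windows = new TreeMap<>();
    final List<String> results = new ArrayList<>();

    TumblingWindowModel(long size) {
        this.size = size;
    }

    // Same arithmetic as TimeWindow.getWindowStartWithOffset(timestamp, 0, size).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    void add(String key, String element, long timestamp) {
        windows.computeIfAbsent(windowStart(timestamp, size), s -> new LinkedHashMap<>())
               .computeIfAbsent(key, k -> new ArrayList<>())
               .add(element);
    }

    // Fire every window whose last millisecond (start + size - 1) is <= time.
    void advanceTime(long time) {
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= time) {
            Map.Entry<Long, Map<String, List<String>>> window = windows.pollFirstEntry();
            for (Map.Entry<String, List<String>> perKey : window.getValue().entrySet()) {
                process(perKey.getKey(), window.getKey(), perKey.getValue());
            }
        }
    }

    // Called once per key and window with every buffered element, like process().
    private void process(String key, long start, Iterable<String> elements) {
        int count = 0;
        for (String ignored : elements) {
            count++;
        }
        results.add(key + "[" + start + "," + (start + size) + "):" + count);
    }
}
```

If only an aggregate is needed, combining .aggregate() or .reduce() with a ProcessWindowFunction avoids keeping every element. The window function then receives only the pre-aggregated result.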
1. ProcessWindowFunction source code
// IN: input, the type of the data entering the window operator
// OUT: output, the type produced by the window computation
// KEY: the type of the key
// W: the window type, a subtype of Window; for time windows W is usually TimeWindow
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    public ProcessWindowFunction() {
    }
    // Processing method, called once per window
    public abstract void process(KEY var1, ProcessWindowFunction<IN, OUT, KEY, W>.Context var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
    // Called when the window expires, to clean up per-window state
    public void clear(ProcessWindowFunction<IN, OUT, KEY, W>.Context context) throws Exception {
    }
    // What the context provides
    public abstract class Context implements Serializable {
        public Context() {
        }
        public abstract W window();
        public abstract long currentProcessingTime();
        public abstract long currentWatermark();
        public abstract KeyedStateStore windowState();
        public abstract KeyedStateStore globalState();
        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}
2. Test code
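import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
public class ProcessWindowsFunctionTest {
    public static void main(String[] args) throws Exception {
        // todo create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo parse the raw data and add a timestamp (10 seconds in the past)
        SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
            // sleep 5 seconds per record, which slows down ingestion
            Thread.sleep(5000);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", t);
            jsonObject.put("time", System.currentTimeMillis() - 10000);
            return jsonObject;
        });
        // todo assign watermarks and the event-time field, keeping the returned stream;
        // the processing-time windows below ignore them (TumblingEventTimeWindows would use them)
        SingleOutputStreamOperator<JSONObject> withWatermarks = sourceStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("time");
                            }
                        }));
        // todo key by user name, open 5-second tumbling processing-time windows and count per window
        withWatermarks.keyBy(data -> data.getString("userName"))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new MyProcessWindowsFunction()).print();
        env.execute();
    }
}
class MyProcessWindowsFunction extends ProcessWindowFunction<JSONObject, HashMap<String, Long>, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<JSONObject> iterable, Collector<HashMap<String, Long>> collector) throws Exception {
        // Map from user name to count
        HashMap<String, Long> result = new HashMap<>();
        // Iterate over every element buffered in this window
        for (JSONObject jsonObject : iterable) {
            result.merge(jsonObject.getString("userName"), 1L, Long::sum);
        }
        collector.collect(result);
    }
}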
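4. All-window process function (ProcessAllWindowFunction)
ProcessAllWindowFunction is similar to ProcessFunction: both process elements coming from upstream. The difference is in how often they are called. ProcessFunction calls processElement once per element, while ProcessAllWindowFunction calls process once per window, and inside process you can iterate over all elements of that window. Because windowAll works on a non-keyed stream, all elements go into the same window and the operator runs with parallelism 1.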
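The difference in invocation count can be sketched in plain Java. The hypothetical AllWindowModel below counts how often a per-element method and a per-window method run for the same three elements. The per-window method also shows Map.merge, a more compact alternative to the containsKey/put counting pattern.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java contrast between per-element and per-window invocation (not Flink code).
final class AllWindowModel {

    int processElementCalls = 0;
    int processWindowCalls = 0;

    // ProcessFunction style: called once for every element.
    String processElement(String userName) {
        processElementCalls++;
        return userName + "-----";
    }

    // ProcessAllWindowFunction style: called once per window with every element of
    // that window, regardless of key. Map.merge replaces the containsKey/put pattern.
    Map<String, Long> process(Iterable<String> window) {
        processWindowCalls++;
        Map<String, Long> counts = new HashMap<>();
        for (String userName : window) {
            counts.merge(userName, 1L, Long::sum);
        }
        return counts;
    }
}
```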
1. ProcessAllWindowFunction source code
// IN: input, the type of the data entering the window operator
// OUT: output, the type produced by the window computation
// W: the window type, a subtype of Window; for time windows W is usually TimeWindow
public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    public ProcessAllWindowFunction() {
    }
    // Processing logic, called once per window
    public abstract void process(ProcessAllWindowFunction<IN, OUT, W>.Context var1, Iterable<IN> var2, Collector<OUT> var3) throws Exception;
    // Called when the window expires, to clean up per-window state
    public void clear(ProcessAllWindowFunction<IN, OUT, W>.Context context) throws Exception {
    }
    // Window context information
    public abstract class Context {
        public Context() {
        }
        public abstract W window();
        public abstract KeyedStateStore windowState();
        public abstract KeyedStateStore globalState();
        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}
2. Test code
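import com.alibaba.fastjson.JSONObject; // assumption: JSONObject comes from fastjson
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
public class ProcessAllWindowFunctionTest {
    public static void main(String[] args) throws Exception {
        // todo create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo parse the raw data and add a timestamp (10 seconds in the past)
        SingleOutputStreamOperator<JSONObject> sourceStream = env.socketTextStream("hadoop110", 9999).map(t -> {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", t);
            jsonObject.put("time", System.currentTimeMillis() - 10000);
            return jsonObject;
        });
        // todo assign watermarks and the event-time field, keeping the returned stream;
        // the processing-time windows below ignore them
        SingleOutputStreamOperator<JSONObject> withWatermarks = sourceStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("time");
                            }
                        }));
        // todo collect all elements, whatever their user name, into one 5-second tumbling window
        // (windowAll is non-keyed and runs with parallelism 1)
        withWatermarks.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new MyProcessAllWindowFunction()).print();
        env.execute();
    }
}
class MyProcessAllWindowFunction extends ProcessAllWindowFunction<JSONObject, HashMap<String, Long>, TimeWindow> {
    @Override
    public void process(Context context, Iterable<JSONObject> iterable, Collector<HashMap<String, Long>> collector) throws Exception {
        // Map from user name to count
        HashMap<String, Long> result = new HashMap<>();
        // Iterate over every element of this window (all users together)
        for (JSONObject jsonObject : iterable) {
            result.merge(jsonObject.getString("userName"), 1L, Long::sum);
        }
        collector.collect(result);
    }
}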
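5. Co-process function (CoProcessFunction)
A ConnectedStreams (two streams joined with connect) needs separate processing logic for each of its two streams. The interface therefore has two methods with the same role, distinguished by the suffixes "1" and "2", and each is called when an element arrives on its stream. Such an interface is called a "co-process function".

The pattern matches the other connected-stream functions. Calling .map() takes a CoMapFunction, and calling .flatMap() takes a CoFlatMapFunction that implements flatMap1() and flatMap2(). Calling .process() takes a CoProcessFunction.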
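A common use of the two methods is matching records that arrive on different streams. The sketch below is a plain-Java model of a CoProcessFunction that matches orders (input 1) with payments (input 2) by order id. The scenario and the class OrderPaymentMatcher are hypothetical.

In Flink, the two streams would usually be connected and keyed by order id (connect(...).keyBy(...)), and the pending maps would be keyed ValueState. A timer would also clear records that never find a match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a CoProcessFunction: two input methods sharing one operator's state.
// The order/payment matching scenario is a hypothetical illustration.
final class OrderPaymentMatcher {

    // State shared by both inputs (in Flink, keyed state per order id).
    private final Map<String, String> pendingOrders = new HashMap<>();
    private final Map<String, String> pendingPayments = new HashMap<>();
    final List<String> out = new ArrayList<>();

    // Input 1: orders, like processElement1.
    void processElement1(String orderId, String order) {
        String payment = pendingPayments.remove(orderId);
        if (payment != null) {
            out.add(orderId + " matched: " + order + " / " + payment);
        } else {
            pendingOrders.put(orderId, order);
        }
    }

    // Input 2: payments, like processElement2.
    void processElement2(String orderId, String payment) {
        String order = pendingOrders.remove(orderId);
        if (order != null) {
            out.add(orderId + " matched: " + order + " / " + payment);
        } else {
            pendingPayments.put(orderId, payment);
        }
    }

    // Records still waiting for their counterpart.
    int unmatched() {
        return pendingOrders.size() + pendingPayments.size();
    }
}
```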
1. Source code
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    public CoProcessFunction() {
    }
    // Processing method for the first stream
    public abstract void processElement1(IN1 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
    // Processing method for the second stream
    public abstract void processElement2(IN2 var1, CoProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
    public void onTimer(long timestamp, CoProcessFunction<IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
    }
    public abstract class OnTimerContext extends CoProcessFunction<IN1, IN2, OUT>.Context {
        public OnTimerContext() {
            super();
        }
        public abstract TimeDomain timeDomain();
    }
    public abstract class Context {
        public Context() {
        }
        public abstract Long timestamp();
        public abstract TimerService timerService();
        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}
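6. Join process function (ProcessJoinFunction)
ProcessJoinFunction is similar to CoProcessFunction, but they differ:
ProcessJoinFunction is used for join operations; each call receives one element from each of the two streams (a joined pair).
CoProcessFunction is used for connected streams; elements of the two streams are processed separately.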
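ProcessJoinFunction is the function passed to an interval join, for example left.keyBy(...).intervalJoin(right.keyBy(...)).between(lower, upper).process(...). The plain-Java sketch below models which pairs reach processElement. For each left element, every right element with the same key whose timestamp lies in [left.timestamp + lower, left.timestamp + upper] forms one pair. The pair's timestamp (Context.getTimestamp()) is the larger of the two element timestamps.

IntervalJoinModel and Event are hypothetical names. The nested loops ignore the state buffering and cleanup that a real join performs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of which pairs reach ProcessJoinFunction.processElement in an interval join.
final class IntervalJoinModel {

    // Hypothetical element type: a key, a timestamp and a payload.
    static final class Event {
        final String key;
        final long timestamp;
        final String value;
        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Pairs every left element with each same-key right element whose timestamp lies in
    // [left.timestamp + lower, left.timestamp + upper] (bounds inclusive, Flink's default).
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inRange = r.timestamp >= l.timestamp + lower
                        && r.timestamp <= l.timestamp + upper;
                if (sameKey && inRange) {
                    // The joined pair carries the larger of the two timestamps.
                    long ts = Math.max(l.timestamp, r.timestamp);
                    out.add(l.value + "+" + r.value + "@" + ts);
                }
            }
        }
        return out;
    }
}
```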
1. Source code
// IN1: input type of the first stream
// IN2: input type of the second stream
// OUT: output type
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
    private static final long serialVersionUID = -2444626938039012398L;
    public ProcessJoinFunction() {
    }
    // Logic applied after the join; receives one element from each stream
    public abstract void processElement(IN1 var1, IN2 var2, ProcessJoinFunction<IN1, IN2, OUT>.Context var3, Collector<OUT> var4) throws Exception;
    public abstract class Context {
        public Context() {
        }
        public abstract long getLeftTimestamp();
        public abstract long getRightTimestamp();
        public abstract long getTimestamp();
        public abstract <X> void output(OutputTag<X> var1, X var2);
    }
}
7. Broadcast process function (BroadcastProcessFunction)
The broadcast process function is passed as the argument when calling .process() on a BroadcastConnectedStream. A BroadcastConnectedStream is the result of connecting a non-keyed DataStream with a broadcast stream (BroadcastStream).
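The split between Context and ReadOnlyContext ensures that only the broadcast side can modify broadcast state, which keeps the copy on every parallel instance identical. The plain-Java sketch below models that contract with a hypothetical block list. Broadcast rules add or remove user names, while the data side only reads an unmodifiable view.

In Flink, the state would be broadcast state obtained with ctx.getBroadcastState(descriptor) for a MapStateDescriptor. The stream would be built with dataStream.connect(ruleStream.broadcast(descriptor)).process(...). BroadcastFilterModel and the "block:"/"unblock:" rule format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of the BroadcastProcessFunction contract (not Flink code).
final class BroadcastFilterModel {

    // Stand-in for broadcast state: every parallel instance would hold the same copy.
    private final Set<String> blockedUsers = new HashSet<>();
    final List<String> out = new ArrayList<>();

    // Broadcast side: may update the state, like processBroadcastElement with Context.
    void processBroadcastElement(String rule) {
        if (rule.startsWith("block:")) {
            blockedUsers.add(rule.substring("block:".length()));
        } else if (rule.startsWith("unblock:")) {
            blockedUsers.remove(rule.substring("unblock:".length()));
        }
    }

    // Data side: only a read-only view, mirroring ReadOnlyContext.
    void processElement(String userName) {
        Set<String> readOnly = Collections.unmodifiableSet(blockedUsers);
        if (!readOnly.contains(userName)) {
            out.add(userName);
        }
    }
}
```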
1. Source code
// IN1: type of the first (non-broadcast) input stream
// IN2: type of the second (broadcast) input stream
// OUT: output type
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    private static final long serialVersionUID = 8352559162119034453L;
    public BroadcastProcessFunction() {
    }
    // Processing logic for the non-broadcast stream
    // IN1: input element
    // ReadOnlyContext: read-only context
    // Collector<OUT>: output
    public abstract void processElement(IN1 var1, BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
    // Processing logic for the broadcast stream
    public abstract void processBroadcastElement(IN2 var1, BroadcastProcessFunction<IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
    // Read-only context
    public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
        public ReadOnlyContext() {
            super(BroadcastProcessFunction.this);
        }
    }
    public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
        public Context() {
            super(BroadcastProcessFunction.this);
        }
    }
}
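8. Keyed broadcast process function (KeyedBroadcastProcessFunction)
The keyed broadcast process function is also passed as the argument when calling .process() on a BroadcastConnectedStream. Unlike with BroadcastProcessFunction, the broadcast connected stream here is the result of connecting a KeyedStream with a broadcast stream (BroadcastStream).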
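The keyed variant adds access to keyed state. processElement sees only the state of the current key, while the broadcast side can visit the state of all keys through Context.applyToKeyedState(descriptor, function). The plain-Java sketch below models this with a hypothetical per-key counter and a "reset" broadcast rule that clears every key's counter.

KeyedBroadcastModel is not Flink code. In a real job, the counter would be a ValueState<Long>, and applyToKeyedState would only visit the keys stored on the local parallel instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java model of KeyedBroadcastProcessFunction: keyed state on the data side,
// and applyToKeyedState-style iteration over all keys on the broadcast side.
final class KeyedBroadcastModel {

    // Keyed state: one counter per key (a one-element array so it can be mutated in place).
    private final Map<String, long[]> countsByKey = new HashMap<>();

    // Data side: only the current key's state is touched.
    void processElement(String key) {
        countsByKey.computeIfAbsent(key, k -> new long[1])[0]++;
    }

    // Stand-in for Context.applyToKeyedState: visit the state of every key.
    void applyToKeyedState(BiConsumer<String, long[]> function) {
        countsByKey.forEach(function);
    }

    // Broadcast side: a hypothetical "reset" rule clears every key's counter.
    void processBroadcastElement(String rule) {
        if (rule.equals("reset")) {
            applyToKeyedState((key, state) -> state[0] = 0);
        }
    }

    long count(String key) {
        long[] state = countsByKey.get(key);
        return state == null ? 0 : state[0];
    }
}
```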
1. Source code
// KS: type of the key used by keyBy
// IN1: type of the keyed (non-broadcast) stream
// IN2: type of the broadcast stream
// OUT: output type
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    private static final long serialVersionUID = -2584726797564976453L;
    public KeyedBroadcastProcessFunction() {
    }
    // Processing logic for the keyed stream
    public abstract void processElement(IN1 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext var2, Collector<OUT> var3) throws Exception;
    // Processing logic for the broadcast stream
    public abstract void processBroadcastElement(IN2 var1, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context var2, Collector<OUT> var3) throws Exception;
    // Logic executed after a timer fires
    public void onTimer(long timestamp, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext ctx, Collector<OUT> out) throws Exception {
    }
    public abstract class OnTimerContext extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.ReadOnlyContext {
        public OnTimerContext() {
            super();
        }
        public abstract TimeDomain timeDomain();
        public abstract KS getCurrentKey();
    }
    public abstract class ReadOnlyContext extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext {
        public ReadOnlyContext() {
            super(KeyedBroadcastProcessFunction.this);
        }
        public abstract TimerService timerService();
        public abstract KS getCurrentKey();
    }
    public abstract class Context extends org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context {
        public Context() {
            super(KeyedBroadcastProcessFunction.this);
        }
        public abstract <VS, S extends State> void applyToKeyedState(StateDescriptor<S, VS> var1, KeyedStateFunction<KS, S> var2) throws Exception;
    }
}