您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > javascript

JDK9响应式流使用详解

时间:2022-02-03 12:12:37  来源:  作者:蜜糖的代码注释

上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解。

JDK9中的实现

再放上一下上文中的响应式流的交互流程:

  1. 订阅者向发布者发送订阅请求。
  2. 发布者根据订阅请求生成令牌发送给订阅者。
  3. 订阅者根据令牌向发布者发送请求N个数据。
  4. 发送者根据订阅者的请求数量返回M(M<=N)个数据
  5. 重复3,4
  6. 数据发送完毕后由发布者发送给订阅者结束信号

该流程的角度是以接口调用的交互来说的,而考虑实际的coding工作中,我们的调用流程其实为:

  1. 创建发布者
  2. 创建订阅者
  3. 订阅令牌交互
  4. 发送信息

接下来我们按照这个流程来梳理一下代码细节。

创建发布者

对于实现响应流的最开始的步骤,便是创建一个发布者。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher。SubmissionPublisher继承自Flow.Publisher,他有三种构造函数:

    public SubmissionPublisher() {
        this(ASYNC_POOL, Flow.defaultBufferSize(), null);
    }
    
    public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
        this(executor, maxBufferCapacity, null);
    }

    public SubmissionPublisher(Executor executor, int maxBufferCapacity,
                               BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)

SubmissionPublisher将使用Executor作为“线程池”向订阅者发送信息。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNC_POOL静态变量。

SubmissionPublisher会为每一个订阅者单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定。默认情况下直接使用Flow.defaultBufferSize()来设置,默认为256。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据。

SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅。默认的时候为null,也就是不会处理异常。

最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法:

SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用:

try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
}

但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常:

if (complete)
    throw new IllegalStateException("Closed");

创建订阅者

上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息。在本节可以实现接口Flow.Subscriber<T>创建一个SimpleSubscriber类:

public class SimpleSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    /**
     * 订阅者名称
     */
    private String name;
    /**
     * 定义最大消费数量
     */
    private final long maxCount;
    /**
     * 计数器
     */
    private long counter;
    public SimpleSubscriber(String name, long maxCount) {
        this.name = name;
        this.maxCount = maxCount <= 0 ? 1 : maxCount;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.printf("订阅者:%s,最大消费数据: %d。%n", name, maxCount);
        // 实际上是等于消费全部数据
        subscription.request(maxCount);
    }
    @Override
    public void onNext(Integer item) {
        counter++;
        System.out.printf("订阅者:%s 接收到数据:%d.%n", name, item);
        if (counter >= maxCount) {
            System.out.printf("准备取消订阅者: %s。已处理数据个数:%d。%n", name, counter);
            // 处理完毕,取消订阅
            subscription.cancel();
        }
    }
    @Override
    public void onError(Throwable t) {
        System.out.printf("订阅者: %s,出现异常: %s。%n", name, t.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.printf("订阅者: %s 处理完成。%n", name);
    }
}

SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据。

当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法。在订阅者需要捕获该令牌作为后续与发布者交互的纽带。一般来说在onSubscribe中至少调用一次request且参数需要>0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0。

当发布者开始发送数据后,会异步的调用onNext方法并将数据传入。该类中使用了一个计数器对数据数量进行校验,当达到最大值的时候,则会通过令牌(subscription)异步通知发布者订阅结束,然后发送者再异步的调用发订阅者的onComplete方法,以处理完成流程。

其中的onError和onComplete方法只进行打印,这里就不再说了。

以上的这个订阅者可以看作是一个push模型的实现,因为当开始订阅时订阅者就约定了需要接受的数量,然后在后续的处理(onNext)中不再请求新数据。

我们可以用以下的代码创建一个名称为S1,消费2个元素的订阅者:

SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);

订阅令牌交互

当我们可以创建了发送者和订阅者之后,我们需要确认一下进行交互的顺序,由于响应流的处理就是对于事件的处理,所以事件的顺序十分重要,具体顺序如下:

  1. 我们创建一个发布者publisher一个订阅者subscriber
  2. 订阅者subscriber通过调用发布者的subscribe()方法进行信息订阅。如果订阅成功,则发布者将生成一个令牌(Subscription)并作为入参调用订阅者的订阅事件方法onSubscribe()。如果调用异常则会直接调用订阅者的onError错误处理方法,并抛出IllegalStateException异常然后结束订阅。
  3. 在onSubscribe()中,订阅者需要通过调用令牌(Subscription)的请求方法request(long)来异步的向发布者请求数据。
  4. 当发布者有数据可以发布的时候,则会异步的调用订阅者的onNext()方法,直到所有消息的总数已经满足了订阅者调用request的数据请求上限。所以当订阅者请求订阅的消息数为Long.MAX_VALUE时,实际上是消费所有数据,即push模式。如果发布者没有数据要发布了,则可以会调用发布者自己的close()方法并异步的调用所有订阅者的onComplete()方法来通知订阅结束。
  5. 发布者可以随时向发布者请求更多的元素请求(一般在onNext里),而不用等到之前的处理完毕,一般是与之前的数据数量进行累加。
  6. 放发布者遇到异常的时候会调用订阅者的onError()方法。

上面的描述中是只使用的一个订阅者来进行描述的,后面的例子中将说明发布者可以拥有多个订阅者(甚至0个订阅者)。

发送信息

当发布者需要推送消息的时候会调用submit方法或者offer方法,上文中我们提到submit实际上是offer的一种简单实现,本节咱们自己比较一下。

首先他们的方法签名为:

int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)

而submit 和 offer的直接方法为:

    public int submit(T item) {
        return doOffer(item, Long.MAX_VALUE, null);
    }
    
    public int offer(T item,
                     BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
    return doOffer(item, 0L, onDrop);

可以看到他们的底层调用的都是 doOffer 方法,而doOffer的方法签名为:

    private int doOffer(T item, long nanos,
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop)

所以我们可以直接看doOffer()方法。doOffer()方法是可选阻塞时长的,而时长根据入参数nanos来决定。而onDrop()是一个删除判断器,如果调用BiPredicate的test()方法结果为true则会再次重试(根据令牌中的nextRetry属性与发布器中的retryOffer()方法组合判断,但是具体实现还没梳理明白);如果结果为flase则直接删除内容。doOffer()返回的结果为正负两种,正数的结果为发送了数据,但是订阅者还未消费的数据(估计值,因为是异步多线程的);如果为负数,则返回的是重拾次数。

所以,根据submit()的参数我们可以发现,submit会一直阻塞直到数据可以被消费(因为不会阻塞超时,所以不需要传入onDrop()方法)。而我们可以根据需要配置offer()选择器。如果必须要求数据都要被消费的话,那就可以直接选择submit(),如果要设置重试次数的话就可以选择使用offer()

异步调用的例子

下面看一个具体的程序例子,程序将以3秒为周期进行数据发布:

public class PeriodicPublisher {

    public static final int WAIT_TIME = 2;
    public static final int SLEEP_TIME = 3;

    public static void main(String[] args) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        // 创建4订阅者
        SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);
        SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);
        SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);
        SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);
        // 前三个订阅者直接进行订阅
        publisher.subscribe(subscriber1);
        publisher.subscribe(subscriber2);
        publisher.subscribe(subscriber3);
        // 第四个方法延迟订阅
        delaySubscribeWithWaitTime(publisher, subscriber4);
        // 开始发送消息
        Thread pubThread = publish(publisher, 5);
        try {
            // 等待处理完成
            pubThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static Thread publish(SubmissionPublisher<Integer> publisher, int count) {
        Thread t = new Thread(() -> {
            IntStream.range(1,count)
                    .forEach(item ->{
                        publisher.submit(item);
                        sleep(item);
                    });
            publisher.close();
        });
        t.start();
        return t;
    }
    
    
    private static void sleep(Integer item) {
        try {
            System.out.printf("推送数据:%d。休眠 3 秒。%n", item);
            TimeUnit.SECONDS.sleep(SLEEP_TIME);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private static void delaySubscribeWithWaitTime(SubmissionPublisher<Integer> publisher, Flow.Subscriber<Integer> sub) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(WAIT_TIME);
                publisher.subscribe(sub);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

代码后是运行结果如下:

订阅者:S1,最大消费数据: 2。
推送数据:1。休眠 3 秒。
订阅者:S3,最大消费数据: 6。
订阅者:S2,最大消费数据: 4。
订阅者:S2 接收到数据:1.
订阅者:S3 接收到数据:1.
订阅者:S1 接收到数据:1.
订阅者:S4,最大消费数据: 10。
推送数据:2。休眠 3 秒。
订阅者:S2 接收到数据:2.
订阅者:S3 接收到数据:2.
订阅者:S1 接收到数据:2.
订阅者:S4 接收到数据:2.
准备取消订阅者: S1。已处理数据个数:2。
推送数据:3。休眠 3 秒。
订阅者:S4 接收到数据:3.
订阅者:S2 接收到数据:3.
订阅者:S3 接收到数据:3.
推送数据:4。休眠 3 秒。
订阅者:S4 接收到数据:4.
订阅者:S3 接收到数据:4.
订阅者:S2 接收到数据:4.
准备取消订阅者: S2。已处理数据个数:4。
推送数据:5。休眠 3 秒。
订阅者:S3 接收到数据:5.
订阅者:S4 接收到数据:5.
订阅者: S3 处理完成。
订阅者: S4 处理完成。

由于是异步执行,所以在“接收数据”部分的顺序可能不同。

我们分析一下程序的执行流程。

  • 创建一个发布者实例
  • 创建四个订阅者实例S1、S2、S3、S4,可以接收数据的数量分别为:2、4、6、10。
  • 前三个订阅者立即订阅消息。
  • S4的订阅者单独创建一个线程等待WAIT_TIME秒(2秒)之后进行数据的订阅。
  • 新建一个线程来以SLEEP_TIME秒(3秒)为间隔发布5个数据。
  • 将publish线程join()住等待流程结束。

执行的日志满足上述流程而针对一些关键点为:

  • S4在发送者推送数据"1"的时候还未订阅,所以S4没有接收到数据"1"。
  • 当发送数据"2"的时候S1已经接收够了预期数据2个,所以取消了订阅。之后只剩下S2、S3、S4。
  • 当发送数据"4"的时候S2已经接收够了预期数据4个,所以取消了订阅。之后只剩下S3、S4。
  • 当发送数据"5"的时候只剩下S3、S4,当发送完毕后publisher调用close()方法,通知S3、S4数据处理完成。

需要注意的是,如果在最后submit完毕之后直接close()然后结束进行的话可能订阅者并不能执行完毕。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以执行完毕的。

最后

本文中的例子是是简单的实现,可以通过调整订阅者中的request的参数,与在onNext中添加request调用来测试背压的效果,还可以将submit调整为offer并添加onDrop方法以观察抛弃信息时的流程。同时本文没有提供Processor的例子,各位也可以自行学习。

总结一下流程: 订阅者向发布者进行订阅,然后发布者向订阅者发送令牌。订阅者使用令牌请求消息,发送者根据请求消息的数量推送消息。订阅者可以随时异步追加需要的更多信息。

JDK9中在Flow接口中实现了Java API的4个接口,并提供了SubmissionPublisher<T>作为Publisher<T>接口的简单实现。



Tags:JDK   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理...【详细内容】
2022-02-03  Tags: JDK  点击:(9)  评论:(0)  加入收藏
JDK命令行工具在JDK的开发包中,除了大家熟知的java.exe和javac.exe外,还有一系列辅助工具。这些辅助工具位于JDK安装目录下的bin目录中,可以帮助开发人员很好地解决Java应用程...【详细内容】
2021-10-27  Tags: JDK  点击:(44)  评论:(0)  加入收藏
JDK 17 正式发布+免费牛逼啊,JDK 16 刚发布半年(2021/03/16),JDK 17 又如期而至(2021/09/14),这个时间点牛逼啊,蹭苹果发布会的热度?记得当年 JDK 15 的发布也是同天,巧了。。虽然 iPho...【详细内容】
2021-09-16  Tags: JDK  点击:(104)  评论:(0)  加入收藏
背景在一台阿里云服务器上需要搭建nacos集群,服务器是centos7.先是使用安装上传命令yum install lrzszrz:从本地上传文件至服务器sz filename:从服务器下载文件至本地然后各种...【详细内容】
2021-04-14  Tags: JDK  点击:(420)  评论:(0)  加入收藏
背景目前,有很多公司的WEB服务器会出现CPU、内存、IO告警,运维人员往往不能及时地获取JVM等相关信息,以便分析造成告警的原因,故本文将从几个方面来阐述如何进行JVM快照,如何分析...【详细内容】
2021-03-12  Tags: JDK  点击:(174)  评论:(0)  加入收藏
1 类加载器在类加载器家族中存在着类似人类社会的权力等级制度:1.1 Bootstrap由C/C++实现,启动类加载器,属最高层,JVM启动时创建,通常由与os相关的本地代码实现,是最根基的类加载...【详细内容】
2021-01-20  Tags: JDK  点击:(197)  评论:(0)  加入收藏
JVM话说面试这块,JVM算是一个经典的也是三年必问的知识点了,而且这个知识点算是最重要的一个知识点,你如果会这个内容,那么对你的在之后的面试中,能够喊出一个不错的价格。而关于...【详细内容】
2020-12-15  Tags: JDK  点击:(79)  评论:(0)  加入收藏
现在很多编译器和软件都开始要求使用 JDK 11 了。因此我们希望在 CentOS 上安装 JDK 11。运行下面的命令:yum install java-11-openjdk-devel如果你的系统中还装有不同版本的...【详细内容】
2020-11-05  Tags: JDK  点击:(60)  评论:(0)  加入收藏
一,前言我们都知道,tomcat启动前需要配置JDK环境变量,如果没有配置JDK的环境变量,那么tomcat启动的时候就会报错,也就是无法启动。但是在我们的工作或者学习过程中,有的时候会出现...【详细内容】
2020-10-09  Tags: JDK  点击:(88)  评论:(0)  加入收藏
JDK 15已经于2020年9月15日如期发布。本文介绍JDK 15新特性。发布版本说明根据发布的规划,这次发布的 JDK 15 将是一个短期的过度版,只会被 Oracle 支持(维护)6 个月,直到明年 3...【详细内容】
2020-09-25  Tags: JDK  点击:(86)  评论:(0)  加入收藏
▌简易百科推荐
api接口指的是应用程序编程接口(Application Programming Interface),是一些预先定义的函数,或指软件系统不同组成部分衔接的约定。用来提供应用程序与开发人员基于某软件或硬件...【详细内容】
2022-02-03  风度翩翩的番茄鸡蛋    Tags:Js   点击:(13)  评论:(0)  加入收藏
上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher。本文将先梳理一下接口中具体的处理...【详细内容】
2022-02-03  蜜糖的代码注释    Tags:JDK   点击:(9)  评论:(0)  加入收藏
什么是闭包?相信很多人听了有点懵,然后去百度查了一下官方的定义“闭包就是能够读取其他函数内部变量的函数”发现感觉自己更懵了。关于闭包如果只看那官方的定义的话确实不是...【详细内容】
2022-01-21  编程小帅    Tags:JS   点击:(21)  评论:(0)  加入收藏
预加载图片是提高用户体验的一个很好方法。图片预先加载到浏览器中,访问者便可顺利地在你的网站上冲浪,并享受到极快的加载速度。这对图片画廊及图片占据很大比例的网站来说十...【详细内容】
2022-01-04  锋享前端    Tags:Javascript   点击:(28)  评论:(0)  加入收藏
说明面向对象的三个基本特征是:封装、继承、多态。call函数:预定义的JavaScript方法,用来调用所有者对象作为参数的方法。上节内容:面向对象,类、对象、属性、方法,静态属性和方法...【详细内容】
2022-01-04  IT小奋斗    Tags:JavaScript   点击:(34)  评论:(0)  加入收藏
JavaScript 的故事很长。在今天,JavaScript 的运行从移动设备到服务器端,无论您是计划在 2022 年学习或使用 JavaScript ,还是目前正在使用JavaScript进行开发,还是已经熟练掌握...【详细内容】
2021-12-30  Mason程    Tags:JavaScript   点击:(35)  评论:(0)  加入收藏
1、通过条件判断给变量赋值布尔值的正确姿势// badif (a === &#39;a&#39;) { b = true} else { b = false}// goodb = a === &#39;a&#39;2、在if中判断数组长度不为零...【详细内容】
2021-12-24  Mason程    Tags:JavaScript   点击:(36)  评论:(0)  加入收藏
给新手朋友分享我收藏的前端必备javascript已经写好的封装好的方法函数,直接可用。方法函数总计:41个;以下给大家介绍有35个,需要整体文档的朋友私信我,1、输入一个值,将其返回数...【详细内容】
2021-12-15  未来讲IT    Tags:JavaScript   点击:(47)  评论:(0)  加入收藏
1. 检测一个对象是不是纯对象,检测数据类型// 检测数据类型的方法封装(function () { var getProto = Object.getPrototypeOf; // 获取实列的原型对象。 var class2type =...【详细内容】
2021-12-08  前端明明    Tags:js   点击:(47)  评论:(0)  加入收藏
作者:一川来源:前端万有引力 1 写在前面Javascript中的apply、call、bind方法是前端代码开发中相当重要的概念,并且与this的指向密切相关。本篇文章我们将深入探讨这个关键词的...【详细内容】
2021-12-06  Nodejs开发    Tags:Javascript   点击:(41)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条