您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

新一代WebFlux框架核心技术Reactor响应式编程基本用法

时间:2023-11-27 14:53:43  来源:微信公众号  作者:Spring全家桶实战案例源码

环境:projectreactor2020.0.14

 

1. 前言

在响应式编程中,Project Reactor提供了两个核心的概念:Mono和Flux。Mono和Flux都是Reactor中的Publisher,它们可以产生并发布数据,然后可以被订阅和消费。这两个概念在WebFlux中有着广泛的应用,帮助我们实现异步和非阻塞的编程模型。

在这个主题中,我们将深入探讨Mono和Flux的基本使用。我们将了解它们如何被创建,如何订阅它们的事件,以及如何处理错误和完成通知。通过学习这些内容,你将能够更好地理解WebFlux的响应式编程模型,并能够在你的项目中有效地使用Mono和Flux。

让我们开始吧!

2. 环境依赖

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>${reactor.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

3. Mono & Flux介绍

Flux

Flux表示了0到N个元素序列,下图展示了Flux如何转换元素

新一代WebFlux框架核心技术Reactor响应式编程基本用法Flux

一个Flux<T>是一个标准的Publisher<T>,它表示一个由0到N个发射项目组成的异步序列,可选地由一个完成信号或一个错误终止。在响应式流规范中,这三种类型的信号转换为对下游订阅者的onNext、onComplete和onError方法的调用。

由于可能信号的范围很大,Flux是通用的反应式类型。请注意,所有事件,甚至是终止事件,都是可选的:只有onComplete事件才能表示一个空的有限序列,但删除onComplete事件就会得到一个无限的空序列(没什么用处,除了关于取消的测试)。类似地,无限序列不一定是空的。例如,Flux.interval(Duration)产生一个无限长的Flux<Long>,并从时钟发出规则的时标。

Mono

Mono表示了0个或1个元素序列,下图展示了Mono如何转换元素

新一代WebFlux框架核心技术Reactor响应式编程基本用法

Mono

 

Mono<T>是一个专门的发布者<T>,它通过onNext信号发出最多一个项目,然后以onComplete信号终止(Mono成功,有或没有值),或只发出一个onError信号(Mono失败)。

大多数Mono实现都希望在调用onNext之后立即对其订阅者调用onComplete。Mono.never()是一个异常值:它不会发出任何信号,这在技术上并没有被禁止,但在测试之外并不是特别有用。另一方面,onNext和onError的组合是明确禁止的。

Mono只提供了可用于` Flux `的操作符子集,有些操作符(特别是那些将Mono与另一个`Publisher`结合的操作符)会切换到`Flux`。例如,Mono#concatWith(Publisher)返回一个Flux,而Mono#then(Mono)返回另一个Mono。

注意,你可以使用Mono来表示只有完成概念的无值异步进程(类似于Runnable)。要创建一个,可以使用一个空的Mono<Void>。

4. Mono & Flux常用操作

Mono常用操作

  • 创建元素

Mono.just(T value)方法:创建一个包含指定值的Mono对象。

Mono.just(10).subscribe(System.out::println) ;

Mono.empty()方法:创建一个空的Mono对象,即不包含任何元素。

Mono.justOrEmpty(T value)方法:如果指定值不为null,则创建一个包含该值的Mono对象;否则创建一个空的Mono对象。

// 输出10
Mono.justOrEmpty(10).subscribe(System.out::println) ;
// 如果值为null,没有任何输出
Mono.justOrEmpty(null).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

Mono.fromCallable(Callable<? extends T> supplier)方法:创建一个Mono对象,该对象包含通过调用给定Callable对象的call()方法得到的返回值。

// 通过Callable方式,我们可以在内部执行其它一些动作
Mono.fromCallable(() -> 666).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法

Mono.fromSupplier(Supplier<? extends T> supplier)方法:创建一个Mono对象,该对象包含通过调用给定Supplier对象的get()方法得到的返回值。

Mono.fromSupplier(() -> 666).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

Mono.fromFuture(CompletableFuture<? extends Integer> future)方法:创建一个Mono对象,该对象包含通过调用给定CompletableFuture对象

Mono.fromFuture(CompletableFuture.supplyAsync(() -> 666)).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法

  • 异常处理

下面这个示例完整的展示了当发生异常后的处理

public static Mono<Users> invoke(Mono<Users> user) {
  return user.flatMap(u -> {
    if ("admin".equals(u.getName())) {
      return Mono.error(new RuntimeException("越权")) ;
    }
    u.setName(u.getName() + " - ");
    return Mono.just(u) ;
  });
}


public static void mAIn(String[] args) {
  invoke(Mono.just(new Users("admin")))
    .doOnNext(System.out::println)
    .doOnError(e -> {
      System.out.println(e.getMessage()) ;
    })
    // .onErrorResume(e -> Mono.just(new Users(e.getMessage() + " - fallback"))) // 功能更强,可以对捕获的异常进行响应的处理,然后再返回一个值
    .onErrorReturn(new Users("return")) // 捕获异常,简单粗暴直接返回一个静态值
    .doOnNext(System.out::println)
    .subscribe(); 


}
  • 执行结果
越权
Users [name=return]
  • 错误操作符

新一代WebFlux框架核心技术Reactor响应式编程基本用法

  • 错误返回操作符

新一代WebFlux框架核心技术Reactor响应式编程基本用法

  • 连接操作符

将该Mono的发射与提供的发布者连接(不交错)。

Mono.just(10).concatWith(Mono.just(20)).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

  • then操作符

该操作符是在当前Mono执行完成后切换到另外一个Mono。

Mono.just(10).doOnNext(System.out::println)
  .then(Mono.just(666)) // 切换到另外一个Mono通道, 忽略之前的Mono元素
  .doOnNext(System.out::println)
  .subscribe();

新一代WebFlux框架核心技术Reactor响应式编程基本用法

Flux常用操作

  • 创建元素

just():直接使用元素创建Flux,即在创建Flux时拿到数据,之后有谁订阅它,就重新发送数据给订阅者。

Flux.just(1, 2, 3...)

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

fromArray()、fromIterable()和fromStream():可以从数组、Iterable对象或Stream对象中创建Flux对象。

Flux.fromArray(new String[]{"1","2","3"});
Flux.fromIterable(List.of("a","b","c"));
Flux.fromStream(List.of("a","b","c").stream());

fromArray

新一代WebFlux框架核心技术Reactor响应式编程基本用法

fromIterable

新一代WebFlux框架核心技术Reactor响应式编程基本用法

fromStream

新一代WebFlux框架核心技术Reactor响应式编程基本用法

empty():创建一个不包含任何元素,只发布结束消息的序列。

新一代WebFlux框架核心技术Reactor响应式编程基本用法

range(int start, int count):创建包含从start起始的count个数量的Integer对象的序列。

Flux.range(1, 10) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

  • 错误处理
Flux.error(new RuntimeException("错误")).onErrorResume(ex -> Mono.just("发生异常:" + ex.getMessage())).subscribe(System.out::println) ;

error操作符

新一代WebFlux框架核心技术Reactor响应式编程基本用法

onErrorResume操作符

新一代WebFlux框架核心技术Reactor响应式编程基本用法

  • when操作符将给定的Publisher<T>聚合成一个新的Mono,当所有给定的来源都完成时,这个Mono就会被满足。错误将导致未执行的结果被取消,并立即向返回的Mono发送错误。
Flux.just(1, 3, 6).flatMap(id -> {
  Mono<Integer> query = Mono.fromSupplier(() -> {
    System.out.println("查询数据...") ;
    return id * 10 ;
  }).delayElement(Duration.ofSeconds(2)) ;
  Mono<String> save = Mono.fromSupplier(() -> {
    System.out.println("保存数据...") ;
    return "success - " + id ;
  }) ;
  return Mono.when(query, save) ;
}).doOnComplete(() -> {
  System.out.println("执行完成...") ;
}).subscribe() ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法图片

  • filterWhen操作符
Flux.just(1,2,3,4,5,6).concatMap(item -> Mono.just(item).filterWhen(r -> {
  return Mono.just(r % 2 == 0) ;
})).subscribe(System.out::println) ;

新一代WebFlux框架核心技术Reactor响应式编程基本用法

总之,Reactor中的Flux和Mono是响应式编程的核心组件,它们提供了丰富的操作符和方法来处理异步数据流。因此,对于使用WebFlux的开发者来说,掌握Reactor的使用是非常重要的。

完毕!!!



Tags:框架   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  Search: 框架  点击:(7)  评论:(0)  加入收藏
Htmx,它到底是框架还是库?
在最近的前端开发技术的探讨中,htmx经常成为热议的话题。一些人批评它,认为尽管htmx批评现代前端框架过于复杂,但它自己却似乎也是一个复杂的框架。这种看法值得我们深入思考。...【详细内容】
2024-03-28  Search: 框架  点击:(16)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  Search: 框架  点击:(46)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  Search: 框架  点击:(39)  评论:(0)  加入收藏
Go Gin框架实现优雅地重启和停止
在Web应用程序中,有时候我们需要重启或停止服务器,无论是因为更新代码还是进行例行维护。在这种情景下,我们需要保证应用程序的可用性和数据的一致性。这就需要优雅地关闭和重...【详细内容】
2024-01-30  Search: 框架  点击:(67)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  Search: 框架  点击:(67)  评论:(0)  加入收藏
OpenHarmony - 基于ArkUI框架实现日历应用
前言对于刚刚接触OpenHarmony应用开发的开发者,最快的入门方式就是开发一个简单的应用,下面记录了一个日历应用的开发过程,通过日历应用的开发,来熟悉基本图形的绘制,ArkUI的组件...【详细内容】
2024-01-16  Search: 框架  点击:(54)  评论:(0)  加入收藏
阿里“AI替换万物”框架火爆社区,网友:偶像不需要真人了?
白交 发自 凹非寺量子位 | 公众号 QbitAIReplace Anything as you want。现在只需框住你需要保留的区域,AI就可以替换万物了!比如让霉霉穿上中国旗袍,结果发饰、服装、背景等各...【详细内容】
2024-01-15  Search: 框架  点击:(66)  评论:(0)  加入收藏
分布式事务框架选择与实践
分布式事务是处理跨多个服务的原子操作的关键概念,而选择适合应用场景的框架对于确保事务一致性至关重要。以下是几个常见的分布式事务框架,并讨论它们的使用和实践。1. XA协...【详细内容】
2024-01-05  Search: 框架  点击:(96)  评论:(0)  加入收藏
JavaScript前端框架2024年展望
Angular、Next.js、React和Solid的维护者和创作者们展望2024年,分享了他们计划中的改进。译自2024 Predictions by JavaScript Frontend Framework Maintainers,作者 Loraine...【详细内容】
2024-01-05  Search: 框架  点击:(89)  评论:(0)  加入收藏
▌简易百科推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(7)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(12)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(52)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(46)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(50)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(67)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(84)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
花 15 分钟把 Express.js 搞明白,全栈没有那么难
Express 是老牌的 Node.js 框架,以简单和轻量著称,几行代码就可以启动一个 HTTP 服务器。市面上主流的 Node.js 框架,如 Egg.js、Nest.js 等都与 Express 息息相关。Express 框...【详细内容】
2024-01-16  程序员成功  微信公众号  Tags:Express.js   点击:(85)  评论:(0)  加入收藏
站内最新
站内热门
站内头条