您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

Spring Cloud Stream使用详解及部分重点源码分析

时间:2022-03-07 13:08:40  来源:  作者:Spring全家桶实战案例

环境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12


简介

Spring Cloud Stream是一个框架,用于构建与MQ连接的高度可伸缩的事件驱动微服务。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。屏蔽了各种MQ之间的差异,使得在更换MQ的时候不需要修改代码。

Spring Cloud Stream支持多种绑定器实现,如下:

  • RabbitMQ
  • Apache Kafka
  • Kafka Streams
  • Amazon Kinesis
  • google PubSub (partner mAIntained)
  • Solace PubSub+ (partner maintained)
  • Azure Event Hubs (partner maintained)
  • AWS SQS (partner maintained)
  • AWS SNS (partner maintained)
  • Apache RocketMQ (partner maintained)

详细查看官方文档,对应每一个MQ都有一个Github地址。

Spring Cloud Stream的核心构建块是:

  • 目标绑定器(Destination Binders):负责与MQ集成的组件。
  • 目标绑定(Destination Bindings):MQ中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • 消息(Message):生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。
Spring Cloud Stream使用详解及部分重点源码分析

Stream 核心组件关系图

快速入门

依赖:

<properties>
  <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

应用配置:

spring:
  rabbitmq:
    host: localhost
    virtual-host: bus
    port: 5672
    username: xxx
    password: xxx
---
spring:
  cloud:
    stream:
      bindings:
        #自定义输入输出
        myInput:
          #指定输入通道对应的主题名
          destination: demo
        myOutput:
          destination: demo

创建消息通道绑定的接口:

public interface StreamBinding {
 
  String INPUT = "myInput";
  String OUTPUT = "myOutput";
 
  @Input(StreamBinding.INPUT)
  SubscribableChannel input();
 
  @Output(StreamBinding.OUTPUT)
  MessageChannel output();
}

通过 @Input和 @Output注解定义输入通道和输出通道名称,这里的名称与上面配置文件中的是对应的。

当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法;定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。

这里的Input,Output两个方法容器会分别创建一个Bean对象

创建消费者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
 
  private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);

  @StreamListener(StreamBinding.INPUT)
  public void receive(String message) {
    logger.info("接收到消息: {}", message);
  }
}

@EnableBinding 注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过 @EnableBinding(value = {StreamClient.class}) 绑定了 StreamClient 接口,该接口是我们自己实现的对输入输出消息通道绑定的定义

@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将 receive 方法注册为 myInput 消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver 方法会执行。

消息发送接口:

@Resource
private StreamBinding streamBinding;
@GetMApping("/send")
public void send() {
  streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

启动服务:

查看RabbitMQ

Spring Cloud Stream使用详解及部分重点源码分析

 

自动为我们创建了一个队列,队列的名称是以我们在配置文件中配置的开头,后面是随机生成的。这个队列会自动删除AD,服务关闭后就自动删除队列;Excl:排他的,存在该队列就不会在创建了。

修改端口后,再启动一个服务:

Spring Cloud Stream使用详解及部分重点源码分析

 

创建了2个队列,使用其中一个发送消息:

Spring Cloud Stream使用详解及部分重点源码分析

 


Spring Cloud Stream使用详解及部分重点源码分析

 

两个服务都收到了消息。

消费者组

上面启动了2个服务都能收到消息,在集群的环境下这样肯定会带来问题,如果是业务方面的就会出现重复数据,这时候我们可以通过设置分组的解决此问题。修改配置:

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
        myOutput:
          destination: demo

再次启动服务后,两个服务会轮询的接收到消息。

Spring Cloud Stream使用详解及部分重点源码分析

 

启动服务后,两个服务都同时监听同一个队列。队列也不是随机生成的了,并且队列是持久化的,服务断开后队列也不会自动删除。

消息分区

通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用 Spring Cloud Stream 提供的消息分区功能。修改配置

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
          consumer:
            #通过该参数开启消费者分区功能
            partitioned: true
        myOutput:
          destination: demo
          producer:
            #这里的配置也可以是SpEL表达式,比如:headers['partition']通过消息header获取属性
            #这里会通过表达式及消息对象进行计算得到一个Key,然后获取key的hashCode
            # 得到hashCode以后会与partitionCount进行取模运算得到具体的分区
            partitionKeyExpression: '1' #我这里给的值就是对应的instanceIndex的值,你希望谁接收就设置谁配置的值即可
            partitionCount: 2
      #实例总数
      instanceCount: 2
      #该参数设置了当前实例的索引号,从 0 开始
      instanceIndex: 0

计算分区源码:

Spring Cloud Stream使用详解及部分重点源码分析

 

最后得到分区信息后会在消息头中放入一个scst_partition为key,partition为值的头信息。

启动多个实例后,测试发现所有的消息都只是同一个实例收到消息

Spring Cloud Stream使用详解及部分重点源码分析

 


Spring Cloud Stream使用详解及部分重点源码分析

 

交换机分别与每一个服务进行绑定使用不同的Routing Key这样在发送消息的时候就可以根据计算处理的分区进行定向发送消息了。

通过源码查看:

Spring Cloud Stream使用详解及部分重点源码分析

 

这里通过我们的配置交换机为demo。接着是获取路由key了

Spring Cloud Stream使用详解及部分重点源码分析

 

这里会从消息header中获取key = scst_partition的头信息。

这样针对使用RabbitMQ的中间件发送消息所需要的交换机及路由key就确定下来了。

完毕!!!



Tags:Spring Cloud   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
监控 Spring Cloud 微服务的实践方案
一、简介Spring Cloud是一个基于Spring Boot实现的微服务框架,它提供了丰富的微服务功能,如分布式配置、服务注册与发现、服务熔断、负载均衡等。为了更好地管理和监控这样复...【详细内容】
2023-12-19  Search: Spring Cloud  点击:(150)  评论:(0)  加入收藏
彻底解决Spring Cloud Gateway中Body读取问题
在构建微服务架构时,Spring Cloud Gateway作为一个重要的微服务网关,经常需要在过滤器(Filter)中对POST请求的Body内容进行操作,如日志记录、签名验证和权限验证等。然而,由于Requ...【详细内容】
2023-11-28  Search: Spring Cloud  点击:(167)  评论:(0)  加入收藏
Spring Cloud 实现分布式实时日志分析采集的三种方案
ELK 已经成为目前最流行的集中式日志解决方案,它主要是由Beats、Logstash、Elasticsearch、Kibana等组件组成,来共同完成实时日志的收集,存储,展示等一站式的解决方案。本文将会...【详细内容】
2023-11-27  Search: Spring Cloud  点击:(194)  评论:(0)  加入收藏
十个使用Spring Cloud和Java创建微服务的实践案例
在使用Java构建微服务时,许多人认为只要学习一些微服务设计模式就足够了,比如CQRS、SAGA或每个微服务一个数据库。虽然这是正确的,但同时学习一些通用的最佳实践也是很有意义的...【详细内容】
2023-11-10  Search: Spring Cloud  点击:(211)  评论:(0)  加入收藏
四个步骤,掌握Spring Cloud Stream
随着微服务和云原生应用程序的不断发展,事件驱动架构成为了一种实现微服务之间高效通信的主流方法。在Java Spring生态系统中,Spring Cloud Stream是一个专门为事件驱动、云原...【详细内容】
2023-10-13  Search: Spring Cloud  点击:(218)  评论:(0)  加入收藏
Spring Cloud 远程调用 OpenFeign 这些知识点,能颠覆你的认知!
环境:SpringBoot2.7.12 + Spring Cloud2021.0.71. 概述Spring Cloud Openfeign是一种声明式、模板化的HTTP客户端,主要用于在Spring Cloud微服务架构中进行服务调用。相比于传...【详细内容】
2023-10-13  Search: Spring Cloud  点击:(315)  评论:(0)  加入收藏
Spring Cloud微服务架构:构建弹性和可伸缩的云原生应用
Spring Cloud是一个开源的微服务架构框架,它基于Spring Boot构建,通过一系列的组件和工具,为开发人员提供了快速构建弹性和可伸缩的云原生应用的能力。Spring Cloud的核心概念1...【详细内容】
2023-10-12  Search: Spring Cloud  点击:(327)  评论:(0)  加入收藏
你了解Spring Cloud的这些组件吗?
今天,我要和大家一起探讨一个热门的话题:Spring Cloud。如果你在Java领域有一定的开发经验,那么你一定听说过Spring Cloud,它是一套用于构建分布式系统的开发工具,今天,我们将深入...【详细内容】
2023-09-25  Search: Spring Cloud  点击:(294)  评论:(0)  加入收藏
Spring Cloud 微服务系列之 ShardingSphere-Proxy 数据库代理
ShardingSphere-Proxy是在数据库和应用程序之间起到了一个桥梁的作用,对于应用程序来说,它不需要感知ShardingSphere-Proxy的存在,依然可以使用原来的方式操作数据库。1. 简介S...【详细内容】
2023-09-19  Search: Spring Cloud  点击:(213)  评论:(0)  加入收藏
Spring Cloud Gateway提供的简易网关实现方式,你使用过吗?
环境:SpringBoot2.5.13Spring Cloud Gateway提供了一个名为ProxyExchange的实用程序对象。你可以在常规Spring web处理程序中使用它作为方法参数。它通过镜像HTTP动词的方法...【详细内容】
2023-09-15  Search: Spring Cloud  点击:(209)  评论:(0)  加入收藏
▌简易百科推荐
对于微服务架构监控应该遵守的原则
随着软件交付方式的变革,微服务架构的兴起使得软件开发变得更加快速和灵活。在这种情况下,监控系统成为了微服务控制系统的核心组成部分。随着软件的复杂性不断增加,了解系统的...【详细内容】
2024-04-03  步步运维步步坑    Tags:架构   点击:(6)  评论:(0)  加入收藏
大模型应用的 10 种架构模式
作者 | 曹洪伟在塑造新领域的过程中,我们往往依赖于一些经过实践验证的策略、方法和模式。这种观念对于软件工程领域的专业人士来说,已经司空见惯,设计模式已成为程序员们的重...【详细内容】
2024-03-27    InfoQ  Tags:架构模式   点击:(16)  评论:(0)  加入收藏
哈啰云原生架构落地实践
一、弹性伸缩技术实践1.全网容器化后一线研发的使用问题全网容器化后一线研发会面临一系列使用问题,包括时机、容量、效率和成本问题,弹性伸缩是云原生容器化后的必然技术选择...【详细内容】
2024-03-27  哈啰技术  微信公众号  Tags:架构   点击:(12)  评论:(0)  加入收藏
DDD 与 CQRS 才是黄金组合
在日常工作中,你是否也遇到过下面几种情况: 使用一个已有接口进行业务开发,上线后出现严重的性能问题,被老板当众质疑:“你为什么不使用缓存接口,这个接口全部走数据库,这怎么能扛...【详细内容】
2024-03-27  dbaplus社群    Tags:DDD   点击:(15)  评论:(0)  加入收藏
高并发架构设计(三大利器:缓存、限流和降级)
软件系统有三个追求:高性能、高并发、高可用,俗称三高。本篇讨论高并发,从高并发是什么到高并发应对的策略、缓存、限流、降级等。引言1.高并发背景互联网行业迅速发展,用户量剧...【详细内容】
2024-03-13    阿里云开发者  Tags:高并发   点击:(8)  评论:(0)  加入收藏
如何判断架构设计的优劣?
架构设计的基本准则是非常重要的,它们指导着我们如何构建可靠、可维护、可测试的系统。下面是这些准则的转换表达方式:简单即美(KISS):KISS原则的核心思想是保持简单。在设计系统...【详细内容】
2024-02-20  二进制跳动  微信公众号  Tags:架构设计   点击:(38)  评论:(0)  加入收藏
详解基于SpringBoot的WebSocket应用开发
在现代Web应用中,实时交互和数据推送的需求日益增长。WebSocket协议作为一种全双工通信协议,允许服务端与客户端之间建立持久性的连接,实现实时、双向的数据传输,极大地提升了用...【详细内容】
2024-01-30  ijunfu  今日头条  Tags:SpringBoot   点击:(21)  评论:(0)  加入收藏
PHP+Go 开发仿简书,实战高并发高可用微服务架构
来百度APP畅享高清图片//下栽のke:chaoxingit.com/2105/PHP和Go语言结合,可以开发出高效且稳定的仿简书应用。在实现高并发和高可用微服务架构时,我们可以采用一些关键技术。首...【详细内容】
2024-01-14  547蓝色星球    Tags:架构   点击:(120)  评论:(0)  加入收藏
GraalVM与Spring Boot 3.0:加速应用性能的完美融合
在2023年,SpringBoot3.0的发布标志着Spring框架对GraalVM的全面支持,这一支持是对Spring技术栈的重要补充。GraalVM是一个高性能的多语言虚拟机,它提供了Ahead-of-Time(AOT)编...【详细内容】
2024-01-11    王建立  Tags:Spring Boot   点击:(128)  评论:(0)  加入收藏
Spring Boot虚拟线程的性能还不如Webflux?
早上看到一篇关于Spring Boot虚拟线程和Webflux性能对比的文章,觉得还不错。内容较长,抓重点给大家介绍一下这篇文章的核心内容,方便大家快速阅读。测试场景作者采用了一个尽可...【详细内容】
2024-01-10  互联网架构小马哥    Tags:Spring Boot   点击:(127)  评论:(0)  加入收藏
站内最新
站内热门
站内头条