五分钟左右为你展示如何创建一个Spring Cloud Stream的应用程序,它是如何从消息中间件中接收并输出接收的信息到console,这里的消息中间件有两种选择:RabbitMQ和Kafka,本文以RabbitMQ为准
这节主要简化官方文档为两步:
打开项目目录,新建一个moudle,名为FirstStream,pom文件如下
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.Apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cnblogs.hellxz</groupId> <artifactId>FirstStream</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>FirstStream</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-parent</artifactId> <version>Dalston.SR5</version> <relativePath/> </parent> <dependencies> <!-- Spring boot 测试用 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- Stream rabbit 依赖中包含 binder-rabbit,所以只需导入此依赖即可 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.0.RELEASE</version> </dependency> </dependencies> </project>
在com.cnblogs.hellxz包下添加启动类,并添加
@SpringBootApplication@EnableBinding(Sink.class)public class FirstStreamApp { public static void mAIn(String[] args) { SpringApplication.run(FirstStreamApp.class, args); } @StreamListener(Sink.INPUT) public void receive(Object payload) { logger.info("Received: " + payload); }}
启动项目,我们去查看RabbitMQ的网页 http://localhost:15672 点击Connections,发现现在已经有一个连接进来了,我们刚才的项目,在Queues中也有一个队列被创建,我的是
input.anonymous.L92bTj6FRTyOC0QE-Pl0HA,我们点开那个唯一的队列,往下拉点开publish message,payload处输入一个hello world,点Publlish message发送一个消息
查看控制台,你会看到Received: hello world
spring.rabbitmq.host=<rabbitMQ所在的ip>
spring.rabbitmq.port=<端口号>
spring.rabbitmq.username=<登录用户名>
spring.rabbitmq.password=<密码>
Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot 创建的独立生产级的,使用Spring Integration提供连接到消息代理的Spring应用。介绍持久发布 - 订阅(persistent publish-subscribe)的语义,消费组(consumer groups)和分区(partitions)的概念。
你可以添加@EnableBinding注解在你的应用上,从而立即连接到消息代理,在方法上添加@StreamListener以使其接收流处理事件,下面的例子展示了一个Sink应用接收外部信息
@SpringBootApplication@EnableBinding(Sink.class)public class VoteRecordingSinkApplication { public static void main(String[] args) { SpringApplication.run(VoteRecordingSinkApplication.class, args); } @StreamListener(Sink.INPUT) public void processVote(Vote vote) { votingService.recordVote(vote); }}
@EnableBinding注解会带着一个或多个接口作为参数(举例中使用的是Sink的接口),一个接口往往声名了输入和输出的渠道,Spring Stream提供了Source、Sink、Processor这三个接口,你也可以自己定义接口。
下面展示的是Sink的接口内容
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input();}
@Input注解区分了一个输入channel,通过它接收消息到应用中,使用@Output注解 区分输出channel,消息通过它离开应用,使用这两个注解可以带一个channel的名字作为参数,如果未提供channel名称,则使用带注释的方法的名称。
你可以使用Spring Cloud Stream 现成的接口,也可以使用@Autowired注入这个接口,下面在测试类中举例
@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic class LoggingConsumerApplicationTests { @Autowired private Sink sink; @Test public void contextLoads() { assertNotNull(this.sink.input()); }}
在前面我们测试的项目中并没有修改application.properties,自动配置得益于Spring Boot
通过 Binder ,可以方便地连接中间件,可以通过修改application.yml中的
spring.cloud.stream.bindings.input.destination 来进行改变消息中间件(对应于Kafka的topic,RabbitMQ的exchanges)
在这两者间的切换甚至不需要修改一行代码。
其中topic对应于Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的 exchanges)
官方文档这块原理说的有点深,就没写,详见官方文档
所有订阅指定主题的组都会收到发布消息的一个备份,每个组中只有一个成员会收到该消息;如果没有指定组,那么默认会为该应用分配一个匿名消费者组,与所有其它组处于 订阅-发布 关系中。ps:也就是说如果管道没有指定消费组,那么这个匿名消费组会与其它组一起消费消息,出现了重复消费的问题。
注意:要使用分区处理,你必须同时对生产者和消费者进行配置。
为了理解编程模型,需要熟悉下列核心概念:
接下来的例子展示完全配置且正常运行的Spring Cloud Stream应用,由INPUT接收消息转换成String 类型并打印在控制台上,然后转换出一个大写的信息返回到OUTPUT中。
@SpringBootApplication@EnableBinding(Processor.class)public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public String handle(String value) { System.out.println("Received: " + value); return value.toUpperCase(); }}
通过SendTo注解将方法内返回值转发到其他消息通道中,这里因为没有定义接收通道,提示消息已丢失,解决方法是新建一个接口,如下
public interface MyPipe{ //方法1 @Input(Processor.OUTPUT) //这里使用Processor.OUTPUT是因为要同一个管道,或者名称相同 SubscribableChannel input(); //还可以如下这样=====二选一即可========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input();}
然后在在上边的方法下边加一个方法,并在@EnableBinding注解中改成@EnableBinding({Processor.class, MyPipe.class})
@StreamListener(MyPipe.INPUT) public void handleMyPipe(String value) { System.out.println("Received: " + value); }
Spring Cloud Stream已经为我们提供了三个绑定消息通道的默认实现
他们的源码分别为:
public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input();} public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output();} public interface Processor extends Source, Sink {}
Sink和Source中分别通过@Input和@Output注解定义了输入通道和输出通道,通过使用这两个接口中的成员变量来定义输入和输出通道的名称,Processor由于继承自这两个接口,所以同时拥有这两个通道。
注意:拥有多条管道的时候不能有输入输出管道名相同的,否则会出现发送消息被自己接收或报错的情况
我们可以根据上述源码的方式来定义我们自己的输入输出通道,定义输入通道需要返回SubscribaleChannel接口对象,这个接口继承自MessageChannel接口,它定义了维护消息通道订阅者的方法;定义输出通道则需要返回MessageChannel接口对象,它定义了向消息通道发送消息的方法。
依照上面的内容,我们也可以创建自己的绑定通道 如果你实现了上边的MyPipe接口,那么直接使用这个接口就好
package com.cnblogs.hellxz; import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.SubscribableChannel; public interface MyPipe { //方法1// @Input(Source.OUTPUT) //Source.OUTPUT的值是output,我们自定义也是一样的// SubscribableChannel input(); //使用@Input注解标注的输入管道需要使用SubscribableChannel来订阅通道 //========二选一使用=========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input();}
这里用Source.OUTPUT和第二种方法 是一样的,我们只要将消息发送到名为output的管道中,那么监听output管道的输入流一端就能获得数据
@StreamListener(MyPipe.INPUT) public void receiveFromMyPipe(Object payload){ logger.info("Received: "+payload); }
我们是通过注入消息通道,并调用他的output方法声明的管道获得的MessageChannel实例,发送的消息
通过注入消息通道的方式虽然很直接,但是也容易犯错,当一个接口中有多个通道的时候,他们返回的实例都是MessageChannel,这样通过@Autowired注入的时候往往会出现有多个实例找到无法确定需要注入实例的错误,我们可以通过@Qualifier指定消息通道的名称,下面举例:
给消费者设置消费组和主题
给生产者指定通道的主题:
spring.cloud.stream.bindings.<通道名>.destination=<主题名>
消费者开启分区,指定实例数量与实例索引
生产者指定分区键