哈喽,大家好,我是指北君。
最近项目中准备使用消息中间件Apache Pulsar,借着机会先做个简单了解吧。
Apache Pulsar是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台。
Pulsar 作为下一代云原生分布式消息流平台,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性, 内置诸多其他系统商业版本才有的特性,是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。
JAVA 客户端
C++ 客户端
.Net/C# 客户端
Go 客户端
NodeJS 客户端
Ruby 客户端
目前Pulsar不支持Window,下面通过Docker进行安装,可以参考官网https://pulsar.apache.org/docs/next/getting-started-docker/
同时可以安装Pulsar Manager,具体操作可以参考官方文档 https://pulsar.apache.org/docs/next/administration-pulsar-manager/
其中Pulsar Manager 是一个网页式可视化管理与监测工具,支持多环境下的动态配置。可用于管理和监测租户、命名空间、topic、订阅、broker、集群等。
打开并启动Docker Desktop后,在终端执行命令执行
_> docker search pulsar
可以查询到pulsar相关的镜像
这里我们选择分别下载红框的两个镜像,执行命令
_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager
docker run -it -p 6650:6650 -p 8080:8080
--mount source=pulsardata,target=/pulsar/data
--mount source=pulsarconf,target=/pulsar/conf
apachepulsar/pulsar bin/pulsar standalone
启动Pulsar Manager
docker run --name pulsar-manager -dit
-p 9527:9527 -p 7750:7750
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/Application.properties
apachepulsar/pulsar-manager
添加用户:
for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%" -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;"
-H "Content-Type: application/json" -d "{"name": "admin", "password": "123456", "description": "super user admin", "emAIl": "admin@test.com"}"
"http://localhost:7750/pulsar-manager/users/superuser"
访问:
http://localhost:9527/
用户名密码:admin/123456
配置environments:
这里需要保证Pulsar Manager应用服务能够访问到Pulsar应用,由于都是通过Docker部署,配置Service URL需要使用网络IP,不要用localhost。
管理界面:
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
private String cluster;
private String namespace;
private String serverUrl;
private String token;
}
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {
private final PulsarProperties properties;
public PulsarBootstrapConfiguration(PulsarProperties properties) {
this.properties = properties;
}
@Bean(destroyMethod = "close")
public PulsarClient pulsarClient() throws PulsarClientException {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
return clientBuilder.build();
}
@Bean
public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
return new PulsarProducerFactory(pulsarClient(), properties);
}
@Bean
public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
return new PulsarConsumerFactory(pulsarClient(), properties);
}
}
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {
@Autowired
private PulsarConsumerFactory consumerFactory;
public static void main(String[] args) {
SpringApplication.run(PulsarApplication.class,args);
}
@Override
public void afterSingletonsInstantiated() {
startConsumerListener();
}
private void startConsumerListener(){
Consumer<String> consumer = createConsumer();
if( consumer != null ){
while (!Thread.currentThread().isInterrupted()){
CompletableFuture<? extends Message<?>> completableFuture = consumer.receiveAsync();
Message<?> message = null;
try {
message = completableFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("错误",e);
} catch (ExecutionException e) {
log.error("错误",e);
}
if( message!=null ){
try {
log.info(" 接收消息:{} ", message.getValue() );
consumer.acknowledge(message);
} catch (PulsarClientException e) {
consumer.negativeAcknowledge(message);
throw new RuntimeException(e);
}
}
}
}
}
private Consumer<String> createConsumer() {
try {
return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
} catch (PulsarClientException e) {
log.error("创建consumer出错:{}", e.getMessage(),e);
}
return null;
}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {
@Autowired
private PulsarProducerFactory producerFactory;
@Test
public void sendMessage() throws PulsarClientException {
Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);
producer.send(" 测试消息: " + new Date());
producer.close();
}
}
2023-02-05 12:05:14.043 INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425 INFO 23472 --- [ main] com.sucl.pulsar.PulsarApplication : 接收消息: 测试消息: Sun Feb 05 12:06:16 CST 2023
该篇主要通过官网对Apache Pulsar做了简单的了解与尝试,同时基于SpringBoot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据Pulsar的特性以及官方API使其具有扩展性与易用性。