您当前的位置:首页 > 电脑百科 > 站长技术 > 服务器

下一代MQ中间件,不来了解下?

时间:2023-02-28 14:20:53  来源:  作者:Java技术指北

哈喽,大家好,我是指北君。

最近项目中准备使用消息中间件Apache Pulsar,借着机会先做个简单了解吧。

Apache Pulsar

Apache Pulsar是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台。

Pulsar 作为下一代云原生分布式消息流平台,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性, 内置诸多其他系统商业版本才有的特性,是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。

图片

Pulsar简介

图片

  • 功能特色
    租户和命名空间(namespace)是 Pulsar 支持多租户的两个核心概念。在租户级别,Pulsar 为特定的租户预留合适的存储空间、应用授权与认证机制。在命名空间级别,Pulsar 有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略。
    Pulsar 做了队列模型和流模型的统一,在 Topic 级别只需保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度。
    Pulsar 使用计算与存储分离的云原生架构,数据从 Broker 搬离,存在共享存储内部。上层是无状态 Broker,复制消息分发和服务;下层是持久化的存储层 Bookie 集群。Pulsar 存储是分片的,这种构架可以避免扩容时受限制,实现数据的独立扩展和快速恢复。
    Pulsar 原生支持跨地域复制,因此 Pulsar 可以跨不同地理位置的数据中心复制数据。当数据中心中断或网络分区时,在多个数据中心存有消息副本尤为重要,提高可用性。
    Pulsar Functions 是基于 Pulsar 的轻量级流处理方式。Pulsar Functions 直接部署在 broker 节点上(或作为 Kube.NETes 集群中的容器)。通过 Pulsar Functions,Pulsar 可以直接解决许多流处理任务,简化操作。​
  • 支持客户端

JAVA 客户端

C++ 客户端

.Net/C# 客户端

Go 客户端

NodeJS 客户端

Ruby 客户端

Pulsar安装与部署

目前Pulsar不支持Window,下面通过Docker进行安装,可以参考官网​​https://pulsar.apache.org/docs/next/getting-started-docker/​​ 

同时可以安装Pulsar Manager,具体操作可以参考官方文档 ​​https://pulsar.apache.org/docs/next/administration-pulsar-manager/​

其中Pulsar Manager 是一个网页式可视化管理与监测工具,支持多环境下的动态配置。可用于管理和监测租户、命名空间、topic、订阅、broker、集群等。

  1. window环境使用docker推荐使用Docker Desktop,和linux一样可以通过docker命令管理镜像、部署容器等操作。

打开并启动Docker Desktop后,在终端执行命令执行 

_> docker search pulsar 

可以查询到pulsar相关的镜像

图片

  1. 镜像下载

这里我们选择分别下载红框的两个镜像,执行命令 

_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager

  1. 启动
  • 启动Pulsar
docker run -it -p 6650:6650 -p 8080:8080 
      --mount source=pulsardata,target=/pulsar/data 
      --mount source=pulsarconf,target=/pulsar/conf 
      apachepulsar/pulsar bin/pulsar standalone

启动Pulsar Manager
docker run --name pulsar-manager -dit 
      -p 9527:9527 -p 7750:7750 
      -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/Application.properties 
      apachepulsar/pulsar-manager

添加用户:

for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%"   -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;" 
  -H "Content-Type: application/json" -d "{"name": "admin", "password": "123456", "description": "super user admin", "emAIl": "admin@test.com"}" 
  "http://localhost:7750/pulsar-manager/users/superuser"

访问:

http://localhost:9527/ 
用户名密码:admin/123456

配置environments:

这里需要保证Pulsar Manager应用服务能够访问到Pulsar应用,由于都是通过Docker部署,配置Service URL需要使用网络IP,不要用localhost。

图片

管理界面:

图片

Pulsar与SpringBoot集成

  • springboot version : 2.3.7.RELEASE
  • pulsar client: 2.10.2
  1. 通过Properties简单定义一些Broker相关的属性
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
    
    private String cluster;
    
    private String namespace;

    private String serverUrl;

    private String token;
}
  1. 通过配置定义了一些常用的组件,比如生产、消费工厂
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {

    private final PulsarProperties properties;

    public PulsarBootstrapConfiguration(PulsarProperties properties) {
        this.properties = properties;
    }

    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
        return clientBuilder.build();
    }

    @Bean
    public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
        return new PulsarProducerFactory(pulsarClient(), properties);
    }

    @Bean
    public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
        return new PulsarConsumerFactory(pulsarClient(), properties);
    }

}
  1. 启动服务,在服务启动后,通过实现SmartInitializingSingleton接口,完成容器基本启动(不包含Lazy的Bean)后,开始对消费者Consumer监听
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {

    @Autowired
    private PulsarConsumerFactory consumerFactory;

    public static void main(String[] args) {
        SpringApplication.run(PulsarApplication.class,args);
    }

    @Override
    public void afterSingletonsInstantiated() {
        startConsumerListener();
    }

    private void startConsumerListener(){
        Consumer<String> consumer = createConsumer();
        if( consumer != null ){
            while (!Thread.currentThread().isInterrupted()){
                CompletableFuture<? extends Message<?>> completableFuture = consumer.receiveAsync();
                Message<?> message = null;
                try {
                    message = completableFuture.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("错误",e);
                } catch (ExecutionException e) {
                    log.error("错误",e);
                }

                if( message!=null ){
                    try {
                        log.info(" 接收消息:{} ", message.getValue() );
                        consumer.acknowledge(message);
                    } catch (PulsarClientException e) {
                        consumer.negativeAcknowledge(message);
                        throw new RuntimeException(e);
                    }
                }
            }
        }
    }

    private Consumer<String> createConsumer() {
        try {
            return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
        } catch (PulsarClientException e) {
            log.error("创建consumer出错:{}", e.getMessage(),e);
        }
        return null;
    }
}
  1. 消息发送测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {

    @Autowired
    private PulsarProducerFactory producerFactory;

    @Test
    public void sendMessage() throws PulsarClientException {
        Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);

        producer.send(" 测试消息: " + new Date());

        producer.close();
    }

}
  1. 检查消息接收情况
 
2023-02-05 12:05:14.043  INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425  INFO 23472 --- [           main] com.sucl.pulsar.PulsarApplication        :  接收消息: 测试消息: Sun Feb 05 12:06:16 CST 2023

结束语

该篇主要通过官网对Apache Pulsar做了简单的了解与尝试,同时基于SpringBoot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据Pulsar的特性以及官方API使其具有扩展性与易用性。



Tags:MQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
RabbitMQ如何实现延迟队列?
延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种: 未按时支付的订单,30 分钟过期之后取消订单。 给活...【详细内容】
2024-01-26  Search: MQ  点击:(46)  评论:(0)  加入收藏
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: MQ  点击:(102)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: MQ  点击:(111)  评论:(0)  加入收藏
SQL应用于LLM的程序开发利器——开源LMQL
译者 | 朱先忠审校 | 重楼我相信你听说过SQL,甚至已经掌握了它。SQL(结构化查询语言)是一种广泛用于处理数据库数据的声明性语言。根据StackOverflow的年度调查,SQL仍然是世界上...【详细内容】
2023-12-27  Search: MQ  点击:(77)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: MQ  点击:(71)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题&mdash;&mdash;“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: MQ  点击:(98)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: MQ  点击:(132)  评论:(0)  加入收藏
RabbitMQ消息顺序性解密:保证消息的正确顺序
在分布式系统中,保证消息的正确顺序对于一些应用场景至关重要。而RabbitMQ作为一种流行的消息队列系统,本身并不提供严格的消息顺序保证。下面将探讨如何在使用RabbitMQ时实现...【详细内容】
2023-12-04  Search: MQ  点击:(126)  评论:(0)  加入收藏
RabbitMQ与消息限流策略的完美结合
在当今互联网时代,高并发访问已成为许多应用系统面临的常见挑战之一。对于需要处理大量请求的系统来说,如何保证系统的稳定性和可靠性是一个关键问题。RabbitMQ作为一种可靠的...【详细内容】
2023-11-27  Search: MQ  点击:(167)  评论:(0)  加入收藏
实时协作的秘诀:RabbitMQ与WebSockets的结合
实时协作是现代软件开发中非常重要的一个方面。为了实现实时协作,一种常见的做法是将消息队列与WebSocket技术相结合。其中,RabbitMQ是一个功能强大的消息队列系统,它能够有效...【详细内容】
2023-11-21  Search: MQ  点击:(177)  评论:(0)  加入收藏
▌简易百科推荐
为什么Nginx被称为“反向”代理呢?
Nginx(发音为"engine-x")是一款高性能、轻量级的开源Web服务器软件,也可用作反向代理服务器、负载均衡器和HTTP缓存。Nginx之所以有被称为“反向”代理,是因为它充当客户端设备...【详细内容】
2024-02-01  coderidea  微信公众号  Tags:Nginx   点击:(60)  评论:(0)  加入收藏
哪种服务器操作系统更好呢?
在当今的IT世界中,服务器操作系统扮演着至关重要的角色。它们是确保服务器能够高效、安全地运行的关键因素。然而,对于许多人来说,服务器操作系统的种类和特点可能是一个复杂的...【详细内容】
2024-01-30    简易百科  Tags:操作系统   点击:(76)  评论:(0)  加入收藏
什么是VPS服务器
VPS服务器是一种虚拟化技术,它将一台物理服务器划分为多个虚拟的独立服务器,每个虚拟服务器都可以拥有自己的操作系统、运行环境、应用程序等。这种技术使得每个虚拟服务器可...【详细内容】
2024-01-30    简易百科  Tags:VPS服务器   点击:(70)  评论:(0)  加入收藏
VPS服务器下载速度慢?这五招帮你提速
VPS服务器下载速度慢可能会让用户感到沮丧,尤其是对于需要大量下载和上传数据的用户。幸运的是,有一些方法可以帮助您提高VPS服务器的下载速度,使您的在线体验更加顺畅。在本文...【详细内容】
2024-01-30  IDC行业观察者    Tags:VPS服务器   点击:(57)  评论:(0)  加入收藏
美国VPS和英国VPS:地理位置对服务器性能的影响
在今天的数字时代,VPS已成为在线业务和网站托管的关键组成部分。然而,选择合适的VPS主机服务时,地理位置通常被忽视,尽管它对服务器性能有着重要的影响。本文将探讨美国VPS和英...【详细内容】
2024-01-26  IDC行业观察者    Tags:服务器   点击:(55)  评论:(0)  加入收藏
如何判断服务器所需带宽:基于业务需求和流量模式的关键考量
在选择服务器时,带宽是一个重要的考虑因素。带宽的大小直接影响到网站的加载速度和用户的访问体验。那么,如何判断服务器需要多大的带宽呢?本文将为你揭示这一关键问题的答案...【详细内容】
2024-01-26  源库科技    Tags:服务器   点击:(75)  评论:(0)  加入收藏
服务器内存空间及IO操作原理解析
服务器的内存空间分为内核空间和用户空间,而我们编写的程序通常在用户空间中运行。在进行读写操作时,我们直接操作的是用户缓冲区,而用户缓冲区的内容来自于内核缓冲区。这种内...【详细内容】
2024-01-23  王建立    Tags:服务器   点击:(44)  评论:(0)  加入收藏
如何在Java环境中安装Nginx?
1. 下载Nginx:首先,前往Nginx官方网站(https://nginx.org/en/download.html)下载新版本的Nginx。选择适合您操作系统的版本,通常有Windows、Linux和Mac等不同操作系统的版本可供...【详细内容】
2024-01-22  敲代码的小动    Tags:Nginx   点击:(61)  评论:(0)  加入收藏
服务器证书和SSL证书有啥区别?
在互联网经济时代,随着越来越多的信息以及合作都是从企业官网开始的,因此绝大多数企业都会为自己的网站配置SSL证书,以提高安全性。在接触SSL证书时,也有很多人称之为服务器证书...【详细内容】
2024-01-10  安信SSL证书    Tags:服务器证书   点击:(65)  评论:(0)  加入收藏
宝塔面板怎样部署java项目?
宝塔面板怎样部署java项目?在使用宝塔面板部署Java项目之前,需要确保已经安装了Java Development Kit (JDK)。接下来,将介绍如何使用宝塔面板来部署Java项目的步骤。步骤一:安装...【详细内容】
2024-01-09  西部数码    Tags:宝塔面板   点击:(113)  评论:(0)  加入收藏
站内最新
站内热门
站内头条