您当前的位置:首页 > 电脑百科 > 数据库 > Redis

使用Redis实现简单的事件驱动架构 「DDD、事件溯源和一致性哈希」

时间:2022-10-04 16:13:30  来源:今日头条  作者:解道Jdon

Apache Kafka 已成为大多数技术栈中的主流组件。使用 Kafka 的好处包括确保事件中的因果顺序,同时保持并行性,通过在服务器之间快速复制分区来恢复故障,等等。
然而,运行 Kafka 也面临着一系列挑战。虽然许多工程团队都希望将 Kafka 添加到他们的堆栈中并与“真正的”工程师一起赢得一席之地,但运营开销构成了强大的进入障碍。
在这篇文章中,我们将重点介绍如何构建一个看起来像传统单体应用程序但又是松散耦合的事件驱动系统的系统。为此,我们依赖于从领域驱动设计、事件溯源和一致性哈希等概念中学习。
 

有序事件
大多数系统关心事件的顺序。大多数系统中的排序仅限于所考虑的域。例如,当我们查看帖子到一个线程时,我们关心的是相对于帖子的排序。当我们查看金融系统时,排序主要限于账户。大型系统中事件的全局排序很少有用,但可能是相关的。

 

场景:帖子被添加到一个线程Thread中
假设我们对每个添加的帖子都有相当多的后期处理,这反过来会更新线程的某些属性。
这创建了一个相当好的场景来说明分区的使用。
在这种情况下,默认方法是将所有帖子发送到队列中,并让一群工作人员(或消费者)完成工作。这为我们提供了系统所需的并行性,但在我们与多个消费者打交道的那一刻,顺序就会丢失。
我们保留顺序的唯一方法是确保我们一次处理一个任务,从而才能反映该线程上发生的事情的真实顺序。
下一个明显的想法是使用每个线程的专用队列来处理相同的问题,但如果我们知道我们将生成大量线程,那立即感觉像是矫枉过正。
 

分区
分区只是将我们的排队系统分解为专门的分区。因此,如果我们从一个天真的估计开始,即8个工人每分钟能够处理1600个事件,那么我们的设计就从16个分区开始。
你可能需要做更多的工作来确保你的估计是好的,但在这个例子中,我们将以假设它是好的来工作。我们还为一个分区分配一个worker,因为我们希望每个分区都能始终保持因果排序。
现在我们需要确保一个特定线程的帖子都被路由到同一个分区。每个分区都由一个消费者管理,所以我们的排序不会被打乱。
重要的是要记住,"队列分区 "或 "专用分区 "是一个抽象的结构。它实际上只是一个队列。我们使用分区这个术语,因为它使我们很容易与该领域广泛使用的术语保持一致。
 

一致性哈希
我们将使用一致哈希散列作为一种手段,将属于特定线程的所有帖子路由到同一队列分区(或队列)。
在我们的例子中,我们将使用Murmurhash和一个由名为uHashRing的库管理这个持续体。
 

将我们的队列视为一个连续体
现在,如果我们简单地将所有的8个队列放在一个圆圈中,我们会得到这样的结果。让我们把这个称为连续体,因为第7个队列后面是第一个队列,即第0个队列。

 


现在,一致性散列允许我们使用threadId将一个给定的任务/工作映射到一个特定的队列。因此,在这种情况下,我们使用threadId作为分区的关键。
这里需要注意的一个重要方面是,我们没有把我们的队列称为后处理队列。它们不是专用队列。你可以把一个Transaction事务事件扔到这里,并期望相应的消费者(和事件处理程序)来处理它。
 

事件
在前面的几段话中,我们已经说了很多关于事件的内容,但我们还没有真正定义事件的含义。
我们的系统会把事件看成是发生在我们系统中的事实。
事实通常是指以某种方式改变了系统状态的事情(或者是失败的事情)。
例如,PostCreatedEvent发生在一个新帖子被创建时。同样地,当帖子被更新时,PostUpdatedEvent也会发生。
你可以将一个事件映射到你系统中的大多数CRUD操作。
如果将你的系统设计成领域,你会惊讶地发现一个应用服务所触发的事件的数量。
一个事件也映射了系统的周围状态。
让我们设计一个创建帖子的应用服务:
 

 

from typing import List
from .services.base import ServiceBase
from sqlalchemy.session import Session

class PostService(ServiceBase)
    def __init__(self, thread_id: UUID, params:   PostCreateAPIParams, db_session: Session):
        self.thread_id = thread_id
        self.params = params
        self.user: Union[User, None] = None
        self.post: Union[Post, None] =  None
        self.db_session = db_session
        self.errors: List[str] = []
        self.error_code: Union[str, None] = None
 
     async def __call__(self)
        return awAIt self.invoke()

     async def invoke(self):
         await self.find_thread()
         await self.verify_author()
         await self.create_post()
         await self.build_response_dao()
         await self.trigger_events()
         return self   

     async def find_thread(self):
    # truncated for brevity
    pass
    
    
     async def trigger_events(self):
    user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else None
        post_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else None
        thread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else None

        if await self.has_errors:
            event_dao = PostCreatedEventDAO(
                user=user_dao,
                thread=thread_dao,
                post=post_dao,
                params=self.params,
                errors=self.errors,
                error_code= self.error_code
            )
        else:
            event_dao = PostCreationFailedEventDAO(
        user=user_dao,
                thread=thread_dao,
                params=self.params,
                post=post_dao
                errors=self.errors,
                error_code= self.error_code
            )     
                 
       partition_key = (
       str(self.thread.id) if self.thread else "PostCreationFailedEvent"
        )
       await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session)
       return self


在这个例子中, trigger_events方法决定了要发布的事实。在这种情况下,它收集了周围的上下文。这也可能包括请求参数(如传递到事件中的params属性)。然而,什么是正确捕获的上下文也取决于上下文:)。
因此,我们的最终事件可能看起来像这样。请注意,该事件没有一个 updated_at 属性,因为我们认为事件是不可改变的事实。我们不能撤消已经发生的事情。
 

 

{
   "event_name": "PostCreatedEvent", 
   "event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb",
   "event_version": "v1.0",
   "time": "2022-09-03T04:16:59.294509+00:00",
   "payload": {
           "user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........},
           "post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... },
           "thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....},
           "params": { .... },
           "errors": [ ... ],
           "error_code": ""
    },
    "created_at": "2022-09-03T04:16:59.294"
    "logged_at": "2022-09-03T04:16:59.294"
}


在我们的例子中,应用服务通过调用触发方法将事件转给一个叫做SystemEventsService的服务。 该方法在为我们实际发布该事件之前做了一系列的工作。它通过我们先前看到的连续体运行,根据我们传递给它的分区键识别队列(和相应的工作者worker)。 这几乎就是我们需要一致的散列的原因。这可以确保我们的事件总是由同一个分区(和工作者)处理。
因此,一旦我们为我们的任务确定了工作者,我们就要求工作者

  • 保留该事件,以备我们以后需要再来处理它
  • 将其发布给所有相关的工作者
  • 让我们订阅该任务的事件驱动型工作负载触发其工作流程。


 

将事件分配到正确的分区
 

 

@staticmethod
async def trigger(
    partition_key: str, event_dao: SystemEventDAO
):    

    try:
        worker: SystemEventPartitionConfig = await SystemEventsService._get_worker(
            partition_key=partition_key
        )
        worker_func = getattr(system_events_workers, worker.worker_name)
        log_info(msg=f"Trigger called with worker: {worker.worker_name}")
        worker_func.delay(event_dao=event_dao.json())
    except (OperationalError, ConnectionError) as e:
        log_error(msg=f"[redisError] {e}", e=e, method="trigger", loc=f"{__name__}")
    except Exception as e:
        log_error(
            msg=f"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}"
        )
    return

@staticmethod
async def _get_worker(partition_key: str) -> SystemEventPartitionConfig:
    """For a given string it returns the worker that should process the event by running it through a murmurr hashing
    function and uses that to fetch the nodes from the continuum"""

    node = ring.get(key=partition_key)
    nodename = node.get("nodename", None)

    if not nodename:
        raise ValueError("Could not find a node in the continuum for key {node}")

    node_config = continuum.get(nodename, None)
    if not node_config:
        raise ValueError(
            "Could not find a node in the continuum for key {nodename}"
        )

    config_attrs = {"partition_key": partition_key, "partition_id": nodename}
    config_attrs = {**node_config, **config_attrs}
    return SystemEventPartitionConfig(**config_attrs)


 

事件驱动的系统
下面是最好的部分:
现在整个系统可以让你把你的应用程序作为一系列的异步事件处理程序来运行,这些处理程序可以在特定的事件上被调用。当一个事件到达正确的分区时,工作者会将该事件分配给一系列的事件处理程序。
 

 

async def create_system_event(
    task_type, event_dao: SystemEventDAO, db_session: Session = None
):
    if not db_session:
        db_session = get_session()

    system_event: Union[SystemEvent, None] = None

    try:
        if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS:
            system_event = await SystemEventsService.create(
                event_dao=event_dao, db_session=db_session
            )

            if system_event:
                log_info(
                    msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}"
                )
                event_dao.id = system_event.id
        else:
            log_info(
                msg=f"system_generated_request_event with name: {event_dao.event_name} ready for processing."
            )
        await EventHandler.process(event_dao=event_dao, db_session=db_session)
    except Exception as e:
        log_error(
            msg=f"Error handling events: {event_dao.event_name}: {e} \n {traceback.print_exc()}"
        )
        capture_exc(error=e)
    finally:
        db_session.close()
    return system_event


分区工作者持久化该事件,并将其分派给EventHandler。 事件处理程序是一系列可独立部署的函数,可以做任何你想做的事情。
如:

 

handlers = [
    PostInsightsGeneratorEventHandler,
    ThreadActivityManagerEventHandler,
    SpamDetectionEventHandler,
    #...,
    #....,
    ImageResizerEventHandler,
]


而我们的处理器可以以任何你喜欢的方式处理它们。在这里,我们按顺序处理它们,但它也可以并行派发:
 

 

class EventHandler:
    @staticmethod
    async def process(event_dao: SystemEventDAO, db_session: Session = None):
        log_info(msg=f'Event: {event_dao.event_name} arrived')
        
        for event_handler in handlers:
            await event_handler.process(event_dao=event_dao, db_session=db_session)
        return


处理程序本身是一个相当简单的类,它检查相关的事件。
 

 

class PostInsightsGeneratorEventHandler(event_dao: SystemEventDAO, db_session: Session):
    if not event_dao.event_name == "PostCreatedEvent" :
         return 

    log_info(msg=f"PostCreatedEventHandler called with {event_dao.event_id}")
    # Do whatever you need to here


Tags:Redis   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
16个Redis常见使用场景总结
来源:blog.csdn.net/qq_39938758/article/details/105577370目录 缓存 数据共享分布式 分布式锁 全局ID 计数器 限流 位统计 购物车 用户消息时间线timeline 消息...【详细内容】
2024-04-11  Search: Redis  点击:(4)  评论:(0)  加入收藏
Linux获取Redis 性能指标方法
一、监控指标Ø 性能指标:PerformanceØ 内存指标: MemoryØ 基本活动指标:Basic activityØ 持久性指标: PersistenceØ 错误指标:Error二、监...【详细内容】
2024-04-11  Search: Redis  点击:(4)  评论:(0)  加入收藏
Redis与缓存一致性问题
缓存一致性问题是在使用缓存系统,如Redis时经常遇到的问题。当数据在原始数据源(如数据库)中发生变化时,如何确保缓存中的数据与数据源保持一致,是开发者需要关注的关键问题。一...【详细内容】
2024-04-11  Search: Redis  点击:(3)  评论:(0)  加入收藏
Redis 不再 “开源”,未来采用 SSPLv1 和 RSALv2 许可证
Redis 官方于21日宣布修改开源协议 —— 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause...【详细内容】
2024-03-27  Search: Redis  点击:(14)  评论:(0)  加入收藏
Redis“叛逃”开源,得罪了几乎所有人
内存数据库供应商Redis近日在开源界砸下了一块“巨石”。Redis即将转向双许可模式,并实施更为严格的许可条款。官方对此次变更的公告直截了当:从Redis 7.4版本开始,Redis将在Re...【详细内容】
2024-03-25  Search: Redis  点击:(10)  评论:(0)  加入收藏
如何使用 Redis 实现消息队列
Redis不仅是一个强大的内存数据存储系统,它还可以用作一个高效的消息队列。消息队列是应用程序间或应用程序内部进行异步通信的一种方式,它允许数据生产者将消息放入队列中,然...【详细内容】
2024-03-22  Search: Redis  点击:(18)  评论:(0)  加入收藏
Redis不再 “开源”
Redis 官方今日宣布修改开源协议 —— 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause 开...【详细内容】
2024-03-21  Search: Redis  点击:(11)  评论:(0)  加入收藏
在Redis中如何实现分布式锁的防死锁机制?
在Redis中实现分布式锁是一个常见的需求,可以通过使用Redlock算法来防止死锁。Redlock算法是一种基于多个独立Redis实例的分布式锁实现方案,它通过协调多个Redis实例之间的锁...【详细内容】
2024-02-20  Search: Redis  点击:(49)  评论:(0)  加入收藏
手动撸一个 Redis 分布式锁
大家好呀,我是楼仔。今天第一天开工,收拾心情,又要开始好好学习,好好工作了。对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些...【详细内容】
2024-02-19  Search: Redis  点击:(40)  评论:(0)  加入收藏
工作中Redis有哪些好用的运维工具
工作中使用 Redis 时,如果大家公司没有专业运维,可能开发人员就会面临这些运维的工作,包括 Redis 的运行状态监控,数据迁移,主从集群、切片集群的部署和运维等等。本文我就从这三...【详细内容】
2024-02-06  Search: Redis  点击:(56)  评论:(0)  加入收藏
▌简易百科推荐
16个Redis常见使用场景总结
来源:blog.csdn.net/qq_39938758/article/details/105577370目录 缓存 数据共享分布式 分布式锁 全局ID 计数器 限流 位统计 购物车 用户消息时间线timeline 消息...【详细内容】
2024-04-11    书圈  Tags:Redis   点击:(4)  评论:(0)  加入收藏
Linux获取Redis 性能指标方法
一、监控指标Ø 性能指标:PerformanceØ 内存指标: MemoryØ 基本活动指标:Basic activityØ 持久性指标: PersistenceØ 错误指标:Error二、监...【详细内容】
2024-04-11  上海天正信息科技有限    Tags:Redis   点击:(4)  评论:(0)  加入收藏
Redis与缓存一致性问题
缓存一致性问题是在使用缓存系统,如Redis时经常遇到的问题。当数据在原始数据源(如数据库)中发生变化时,如何确保缓存中的数据与数据源保持一致,是开发者需要关注的关键问题。一...【详细内容】
2024-04-11  后端Q    Tags:Redis   点击:(3)  评论:(0)  加入收藏
Redis 不再 “开源”,未来采用 SSPLv1 和 RSALv2 许可证
Redis 官方于21日宣布修改开源协议 —— 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause...【详细内容】
2024-03-27  dbaplus社群    Tags:Redis   点击:(14)  评论:(0)  加入收藏
Redis“叛逃”开源,得罪了几乎所有人
内存数据库供应商Redis近日在开源界砸下了一块“巨石”。Redis即将转向双许可模式,并实施更为严格的许可条款。官方对此次变更的公告直截了当:从Redis 7.4版本开始,Redis将在Re...【详细内容】
2024-03-25    51CTO  Tags:Redis   点击:(10)  评论:(0)  加入收藏
如何使用 Redis 实现消息队列
Redis不仅是一个强大的内存数据存储系统,它还可以用作一个高效的消息队列。消息队列是应用程序间或应用程序内部进行异步通信的一种方式,它允许数据生产者将消息放入队列中,然...【详细内容】
2024-03-22  后端Q  微信公众号  Tags:Redis   点击:(18)  评论:(0)  加入收藏
Redis不再 “开源”
Redis 官方今日宣布修改开源协议 —— 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause 开...【详细内容】
2024-03-21  OSC开源社区    Tags:Redis   点击:(11)  评论:(0)  加入收藏
在Redis中如何实现分布式锁的防死锁机制?
在Redis中实现分布式锁是一个常见的需求,可以通过使用Redlock算法来防止死锁。Redlock算法是一种基于多个独立Redis实例的分布式锁实现方案,它通过协调多个Redis实例之间的锁...【详细内容】
2024-02-20  编程技术汇    Tags:Redis   点击:(49)  评论:(0)  加入收藏
手动撸一个 Redis 分布式锁
大家好呀,我是楼仔。今天第一天开工,收拾心情,又要开始好好学习,好好工作了。对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些...【详细内容】
2024-02-19  楼仔  微信公众号  Tags:Redis   点击:(40)  评论:(0)  加入收藏
工作中Redis有哪些好用的运维工具
工作中使用 Redis 时,如果大家公司没有专业运维,可能开发人员就会面临这些运维的工作,包括 Redis 的运行状态监控,数据迁移,主从集群、切片集群的部署和运维等等。本文我就从这三...【详细内容】
2024-02-06  waynaqua    Tags:Redis   点击:(56)  评论:(0)  加入收藏
站内最新
站内热门
站内头条