您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Go语言

基于dispatcher模式的事件与数据分发处理器的go语言实现

时间:2023-01-19 17:41:03  来源:今日头条  作者:研道鸠摩智

背景

在实际项目中,我们经常需要异步处理事件与数据。比如MVC模型中处理请求的Filter链,又如在Nginx中或是linux的iptables中,都会有一个处理链条,来一步步的顺序处理一个请求。此外基于集中存储与分发的模式,实现事件与数据的异步处理,对于提升系统响应程度,实现业务处理的解耦至关重要。本文以eosc(一个高性能中间件开发框架)中的代码为例子,看看如何在我们的实际项目中,实现这样的功能

代码

eosc提供了关于dispatcher的关键实现的两个文件,分别是dispatch.go和data-dispatch.go,具体的代码地址是
https://Github.com/eolinker/eosc/tree/mAIn/common/dispatcher

这两个文件中实现的结构体与接口的关系如图所示:

dispatcher关键接口与结构体的关系

  1. dispatch.go文件

在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三个重要的接口。

同时通过CallBackFunc来实现接口CallBackHandler, tListener来实现IListener

//2个接口
type CallBackHandler interface {
   DataEvent(e IEvent) error
}

type IListener interface {
   Leave()
   Event() <-chan IEvent
}
/*CallBackFunc实现了CallBackHandler,同时CallBackFunc又是一个接受IEvent为参数,
返回error的函数*/
type CallBackFunc func(e IEvent) error

func (f CallBackFunc) DataEvent(e IEvent) error {
   return f(e)
}
//实现了IListener接口
func (t *tListener) Leave() {
   t.Once.Do(func() {
      atomic.StoreUint32(&t.closed, 1)
      close(t.c)
   })
}

func (t *tListener) Event() <-chan IEvent {
   return t.c
}

注意:tListener还提供了一个Handler方法,这个方法的参数与返回结果与CallBackFunc一样,也就是说它实现的Handler方法是一种CallBackFunc,这个在后面的分发处理逻辑的注册中会用到

func (t *tListener) Handler(e IEvent) error {
   if atomic.LoadUint32(&t.closed) == 0 {
      t.c <- e
      return nil
   }
   return ErrorIsClosed
}

2 .data-dispatch.go文件

该文件提供了两种dispatcher创建方法,分别是NewDataDispatchCenter、NewEventDispatchCenter。这两个方法都是创建了DataDispatchCenter结构体(这个结构体后面会讲到),但是启动的处理协程不同,NewDataDispatchCenter启动的是doDataLoop,NewEventDispatchCenter启动的是doEventLoop.

//两种DispatchCenter创建方法
func NewDataDispatchCenter() IDispatchCenter {
   ctx, cancelFunc := context.WithCancel(context.Background())
   center := &DataDispatchCenter{
      ctx:          ctx,
      cancelFunc:   cancelFunc,
      addChannel:   make(chan *_CallbackBox, 10),
      eventChannel: make(chan IEvent),
   }
   go center.doDataLoop()
   return center
}

func NewEventDispatchCenter() IDispatchCenter {
   ctx, cancelFunc := context.WithCancel(context.Background())
   center := &DataDispatchCenter{
      ctx:          ctx,
      cancelFunc:   cancelFunc,
      addChannel:   make(chan *_CallbackBox, 10),
      eventChannel: make(chan IEvent),
   }
   go center.doEventLoop()
   return center
}

//DataDispatchCenter 数据广播中心
type DataDispatchCenter struct {
	addChannel   chan *_CallbackBox
	eventChannel chan IEvent

	ctx        context.Context
	cancelFunc context.CancelFunc
}

DataDispatchCenter这个结构体中有两个chan,一个是addChannel,一个是eventChannel,

addChannel

接受_CallbackBox,这个_CallbackBox提供了逻辑处理Handler

eventChannel

接受IEvent,触发

doEventLoop逻辑:

NewEventDispatchCenter方法中启动的doEventLoop,逻辑相对简单,创建的channels用于存储addChannel发送过来的_CallbackBox,即事件处理Handler.当eventChannel收到事件后,遍历channels中的每一个_CallbackBox,并调用相应的Handler处理。

doEventLoop状态图

具体代码可以查看
https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48

doDataLoop逻辑:

NewDataDispatchCenter方法中启动的doDataLoop,这个逻辑稍微复杂点。其实它的大致流程和doEventLoop,不同的是每个新增加的_CallbackBox,需要对当前接收并缓存的所有Event键值对进行处理。而doEventLoop是不会的,新增加的_CallbackBox,只会对在它之后接收的Event生效。下面的代码InitEvent(data.GET())很有意思。

  1. 首先InitEvent实现了IEvent接口,是一种IEvent。
  2. type InitEvent map[string]map[string][]byte (代码链接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一个map,可以通过InitEvent(data.GET())初始化。
func (d *DataDispatchCenter) doDataLoop() {
   data := NewMyData(nil)
   channels := make([]*_CallbackBox, 0, 10)
   isInit := false
   for {
      select {
      case event, ok := <-d.eventChannel:
         if ok {
            isInit = true
            data.DoEvent(event)
            next := channels[:0]
            for _, c := range channels {
               if err := c.handler(event); err != nil {
                  close(c.closeChan)
                  continue
               }
               next = Append(next, c)
            }
            channels = next
         }
      case hbox, ok := <-d.addChannel:
         {
            if ok {
               if !isInit {
                  channels = append(channels, hbox)
               } else {
                  if err := hbox.handler(InitEvent(data.GET())); err == nil {
                     channels = append(channels, hbox)
                  }
               }
            }
         }
      }

   }
}

应用

创建EventServer

type EventServer struct {
   IDispatchCenter
}
func NewEventServer() *EventServer {
	es := &EventServer{
		IDispatchCenter: NewDataDispatchCenter(),
	}
	return es
}

定义事件

type MyEvent struct {
	namespace string
	key       string
	event     string
	data      []byte
}

func (m *MyEvent) Namespace() string {
	return m.namespace
}

func (m *MyEvent) Event() string {
	return m.event
}

func (m *MyEvent) Key() string {
	return m.key
}

func (m *MyEvent) Data() []byte {
	return m.data
}

定义Handler并注册

func Handler(e IEvent) error {
   //根据自己的业务要求
}
es.Register(Handler)

发送事件

es.Send(&MyEvent{
   namespace: "a",
   key:       "b",
   event:     "set",
   data:      []byte(fmt.Sprint(index)),
})


Tags:dispatcher   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
基于dispatcher模式的事件与数据分发处理器的go语言实现
背景在实际项目中,我们经常需要异步处理事件与数据。比如MVC模型中处理请求的Filter链,又如在nginx中或是linux的iptables中,都会有一个处理链条,来一步步的顺序处理一个请求。...【详细内容】
2023-01-19  Search: dispatcher  点击:(252)  评论:(0)  加入收藏
▌简易百科推荐
宝藏级Go语言开源项目——教你自己动手开发互联网搜索引擎
DIYSearchEngine 是一个能够高速采集海量互联网数据的开源搜索引擎,采用 Go 语言开发。Github 地址:https://github.com/johnlui/DIYSearchEngine运行方法首先,给自己准备一杯...【详细内容】
2024-03-12  OSC开源社区    Tags:Go语言   点击:(29)  评论:(0)  加入收藏
Go Gin框架实现优雅地重启和停止
在Web应用程序中,有时候我们需要重启或停止服务器,无论是因为更新代码还是进行例行维护。在这种情景下,我们需要保证应用程序的可用性和数据的一致性。这就需要优雅地关闭和重...【详细内容】
2024-01-30  源自开发者  微信公众号  Tags:Go   点击:(70)  评论:(0)  加入收藏
如何让Go程序以后台进程或daemon方式运行
本文探讨了如何通过Go代码实现在后台运行的程序。最近我用Go语言开发了一个WebSocket服务,我希望它能在后台运行,并在异常退出时自动重新启动。我的整体思路是将程序转为后台...【详细内容】
2024-01-26  Go语言圈  微信公众号  Tags:Go程序   点击:(61)  评论:(0)  加入收藏
深入Go底层原理,重写Redis中间件实战
Go语言以其简洁、高效和并发性能而闻名,深入了解其底层原理可以帮助我们更好地利用其优势。在本文中,我们将探讨如何深入Go底层原理,以及如何利用这些知识重新实现一个简单的Re...【详细内容】
2024-01-25  547蓝色星球    Tags:Go   点击:(74)  评论:(0)  加入收藏
Go 内存优化与垃圾收集
Go提供了自动化的内存管理机制,但在某些情况下需要更精细的微调从而避免发生OOM错误。本文将讨论Go的垃圾收集器、应用程序内存优化以及如何防止OOM(Out-Of-Memory)错误。Go...【详细内容】
2024-01-15  DeepNoMind  微信公众号  Tags:Go   点击:(64)  评论:(0)  加入收藏
Go函数指针是如何让你的程序变慢的?
导读Go 语言的常规优化手段无需赘述,相信大家也能找到大量的经典教程。但基于 Go 的函数值问题,业界还没有太多深度讨论的内容分享。本文作者根据自己对 Go 代码的使用与调优...【详细内容】
2024-01-15  腾讯云开发者  微信公众号  Tags:Go函数   点击:(91)  评论:(0)  加入收藏
Go编程中调用外部命令的几种场景
在很多场合, 使用Go语言需要调用外部命令来完成一些特定的任务, 例如: 使用Go语言调用Linux命令来获取执行的结果,又或者调用第三方程序执行来完成额外的任务。在go的标准库...【详细内容】
2024-01-09  suntiger    Tags:Go编程   点击:(116)  评论:(0)  加入收藏
Go 语言不支持并发读写 Map,为什么?
Go语言的map类型不支持并发读写的主要原因是并发读写会导致数据竞态(data race),这意味着多个 goroutine 可能同时访问并修改同一个 map,从而引发不确定的结果。在Go语言的设计...【详细内容】
2024-01-05  Go语言圈  微信公众号  Tags:Go 语言   点击:(83)  评论:(0)  加入收藏
Go微服务入门到容器化实践
Go微服务入门到容器化实践Go 是一门高效、现代化、快速增长的编程语言,非常适合构建 Web 应用程序。而 Docker 是一种轻量级的容器化技术,能够使得您的应用程序在任何地方运行...【详细内容】
2024-01-01  大雷家吃饭    Tags:Go微服务   点击:(69)  评论:(0)  加入收藏
你是否想知道如何应对高并发?Go语言为你提供了答案!
并发编程是当前软件领域中不可忽视的一个关键概念。随着CPU等硬件的不断发展,我们都渴望让我们的程序运行速度更快、更快。而Go语言在语言层面天生支持并发,充分利用现代CPU的...【详细内容】
2023-12-29  灵墨AI探索室  微信公众号  Tags:Go语言   点击:(114)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条