您当前的位置:首页 > 电脑百科 > 软件技术 > 应用软件

gocraft/work工作队列源码简介

时间:2022-03-15 11:07:52  来源:  作者:LStack朗澈云
「技术干货」gocraft/work工作队列源码简介

 

作者/蔡锡生

 

简介

 

gocraft/work是一款使用go开发的任务处理软件,通过redis存储任务队列,可以使用工作池同时处理多个任务。本文主要介绍任务注册和任务消费的源代码。

 

功能特性

 

• Fast and efficient. Faster than this, this, and this. See below for benchmarks.

 

• Reliable - don't lose jobs even if your process crashes.

 

• Middleware on jobs -- good for metrics instrumentation, logging, etc.

 

• If a job fAIls, it will be retried a specified number of times.

 

• Schedule jobs to hAppen in the future.

 

• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.

 

• Web UI to manage failed jobs and observe the system.

 

• Periodically enqueue jobs on a cron-like schedule.

 

• Pause / unpause jobs and control concurrency within and across processes.

 

注册Job

 

注册Job流程

 

1. 创建redis client pool。

 

2. 创建对象,定义任务处理函数。

 

3. 创建任务工作池,需要传入被处理对象结构体,最大并发数,命名空间,redis client pool。

 

4. 创建Job,需要传入job名称和job处理函数, job在redis中使用列表存储,key的组成:nameSapce:job:jobName,同一namespace支持多种类型任务处理。

 

这里使用任务名称作为key存入redis, 任务处理参数存放到列表中

 

func main() {
  // Make a new pool. Arguments:
  // Context{} is a struct that will be the context for the request.
  // 10 is the max concurrency
  // "my_app_namespace" is the Redis namespace
  // redisPool is a Redis pool
  pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

  // Add middleware that will be executed for each job
  pool.Middleware((*Context).Log)

  // Map the name of jobs to handler functions
  // pool 中的 jobTypes是一个字典,key 是任务名称, value 是 任务处理函数
  // 当有任务的时候,会将任务需要的参数 放入到redis key 为jobName的列表中
  // 第二个参数必须是 工作池对象的方法
  pool.Job("send_email", (*Context).SendEmail)

  // Customize options:
  pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

  // Start processing jobs
  pool.Start()
  ...
}

 

发送Job

 

发送job其实调用NewEnqueuer方法向redis的列表中压入元素(具体的内容是任务参数)。

 

package main

import (
  "Github.com/gomodule/redigo/redis"
  "github.com/gocraft/work"
)

// Make a redis pool
var redisPool = &redis.Pool{
  MaxActive: 5,
  MaxIdle: 5,
  Wait: true,
  Dial: func() (redis.Conn, error) {
    return redis.Dial("tcp", ":6379")
  },
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
  // Enqueue a job named "send_email" with the specified parameters.
  _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
  if err != nil {
    log.Fatal(err)
  }
}

 

Woker Fetch Job

 

在New WrokPool的时候会根据并法参数concurrency,创建同等个数的woker。

 

Worker是一个job处理者,通过永久for循环,不间断的从redis的任务队列中获取任务,在处理任务的时候,协程阻塞,等待一个任务处理完,再继续下一个。

 

下面的代码是worker在for循环中的重要操作(1) fetch job (2) process job

 

func (w *worker) loop() {
  for {
    select {
     。。。
    case <-timer.C:
      job, err := w.fetchJob()
      w.process(job)
    }
  }
}

 

fetchJob本质是redis的pop,push操作。首先将redis列表中的任务移除,然后再放入到处理队列中,这个操作必须是原子操作(原子性是指事务是一个不可再分割的工作单元,事务中的操作要么都发生,要么都不发生),作者使用了lua脚本完成。最后返回一个job对象,里面有后面任务处理函数需要的args,即这里的rawJson。

 

func (w *worker) fetchJob() (*Job, error) {

   scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
  ...
   values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
    ...
   job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)
    ..
   return job, nil
}

 

Worker handle Job.

 

Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)

 

workpool注册任务ConsumeJob后, 该任务ConsumeJob会被赋值给worker.jobTypes[job.Name].GenericHandler, 他的反射类型被赋值给了jobType.DynamicHandler。如果该消费任务使用了上下文参数。

 

创建消费任务的2种方法

 

• If you don't need context:

 

func YourFunctionName(job *work.Job) error.

 

• If you want your handler to accept a context:

 

func (c *Context) YourFunctionName(job *work.Job) error // or,

 

func YourFunctionName(c *Context, job *work.Job) error.

 

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
  jobOpts = applyDefaultsAndValidate(jobOpts)

  vfn := reflect.ValueOf(fn)
  validateHandlerType(wp.contextType, vfn)
  jt := &jobType{
    Name:           name,
    //vfn 任务消费方法的反射类型, 如果消费方法中有ctx 参数,那么会调用反射执行
    DynamicHandler: vfn, 
    JobOptions:     jobOpts,
  }
  if gh, ok := fn.(func(*Job) error); ok {
    // 用户的任务消费函数,被赋值给了jobType的GenericHandler, 如果消费方法只有一个job参数,则执行GenericHandler
    jt.IsGeneric = true
    jt.GenericHandler = gh
  }

  wp.jobTypes[name] = jt

  for _, w := range wp.workers {
    w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)
  }

  return wp
}

 

执行消费任务的真正代码

 

worker对象的processJob(job * Job)方法 调用了runJob方法执行GenericHandler or DynamicHandler.Call。

 

func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {
  。。。。
  next = func() error {
    。。。。
    if jt.IsGeneric {
      // 任务消费方法没有ctx时候执行
      return jt.GenericHandler(job)
    }
    
    // 任务消费方法有ctx时执行
    res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})
    x := res[0].Interface()
    if x == nil {
      return nil
    }
    return x.(error)
  }
  ...
  returnError = next()

  return
}

 

LStack产品简介

 

面向行业应用开发商(ISV/SI)提供混合云/边缘云场景下云原生应用开发测试、交付、运维一站式服务,帮助企业采用云原生敏捷开发交付方法论,从而提高软件开发人员效率、减少运维成本,加快数字化转型,并最终实现业务创新。



Tags:gocraft/work   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
gocraft/work工作队列源码简介
作者/蔡锡生 简介 gocraft/work是一款使用go开发的任务处理软件,通过redis存储任务队列,可以使用工作池同时处理多个任务。本文主要介绍任务注册和任务消费的源代码。 功能特...【详细内容】
2022-03-15  Search: gocraft/work  点击:(340)  评论:(0)  加入收藏
▌简易百科推荐
系统优化工具,Ultimate Windows Tweaker软件体验
电脑上的Windows优化工具年年都有,每年还会翻着花样地出现新东西,都不带重复的。每个人都可以上来折腾一番Windows...从这个角度来说,Windows系统还挺“稳定”的,经得起各种用户...【详细内容】
2024-04-10  果核剥壳    Tags:系统优化   点击:(4)  评论:(0)  加入收藏
Telegram怎么不显示在线?
在Telegram中,您可以通过进入“设置” -> “隐私与安全” -> “最后在线时间”,然后选择“没有人”或者自定义特定的人群,以隐藏自己的在线状态。这样设置后,其他用户将无法看到...【详细内容】
2024-04-04  HouseRelax    Tags:Telegram   点击:(4)  评论:(0)  加入收藏
谷歌 Gmail 新规生效:为遏制钓鱼 / 欺诈情况,日群发超 5000 封邮件账号需验证
IT之家 4 月 2 日消息,谷歌为了增强对垃圾邮件和网络钓鱼攻击的管控,今天宣布正式启用新措施:对于向 Gmail 邮箱账号日群发数量超过 5000 封的用户,需要其在域名中设置 SPF / DK...【详细内容】
2024-04-02    IT之家  Tags:Gmail   点击:(14)  评论:(0)  加入收藏
钉钉AI升级多模态:能根据图片识人、翻译、创作、多轮问答
新浪科技讯 3月28日午间消息,钉钉AI助理迎来升级,上线图片理解、文档速读、工作流等产品能力,探索多模态、长文本与RPA技术在AI应用的落地。基于阿里通义千问大模型,升级后的钉...【详细内容】
2024-03-28    新浪科技  Tags:钉钉   点击:(16)  评论:(0)  加入收藏
都2024年了,谁还在用QQ聊天啊?
你还在用 QQ 吗?之所以突然这么问,是因为前些天腾讯发了份热气腾腾的财报。随手翻了翻,发现 QQ 这个老企鹅,居然还有5.54 亿多人每个月都在坚持登录。虽说和辉煌时候没法比了,但...【详细内容】
2024-03-26    差评  Tags:QQ   点击:(11)  评论:(0)  加入收藏
腾讯QQ浏览器工具权益卡上线PC端,每月最低6元
IT之家 1 月 29 日消息,腾讯 QQ 浏览器此前在手机端上线工具权益卡,现将部分权益适用范围拓展至 PC 端,每月 10 元,连续包月为 6 元。开通后用户可以在 QQ 浏览器软件内享有由腾...【详细内容】
2024-01-29    IT之家  Tags:QQ浏览器   点击:(84)  评论:(0)  加入收藏
开源工具Ventoy更新:新增对FreeBSD 14.0的支持
近日,开源装机工具Ventoy发布了1.0.97版本的更新。本次更新的主要亮点是新增了对FreeBSD 14.0版本的支持,并修复了启动问题以及解决了几个Linux独有的bug等。同时,官方还修复了...【详细内容】
2024-01-25    中关村在线  Tags:Ventoy   点击:(41)  评论:(0)  加入收藏
微软Copilot Pro来了:个人用户也能在Word里用GPT-4,20美元/月
面向个人用户的微软Copilot会员版来了。一个月多交20刀(约合人民币142元),Microsoft 365个人版/家庭版用户就能在Word、Excel、PPT等Office全家桶中用上GPT-4。就像这样,不用在C...【详细内容】
2024-01-16    量子位  Tags:Copilot Pro   点击:(94)  评论:(0)  加入收藏
微软 Edge 浏览器支持双引擎同时搜索功能,便利与槽点并存
IT之家 1 月 15 日消息,微软广告和网络服务部门首席执行官 Mikhail Parakhin 近日透露了一个微软 Edge 浏览器的隐藏功能:双引擎同时搜索。顾名思义,该功能允许用户同时使用两...【详细内容】
2024-01-16    IT之家  Tags:Edge   点击:(63)  评论:(0)  加入收藏
11个面向设计师的必备AI工具
译者 | 布加迪审校 | 重楼在当今快速发展的设计领域,人工智能(AI)工具已成为不可或缺的创新催化剂。这些工具专门用于提高效率和创造力,从而重新定义传统的设计方法。AI正在彻底...【详细内容】
2024-01-09    51CTO  Tags:AI工具   点击:(103)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条