您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Go语言

Golang之流式编程

时间:2020-10-19 13:24:25  来源:  作者:

流处理(Stream processing)是一种计算机编程范式,其允许给定一个数据序列(流处理数据源),一系列数据操作(函数)被应用到流中的每个元素。同时流处理工具可以显著提高程序员的开发效率,允许他们编写有效、干净和简洁的代码。

流数据处理在我们的日常工作中非常常见,举个例子,我们在业务开发中往往会记录许多业务日志,这些日志一般是先发送到Kafka,然后再由Job消费Kafaka写到elasticsearch,在进行日志流处理的过程中,往往还会对日志做一些处理,比如过滤无效的日志,做一些计算以及重新组合日志等等,示意图如下:

Golang之流式编程

 

流处理工具fx

gozero是一个功能完备的微服务框架,框架中内置了很多非常实用的工具,其中就包含流数据处理工具fx,下面我们通过一个简单的例子来认识下该工具:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/tal-tech/go-zero/core/fx"
)

func main() {
	ch := make(chan int)

	go inputStream(ch)
	go outputStream(ch)

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	<-c
}

func inputStream(ch chan int) {
	count := 0
	for {
		ch <- count
		time.Sleep(time.Millisecond * 500)
		count++
	}
}

func outputStream(ch chan int) {
	fx.From(func(source chan<- interface{}) {
		for c := range ch {
			source <- c
		}
	}).Walk(func(item interface{}, pipe chan<- interface{}) {
		count := item.(int)
		pipe <- count
	}).Filter(func(item interface{}) bool {
		itemInt := item.(int)
		if itemInt%2 == 0 {
			return true
		}
		return false
	}).ForEach(func(item interface{}) {
		fmt.Println(item)
	})
}

inputStream函数模拟了流数据的产生,outputStream函数模拟了流数据的处理过程,其中From函数为流的输入,Walk函数并发的作用在每一个item上,Filter函数对item进行过滤为true保留为false不保留,ForEach函数遍历输出每一个item元素。

流数据处理中间操作

一个流的数据处理可能存在许多的中间操作,每个中间操作都可以作用在流上。就像流水线上的工人一样,每个工人操作完零件后都会返回处理完成的新零件,同理流处理中间操作完成后也会返回一个新的流。

Golang之流式编程

 

fx的流处理中间操作:

操作函数功能输入Distinct去除重复的itemKeyFunc,返回需要去重的keyFilter过滤不满足条件的itemFilterFunc,Option控制并发量Group对item进行分组KeyFunc,以key进行分组Head取出前n个item,返回新streamint64保留数量Map对象转换MapFunc,Option控制并发量Merge合并item到slice并生成新streamReverse反转itemSort对item进行排序LessFunc实现排序算法Tail与Head功能类似,取出后n个item组成新streamint64保留数量Walk作用在每个item上WalkFunc,Option控制并发量

下图展示了每个步骤和每个步骤的结果:

Golang之流式编程

 

用法与原理分析

From

通过From函数构建流并返回Stream,流数据通过channel进行存储:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
})

// 源码
func From(generate GenerateFunc) Stream {
	source := make(chan interface{})

	go func() {
		defer close(source)
    // 构造流数据写入channel
		generate(source)
	}()

	return Range(source)
}

Filter

Filter函数提供过滤item的功能,FilterFunc定义过滤逻辑true保留item,false则不保留:

// 例子 保留偶数
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
}).Filter(func(item interface{}) bool {
  if item.(int)%2 == 0 {
    return true
  }
  return false
})

// 源码
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
	return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 执行过滤函数true保留,false丢弃
		if fn(item) {
			pipe <- item
		}
	}, opts...)
}

Group

Group对流数据进行分组,需定义分组的key,数据分组后以slice存入channel:

// 例子 按照首字符"g"或者"p"分组,没有则分到另一组
ss := []string{"golang", "google", "php", "Python", "JAVA", "c++"}
fx.From(func(source chan<- interface{}) {
		for _, s := range ss {
			source <- s
		}
	}).Group(func(item interface{}) interface{} {
		if strings.HasPrefix(item.(string), "g") {
			return "g"
		} else if strings.HasPrefix(item.(string), "p") {
			return "p"
		}
		return ""
	}).ForEach(func(item interface{}) {
		fmt.Println(item)
	})
}

// 源码
func (p Stream) Group(fn KeyFunc) Stream {
  // 定义分组存储map
	groups := make(map[interface{}][]interface{})
	for item := range p.source {
    // 用户自定义分组key
		key := fn(item)
    // key相同分到一组
		groups[key] = Append(groups[key], item)
	}

	source := make(chan interface{})
	go func() {
		for _, group := range groups {
      // 相同key的一组数据写入到channel
			source <- group
		}
		close(source)
	}()

	return Range(source)
}

Reverse

reverse可以对流中元素进行反转处理:

Golang之流式编程

 

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源码
func (p Stream) Reverse() Stream {
	var items []interface{}
  // 获取流中数据
	for item := range p.source {
		items = append(items, item)
	}
	// 反转算法
	for i := len(items)/2 - 1; i >= 0; i-- {
		opp := len(items) - 1 - i
		items[i], items[opp] = items[opp], items[i]
	}
	
  // 写入流
	return Just(items...)
}

Distinct

distinct对流中元素进行去重,去重在业务开发中比较常用,经常需要对用户id等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  return item
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 结果为 1,2,3,4,5,6

// 源码
func (p Stream) Distinct(fn KeyFunc) Stream {
	source := make(chan interface{})

	threading.GoSafe(func() {
		defer close(source)
		// 通过key进行去重,相同key只保留一个
		keys := make(map[interface{}]lang.PlaceholderType)
		for item := range p.source {
			key := fn(item)
      // key存在则不保留
			if _, ok := keys[key]; !ok {
				source <- item
				keys[key] = lang.Placeholder
			}
		}
	})

	return Range(source)
}

Walk

Walk函数并发的作用在流中每一个item上,可以通过WithWorkers设置并发数,默认并发数为16,最小并发数为1,如设置unlimitedWorkers为true则并发数无限制,但并发写入流中的数据由defaultWorkers限制,WalkFunc中用户可以自定义后续写入流中的元素,可以不写入也可以写入多个元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源码
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
	pipe := make(chan interface{}, option.workers)

	go func() {
		var wg sync.WaitGroup
		pool := make(chan lang.PlaceholderType, option.workers)

		for {
      // 控制并发数量
			pool <- lang.Placeholder
			item, ok := <-p.source
			if !ok {
				<-pool
				break
			}

			wg.Add(1)
			go func() {
				defer func() {
					wg.Done()
					<-pool
				}()
				// 作用在每个元素上
				fn(item, pipe)
			}()
		}

    // 等待处理完成
		wg.Wait()
		close(pipe)
	}()

	return Range(pipe)
}

并发处理

fx工具除了进行流数据处理以外还提供了函数并发功能,在微服务中实现某个功能往往需要依赖多个服务,并发的处理依赖可以有效的降低依赖耗时,提升服务的性能。

Golang之流式编程

 

fx.Parallel(func() {
  userRPC() // 依赖1
}, func() {
  accountRPC() // 依赖2
}, func() {
  orderRPC() // 依赖3
})

注意fx.Parallel进行依赖并行处理的时候不会有error返回,如需有error返回或者有一个依赖报错需要立马结束依赖请求请使用MapReduce工具进行处理。

总结

本篇文章介绍了流处理的基本概念和gozero中的流处理工具fx,在实际的生产中流处理场景应用也非常多,希望本篇文章能给大家带来一定的启发,更好的应对工作中的流处理场景。



Tags:Golang   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
golang基础语法Go 程序可以由多个标记组成,可以是关键字,标识符,常量,字符串,符号。Go 语言的基础组成有以下几个部分: 包声明 引入包 函数 变量 语句 & 表达式 注释基本语法介绍...【详细内容】
2022-06-20  Tags: Golang  点击:(14)  评论:(0)  加入收藏
开发环境IDE安装工欲善其事,必先利其器这里推荐两款golang开发工具,一个是goland,一个是VSCode;goland是收费的,收费还是有收费的道理,确实比较好用,个人比较推荐;VSCode需要安装插...【详细内容】
2022-06-15  Tags: Golang  点击:(49)  评论:(0)  加入收藏
1. 函数传参 package mainimport "fmt"func main() { slice := []int{1, 2} fmt.Printf("data:%v, len:%d, cap:%d\n", slice, len(slice), cap(slice)) updateslice(slice)...【详细内容】
2022-06-08  Tags: Golang  点击:(50)  评论:(0)  加入收藏
作者:bearluo,腾讯 IEG 运营开发工程师由于 golang 不像 java 一样有一个统一的编码模式,所以我们和其他团队一样,采用了 Go 面向包的设计和架构分层这篇文章介绍的一些理论,然后...【详细内容】
2022-05-11  Tags: Golang  点击:(40)  评论:(0)  加入收藏
GoFound 是 go语言实现的全文检索引擎 基于平衡二叉树+正排索引、倒排索引实现。可支持亿级数据,毫秒级查询。 使用简单,使用http接口,任何系统都可以使用。最大的特点是占用内...【详细内容】
2022-04-15  Tags: Golang  点击:(653)  评论:(0)  加入收藏
golang生成滑块验证码最近公司要求做一个滑块验证码,但是因为项目是孵化阶段,暂时不考虑使用第三方验证,于是只能自己写了,于是各种百度,几乎百度不到。于是我参考了一下网易易盾...【详细内容】
2022-03-22  Tags: Golang  点击:(128)  评论:(0)  加入收藏
昨天,Golang开发团队正式发Golang 1.18,可以通过官方下载页面下载,下载后注意核对校验码。 Golang 1.18 是一个功能上变化较大的大型版本,在语言层面增加了泛型、增加了Fuzzing...【详细内容】
2022-03-17  Tags: Golang  点击:(254)  评论:(0)  加入收藏
gin是作为golang web开发中被广泛使用到的框架,了解其内部的实现有助于我们更好地理解gin的设计思想。这篇文章主要探讨两个问题。 http请求如何流转到gin gin为什么比golang...【详细内容】
2022-03-14  Tags: Golang  点击:(94)  评论:(0)  加入收藏
golang context 很好用,就使用php实现了github地址 : https://github.com/qq1060656096/php-go-context context使用闭坑指南1. 将一个Context参数作为第一个参数传递给传入和...【详细内容】
2021-11-05  Tags: Golang  点击:(131)  评论:(0)  加入收藏
简介工作中经常有定时执行某些代码块的需求,如果是PHP代码,一般写个脚本,然后用Cron实现。Go里提供了两种定时器:Timer(到达指定时间触发且只触发一次)和 Ticker(间隔特定时间触发)...【详细内容】
2021-05-10  Tags: Golang  点击:(396)  评论:(0)  加入收藏
▌简易百科推荐
goshop开源项目的更新 备注:前面项目中用到的代码已经分享到GitHub中去了,并且以后所有项目中会出现的代码都会提交上去,欢迎查阅。感兴趣的可以点个star哦~gitee.com/jobhands...【详细内容】
2022-07-06  南方葵籽    Tags:Gorm   点击:(21)  评论:(0)  加入收藏
之前的一篇文章里,我提到了在VSCode和JetBrains之间反复横跳的事情,并且还觉得VSCode挺香的,Go、Python、Java等语言在VSCode中都可以获取到代码智能提示,不用死记硬背API了。没...【详细内容】
2022-06-29  BlackJ个啥    Tags:Go   点击:(70)  评论:(0)  加入收藏
构建方式对比于其他语言的程序,Go语言的跨平台能力是真的强,拿.Net和JAVA来说吧,.Net在.Net core出现之前是不能跨平台的,只能在windows上编译运行,即使是.net core出现以后,跨平...【详细内容】
2022-06-23  心有玲曦遇见你    Tags:Go语言   点击:(39)  评论:(0)  加入收藏
GO的全局变量很多从C++、C#等转过来做GO的初学小伙伴,都会发现同样一个疑问:就是GO中没有类似C++的静态变量。这就很纳闷了。有些配置数据,比如数据库的链接IP等,是需要全局共享...【详细内容】
2022-06-22  MEEGO    Tags:GO   点击:(60)  评论:(0)  加入收藏
Go 箴言 不要通过共享内存进行通信,通过通信共享内存 并发不是并行 管道用于协调;互斥量(锁)用于同步 接口越大,抽象就越弱 利用好零值 空接口 interface{} 没有任何类型约束 Gof...【详细内容】
2022-06-21  奇幻小鱼k    Tags:GO编程   点击:(39)  评论:(0)  加入收藏
协程(Goroutines)在Go语言中,每一个并发的执行单元叫作一个goroutine。,我们只需要通过 go 关键字来开启 goroutine 即可。goroutine 是轻量级线程,goroutine 的调度是由 Golang...【详细内容】
2022-06-20  VT漫步    Tags:go   点击:(27)  评论:(0)  加入收藏
golang基础语法Go 程序可以由多个标记组成,可以是关键字,标识符,常量,字符串,符号。Go 语言的基础组成有以下几个部分: 包声明 引入包 函数 变量 语句 & 表达式 注释基本语法介绍...【详细内容】
2022-06-20  VT漫步    Tags:golang   点击:(14)  评论:(0)  加入收藏
开发环境IDE安装工欲善其事,必先利其器这里推荐两款golang开发工具,一个是goland,一个是VSCode;goland是收费的,收费还是有收费的道理,确实比较好用,个人比较推荐;VSCode需要安装插...【详细内容】
2022-06-15  VT漫步    Tags:golang   点击:(49)  评论:(0)  加入收藏
1. 函数传参 package mainimport "fmt"func main() { slice := []int{1, 2} fmt.Printf("data:%v, len:%d, cap:%d\n", slice, len(slice), cap(slice)) updateslice(slice)...【详细内容】
2022-06-08  顾明同学    Tags:golang   点击:(50)  评论:(0)  加入收藏
前几天有个同学想了解下如何在go-micro中做链路跟踪,这几天正好看到wrapper这块,wrapper这个东西在某些框架中也称为中间件,里边有个opentracing的插件,正好用来做链路追踪。ope...【详细内容】
2022-05-05  萤火架构    Tags:go-micro   点击:(61)  评论:(0)  加入收藏
站内最新
站内热门
站内头条