您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > Go语言

Go语言实现海量日志收集系统

时间:2021-01-14 10:02:38  来源:  作者:

再次整理了一下这个日志收集系统的框,如下图

Go实现海量日志收集系统

 

这次要实现的代码的整体逻辑为:

Go实现海量日志收集系统

 

完整代码地址为: https://github.com/Pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

  • 服务发现和服务注册
  • 配置中心(我们实现的日志收集客户端需要用到)
  • 分布式锁
  • master选举

官网对etcd的有一个非常简明的介绍:

Go实现海量日志收集系统

 

etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本然后启动起来就可以了

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# 

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

  • 控制goroutine的超时
  • 保存上下文数据

通过下面一个简单的例子进行理解:

 

package main

import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
)


type Result struct{
    r *http.Response
    err error
}

func process(){
    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport:tr}
    c := make(chan Result,1)
    req,err := http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
        fmt.Println("http request failed,err:",err)
        return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
    }()

    select{
    case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
    }
    return

}

func main() {
    process()
}

写一个通过context保存上下文,代码例子如:

package main

import (
    "github.com/Go-zh/net/context"
    "fmt"
)

func add(ctx context.Context,a,b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%vn",traceId)
    return a+b
}

func calc(ctx context.Context,a, b int) int{
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%vn",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
}

func main() {
    //将ctx传递到calc中
    ctx := context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,20,30)

}

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}

下面一个例子是通过连接etcd,存值并取值

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    _,err = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
        fmt.Println("put failed,err",err)
        return
    }
    ctx, cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev := range resp.Kvs{
        fmt.Printf("%s:%sn",ev.Key,ev.Value)
    }
}

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

 

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

关于官网文档中的WithDeadline演示的代码例子:

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    defer cli.Close()
    // 这里会阻塞
    rch := cli.Watch(context.Background(),"logagent/conf/")
    for wresp := range rch{
        for _,ev := range wresp.Events{
            fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

实现一个kafka的消费者代码的简单例子:

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("Nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%sn",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            for msg := range pc.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

}

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%sn",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            wg.Add(1)
            for msg := range partitionConsumer.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
            wg.Done()
        }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

}

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
)

var Client *clientv3.Client
var logConfChan chan string


// 初始化etcd
func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = Append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan = make(chan string,10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
    })
    if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 从etcd中获取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v",key,err)
            continue
        }

        for _, ev := range resp.Kvs{
            logs.Debug("%q : %qn",  ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
    }

    for {
        for _,watchC := range watchChans{
            select{
            case wresp := <-watchC:
                for _,ev:= range wresp.Events{
                    logs.Debug("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:

            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf()chan string{
    return logConfChan
}

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package main

import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond,sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait()bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
            continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond,sec)
            atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
    }
}

小结

这次基本实现了日志收集的前半段的处理,后面将把日志扔到es中,并最终在页面上呈现

 

作者: 红烧不是清蒸
链接: http://www.cnblogs.com/zhaof/p/8910761.html



Tags:Go语言   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
zip 是一种常见的归档格式,本文讲解 Go 如何操作 zip。首先看看 zip 文件是如何工作的。以一个小文件为例:(类 Unix 系统下)$ cat hello.textHello!执行 zip 命令进行归档:$ zip...【详细内容】
2021-12-17  Tags: Go语言  点击:(12)  评论:(0)  加入收藏
统一规范篇合理规划目录本篇主要描述了公司内部同事都必须遵守的一些开发规矩,如统一开发空间,既使用统一的开发工具来保证代码最后的格式的统一,开发中对文件和代码长度的控制...【详细内容】
2021-05-18  Tags: Go语言  点击:(232)  评论:(0)  加入收藏
闭包概述 闭包不是Go语言独有的概念,在很多编程语言中都有闭包 闭包就是解决局部变量不能被外部访问的一种解决方案 是把函数当作返回值的一种应用 代码演示总体思想:在函数...【详细内容】
2021-05-14  Tags: Go语言  点击:(223)  评论:(0)  加入收藏
一时想不开,想了解一下Go语言,于是安装了并体验了一下。下载1. 进入golang.google.cn 点击Download Go 2.选择对应的操作系统,点击后开始下载。 安装1. windows下执行傻瓜式安...【详细内容】
2021-05-12  Tags: Go语言  点击:(236)  评论:(0)  加入收藏
再次整理了一下这个日志收集系统的框,如下图 这次要实现的代码的整体逻辑为: 完整代码地址为: https://github.com/pythonsite/logagentetcd介绍高可用的分布式key-value存储,...【详细内容】
2021-01-14  Tags: Go语言  点击:(135)  评论:(0)  加入收藏
生命不止,继续 Go go go !!!之前关于golang操作数据库的博客:Go实战&ndash;go语言操作MySQL数据库(go-sql-driver/mysql)Go实战&ndash;go语言操作sqlite数据库(The way to go)...【详细内容】
2021-01-05  Tags: Go语言  点击:(196)  评论:(0)  加入收藏
欢迎访问我的GitHubhttps://github.com/zq2599/blog_demos内容:所有原创文章分类和汇总,及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;关于《gRPC学习》系列《gRPC学习》...【详细内容】
2020-12-14  Tags: Go语言  点击:(124)  评论:(0)  加入收藏
概述Go语言作为一门开源的编程语言,以简洁、快速、安全著称。尤其在高性能的分布式服务器领域得到广泛应用。技多不压身,在学习过程中记录下来,以备后续参考,希望对有同样需求的...【详细内容】
2020-11-12  Tags: Go语言  点击:(65)  评论:(0)  加入收藏
go-fly基于GO语言实现的web客服即时通讯与客服管理系统。非常适合给自己的网站增加在线客服功能,代码简单也适合学习。Github地址:https://github.com/taoshihan1991/go-fly1....【详细内容】
2020-09-25  Tags: Go语言  点击:(154)  评论:(0)  加入收藏
Illustration created for “A Journey With Go”, made from the original Go Gopher, created by Renee Fℹ️ 这篇文章基于 Go 1.13。在内存从分配到回收的生命周期中,内存...【详细内容】
2020-09-24  Tags: Go语言  点击:(65)  评论:(0)  加入收藏
▌简易百科推荐
zip 是一种常见的归档格式,本文讲解 Go 如何操作 zip。首先看看 zip 文件是如何工作的。以一个小文件为例:(类 Unix 系统下)$ cat hello.textHello!执行 zip 命令进行归档:$ zip...【详细内容】
2021-12-17  Go语言中文网    Tags:Go语言   点击:(12)  评论:(0)  加入收藏
大家好,我是 polarisxu。前段时间,Russ Cox 明确了泛型相关的事情,原计划在标准库中加入泛型相关的包,改放到 golang.org/x/exp 下。目前,Go 泛型的主要设计者 ianlancetaylor 完...【详细内容】
2021-11-30  Go语言中文网    Tags:slices 包   点击:(24)  评论:(0)  加入收藏
前言最近因为项目需要写了一段时间的 Go ,相对于 Java 来说语法简单同时又有着一些 Python 之类的语法糖,让人大呼”真香“。 但现阶段相对来说还是 Python 写的多一些,偶尔还...【详细内容】
2021-11-25  crossoverJie    Tags:Go   点击:(29)  评论:(0)  加入收藏
go-micro是基于 Go 语言用于开发的微服务的 RPC 框架,主要功能如下:服务发现,负载均衡 ,消息编码,请求/响应,Async Messaging,可插拔接口,最后这个功能牛p安装步骤安装proto...【详细内容】
2021-09-06    石老师小跟班  Tags:go-micro   点击:(196)  评论:(0)  加入收藏
GoLand 2021.2 EAP 5 现已发布。用户可以从工具箱应用程序中获得 EAP 构建,也可以从官方网站手动下载。并且从此 EAP 开始,只有拥有有效的 JetBrains 帐户才能加入该计划。手...【详细内容】
2021-06-29  IT实战联盟  今日头条  Tags:GoLand   点击:(185)  评论:(0)  加入收藏
作者:HDT3213今天给大家带来的开源项目是 Godis:一个用 Go 语言实现的 Redis 服务器。支持: 5 种数据结构(string、list、hash、set、sortedset) 自动过期(TTL) 发布订阅、地理位...【详细内容】
2021-06-18  HelloGitHub  今日头条  Tags:Go   点击:(125)  评论:(0)  加入收藏
统一规范篇合理规划目录本篇主要描述了公司内部同事都必须遵守的一些开发规矩,如统一开发空间,既使用统一的开发工具来保证代码最后的格式的统一,开发中对文件和代码长度的控制...【详细内容】
2021-05-18  1024课堂    Tags:Go语言   点击:(232)  评论:(0)  加入收藏
闭包概述 闭包不是Go语言独有的概念,在很多编程语言中都有闭包 闭包就是解决局部变量不能被外部访问的一种解决方案 是把函数当作返回值的一种应用 代码演示总体思想:在函数...【详细内容】
2021-05-14  HelloGo  今日头条  Tags:Go语言   点击:(223)  评论:(0)  加入收藏
一时想不开,想了解一下Go语言,于是安装了并体验了一下。下载1. 进入golang.google.cn 点击Download Go 2.选择对应的操作系统,点击后开始下载。 安装1. windows下执行傻瓜式安...【详细内容】
2021-05-12  程序员fearlazy  fearlazy  Tags:Go语言   点击:(236)  评论:(0)  加入收藏
1.简介channel是Go语言的一大特性,基于channel有很多值得探讨的问题,如 channel为什么是并发安全的? 同步通道和异步通道有啥区别? 通道为何会阻塞协程? 使用通道导致阻塞的协程...【详细内容】
2021-05-10  程序员麻辣烫  今日头条  Tags:Go通道   点击:(272)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条