go语言实现日志收集系统图文详解

互联网 19-11-27

整理了一下这个日志收集系统的框,如下图

这次要实现的代码的整体逻辑为:

完整代码地址为: https://github.com/pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

1、服务发现和服务注册

2、配置中心(我们实现的日志收集客户端需要用到)

3、分布式锁

4、master选举

官网对etcd的有一个非常简明的介绍:

etcd搭建:下载地址:https://github.com/coreos/etcd/releases/根据自己的环境下载对应的版本然后启动起来就可以了

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan   zhaofan [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name zhaofan [root@localhost etcd-v3.2.18-linux-amd64]#

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

1、控制goroutine的超时

2、保存上下文数据

通过下面一个简单的例子进行理解:

package main  import (     "fmt"     "time"     "net/http"     "context"     "io/ioutil" )   type Result struct{     r *http.Response     err error }  func process(){     ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)     defer cancel()     tr := &http.Transport{}     client := &http.Client{Transport:tr}     c := make(chan Result,1)     req,err := http.NewRequest("GET","http://www.google.com",nil)     if err != nil{         fmt.Println("http request failed,err:",err)         return     }     // 如果请求成功了会将数据存入到管道中     go func(){         resp,err := client.Do(req)         pack := Result{resp,err}         c <- pack     }()      select{     case <- ctx.Done():         tr.CancelRequest(req)         fmt.Println("timeout!")     case res := <-c:         defer res.r.Body.Close()         out,_:= ioutil.ReadAll(res.r.Body)         fmt.Printf("server response:%s",out)     }     return  }  func main() {     process() }

写一个通过context保存上下文,代码例子如:

package main  import (     "github.com/Go-zh/net/context"     "fmt" )  func add(ctx context.Context,a,b int) int {     traceId := ctx.Value("trace_id").(string)     fmt.Printf("trace_id:%v\n",traceId)     return a+b }  func calc(ctx context.Context,a, b int) int{     traceId := ctx.Value("trace_id").(string)     fmt.Printf("trace_id:%v\n",traceId)     //再将ctx传入到add中     return add(ctx,a,b) }  func main() {     //将ctx传递到calc中     ctx := context.WithValue(context.Background(),"trace_id","123456")     calc(ctx,20,30)  }

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381)

package main  import (     etcd_client "github.com/coreos/etcd/clientv3"     "time"     "fmt" )  func main() {     cli, err := etcd_client.New(etcd_client.Config{         Endpoints:[]string{"192.168.0.118:2371"},         DialTimeout:5*time.Second,     })     if err != nil{         fmt.Println("connect failed,err:",err)         return     }      fmt.Println("connect success")     defer cli.Close() }

下面一个例子是通过连接etcd,存值并取值

package main  import (     "github.com/coreos/etcd/clientv3"     "time"     "fmt"     "context" )  func main() {     cli,err := clientv3.New(clientv3.Config{         Endpoints:[]string{"192.168.0.118:2371"},         DialTimeout:5*time.Second,     })     if err != nil{         fmt.Println("connect failed,err:",err)         return     }     fmt.Println("connect succ")     defer cli.Close()     ctx,cancel := context.WithTimeout(context.Background(),time.Second)     _,err = cli.Put(ctx,"logagent/conf/","sample_value")     cancel()     if err != nil{         fmt.Println("put failed,err",err)         return     }     ctx, cancel = context.WithTimeout(context.Background(),time.Second)     resp,err := cli.Get(ctx,"logagent/conf/")     cancel()     if err != nil{         fmt.Println("get failed,err:",err)         return     }     for _,ev := range resp.Kvs{         fmt.Printf("%s:%s\n",ev.Key,ev.Value)     } }

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

package main  import (     "context"     "fmt" )  func main() {     // gen generates integers in a separate goroutine and     // sends them to the returned channel.     // The callers of gen need to cancel the context once     // they are done consuming generated integers not to leak     // the internal goroutine started by gen.     gen := func(ctx context.Context) <-chan int {         dst := make(chan int)         n := 1         go func() {             for {                 select {                 case <-ctx.Done():                     return // returning not to leak the goroutine                 case dst <- n:                     n++                 }             }         }()         return dst     }      ctx, cancel := context.WithCancel(context.Background())     defer cancel() // cancel when we are finished consuming integers      for n := range gen(ctx) {         fmt.Println(n)         if n == 5 {             break         }     } }

关于官网文档中的WithDeadline演示的代码例子:

package main   import (     "context"     "fmt"     "time" )  func main() {     d := time.Now().Add(50 * time.Millisecond)     ctx, cancel := context.WithDeadline(context.Background(), d)      // Even though ctx will be expired, it is good practice to call its     // cancelation function in any case. Failure to do so may keep the     // context and its parent alive longer than necessary.     defer cancel()      select {     case <-time.After(1 * time.Second):         fmt.Println("overslept")     case <-ctx.Done():         fmt.Println(ctx.Err())     }  }

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package main  import (     "github.com/coreos/etcd/clientv3"     "time"     "fmt"     "context" )  func main() {     cli,err := clientv3.New(clientv3.Config{         Endpoints:[]string{"192.168.0.118:2371"},         DialTimeout:5*time.Second,     })     if err != nil {         fmt.Println("connect failed,err:",err)         return     }     defer cli.Close()     // 这里会阻塞     rch := cli.Watch(context.Background(),"logagent/conf/")     for wresp := range rch{         for _,ev := range wresp.Events{             fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)         }     } }

实现一个kafka的消费者代码的简单例子:

package main  import (     "github.com/Shopify/sarama"     "strings"     "fmt"     "time" )  func main() {     consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)     if err != nil{         fmt.Println("failed to start consumer:",err)         return     }     partitionList,err := consumer.Partitions("nginx_log")     if err != nil {         fmt.Println("Failed to get the list of partitions:",err)         return     }     fmt.Println(partitionList)     for partition := range partitionList{         pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)         if err != nil {             fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)             return         }         defer pc.AsyncClose()         go func(partitionConsumer sarama.PartitionConsumer){             for msg := range pc.Messages(){                 fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))             }         }(pc)     }     time.Sleep(time.Hour)     consumer.Close()  }

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package main  import (     "github.com/Shopify/sarama"     "strings"     "fmt"     "sync" )  var (     wg sync.WaitGroup )  func main() {     consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)     if err != nil{         fmt.Println("failed to start consumer:",err)         return     }     partitionList,err := consumer.Partitions("nginx_log")     if err != nil {         fmt.Println("Failed to get the list of partitions:",err)         return     }     fmt.Println(partitionList)     for partition := range partitionList{         pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)         if err != nil {             fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)             return         }         defer pc.AsyncClose()         go func(partitionConsumer sarama.PartitionConsumer){             wg.Add(1)             for msg := range partitionConsumer.Messages(){                 fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))             }             wg.Done()         }(pc)     }      //time.Sleep(time.Hour)     wg.Wait()     consumer.Close()  }

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package main  import (     "github.com/coreos/etcd/clientv3"     "time"     "github.com/astaxie/beego/logs"     "context"     "fmt" )  var Client *clientv3.Client var logConfChan chan string   // 初始化etcd func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){      var keys []string     for _,ip := range ipArrays{         //keyfmt = /logagent/%s/log_config         keys = append(keys,fmt.Sprintf(keyfmt,ip))     }      logConfChan = make(chan string,10)     logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)      Client,err = clientv3.New(clientv3.Config{         Endpoints:addr,         DialTimeout: timeout,     })     if err != nil{         logs.Error("connect failed,err:%v",err)         return     }     logs.Debug("init etcd success")     waitGroup.Add(1)     for _, key := range keys{         ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)         // 从etcd中获取要收集日志的信息         resp,err := Client.Get(ctx,key)         cancel()         if err != nil {             logs.Warn("get key %s failed,err:%v",key,err)             continue         }          for _, ev := range resp.Kvs{             logs.Debug("%q : %q\n",  ev.Key, ev.Value)             logConfChan <- string(ev.Value)         }     }     go WatchEtcd(keys)     return }  func WatchEtcd(keys []string){     // 这里用于检测当需要收集的日志信息更改时及时更新     var watchChans []clientv3.WatchChan     for _,key := range keys{         rch := Client.Watch(context.Background(),key)         watchChans = append(watchChans,rch)     }      for {         for _,watchC := range watchChans{             select{             case wresp := <-watchC:                 for _,ev:= range wresp.Events{                     logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)                     logConfChan <- string(ev.Kv.Value)                 }             default:              }         }         time.Sleep(time.Second)     }     waitGroup.Done() }  func GetLogConf()chan string{     return logConfChan }

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package main  import (     "time"     "sync/atomic"     "github.com/astaxie/beego/logs" )  type SecondLimit struct {     unixSecond int64     curCount int32     limit int32 }  func NewSecondLimit(limit int32) *SecondLimit {     secLimit := &SecondLimit{         unixSecond:time.Now().Unix(),         curCount:0,         limit:limit,     }     return secLimit }  func (s *SecondLimit) Add(count int) {     sec := time.Now().Unix()     if sec == s.unixSecond {         atomic.AddInt32(&s.curCount,int32(count))         return     }     atomic.StoreInt64(&s.unixSecond,sec)     atomic.StoreInt32(&s.curCount, int32(count)) }  func (s *SecondLimit) Wait()bool {     for {         sec := time.Now().Unix()         if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {             time.Sleep(time.Microsecond)             logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)             continue         }          if sec != atomic.LoadInt64(&s.unixSecond) {             atomic.StoreInt64(&s.unixSecond,sec)             atomic.StoreInt32(&s.curCount,0)         }         logs.Debug("limit is exited")         return false     } }

推荐:go语言教程

以上就是go语言实现日志收集系统图文详解的详细内容,更多内容请关注技术你好其它相关文章!

来源链接:
免责声明:
1.资讯内容不构成投资建议,投资者应独立决策并自行承担风险
2.本文版权归属原作所有,仅代表作者本人观点,不代表本站的观点或立场
标签: go语言
上一篇:php获取远程图片并下载保存到本地的方法分析 下一篇:go语言time包的一些使用方法

相关资讯