Building an elasticsearch-based log analysis system: how to customize filebeat
Posted 1 year ago · Author: liangdas · 3008 views · From: Share

Overview of the elasticsearch solution

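Our company recently wanted to process and analyze the logs from our SDK, so I started looking into log analysis systems. I had no prior experience in this area, so I began by searching online for existing solutions. There are plenty of log analysis solutions available, many of them open source; the classic one is filebeat + logstash + elasticsearch + kibana.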

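  1. logstash: log collection and log format cleanup (extracts the key fields from each log and normalizes the format)
  2. elasticsearch: log storage and search (of the normalized JSON logs)
  3. kibana: log querying and visualization; can generate reports and more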

Example: QB20170701-1.png

QB20170701-2.png

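Customizing filebeat

This article is not about setting up the logging system. There are plenty of articles online about deploying filebeat + logstash + elasticsearch + kibana, and you can search for them yourself. The focus here is how to customize filebeat so that the intermediate logstash stage can be dropped.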

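What is the relationship between logstash and filebeat?

  1. filebeat (golang): focuses on log collection
  2. logstash (java): log collection plus log cleanup (its configuration style is friendly to ops staff), but with lower performance and higher memory use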

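The mainstream setup today is therefore filebeat for log collection -> logstash for log cleanup -> elasticsearch. filebeat is written in golang and performs better, so if the cleanup is done inside filebeat, and its performance is sufficient, the logstash stage can be dropped. So how do we clean logs with filebeat?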

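In fact, filebeat already has some log cleanup features of its own, for example multiline, which merges multi-line logs (the same feature exists in logstash). So filebeat does support log cleanup out of the box; it is just less convenient than logstash. If you are a developer or ops engineer who knows golang, you can simply take a copy of the filebeat source and customize it.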

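Custom development steps

1. Get a copy of the filebeat source

I recommend cloning the https://github.com/elastic/beats project directly. filebeat is one of its subprojects and depends on the libbeat subproject.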

2. Build

  1. Set up GOPATH so that it contains the beats project
  2. Build from the beats root directory

go install github.com/elastic/beats/filebeat/

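3. How the customization works

filebeat reads the log file line by line and hands each line to a chain of pipeline stages (readers) for processing. So we only need to implement our own stage to perform whatever operations we want on the logs.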

Pipeline stage definition

/github.com/elastic/beats/filebeat/harvester/reader/reader.go

type Reader interface {
	Next() (Message, error)
}

Where do we add a custom stage?

/github.com/elastic/beats/filebeat/prospector/log/harvester.go

func (h *Harvester) newLogFileReader() (reader.Reader, error) {
  var r reader.Reader
  var err error
  // TODO: NewLineReader uses additional buffering to deal with encoding and testing
  //       for new lines in input stream. Simple 8-bit based encodings, or plain
  //       don't require 'complicated' logic.
  h.log, err = NewLog(h.source, h.config.LogConfig)
  if err != nil {
	  return nil, err
  }

  r, err = reader.NewEncode(h.log, h.encoding, h.config.BufferSize)
  if err != nil {
	  return nil, err
  }

  if h.config.JSON != nil {
	  r = reader.NewJSON(r, h.config.JSON)
  }
  r = reader.NewStripNewline(r)
  r = reader.NewCallStack(r,"\n", h.config.MaxBytes)   // my custom stage
  r = reader.NewMyMultiline(r,"\n", h.config.MaxBytes) // my custom stage
  if h.config.Multiline != nil {
	  r, err = reader.NewMultiline(r, "\n", h.config.MaxBytes, h.config.Multiline)
	  if err != nil {
		  return nil, err
	  }
  }

  r = reader.NewArranged(r)  // my custom stage

  return reader.NewLimit(r, h.config.MaxBytes), nil

}

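As the code above shows, several stages are added, and each stage receives the previous one when it is initialized. This way the log stream passes through the stages one after another.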
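The chaining pattern can be tried out without the beats source tree. The following is only a minimal sketch: `Message` is a simplified stand-in for filebeat's message type, and `sliceReader`, `trimReader`, `upperReader`, `drain` and `errEOF` are hypothetical names invented for this illustration. Only the shape of the `Reader` interface mirrors the one quoted from reader.go.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Message is a simplified stand-in for filebeat's message type (assumption).
type Message struct {
	Content []byte
}

// Reader mirrors the shape of the interface quoted from reader.go.
type Reader interface {
	Next() (Message, error)
}

// errEOF is a hypothetical sentinel meaning the source has no more lines.
var errEOF = errors.New("no more lines")

// sliceReader is a hypothetical source stage that yields fixed lines.
type sliceReader struct {
	lines []string
	pos   int
}

func (s *sliceReader) Next() (Message, error) {
	if s.pos >= len(s.lines) {
		return Message{}, errEOF
	}
	line := s.lines[s.pos]
	s.pos++
	return Message{Content: []byte(line)}, nil
}

// trimReader is a hypothetical stage that strips surrounding whitespace.
type trimReader struct{ reader Reader }

func (t *trimReader) Next() (Message, error) {
	msg, err := t.reader.Next() // pull one message from the previous stage
	if err != nil {
		return msg, err
	}
	msg.Content = []byte(strings.TrimSpace(string(msg.Content)))
	return msg, nil
}

// upperReader is a hypothetical stage that upper-cases the content.
type upperReader struct{ reader Reader }

func (u *upperReader) Next() (Message, error) {
	msg, err := u.reader.Next()
	if err != nil {
		return msg, err
	}
	msg.Content = []byte(strings.ToUpper(string(msg.Content)))
	return msg, nil
}

// drain pulls messages from the last stage until an error is returned.
func drain(r Reader) []string {
	var out []string
	for {
		msg, err := r.Next()
		if err != nil {
			return out
		}
		out = append(out, string(msg.Content))
	}
}

func main() {
	// Each stage wraps the previous one, as in newLogFileReader.
	var r Reader = &sliceReader{lines: []string{"  hello \n", "world\n"}}
	r = &trimReader{reader: r}
	r = &upperReader{reader: r}
	fmt.Println(drain(r))
}
```

Because every stage only knows the `Reader` it wraps, stages can be added, removed or reordered without touching the others.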

How a stage processes a log entry

// Next returns the next line.
func (p *Arranged) Next() (Message, error) {
	message, err := p.reader.Next()   // call the previous stage's Next() to get one log entry
	reglog := regexp.MustCompile(LOG) // compile the regular expression
	names := reglog.SubexpNames()     // names of the capture groups in the expression
	clientId := p.Submatch(regclient, string(message.Content), 1)
	if clientId != "" {
		// the regular expression matched a clientId; add the extracted field to the normalized log entry
		message.AddFields(common.MapStr{"clientId": clientId})
	}
	uripath := p.Submatch(reguripath, string(message.Content), 0)
	if uripath != "" {
		message.AddFields(common.MapStr{"uripath": uripath})
	}
	USERID := p.Submatch(reguserid, string(message.Content), 1)
	if USERID != "" {
		message.AddFields(common.MapStr{"userId": USERID})
	}
	.....
	return message, err
}
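The post does not show the `Arranged` struct or its `Submatch` method. Judging from the calls above, a helper of that kind takes a compiled expression, the log text and a group index, and returns an empty string when nothing matches. The sketch below is one plausible reading of that, not the author's code: `submatch`, `extractFields`, `regUserID` and `regURIPath` are hypothetical names, the two patterns are simplified illustrations rather than the USERID and URIPATH expressions from the post, and a plain map stands in for `message.AddFields`.

```go
package main

import (
	"fmt"
	"regexp"
)

// Illustrative patterns only; compiled once at package level so that
// the per-line processing does not recompile them.
var (
	regUserID  = regexp.MustCompile(`userId= (\d+)`)
	regURIPath = regexp.MustCompile(`/open/[A-Za-z0-9/]+`)
)

// submatch returns capture group index of the first match of re in s,
// or "" when there is no match or the group does not exist.
func submatch(re *regexp.Regexp, s string, index int) string {
	m := re.FindStringSubmatch(s)
	if m == nil || index < 0 || index >= len(m) {
		return ""
	}
	return m[index]
}

// extractFields collects the fields found in one log line. A plain map
// stands in for the fields that the post attaches via message.AddFields.
func extractFields(line string) map[string]string {
	fields := map[string]string{}
	if v := submatch(regUserID, line, 1); v != "" {
		fields["userId"] = v
	}
	if v := submatch(regURIPath, line, 0); v != "" {
		fields["uripath"] = v
	}
	return fields
}

func main() {
	line := "HTTPREQUEST /open/v3/user/processSnsId userId= 260456839"
	fmt.Println(extractFields(line))
}
```

Group index 0 is the whole match and index 1 the first parenthesized group, which matches how the post passes 0 for uripath and 1 for clientId and userId.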

The final output from filebeat is in JSON format

{
  "@timestamp": "2017-06-26T12:07:33.000Z",
  "appId": "9999",
  "beat": {
	"hostname": "zenbutekiMacBook-Pro.local",
	"name": "zenbutekiMacBook-Pro.local",
	"version": "6.0.0-alpha3"
  },
  "clientId": "Android_3.97_weixinPay,tyGuest,tyAccount.yisdkpay.0-hall6.shenmasearch.tu",
  "level": "I",
  "message": "original log content]}",
  "offset": 89567,
  "prospector": {
	"type": "log"
  },
  "source": "/Users/love/Documents/tysdk.test.log",
  "uripath": "/open/v4/user/getLoginType"
}

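The clientId, appId, uripath and similar fields in the JSON were all extracted from the raw log by the pipeline stages

Here are some of the regular expressions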

  USERNAME string=`[a-zA-Z0-9._-]+`
  USER string=USERNAME
  INT string=`(?:[+-]?(?:[0-9]+))`
  BASE10NUM string=`(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))`
  
  POSINT string=`\b(?:[1-9][0-9]*)\b`
  NONNEGINT string=`\b(?:[0-9]+)\b`
  WORD string=`\b\w+\b`
  NOTSPACE string=`\S+`
  SPACE string=`\s*`
  DATA string=`.*?`
  GREEDYDATA string=`.*`
  UUID string=`[A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}`

  // Networking
  MAC string=fmt.Sprintf(`(?:%s|%s|%s)`,CISCOMAC,WINDOWSMAC,COMMONMAC)
  CISCOMAC string=`(?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})`
  WINDOWSMAC string=`(?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})`
  COMMONMAC string=`(?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})`
  IPV6 string=`((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?`
  IPV4 string=`(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))`
  IP string=fmt.Sprintf(`(?:%s|%s)`,IPV6,IPV4)
  HOSTNAME string=`\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)`
  HOST string=HOSTNAME
  IPORHOST string=fmt.Sprintf(`(?:%s|%s)`,HOSTNAME,IP)
  HOSTPORT string=fmt.Sprintf(`%s:%s`,IPORHOST,POSINT)

  // paths
  
  
  TTY string=`(?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))`
  WINPATH string=`(?:[A-Za-z]+:|\\)(?:\\[^\\?*]*)+`
  URIPROTO string=`[A-Za-z]+(\+[A-Za-z+]+)?`
  URIHOST string=fmt.Sprintf(`%s(?::%s)?`,IPORHOST,POSINT)
  // uripath comes loosely from RFC1738, but mostly from what Firefox
  // doesn't turn into %XX
  URIPATH string=`(?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+`
  //URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
  URIPARAM string=`\?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*`
  URIPATHPARAM string=fmt.Sprintf(`%s(?:%s)?`,URIPATH,URIPARAM)
  URI string=fmt.Sprintf(`%s://(?:%s(?::[^@]*)?@)?(?:%s)?(?:%s)?`,URIPROTO,USER,URIHOST,URIPATHPARAM)

  // Months: January, Feb, 3, 03, 12, December
  MONTH string=`\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b`
  MONTHNUM string=`(?:0?[1-9]|1[0-2])`
  MONTHNUM2 string=`(?:0[1-9]|1[0-2])`
  MONTHDAY string=`(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])`

  // Days: Monday, Tue, Thu, etc...
  DAY string=`(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)`
  HOUR string=`(?:2[0123]|[01]?[0-9])`
  MINUTE string=`(?:[0-5][0-9])`
  // '60' is a leap second in most time standards and thus is valid.
  SECOND string=`(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)`
  TIME string=fmt.Sprintf(`%s:%s(?::%s)`,HOUR,MINUTE,SECOND)
  // Syslog Dates: Month Day HH:MM:SS
  SYSLOGTIMESTAMP string=fmt.Sprintf(`%s +%s %s`,MONTH,MONTHDAY,TIME)
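Since these fragments are ordinary Go strings, they can be combined with `fmt.Sprintf` into a larger expression with named groups, and the named captures can then be read back into a map. The sketch below copies HOUR, MINUTE and SECOND from the list above; `regTime` and `namedGroups` are hypothetical names introduced for the illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// Copies of three patterns from the list above.
var (
	HOUR   = `(?:2[0123]|[01]?[0-9])`
	MINUTE = `(?:[0-5][0-9])`
	SECOND = `(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)`
)

// regTime wraps each fragment in a named group (hypothetical composition).
var regTime = regexp.MustCompile(
	fmt.Sprintf(`(?P<hour>%s):(?P<minute>%s):(?P<second>%s)`, HOUR, MINUTE, SECOND))

// namedGroups returns the named captures of the first match of re in s,
// or nil when s does not match.
func namedGroups(re *regexp.Regexp, s string) map[string]string {
	m := re.FindStringSubmatch(s)
	if m == nil {
		return nil
	}
	out := make(map[string]string)
	for i, name := range re.SubexpNames() {
		if i != 0 && name != "" { // index 0 is the whole match; unnamed groups have ""
			out[name] = m[i]
		}
	}
	return out
}

func main() {
	fmt.Println(namedGroups(regTime, "Jun 26 14:35:07"))
}
```

Note that Go's regexp package uses RE2 syntax, so constructs such as atomic groups or lookarounds found in some grok patterns will not compile and need to be rewritten.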

Please test these regular expressions yourself before using them

import (
	"testing"
	"regexp"
	"fmt"
)

func TestSYSLOGTIMESTAMP(t *testing.T)  {
	reg := regexp.MustCompile(fmt.Sprintf("(?P<month>%s) +(?P<day>%s) (?P<time>%s)",MONTH,MONTHDAY,TIME))
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("Jun 26 14:35:07"))
	fmt.Println("")
}
func TestTIME(t *testing.T)  {
	reg := regexp.MustCompile(fmt.Sprintf("(?P<time>%s)",TIME))
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("14:35:07"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindAllSubmatch([]byte("Jun 26 14:35:07"),-1))
	fmt.Println("")
	fmt.Printf("%q", reg.FindAllString("14:35:07", -1))
	fmt.Println("")
}
func TestMONTH(t *testing.T)  {
	if matched, _ := regexp.Match(MONTH, []byte("Jun")); matched {
		fmt.Println("match succeeded")
	}else{
		// report a failure instead of only printing it
		t.Errorf("match failed: %s", MONTH)
	}
}
func TestMONTHDAY(t *testing.T)  {
	if matched, _ := regexp.Match(MONTHDAY, []byte("26")); matched {
		fmt.Println("match succeeded")
	}else{
		// report a failure instead of only printing it
		t.Errorf("match failed: %s", MONTHDAY)
	}
}
func TestURI(t *testing.T)  {
	reg := regexp.MustCompile(URIPATH)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("Jun 26 14:33:58 HY-10-3-0-14 tysdk00095: 06-26 14:33:58.231427 [-] I 833088496 | webget httpurl return= {\"code\":200} httpurl= http://10.3.13.16/push_register?FFS=22"))
	fmt.Println("")
}
func TestURIPATH(t *testing.T)  {
	reg := regexp.MustCompile(URIPATH)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("Jun 26 14:33:59 HY-10-3-0-14 tysdk00095: 06-26 14:33:58.687666 [-] I 832980544 | HTTPREQUEST /open/v3/sns/getFriendsAndRequests "))
	fmt.Println("")
}

func TestBASE10NUM(t *testing.T)  {
	reg := regexp.MustCompile(BASE10NUM)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("fs3.46  s"))
	fmt.Println("")
}
func TestCLIENTID(t *testing.T)  {
	reg := regexp.MustCompile(CLIENTID)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("clientId= Android_3.97_huawei.huawei,yisdkpay.0-hall3.huawei.tu"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("clientId= IOS_3.91_tuyoo.appStore,weixinPay,alipay.0-hall6.appStore.newkaixin"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("IOS_3.98_weixin,tuyoo.appStore,weixinPay,alipaywap.0-hall6.appStore.tyddz"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("clientId= Android_4.55_tyGuest,weixinPay,tyAccount.yisdkpay.0-hall6.baidusearch.tu"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`clientId= ["Android_4.55_tyGuest,weixinPay,tyAccount.yisdkpay.0-hall6.baidusearch.tu"]"`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`clientId= [\"Android_3.97_huawei.huawei,yisdkpay.0-hall3.huawei.tu\"]`))

	fmt.Println("")
}

func TestLOG(t *testing.T)  {
	reg := regexp.MustCompile(LOG)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("Jun 26 14:33:58 HY-10-3-0-14 tysdk00095: 06-26 14:33:57.482275 [-] I 741732768 | tcp ipport= [\"123.56.178.156\", 8137] userId= 260456839 clientId= Android_4.55_tyGuest,weixinPay,tyAccount.yisdkpay.0-hall6.baidusearch.tu"))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch("Jun 26 12:07:32 HY-10-3-0-14 tysdk00095: 06-26 12:07:32.005709 [-] I 726510856 | HTTPRESPONSE /open/v3/user/processSnsId"))
	fmt.Println("")

	//tm2, _ := time.Parse("2006 Jan 02 15:04:05", "2017 Jun 26 14:33:58")
	//fmt.Println(tm2.Unix())
	//formate:="2006-01-02 15:04:05"
	//fmt.Println(tm2.Format(formate))
}

func TestUSERID(t *testing.T)  {
	reg := regexp.MustCompile(USERID)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`user_id":["2799451883"]"`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`user_id':['2799451883']"`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"userId": 250349189`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"userId":206061414,`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`'userId':206061414,`))
	fmt.Println("")
}

func TestAPPID(t *testing.T)  {
	reg := regexp.MustCompile(APPID)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"appId": 9999,`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`'appId': '10063'`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"appId":["9999"]`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"appId":9999,`))
	fmt.Println("")
}
func TestGAMEID(t *testing.T)  {
	reg := regexp.MustCompile(GAMEID)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"gameId": 9999,`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`'gameId': '10063'`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"gameId":["9999"]`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"gameId":9999,`))
	fmt.Println("")
}
func TestIMEI(t *testing.T)  {
	reg := regexp.MustCompile(IMEI)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"imei":"862455037856567"`))
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`"imei": "862455037856567"`))
	fmt.Println("")
}
func TestCALLSTACK(t *testing.T)  {
	reg := regexp.MustCompile(CALLSTACK)
	fmt.Printf("%q",reg.SubexpNames())
	fmt.Println("")
	fmt.Printf("%q",reg.FindStringSubmatch(`Jun 26 00:07:51 HY-10-3-0-14 tysdk00095: 06-26 00:07:50.819327 [-] E 444796392 | ************************************************************`))
	fmt.Println("")
}
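Most of the tests above only print what was matched, so a broken pattern still passes. One way to make the checks self-verifying is a table of inputs and expected results. The sketch below is written as a plain program so it runs on its own; `matchCase` and `checkPattern` are hypothetical names, and MONTHDAY is copied from the pattern list. Inside a real test function, the same loop could call `t.Errorf` for each mismatch.

```go
package main

import (
	"fmt"
	"regexp"
)

// MONTHDAY is copied from the pattern list in the post.
const MONTHDAY = `(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])`

// matchCase pairs an input with whether the whole input should match.
type matchCase struct {
	input string
	want  bool
}

// checkPattern anchors the pattern to the whole input and returns one
// message per case whose result differs from the expectation.
func checkPattern(pattern string, cases []matchCase) []string {
	re := regexp.MustCompile(`^(?:` + pattern + `)$`)
	var failures []string
	for _, c := range cases {
		if got := re.MatchString(c.input); got != c.want {
			failures = append(failures,
				fmt.Sprintf("%q: got %v, want %v", c.input, got, c.want))
		}
	}
	return failures
}

func main() {
	cases := []matchCase{
		{"26", true},
		{"7", true},
		{"31", true},
		{"32", false},
		{"00", false},
	}
	failures := checkPattern(MONTHDAY, cases)
	for _, f := range failures {
		fmt.Println("FAIL", f)
	}
	fmt.Println("failures:", len(failures))
}
```

Including inputs that must not match, such as "32", catches patterns that are too permissive, which print-only tests cannot reveal.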

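Conclusion

filebeat is written in golang, and for ordinary logs its performance is fully sufficient. Learning to customize filebeat lets you reduce the complexity of the system.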

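5 replies

Closing the thread.

Useful. Is the code available? I don't know Go very well.

@ouaisun If you don't know golang, the code won't be of much use to you; if you do know golang, you can make the changes yourself from the documentation. So whether or not you have the code is not the point.

@liangdas I just wanted the code to help me understand it and see whether I can get it to compile and run, mainly the fields of the Arranged struct.

@ouaisun The code is all on github; just clone it.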
