Elasticsearch设计了Ingest特性,在数据写入前做一些前置的过滤、转换等简单的数据处理逻辑,能支持Logstash的大多数常用场景,在5.0版本中发布。为数据的采集和处理提供了一种新的方式,可以在许多场景下不使用Logstash,简化了系统架构。
完成预处理的有个独立的Node角色 Ingest Node,默认集群每个节点都具有ingest的作用,也可以独立节点。其开启方式为:在 elasticsearch.yml 中定义(默认开启):
node.ingest:false
Ingest Node作为ES索引文档的一部分,只处理单个数据,局限在上下文中,不能与外部打交道。
Ingest 节点接收到数据之后,根据请求参数中指定的管道流 id,找到对应的已注册管道流,对数据进行处理,然后将处理过后的数据,然后将文档传回 index(索引)或者 bulk APIs。
确定ingest节点后,要定义pipeline,pipeline中指定具体的逻辑。
PUT _ingest/pipeline/log_pipeline
{
"description":"log pipeline",
"processors":[
{
"grok":{
"field":"message",
"patterns":[
"%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:project}\] \[%{DATA:thread}\] %{GREEDYDATA:message}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"yyyy-MM-dd HH:mm:ss,SSS"
],
"timezone": "Asia/ShanghAI"
}
}
]
}
Ingest API共分为4种操作,分别对应:
Ingest Pipeline的使用:
Ingest Pipeline的Processor,相当于 Logstash 的 filter 插件。事实上其主要处理器来自 Logstash 的 filter 代码。目前最重要的几个处理器分别是:
还有其他很多处理器,Append,rename,remove,attachement,geoip 和 user-agent,有的还需要单独安装 。