logstash 版本(6.2)


配置中的 codec 到底是什么

codec 属性用来配置编解码插件,它负责对输入输出的内容表现形式进行处理,插件本质上是个流式过滤器。 以下是常用的插件:

  • json

读取JSON格式的内容,在JSON数组中为每个元素创建一个事件

  • line

从行读取文本数据

  • plain

读取原始内容,不会进行转化

  • rubydebug

将输出打印到 logstash 的事件中

所有的插件请查看:https://www.elastic.co/guide/en/logstash/current/codec-plugins.html#codec-plugins

使用配置文件

  • conf 文件 定义Logstash处理管道的管道配置文件

  • pipelines.yml 文件 pipeline 文件用于在单个实例中配置多个管道处理数据,是 logstash 的设置文件,每个管道拥有自己独立的线程,输入从管道中接受输入事件并按固定大小单位(batch size)存放到管道序列中(默认在内存,可以持久化到硬盘),过滤,输出从管道中消费序列,可以通过配置 batch size管道工作线程 提高性能。具体参考: Tuning and Profiling Logstash Performance

  • 过滤增强插件

  • Grok 过滤插件 可以将非结构的数据转换成结构化的数据便于查询,Grok 通过组合文本匹配模式(text pattern)来匹配你的内容的方式工作。

  • Geoip 增强过滤插件 通过你指定的ip地址列,查询额外的地址信息:经纬度,国家,区域,编码等。

  • mutate 过滤转换插件 对事件字段执行一般转换。您可以重命名,删除,替换和修改事件中的字段

  • 更多插件请查看:Output Plugins

启动服务

  • 启动参数
    • --node.name NAME 指定 logstash 实例的名称,默认为当前的主机名。

    • -f, --path.config CONFIG_PATH 从特定文件或目录加载 Logstash 配置。如果给出一个目录,那么该目录中的所有文件将按字典顺序连接,然后解析为单个配置文件。 您可以指定通配符(globs),并且任何匹配的文件将按照上述顺序加载。例如,您可以使用通配符功能按名称加载特定文件:

      bin/logstash –debug -f ‘/tmp/{one,two,three}’

      使用此命令,Logstash会连接三个配置文件/ tmp / one,/ tmp / two和/ tmp / three,并将它们解析为单个配置。

    • -e, --config.string CONFIG_STRING 使用给定的字符串作为配置数据。与配置文件相同的语法。

    • -t, --config.test_and_exit 修改配置文件不重启服务:

      bin/logstash -f first-pipeline.conf -t

    • -r, --config.reload.automatic 测试,转换配置文件,并打印错误:

      bin/logstash -f first-pipeline.conf -r

使用 docker 获取镜像

发布的镜像存放在 www.docker.elastic.co,源码放在GitHub.

默认版本安装 X-PACK oss 版本不安装 X-PACK。

配置

容器的管道配置文件目录:/usr/share/logstash/pipeline/ 配置文件目录:/usr/share/logstash/config/

环境变量

PIPELINE_WORKERS 对应 pipeline.workers LOG_LEVEL 对应 log.level XPACK_MONITORING_ENABLED 对应 XPACK_MONITORING_ENABLED

Kafka input plugin 插件配置

元数据字段

  • [@metadata][kafka][topic]:消息来源处的原始 Kafka topic。
  • [@metadata][kafka][consumer_group]:消费者组
  • [@metadata][kafka][partition]:此讯息的分区信息。
  • [@metadata][kafka][offset]: 此消息的原始记录偏移量。
  • [@metadata][kafka][key]:Record key, 如果有的话。
  • [@metadata][kafka][timestamp]:kafka broker 收到此消息时的时间戳。

如何使用:

input {
  kafka {    
    ...
    decorate_events => true
  }
}

filter {    
  mutate {
    add_field => {"[@metadata][index]" => "%{[kafka][topic]}"}
  }
}

output {
  elasticsearch {
    index => "kafka-%{[@metadata][index]}-%{+YYYY.MM.dd}"
    ...
  }
}

常用的配置选项

  • auto_commit_interval_ms:消费者的 offset 提交 kafka 时间频率。 单位:毫秒 默认值:5000

  • bootstrap_servers:用于建立群集初始连接的 Kafka 实例的 URL 列表。 格式如下:"host1:port1,host2:port2" 默认值:"localhost:9092"

  • client_id:发出请求时传递给服务器的 id 字符串。这样做的目的是通过允许包含逻辑应用程序名称,能够跟踪 ip / port 之外的请求源。 默认值:"logstash"

  • decorate_events:是否添加 kafka 的元数据。 默认值:false

  • group_id:此用户所属组的标识符。消费者组是由多个处理器组成的单个逻辑用户。主题中的消息将分发给具有相同 group_id 的所有 Logstash 实例 默认值:"logstash"

  • topics:要订阅的主题列表。 -默认值:["logstash"]

  • topics_pattern:要订阅的主题正则表达式模式。使用此配置时,topics 配置将被忽略。

  • id:在有多个 kafka input 时,在这种情况下添加命名标识将有助于在使用监视 API 时监视 Logstash。

Elasticsearch output plugin 插件配置

常用的配置选项

  • action :告示 Elasticsearch 要做的操作
  • 默认值是 index
    • index :索引一个文档
    • delete :通过 id 删除文档
    • create :索引一个文档,如果 id 已经存在索引中则创建失败
    • update :通过 id 更新文档。如果文档不存在时如何处理,请查看upsert选项。
  • document_id:指定索引文档的 id。
  • 没有默认值,默认Elasticsearch会创建随机 id
  • document_type:指定索引文档的类型。
  • 下个版本将被移除,原因见注脚。
  • hosts:设置远程实例的host,给定数组的 host 会对请求负载均衡。
  • 默认值:["127.0.0.1"] 网址中出现的任何特殊字符必须是 URL 转义的!例如,这意味着#应该是%23。
  • index:指定索引的名称。
  • 默认值:"logstash-%{+YYYY.MM.dd}" 完整的属性请查看:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-document_type

下面放一个从 kafka 收集数据,输出到 Elasticsearch 的配置:

input {
  kafka {
  	group_id => "test-consumer-group"
    topics => ["test"]
    bootstrap_servers => "127.0.0.1:9092"
    codec => json
  }
}

output {
  stdout { codec => rubydebug }

  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    document_id => "%{id}"
  }
}

%{id} 解释: 假设我的输入数据格式为:

{
    "id":"123",
    "level":"info",
    "message":"Hii",
    "timestamp":"2017-02-02T08:32:09.535Z"}

document_id => "%{id}"及使用数据的 id 作为文档索引。


注:开始时,我们把“index”类比为sql数据库的“database”,“type”类比为“table”。 但是这是个错的认知,在 SQL 数据库中,表格彼此独立。一个表中的列与另一个表中的相同名称的列没有关系。但是映射类型(type)中的字段不是这种情况。 在 Elasticsearch 索引中,在不同映射类型(type)中具有相同名称的字段在内部由相同的 Lucene 字段支持。 这会导致你希望删除一个类型中的字段时另一个类型的相同字段会出错。 最重要的是,存储同一索引中具有很少或没有共同字段的不同实体会导致稀疏数据并干扰 Lucene 高效压缩文档的能力。 解决方法:

  1. 为每一个文档类型创建索引
  2. 自己定义一个 type 字段