開源數據收集引擎 Logstash 講解和示例講解
一、概述
Logstash 是一個開源的數據收集和日志處理工具,它是 Elastic Stack(ELK Stack)的一部分,用于從各種數據源中采集、轉換和傳輸數據,以幫助分析和可視化大規模數據。Logstash 通常與 Elasticsearch 和 Kibana 一起使用,以實現實時日志分析和監控。
以下是 Logstash 的主要功能和特點:
- 數據采集:Logstash 可以從多種數據源中采集數據,包括日志文件、數據文件、消息隊列、數據庫、網絡流量等。它支持多種輸入插件,以適應不同數據源的需要。
- 數據轉換:Logstash 具有強大的數據轉換功能,可以對采集的數據進行過濾、解析、轉換和豐富操作。它使用過濾插件來對數據執行各種操作,包括正則表達式解析、字段拆分、數據脫敏、時間戳生成等。
- 多通道數據處理:Logstash 允許將數據流式傳輸到不同的通道,以滿足不同的需求。通道可以是 Elasticsearch、Kafka、RabbitMQ 等,或者您可以定義自定義輸出插件。
- 數據過濾和插件:Logstash 有豐富的插件生態系統,包括輸入插件、過濾插件和輸出插件。這些插件可以根據特定需求來配置和擴展,以適應各種數據處理任務。
- 實時數據處理:Logstash 具有實時數據處理能力,可以將數據從源頭到目的地以實時或近實時的方式傳遞。這使得它適用于日志監控、安全分析、性能監控等實時應用。
- 可伸縮性:Logstash 可以與多個Logstash 實例一起部署,以實現數據采集和處理的橫向擴展。這有助于應對大規模數據需求。
- 易于配置:Logstash 使用簡單的配置文件(通常是YAML格式)來定義數據流的處理過程。配置文件非常直觀,易于理解和維護。
- 社區和支持:Logstash 是一個廣泛采用的開源項目,擁有活躍的社區支持和大量的文檔資源。
Logstash 是 Elastic Stack 中的一個重要組件,與 Elasticsearch 和 Kibana 配合使用,可以構建強大的實時日志和數據分析解決方案。它為組織提供了強大的數據采集和處理工具,用于監控、分析和可視化大規模數據。
官方文檔:
二、Logstash 架構
圖片
Logstash 包含3個主要部分: 輸入(inputs),過濾器(filters)和輸出(outputs)
Logstash的事件(logstash將數據流中等每一條數據稱之為一個event)處理流水線有三個主要角色完成:inputs –> filters –> outputs。
- inpust:必須,負責產生事件(Inputs generate events),常用:File、syslog、redis、kakfa、beats(如:Filebeats);官方文檔:https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html
- filters:可選,負責數據處理與轉換(filters modify them),常用:grok、mutate、drop、clone、geoip;官網文檔:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html
- outpus:必須,負責數據輸出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、kakfa、statsd;官方文檔:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html
二、ElasticSearch 部署
這里可以選擇以下部署方式:
- 通過docker-compose部署:通過 docker-compose 快速部署 Elasticsearch 和 Kibana 保姆級教程
- on k8s 部署:ElasticSearch+Kibana on K8s 講解與實戰操作(版本7.17.3)
這里我選擇 docker-compose 部署方式。
1)部署 docker
# 安裝yum-config-manager配置工具
yum -y install yum-utils
# 建議使用阿里云yum源:(推薦)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安裝docker-ce版本
yum install -y docker-ce
# 啟動并開機啟動
systemctl enable --now docker
docker --version
2)部署 docker-compose
curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
3)創建網絡
# 創建
docker network create bigdata
# 查看
docker network ls
4)修改 Linux 句柄數和最大線程數
#查看當前最大句柄數
sysctl -a | grep vm.max_map_count
#修改句柄數
vi /etc/sysctl.conf
vm.max_map_count=262144
#臨時生效,修改后需要重啟才能生效,不想重啟可以設置臨時生效
sysctl -w vm.max_map_count=262144
#修改后需要重新登錄生效
vi /etc/security/limits.conf
# 添加以下內容
* soft nofile 65535
* hard nofile 65535
* soft nproc 4096
* hard nproc 4096
# 重啟服務,-h 立刻重啟,默認間隔一段時間才會開始重啟
reboot -h now
5)下載部署包開始部署
# 這里選擇 docker-compose 部署方式
git clone https://gitee.com/hadoop-bigdata/docker-compose-es-kibana.git
cd docker-compose-es-kibana
chmod -R 777 es kibana
docker-compose -f docker-compose.yaml up -d
docker-compose ps
三、Logstash 部署與配置講解
1)下載Logstash安裝包
訪問官方網站 https://www.elastic.co/downloads/logstash ,下載相應版本的zip文件。
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.1-linux-x86_64.tar.gz
2)解壓安裝包文件
tar -xf logstash-8.11.1-linux-x86_64.tar.gz
3)不同場景測試
1)測試1:采用標準的輸入和輸出
cd logstash-8.11.1
# 測試,采用標準的輸入和輸出,#codec=>rubydebug,解析轉換類型:ruby
# codec各類型講解:https://www.elastic.co/guide/en/logstash/7.9/codec-plugins.html
./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
# 輸入:
hello
# 輸出:
{
"event" => {
"original" => "hello"
},
"host" => {
"hostname" => "local-168-182-110"
},
"@version" => "1",
"@timestamp" => 2023-11-19T02:31:02.485073839Z,
"message" => "hello"
}
圖片
2)測試2:使用配置文件 +標準輸入輸出
配置文件:config/logstash-1.conf
input {
stdin { }
}
output {
stdout { codec => rubydebug }
}
啟動服務
./bin/logstash -f ./config/logstash-1.conf
3)測試3:配置文件+file輸入 +標準的屏幕輸出
配置文件:./config/logstash-2.conf
input {
file {
path => "/var/log/messages"
}
}
output {
stdout {
codec=>rubydebug
}
}
啟動服務
./bin/logstash -f ./config/logstash-2.conf
圖片
4)測試4:配置文件+文件輸入+kafka輸出
kafka 部署,可以參考我以下幾篇文章:
- 【云原生】zookeeper + kafka on k8s 環境部署
- 【中間件】通過 docker-compose 快速部署 Kafka 保姆級教程
配置文件:./config/logstash-3.conf
input {
file {
path => "/var/log/messages"
}
}
output {
kafka {
bootstrap_servers => "192.168.182.110:9092"
topic_id => "messages"
}
}
啟動服務
./bin/logstash -f ./config/logstash-3.conf
消費 kafka 數據
docker exec -it kafka-node1 bash
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic messages --from-beginning
圖片
5)測試5:配置文件+filebeat端口輸入+標準輸出
filebeat 部署,可以參考我以下幾篇文章:
- 輕量級的日志采集組件 Filebeat 講解與實戰操作
- Filebeat on k8s 日志采集實戰操作
服務器產生日志(filebeat)---》logstash服務器
配置文件:./config/logstash-4.conf
input {
beats {
port => 5044
}
}
output {
stdout {
codec => rubydebug
}
}
啟動服務
./bin/logstash -f ./config/logstash-4.conf
啟動后會在本機啟動一個5044端口,不要和系統已啟動的端口沖突即可,配合測試我們在 filebeat 服務器上修改配置文件。
filebeat 配置文件內容:filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
# ------------------------------ Logstash Output -------------------------------
output.logstash:
hosts: ["192.168.182.110:5044"]
啟動 filebeat
./filebeat -e -c filebeat.yml
6)測試6:配置文件+filebeat端口輸入+輸出到kafka
服務器產生日志(filebeat)---> logstash服務器---->kafka服務器
配置文件:./config/logstash-5.conf
input {
beats {
port => 5044
}
}
output {
kafka {
bootstrap_servers => "192.168.182.110:9092"
topic_id => "messages"
}
}
啟動服務
./bin/logstash -f ./config/logstash-5.conf
7)測試7:filebeat數據采集+kafka讀取當輸入+logstash處理+輸出到 ES
服務器產生日志(filebeat)---> kafka服務器__抽取數據___> logstash服務器---->ES
圖片
logstash的配置:./config/logstash-6.conf
input {
kafka {
bootstrap_servers => "10.82.192.110:9092"
topics => ["messages"]
}
}
output {
elasticsearch {
hosts => ["10.82.192.110:9200"]
index => "messageslog-%{+YYYY-MM-dd}"
}
}
filebeat.yml output.kafka 配置:
# ------------------------------ KAFKA Output -------------------------------
output.kafka:
eanbled: true
hosts: ["10.82.192.110:9092"]
version: "2.0.1"
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
使用 systemctl 啟動 filebeat
# vi /usr/lib/systemd/system/filebeat.service
[Unit]
Descriptinotallow=filebeat server daemon
Documentatinotallow=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Envirnotallow="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target
使用 systemctl 啟動 logstash
# vi /usr/lib/systemd/system/logstash.service
[Unit]
Descriptinotallow=logstash
[Service]
User=root
ExecStart=/opt/logstash-8.11.1/bin/logstash -f /opt/logstash-8.11.1/config/logstash-6.conf
Restart=always
[Install]
WantedBy=multi-user.target
啟動服務
systemctl start logstash
systemctl status logstash
四、Logstash filter常用插件
負責數據處理與轉換(filters modify them),常用:grok、mutate、drop、clone、geoip;官網文檔:https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html
1)使用grok內置的正則案例
grok 插件:Grok是將非結構化日志數據解析為結構化和可查詢內容的好方法,底層原理是基于正則匹配任意文本格式
此工具非常適合syslog日志、apache和其他Web服務器日志、mysql日志,以及一般來說,任何通常為人類而不是計算機消費編寫的日志格式。
grok內置了120種匹配模式,也可以自定義匹配模式:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
filebeat配置:filebeat.yml
##
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
output.logstash:
#指定logstash監聽的IP和端口
hosts: ["192.168.182.110:5044"]
logstash 配置:stdin-grok-stout.conf
cat >> stdin-grok-stout.conf << EOF
input {
#監聽的類型
beats {
#監聽的本地端口
port => 5044
}
}
filter{
grok{
#match => { "message" => "%{COMBINEDAPACHELOG}" }
#上面的"COMBINEDAPACHELOG"變量官方github上已經廢棄,建議使用下面的匹配模式
#參考地址:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
}
}
output {
stdout {}
elasticsearch {
#定義es集群的主機地址
hosts => ["192.168.182.110:9200"]
#定義索引名稱
index => "hqt-application-pro-%{+YYYY.MM.dd}"
}
}
EOF
2)使用grok自定義的正則案例
參考官網地址:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
配置如下:
cat >> stdin-grok_custom_patterns-stdout.conf << EOF
input {
stdin {}
}
filter {
grok {
#指定模式匹配的目錄,可以使用絕對路徑
#在./patterns目錄下隨便創建一文件,并寫入以下匹配模式
# ORDER_ID [\u4e00-\u9fa5]{10,11}:[0-9A-F]{10,11}
patterns_dir => ["./patterns"]
#匹配模式
#測試數據為:app_name:gotone-payment-api,client_ip:,context:,docker_name:,env:dev,exception:,extend1:,level:INFO,line:-1,log_message:com.gotone.paycenter.controller.task.PayCenterJobHandler.queryPayOrderTask-request:[\\],log_time:2022-11-23 00:00:00.045,log_type:applicationlog,log_version:1.0.0,本次成交的訂單編號為:BEF25A72965,parent_span_id:,product_line:,server_ip:,server_name:gotone-payment-api-c86658cb7-tc8k5,snooper:,span:0,span_id:,stack_message:,threadId:104,trace_id:,user_log_type:
match => { "message" => "%{ORDER_ID:test_order_id}" }
}
}
output {
stdout {}
}
EOF
3)filter插件通用字段案例(添加/刪除字段、tag)
原有字段(nginx的json解析日志)
配置如下:
cat >> stdin-remove_add_field-stout.conf << EOF
input {
beats {
port => 5044
}
}
filter {
mutate {
#移除指定的字段,使用逗號分隔
remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth" ]
#添加指定的字段,使用逗號分隔
#"%{clientip}"使用%可以將已有字段的值當作變量使用
add_field => {
"app_name" => "nginx"
"test_clientip" => "clientip---->%{clientip}"
}
#添加tag
add_tag => [ "linux","web","nginx","test" ]
#移除tag
remove_tag => [ "linux","test" ]
}
}
output {
stdout {}
}
EOF
4)date 插件修改寫入ES的時間案例
測試日志:如下是我們要收集的一條json格式的日志
{"app_name":"gotone-payment-api","client_ip":"","context":"","docker_name":"","env":"dev","exception":"","extend1":"","level":"INFO","line":68,"log_message":"現代金控支付查詢->調用入參[{}]","log_time":"2022-11-23 00:00:00.051","log_type":"applicationlog","log_version":"1.0.0","method_name":"com.gotone.paycenter.dao.third.impl.modernpay.ModernPayApiAbstract.getModernPayOrderInfo","parent_span_id":"","product_line":"","server_ip":"","server_name":"gotone-payment-api-c86658cb7-tc8k5","snooper":"","span":0,"span_id":"","stack_message":"","threadId":104,"trace_id":"gotone-payment-apib4a65777-ce6b-4bcc-8aef-71a7cfffaf2c","user_log_type":""}
配置如下:
cat >> stdin-date-es.conf << EOF
input {
file {
#指定收集的路徑
path => "/var/log/messages"
}
}
filter {
json {
#JSON解析器 可以將json形式的數據轉換為logstash實際的數據結構(根據key:value拆分成字段形式)
source => "message"
}
date {
#匹配時間字段并解析
match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
#將匹配到的時間字段解析后存儲到目標字段,默認字段為"@timestamp"
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
output {
stdout {}
elasticsearch {
#定義es集群的主機地址
hosts => ["192.168.182.110:9200"]
#定義索引名稱
index => "hqt-application-pro-%{+YYYY.MM.dd}"
}
}
EOF
5)geoip分析原IP地址位置案例
測試數據為:nginx的json格式日志
{"@timestamp":"2022-12-18T03:27:10+08:00","host":"10.0.24.2","clientip":"114.251.122.178","SendBytes":4833,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"43.143.242.47","uri":"/index.html","domain":"43.143.242.47","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","status":"200"}
配置如下:
cat >> beats-geoip-stdout.conf << EOF
input {
file {
#指定收集的路徑
path => "/var/log/test.log"
}
}
filter {
json {
#JSON解析器 可以將json形式的數據轉換為logstash實際的數據結構(根據key:value拆分成字段形式)
source => "message"
}
geoip {
#指定基于哪個字段分析IP地址
source => "client_ip"
#指定IP地址分析模塊所使用的數據庫,默認為GeoLite2-City.mmdb(這里必須再次指定以下,否則不會顯示城市)
database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
#如果期望查看指定的字段,則可以在這里配置,若不配置,表示顯示所有的查詢字段
#fields => ["city_name","country_name","ip"]
#指定geoip的輸出字段,當有多個IP地址需要分析時(例如源IP和目的IP),則該字段非常有效
#target => "test-geoip-nginx"
}
}
output {
stdout {}
}
EOF
GeoLite2-City.mmdb 下載:https://dev.maxmind.com/geoip/geolite2-free-geolocation-data
圖片
7)mutate組件常用案例
mutate 測試數據 python 腳本:
cat >> generate_log.py << EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : oldboyedu-linux80
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 配置root的logging.Logger實例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a',)
actions = ["瀏覽??", "評論商品", "加?收藏", "加?購物?", "提交訂單", "使?優惠券", "領取優惠券", "搜索", "查看訂單", "付款", "清空購物?"]
while True:
time.sleep(random.randint(1, 5))
user_id = random.randint(1, 10000)
# 對?成的浮點數保留2位有效數字.
price = round(random.uniform(15000, 30000),2)
action = random.choice(actions)
svip = random.choice([0,1])
logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF
# python generate_log.py /tmp/app.log
8)logstash的多if分支案例
配置如下:
cat >> homework-to-es.conf << EOF
input {
beats {
type => "test-nginx-applogs"
port => 5044
}
file {
type => "test-product-applogs"
path => "/tmp/app.logs"
}
beats {
type => "test-dw-applogs"
port => 8888
}
file {
type => "test-payment-applogs"
path => "/tmp/payment.log"
}
}
filter {
if [type] == "test-nginx-applogs"{
mutate {
remove_field => [ "tags","agent","input","log","ecs","version","@version","ident","referrer","auth","xff","referer","upstreamtime","upstreamhost","tcp_xff"]
}
geoip {
source => "clientip"
database => "/hqtbj/hqtwww/logstash_workspace/data/plugins/filters/geoip/CC/GeoLite2-City.mmdb"
}
useragent {
source => "http_user_agent"
}
}
if [type] == "test-product-applogs" {
mutate {
split => { "message" => "|" }
}
mutate {
add_field => {
"user_id" => "%{[message][1]}"
"action" => "%{[message][2]}"
"svip" => "%{[message][3]}"
"price" => "%{[message][4]}"
}
}
mutate {
convert => {
"user_id" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
}
if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
json {
source => "message"
}
date {
match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
target => "@timestamp"
}
}
}
output {
stdout {}
if [type] == "test-nginx-applogs" {
elasticsearch {
hosts => ["192.168.182.110:9200"]
index => "test-nginx-logs-%{+YYYY.MM.dd}"
}
}
if [type] == "test-product-applogs" {
elasticsearch {
hosts => ["192.168.182.110:9200"]
index => "test-product-applogs-%{+YYYY.MM.dd}"
}
}
if [type] in [ "test-dw-applogs","test-payment-applogs" ] {
elasticsearch {
hosts => ["192.168.182.110:9200"]
index => "test-center-applogs-%{+YYYY.MM.dd}"
}
}
}
EOF