Skip to content

ELK-部署

二进制部署

环境准备

主机名IP 地址部署组件
nginx192.168.148.40Logstash, Nginx(应用)
es-node01192.168.148.41Elasticsearch-8.12.0
es-node02192.168.148.42Elasticsearch-8.12.0
es-node03192.168.148.43Elasticsearch-8.12.0
Kibana192.168.148.44Kibana-8.12.0
logstash192.168.148.45Logstash-8.12.0

环境准备要点

  • 系统资源: 建议 Linux 系统。Elasticsearch 和 Logstash 对内存和 CPU 消耗较高,建议配置 2核4G 或更高。
  • Java 环境: ELK 组件依赖 Java,需确保安装合适版本的 JDK (如 OpenJDK 11)。
  • 系统配置: 需要调整系统参数以支持 ELK 运行,例如增加最大文件打开数 (nofile) 和最大内存映射区域数 (vm.max_map_count)。

设置主机 hostname

所有ES主机都操作

bash
#更改主机名 
es-node01节点:hostnamectl set-hostname es-node01 && exec bash
es-node02节点:hostnamectl set-hostname es-node02 && exec bash
es-node03节点:hostnamectl set-hostname es-node03 && exec bash

#配置域名解析
cat >> /etc/hosts << 'EOF'
192.168.148.41  es-node01
192.168.148.42  es-node02
192.168.148.43  es-node03
EOF

关闭防火墙

所有主机都操作

bash
# 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
# 关闭selinux
sed -i 's#SELINUX=enforcing#SELINUX=disabled#g' /etc/selinux/config
setenforce 0
getenforce

部署 ElasticSearch

下载地址:https://www.elastic.co/cn/downloads/elasticsearch?pg=global&plcmt=nav&cta=205352

历史版本:https://www.elastic.co/downloads/past-releases#elasticsearch

时间同步

es三台节点需要操作

bash
# CentOS/RHEL (可能需先安装epel-release)
dnf -y install ntpdate

# 执行同步
ntpdate ntp.aliyun.com

配置免密

es-node01节点需要操作

bash
# es-node01 节点上生成密钥对
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa -q
# es-node01 配置所有集群节点的免密登录
for i in 192.168.148.42 192.168.148.43;do ssh-copy-id $i;done

内核优化

es三台节点需要操作

bash
# 修改资源限制配置
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* soft stack 10240
EOF

# 增加最大内存映射数
# 内核参数 vm.max_map_count 用于限制一个进程可以拥有的VMA(虚拟内存区域)的数量
echo "vm.max_map_count = 262144" >> /etc/sysctl.conf
# 设置系统最大打开的文件描述符数
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 加载生效
sysctl -p 

# 可以查看默认值
sysctl -a |grep vm.max_map_count

安装JDK

es三个节点都操作

bash
wget https://download.java.net/java/GA/jdk22.0.1/c7ec1332f7bb44aeba2eb341ae18aca4/8/GPL/openjdk-22.0.1_linux-x64_bin.tar.gz
tar xvf openjdk-22.0.1_linux-x64_bin.tar.gz -C /usr/local/

# 编辑环境变量配置文件
cat >> /etc/profile << "EOF"
export JAVA_HOME=/usr/local/jdk-22.0.1
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}
EOF

source /etc/profile

# 检测
java -version

安装服务

es三个节点都需要部署

bash
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.0-linux-x86_64.tar.gz
tar xvf elasticsearch-8.12.0-linux-x86_64.tar.gz -C /usr/local

# 创建程序用户
useradd -Ms /sbin/nologin elastic

# 创建数据目录
mkdir -p /data/elasticsearch/data

# 修改文件属性
chown -R elastic:elastic /usr/local/elasticsearch-8.12.0/
chown -R elastic:elastic /data/elasticsearch/

# 设施软连接
ln -s /usr/local/elasticsearch-8.12.0/bin/* /usr/local/bin/

修改配置文件

bash
# 修改前备份配置文件
cp /usr/local/elasticsearch-8.12.0/config/elasticsearch.yml{,.bak}


#### 配置文件详解
#ELK集群名称,单节点无需配置,同一个集群内每个节点的此项必须相同,新加集群的节点此项和其它节点相同即可加入集群,而无需再验证
cluster.name: elk-cluster
#当前节点在集群内的节点名称,同一集群中每个节点要确保此名称唯一
node.name: es-node01
#ES 数据保存目录,包安装默认路径:/var/lib/elasticsearch/
path.data: /data/elasticsearch/data
#ES 日志保存目录,包安装默认路径:/var/llog/elasticsearch/
path.logs: /usr/local/elasticsearch/logs/
#服务启动的时候立即分配(锁定)足够的内存,防止数据写入swap,提高启动速度,但是true会导致启动失败,需要优化
bootstrap.memory_lock: false
#指定该节点用于集群的监听IP,默认监听在127.0.0.1:9300,集群模式必须修改此行,单机默认即可
network.host: 0.0.0.0 
#监听端口
http.port: 9200
#发现集群的node节点列表,可以添加部分或全部节点IP
#在新增节点到已有集群时,此处需指定至少一个已经在集群中的节点地址
discovery.seed_hosts: ["192.168.148.41", "192.168.148.42", "192.168.148.43"]
#集群初始化时指定希望哪些节点可以被选举为 master,只在初始化时使用,新加节点到已有集群时此项可不配置
cluster.initial_master_nodes: ["192.168.148.41", "192.168.148.42", "192.168.148.43"]
#一个集群中的 N 个节点启动后,才允许进行数据恢复处理,默认是1,一般设为为所有节点的一半以上,防止出现脑裂现象
#当集群无法启动时,可以将之修改为1,或者将下面行注释掉,实现快速恢复启动
gateway.recover_after_nodes: 2
#设置是否可以通过正则表达式或者_all匹配索引库进行删除或者关闭索引库,默认true表示必须需要明确指定索引库名称,不能使用正则表达式和_all,生产环境建议设置为 true,防止误删索引库。
action.destructive_requires_name: true
#如果不参与主节点选举设为false,默认值为true
# node.master: false
#存储数据,默认值为true,此值为false则不存储数据而成为一个路由节点
#如果将原有的true改为false,需要先执行./elasticsearch/bin/elasticsearch-node repurpose 清理数据
# node.data: true
#7.x以后版本下面指令已废弃,在2.x 5.x 6.x 版本中用于配置节点发现列表
# discovery.zen.ping.unicast.hosts: ["10.0.0.101", "10.0.0.102","10.0.0.103"]
#8.X版后默认即开启Xpack功能,可以修改为false禁用
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12
#开启跨域访问支持,默认为false
http.cors.enabled: true
# 跨域访问允许的域名地址,(允许所有域名)以上使用正则
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS,HEAD,GET,POST,PUT,DELETE
http.cors.allow-headers: "X-Requested-With,Content-Type,Content-Length,X-User"

transport.port: 9300

内存优化

bash
# 内存优化建议:为了保证性能,每个ES节点的JVM内存设置具体要根据 node 要存储的数据量来估算,建议符合下面约定
# * 在内存和数据量有一个建议的比例:对于一般日志类文件,1G 内存能存储48G~96GB数据
# * JVM 堆内存最大不要超过30GB
# * 单个分片控制在30-50GB,太大查询会比较慢,索引恢复和更新时间越长;分片太小,会导致索引碎片化越严重,性能也会下降

#默认为4G,根据业务需求自己定义,但是要符合jvm堆内存的原则
sed -i 's/## -Xms4g/-Xms4g/' /usr/local/elasticsearch-8.12.0/config/jvm.options
sed -i 's/## -Xmx4g/-Xmx4g/' /usr/local/elasticsearch-8.12.0/config/jvm.options

配置证书

ELK8默认是开启安全认证的

在es-node01节点上操作,然后将证书同步到其它节点

使用现有CA证书
bash
elasticsearch-certutil cert --ca /path/to/your/existing/elastic-stack-ca.p12 --ca-pass "<your_ca_password>" --out /usr/local/elasticsearch-8.12.0/config/certs/elastic-certificates.p12 --pass ""
  • --ca:指定你已有的CA证书文件的路径。
  • --ca-pass:如果你的CA证书在生成时设置了密码,在此指定该密码。如果CA证书没有密码,可以省略此参数。许多教程建议为了方便,生成CA时不设密码。
  • --out--pass:指定输出的节点证书路径和密码。你希望无密码,所以用 --pass ""
生成一个新的CA证书
bash
# 1. 首先,生成一个新的CA证书(Certificate Authority)
# 同样,这里使用 --pass "" 将CA证书的密码设置为空
elasticsearch-certutil ca --out /usr/local/elasticsearch-8.12.0/config/certs/elastic-stack-ca.p12 --pass ""

# 2. 然后,使用刚生成的CA证书来签发节点证书
elasticsearch-certutil cert --ca /usr/local/elasticsearch-8.12.0/config/certs/elastic-stack-ca.p12 --out /usr/local/elasticsearch-8.12.0/config/certs/elastic-certificates.p12 --pass ""


# 如果你也生成了CA证书,也请一并设置权限
chown -R elastic:elastic /usr/local/elasticsearch-8.12.0/config/
chmod -R 640 /usr/local/elasticsearch-8.12.0/config/certs/elastic-certificates.p12
证书同步

在es-node01上执行,将上一步的证书同步到其它节点上

bash
for i in 192.168.148.42 192.168.148.43;do
 sudo -u elastic scp -rp /usr/local/elasticsearch-8.12.0/config/certs $i:/usr/local/elasticsearch-8.12.0/config/
done

# 如果在创建证书的过程中加了密码,需要将你的密码加入到你的Elasticsearch keystore中去。每个节点都需要
bin/elasticsearch-keystore add xpack.security.transport.ssl.keystore.secure_password
bin/elasticsearch-keystore add xpack.security.transport.ssl.truststore.secure_password

配置service管理

bash
cat > /usr/lib/systemd/system/elasticsearch.service << 'EOF'
[Unit]
Description=Elasticsearch-8.12.0 server
Documentation=https://www.elastic.co/products/elasticsearch
After=network-online.target
Wants=network-online.target

[Service]
Type=forking
User=elastic
Group=elastic
ExecStart=/usr/local/elasticsearch-8.12.0/bin/elasticsearch -d
LimitNOFILE=65535
LimitNPROC=4096
LimitMEMLOCK=infinity
Restart=on-failure
RestartSec=10
WorkingDirectory=/usr/local/elasticsearch-8.12.0

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable elasticsearch --now

创建用户密码(可选)

给认证的集群创建用户密码所有es节点都操作

bash
# 按照命令提示输入所有用户的初始化密码,全部输入password就OK,我这里全部是123456,生产环境需要复杂一点
elasticsearch-setup-passwords interactive
  • elastic 账号:拥有 superuser 角色,是内置的超级用户。
  • kibana 账号:拥有 kibana_system 角色,用户 kibana 用来连接 elasticsearch 并与之通信。Kibana 服务器以该用户身份提交请求以访问集群监视 API 和 .kibana 索引。不能访问 index。
  • logstash_system 账号:拥有 logstash_system 角色。用户 Logstash 在 Elasticsearch 中存储监控信息时使用。
  • beats_system账号:拥有 beats_system 角色。用户 Beats 在 Elasticsearch 中存储监控信息时使用。

验证集群

bash
# 集群设置了安全认证,如果直接访问,会报一下错误
curl http://192.168.148.41:9200

# 进行安全访问
curl http://elastic:123456@192.168.148.41:9200

# 查看集群状态
curl http://elastic:123456@192.168.148.41:9200/_cat/health

# 查看所有节点信息
curl http://elastic:123456@192.168.148.41:9200/_cat/nodes?v

# 查看集群健康性,获取到的是一个json格式的返回值,那就可以通过python等工具对其中的信息进行分析
# 注意:status 字段为green才是正常状态
curl http://elastic:123456@192.168.148.41:9200/_cluster/health?pretty=true

# 列出所有的索引 以及每个索引的相关信息
curl http://elastic:123456@192.168.148.41:9200/_cat/indices?v

nginx代理(可选)

nginx
upstream elasticsearch {
  zone elasticsearch 64K;
  server 192.168.148.41:9200;
  server 192.168.148.42:9200;
  server 192.168.148.43:9200;
}
server {
  listen 9200;
  server_name locahost;
  location / {
    proxy_pass http://elasticsearch;
    proxy_redirect off;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  }
  access_log logs/es_access.log;
}

部署 Kibana

下载地址:https://www.elastic.co/cn/downloads/kibana

历史版本:https://www.elastic.co/downloads/past-releases#kibana

安装服务

bash
wget https://artifacts.elastic.co/downloads/kibana/kibana-8.12.0-linux-x86_64.tar.gz
tar xvf kibana-8.12.0-linux-x86_64.tar.gz -C /usr/local

# 修改配置文件
cp /usr/local/kibana-8.12.0/config/kibana.yml{,.bak}
vim /usr/local/kibana-8.12.0/config/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.148.41","http://192.168.148.42:9200","http://192.168.148.43:9200"]
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
i18n.locale: "zh-CN"


# 创建程序用户
useradd -Ms /sbin/nologin kibana
chown -R kibana:kibana /usr/local/kibana-8.12.0

配置文件详解

yaml
# Kibana服务相关
server.host: "localhost",kibana的主机地址。
server.port: 5601,kibana的默认监听端口。
server.basePath: "",如果您在代理后面运行,则可以指定安装Kibana的路径。使用server.rewriteBasePath设置告诉Kibana是否应从其收到的请求中删除basePath,并防止在启动时发生弃用警告。此设置不能以斜杠/结尾。
server.rewriteBasePath: false,默认值为false,指定Kibana是否应重写以server.basePath为前缀的请求,或者要求它们由反向代理重写。在Kibana 6.3之前,此设置实际上始终为false,并且在Kibana 7.0中默认为true。
server.maxPayloadBytes: 1048576 传入服务器请求的最大有效负载大小(以字节为单位)。
server.name: "your-hostname",kibana的服务名称。

# elasticsearch相关配置
elasticsearch.url: "http://localhost:9200",kibana访问es的URL。
elasticsearch.preserveHost: true,默认值为true,当此设置的值为true时,Kibana使用server.host设置中指定的主机名。当此设置的值为false时,Kibana使用连接到此Kibana实例的主机的主机名。

kibana.index: ".kibana",默认值:.kibana,Kibana使用Elasticsearch中的索引来存储已保存的搜索,可视化和仪表板。如果索引尚不存在,Kibana会创建一个新索引。

kibana.defaultAppId: "home", 默认值home,要加载的默认应用程序。

elasticsearch.username: "user"和elasticsearch.password: "pass",如果您的Elasticsearch受基本身份验证保护,则这些设置提供Kibana服务器用于在启动时对Kibana索引执行维护的用户名和密码。您的Kibana用户仍需要使用通过Kibana服务器代理的Elasticsearch进行身份验证。

server.ssl.enabled: false,从Kibana服务器到浏览器的传出请求启用SSL。设置为true时,需要server.ssl.certificate和server.ssl.key。
server.ssl.certificate: /path/to/your/server.crt,PEM格式的SSL证书。
server.ssl.key: /path/to/your/server.key,SSL秘钥文件的路径。

elasticsearch.ssl.certificate: /path/to/your/client.crt和elasticsearch.ssl.key: /path/to/your/client.key,可选设置,提供PEM格式SSL证书和密钥文件的路径。这些文件用于验证Kibana到Elasticsearch的身份,并且在Elasticsearch中的xpack.ssl.verification_mode设置为certificate或full时是必需的。
elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ],可选设置,使您可以为Elasticsearch实例的证书颁发机构指定PEM文件的路径列表。
elasticsearch.ssl.verificationMode: full,完全控制Elasticsearch提供的证书验证。有效值为none,certificate和full。完整执行主机名验证,证书不执行。
elasticsearch.pingTimeout: 1500,elasticsearch.requestTimeout设置的值等待Elasticsearch响应ping的时间(以毫秒为单位)。
elasticsearch.requestTimeout: 30000,等待后端或Elasticsearch响应的时间(以毫秒为单位)。这个值必须是一个正整数。
elasticsearch.requestHeadersWhitelist: [ authorization ],要发送到Elasticsearch的Kibana客户端标头列表。如不发送客户端标头,请将此值设置为空。

elasticsearch.customHeaders: {},要发送到Elasticsearch的标题名称和值。 无论elasticsearch.requestHeadersWhitelist配置如何,客户端标头都不能覆盖任何自定义标头。
elasticsearch.shardTimeout: 30000, Elasticsearch等待分片响应的时间(以毫秒为单位)。设置为0以禁用。
elasticsearch.startupTimeout: 5000,在Kibana刚启动时等待Elasticsearch的时间(以毫秒为单位),然后重试。
elasticsearch.logQueries: false,记录发送到Elasticsearch的查询。 需要将logging.verbose设置为true。

# 日志相关
logging.verbose: false,将此设置的值设置为true,以记录所有事件,包括系统使用信息 和所有请求。
logging.dest: stdout,允许指定Kibana存储日志输出的文件。
logging.silent: false, 将此设置的值设置为true,以禁用所有日志输出。
logging.quiet: false,将此设置的值设置为true,以抑制除错误消息之外的所有日志输出。

# 其它配置
ops.interval: 5000,将间隔(以毫秒为单位)设置为采样系统和处理性能 指标。最低是100 ms。默认为5000。
i18n.locale: "en",配置多语言版本配置,目前貌似汉语支持还不太行。
pid.file: /var/run/kibana.pid,指定Kibana创建进程标识文件的路径。

内核优化

bash
# 修改资源限制配置
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* soft stack 10240
EOF

# 临时生效
ulimit -n 65536

# 设置系统最大打开的文件描述符数
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 加载生效
sysctl -p

创建service管理文件

bash
cat > /usr/lib/systemd/system/kibana.service << 'EOF'
[Unit]
Description=Kibana-8.12.0 server
Documentation=https://www.elastic.co/guide/en/kibana/current/index.html
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=kibana
Group=kibana
Environment="CONFIG_PATH=/usr/local/kibana-8.12.0/config/kibana.yml"
Environment="NODE_OPTIONS=--max-old-space-size=2048"
ExecStart=/usr/local/kibana-8.12.0/bin/kibana
Restart=on-failure
RestartSec=3
StartLimitInterval=60
StartLimitBurst=3
WorkingDirectory=/usr/local/kibana-8.12.0
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable kibana --now

创建视图

数据流需要logstash或者filebeat推送才会自动创建索引

Managert —> Stack Management —> kibana —> 数据视图 —> 创建数据视图

  • 名称:自定义
  • 索引模式:这个是数据推送到ES集群时设置的索引,支持模糊匹配nginx-access-*

创建后再Discover进行查看

部署 metricbeat

下载地址:https://www.elastic.co/cn/downloads/beats/metricbeat

历史版本:https://www.elastic.co/downloads/past-releases#metricbeat

在所有es节点执行进行es集群的监控

安装服务

bash
curl -L -O https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-8.12.2-linux-x86_64.tar.gz
tar xzvf metricbeat-8.12.2-linux-x86_64.tar.gz
mv metricbeat-8.12.2-linux-x86_64 /usr/local/metricbeat-8.12.2
ln -s /usr/local/metricbeat-8.12.2/metricbeat /usr/local/bin/

# 创建程序用户
useradd -Ms /sbin/nologin metricbeat
chown -R metricbeat:metricbeat /usr/local/metricbeat-8.12.2

# 修改配置文件
cp /usr/local/metricbeat-8.12.2/metricbeat.yml{,.bak}
output.elasticsearch:
  hosts: ["https://localhost:9200"]
  username: "metricbeat_internal"
  password: "123456"

配置模块

bash
cp /usr/local/metricbeat-8.12.2/modules.d/elasticsearch-xpack.yml.disabled /usr/local/metricbeat-8.12.2/modules.d/elasticsearch-xpack.yml
# 设置es集群账号信息
- module: elasticsearch
  xpack.enabled: true
  period: 10s
  hosts: ["http://localhost:9200"]
  username: "metricbeat_internal"
  password: "123456"
  
# 启动模块
metricbeat modules enable elasticsearch-xpack

内核优化(可选)

如果监控不在es集群上部署可以进行内核优化

bash
# 修改资源限制配置
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* soft stack 10240
EOF

# 临时生效
ulimit -n 65536

# 设置系统最大打开的文件描述符数
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 加载生效
sysctl -p

创建service管理文件

bash
cat >  /usr/lib/systemd/system/metricbeat.service << 'EOF'
[Unit]
Description=Metricbeat-8.12.2 server
Documentation=https://www.elastic.co/guide/en/beats/metricbeat/current/index.html
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=metricbeat
Group=metricbeat
ExecStart=/usr/local/metricbeat-8.12.2/metricbeat -e -c /usr/local/metricbeat-8.12.2/metricbeat.yml
Restart=on-failure
RestartSec=3
StartLimitInterval=60
StartLimitBurst=3
LimitNOFILE=65536
UMask=0027

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable metricbeat --now

部署 filebeat

下载地址:https://www.elastic.co/cn/downloads/beats/filebeat

历史版本:https://www.elastic.co/downloads/past-releases#filebeat

安装服务

bash
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz
tar xvf filebeat-8.12.0-linux-x86_64.tar.gz
mv filebeat-8.12.0-linux-x86_64 /usr/local/filebeat

# 创建程序用户
useradd -Ms /sbin/nologin filebeat
chown -R filebeat:filebeat /usr/local/filebeat

# 创建软连接
ln -s /usr/local/filebeat/filebeat /usr/local/bin/filebeat

# 备份配置文件
cp -p /usr/local/filebeat/filebeat.yml{,.bak}

内核优化(可选)

如果监控不在es集群上部署可以进行内核优化

bash
# 修改资源限制配置
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* soft stack 10240
EOF

# 临时生效
ulimit -n 65536

# 设置系统最大打开的文件描述符数
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 加载生效
sysctl -p

创建service管理

bash
cat > /usr/lib/systemd/system/filebeat.service << 'EOF'
[Unit]
Description=Filebeat-8.12.0 server
Documentation=https://www.elastic.co/guide/en/beats/filebeat/current/index.html
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=filebeat
Group=filebeat
ExecStart=/usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml
Restart=on-failure
RestartSec=3
StartLimitInterval=60
StartLimitBurst=3
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable filebeat --now

日志推送

日志推送到 logstash
bash
# nginx 日志格式化
log_format access_json '{"@timestamp":"$time_iso8601",'
    '"host":"$server_addr",'
    '"clientip":"$remote_addr",'
    '"size":$body_bytes_sent,'
    '"responsetime":$request_time,'
    '"upstreamtime":"$upstream_response_time",'
    '"upstreamhost":"$upstream_addr",'
    '"http_host":"$host",'
    '"uri":"$uri",'
    '"domain":"$host",'
    '"xff":"$http_x_forwarded_for",'
    '"referer":"$http_referer",'
    '"tcp_xff":"$proxy_protocol_addr",'
    '"http_user_agent":"$http_user_agent",'
    '"status":"$status"}';
access_log /var/log/nginx/access_json.log access_json;

vim filebeat.yml 
filebeat.inputs:
- type: log
  enabled: true                        		# 开启日志
  paths:
    - /var/log/nginx/access_json.log      	# 指定收集的日志文件
  json.keys_under_root: true
  tags: ["nginx-access"]
filebeat.inputs:
- type: log
  enabled: true           			# 开启日志
  paths:
  - /var/log/*            			# 指定收集的日志文件
  
output.logstash:
  hosts: ["192.168.148.45:5044"]  	# 指定Logstash服务器的地址和端口
日志推送到ES
bash
# 修改filebeat.yml
# 输入配置
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  json.keys_under_root: true 			# 默认false,只识别为普通文本,会将全部日志数据存储至message字段,改为true则会以Json格式存储
 json.overwrite_keys: true  			# 设为true,使用json格式日志中自定义的key替代默认的message字段,此项可选
 tags: ["nginx-access"]     			# 指定tag,用于分类
  
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
 tags: ["nginx-error"]

# 输出配置
output.elasticsearch:
  hosts: ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
  username: "elastic"
  password: "123456"
  # 单个bulk API 索引请求中批量的最大事件数。默认值是 1600;增加批次大小可以通过降低发送事件的开销来提高性能,但过大的批次大小也可能增加处理时间,可能导致 API错误、连接被杀、发布请求超时,最终导致吞吐量降低。将 bulk_max_size 设置为 0 或负值将禁用批次分割
  bulk_max_size: 2048
  indices:
  - index: "nginx-access-%{[agent.version]}-%{+yyy.MM.dd}"
    when.contains:
      tags: "nginx-access"   			# 如果记志中有access的tag,就记录到nginx-access的索引中
  - index: "nginx-error-%{[agent.version]}-%{+yyy.MM.dd}"
    when.contains:
      tags: "nginx-error"   			# 如果记志中有error的tag,就记录到nginx-error的索引中
  
setup.ilm.enabled: false 				# 关闭索引生命周期ilm功能,默认开启时索引名称只能为filebeat-*
setup.template.name: "nginx" 			# 定义模板名称,要自定义索引名称,必须指定此项,否则无法启动
setup.template.pattern: "nginx-*" 		# 定义模板的匹配索引名称,要自定义索引名称,必须指定此项,否则无法启动

# 修改配位置文件后重启
systemctl restart filebeat
日志推送到 Redis
bash
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  json.keys_under_root: true 		#默认False会将json数据存储至message,改为true则会独立message外存储
  json.overwrite_keys: true  		#设为true,覆盖默认的message字段,使用自定义json格式中的key
  tags: ["nginx-access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
 tags: ["nginx-error"]

output.redis:
  hosts: ["10.0.0.104:6379"]
  key: "filebeat"
  #password: "123456"
  #db: 0
  
# 改完配置重启
systemctl restart filebeat
标椎输入到 kafka
bash
filebeat.inputs:
- type: stdin
 enabled: true
  
output.kafka:
 hosts: ["10.0.0.201:9092", "10.0.0.202:9092", "10.0.0.203:9092"]
 topic: filebeat-log   			#指定kafka的topic
 partition.round_robin:
   reachable_only: true   		#true表示只发布到可用的分区,false时表示所有分区,如果一个节点down,会block
 required_acks: 1  				#如果为0,错误消息可能会丢失,1等待写入主分区(默认),-1等待写入副本分区
 compression: gzip  
 max_message_bytes: 1000000 	#每条消息最大长度,以字节为单位,如果超过将丢弃


# 改完配置重启
systemctl restart filebeat
Tomcat日志推送ES
bash
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /usr/local/tomcat/logs/localhost_access_log.*
    #- /var/log/tomcat9/localhost_access_log.* #包安装
 json.keys_under_root: true
 json.overwrite_keys: false
 tags: ["tomcat-access"]
- type: log
 enabled: true
 paths:
   - /usr/local/tomcat/logs/catalina.*.log
 tags: ["tomcat-error"]
 multiline.type: pattern            #*此为默认值,可省略
 multiline.pattern: '^[0-3][0-9]-'  #*正则表达式匹配以两位,或者为'^\d{2}'
 multiline.negate: true             #*negate否定无效
 multiline.match: after
 multiline.max_lines: 5000          #*默认只合并500行,指定最大合并5000行
  
output.elasticsearch:
  hosts: ["192.168.148.41:9092", "192.168.148.42:9092", "192.168.148.43:9092"]
  username: "elastic"
  password: "123456"
  indices:
    - index: "tomcat-access-%{[agent.version]}-%{+yyy.MM.dd}"
     when.contains:
       tags: "tomcat-access"
    - index: "tomcat-error-%{[agent.version]}-%{+yyy.MM.dd}"
     when.contains:
       tags: "tomcat-error"
  
setup.ilm.enabled: false
setup.template.name: "tomcat"
setup.template.pattern: "tomcat-*"


# 改完配置重启
systemctl restart filebeat

检测索引

如果是filebeat直接输出到ES集群中,ES的索引会自动创建

bash
# 注意:filebeat使用的是普通用户,和日志的归属不同会引起无权访问问题,需要设置权限给filebeat
chown -R :filebeat /var/log/nginx/

# 查看索引:正常返回例如 nginx-access-8.12.0-年.月.日的索引
curl -u elastic:123456 -X GET "192.168.148.41:9200/_cat/indices/nginx-access-*?v"

部署 logstash

下载地址:https://www.elastic.co/cn/downloads/logstash

历史版本:https://www.elastic.co/downloads/past-releases#logstash

安装服务

bash
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.12.0-linux-x86_64.tar.gz
tar -xvf logstash-8.12.0-linux-x86_64.tar.gz -C /usr/local/

# 保存输入输出配置文件
mkdir /usr/local/logstash-8.12.0/conf.d

# 创建程序用户
useradd -Ms /sbin/nologin logstash
chown -R logstash:logstash /usr/local/logstash-8.12.0

# 创建软连接
ln -s /usr/local/logstash-8.12.0/bin/logstash /usr/local/bin/

# 创建数据目录
mkdir -p /data/logstash
chown -R logstash:logstash /data/logstash

# 创建日志目录
mkdir -p /var/log/logstash
chown -R logstash:logstash /var/log/logstash

修改配置文件

bash
# 备份配置文件
cp -p /usr/local/logstash-8.12.0/config/logstash.yml{,.bak}

# 根据实际情况修改
vim /usr/local/logstash-8.12.0/config/logstash.yml

node.name: logstash-node01
pipeline.workers: 2
pipeline.batch.size: 1000    	# 批量从IPNPUT读取的消息个数,可以根据ES的性能做性能优化
pipeline.batch.delay: 5      	# 处理下一个事件前的最长等待时长,以毫秒ms为单位,可以根据ES的性能做性能优化
path.data: /data/logstash 		# 数据位置
path.logs: /var/log/logstash 	# 日志位置

内核优化

bash
# 修改资源限制配置
cat >>/etc/security/limits.conf <<EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* soft stack 10240
EOF

# 临时生效
ulimit -n 65536

# 设置系统最大打开的文件描述符数
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 加载生效
sysctl -p

内存优化

bash
# 根据实际情况设置内存大小
vim /usr/local/logstash-8.12.0/configjvm.options
-Xms1g
-Xmx1g

logstash 常用命令

bash
logstash --help

常用选项

  • -e:指定配置内容
  • -f:指定配置文件,支持绝对路径,如果用相对路径,是相对于/usr/share/logstash/的路径
  • -t:语法检查
  • -r:修改配置文件后自动加载生效,注意:有时候修改配置还需要重新启动生效

创建service管理

bash
cat > /usr/lib/systemd/system/logstash.service << 'EOF'
[Unit]
Description=Logstash-8.12.0 server
Documentation=https://www.elastic.co/guide/en/logstash/current/index.html
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=logstash
Group=logstash
ExecStart=/usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/conf.d/
LimitNOFILE=65536
LimitMEMLOCK=infinity
Restart=on-failure
RestartSec=5
WorkingDirectory=/usr/local/logstash-8.12.0

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable logstash --now

输入 Input 插件

标准输入

codec 用于输入数据的编解码器,默认值为plain表示单行字符串,若设置为json,表示按照json方式解析

bash
# 以配置文件实现标准输入
cat > /usr/local/logstash-8.12.0/conf.d/stdin_to_stdout.conf << 'EOF'
input {
  stdin {
    type => "stdin_type"      #自定义事件类型,可用于后续判断
    tags => "stdin_tag"       #自定义事件tag,可用于后续判断
    codec => "json"           #指定Json 格式
  }
}
output {
  stdout {
    codec => "rubydebug" #输出格式,此为默认值,可省略
  }
}
EOF
从文件输入

Logstash 会记录每个文件的读取位置,下次自动从此位置继续向后读取

bash
cat > /usr/local/logstash-8.12.0/conf.d/file_to_stdout.conf << 'EOF'
input {
  file {
    path => "/tmp/wu.*"
    type => "wulog" # 自定义的type字段
    exclude => "*.txt" # 排除.txt文件
    start_position => "beginning" # 从头开始读取
    stat_interval => "3" # 检查文件更新的时间间隔,单位为秒
    codec => json # 如果文件是Json格式,需要指定此项才能解析,如果不是Json格式而添加此行也不会影响结果
  }

  file {
    path => "/var/log/messages" # 监控的系统日志文件路径
    type => "syslog" # 指定数据类型为syslog
    start_position => "beginning" # 从头开始读取
    stat_interval => 3 # 检查文件更新的时间间隔,单位为秒
  }
}

output {
  stdout {
    codec => "rubydebug" # 输出的编解码器
  }
}
EOF
从 Http 请求中取数据
bash
cat > /usr/local/logstash-8.12.0/conf.d/http_to_stdout.conf << 'EOF'
input {
  http {
    port =>6666
    codec => json
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
EOF
从 Filebeat 读取数据
bash
vim filebeat.yml 
filebeat.inputs:
- type: log
  enabled: true                        #开启日志
  paths:
    - /var/log/nginx/access_json.log      #指定收集的日志文件
  json.keys_under_root: true
  tags: ["nginx-access"]
filebeat.inputs:
- type: log
  enabled: true           #开启日志
  paths:
  - /var/log/*            #指定收集的日志文件
output.logstash:
  hosts: ["11.0.1.139:5044"]  #指定Logstash服务器的地址和端口

# Logstash配置
cat > /usr/local/logstash-8.12.0/conf.d/filebeat_to_stdout.conf << 'EOF'
input {
  beats {
    port => 5044 # 端口号,必须与 Filebeat 配置一致
  }
}

output {
  stdout {
    codec => "rubydebug" # 使用 rubydebug 编解码器
  }
}
EOF
从 Redis 中读取数据

支持由多个 Logstash 从 Redis 读取日志,提高性能。Logstash 从 Redis 收集完数据后,将删除对应的列表Key

bash
cat > /usr/local/logstash-8.12.0/conf.d/redis_to_stdout.conf << 'EOF'
input {
  redis {
    host => 'Redis_IP'
    port => "6379"
    password => "123456"
    db => "0"
    data_type => 'list'
    key => "nginx-accesslog"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
EOF
从 Kafka 中读取数据
bash
cat > /usr/local/logstash-8.12.0/test/kakfa_to_stdout.conf << 'EOF'
input {
  kafka {
    bootstrap_servers => "10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092" # Kafka 集群的地址
    group_id => "logstash" 	# Kafka 消费者组的ID
    topics => ["nginx-accesslog", "nginx-errorlog"] # 订阅的Kafka主题列表
    codec => "json" 		# 指定编解码器
    consumer_threads => 8 	# Kafka消费者线程数
  }
}

output {
  stdout {
    codec => "rubydebug" 	# 使用rubydebug编解码器输出到标准输出
  }
}
EOF

输出 Output 插件

Stdout 插件

stdout 插件将数据输出到屏幕终端,主要用于调试

bash
output {
  stdout {
  codec => rubydebug
  }
}
File 插件
bash
output {
  stdout {
    codec => rubydebug
  }
  file {
    path => "/var/log/test.log"
  }
}
Elasticsearch 插件

官方文档:https://www.elastic.co/guide/en/logstash/8.12/plugins-outputs-elasticsearch.html

bash
# 当日志量较小时,可以按月或周生成索引,当日志量比较大时,会按天生成索引,以方便后续按天删除
output {
  elasticsearch {
    hosts =>["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]   #一般写ES中data节点地址
    index => "app-%{+YYYY.MM.dd}"               #指定索引名称,建议加时间,按天建立索引
    #index => "%{[@metadata][target_index]}"   #使用字段[@metadata][target_index]值做为索引名
    template_overwrite => true                  #覆盖索引模板,此项可选,默认值为false
  }
}

# 配置文件实现
cat  > /usr/local/logstash-8.12.0/conf.d/logstash-es.conf << 'EOF'
input {
  file {
    path => "/var/log/syslog"
    start_position => "beginning"
    stat_interval => 3
  }
}
output {
  elasticsearch {
    hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}
EOF
Redis 插件

Logstash 支持将日志转发至 Redis

官方文档:https://www.elastic.co/guide/en/logstash/8.12/plugins-outputs-redis.html

bash
cat > /usr/local/logstash-8.12.0/conf.d/file_to_redis.conf << 'EOF'
input { 
  file {
    path => "/var/log/nginx/access.log"
    type => 'nginx-accesslog'    
    start_position => "beginning"
    stat_interval => "3" 
    codec ==> json                
  }
}
output {
  if [type] == 'nginx-accesslog' {
    redis {
      host => 'Redis_IP'
      port => "6379"
      password => "123456"
      db => "0"
      data_type => 'list'
      key => "nginx-accesslog"
      }
}
EOF
Kafka 插件

Logstash 支持将日志转发至 Kafka

官方文档:https://www.elastic.co/guide/en/logstash/8.12/plugins-outputs-kafka.html

bash
cat > /usr/local/logstash-8.12.0/conf.d/file_to_kafka.conf << 'EOF'
input {
  file {
    path => "/var/log/nginx/access.log"
    type => 'nginx-accesslog'
    start_position => "beginning"
    stat_interval => 3
    codec => json # 指定编解码器,如果日志是 JSON 格式
  }
  file {
    path => "/var/log/nginx/error.log"
    type => 'nginx-errorlog'
    start_position => "beginning"
    stat_interval => 3
    # 如果日志不是 JSON 格式,这里不需要 codec 配置
  }
}

filter {
  # 这里可以添加 filter 来处理日志数据,如果需要
}

output {
  if [type] == 'nginx-accesslog' {
    kafka {
      bootstrap_servers => '11.0.1.120:9092, 11.0.1.121:9092, 11.0.1.122:9092'
      topic_id => 'nginx-accesslog' # 使用 topics 而不是 topic_id
      codec => json
    }
 } 
 if [type] == 'nginx-errorlog' {
   kafka {
     bootstrap_servers => 'Kafka_IP1:9092, Kafka_IP2:9092, Kafka_IP3:9092'
     topic_id => 'nginx-errorlog' # 使用 topics
     codec => json
     }
   }
}
EOF

过滤 Filter 插件

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值

Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响

bash
grok:正则匹配任意文本
date:处理时间字段
user_agent:分析客户端设备
geoip:分析客户端IP地理位置
mutate:对字符串文本进行处理,转换,字母大小写,切分,重命名,添加字段,删除字段

if分支语句:

if [type] == "xxx" {

} else if [type] == "xxx" {

} else {

}
Grok 插件

Grok 是一个过滤器插件,可帮助您描述日志格式的结构。有超过200种 grok模式抽象概念,如IPv6地址,UNIX路径和月份名称。 为了将日志行与格式匹配, 生产环境常需要将非结构化的数据解析成 json 结构化数据格式

bash
cat > /usr/local/logstash-8.12.0/conf.d/nginx_grok_stdout.conf << 'EOF'
input {
  file {
    start_position => "beginning"
    path => "/var/log/nginx/access.log"
  }
}

filter{
  grok{
    match => {
      "message" => "%{COMMONAPACHELOG}"    #官方自带的Apache转换模块,与nginx格式一致
    }
  }
}

output {
  stdout {}
}
EOF


# 插入一条访问日志
echo '139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] "GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1" 200 1949 "https://www.aaa.com/" "Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36" "-"' >> /var/log/nginx/access_json.log

# 看结果
logstash -f /usr/local/logstash-8.12.0/conf.d/nginx_grok_stdout.conf
......
{
          "http" => {
        "response" => {
            "status_code" => 200,
                   "body" => {
                "bytes" => 1949
            }
        },
         "request" => {
            "method" => "GET"
        },
         "version" => "1.1"
    },
         "event" => {
        "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\""
    },
           "url" => {
        "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION"
    },
    "@timestamp" => 2024-06-30T03:02:08.019405702Z,
       "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"",
     "timestamp" => "05/Jul/2022:19:48:03 +0000",
          "host" => {
        "name" => "logstash"
    },
      "@version" => "1",
        "source" => {
        "address" => "139.226.81.24"
    },
           "log" => {
        "file" => {
            "path" => "/var/log/nginx/access.log"
        }
    }
}
Geoip 插件

geoip 根据 ip 地址提供的对应地域信息,比如:经纬度,国家,城市名等,以方便进行地理数据分析

bash
cat > /usr/local/logstash-8.12.0/conf.d/nginx-geoip_stdout.conf << 'EOF'
input {
  file {
    start_position => "beginning"
    path => "/var/log/nginx/access.log"
  }
}

filter{
  grok{
    match => {
      "message" => "%{COMMONAPACHELOG}"    #官方自带的Apache转换模块,与nginx格式一致
    }
  }

  geoip {
    source => "[source][address]"
    target => "geoip"
  }
}

output {
  stdout {}
}
EOF


# 执行结果
logstash -f /usr/local/logstash-8.12.0/conf.d/nginx-geoip_stdout.conf
......
{
         "geoip" => {
        "geo" => {
                    "location" => {
                "lon" => 121.4581,
                "lat" => 31.2222
            },
                 "region_name" => "Shanghai",
             "region_iso_code" => "CN-SH",
                   "city_name" => "Shanghai",
            "country_iso_code" => "CN",
                "country_name" => "China",
              "continent_code" => "AS",
                    "timezone" => "Asia/Shanghai"
        },
         "ip" => "139.226.81.24"
    },
     "timestamp" => "05/Jul/2022:19:48:03 +0000",
         "event" => {
        "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\""
    },
           "url" => {
        "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION"
    },
          "http" => {
         "request" => {
            "method" => "GET"
        },
        "response" => {
                   "body" => {
                "bytes" => 1949
            },
            "status_code" => 200
        },
         "version" => "1.1"
    },
           "log" => {
        "file" => {
            "path" => "/var/log/nginx/access.log"
        }
    },
       "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"",
        "source" => {
        "address" => "139.226.81.24"
    },
    "@timestamp" => 2024-06-30T03:16:44.384058522Z,
      "@version" => "1",
          "host" => {
        "name" => "logstash"
    }
}
Date 插件

官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

Date插件可以将日志中的指定的日期字符串对应的源字段生成新的目标字段。 然后替换@timestamp 字段(此字段默认为当前写入logstash的时间而非日志本身的时间)或指定的其他字段

bash
cat > /usr/local/logstash-8.12.0/conf.d/nginx-grok_date_stdout.conf << 'EOF'
input {
  file {
    start_position => "beginning"
    path => "/var/log/nginx/access.log"
  }
}

filter{
  grok{
    match => {
      "message" => "%{COMMONAPACHELOG}"    #官方自带的Apache转换模块,与nginx格式一致
    }
  }

  date{
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    #target => "access_time"   #将时间写入新生成的access_time字段,源字段仍保留
    target => "@timestamp"     #将时间覆盖原有的@timestamp字段
    timezone => "Asia/Shanghai"
  }


}

output {
  stdout {}
}


# 将UNIX时间转换指定格式
date {
 match => ["timestamp","UNIX","YYYY-MM-dd HH:mm:ss"]
 target =>"@timestamp"
 timezone => "Asia/shanghai"
}
EOF
  • match:类型为数组,用于指定需要使用的源字段名和对应的时间格式
  • target:类型为字符串,用于指定生成的目标字段名,默认是 @timestamp
  • timezone:类型为字符串,用于指定时区域
Useragent 插件

useragent 插件可以根据请求中的 user-agent 字段,解析出浏览器设备、操作系统等信息, 以方便后续的分析使用

bash
cat > /usr/local/logstash-8.12.0/conf.d/nginx-grok_useragent_stdout.conf << 'EOF'
input {
  file {
    start_position => "beginning"
    path => "/var/log/nginx/access.log"
  }
}

filter{
  grok{
    match => {
      "message" => "%{COMMONAPACHELOG}"    #官方自带的Apache转换模块,与nginx格式一致
    }
  }

  date{
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    #target => "access_time"
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }

  useragent{
    source => "message"
    target => "useragent"
  }

}

output {
  stdout {}
}
EOF

# 模拟生成nginx日志
echo '139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] "GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1" 200 1949 "https://www.aaa.com/" "Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36" "-"' >> /var/log/nginx/access.log


# 结果
logstash -f /usr/local/logstash-8.12.0/conf.d/nginx-grok_useragent_stdout.conf
...
{
    "@timestamp" => 2022-07-05T19:48:03.000Z,
          "host" => {
        "name" => "logstash"
    },
         "event" => {
        "original" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\""
    },
          "http" => {
         "request" => {
            "method" => "GET"
        },
        "response" => {
            "status_code" => 200,
                   "body" => {
                "bytes" => 1949
            }
        },
         "version" => "1.1"
    },
           "log" => {
        "file" => {
            "path" => "/var/log/nginx/access.log"
        }
    },
      "@version" => "1",
     "timestamp" => "05/Jul/2022:19:48:03 +0000",
     "useragent" => {
           "name" => "Chrome Mobile WebView",
         "device" => {
            "name" => "XiaoMi Redmi K30 Pro"
        },
             "os" => {
               "full" => "Android 11",
               "name" => "Android",
            "version" => "11"
        },
        "version" => "78.0.3904.108"
    },
        "source" => {
        "address" => "139.226.81.24"
    },
       "message" => "139.226.81.24 - - [05/Jul/2022:19:48:03 +0000] \"GET /wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION HTTP/1.1\" 200 1949 \"https://www.aaa.com/\" \"Mozilla/5.0 (Linux; U; Android 11; zh-CN; Redmi K30 Pro Build/RKQ1.200826.002) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 Quark/5.4.9.201 Mobile Safari/537.36\" \"-\"",
           "url" => {
        "original" => "/wp-content/themes/dux/js/libs/ias.min.js?ver=THEME_VERSION"
    }
}
Mutate 插件

Mutate 插件主要是对字段进行、类型转换、删除、替换、更新等操作,可以使用以下函数

bash
# Split 切割
# mutate 中的 split 字符串切割,指定字符做为分隔符,切割的结果用于生成新的列表元素
filter {
   mutate {
        #字段分隔符
       split => { "message" => "|" } #将message字段按 | 分割成名称message列表中多个列表元素
   }
}

# remove_field 删除字段
filter {
   mutate {
        #remove_field => ["headers","message", "agent"] #7.X
       remove_field => ["timestamp","message", "http"] #8.X
   }
}

# add_field 添加字段
filter {
    #mutate 切割操作
   mutate {
        #字段分隔符
       split => { "message" => "|" }
        #添加字段,将message的列表的第0个元素添加字段名user_id
       add_field => {
            "user_id" => "%{[message][0]}"  
            "action" => "%{[message][1]}"
            "time" => "%{[message][2]}"
       }
        #添加字段做索引名
        #add_field => {"[@metadata][target_index]" => "app-%{+YYY.MM.dd}"} 
        #删除无用字段
       remove_field => ["headers","message"]
   }
}

#  convert 转换
filter {
    #mutate 切割操作
   mutate {
        #字段分隔符
 split => { "message" => "|" }
        #添加字段
       add_field => {
            "user_id" => "%{[message][0]}"
            "action" => "%{[message][1]}"
            "time" => "%{[message][2]}"
       }
        #删除无用字段
       remove_field => ["headers","message"]
        #对新添加字段进行格式转换
       convert => {
            "user_id" => "integer"
            "action" => "string"
            "time" => "string"
       }
        #convert => ["excute_time","float] #此格式也可以支持
        #convert => ["time","string" ]
   }
}

# gsub 替换
filter {
 mutate {
 gsub=>["message","\n", " "] #将message字段中的换行替换为空格
 }
}
  • remove_field:删除字段
  • split:字符串切割,相当于awk取列
  • add_field:添加字段
  • convert:类型转换,支持的数据类型:integer,integer_eu,float,float_eu,string,boolean
  • gsub:字符串替换
  • rename:字符串改名
  • lowercase:转换字符串为小写
条件判断

Filter 语句块中支持 if 条件判断功能

bash
filter {
 mutate {
 gsub=>["message","\n", " "] #将message字段中的换行替换为空格
 }
}
filter {
    if "access" in [tags][0] {
       mutate {
           add_field => { "target_index" => "access-%{+YYYY.MM.dd}"}
       }
   }
    else if "error" in [tags][0] {
       mutate {
           add_field => { "target_index" => "error-%{+YYYY.MM.dd}"}
       }
   }
 else if "system" in [tags][0] {
 mutate {
           add_field => { "target_index" => "system-%{+YYYY.MM.dd}"}
       }
 }
}

Docker 部署

bash
cat > docker-compose.yml << 'EOF'
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      discovery.type: single-node
    volumes:
      - es_data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    depends_on:
      - elasticsearch

volumes:
  es_data:
EOF

docker-compose up -d