# ELK: Classic Examples
## Filebeat + Logstash + ES + Kibana Logging Architecture
Filebeat collects logs on the client, Logstash receives them and performs processing and filtering, Elasticsearch stores the logs, and Kibana displays the data on dashboards.
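Before configuring anything, it helps to confirm the tiers can talk to each other. A minimal reachability sketch, assuming the addresses used throughout this example (Logstash at 192.168.148.45, ES nodes at 192.168.148.41-43) and that `nc` is available:

```bash
# from the Filebeat host: is Logstash listening on the Beats port?
nc -zv 192.168.148.45 5044
# from the Logstash host: is the ES cluster healthy?
curl -u elastic:123456 "192.168.148.41:9200/_cluster/health?pretty"
```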
### Filebeat: collect logs and ship to Logstash
```bash
vim filebeat.yml

filebeat.inputs:
- type: log
  enabled: true                          # enable this input
  paths:
    - /var/log/nginx/access.log          # log file to collect
  json.keys_under_root: true
  tags: ["nginx-access"]
  fields:                                # the fields option adds custom fields to every event in the output
    service_name: elk-nginx
    log_type: log-access
    service_id: 192.168.148.40
- type: log
  enabled: true                          # enable this input
  paths:
    - /var/log/nginx/error.log           # log file to collect
  tags: ["nginx-error"]
  fields:
    service_name: filebeat
    log_type: log-error
    service_id: 192.168.148.40

output.logstash:
  hosts: ["192.168.148.45:5044"]         # address and port of the Logstash server
```
### Logstash: process logs and forward to Elasticsearch

```bash
cat > /usr/local/logstash-8.12.0/conf.d/filebeat_to_es.conf << 'EOF'
input {
  beats {
    port => 5044                         # must match the port configured in the Filebeat output
  }
}
filter {
  date {
    match => [ "@timestamp", "yyyy-MM-dd HH:mm:ss Z" ]
  }
  mutate {
    remove_field => ["@version", "agent", "cloud", "host", "input", "log", "tags", "_index", "_source", "ecs", "event"]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
    index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "123456"
  }
  stdout {
    codec => rubydebug                   # also print each event to stdout for debugging
  }
}
EOF
```

### Verification
```bash
# syntax check
sudo -u logstash logstash -f /usr/local/logstash-8.12.0/conf.d/ -t
# run in the foreground
sudo -u logstash logstash -f /usr/local/logstash-8.12.0/conf.d/
# list the new indices
curl -u elastic:123456 -X GET "192.168.148.41:9200/_cat/indices/elk-nginx-*?v"
```
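To confirm documents are actually arriving, you can also pull one back straight from Elasticsearch (same cluster and credentials as above):

```bash
curl -u elastic:123456 "192.168.148.41:9200/elk-nginx-*/_search?size=1&pretty"
```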
### Viewing the data in Kibana

Management → Stack Management → Kibana → Data Views → Create data view
- Name: anything you like
- Index pattern: the index name set when the data was pushed to the ES cluster; wildcard matching is supported:
  `elk-nginx-*`
Once the data view is created, browse the data in Discover.
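If you prefer to script this step, Kibana 8 also exposes a data views API. A sketch, assuming Kibana listens on KIBANA_HOST:5601 (placeholder; the Kibana address is not given in this example) and accepts the same elastic credentials:

```bash
curl -u elastic:123456 -X POST "http://KIBANA_HOST:5601/api/data_views/data_view" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"data_view": {"title": "elk-nginx-*", "name": "elk-nginx"}}'
```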
## Filebeat + Kafka + Logstash + ES + Kibana Logging Architecture
Filebeat collects logs on the clients, Kafka buffers them as a message queue, Logstash consumes the logs from Kafka and performs processing and filtering, Elasticsearch stores the logs, and Kibana displays the data on dashboards.
### Filebeat: collect logs and ship to Kafka
```bash
vim filebeat.yml

filebeat.inputs:
- type: log
  enabled: true                          # enable this input
  paths:
    - /var/log/nginx/access.log          # log file to collect
  json.keys_under_root: true
  tags: ["nginx-access"]
  fields:                                # the fields option adds custom fields to every event in the output
    service_name: elk-nginx
    log_type: log-access
    service_id: 192.168.148.40
- type: log
  enabled: true                          # enable this input
  paths:
    - /var/log/nginx/error.log           # log file to collect
  tags: ["nginx-error"]
  fields:
    service_name: filebeat
    log_type: log-error
    service_id: 192.168.148.40

output.kafka:
  hosts: ["11.0.1.120:9092", "11.0.1.121:9092", "11.0.1.122:9092"]
  topic: filebeat-log                    # Kafka topic to publish to
  partition.round_robin:
    reachable_only: true                 # true: publish only to reachable partitions; false: all partitions, which blocks if a node is down
  required_acks: 1                       # 0: no acknowledgement, messages may be lost; 1: wait for the leader (default); -1: wait for all replicas
  compression: gzip
  max_message_bytes: 1000000             # maximum event size in bytes; larger events are dropped
```
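At this point you can confirm events are landing on the topic before wiring up Logstash. A quick check with the stock Kafka CLI tools (the /opt/kafka install path is an assumption; adjust to your setup):

```bash
# read a few messages from the filebeat-log topic
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 11.0.1.120:9092 \
  --topic filebeat-log --from-beginning --max-messages 5
```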
### Logstash: read from Kafka and forward to Elasticsearch

Configure Logstash to read the logs from Kafka and send them to Elasticsearch.
```bash
cat > /usr/local/logstash-8.12.0/conf.d/kafka-to-es.conf << 'EOF'
input {
  kafka {
    bootstrap_servers => "11.0.1.120:9092,11.0.1.121:9092,11.0.1.122:9092"
    topics => "filebeat-log"
    codec => "json"
    #group_id => "logstash"              # consumer group name
    #consumer_threads => "3"             # ideally set to the number of partitions of the topic
    #topics_pattern => "nginx-.*"        # match topics by regex instead of the fixed value in topics =>
  }
}
output {
  #stdout {}                             # for debugging
  if "nginx-access" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
  if "nginx-error" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
  if "syslog" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
}
EOF
```
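If the indices do not show up, checking the consumer group's lag tells you whether Logstash is actually consuming. A sketch, assuming the input uses the default consumer group name "logstash" (the commented-out group_id above) and the same /opt/kafka path as before:

```bash
/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 11.0.1.120:9092 \
  --describe --group logstash
```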
### Viewing the data in Kibana

Management → Stack Management → Kibana → Data Views → Create data view
- Name: anything you like
- Index pattern: the index name set when the data was pushed to the ES cluster; wildcard matching is supported:
  `logstash-kafka-*`
Once the data view is created, browse the data in Discover.
## Logstash: write collected logs to MySQL
Logs in ES are deleted sooner or later, but some important data, such as the status code, client IP, and client browser version, may be aggregated by month or year for statistics later on. That data has to be kept permanently, which can be achieved by writing it to a database (see the statistics query sketch at the end of this section).
### Database setup
```sql
-- create the database and table, and grant a user access
CREATE DATABASE elk CHARACTER SET utf8mb4;
USE elk;
CREATE TABLE elklog (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    clientip VARCHAR(39),
    uri VARCHAR(2048),
    status SMALLINT UNSIGNED,  -- SMALLINT: HTTP status codes go up to 599, beyond TINYINT's 255 maximum
    time DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_time_status (time, status),
    INDEX idx_clientip (clientip),
    INDEX idx_uri (uri(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- grant privileges
CREATE USER elk@'192.168.148.%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON elk.* TO elk@'192.168.148.%';
FLUSH PRIVILEGES;
```
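A quick way to confirm the grants work before involving Logstash is to log in as the new user from the Logstash host (the MySQL server address is the one used later in this section):

```bash
mysql -h11.0.1.138 -uelk -p'123456' -e 'SHOW TABLES FROM elk;'
```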
### Installing the Logstash output plugin

Logstash needs the mysql-connector-java package. MySQL Connector/J is MySQL's official JDBC driver. JDBC (Java Database Connectivity) is a Java API for executing SQL statements that gives uniform access to many relational databases; it consists of a set of classes and interfaces written in Java.
```bash
# Download
# official download page: https://dev.mysql.com/downloads/connector/
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.28-1.el8.noarch.rpm

# Install the RPM you downloaded (an 8.4.0 build is installed here; adjust the filename to your download)
yum -y install /home/weihu/src/mysql-connector-j-8.4.0-1.el8.noarch.rpm
rpm -ql mysql-connector*

# Copy the jar file into the directory Logstash will use
# mysql-connector-java.jar is a symlink, so copying either name works
ll /usr/share/java/mysql-connector-java.jar
mkdir /usr/local/logstash-8.12.0/liboutput/
cp /usr/share/java/mysql-connector-j.jar /usr/local/logstash-8.12.0/liboutput/

# Change the Gem source
# Logstash is built on Ruby; installing plugins pulls dependencies from a gem source.
# The default gem source is hosted abroad and is slow and unstable from mainland China.
yum -y install ruby
gem -v
gem sources -l
# switch to a mirror in China
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l

# Install the logstash-output-jdbc plugin
# list installed plugins: only the input-jdbc integration ships by default, the output-jdbc plugin must be installed
/usr/local/logstash-8.12.0/bin/logstash-plugin list | grep jdbc
logstash-integration-jdbc
├── logstash-input-jdbc
├── logstash-filter-jdbc_streaming
└── logstash-filter-jdbc_static

#### Online install of the output-jdbc plugin; this may take a while
/usr/local/logstash-8.12.0/bin/logstash-plugin install logstash-output-jdbc

#### Offline install
#### if the online install fails, export the plugin from a host that already has it, then import it
# export the plugin
/usr/local/logstash-8.12.0/bin/logstash-plugin prepare-offline-pack logstash-output-jdbc
ll /usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip
# import the offline pack
/usr/local/logstash-8.12.0/bin/logstash-plugin install file:///usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip
# verify the plugin is now installed
/usr/local/logstash-8.12.0/bin/logstash-plugin list | grep jdbc
```
### Configure Filebeat

```bash
vim /usr/local/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  json.keys_under_root: true
  paths:
    - /var/log/nginx/access_json.log   # the JSON-formatted access log defined in the Nginx section below
  tags: [wuuuuuu]
- type: log
  enabled: true
  paths:
    - /var/log/messages

output.logstash:
  hosts: ["192.168.148.45:5044"]

# restart filebeat after the change
systemctl restart filebeat
```
### Configure Logstash

```bash
cat > /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf << 'EOF'
input {
  beats {
    port => 5044
  }
}
filter {
  if "wuuuuuu" in [tags] {
    mutate {
      rename => { "@timestamp" => "times" }   # rename so the timestamp can be mapped to the table's time column
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  if "wuuuuuu" in [tags] {
    jdbc {
      driver_jar_path => "/usr/local/logstash-8.12.0/liboutput/mysql-connector-j.jar"
      driver_class => "com.mysql.cj.jdbc.Driver"   # Connector/J 8.x class name; com.mysql.jdbc.Driver is deprecated
      connection_string => "jdbc:mysql://11.0.1.138:3306/elk?user=elk&password=123456&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT"
      statement => ["INSERT INTO elklog (clientip,uri,status,time) VALUES(?,?,?,?)", "clientip", "uri", "status", "times"]
    }
  }
}
EOF

# the config file's owner must match the user that runs Logstash
chown logstash:logstash /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf
# syntax check
/usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf -t
```
### Nginx log formatting

```bash
# JSON access log format for nginx
log_format access_json '{"@timestamp":"$time_iso8601",'
    '"host":"$server_addr",'
    '"clientip":"$remote_addr",'
    '"size":$body_bytes_sent,'
    '"responsetime":$request_time,'
    '"upstreamtime":"$upstream_response_time",'
    '"upstreamhost":"$upstream_addr",'
    '"http_host":"$host",'
    '"uri":"$uri",'
    '"domain":"$host",'
    '"xff":"$http_x_forwarded_for",'
    '"referer":"$http_referer",'
    '"tcp_xff":"$proxy_protocol_addr",'
    '"http_user_agent":"$http_user_agent",'
    '"status":"$status"}';
access_log /var/log/nginx/access_json.log access_json;
```
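After reloading Nginx, you can check that the new log really is valid JSON. A quick sketch, assuming jq is installed and the site answers on localhost:

```bash
nginx -t && nginx -s reload
# generate one request, then parse the newest log line as JSON
curl -s -o /dev/null http://localhost/
tail -n 1 /var/log/nginx/access_json.log | jq .
```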
### Verifying the data

You can also use a MySQL client tool such as Navicat.
```bash
mysql -h11.0.1.138 -uelk -p'123456' -e 'select clientip,uri,status,time from elk.elklog'
```
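Since the point of persisting these fields was monthly or yearly statistics, here is a sketch of such a query, using only the table and columns created above:

```bash
# requests per month broken down by HTTP status code
mysql -h11.0.1.138 -uelk -p'123456' -e "
  SELECT DATE_FORMAT(time, '%Y-%m') AS month, status, COUNT(*) AS hits
  FROM elk.elklog
  GROUP BY month, status
  ORDER BY month, hits DESC;"
```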