
ELK - Classic Use Cases

Filebeat + Logstash + ES + Kibana Logging Architecture

Filebeat collects logs on the client, Logstash receives them and performs processing and filtering, Elasticsearch stores the logs, and Kibana dashboards visualize the data.

Filebeat: collect logs and ship them to Logstash

bash
vim filebeat.yml
filebeat.inputs:
- type: log
  enabled: true                         # enable this input
  paths:
    - /var/log/nginx/access.log         # log file to collect
  json.keys_under_root: true
  tags: ["nginx-access"]
  fields:                               # fields adds custom key/value pairs to every event sent to the output
    service_name: elk-nginx
    log_type: log-access
    service_id: 192.168.148.40
- type: log
  enabled: true                         # enable this input
  paths:
    - /var/log/nginx/error.log          # log file to collect
  tags: ["nginx-error"]
  fields:                               # fields adds custom key/value pairs to every event sent to the output
    service_name: filebeat
    log_type: log-error
    service_id: 192.168.148.40

output.logstash:
  hosts: ["192.168.148.45:5044"]        # Logstash server address and port

Logstash: process logs and ship them to ES

bash
cat > /usr/local/logstash-8.12.0/conf.d/filebeat_to_es.conf << 'EOF'
input {
  beats {
    port => 5044                        # must match the port in the Filebeat output
  }
}

filter {
  date {
    match => [ "@timestamp", "ISO8601" ]   # timestamps from Beats arrive in ISO8601 format
  }
  mutate {
    remove_field => ["@version", "agent", "cloud", "host", "input", "log", "tags", "_index", "_source", "ecs", "event"]
  }
}

output {
  elasticsearch {
    hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
    index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "123456"
  }
  stdout {
    codec => rubydebug
  }
}
EOF

Verification

bash
# syntax check
sudo -u logstash logstash -f /usr/local/logstash-8.12.0/conf.d/ -t

# run in the foreground
sudo -u logstash logstash -f /usr/local/logstash-8.12.0/conf.d/

# list the indices
curl -u elastic:123456 -X GET "192.168.148.41:9200/_cat/indices/elk-nginx-*?v"
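
To confirm that the stored events carry the expected fields, you can also pull a single document back out with the standard search API (same credentials as above):

bash
# fetch one document from the new index
curl -u elastic:123456 "192.168.148.41:9200/elk-nginx-*/_search?size=1&pretty"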

Kibana: view the data

Management -> Stack Management -> Kibana -> Data Views -> Create data view

  • Name: anything you like
  • Index pattern: the index that was set when the data was pushed to the ES cluster; wildcards are supported, e.g. elk-nginx-*

After creating the view, browse the data in Discover.

Filebeat + Kafka + Logstash + ES + Kibana Logging Architecture

Filebeat collects logs on the client, Kafka buffers them as a message queue, Logstash consumes the logs from Kafka and performs processing and filtering, Elasticsearch stores the logs, and Kibana dashboards visualize the data.

Filebeat: collect logs and ship them to Kafka

bash
vim filebeat.yml
filebeat.inputs:
- type: log
  enabled: true                         # enable this input
  paths:
    - /var/log/nginx/access.log         # log file to collect
  json.keys_under_root: true
  tags: ["nginx-access"]
  fields:                               # fields adds custom key/value pairs to every event sent to the output
    service_name: elk-nginx
    log_type: log-access
    service_id: 192.168.148.40
- type: log
  enabled: true                         # enable this input
  paths:
    - /var/log/nginx/error.log          # log file to collect
  tags: ["nginx-error"]
  fields:                               # fields adds custom key/value pairs to every event sent to the output
    service_name: filebeat
    log_type: log-error
    service_id: 192.168.148.40

output.kafka:
  hosts: ["11.0.1.120:9092", "11.0.1.121:9092", "11.0.1.122:9092"]
  topic: filebeat-log                   # Kafka topic to publish to
  partition.round_robin:
    reachable_only: true                # true: publish only to reachable partitions; false: all partitions, which can block if a node is down
  required_acks: 1                      # 0: no ack, messages may be lost; 1: wait for the leader (default); -1: wait for all replicas
  compression: gzip
  max_message_bytes: 1000000            # maximum event size in bytes; larger events are dropped
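
After restarting Filebeat, you can confirm events are landing in the topic with the console tools that ship with Kafka. A quick sketch, assuming the Kafka bin/ scripts are on the PATH:

bash
# make sure the filebeat-log topic exists
kafka-topics.sh --bootstrap-server 11.0.1.120:9092 --list

# read a few events from the beginning of the topic
kafka-console-consumer.sh --bootstrap-server 11.0.1.120:9092 \
  --topic filebeat-log --from-beginning --max-messages 5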

Logstash: read logs from Kafka and ship them to ES

Configure Logstash to read logs from Kafka and send them to Elasticsearch.

bash
cat > /usr/local/logstash-8.12.0/conf.d/kafka-to-es.conf << 'EOF'
input {
  kafka {
    bootstrap_servers => "11.0.1.120:9092,11.0.1.121:9092,11.0.1.122:9092"
    topics => ["filebeat-log"]
    codec => "json"
    #group_id => "logstash"             # consumer group name
    #consumer_threads => "3"            # recommended: set to the number of Kafka partitions
    #topics_pattern => "nginx-.*"       # match topics by regex instead of the fixed list above
  }
}
output {
  #stdout {}                            # for debugging
  if "nginx-access" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-nginx-accesslog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
  if "nginx-error" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
  if "syslog" in [tags] {
    elasticsearch {
      hosts => ["192.168.148.41:9200","192.168.148.42:9200","192.168.148.43:9200"]
      index => "logstash-kafka-syslog-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123456"
    }
  }
}
EOF
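
As with the first pipeline, run a syntax check and then start Logstash in the foreground before daemonizing:

bash
# syntax check
sudo -u logstash /usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/conf.d/kafka-to-es.conf -t

# run in the foreground
sudo -u logstash /usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/conf.d/kafka-to-es.conf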

Kibana: view the data

Management -> Stack Management -> Kibana -> Data Views -> Create data view

  • Name: anything you like
  • Index pattern: the index that was set when the data was pushed to the ES cluster; wildcards are supported, e.g. logstash-kafka-*

After creating the view, browse the data in Discover.

Logstash: write collected logs to MySQL

Logs in ES are eventually deleted, but some important data, such as status codes, client IPs, and client browser versions, may later be aggregated by month or year for statistics and therefore needs to survive long-term. Writing the important fields to a database gives them durable storage.

Database setup

sql
-- create the database and table, then grant a user access
create database elk character set utf8mb4;
use elk;
CREATE TABLE elklog (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    clientip VARCHAR(39),
    uri VARCHAR(2048),
    status SMALLINT UNSIGNED,  -- HTTP status codes go up to 599, which would overflow TINYINT UNSIGNED (max 255)
    time DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_time_status (time, status),
    INDEX idx_clientip (clientip),
    INDEX idx_uri (uri(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

-- grant the user access from the 192.168.148.0/24 network
create user elk@"192.168.148.%" identified by '123456';
grant all privileges on elk.* to elk@"192.168.148.%";
flush privileges;
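
Before wiring up Logstash, it is worth confirming that the new account can actually log in from a 192.168.148.x host and see the table (11.0.1.138 is the database address used in the Logstash config below):

bash
# log in as the elk user and describe the table
mysql -h11.0.1.138 -uelk -p'123456' -e 'desc elk.elklog'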

Install the Logstash output plugin

Logstash needs the mysql-connector-java package. MySQL Connector/J is the official MySQL JDBC driver. JDBC (Java Database Connectivity) is a Java API for executing SQL statements that provides uniform access to many relational databases; it consists of a set of classes and interfaces written in Java.

bash
# Download
# official download page: https://dev.mysql.com/downloads/connector/
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.28-1.el8.noarch.rpm

# Install (adjust the filename/path to the rpm version you actually downloaded)
yum -y install /home/weihu/src/mysql-connector-j-8.4.0-1.el8.noarch.rpm
rpm -ql mysql-connector*

# copy the jar file into a directory for logstash
# mysql-connector-java.jar is a symlink, so copying either file works
ll /usr/share/java/mysql-connector-java.jar

mkdir /usr/local/logstash-8.12.0/liboutput/
cp /usr/share/java/mysql-connector-j.jar /usr/local/logstash-8.12.0/liboutput/


# Change the Gem source
# Logstash is built on Ruby; installing plugins downloads gem dependencies
# The default gem source is hosted overseas, so access from mainland China is slow and unreliable
yum -y install ruby
gem -v
gem sources -l

# switch to a mirror hosted in mainland China
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
gem sources -l

# Install the logstash-output-jdbc plugin
# list the installed plugins: by default only the jdbc integration (input/filter) is present,
# so the output plugin must be installed separately
/usr/local/logstash-8.12.0/bin/logstash-plugin list |grep jdbc
logstash-integration-jdbc
 ├── logstash-input-jdbc
 ├── logstash-filter-jdbc_streaming
 └── logstash-filter-jdbc_static

#### online install of the output-jdbc plugin; this can take quite a while
/usr/local/logstash-8.12.0/bin/logstash-plugin install logstash-output-jdbc


#### offline install
#### if online installation is not possible, export the plugin from a host that already has it, then import it
# export the plugin
/usr/local/logstash-8.12.0/bin/logstash-plugin prepare-offline-pack logstash-output-jdbc
ll /usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip

# import the offline plugin pack
/usr/local/logstash-8.12.0/bin/logstash-plugin install file:///usr/local/logstash-8.12.0/logstash-offline-plugins-8.12.0.zip

# check the installed plugins again
/usr/local/logstash-8.12.0/bin/logstash-plugin list |grep jdbc

Configure Filebeat

bash
vim /usr/local/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  json.keys_under_root: true
  paths:
  - /var/log/nginx/access_json.log      # the JSON-formatted access log (see the Nginx log format section below)
  tags: [wuuuuuu]

- type: log
  enabled: true
  paths:
  - /var/log/messages

output.logstash:
  hosts: ["192.168.148.45:5044"]

# restart filebeat after the change
systemctl restart filebeat
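
If events never reach Logstash, first check that the beats port is reachable from the Filebeat host. A quick probe, assuming nc (netcat) is installed:

bash
# test TCP connectivity to the Logstash beats input
nc -zv 192.168.148.45 5044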

Configure Logstash

bash
cat > /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf << 'EOF'
input {
  beats {
    port => 5044
  }
}

filter {
  if "wuuuuuu" in [tags] {
    mutate {
      # move the event timestamp into a plain field so it can be bound to the SQL statement below
      rename => { "@timestamp" => "times" }
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  if "wuuuuuu" in [tags] {
    jdbc {
      driver_jar_path => "/usr/local/logstash-8.12.0/liboutput/mysql-connector-j.jar"
      driver_class => "com.mysql.cj.jdbc.Driver"    # Connector/J 8.x class name; com.mysql.jdbc.Driver is the legacy 5.x name
      connection_string => "jdbc:mysql://11.0.1.138:3306/elk?user=elk&password=123456&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT"
      statement => ["INSERT INTO elklog (clientip,uri,status,time) VALUES(?,?,?,?)","clientip","uri","status","times"]
    }
  }
}
EOF

# make sure the config file's owner matches the user that runs Logstash
chown logstash:logstash /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf

# syntax check
/usr/local/logstash-8.12.0/bin/logstash -f /usr/local/logstash-8.12.0/conf.d/mysq_filebeat.conf -t

Nginx log formatting

bash
# nginx JSON log format (log_format is defined in the http {} context of nginx.conf)
log_format access_json '{"@timestamp":"$time_iso8601",'
    '"host":"$server_addr",'
    '"clientip":"$remote_addr",'
    '"size":$body_bytes_sent,'
    '"responsetime":$request_time,'
    '"upstreamtime":"$upstream_response_time",'
    '"upstreamhost":"$upstream_addr",'
    '"http_host":"$host",'
    '"uri":"$uri",'
    '"domain":"$host",'
    '"xff":"$http_x_forwarded_for",'
    '"referer":"$http_referer",'
    '"tcp_xff":"$proxy_protocol_addr",'
    '"http_user_agent":"$http_user_agent",'
    '"status":"$status"}';
access_log /var/log/nginx/access_json.log access_json;
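
After adding the format, validate the configuration and reload Nginx so the JSON access log starts being written:

bash
# check the nginx configuration and reload
nginx -t && systemctl reload nginx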

Verify the data

Alternatively, use a MySQL GUI client such as Navicat.

bash
mysql -h11.0.1.138 -uelk -p'123456' -e 'select clientip,uri,status,time from elk.elklog'
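
Once rows are accumulating, the monthly or yearly statistics mentioned earlier reduce to plain SQL, for example a per-month request count by status code:

bash
# count requests per status code per month
mysql -h11.0.1.138 -uelk -p'123456' -e \
  'select date_format(time, "%Y-%m") as month, status, count(*) as hits from elk.elklog group by month, status'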