Shell 脚本
基础概念
什么是 Shell?常见的 Shell 类型有哪些?
答:Shell 是用户与 Linux 内核交互的命令解释器,它接收用户命令并调用相应程序执行。
常见 Shell 类型:
| Shell | 特点 | 配置文件 |
|---|---|---|
| sh (Bourne) | 原始 Unix Shell | /etc/profile |
| bash (Bourne Again) | 默认 Shell,功能丰富 | ~/.bashrc, ~/.bash_profile |
| zsh | 功能强大,可定制性高 | ~/.zshrc |
| csh/tcsh | C 风格语法 | ~/.cshrc |
| ksh | KornShell,兼容 sh | ~/.kshrc |
| fish | 友好易用 | ~/.config/fish/config.fish |
Shell 脚本的基本结构是什么?
bash
#!/bin/bash
# 指定解释器路径(Shebang)
# 注释说明
# 作者: xxx
# 日期: 2024-01-15
# 描述: 示例脚本
# 设置严格模式
set -euo pipefail
# 定义变量
NAME="World"
# 函数定义
greet() {
echo "Hello, ${NAME}!"
}
# 主逻辑
main() {
greet
echo "Script completed."
}
# 执行主函数
main "$@"变量与数据类型
如何定义和使用变量?
bash
# 基本变量赋值
name="John"
age=25
# 使用变量
echo $name # John
echo "${name}" # John(推荐,更安全)
echo "My name is ${name}, I'm ${age} years old"
# 只读变量
readonly PI=3.14159
declare -r CONSTANT="不可修改"
# 删除变量
unset name
# 环境变量
export PATH="/usr/local/bin:$PATH"
export JAVA_HOME=/usr/lib/jvm/java-8
# 特殊变量
$0 # 脚本名称
$1-$9 # 第1到第9个参数
$# # 参数个数
$* # 所有参数(单个字符串)
$@ # 所有参数(独立字符串)
$$ # 当前进程 PID
$? # 上一个命令的退出状态
$! # 后台进程的 PID数组操作
bash
# 定义数组
fruits=("apple" "banana" "cherry")
numbers=(1 2 3 4 5)
# 访问元素
echo ${fruits[0]} # apple
echo ${fruits[@]} # 所有元素
echo ${fruits[*]} # 所有元素
echo ${#fruits[@]} # 数组长度
# 切片
echo ${fruits[@]:0:2} # apple banana(前2个)
echo ${fruits[@]:1} # banana cherry(从索引1开始)
# 追加元素
fruits+=("date")
# 删除元素
unset fruits[1]
# 关联数组(类似字典)
declare -A person
person["name"]="Alice"
person["age"]=30
echo ${person["name"]} # Alice字符串处理
常用字符串操作
bash
str="Hello World, Welcome to Shell Scripting"
# 字符串长度
echo ${#str} # 38
# 子字符串提取
echo ${str:0:5} # Hello
echo ${str:6:5} # World
# 子字符串替换
echo ${str/World/Linux} # 替换第一个
echo ${str//o/O} # 替换所有
# 删除子串
echo ${str#Hello } # 从开头删除最短匹配
echo ${str##* ,} # 从开头删除最长匹配
echo ${str%Scripting} # 从结尾删除最短匹配
echo ${str%%S*} # 从结尾删除最长匹配
# 大小写转换
echo ${str^^} # 全部转大写
echo ${str,,} # 全部转小写
# 默认值
echo ${var:-default} # var 未设置或为空时返回 default
echo ${var:=default} # var 未设置或为空时赋值并返回
echo ${var:+set} # var 已设置时返回 set
echo ${var:?error} # var 未设置或为空时报错
# 检查是否包含子串
if [[ "$str" == *"Welcome"* ]]; then
echo "包含 Welcome"
fi条件判断与流程控制
if 条件语句
bash
# 基本语法
if [ condition ]; then
# commands
elif [ condition ]; then
# commands
else
# commands
fi
# 文件测试
if [ -f "/etc/passwd" ]; then
echo "文件存在"
fi
if [ -d "/tmp" ]; then
echo "目录存在"
fi
if [ -r "/etc/shadow" ]; then
echo "文件可读"
fi
if [ -w "/tmp" ]; then
echo "目录可写"
fi
if [ -x "/bin/ls" ]; then
echo "文件可执行"
fi
# 数值比较
num=10
if [ $num -eq 10 ]; then echo "等于10"; fi
if [ $num -ne 5 ]; then echo "不等于5"; fi
if [ $num -gt 5 ]; then echo "大于5"; fi
if [ $num -ge 10 ]; then echo "大于等于10"; fi
if [ $num -lt 20 ]; then echo "小于20"; fi
if [ $num -le 10 ]; then echo "小于等于10"; fi
# 字符串比较
if [ "$str" = "hello" ]; then echo "相等"; fi
if [ "$str" != "world" ]; then echo "不相等"; fi
if [ -z "$empty" ]; then echo "空字符串"; fi
if [ -n "$str" ]; then echo "非空字符串"; fi
# 逻辑运算
if [ -f file ] && [ -r file ]; then echo "文件存在且可读"; fi
if [ -d dir ] || [ -L link ]; then echo "目录或链接"; ficase 语句
bash
case "$1" in
start)
echo "启动服务"
;;
stop)
echo "停止服务"
;;
restart|reload)
echo "重启服务"
;;
status)
echo "查看状态"
;;
*)
echo "用法: $0 {start|stop|restart|status}"
exit 1
;;
esac循环语句
bash
# for 循环
for i in {1..10}; do
echo "数字: $i"
done
# 遍历数组
fruits=("apple" "banana" "cherry")
for fruit in "${fruits[@]}"; do
echo "水果: $fruit"
done
# C 风格 for 循环
for ((i=0; i<10; i++)); do
echo "索引: $i"
done
# while 循环
count=0
while [ $count -lt 5 ]; do
echo "计数: $count"
((count++))
done
# until 循环(条件为真时退出)
until [ $count -ge 10 ]; do
echo "直到: $count"
((count++))
done
# 无限循环(配合 break)
while true; do
read -p "输入 q 退出: " input
if [ "$input" = "q" ]; then
break
fi
done
# continue 跳过当前迭代
for i in {1..10}; do
if [ $((i % 2)) -eq 0 ]; then
continue
fi
echo "奇数: $i"
done函数
如何定义和调用函数?
bash
# 基本语法
function_name() {
local var="局部变量"
echo "函数被调用"
return 0 # 返回退出码 0-255
}
# 或使用 function 关键字
function function_name {
echo "另一种定义方式"
}
# 调用函数
function_name
# 带参数的函数
greet() {
local name=$1
local age=${2:-18} # 默认值
echo "你好, $name! 你今年 $age 岁。"
return 0
}
greet "张三" 25
greet "李四" # age 使用默认值 18
# 返回值(通过 echo 输出)
calculate_sum() {
local a=$1
local b=$2
echo $((a + b)) # 通过 stdout 返回结果
}
result=$(calculate_sum 10 20)
echo "结果是: $result" # 30常用命令
文件和目录操作
bash
# 创建目录
mkdir -p /path/to/nested/dirs # 递归创建
# 复制文件
cp file1.txt file2.txt
cp -r source_dir/ dest_dir/ # 递归复制目录
cp -a source dest # 保留属性复制
# 移动/重命名
mv old_name new_name
mv file /new/location/
# 删除
rm file.txt
rm -rf directory/ # 强制递归删除(谨慎使用!)
# 查找文件
find /var/log -name "*.log" -mtime +7 # 7天前的日志文件
find /home -type f -size +100M # 大于100M的文件
find . -name "*.py" -exec rm {} \; # 查找并删除
# 查看文件内容
cat file.txt # 显示全部内容
less file.txt # 分页显示
head -n 20 file.txt # 前20行
tail -n 20 file.txt # 后20行
tail -f logfile.log # 实时跟踪日志
grep "pattern" file # 搜索匹配行
wc -l file.txt # 统计行数文本处理三剑客:grep、sed、awk
grep - 文本搜索
bash
# 基本搜索
grep "error" logfile.log
grep -i "error" logfile.log # 忽略大小写
grep -r "pattern" /etc/ # 递归搜索目录
grep -v "comment" file # 反向匹配(不包含)
# 正则表达式
grep -E "^Error:" file # 以 Error: 开头
grep -E "[0-9]{3}-[0-9]{4}" file # 匹配电话号码格式
grep -c "pattern" file # 统计匹配次数
grep -l "pattern" *.txt # 只列出文件名
grep -A 3 -B 2 "error" log # 显示后3行前2行上下文sed - 流编辑器
bash
# 替换文本
sed 's/old/new/g' file.txt # 全局替换
sed -i.bak 's/foo/bar/g' file.txt # 直接修改并备份
# 删除行
sed '/^$/d' file.txt # 删除空行
sed '1,10d' file.txt # 删除第1-10行
sed '/pattern/d' file.txt # 删除匹配行
# 打印指定行
sed -n '10,20p' file.txt # 打印第10-20行
sed -n '/start/,/end/p' file.txt # 打印两个模式之间的行
# 其他常用操作
sed -i 's/^/#/' file.txt # 行首添加注释
sed -i 's/\r$//' file.txt # 删除 Windows 换行符awk - 文本处理语言
bash
# 基本用法
awk '{print $1}' file.txt # 打印第一列
awk '{print NR": "$0}' file.txt # 带行号打印全部内容
# 指定分隔符
awk -F: '{print $1}' /etc/passwd # 以冒号为分隔符
awk -F',' '{print $1,$3}' data.csv # CSV 文件处理
# 条件过滤
awk '$3 > 100' file.txt # 第三列大于100的行
awk '/error/ {print}' file.txt # 包含 error 的行
awk 'NR > 10 && NR < 20' file.txt # 第11-19行
# 统计计算
awk '{sum+=$1; count++} END {print sum/count}' file.txt # 平均值
awk '{arr[$1]++} END {for(k in arr) print k, arr[k]}' file.txt # 统计频率
# 实用示例
# 统计各 HTTP 状态码数量
awk '{print $9}' access.log | sort | uniq -c | sort -rn
# 计算总流量
awk '{sum+=$10} END {printf "%.2f MB\n", sum/1024/1024}' access.log进程管理
bash
# 查看进程
ps aux # 所有进程
ps -ef # 完整格式
ps aux | grep nginx # 过滤特定进程
# 实时监控进程
top # 动态显示
htop # 更友好的界面(需安装)
# 进程控制
kill PID # 终止进程
kill -9 PID # 强制终止
kill -HUP PID # 重载配置(优雅重启)
pkill -f "nginx" # 按名称杀死进程
killall nginx # 杀死所有同名进程
# 后台运行
command & # 后台执行
nohup command & # 断开终端继续运行
jobs # 查看后台任务
fg %1 # 前台恢复任务
bg %1 # 后台恢复暂停的任务
# 查看端口占用
netstat -tlnp | grep 80
ss -tlnp | grep 80
lsof -i :80网络诊断
bash
# 连通性测试
ping google.com
ping -c 4 192.168.1.1 # 发送4个包
traceroute target.com # 路由跟踪
mtr target.com # 结合 ping 和 traceroute
# DNS 解析
nslookup domain.com
dig domain.com
host domain.com
# 端口扫描
nc -zv host port # 测试端口连通性
telnet host port # 测试 TCP 连接
curl -I http://example.com # 查看 HTTP 头信息
# 网络配置
ip addr show # 查看IP地址
ip route show # 查看路由表
ifconfig # 传统方式(已废弃)
# 抓包
tcpdump -i eth0 port 80 # 抓取80端口数据包
tcpdump -i eth0 host 192.168.1.100 # 抓取特定主机实用脚本案例
日志分析脚本
bash
#!/bin/bash
# analyze_log.sh - 分析 Nginx 访问日志
LOG_FILE="${1:-/var/log/nginx/access.log}"
TOP_N=10
echo "=== Nginx 日志分析报告 ==="
echo "日志文件: $LOG_FILE"
echo "生成时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo ""
# Top $TOP_N IP 地址
echo "--- Top $TOP_N 访问 IP ---"
awk '{print $1}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -$TOP_N
echo ""
# Top $TOP_N 访问 URL
echo "--- Top $TOP_N 访问 URL ---"
awk '{print $7}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -$TOP_N
echo ""
# HTTP 状态码统计
echo "--- HTTP 状态码统计 ---"
awk '{print $9}' "$LOG_FILE" | sort | uniq -c | sort -rn
echo ""
# 总访问量和独立 IP 数
echo "--- 总体统计 ---"
TOTAL_LINES=$(wc -l < "$LOG_FILE")
UNIQUE_IPS=$(awk '{print $1}' "$LOG_FILE" | sort -u | wc -l)
echo "总请求数: $TOTAL_LINES"
echo "独立 IP 数: $UNIQUE_IPS"服务监控脚本
bash
#!/bin/bash
# monitor.sh - 基础服务健康检查
SERVICES=("nginx" "mysql" "redis")
ALERT_EMAIL="admin@example.com"
LOG_FILE="/var/log/service_monitor.log"
check_service() {
local service=$1
if systemctl is-active --quiet "$service"; then
echo "[OK] $service 运行正常"
return 0
else
echo "[ERROR] $service 未运行!"
return 1
fi
}
check_disk_space() {
local threshold=85
local usage=$(df / | awk 'NR==2{print $5}' | tr -d '%')
if [ "$usage" -gt "$threshold" ]; then
echo "[WARNING] 磁盘使用率: ${usage}% (阈值: ${threshold}%)"
return 1
else
echo "[OK] 磁盘使用率: ${usage}%"
return 0
fi
}
check_memory() {
local total_mem=$(free -m | awk '/Mem:/ {print $2}')
local used_mem=$(free -m | awk '/Mem:/ {print $3}')
local usage=$((used_mem * 100 / total_mem))
if [ "$usage" -gt 90 ]; then
echo "[WARNING] 内存使用率: ${usage}%"
return 1
else
echo "[OK] 内存使用率: ${usage}%"
return 0
fi
}
main() {
echo "$(date '+%Y-%m-%d %H:%M:%S') 开始检查..." >> "$LOG_FILE"
has_error=0
# 检查服务
for service in "${SERVICES[@]}"; do
if ! check_service "$service"; then
has_error=1
fi
done
# 检查资源
check_disk_space || has_error=1
check_memory || has_error=1
if [ $has_error -eq 1 ]; then
echo "发现异常,发送告警邮件..."
# mail -s "服务告警" "$ALERT_EMAIL" < "$LOG_FILE"
fi
}
mainPython 脚本
基础知识
Python 运维常用的标准库有哪些?
| 库名 | 用途 | 常用模块 |
|---|---|---|
| os | 操作系统接口 | os.path, os.environ, os.system |
| sys | 系统相关 | sys.argv, sys.path, sys.exit |
| subprocess | 子进程管理 | run(), Popen(), call() |
| shutil | 高级文件操作 | copy(), move(), rmtree() |
| glob | 文件名模式匹配 | glob(), iglob() |
| re | 正则表达式 | match(), search(), sub() |
| json | JSON 处理 | loads(), dumps() |
| csv | CSV 文件处理 | reader(), writer() |
| logging | 日志记录 | getLogger(), basicConfig() |
| datetime | 日期时间 | datetime, timedelta, strftime |
| socket | 网络编程 | socket(), connect(), send() |
| threading | 多线程 | Thread, Lock, Event |
| multiprocessing | 多进程 | Process, Pool, Queue |
| configparser | 配置文件解析 | ConfigParser, read() |
| argparse | 命令行参数 | ArgumentParser, add_argument() |
os 与 shutil 常用操作
python
import os
import shutil
# ===== os 模块 =====
# 路径操作
os.getcwd() # 获取当前工作目录
os.chdir('/path/to/dir') # 切换目录
os.path.exists('/path') # 路径是否存在
os.path.isfile('file.txt') # 是否是文件
os.path.isdir('/dir') # 是否是目录
os.path.basename('/path/to/file') # 获取文件名
os.path.dirname('/path/to/file') # 获取目录名
os.path.split('/path/to/file') # 分割目录和文件名
os.path.join('/path', 'to', 'file') # 拼接路径
os.path.splitext('file.txt') # 分割扩展名
# 文件操作
os.remove('file.txt') # 删除文件
os.rename('old', 'new') # 重命名
os.mkdir('new_dir') # 创建目录
os.makedirs('a/b/c') # 递归创建目录
os.listdir('.') # 列出目录内容
os.walk('/path') # 遍历目录树
# 环境变量
os.environ.get('HOME') # 获取环境变量
os.environ['PATH'] = '/new/path' # 设置环境变量
# 执行系统命令
os.system('ls -la') # 执行命令(简单场景)
os.popen('df -h').read() # 执行命令获取输出
# ===== shutil 模块 =====
shutil.copy('src', 'dst') # 复制文件
shutil.copy2('src', 'dst') # 复制文件(保留元数据)
shutil.copytree('src', 'dst') # 递归复制目录
shutil.move('src', 'dst') # 移动/重命名
shutil.rmtree('dir') # 递归删除目录
shutil.disk_usage('/') # 磁盘使用情况
shutil.which('python3') # 查找命令位置subprocess 执行命令
python
import subprocess
# 推荐方式:subprocess.run()
result = subprocess.run(
['ls', '-la'],
capture_output=True,
text=True
)
print(result.stdout) # 标准输出
print(result.stderr) # 错误输出
print(result.returncode)# 返回码
# 执行复杂命令(使用 shell=True)
result = subprocess.run(
'ps aux | grep nginx',
shell=True,
capture_output=True,
text=True
)
# Popen 用于需要实时输出的场景
process = subprocess.Popen(
['tail', '-f', '/var/log/syslog'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 读取实时输出
for line in process.stdout:
print(line.strip())
# 管道连接多个命令
p1 = subprocess.Popen(['cat', 'file.txt'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'error'], stdin=p1.stdout, stdout=subprocess.PIPE)
output = p2.communicate()[0]常用第三方库
requests - HTTP 请求库
python
import requests
# GET 请求
response = requests.get('https://api.example.com/data')
print(response.status_code) # 状态码
print(response.json()) # JSON 响应
print(response.text) # 文本响应
print(response.headers) # 响应头
# POST 请求
data = {'username': 'admin', 'password': '123456'}
response = requests.post('https://api.example.com/login', json=data)
# 带 headers 和 cookies
headers = {'User-Agent': 'Mozilla/5.0'}
cookies = {'session_id': 'abc123'}
response = requests.get(url, headers=headers, cookies=cookies)
# 超时设置
try:
response = requests.get(url, timeout=10)
except requests.Timeout:
print("请求超时")
# 会话对象(保持连接)
session = requests.Session()
session.headers.update({'Authorization': 'Bearer token'})
session.get('https://api.example.com/resource1')
session.get('https://api.example.com/resource2')
# 文件上传
files = {'file': open('report.pdf', 'rb')}
requests.post('https://api.example.com/upload', files=files)paramiko - SSH 远程连接
python
import paramiko
# SSH 连接执行命令
def ssh_exec(host, username, password, command):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=host, username=username, password=password)
stdin, stdout, stderr = client.exec_command(command)
print(stdout.read().decode())
print(stderr.read().decode())
finally:
client.close()
# SFTP 文件传输
def sftp_upload(host, username, password, local_file, remote_path):
transport = paramiko.Transport((host, 22))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(local_file, remote_path)
sftp.close()
transport.close()psutil - 系统和进程信息
python
import psutil
# CPU 信息
psutil.cpu_percent(interval=1) # CPU 使用率
psutil.cpu_count(logical=False) # 物理核心数
psutil.cpu_count(logical=True) # 逻辑核心数
psutil.cpu_times() # CPU 时间统计
# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / 1024**3:.1f} GB")
print(f"可用内存: {mem.available / 1024**3:.1f} GB")
print(f"内存使用率: {mem.percent}%")
# 磁盘信息
disk = psutil.disk_usage('/')
print(f"磁盘总量: {disk.total / 1024**3:.1f} GB")
print(f"磁盘使用率: {disk.percent}%")
# 网络信息
psutil.net_io_counters() # 网络 IO 统计
psutil.net_connections() # 网络连接列表
# 进程管理
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
if proc.info['cpu_percent'] > 50:
print(proc.info)
# 获取特定进程
proc = psutil.Process(pid)
proc.name() # 进程名
proc.status() # 进程状态
proc.memory_info() # 内存信息
proc.cpu_percent() # CPU 使用率
proc.terminate() # 终止进程Jinja2 - 模板引擎
python
from jinja2 import Template, Environment, FileSystemLoader
# 基本模板渲染
template_str = """
服务器名称: {{ server_name }}
IP 地址: {{ ip_address }}
{% for item in ports %}
端口: {{ item }}
{% endfor %}
"""
template = Template(template_str)
result = template.render(
server_name='web01',
ip_address='192.168.1.100',
ports=[80, 443, 8080]
)
print(result)
# 文件模板
env = Environment(loader=FileSystemLoader('./templates'))
template = env.get_template('nginx.conf.j2')
config = template.render(
server_name='example.com',
listen_port=443,
worker_processes=4
)
with open('/etc/nginx/nginx.conf', 'w') as f:
f.write(config)自动化运维脚本
批量部署脚本
python
#!/usr/bin/env python3
"""
批量部署工具
支持多主机并行部署应用
"""
import concurrent.futures
import paramiko
import yaml
from pathlib import Path
class Deployer:
def __init__(self, config_file):
with open(config_file) as f:
self.config = yaml.safe_load(f)
def deploy_to_host(self, host):
"""部署到单台主机"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(
hostname=host['ip'],
username=host['user'],
key_filename=self.config['ssh_key']
)
# 1. 同步代码
self._sync_code(ssh, host['deploy_path'])
# 2. 安装依赖
self._install_deps(ssh, host['deploy_path'])
# 3. 重启服务
self._restart_service(ssh, host['service_name'])
return {'host': host['ip'], 'status': 'success'}
except Exception as e:
return {'host': host['ip'], 'status': 'failed', 'error': str(e)}
finally:
ssh.close()
def _sync_code(self, ssh, deploy_path):
"""同步代码"""
commands = [
f'cd {deploy_path}',
'git pull origin main',
'git checkout release'
]
ssh.exec_command(' && '.join(commands))
def _install_deps(self, ssh, deploy_path):
"""安装依赖"""
ssh.exec_command(f'cd {deploy_path} && pip install -r requirements.txt')
def _restart_service(self, ssh, service):
"""重启服务"""
ssh.exec_command(f'sudo systemctl restart {service}')
def batch_deploy(self, max_workers=5):
"""并行批量部署"""
hosts = self.config['hosts']
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(self.deploy_to_host, hosts))
# 输出结果
success = sum(1 for r in results if r['status'] == 'success')
failed = len(results) - success
print(f"\n部署完成: 成功 {success}, 失败 {failed}")
for result in results:
if result['status'] == 'failed':
print(f" [失败] {result['host']}: {result.get('error')}")
if __name__ == '__main__':
deployer = Deployer('deploy_config.yaml')
deployer.batch_deploy()监控告警脚本
python
#!/usr/bin/env python3
"""
Prometheus 告警转发脚本
将 Prometheus 告警推送到钉钉/企微/飞书
"""
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
import requests
from datetime import datetime
class AlertNotifier:
def __init__(self, webhook_url, secret=None):
self.webhook_url = webhook_url
self.secret = secret
def _generate_sign(self, timestamp):
"""生成签名(钉钉)"""
string_to_sign = f'{timestamp}\n{self.secret}'
hmac_code = hmac.new(
self.secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def format_alert_message(self, alerts):
"""格式化告警消息"""
lines = []
lines.append(f"## 🚨 Prometheus 告警通知\n")
lines.append(f"> 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
lines.append("---\n")
for alert in alerts:
status = alert.get('status', '')
emoji = '🔴' if status == 'firing' else '🟢'
labels = alert.get('labels', {})
annotations = alert.get('annotations', {})
lines.append(f"### {emoji} {labels.get('alertname', 'Unknown')}")
lines.append(f"- **级别**: {labels.get('severity', 'unknown')}")
lines.append(f"- **实例**: {labels.get('instance', '-')}")
lines.append(f"- **描述**: {annotations.get('description', '-')}")
lines.append(f"- **时间**: {alert.get('startsAt', '-')[:19]}\n")
return '\n'.join(lines)
def send_dingtalk(self, alerts):
"""发送钉钉告警"""
timestamp = str(int(time.time() * 1000))
sign = self._generate_sign(timestamp)
url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
payload = {
"msgtype": "markdown",
"markdown": {
"title": "Prometheus 告警",
"text": self.format_alert_message(alerts)
}
}
response = requests.post(url, json=payload)
return response.json()
def handle_alert(request_data):
"""Webhook 处理函数"""
try:
data = json.loads(request_data)
alerts = data.get('alerts', [])
if not alerts:
return {"status": "no alerts"}
notifier = AlertNotifier(
webhook_url="https://oapi.dingtalk.com/robot/send?access_token=xxx",
secret="SEC_xxx"
)
result = notifier.send_dingtalk(alerts)
return result
except Exception as e:
return {"status": "error", "message": str(e)}
if __name__ == '__main__':
# Flask Webhook 服务示例
from flask import Flask, request
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
return handle_alert(request.data)
app.run(host='0.0.0.0', port=5000)日志分析与清理脚本
python
#!/usr/bin/env python3
"""
日志分析清理工具
功能:
1. 分析日志中的错误和警告
2. 清理过期日志
3. 生成统计报告
"""
import os
import re
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from collections import Counter, defaultdict
class LogAnalyzer:
def __init__(self, log_dir, retention_days=30):
self.log_dir = Path(log_dir)
self.retention_days = retention_days
# 编译正则表达式
self.error_pattern = re.compile(r'(ERROR|FATAL|Exception)', re.IGNORECASE)
self.warning_pattern = re.compile(r'WARN', re.IGNORECASE)
self.ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
self.date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}')
def analyze_log_file(self, log_file):
"""分析单个日志文件"""
stats = {
'total_lines': 0,
'errors': [],
'warnings': [],
'top_ips': Counter(),
'error_types': Counter(),
}
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
stats['total_lines'] += 1
# 检测错误
if self.error_pattern.search(line):
stats['errors'].append({
'line': line_num,
'content': line.strip()[:200],
'type': self.error_pattern.search(line).group()
})
# 统计错误类型
error_type = self._extract_error_type(line)
if error_type:
stats['error_types'][error_type] += 1
# 检测警告
elif self.warning_pattern.search(line):
stats['warnings'].append({
'line': line_num,
'content': line.strip()[:200]
})
# 统计 IP
ips = self.ip_pattern.findall(line)
for ip in ips:
stats['top_ips'][ip] += 1
return stats
def _extract_error_type(self, line):
"""提取错误类型"""
patterns = [
r'(\w+Exception)',
r'(\w+Error)',
r'ERROR:\s*(\w+)',
]
for pattern in patterns:
match = re.search(pattern, line)
if match:
return match.group(1)
return None
def generate_report(self, output_file=None):
"""生成分析报告"""
report_lines = []
report_lines.append("=" * 60)
report_lines.append(f"日志分析报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append(f"日志目录: {self.log_dir}")
report_lines.append("=" * 60)
report_lines.append("")
total_stats = {
'total_files': 0,
'total_lines': 0,
'total_errors': 0,
'total_warnings': 0,
'all_ips': Counter(),
'all_errors': Counter(),
}
# 分析所有日志文件
for log_file in sorted(self.log_dir.glob('*.log')):
if not log_file.is_file():
continue
total_stats['total_files'] += 1
stats = self.analyze_log_file(log_file)
total_stats['total_lines'] += stats['total_lines']
total_stats['total_errors'] += len(stats['errors'])
total_stats['total_warnings'] += len(stats['warnings'])
total_stats['all_ips'] += stats['top_ips']
total_stats['all_errors'] += stats['error_types']
# 单文件摘要
report_lines.append(f"【{log_file.name}】")
report_lines.append(f" 总行数: {stats['total_lines']:,}")
report_lines.append(f" 错误数: {len(stats['errors'])}")
report_lines.append(f" 警告数: {len(stats['warnings'])}")
report_lines.append("")
# 汇总统计
report_lines.append("-" * 40)
report_lines.append("【汇总统计】")
report_lines.append(f" 日志文件总数: {total_stats['total_files']}")
report_lines.append(f" 总行数: {total_stats['total_lines']:,}")
report_lines.append(f" 总错误数: {total_stats['total_errors']:,}")
report_lines.append(f" 总警告数: {total_stats['total_warnings']:,}")
report_lines.append("")
# Top 10 IP
report_lines.append("【Top 10 访问 IP】")
for ip, count in total_stats['all_ips'].most_common(10):
report_lines.append(f" {ip}: {count:,}")
report_lines.append("")
# Top 10 错误类型
report_lines.append("【Top 10 错误类型】")
for error, count in total_stats['all_errors'].most_common(10):
report_lines.append(f" {error}: {count}")
report_content = '\n'.join(report_lines)
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_content)
print(f"报告已保存至: {output_file}")
return report_content
def cleanup_old_logs(self, dry_run=True):
"""清理过期日志"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
cleaned_files = []
for log_file in self.log_dir.iterdir():
if not log_file.is_file():
continue
# 检查修改时间
mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
if mtime < cutoff_date:
cleaned_files.append({
'file': str(log_file),
'size': log_file.stat().st_size,
'mtime': mtime.strftime('%Y-%m-%d %H:%M:%S')
})
if not dry_run:
# 先压缩再删除原文件
with open(log_file, 'rb') as f_in:
with gzip.open(f'{log_file}.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
log_file.unlink()
print(f"{'预览' if dry_run else '清理'}结果:")
print(f"找到 {len(cleaned_files)} 个超过 {self.retention_days} 天的日志文件")
for item in cleaned_files:
action = '将压缩并删除' if not dry_run else '待清理'
print(f" [{action}] {item['file']} ({item['size']/1024/1024:.1f}MB, {item['mtime']})")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='日志分析清理工具')
parser.add_argument('--log-dir', required=True, help='日志目录')
parser.add_argument('--retention-days', type=int, default=30, help='保留天数')
parser.add_argument('--report', help='报告输出文件')
parser.add_argument('--cleanup', action='store_true', help='执行清理')
args = parser.parse_args()
analyzer = LogAnalyzer(args.log_dir, args.retention_days)
analyzer.generate_report(args.report)
if args.cleanup:
analyzer.cleanup_old_logs(dry_run=False)
else:
analyzer.cleanup_old_logs(dry_run=True)Python 包管理与虚拟环境
pip 常用命令
bash
# 安装包
pip install requests
pip install requests==2.28.0 # 指定版本
pip install 'requests>=2.25,<3.0' # 版本范围
pip install -r requirements.txt # 从文件安装
# 升级包
pip install --upgrade requests
pip install -U requests
# 卸载包
pip uninstall requests
# 查看已安装包
pip list
pip freeze # 导出依赖列表
pip freeze > requirements.txt # 保存到文件
# 搜索包
pip search keyword
# 查看包信息
pip show requests
pip show -f requests # 显示所有文件
# 缓存管理
pip cache info
pip cache purge # 清除缓存venv 虚拟环境
bash
# 创建虚拟环境
python3 -m venv myenv
# 激活虚拟环境
source myenv/bin/activate # Linux/Mac
myenv\Scripts\activate # Windows
# 退出虚拟环境
deactivate
# 删除虚拟环境
rm -rf myenvpyproject.toml / setup.py 项目打包
toml
# pyproject.toml
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "my-tool"
version = "1.0.0"
description = "我的运维工具"
requires-python = ">=3.8"
dependencies = [
"requests>=2.28.0",
"paramiko>=3.0.0",
]
[project.scripts]
mytool = "mytool.cli:main"