Skip to content

Shell 脚本

基础概念

什么是 Shell?常见的 Shell 类型有哪些?

答:Shell 是用户与 Linux 内核交互的命令解释器,它接收用户命令并调用相应程序执行。

常见 Shell 类型

Shell特点配置文件
sh (Bourne)原始 Unix Shell/etc/profile
bash (Bourne Again)默认 Shell,功能丰富~/.bashrc, ~/.bash_profile
zsh功能强大,可定制性高~/.zshrc
csh/tcshC 风格语法~/.cshrc
kshKornShell,兼容 sh~/.kshrc
fish友好易用~/.config/fish/config.fish

Shell 脚本的基本结构是什么?

bash
#!/bin/bash
# 指定解释器路径(Shebang)

# 注释说明
# 作者: xxx
# 日期: 2024-01-15
# 描述: 示例脚本

# 设置严格模式
set -euo pipefail

# 定义变量
NAME="World"

# 函数定义
greet() {
    echo "Hello, ${NAME}!"
}

# 主逻辑
main() {
    greet
    echo "Script completed."
}

# 执行主函数
main "$@"

变量与数据类型

如何定义和使用变量?

bash
# 基本变量赋值
name="John"
age=25

# 使用变量
echo $name          # John
echo "${name}"      # John(推荐,更安全)
echo "My name is ${name}, I'm ${age} years old"

# 只读变量
readonly PI=3.14159
declare -r CONSTANT="不可修改"

# 删除变量
unset name

# 环境变量
export PATH="/usr/local/bin:$PATH"
export JAVA_HOME=/usr/lib/jvm/java-8

# 特殊变量
$0      # 脚本名称
$1-$9   # 第1到第9个参数
$#      # 参数个数
$*      # 所有参数(单个字符串)
$@      # 所有参数(独立字符串)
$$      # 当前进程 PID
$?      # 上一个命令的退出状态
$!      # 后台进程的 PID

数组操作

bash
# 定义数组
fruits=("apple" "banana" "cherry")
numbers=(1 2 3 4 5)

# 访问元素
echo ${fruits[0]}        # apple
echo ${fruits[@]}        # 所有元素
echo ${fruits[*]}        # 所有元素
echo ${#fruits[@]}       # 数组长度

# 切片
echo ${fruits[@]:0:2}    # apple banana(前2个)
echo ${fruits[@]:1}      # banana cherry(从索引1开始)

# 追加元素
fruits+=("date")

# 删除元素
unset fruits[1]

# 关联数组(类似字典)
declare -A person
person["name"]="Alice"
person["age"]=30
echo ${person["name"]}   # Alice

字符串处理

常用字符串操作

bash
str="Hello World, Welcome to Shell Scripting"

# 字符串长度
echo ${#str}             # 38

# 子字符串提取
echo ${str:0:5}          # Hello
echo ${str:6:5}          # World

# 子字符串替换
echo ${str/World/Linux}  # 替换第一个
echo ${str//o/O}         # 替换所有

# 删除子串
echo ${str#Hello }       # 从开头删除最短匹配
echo ${str##* ,}         # 从开头删除最长匹配
echo ${str%Scripting}    # 从结尾删除最短匹配
echo ${str%%S*}          # 从结尾删除最长匹配

# 大小写转换
echo ${str^^}            # 全部转大写
echo ${str,,}            # 全部转小写

# 默认值
echo ${var:-default}     # var 未设置或为空时返回 default
echo ${var:=default}     # var 未设置或为空时赋值并返回
echo ${var:+set}         # var 已设置时返回 set
echo ${var:?error}       # var 未设置或为空时报错

# 检查是否包含子串
if [[ "$str" == *"Welcome"* ]]; then
    echo "包含 Welcome"
fi

条件判断与流程控制

if 条件语句

bash
# 基本语法
if [ condition ]; then
    # commands
elif [ condition ]; then
    # commands
else
    # commands
fi

# 文件测试
if [ -f "/etc/passwd" ]; then
    echo "文件存在"
fi

if [ -d "/tmp" ]; then
    echo "目录存在"
fi

if [ -r "/etc/shadow" ]; then
    echo "文件可读"
fi

if [ -w "/tmp" ]; then
    echo "目录可写"
fi

if [ -x "/bin/ls" ]; then
    echo "文件可执行"
fi

# 数值比较
num=10
if [ $num -eq 10 ]; then echo "等于10"; fi
if [ $num -ne 5 ]; then  echo "不等于5"; fi
if [ $num -gt 5 ]; then  echo "大于5"; fi
if [ $num -ge 10 ]; then echo "大于等于10"; fi
if [ $num -lt 20 ]; then echo "小于20"; fi
if [ $num -le 10 ]; then echo "小于等于10"; fi

# 字符串比较
if [ "$str" = "hello" ]; then echo "相等"; fi
if [ "$str" != "world" ]; then echo "不相等"; fi
if [ -z "$empty" ]; then echo "空字符串"; fi
if [ -n "$str" ]; then echo "非空字符串"; fi

# 逻辑运算
if [ -f file ] && [ -r file ]; then echo "文件存在且可读"; fi
if [ -d dir ] || [ -L link ]; then echo "目录或链接"; fi

case 语句

bash
case "$1" in
    start)
        echo "启动服务"
        ;;
    stop)
        echo "停止服务"
        ;;
    restart|reload)
        echo "重启服务"
        ;;
    status)
        echo "查看状态"
        ;;
    *)
        echo "用法: $0 {start|stop|restart|status}"
        exit 1
        ;;
esac

循环语句

bash
# for 循环
for i in {1..10}; do
    echo "数字: $i"
done

# 遍历数组
fruits=("apple" "banana" "cherry")
for fruit in "${fruits[@]}"; do
    echo "水果: $fruit"
done

# C 风格 for 循环
for ((i=0; i<10; i++)); do
    echo "索引: $i"
done

# while 循环
count=0
while [ $count -lt 5 ]; do
    echo "计数: $count"
    ((count++))
done

# until 循环(条件为真时退出)
until [ $count -ge 10 ]; do
    echo "直到: $count"
    ((count++))
done

# 无限循环(配合 break)
while true; do
    read -p "输入 q 退出: " input
    if [ "$input" = "q" ]; then
        break
    fi
done

# continue 跳过当前迭代
for i in {1..10}; do
    if [ $((i % 2)) -eq 0 ]; then
        continue
    fi
    echo "奇数: $i"
done

函数

如何定义和调用函数?

bash
# 基本语法
function_name() {
    local var="局部变量"
    echo "函数被调用"
    return 0  # 返回退出码 0-255
}

# 或使用 function 关键字
function function_name {
    echo "另一种定义方式"
}

# 调用函数
function_name

# 带参数的函数
greet() {
    local name=$1
    local age=${2:-18}  # 默认值
    echo "你好, $name! 你今年 $age 岁。"
    return 0
}

greet "张三" 25
greet "李四"  # age 使用默认值 18

# 返回值(通过 echo 输出)
calculate_sum() {
    local a=$1
    local b=$2
    echo $((a + b))  # 通过 stdout 返回结果
}

result=$(calculate_sum 10 20)
echo "结果是: $result"  # 30

常用命令

文件和目录操作

bash
# 创建目录
mkdir -p /path/to/nested/dirs  # 递归创建

# 复制文件
cp file1.txt file2.txt
cp -r source_dir/ dest_dir/    # 递归复制目录
cp -a source dest              # 保留属性复制

# 移动/重命名
mv old_name new_name
mv file /new/location/

# 删除
rm file.txt
rm -rf directory/              # 强制递归删除(谨慎使用!)

# 查找文件
find /var/log -name "*.log" -mtime +7  # 7天前的日志文件
find /home -type f -size +100M         # 大于100M的文件
find . -name "*.py" -exec rm {} \;     # 查找并删除

# 查看文件内容
cat file.txt           # 显示全部内容
less file.txt          # 分页显示
head -n 20 file.txt    # 前20行
tail -n 20 file.txt    # 后20行
tail -f logfile.log    # 实时跟踪日志
grep "pattern" file    # 搜索匹配行
wc -l file.txt         # 统计行数

文本处理三剑客:grep、sed、awk

grep - 文本搜索

bash
# 基本搜索
grep "error" logfile.log
grep -i "error" logfile.log      # 忽略大小写
grep -r "pattern" /etc/          # 递归搜索目录
grep -v "comment" file           # 反向匹配(不包含)

# 正则表达式
grep -E "^Error:" file           # 以 Error: 开头
grep -E "[0-9]{3}-[0-9]{4}" file  # 匹配电话号码格式
grep -c "pattern" file           # 统计匹配次数
grep -l "pattern" *.txt          # 只列出文件名
grep -A 3 -B 2 "error" log       # 显示后3行前2行上下文

sed - 流编辑器

bash
# 替换文本
sed 's/old/new/g' file.txt               # 全局替换
sed -i.bak 's/foo/bar/g' file.txt        # 直接修改并备份

# 删除行
sed '/^$/d' file.txt                     # 删除空行
sed '1,10d' file.txt                     # 删除第1-10行
sed '/pattern/d' file.txt                # 删除匹配行

# 打印指定行
sed -n '10,20p' file.txt                 # 打印第10-20行
sed -n '/start/,/end/p' file.txt         # 打印两个模式之间的行

# 其他常用操作
sed -i 's/^/#/' file.txt                # 行首添加注释
sed -i 's/\r$//' file.txt               # 删除 Windows 换行符

awk - 文本处理语言

bash
# 基本用法
awk '{print $1}' file.txt               # 打印第一列
awk '{print NR": "$0}' file.txt         # 带行号打印全部内容

# 指定分隔符
awk -F: '{print $1}' /etc/passwd        # 以冒号为分隔符
awk -F',' '{print $1,$3}' data.csv      # CSV 文件处理

# 条件过滤
awk '$3 > 100' file.txt                  # 第三列大于100的行
awk '/error/ {print}' file.txt           # 包含 error 的行
awk 'NR > 10 && NR < 20' file.txt       # 第11-19行

# 统计计算
awk '{sum+=$1; count++} END {print sum/count}' file.txt  # 平均值
awk '{arr[$1]++} END {for(k in arr) print k, arr[k]}' file.txt  # 统计频率

# 实用示例
# 统计各 HTTP 状态码数量
awk '{print $9}' access.log | sort | uniq -c | sort -rn

# 计算总流量
awk '{sum+=$10} END {printf "%.2f MB\n", sum/1024/1024}' access.log

进程管理

bash
# 查看进程
ps aux                    # 所有进程
ps -ef                   # 完整格式
ps aux | grep nginx      # 过滤特定进程

# 实时监控进程
top                       # 动态显示
htop                      # 更友好的界面(需安装)

# 进程控制
kill PID                 # 终止进程
kill -9 PID              # 强制终止
kill -HUP PID            # 重载配置(优雅重启)
pkill -f "nginx"         # 按名称杀死进程
killall nginx            # 杀死所有同名进程

# 后台运行
command &                # 后台执行
nohup command &          # 断开终端继续运行
jobs                     # 查看后台任务
fg %1                    # 前台恢复任务
bg %1                    # 后台恢复暂停的任务

# 查看端口占用
netstat -tlnp | grep 80
ss -tlnp | grep 80
lsof -i :80

网络诊断

bash
# 连通性测试
ping google.com
ping -c 4 192.168.1.1    # 发送4个包
traceroute target.com    # 路由跟踪
mtr target.com           # 结合 ping 和 traceroute

# DNS 解析
nslookup domain.com
dig domain.com
host domain.com

# 端口扫描
nc -zv host port          # 测试端口连通性
telnet host port          # 测试 TCP 连接
curl -I http://example.com  # 查看 HTTP 头信息

# 网络配置
ip addr show              # 查看IP地址
ip route show             # 查看路由表
ifconfig                 # 传统方式(已废弃)

# 抓包
tcpdump -i eth0 port 80  # 抓取80端口数据包
tcpdump -i eth0 host 192.168.1.100  # 抓取特定主机

实用脚本案例

日志分析脚本

bash
#!/bin/bash
# analyze_log.sh - 分析 Nginx 访问日志

LOG_FILE="${1:-/var/log/nginx/access.log}"
TOP_N=10

echo "=== Nginx 日志分析报告 ==="
echo "日志文件: $LOG_FILE"
echo "生成时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo ""

# Top $TOP_N IP 地址
echo "--- Top $TOP_N 访问 IP ---"
awk '{print $1}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -$TOP_N
echo ""

# Top $TOP_N 访问 URL
echo "--- Top $TOP_N 访问 URL ---"
awk '{print $7}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -$TOP_N
echo ""

# HTTP 状态码统计
echo "--- HTTP 状态码统计 ---"
awk '{print $9}' "$LOG_FILE" | sort | uniq -c | sort -rn
echo ""

# 总访问量和独立 IP 数
echo "--- 总体统计 ---"
TOTAL_LINES=$(wc -l < "$LOG_FILE")
UNIQUE_IPS=$(awk '{print $1}' "$LOG_FILE" | sort -u | wc -l)
echo "总请求数: $TOTAL_LINES"
echo "独立 IP 数: $UNIQUE_IPS"

服务监控脚本

bash
#!/bin/bash
# monitor.sh - 基础服务健康检查

SERVICES=("nginx" "mysql" "redis")
ALERT_EMAIL="admin@example.com"
LOG_FILE="/var/log/service_monitor.log"

check_service() {
    local service=$1
    if systemctl is-active --quiet "$service"; then
        echo "[OK] $service 运行正常"
        return 0
    else
        echo "[ERROR] $service 未运行!" 
        return 1
    fi
}

check_disk_space() {
    local threshold=85
    local usage=$(df / | awk 'NR==2{print $5}' | tr -d '%')
    
    if [ "$usage" -gt "$threshold" ]; then
        echo "[WARNING] 磁盘使用率: ${usage}% (阈值: ${threshold}%)"
        return 1
    else
        echo "[OK] 磁盘使用率: ${usage}%"
        return 0
    fi
}

check_memory() {
    local total_mem=$(free -m | awk '/Mem:/ {print $2}')
    local used_mem=$(free -m | awk '/Mem:/ {print $3}')
    local usage=$((used_mem * 100 / total_mem))
    
    if [ "$usage" -gt 90 ]; then
        echo "[WARNING] 内存使用率: ${usage}%"
        return 1
    else
        echo "[OK] 内存使用率: ${usage}%"
        return 0
    fi
}

main() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') 开始检查..." >> "$LOG_FILE"
    
    has_error=0
    
    # 检查服务
    for service in "${SERVICES[@]}"; do
        if ! check_service "$service"; then
            has_error=1
        fi
    done
    
    # 检查资源
    check_disk_space || has_error=1
    check_memory || has_error=1
    
    if [ $has_error -eq 1 ]; then
        echo "发现异常,发送告警邮件..."
        # mail -s "服务告警" "$ALERT_EMAIL" < "$LOG_FILE"
    fi
}

main

Python 脚本

基础知识

Python 运维常用的标准库有哪些?

库名用途常用模块
os操作系统接口os.path, os.environ, os.system
sys系统相关sys.argv, sys.path, sys.exit
subprocess子进程管理run(), Popen(), call()
shutil高级文件操作copy(), move(), rmtree()
glob文件名模式匹配glob(), iglob()
re正则表达式match(), search(), sub()
jsonJSON 处理loads(), dumps()
csvCSV 文件处理reader(), writer()
logging日志记录getLogger(), basicConfig()
datetime日期时间datetime, timedelta, strftime
socket网络编程socket(), connect(), send()
threading多线程Thread, Lock, Event
multiprocessing多进程Process, Pool, Queue
configparser配置文件解析ConfigParser, read()
argparse命令行参数ArgumentParser, add_argument()

os 与 shutil 常用操作

python
import os
import shutil

# ===== os 模块 =====

# 路径操作
os.getcwd()                          # 获取当前工作目录
os.chdir('/path/to/dir')             # 切换目录
os.path.exists('/path')              # 路径是否存在
os.path.isfile('file.txt')           # 是否是文件
os.path.isdir('/dir')                # 是否是目录
os.path.basename('/path/to/file')    # 获取文件名
os.path.dirname('/path/to/file')     # 获取目录名
os.path.split('/path/to/file')       # 分割目录和文件名
os.path.join('/path', 'to', 'file')  # 拼接路径
os.path.splitext('file.txt')         # 分割扩展名

# 文件操作
os.remove('file.txt')                # 删除文件
os.rename('old', 'new')              # 重命名
os.mkdir('new_dir')                  # 创建目录
os.makedirs('a/b/c')                 # 递归创建目录
os.listdir('.')                      # 列出目录内容
os.walk('/path')                     # 遍历目录树

# 环境变量
os.environ.get('HOME')               # 获取环境变量
os.environ['PATH'] = '/new/path'     # 设置环境变量

# 执行系统命令
os.system('ls -la')                  # 执行命令(简单场景)
os.popen('df -h').read()             # 执行命令获取输出

# ===== shutil 模块 =====

shutil.copy('src', 'dst')            # 复制文件
shutil.copy2('src', 'dst')           # 复制文件(保留元数据)
shutil.copytree('src', 'dst')        # 递归复制目录
shutil.move('src', 'dst')            # 移动/重命名
shutil.rmtree('dir')                 # 递归删除目录
shutil.disk_usage('/')               # 磁盘使用情况
shutil.which('python3')              # 查找命令位置

subprocess 执行命令

python
import subprocess

# 推荐方式:subprocess.run()
result = subprocess.run(
    ['ls', '-la'],
    capture_output=True,
    text=True
)
print(result.stdout)    # 标准输出
print(result.stderr)    # 错误输出
print(result.returncode)# 返回码

# 执行复杂命令(使用 shell=True)
result = subprocess.run(
    'ps aux | grep nginx',
    shell=True,
    capture_output=True,
    text=True
)

# Popen 用于需要实时输出的场景
process = subprocess.Popen(
    ['tail', '-f', '/var/log/syslog'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# 读取实时输出
for line in process.stdout:
    print(line.strip())

# 管道连接多个命令
p1 = subprocess.Popen(['cat', 'file.txt'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', 'error'], stdin=p1.stdout, stdout=subprocess.PIPE)
output = p2.communicate()[0]

常用第三方库

requests - HTTP 请求库

python
import requests

# GET 请求
response = requests.get('https://api.example.com/data')
print(response.status_code)      # 状态码
print(response.json())           # JSON 响应
print(response.text)             # 文本响应
print(response.headers)          # 响应头

# POST 请求
data = {'username': 'admin', 'password': '123456'}
response = requests.post('https://api.example.com/login', json=data)

# 带 headers 和 cookies
headers = {'User-Agent': 'Mozilla/5.0'}
cookies = {'session_id': 'abc123'}
response = requests.get(url, headers=headers, cookies=cookies)

# 超时设置
try:
    response = requests.get(url, timeout=10)
except requests.Timeout:
    print("请求超时")

# 会话对象(保持连接)
session = requests.Session()
session.headers.update({'Authorization': 'Bearer token'})
session.get('https://api.example.com/resource1')
session.get('https://api.example.com/resource2')

# 文件上传
files = {'file': open('report.pdf', 'rb')}
requests.post('https://api.example.com/upload', files=files)

paramiko - SSH 远程连接

python
import paramiko

# SSH 连接执行命令
def ssh_exec(host, username, password, command):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        client.connect(hostname=host, username=username, password=password)
        stdin, stdout, stderr = client.exec_command(command)
        
        print(stdout.read().decode())
        print(stderr.read().decode())
    finally:
        client.close()

# SFTP 文件传输
def sftp_upload(host, username, password, local_file, remote_path):
    transport = paramiko.Transport((host, 22))
    transport.connect(username=username, password=password)
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put(local_file, remote_path)
    sftp.close()
    transport.close()

psutil - 系统和进程信息

python
import psutil

# CPU 信息
psutil.cpu_percent(interval=1)        # CPU 使用率
psutil.cpu_count(logical=False)       # 物理核心数
psutil.cpu_count(logical=True)        # 逻辑核心数
psutil.cpu_times()                    # CPU 时间统计

# 内存信息
mem = psutil.virtual_memory()
print(f"总内存: {mem.total / 1024**3:.1f} GB")
print(f"可用内存: {mem.available / 1024**3:.1f} GB")
print(f"内存使用率: {mem.percent}%")

# 磁盘信息
disk = psutil.disk_usage('/')
print(f"磁盘总量: {disk.total / 1024**3:.1f} GB")
print(f"磁盘使用率: {disk.percent}%")

# 网络信息
psutil.net_io_counters()              # 网络 IO 统计
psutil.net_connections()               # 网络连接列表

# 进程管理
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
    if proc.info['cpu_percent'] > 50:
        print(proc.info)

# 获取特定进程
proc = psutil.Process(pid)
proc.name()                           # 进程名
proc.status()                         # 进程状态
proc.memory_info()                    # 内存信息
proc.cpu_percent()                    # CPU 使用率
proc.terminate()                      # 终止进程

Jinja2 - 模板引擎

python
from jinja2 import Template, Environment, FileSystemLoader

# 基本模板渲染
template_str = """
服务器名称: {{ server_name }}
IP 地址: {{ ip_address }}
{% for item in ports %}
端口: {{ item }}
{% endfor %}
"""

template = Template(template_str)
result = template.render(
    server_name='web01',
    ip_address='192.168.1.100',
    ports=[80, 443, 8080]
)
print(result)

# 文件模板
env = Environment(loader=FileSystemLoader('./templates'))
template = env.get_template('nginx.conf.j2')

config = template.render(
    server_name='example.com',
    listen_port=443,
    worker_processes=4
)

with open('/etc/nginx/nginx.conf', 'w') as f:
    f.write(config)

自动化运维脚本

批量部署脚本

python
#!/usr/bin/env python3
"""
批量部署工具
支持多主机并行部署应用
"""

import concurrent.futures
import paramiko
import yaml
from pathlib import Path


class Deployer:
    def __init__(self, config_file):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
    
    def deploy_to_host(self, host):
        """部署到单台主机"""
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        try:
            ssh.connect(
                hostname=host['ip'],
                username=host['user'],
                key_filename=self.config['ssh_key']
            )
            
            # 1. 同步代码
            self._sync_code(ssh, host['deploy_path'])
            
            # 2. 安装依赖
            self._install_deps(ssh, host['deploy_path'])
            
            # 3. 重启服务
            self._restart_service(ssh, host['service_name'])
            
            return {'host': host['ip'], 'status': 'success'}
            
        except Exception as e:
            return {'host': host['ip'], 'status': 'failed', 'error': str(e)}
        finally:
            ssh.close()
    
    def _sync_code(self, ssh, deploy_path):
        """同步代码"""
        commands = [
            f'cd {deploy_path}',
            'git pull origin main',
            'git checkout release'
        ]
        ssh.exec_command(' && '.join(commands))
    
    def _install_deps(self, ssh, deploy_path):
        """安装依赖"""
        ssh.exec_command(f'cd {deploy_path} && pip install -r requirements.txt')
    
    def _restart_service(self, ssh, service):
        """重启服务"""
        ssh.exec_command(f'sudo systemctl restart {service}')
    
    def batch_deploy(self, max_workers=5):
        """并行批量部署"""
        hosts = self.config['hosts']
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(self.deploy_to_host, hosts))
        
        # 输出结果
        success = sum(1 for r in results if r['status'] == 'success')
        failed = len(results) - success
        
        print(f"\n部署完成: 成功 {success}, 失败 {failed}")
        for result in results:
            if result['status'] == 'failed':
                print(f"  [失败] {result['host']}: {result.get('error')}")


if __name__ == '__main__':
    deployer = Deployer('deploy_config.yaml')
    deployer.batch_deploy()

监控告警脚本

python
#!/usr/bin/env python3
"""
Prometheus 告警转发脚本
将 Prometheus 告警推送到钉钉/企微/飞书
"""

import json
import time
import hmac
import hashlib
import base64
import urllib.parse
import requests
from datetime import datetime


class AlertNotifier:
    def __init__(self, webhook_url, secret=None):
        self.webhook_url = webhook_url
        self.secret = secret
    
    def _generate_sign(self, timestamp):
        """生成签名(钉钉)"""
        string_to_sign = f'{timestamp}\n{self.secret}'
        hmac_code = hmac.new(
            self.secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return sign
    
    def format_alert_message(self, alerts):
        """格式化告警消息"""
        lines = []
        lines.append(f"## 🚨 Prometheus 告警通知\n")
        lines.append(f"> 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        lines.append("---\n")
        
        for alert in alerts:
            status = alert.get('status', '')
            emoji = '🔴' if status == 'firing' else '🟢'
            
            labels = alert.get('labels', {})
            annotations = alert.get('annotations', {})
            
            lines.append(f"### {emoji} {labels.get('alertname', 'Unknown')}")
            lines.append(f"- **级别**: {labels.get('severity', 'unknown')}")
            lines.append(f"- **实例**: {labels.get('instance', '-')}")
            lines.append(f"- **描述**: {annotations.get('description', '-')}")
            lines.append(f"- **时间**: {alert.get('startsAt', '-')[:19]}\n")
        
        return '\n'.join(lines)
    
    def send_dingtalk(self, alerts):
        """发送钉钉告警"""
        timestamp = str(int(time.time() * 1000))
        sign = self._generate_sign(timestamp)
        
        url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
        
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": "Prometheus 告警",
                "text": self.format_alert_message(alerts)
            }
        }
        
        response = requests.post(url, json=payload)
        return response.json()


def handle_alert(request_data):
    """Webhook 处理函数"""
    try:
        data = json.loads(request_data)
        alerts = data.get('alerts', [])
        
        if not alerts:
            return {"status": "no alerts"}
        
        notifier = AlertNotifier(
            webhook_url="https://oapi.dingtalk.com/robot/send?access_token=xxx",
            secret="SEC_xxx"
        )
        
        result = notifier.send_dingtalk(alerts)
        return result
        
    except Exception as e:
        return {"status": "error", "message": str(e)}


if __name__ == '__main__':
    # Flask Webhook 服务示例
    from flask import Flask, request
    
    app = Flask(__name__)
    
    @app.route('/webhook', methods=['POST'])
    def webhook():
        return handle_alert(request.data)
    
    app.run(host='0.0.0.0', port=5000)

日志分析与清理脚本

python
#!/usr/bin/env python3
"""
日志分析清理工具
功能:
1. 分析日志中的错误和警告
2. 清理过期日志
3. 生成统计报告
"""

import os
import re
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from collections import Counter, defaultdict


class LogAnalyzer:
    def __init__(self, log_dir, retention_days=30):
        self.log_dir = Path(log_dir)
        self.retention_days = retention_days
        
        # 编译正则表达式
        self.error_pattern = re.compile(r'(ERROR|FATAL|Exception)', re.IGNORECASE)
        self.warning_pattern = re.compile(r'WARN', re.IGNORECASE)
        self.ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
        self.date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}')
    
    def analyze_log_file(self, log_file):
        """分析单个日志文件"""
        stats = {
            'total_lines': 0,
            'errors': [],
            'warnings': [],
            'top_ips': Counter(),
            'error_types': Counter(),
        }
        
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                stats['total_lines'] += 1
                
                # 检测错误
                if self.error_pattern.search(line):
                    stats['errors'].append({
                        'line': line_num,
                        'content': line.strip()[:200],
                        'type': self.error_pattern.search(line).group()
                    })
                    # 统计错误类型
                    error_type = self._extract_error_type(line)
                    if error_type:
                        stats['error_types'][error_type] += 1
                
                # 检测警告
                elif self.warning_pattern.search(line):
                    stats['warnings'].append({
                        'line': line_num,
                        'content': line.strip()[:200]
                    })
                
                # 统计 IP
                ips = self.ip_pattern.findall(line)
                for ip in ips:
                    stats['top_ips'][ip] += 1
        
        return stats
    
    def _extract_error_type(self, line):
        """提取错误类型"""
        patterns = [
            r'(\w+Exception)',
            r'(\w+Error)',
            r'ERROR:\s*(\w+)',
        ]
        for pattern in patterns:
            match = re.search(pattern, line)
            if match:
                return match.group(1)
        return None
    
    def generate_report(self, output_file=None):
        """生成分析报告"""
        report_lines = []
        report_lines.append("=" * 60)
        report_lines.append(f"日志分析报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append(f"日志目录: {self.log_dir}")
        report_lines.append("=" * 60)
        report_lines.append("")
        
        total_stats = {
            'total_files': 0,
            'total_lines': 0,
            'total_errors': 0,
            'total_warnings': 0,
            'all_ips': Counter(),
            'all_errors': Counter(),
        }
        
        # 分析所有日志文件
        for log_file in sorted(self.log_dir.glob('*.log')):
            if not log_file.is_file():
                continue
            
            total_stats['total_files'] += 1
            stats = self.analyze_log_file(log_file)
            
            total_stats['total_lines'] += stats['total_lines']
            total_stats['total_errors'] += len(stats['errors'])
            total_stats['total_warnings'] += len(stats['warnings'])
            total_stats['all_ips'] += stats['top_ips']
            total_stats['all_errors'] += stats['error_types']
            
            # 单文件摘要
            report_lines.append(f"【{log_file.name}】")
            report_lines.append(f"  总行数: {stats['total_lines']:,}")
            report_lines.append(f"  错误数: {len(stats['errors'])}")
            report_lines.append(f"  警告数: {len(stats['warnings'])}")
            report_lines.append("")
        
        # 汇总统计
        report_lines.append("-" * 40)
        report_lines.append("【汇总统计】")
        report_lines.append(f"  日志文件总数: {total_stats['total_files']}")
        report_lines.append(f"  总行数: {total_stats['total_lines']:,}")
        report_lines.append(f"  总错误数: {total_stats['total_errors']:,}")
        report_lines.append(f"  总警告数: {total_stats['total_warnings']:,}")
        report_lines.append("")
        
        # Top 10 IP
        report_lines.append("【Top 10 访问 IP】")
        for ip, count in total_stats['all_ips'].most_common(10):
            report_lines.append(f"  {ip}: {count:,}")
        report_lines.append("")
        
        # Top 10 错误类型
        report_lines.append("【Top 10 错误类型】")
        for error, count in total_stats['all_errors'].most_common(10):
            report_lines.append(f"  {error}: {count}")
        
        report_content = '\n'.join(report_lines)
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_content)
            print(f"报告已保存至: {output_file}")
        
        return report_content
    
    def cleanup_old_logs(self, dry_run=True):
        """清理过期日志"""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)
        cleaned_files = []
        
        for log_file in self.log_dir.iterdir():
            if not log_file.is_file():
                continue
            
            # 检查修改时间
            mtime = datetime.fromtimestamp(log_file.stat().st_mtime)
            if mtime < cutoff_date:
                cleaned_files.append({
                    'file': str(log_file),
                    'size': log_file.stat().st_size,
                    'mtime': mtime.strftime('%Y-%m-%d %H:%M:%S')
                })
                
                if not dry_run:
                    # 先压缩再删除原文件
                    with open(log_file, 'rb') as f_in:
                        with gzip.open(f'{log_file}.gz', 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    log_file.unlink()
        
        print(f"{'预览' if dry_run else '清理'}结果:")
        print(f"找到 {len(cleaned_files)} 个超过 {self.retention_days} 天的日志文件")
        for item in cleaned_files:
            action = '将压缩并删除' if not dry_run else '待清理'
            print(f"  [{action}] {item['file']} ({item['size']/1024/1024:.1f}MB, {item['mtime']})")


if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='日志分析清理工具')
    parser.add_argument('--log-dir', required=True, help='日志目录')
    parser.add_argument('--retention-days', type=int, default=30, help='保留天数')
    parser.add_argument('--report', help='报告输出文件')
    parser.add_argument('--cleanup', action='store_true', help='执行清理')
    
    args = parser.parse_args()
    
    analyzer = LogAnalyzer(args.log_dir, args.retention_days)
    
    analyzer.generate_report(args.report)
    
    if args.cleanup:
        analyzer.cleanup_old_logs(dry_run=False)
    else:
        analyzer.cleanup_old_logs(dry_run=True)

Python 包管理与虚拟环境

pip 常用命令

bash
# 安装包
pip install requests
pip install requests==2.28.0          # 指定版本
pip install 'requests>=2.25,<3.0'     # 版本范围
pip install -r requirements.txt      # 从文件安装

# 升级包
pip install --upgrade requests
pip install -U requests

# 卸载包
pip uninstall requests

# 查看已安装包
pip list
pip freeze                            # 导出依赖列表
pip freeze > requirements.txt         # 保存到文件

# 搜索包
pip search keyword

# 查看包信息
pip show requests
pip show -f requests                  # 显示所有文件

# 缓存管理
pip cache info
pip cache purge                       # 清除缓存

venv 虚拟环境

bash
# 创建虚拟环境
python3 -m venv myenv

# 激活虚拟环境
source myenv/bin/activate    # Linux/Mac
myenv\Scripts\activate       # Windows

# 退出虚拟环境
deactivate

# 删除虚拟环境
rm -rf myenv

pyproject.toml / setup.py 项目打包

toml
# pyproject.toml
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "my-tool"
version = "1.0.0"
description = "我的运维工具"
requires-python = ">=3.8"
dependencies = [
    "requests>=2.28.0",
    "paramiko>=3.0.0",
]

[project.scripts]
mytool = "mytool.cli:main"