Docker Registry 共享 Manifest 风险分析
一、场景与风险分析
1.1 背景场景
在使用 Docker Distribution 作为私有镜像仓库时,发现一个关键问题:多个镜像标签(tag)可能共享同一个 Manifest。当通过 Digest 删除其中一个标签时,会影响所有共享该 Manifest 的标签。
1.2 实际现象
bash
# 创建两个共享 Manifest 的标签
docker tag nginx:latest localhost:5000/nginx:v1
docker tag nginx:latest localhost:5000/nginx:v2
docker push localhost:5000/nginx:v1
docker push localhost:5000/nginx:v2
# 通过 Digest 删除 v1
curl -X DELETE http://registry/v2/nginx/manifests/${DIGEST}
# 结果:v1 和 v2 都不可用
curl http://registry/v2/nginx/manifests/v2
# 返回 404
1.3 风险分析
1.3.1 级联删除风险
- 问题:删除一个标签会删除所有共享该 Manifest 的标签
- 影响:
- 生产环境标签被开发标签拖累删除
- 多版本并行时误删重要版本
- 灰度发布中的多个标签同时失效
1.3.2 数据不一致风险
- 现象:标签索引与 Manifest 存储不一致
- 表现:
bash
curl http://registry/v2/nginx/tags/list # 显示 ["v1", "v2"]
curl http://registry/v2/nginx/manifests/v1 # 返回 404 - Manifest 不存在
1.3.3 运维复杂性风险
- 操作不可逆:删除后难以恢复
- 影响评估难:难以确定哪些标签会受影响
- 监控缺失:缺乏标签共享关系的可视化
1.3.4 磁盘管理风险
- 清理困境:需要清理旧镜像释放空间
- 安全顾虑:担心误删正在使用的镜像
- 策略缺失:缺乏安全的自动化清理流程
二、验证方案
2.1 验证步骤
bash
# 1. Set up the test environment
docker pull nginx:latest
docker tag nginx:latest localhost:5000/nginx:v1
docker tag nginx:latest localhost:5000/nginx:v2
docker push localhost:5000/nginx:v1
docker push localhost:5000/nginx:v2
# 2. Verify that both tags resolve to the same manifest.
#    Only the digest value is extracted: the raw header line also carries the
#    header name and a trailing CR, which would corrupt the DELETE URL below.
ACCEPT="Accept: application/vnd.docker.distribution.manifest.v2+json"
get_digest() {
  curl -sI -H "$ACCEPT" "http://registry/v2/nginx/manifests/$1" \
    | tr -d '\r' \
    | awk 'tolower($1) == "docker-content-digest:" {print $2}'
}
DIGEST_V1=$(get_digest v1)
DIGEST_V2=$(get_digest v2)
# Confirm DIGEST_V1 == DIGEST_V2
[ "$DIGEST_V1" = "$DIGEST_V2" ] && echo "shared manifest: $DIGEST_V1"
# 3. Test the impact of a delete-by-digest
curl -X DELETE "http://registry/v2/nginx/manifests/${DIGEST_V1}"
# 4. Verify the impact
curl http://registry/v2/nginx/manifests/v1 # expected: 404
curl http://registry/v2/nginx/manifests/v2 # expected: 404
curl http://registry/v2/nginx/tags/list # 可能仍显示标签
2.2 验证结果
通过验证确认:
- 共享 Manifest 的标签会一起被删除
- 标签索引可能出现不一致
- 这是 Docker Distribution 的预期行为
三、解决方案:智能清理脚本
3.1 完整脚本代码
python
#!/usr/bin/env python3
"""
Docker Registry 安全镜像清理工具
"""
import os
import sys
import json
import logging
import argparse
import datetime
from typing import List, Dict, Tuple, Optional
import requests
from urllib.parse import urljoin
# Module-wide logger shared by every function and class in this script.
logger = logging.getLogger('registry_cleaner')


def setup_logging(log_file: Optional[str] = None, verbose: bool = False) -> None:
    """Configure the module-wide logger.

    Any handlers installed by a previous call are closed and removed first,
    so calling this function repeatedly neither duplicates output nor leaks
    an open log file.

    Args:
        log_file: Path of a log file to write in addition to the console.
            Missing parent directories are created. ``None`` or an empty
            string disables file logging.
        verbose: When true, the logger and the console handler emit DEBUG
            records; otherwise INFO and above. The file handler always
            records DEBUG, subject to the logger level.
    """
    # Close before removing: dropping a FileHandler without close() leaves
    # its file descriptor open until interpreter shutdown.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    level = logging.DEBUG if verbose else logging.INFO
    logger.setLevel(level)

    # Console handler: short timestamps, level follows the verbosity flag.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(level)
    console_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    ))
    logger.addHandler(console_handler)

    if not log_file:
        return

    # File handler: full timestamps and logger name for later auditing.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    ))
    logger.addHandler(file_handler)
class RegistryCleaner:
    """Delete registry manifests only when no other tag shares them.

    Talks to a Docker Registry through the HTTP API v2 using one shared
    ``requests.Session``. For every requested ``repository:tag`` the tag's
    manifest digest is resolved, all tags of the repository are scanned for
    the same digest, and the manifest is deleted only if the requested tag
    is the sole reference.

    Implementation note: a ``requests.Response`` is falsy for 4xx/5xx status
    codes, so every "did we get a response at all?" check in this class
    compares against ``None`` explicitly. Relying on truthiness would make
    all error-status branches unreachable.
    """

    def __init__(self, registry_url: str, verify_ssl: bool = True):
        """Create a cleaner bound to one registry.

        Args:
            registry_url: Base URL of the registry; a trailing slash is
                stripped.
            verify_ssl: Passed to the session's ``verify`` setting.

        Note:
            Construction sends one probing DELETE request to ``/v2/``.
        """
        self.registry_url = registry_url.rstrip('/')
        self.session = requests.Session()
        self.session.verify = verify_ssl
        # Multi-arch tags point at a manifest list / OCI index. Advertising
        # those media types lets the registry answer with the digest the tag
        # really references instead of a per-platform manifest.
        # NOTE(review): confirm against the registry version in use.
        self.headers = {
            'Accept': ', '.join((
                'application/vnd.docker.distribution.manifest.v2+json',
                'application/vnd.oci.image.manifest.v1+json',
                'application/vnd.docker.distribution.manifest.list.v2+json',
                'application/vnd.oci.image.index.v1+json',
            ))
        }
        # Probe whether DELETE appears to be available.
        self.delete_enabled = self._test_delete_enabled()

    def _test_delete_enabled(self) -> bool:
        """Probe whether the registry appears to accept DELETE requests.

        Returns:
            ``False`` only when the probe answers 405 and its ``Allow``
            header does not list DELETE; ``True`` in every other case,
            including network failures (optimistic default).
        """
        # NOTE(review): the base endpoint may reject DELETE regardless of
        # the storage.delete.enabled setting, so this is only a hint. It is
        # used for a warning and in the report, never to block a deletion.
        test_url = f"{self.registry_url}/v2/"
        try:
            response = self.session.delete(test_url, timeout=5)
        except requests.exceptions.RequestException as e:
            logger.warning(f"DELETE 功能测试失败: {e}")
            return True  # assume available

        if response.status_code == 405:
            allowed_methods = response.headers.get('Allow', '')
            logger.info(f"Registry 支持的方法: {allowed_methods}")
            delete_listed = 'DELETE' in allowed_methods
            if not delete_listed:
                logger.warning("Registry 未启用 DELETE 功能")
            return delete_listed
        if response.status_code not in (200, 204):
            logger.debug(f"DELETE 测试返回: {response.status_code}")
        return True  # assume available

    def _make_request(self, method: str, url: str, **kwargs) -> Optional["requests.Response"]:
        """Send one HTTP request through the shared session.

        Args:
            method: HTTP verb.
            url: Absolute URL.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to 30 seconds but may be overridden by the caller.

        Returns:
            The response (whatever its status code), or ``None`` when the
            request itself failed; the failure is logged.
        """
        # setdefault avoids a duplicate-keyword TypeError when the caller
        # supplies its own timeout.
        kwargs.setdefault('timeout', 30)
        try:
            response = self.session.request(method, url, **kwargs)
        except requests.exceptions.ConnectionError as e:
            logger.error(f"连接失败: {e}")
            return None
        except requests.exceptions.Timeout as e:
            logger.error(f"请求超时: {e}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"请求异常: {e}")
            return None

        logger.debug(f"{method} {url} - 状态码: {response.status_code}")
        # Surface the status codes an operator has to act on.
        if response.status_code == 405:
            logger.warning(f"{method} 方法在 {url} 上不被允许")
            allowed = response.headers.get('Allow', '')
            if allowed:
                logger.warning(f"允许的方法: {allowed}")
        elif response.status_code == 401:
            logger.error("需要认证")
        elif response.status_code == 403:
            logger.error("权限不足")
        return response

    def get_tags(self, repository: str) -> List[str]:
        """Return all tags of ``repository``.

        Returns:
            The tag names, or an empty list when the repository does not
            exist, has no tags, or the request or JSON decoding failed.
        """
        url = f"{self.registry_url}/v2/{repository}/tags/list"
        logger.debug(f"获取 {repository} 的标签列表")
        response = self._make_request('GET', url, headers=self.headers)
        if response is None:
            return []
        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError as e:
                logger.error(f"解析标签列表失败: {e}")
                return []
            # The registry may send "tags": null for an empty repository.
            return data.get('tags') or []
        if response.status_code == 404:
            logger.warning(f"仓库 {repository} 不存在")
        return []

    def get_manifest_digest(self, repository: str, tag: str) -> Optional[str]:
        """Resolve ``repository:tag`` to its manifest digest.

        Issues a HEAD request and reads ``Docker-Content-Digest``; when the
        registry answers 405 to HEAD, the lookup is retried with GET.

        Returns:
            The digest string, or ``None`` when it cannot be determined.
        """
        url = f"{self.registry_url}/v2/{repository}/manifests/{tag}"
        logger.debug(f"获取 {repository}:{tag} 的 digest")
        response = self._make_request('HEAD', url, headers=self.headers)
        if response is None:
            return None
        if response.status_code == 200:
            digest = response.headers.get('Docker-Content-Digest', '').strip()
            if digest:
                logger.debug(f"{repository}:{tag} -> {digest[:20]}...")
                return digest
        elif response.status_code == 404:
            logger.warning(f"标签 {repository}:{tag} 不存在")
        elif response.status_code == 405:
            # HEAD may be unsupported; fall back to GET.
            return self._get_manifest_digest_via_get(repository, tag)
        return None

    def _get_manifest_digest_via_get(self, repository: str, tag: str) -> Optional[str]:
        """Resolve the digest with a GET request (fallback for HEAD).

        Returns:
            The digest string, or ``None`` when the request failed or the
            header is missing or empty.
        """
        url = f"{self.registry_url}/v2/{repository}/manifests/{tag}"
        response = self._make_request('GET', url, headers=self.headers)
        if response is not None and response.status_code == 200:
            return response.headers.get('Docker-Content-Digest', '').strip() or None
        return None

    def find_shared_tags(self, repository: str, target_digest: str) -> List[str]:
        """Return every tag of ``repository`` whose digest is ``target_digest``.

        One digest lookup is issued per tag in the repository.
        """
        # NOTE(review): a tag whose digest lookup fails is treated as "not
        # sharing". A transient error could therefore hide a real sharer;
        # consider refusing deletion when any lookup fails.
        all_tags = self.get_tags(repository)
        if not all_tags:
            return []
        logger.debug(f"查找引用 digest {target_digest[:20]}... 的标签")
        shared_tags = [
            tag for tag in all_tags
            if self.get_manifest_digest(repository, tag) == target_digest
        ]
        if shared_tags:
            logger.info(f"发现 {len(shared_tags)} 个标签共享此 Manifest")
        return shared_tags

    def delete_manifest(self, repository: str, digest: str) -> Tuple[bool, str]:
        """Delete the manifest ``digest`` from ``repository``.

        Returns:
            ``(success, message)``. For 401/403/405 and other unexpected
            statuses the message embeds the status code in parentheses.
        """
        if not self.delete_enabled:
            logger.warning("Registry 可能未启用 DELETE 操作")
        url = f"{self.registry_url}/v2/{repository}/manifests/{digest}"
        logger.info(f"尝试删除 {repository} @ {digest[:20]}...")
        response = self._make_request('DELETE', url, headers=self.headers)
        # Explicit None check: an error response is falsy but must still be
        # classified by its status code below.
        if response is None:
            return False, "请求失败"

        status = response.status_code
        if status in (200, 202, 204):
            logger.info("删除成功")
            return True, "删除成功"
        if status == 404:
            logger.warning("Manifest 不存在(可能已被删除)")
            return False, "Manifest 不存在"
        if status == 405:
            logger.error("ERROR: DELETE 方法不被允许 (405)")
            logger.error("请检查 Registry 配置:")
            logger.error(" 在 config.yml 中添加: storage.delete.enabled: true")
            logger.error(" 或设置环境变量: REGISTRY_STORAGE_DELETE_ENABLED=true")
            return False, "DELETE 方法不被允许 (405)"
        if status == 401:
            logger.error("ERROR: 需要认证 (401)")
            return False, "需要认证 (401)"
        if status == 403:
            logger.error("ERROR: 权限不足 (403)")
            return False, "权限不足 (403)"
        logger.error(f"ERROR: 删除失败,状态码: {status}")
        return False, f"删除失败 ({status})"

    def process_tag(self, repository: str, tag: str, dry_run: bool = False) -> Dict:
        """Analyse one tag and delete its manifest when that is safe.

        Args:
            repository: Repository name.
            tag: Tag name inside the repository.
            dry_run: When true, only report the decision; delete nothing.

        Returns:
            A result dict with the keys ``repository``, ``tag``, ``digest``,
            ``shared_with``, ``decision`` (``skip`` / ``keep_shared`` /
            ``delete``), ``action_taken``, ``can_delete``,
            ``delete_success``, ``error_code``, ``error_message``,
            ``message`` and ``timestamp``.
        """
        result = {
            'repository': repository,
            'tag': tag,
            'digest': None,
            'shared_with': [],
            'decision': 'unknown',
            'action_taken': 'none',
            'can_delete': False,
            'delete_success': False,
            'error_code': None,
            'error_message': '',
            'message': '',
            'timestamp': datetime.datetime.now().isoformat()
        }
        logger.info(f"分析: {repository}:{tag}")

        # 1. Resolve the tag to its digest.
        digest = self.get_manifest_digest(repository, tag)
        if not digest:
            result['decision'] = 'skip'
            result['error_message'] = '标签不存在或无法访问'
            logger.warning(f" {result['error_message']}")
            return result
        result['digest'] = digest

        # 2. Find the other tags that reference the same manifest.
        all_sharing_tags = self.find_shared_tags(repository, digest)
        other_sharing_tags = [t for t in all_sharing_tags if t != tag]
        result['shared_with'] = other_sharing_tags

        if other_sharing_tags:
            # 3. Shared manifest: deleting it would break the other tags.
            result['decision'] = 'keep_shared'
            result['can_delete'] = False
            result['message'] = f'Manifest 被 {len(other_sharing_tags)} 个其他标签共享'
            prefix = '[试运行] ' if dry_run else ''
            result['action_taken'] = 'would_keep (dry-run)' if dry_run else 'kept'
            logger.warning(f" {prefix}{result['message']}")
            logger.warning(f" {prefix}共享标签: {other_sharing_tags}")
            logger.warning(f" {prefix}决策: 保留(不删除 Manifest)")
            if not dry_run:
                logger.info(f" 标签 {repository}:{tag} 仍然可用")
            return result

        # 4. Sole reference: the manifest may be deleted.
        result['decision'] = 'delete'
        result['can_delete'] = True
        if dry_run:
            result['action_taken'] = 'would_delete (dry-run)'
            result['message'] = '可安全删除(无其他标签共享)'
            logger.info(f" [试运行] {result['message']}")
            logger.info(f" [试运行] 决策: 删除 Manifest")
            return result

        # Real deletion.
        success, error_msg = self.delete_manifest(repository, digest)
        if success:
            result['action_taken'] = 'deleted'
            result['delete_success'] = True
            result['message'] = '已删除 Manifest'
            logger.info(f" {result['message']}")
            return result

        result['action_taken'] = 'delete_failed'
        result['delete_success'] = False
        result['error_message'] = error_msg
        # delete_manifest embeds the status code in its message; recover
        # the ones that are reported separately in the statistics.
        for code in (405, 401, 403):
            if str(code) in error_msg:
                result['error_code'] = code
                break
        result['message'] = f'删除失败: {error_msg}'
        logger.error(f" {result['message']}")
        return result

    def process_tags(self, tag_list: List[str], dry_run: bool = False) -> Dict:
        """Process a list of ``repository:tag`` specs and build a report.

        Entries without a ``:`` are counted as skipped and not processed.

        Returns:
            An empty dict when ``tag_list`` is empty; otherwise a report
            with the keys ``timestamp``, ``registry``, ``delete_enabled``,
            ``dry_run``, ``stats`` and ``results``.
        """
        if not tag_list:
            return {}
        logger.info(f"开始处理 {len(tag_list)} 个标签")
        logger.info("=" * 60)
        results = []
        stats = {
            'total': len(tag_list),
            'processed': 0,
            'deletable': 0,
            'deleted': 0,
            'kept_shared': 0,
            'skipped': 0,
            'failed': 0,
            'errors_405': 0,
            'errors_401': 0,
            'errors_403': 0,
            'errors_other': 0
        }
        # Status codes that have a dedicated failure counter.
        error_counters = {405: 'errors_405', 401: 'errors_401', 403: 'errors_403'}

        for i, tag_spec in enumerate(tag_list, 1):
            if ':' not in tag_spec:
                logger.warning(f"跳过无效格式: {tag_spec}")
                stats['skipped'] += 1
                continue
            repository, tag = tag_spec.split(':', 1)
            logger.info(f"[{i}/{len(tag_list)}]")
            result = self.process_tag(repository, tag, dry_run)
            results.append(result)

            # Update the statistics.
            stats['processed'] += 1
            decision = result['decision']
            if decision == 'delete':
                stats['deletable'] += 1
                if result.get('delete_success'):
                    stats['deleted'] += 1
                else:
                    counter = error_counters.get(result.get('error_code'))
                    if counter is None and result.get('error_message'):
                        counter = 'errors_other'
                    # Dry-run deletions have neither code nor message and
                    # are therefore not counted as failures.
                    if counter is not None:
                        stats[counter] += 1
                        stats['failed'] += 1
            elif decision == 'keep_shared':
                stats['kept_shared'] += 1
            elif decision == 'skip':
                stats['skipped'] += 1

        # Build the report.
        report = {
            'timestamp': datetime.datetime.now().isoformat(),
            'registry': self.registry_url,
            'delete_enabled': self.delete_enabled,
            'dry_run': dry_run,
            'stats': stats,
            'results': results
        }
        # Print the summary.
        self._print_summary(stats, dry_run)
        return report

    def _print_summary(self, stats: Dict, dry_run: bool) -> None:
        """Log a human-readable summary of ``stats``.

        Dry runs report the number of simulated deletions; real runs report
        actual deletions, failures and per-status error counts.
        """
        logger.info("=" * 60)
        logger.info("处理完成:")
        logger.info(f" 总计标签: {stats['total']}")
        logger.info(f" 已处理: {stats['processed']}")
        logger.info(f" 可删除: {stats['deletable']}")
        logger.info(f" 共享保留: {stats['kept_shared']}")
        logger.info(f" 跳过: {stats['skipped']}")
        if dry_run:
            logger.info(f" 模拟删除: {stats['deletable']}")
            if stats['kept_shared'] > 0:
                logger.warning(f" 注意: {stats['kept_shared']} 个标签因共享而保留")
            return

        logger.info(f" 实际删除: {stats['deleted']}")
        logger.info(f" 删除失败: {stats['failed']}")
        if stats['errors_405'] > 0:
            logger.error(f" DELETE 禁用错误: {stats['errors_405']}")
            logger.error(" 请检查 Registry 配置:")
            logger.error(" config.yml 需要: storage.delete.enabled: true")
        if stats['errors_401'] > 0:
            logger.error(f" 认证错误: {stats['errors_401']}")
        if stats['errors_403'] > 0:
            logger.error(f" 权限错误: {stats['errors_403']}")
def read_tag_list(file_path: str) -> List[str]:
    """Read ``repository:tag`` entries from a text file.

    Each line is stripped of surrounding whitespace; blank lines and lines
    starting with ``#`` are ignored.

    Args:
        file_path: Path of the UTF-8 encoded list file. A leading byte-order
            mark is tolerated.

    Returns:
        The entries in file order, or an empty list when the file cannot be
        read or decoded (the error is logged).
    """
    try:
        # utf-8-sig drops a leading BOM, which would otherwise stay glued to
        # the first entry because str.strip() does not remove it.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            stripped_lines = (raw_line.strip() for raw_line in f)
            return [
                entry for entry in stripped_lines
                if entry and not entry.startswith('#')
            ]
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError and malformed paths.
        logger.error(f"读取文件失败: {e}")
        return []
def save_report(report: Dict, output_file: str) -> None:
    """Write ``report`` to ``output_file`` as indented UTF-8 JSON.

    Missing parent directories are created, mirroring what the logging
    setup does for the log file. Failures are logged, never raised.

    Args:
        report: JSON-serializable report dict.
        output_file: Destination path.
    """
    try:
        # Serialize first so a serialization error cannot leave a
        # half-written file behind.
        payload = json.dumps(report, ensure_ascii=False, indent=2)
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(payload)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"保存报告失败: {e}")
    else:
        logger.info(f"报告已保存: {output_file}")
def main():
    """Command-line entry point.

    Parses the arguments, configures logging, reads the tag list, runs the
    cleaner and optionally writes the JSON report.

    Returns:
        Process exit status: ``0`` on completion, ``1`` when the input file
        yields no entries, ``2`` when a real run hit at least one 405
        (DELETE not enabled) error.
    """
    parser = argparse.ArgumentParser(
        description='Docker Registry 安全镜像清理工具',
        # The epilog holds multi-line examples and a YAML fragment; the
        # default formatter would reflow them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
1. 试运行(推荐):
   python3 registry_cleaner.py --registry http://localhost:5000 \\
     --input tags.txt --dry-run --log dry-run.log
2. 实际执行:
   python3 registry_cleaner.py --registry http://localhost:5000 \\
     --input tags.txt --log cleanup.log --output report.json
3. 处理 405 错误时的 Registry 配置:
   version: 0.1
   storage:
     delete:
       enabled: true
"""
    )
    parser.add_argument('--registry', '-r', required=True,
                        help='Registry 地址')
    parser.add_argument('--input', '-i', required=True,
                        help='标签列表文件')
    parser.add_argument('--output', '-o',
                        help='输出报告文件')
    parser.add_argument('--log', '-l', default='registry_cleaner.log',
                        help='日志文件')
    parser.add_argument('--dry-run', action='store_true',
                        help='试运行模式')
    parser.add_argument('--insecure', '-k', action='store_true',
                        help='忽略 SSL 验证')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='详细输出')
    args = parser.parse_args()

    setup_logging(args.log, args.verbose)
    logger.info("=" * 60)
    logger.info("Docker Registry 安全清理工具")
    logger.info(f"时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"Registry: {args.registry}")
    logger.info(f"模式: {'试运行' if args.dry_run else '实际执行'}")
    logger.info("=" * 60)

    tags = read_tag_list(args.input)
    if not tags:
        logger.error("没有有效的标签")
        return 1

    cleaner = RegistryCleaner(args.registry, not args.insecure)
    report = cleaner.process_tags(tags, args.dry_run)
    if args.output:
        save_report(report, args.output)

    # A 405 means the registry configuration must change before a retry
    # can succeed, so it gets its own banner and exit status.
    if not args.dry_run and report.get('stats', {}).get('errors_405', 0) > 0:
        logger.error("\n" + "!" * 60)
        logger.error("重要: Registry 未启用 DELETE 功能")
        logger.error("请修改配置后重试")
        logger.error("!" * 60)
        return 2
    return 0
if __name__ == '__main__':
    sys.exit(main())
四、使用指南
4.1 准备阶段
bash
# 1. 创建标签列表文件
cat > tags-to-clean.txt << 'EOF'
# 要清理的镜像标签
nginx:v1
nginx:v2
web-app:dev
redis:6.0-old
EOF
# 2. 确认 Registry 配置
# 检查 Registry 是否启用删除功能
curl -X DELETE http://192.168.148.30:5000/v2/
# 如果返回 405,需要启用删除功能
4.2 执行清理
bash
# 1. 先试运行(必须!)
python3 registry_cleaner.py \
--registry http://192.168.148.30:5000 \
--input tags-to-clean.txt \
--dry-run \
--log dry-run-$(date +%Y%m%d).log
# 2. 查看试运行结果
grep -E "决策:|共享标签:|可安全删除" dry-run-20240117.log
# 3. 实际执行(确认无误后)
python3 registry_cleaner.py \
--registry http://192.168.148.30:5000 \
--input tags-to-clean.txt \
--log cleanup-$(date +%Y%m%d).log \
  --output report-$(date +%Y%m%d).json
4.3 结果分析
bash
# 查看统计摘要
cat report-20240117.json | jq '.stats'
# 查看共享标签详情
cat report-20240117.json | jq '.results[] | select(.decision == "keep_shared")'
# 查看删除成功的标签
cat report-20240117.json | jq '.results[] | select(.action_taken == "deleted")'
# 查看错误信息
cat report-20240117.json | jq '.results[] | select(.error_code != null)'
五、Registry 配置
5.1 启用删除功能
yaml
# config.yml
version: 0.1
storage:
filesystem:
rootdirectory: /var/lib/registry
delete:
enabled: true # 必须启用
http:
  addr: :5000
5.2 Docker 运行方式
bash
docker run -d \
-p 5000:5000 \
-e REGISTRY_STORAGE_DELETE_ENABLED=true \
-v /data/registry:/var/lib/registry \
--name registry \
  registry:2
六、最佳实践
6.1 标签命名规范
bash
# 避免共享 Manifest
# 不推荐 - 共享 Manifest
docker tag myapp:latest myapp:v1.0.0
docker tag myapp:latest myapp:production
# 推荐 - 不同 Manifest
docker build -t myapp:v1.0.0-$(date +%s) .
docker build -t myapp:production-$(date +%s) .
6.2 清理策略
bash
# 定期清理脚本
0 2 * * * /usr/local/bin/clean-registry.sh
# clean-registry.sh
#!/bin/bash
python3 /opt/registry_cleaner.py \
--registry http://registry:5000 \
--input /etc/registry/cleanup-list.txt \
--log /var/log/registry/cleanup-$(date +%Y%m%d).log \
  --output /var/log/registry/report-$(date +%Y%m%d).json
6.3 监控告警
bash
# Monitor disk usage
df -h /data/registry
# Monitor cleanup results.
# The key "errors_405" is always present in the report's stats, so a plain
# grep for it matches every report; compare the actual counter instead.
if jq -e '.stats.errors_405 > 0' report.json > /dev/null; then
  echo "ALERT: Registry delete not enabled"
fi
# Monitor the number of tags kept because they share a manifest.
# Quote and default the value so a missing report does not break the test.
SHARED_COUNT=$(jq -r '.stats.kept_shared // 0' report.json)
if [ "${SHARED_COUNT:-0}" -gt 10 ]; then
  echo "WARNING: Too many shared manifests"
fi