import re
from collections import defaultdict
from typing import Dict, List, Optional
def find_duplicates_basic(log_file: str, key_column: Optional[int] = None) -> Dict[str, List[int]]:
    """
    Find duplicate lines in a log file.

    Args:
        log_file: path to the log file
        key_column: column to use as the dedup key (0-based); None means use the whole line

    Returns:
        dict of {duplicated content: [line numbers]}
    """
    duplicates = defaultdict(list)

    with open(log_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            # Extract the key field
            if key_column is not None:
                # Split on whitespace (adjust for your actual log format)
                parts = line.split()
                if len(parts) > key_column:
                    key = parts[key_column]
                else:
                    key = line
            else:
                key = line

            duplicates[key].append(line_num)

    # Keep only the keys that occur more than once
    return {key: lines for key, lines in duplicates.items() if len(lines) > 1}
def print_duplicates(duplicates: Dict[str, List[int]], limit: int = 10):
    """Print the duplicate groups."""
    if not duplicates:
        print("No duplicate data found")
        return

    print(f"Found {len(duplicates)} groups of duplicate data:")
    print("-" * 80)

    for i, (content, lines) in enumerate(duplicates.items()):
        if i >= limit:
            print(f"... {len(duplicates) - limit} more groups not shown")
            break
        print(f"Duplicate content: {content[:100]}..." if len(content) > 100 else f"Duplicate content: {content}")
        print(f"Occurs at lines: {', '.join(map(str, lines))}")
        print("-" * 60)
# Usage example
if __name__ == "__main__":
    # Example 1: use the whole line as the dedup key
    duplicates = find_duplicates_basic("sync_log.txt")
    print_duplicates(duplicates)

    # Example 2: use a specific column (e.g. the 2nd column, index 1)
    # duplicates = find_duplicates_basic("sync_log.txt", key_column=1)
import re
import json
import csv
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
class LogDuplicateFinder:
    """Duplicate finder for log files."""

    def __init__(self, log_format: str = "auto"):
        """
        Args:
            log_format: log format, one of 'json', 'csv', 'nginx', 'syslog', 'auto'
        """
        self.log_format = log_format
    def find_duplicates(
        self,
        log_file: str,
        key_fields: Optional[List[str]] = None,
        time_window: Optional[int] = None
    ) -> Dict[str, Dict]:
        """
        Find duplicate entries.

        Args:
            log_file: path to the log file
            key_fields: list of fields used as the dedup key
            time_window: time window in seconds; None means time is ignored

        Returns:
            duplicate statistics
        """
        self.log_data = []
        entries_by_line = {}  # map file line number -> parsed entry
        duplicates = defaultdict(list)

        with open(log_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                # Parse the log line
                parsed = self._parse_line(line, line_num)
                if parsed:
                    self.log_data.append(parsed)
                    entries_by_line[line_num] = parsed

                    # Build the dedup key
                    key = self._generate_key(parsed, key_fields)

                    # If a time window is given, fold the time bucket into the key
                    if time_window and 'timestamp' in parsed:
                        time_key = f"{key}_{int(parsed['timestamp'] // time_window)}"
                        duplicates[time_key].append(line_num)
                    else:
                        duplicates[key].append(line_num)

        # Collect results
        result = {
            'total_lines': len(self.log_data),
            'unique_lines': len(duplicates),
            'duplicate_groups': [],
            'statistics': {}
        }

        for key, lines in duplicates.items():
            if len(lines) > 1:
                # Look the sample up by its file line number
                # (indexing self.log_data by line number would be off once blank lines are skipped)
                sample_line = entries_by_line.get(lines[0], {})
                result['duplicate_groups'].append({
                    'key': key,
                    'count': len(lines),
                    'lines': lines,
                    'sample': sample_line.get('raw', '')[:200],
                    'first_occurrence': lines[0],
                    'last_occurrence': lines[-1]
                })

        # Summary statistics
        if result['duplicate_groups']:
            counts = [group['count'] for group in result['duplicate_groups']]
            result['statistics'] = {
                'total_duplicates': sum(counts) - len(counts),
                'max_duplicates': max(counts),
                'avg_duplicates': sum(counts) / len(counts),
                'most_common_key': max(result['duplicate_groups'],
                                       key=lambda x: x['count'])['key']
            }

        return result
    def _parse_line(self, line: str, line_num: int) -> Dict:
        """Parse a single log line."""
        result = {'raw': line, 'line_num': line_num}

        if self.log_format == 'json' or (self.log_format == 'auto' and line.startswith('{')):
            try:
                result.update(json.loads(line))
            except (ValueError, TypeError):
                pass
        elif self.log_format == 'csv' or (self.log_format == 'auto' and ',' in line):
            try:
                reader = csv.reader([line])
                parts = next(reader)
                for i, part in enumerate(parts):
                    result[f'col_{i}'] = part
            except (csv.Error, StopIteration):
                pass

        # Try to extract a timestamp
        time_patterns = [
            r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',  # ISO format
            r'\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}',     # Apache format
        ]
        for pattern in time_patterns:
            match = re.search(pattern, line)
            if match:
                dt_str = match.group().replace('[', '').replace(']', '')
                # Try several datetime formats
                formats = [
                    '%Y-%m-%d %H:%M:%S',
                    '%Y-%m-%dT%H:%M:%S',
                    '%d/%b/%Y:%H:%M:%S',
                ]
                for fmt in formats:
                    try:
                        dt = datetime.strptime(dt_str, fmt)
                        result['timestamp'] = dt.timestamp()
                        result['datetime'] = dt_str
                        break
                    except ValueError:
                        continue
            if 'timestamp' in result:
                break

        # Extract an IP address
        ip_match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', line)
        if ip_match:
            result['ip'] = ip_match.group()

        # Extract HTTP method and URL
        url_match = re.search(r'(GET|POST|PUT|DELETE)\s+([^\s]+)', line)
        if url_match:
            result['method'] = url_match.group(1)
            result['url'] = url_match.group(2)

        return result
    def _generate_key(self, parsed: Dict, key_fields: Optional[List[str]] = None) -> str:
        """Build the dedup key."""
        if key_fields:
            key_parts = []
            for field in key_fields:
                if field in parsed:
                    key_parts.append(str(parsed[field]))
                else:
                    key_parts.append('')
            return '|'.join(key_parts)
        else:
            return parsed['raw']
    def export_results(self, result: Dict, output_format: str = 'text'):
        """Export the results."""
        if output_format == 'json':
            with open('duplicates.json', 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            print("Results exported to duplicates.json")

        elif output_format == 'csv':
            with open('duplicates.csv', 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(['Duplicate content', 'Count', 'Line numbers'])
                for group in result['duplicate_groups']:
                    writer.writerow([
                        group['sample'],
                        group['count'],
                        ';'.join(map(str, group['lines']))
                    ])
            print("Results exported to duplicates.csv")

        else:  # text
            print("Log analysis report")
            print(f"Total lines: {result['total_lines']}")
            print(f"Unique keys: {result['unique_lines']}")
            print(f"Duplicate groups: {len(result['duplicate_groups'])}")

            if result['duplicate_groups']:
                print("\nDuplicate statistics:")
                print(f"  Total duplicates: {result['statistics']['total_duplicates']}")
                print(f"  Max duplicates: {result['statistics']['max_duplicates']}")
                print(f"  Avg duplicates: {result['statistics']['avg_duplicates']:.2f}")

                print("\nTop 10 duplicate groups:")
                for i, group in enumerate(result['duplicate_groups'][:10]):
                    print(f"\n{i+1}. Duplicate content: {group['sample']}")
                    print(f"   Count: {group['count']}")
                    print(f"   Position: line {group['first_occurrence']} (first), line {group['last_occurrence']} (last)")
                    print(f"   All line numbers: {group['lines']}")
# Usage example
if __name__ == "__main__":
    # Create the finder
    finder = LogDuplicateFinder(log_format='auto')

    # Find duplicates (using URL and method as the dedup key)
    result = finder.find_duplicates(
        log_file="sync_log.txt",
        key_fields=['url', 'method'],  # adjust to your log format
        time_window=60  # identical content within 60 seconds counts as a duplicate
    )

    # Print the results
    finder.export_results(result, output_format='text')

    # Results can also be exported as JSON or CSV
    # finder.export_results(result, output_format='json')
import re
import argparse
from collections import defaultdict
from typing import Dict, List, Optional
def find_sync_log_duplicates(
    log_file: str,
    check_fields: Optional[List[str]] = None,
    min_occurrences: int = 2,
    ignore_timestamp: bool = False
) -> Dict:
    """
    Duplicate detection tailored to sync logs.

    Args:
        log_file: log file path
        check_fields: fields to compare, e.g. ['timestamp', 'operation', 'object_id', 'status']
        min_occurrences: minimum number of occurrences to report
        ignore_timestamp: whether to ignore timestamp differences

    Returns:
        duplicate data
    """
    if check_fields is None:
        check_fields = ['operation', 'object_id']

    # Common sync-log field patterns (these match Chinese-labelled log lines;
    # adjust the regexes to your own log format)
    patterns = {
        'timestamp': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]',
        'operation': r'操作[::]\s*(\w+)',
        'object_id': r'对象ID[::]\s*([\w-]+)',
        'status': r'状态[::]\s*(\w+)',
        'user': r'用户[::]\s*(\w+)',
        'source': r'源[::]\s*([\w:/.-]+)',
        'target': r'目标[::]\s*([\w:/.-]+)'
    }

    log_entries = []
    with open(log_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            entry = {'raw': line, 'line_num': line_num}

            # Extract each field
            for field_name, pattern in patterns.items():
                match = re.search(pattern, line)
                if match:
                    entry[field_name] = match.group(1)

            log_entries.append(entry)

    # Group entries by key
    duplicates = defaultdict(list)
    for entry in log_entries:
        key_parts = []
        for field in check_fields:
            value = entry.get(field, '')
            if field == 'timestamp' and ignore_timestamp:
                # Keep only the date part, ignore the time of day
                if ' ' in value:
                    value = value.split(' ')[0]
            key_parts.append(str(value))
        key = '|'.join(key_parts)
        duplicates[key].append(entry)

    # Filter the results
    result = []
    for key, entries in duplicates.items():
        if len(entries) >= min_occurrences:
            # Use the first entry in the group as the sample
            sample = entries[0]
            result.append({
                'key': key,
                'count': len(entries),
                'lines': [e['line_num'] for e in entries],
                'sample': sample['raw'],
                'details': {k: sample.get(k, 'N/A') for k in check_fields}
            })

    return {
        'total_entries': len(log_entries),
        'duplicate_entries': sum(len(r['lines']) for r in result),
        'duplicate_groups': sorted(result, key=lambda x: x['count'], reverse=True)
    }
def main():
    parser = argparse.ArgumentParser(description='Duplicate detection tool for sync logs')
    parser.add_argument('log_file', help='log file path')
    parser.add_argument('-f', '--fields', nargs='+',
                        default=['operation', 'object_id'],
                        help='fields to compare')
    parser.add_argument('-m', '--min', type=int, default=2,
                        help='minimum number of occurrences')
    parser.add_argument('-i', '--ignore-time', action='store_true',
                        help='ignore timestamp differences')
    parser.add_argument('-o', '--output', choices=['text', 'json'],
                        default='text', help='output format')
    args = parser.parse_args()

    # Run the detection
    results = find_sync_log_duplicates(
        log_file=args.log_file,
        check_fields=args.fields,
        min_occurrences=args.min,
        ignore_timestamp=args.ignore_time
    )
    # Print the results
    if args.output == 'json':
        import json
        print(json.dumps(results, indent=2, ensure_ascii=False))
    else:
        print("Log analysis complete")
        print(f"Total log entries: {results['total_entries']}")
        print(f"Duplicate entries: {results['duplicate_entries']}")
        print(f"Duplicate groups: {len(results['duplicate_groups'])}")

        if results['duplicate_groups']:
            print("\nDuplicate detection results (sorted by count):")
            for i, group in enumerate(results['duplicate_groups'][:20]):
                print(f"\n{i+1}. Count: {group['count']}")
                print(f"   Checked fields: {group['details']}")
                print(f"   Sample: {group['sample'][:150]}...")
                print(f"   Line numbers: {group['lines']}")
if __name__ == "__main__":
main()
pip install pandas  # only needed for the pandas version
# Find exactly duplicated lines straight from the command line
python -c "
from collections import defaultdict
with open('sync.log') as f:
    lines = f.readlines()
counts = defaultdict(list)
for i, line in enumerate(lines):
    counts[line.strip()].append(i+1)
for line, nums in counts.items():
    if len(nums) > 1:
        print(f'Duplicate: {line[:50]}... at lines: {nums}')
"
import pandas as pd

# Read the log (tab-separated here; adjust sep/names to your format)
df = pd.read_csv('sync.log', sep='\t', header=None, names=['timestamp', 'operation', 'details'])

# Find duplicates
duplicates = df[df.duplicated(subset=['operation', 'details'], keep=False)]

# Show duplicates grouped by operation type
for operation, group in duplicates.groupby('operation'):
    print(f"\nDuplicate records for operation '{operation}':")
    print(group[['timestamp', 'details']].to_string(index=False))
Choosing the right approach:

Common duplicate scenarios in sync logs:

Optimization tips:

These tools help you quickly find duplicate data in sync logs; just adjust the regular expressions to match your actual log format.
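As a small illustration of that last point, here is a minimal sketch of how the patterns dict inside find_sync_log_duplicates might be rewritten for a hypothetical English-labelled sync log. The field labels (op=, id=, status=, user=) and the sample line are assumptions for demonstration only, not a real format.

import re

# Hypothetical English-labelled sync log line (assumed format, adjust to your own logs)
sample = "[2024-01-01 12:00:00] op=UPDATE id=ORDER-123 status=OK user=alice"

# Replacement patterns you could drop into find_sync_log_duplicates
patterns = {
    'timestamp': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]',
    'operation': r'op=(\w+)',
    'object_id': r'id=([\w-]+)',
    'status': r'status=(\w+)',
    'user': r'user=(\w+)',
}

# Quick sanity check that each pattern captures the expected field
for name, pattern in patterns.items():
    match = re.search(pattern, sample)
    print(name, '->', match.group(1) if match else 'no match')

Running the check against one or two real log lines before a full analysis is an easy way to confirm the regexes capture what you expect.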