欢迎光临本地信息咨询网
详情描述

方案1:基础版本 - 使用字典统计频率

import re
from collections import defaultdict
from typing import Dict, List, Tuple

def find_duplicates_basic(log_file: str, key_column: int = None) -> Dict[str, List[int]]:
    """
    查找日志中的重复行

    Args:
        log_file: 日志文件路径
        key_column: 指定哪一列作为去重键(从0开始),None表示使用整行

    Returns:
        字典:{重复内容: [行号列表]}
    """
    duplicates = defaultdict(list)

    with open(log_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            # 提取关键字段
            if key_column is not None:
                # 按空格分割(可根据实际日志格式调整)
                parts = line.split()
                if len(parts) > key_column:
                    key = parts[key_column]
                else:
                    key = line
            else:
                key = line

            duplicates[key].append(line_num)

    # 只保留重复的行
    return {key: lines for key, lines in duplicates.items() if len(lines) > 1}


def print_duplicates(duplicates: Dict[str, List[int]], limit: int = 10):
    """打印重复内容"""
    if not duplicates:
        print("未发现重复数据")
        return

    print(f"发现 {len(duplicates)} 组重复数据:")
    print("-" * 80)

    for i, (content, lines) in enumerate(duplicates.items()):
        if i >= limit:
            print(f"... 还有 {len(duplicates) - limit} 组未显示")
            break

        print(f"重复内容: {content[:100]}..." if len(content) > 100 else f"重复内容: {content}")
        print(f"出现位置: 第 {', '.join(map(str, lines))} 行")
        print("-" * 60)


# 使用示例
if __name__ == "__main__":
    # 示例1:使用整行作为判断依据
    duplicates = find_duplicates_basic("sync_log.txt")
    print_duplicates(duplicates)

    # 示例2:使用特定列(如第2列,索引1)
    # duplicates = find_duplicates_basic("sync_log.txt", key_column=1)

方案2:高级版本 - 支持多种日志格式和模式

import re
from collections import defaultdict, Counter
from datetime import datetime
from typing import Dict, List, Set, Optional
import json
import csv

class LogDuplicateFinder:
    """日志重复数据查找器"""

    def __init__(self, log_format: str = "auto"):
        """
        初始化

        Args:
            log_format: 日志格式,可选 'json', 'csv', 'nginx', 'syslog', 'auto'
        """
        self.log_format = log_format

    def find_duplicates(
        self, 
        log_file: str, 
        key_fields: List[str] = None,
        time_window: Optional[int] = None
    ) -> Dict[str, Dict]:
        """
        查找重复数据

        Args:
            log_file: 日志文件路径
            key_fields: 用于去重的关键字段列表
            time_window: 时间窗口(秒),None表示不考虑时间

        Returns:
            重复数据统计
        """
        self.log_data = []
        duplicates = defaultdict(list)

        with open(log_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                # 解析日志行
                parsed = self._parse_line(line, line_num)
                if parsed:
                    self.log_data.append(parsed)

                    # 生成唯一键
                    key = self._generate_key(parsed, key_fields)

                    # 如果指定时间窗口,将时间信息加入键
                    if time_window and 'timestamp' in parsed:
                        time_key = f"{key}_{parsed['timestamp'] // time_window}"
                        duplicates[time_key].append(line_num)
                    else:
                        duplicates[key].append(line_num)

        # 收集结果
        result = {
            'total_lines': len(self.log_data),
            'unique_lines': len(duplicates),
            'duplicate_groups': [],
            'statistics': {}
        }

        for key, lines in duplicates.items():
            if len(lines) > 1:
                # 获取示例内容
                sample_line = self.log_data[lines[0] - 1]

                result['duplicate_groups'].append({
                    'key': key,
                    'count': len(lines),
                    'lines': lines,
                    'sample': sample_line.get('raw', '')[:200],
                    'first_occurrence': lines[0],
                    'last_occurrence': lines[-1]
                })

        # 统计信息
        if result['duplicate_groups']:
            counts = [group['count'] for group in result['duplicate_groups']]
            result['statistics'] = {
                'total_duplicates': sum(counts) - len(counts),
                'max_duplicates': max(counts),
                'avg_duplicates': sum(counts) / len(counts),
                'most_common_key': max(result['duplicate_groups'], 
                                      key=lambda x: x['count'])['key']
            }

        return result

    def _parse_line(self, line: str, line_num: int) -> Dict:
        """解析单行日志"""
        result = {'raw': line, 'line_num': line_num}

        if self.log_format == 'json' or (self.log_format == 'auto' and line.startswith('{')):
            try:
                result.update(json.loads(line))
            except:
                pass

        elif self.log_format == 'csv' or (self.log_format == 'auto' and ',' in line):
            try:
                reader = csv.reader([line])
                parts = next(reader)
                for i, part in enumerate(parts):
                    result[f'col_{i}'] = part
            except:
                pass

        # 尝试提取时间戳
        time_patterns = [
            r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',  # ISO格式
            r'\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}',     # Apache格式
        ]

        for pattern in time_patterns:
            match = re.search(pattern, line)
            if match:
                try:
                    # 尝试转换为时间戳
                    dt_str = match.group()
                    dt_str = dt_str.replace('[', '').replace(']', '')

                    # 尝试多种格式
                    formats = [
                        '%Y-%m-%d %H:%M:%S',
                        '%Y-%m-%dT%H:%M:%S',
                        '%d/%b/%Y:%H:%M:%S',
                    ]

                    for fmt in formats:
                        try:
                            dt = datetime.strptime(dt_str, fmt)
                            result['timestamp'] = dt.timestamp()
                            result['datetime'] = dt_str
                            break
                        except:
                            continue
                except:
                    continue

        # 提取IP地址
        ip_match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', line)
        if ip_match:
            result['ip'] = ip_match.group()

        # 提取URL
        url_match = re.search(r'(GET|POST|PUT|DELETE)\s+([^\s]+)', line)
        if url_match:
            result['method'] = url_match.group(1)
            result['url'] = url_match.group(2)

        return result

    def _generate_key(self, parsed: Dict, key_fields: List[str] = None) -> str:
        """生成唯一键"""
        if key_fields:
            key_parts = []
            for field in key_fields:
                if field in parsed:
                    key_parts.append(str(parsed[field]))
                else:
                    key_parts.append('')
            return '|'.join(key_parts)
        else:
            return parsed['raw']

    def export_results(self, result: Dict, output_format: str = 'text'):
        """导出结果"""
        if output_format == 'json':
            with open('duplicates.json', 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            print("结果已导出到 duplicates.json")

        elif output_format == 'csv':
            with open('duplicates.csv', 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(['重复内容', '出现次数', '行号列表'])
                for group in result['duplicate_groups']:
                    writer.writerow([
                        group['sample'],
                        group['count'],
                        ';'.join(map(str, group['lines']))
                    ])
            print("结果已导出到 duplicates.csv")

        else:  # text
            print(f"日志分析报告")
            print(f"总行数: {result['total_lines']}")
            print(f"唯一行数: {result['unique_lines']}")
            print(f"重复组数: {len(result['duplicate_groups'])}")

            if result['duplicate_groups']:
                print(f"\n重复统计:")
                print(f"  总重复次数: {result['statistics']['total_duplicates']}")
                print(f"  最大重复次数: {result['statistics']['max_duplicates']}")
                print(f"  平均重复次数: {result['statistics']['avg_duplicates']:.2f}")

                print(f"\n前10组重复数据:")
                for i, group in enumerate(result['duplicate_groups'][:10]):
                    print(f"\n{i+1}. 重复内容: {group['sample']}")
                    print(f"   出现次数: {group['count']}")
                    print(f"   位置: 第 {group['first_occurrence']} 行 (首次), 第 {group['last_occurrence']} 行 (末次)")
                    print(f"   所有行号: {group['lines']}")


# 使用示例
if __name__ == "__main__":
    # 创建查找器
    finder = LogDuplicateFinder(log_format='auto')

    # 查找重复数据(使用URL作为去重键)
    result = finder.find_duplicates(
        log_file="sync_log.txt",
        key_fields=['url', 'method'],  # 根据实际情况调整
        time_window=60  # 60秒内相同内容算重复
    )

    # 输出结果
    finder.export_results(result, output_format='text')

    # 也可以导出为JSON或CSV
    # finder.export_results(result, output_format='json')

方案3:针对特定同步日志的优化版本

import re
from collections import defaultdict
import hashlib
import argparse

def find_sync_log_duplicates(
    log_file: str,
    check_fields: List[str] = None,
    min_occurrences: int = 2,
    ignore_timestamp: bool = False
) -> Dict:
    """
    专门用于同步日志的重复检测

    Args:
        log_file: 日志文件
        check_fields: 需要检查的字段 ['timestamp', 'operation', 'object_id', 'status']
        min_occurrences: 最小出现次数
        ignore_timestamp: 是否忽略时间戳差异

    Returns:
        重复数据
    """
    if check_fields is None:
        check_fields = ['operation', 'object_id']

    # 常见同步日志模式
    patterns = {
        'timestamp': r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]',
        'operation': r'操作[::]\s*(\w+)',
        'object_id': r'对象ID[::]\s*([\w-]+)',
        'status': r'状态[::]\s*(\w+)',
        'user': r'用户[::]\s*(\w+)',
        'source': r'源[::]\s*([\w:/.-]+)',
        'target': r'目标[::]\s*([\w:/.-]+)'
    }

    log_entries = []
    field_extractor = re.compile('|'.join(f'({pattern})' for pattern in patterns.values()))

    with open(log_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            entry = {'raw': line, 'line_num': line_num}

            # 提取字段
            for field_name, pattern in patterns.items():
                match = re.search(pattern, line)
                if match:
                    entry[field_name] = match.group(1)

            log_entries.append(entry)

    # 生成键
    duplicates = defaultdict(list)
    for entry in log_entries:
        key_parts = []
        for field in check_fields:
            value = entry.get(field, '')

            if field == 'timestamp' and ignore_timestamp:
                # 只保留日期部分,忽略具体时间
                if ' ' in value:
                    value = value.split(' ')[0]

            key_parts.append(str(value))

        key = '|'.join(key_parts)
        duplicates[key].append(entry['line_num'])

    # 过滤结果
    result = []
    for key, lines in duplicates.items():
        if len(lines) >= min_occurrences:
            # 获取示例
            sample_idx = lines[0] - 1
            if 0 <= sample_idx < len(log_entries):
                sample = log_entries[sample_idx]

                result.append({
                    'key': key,
                    'count': len(lines),
                    'lines': lines,
                    'sample': sample['raw'],
                    'details': {k: sample.get(k, 'N/A') for k in check_fields}
                })

    return {
        'total_entries': len(log_entries),
        'duplicate_entries': sum(len(r['lines']) for r in result),
        'duplicate_groups': sorted(result, key=lambda x: x['count'], reverse=True)
    }


def main():
    parser = argparse.ArgumentParser(description='同步日志重复数据检测工具')
    parser.add_argument('log_file', help='日志文件路径')
    parser.add_argument('-f', '--fields', nargs='+', 
                       default=['operation', 'object_id'],
                       help='检查的字段')
    parser.add_argument('-m', '--min', type=int, default=2,
                       help='最小重复次数')
    parser.add_argument('-i', '--ignore-time', action='store_true',
                       help='忽略时间戳差异')
    parser.add_argument('-o', '--output', choices=['text', 'json', 'csv'],
                       default='text', help='输出格式')

    args = parser.parse_args()

    # 执行检测
    results = find_sync_log_duplicates(
        log_file=args.log_file,
        check_fields=args.fields,
        min_occurrences=args.min,
        ignore_timestamp=args.ignore_time
    )

    # 输出结果
    if args.output == 'json':
        import json
        print(json.dumps(results, indent=2, ensure_ascii=False))
    else:
        print(f"日志分析完成")
        print(f"总日志条目: {results['total_entries']}")
        print(f"重复条目数: {results['duplicate_entries']}")
        print(f"重复组数: {len(results['duplicate_groups'])}")

        if results['duplicate_groups']:
            print(f"\n重复检测结果 (按重复次数排序):")
            for i, group in enumerate(results['duplicate_groups'][:20]):
                print(f"\n{i+1}. 重复次数: {group['count']}")
                print(f"   检查字段: {group['details']}")
                print(f"   示例: {group['sample'][:150]}...")
                print(f"   行号: {group['lines']}")


if __name__ == "__main__":
    main()

快速使用指南

1. 安装依赖

pip install pandas  # 如果需要使用pandas版本

2. 最简单用法

# 直接查找完全重复的行
python -c "
from collections import defaultdict
with open('sync.log') as f:
    lines = f.readlines()

counts = defaultdict(list)
for i, line in enumerate(lines):
    counts[line.strip()].append(i+1)

for line, nums in counts.items():
    if len(nums) > 1:
        print(f'重复: {line[:50]}... 出现在行: {nums}')
"

3. 使用Pandas版本(适合大数据量)

import pandas as pd

# 读取日志
df = pd.read_csv('sync.log', sep='\t', header=None, names=['timestamp', 'operation', 'details'])

# 查找重复
duplicates = df[df.duplicated(subset=['operation', 'details'], keep=False)]

# 按操作类型分组显示
for operation, group in duplicates.groupby('operation'):
    print(f"\n操作 '{operation}' 的重复记录:")
    print(group[['timestamp', 'details']].to_string(index=False))

使用建议

选择合适的方案

  • 小文件:使用方案1基础版本
  • 结构化日志:使用方案2高级版本
  • 特定同步日志:使用方案3

常见同步日志重复场景

  • 相同操作重复执行
  • 数据同步重复提交
  • 网络重试导致的重复
  • 定时任务配置错误

优化建议

  • 大文件使用生成器逐行读取
  • 考虑使用Bloom Filter减少内存
  • 使用多线程处理多个日志文件

这个工具可以帮助你快速发现同步日志中的重复数据,根据实际日志格式调整正则表达式即可。

相关帖子
在Windows批处理(.bat)文件中获取拖入文件信息的各种方法
在Windows批处理(.bat)文件中获取拖入文件信息的各种方法
如何识别生活中那些正在持续消耗你情绪价值的人际关系与社交场合?
如何识别生活中那些正在持续消耗你情绪价值的人际关系与社交场合?
临期食品折扣店的货源从何而来?它们的供应链究竟是怎么运作的?
临期食品折扣店的货源从何而来?它们的供应链究竟是怎么运作的?
老房原有铝芯电线是否必须全部更换为铜芯线,如何判断?
老房原有铝芯电线是否必须全部更换为铜芯线,如何判断?
手机、平板电脑的锂电池在什么情况下可能成为家庭隐患?
手机、平板电脑的锂电池在什么情况下可能成为家庭隐患?
扬州市救护车转院转运回家&长途跨省救护车转运24小时电话
扬州市救护车转院转运回家&长途跨省救护车转运24小时电话
高效管理你的Linux系统: Debian操作系统常用命令指南
高效管理你的Linux系统: Debian操作系统常用命令指南
长期受噪音困扰影响休息,如何进行自我心理调节与舒缓?
长期受噪音困扰影响休息,如何进行自我心理调节与舒缓?
济南市网站建设服务%精准获客系统,高端网站开发设计
济南市网站建设服务%精准获客系统,高端网站开发设计
如果社保已经断缴了,在2026年还有没有机会进行补缴操作?
如果社保已经断缴了,在2026年还有没有机会进行补缴操作?
湛江市企业网站建设公司&做网站公司,收费标准
湛江市企业网站建设公司&做网站公司,收费标准
2026年通过哪些官方或正规的线上平台,可以查询到可靠的跨境招聘信息?
2026年通过哪些官方或正规的线上平台,可以查询到可靠的跨境招聘信息?
杭州市救护车长途跨省护送病人出院@120救护车一次多少钱
杭州市救护车长途跨省护送病人出院@120救护车一次多少钱
黄石市专业网站设计制作%定制化网站建设,小程序开发
黄石市专业网站设计制作%定制化网站建设,小程序开发
在马路上看到哪些问题可以通过随手拍上报并获得奖励?
在马路上看到哪些问题可以通过随手拍上报并获得奖励?
2026年异地销户是否支持线上办理,话费余额如何处理?
2026年异地销户是否支持线上办理,话费余额如何处理?
web面试常问http缓存解析相关
web面试常问http缓存解析相关
徐州市长途救护车出租&重症急救车出租,转院接送
徐州市长途救护车出租&重症急救车出租,转院接送
外卖骑手在等待取餐时,平台系统是如何自动识别和记录等时时间的?
外卖骑手在等待取餐时,平台系统是如何自动识别和记录等时时间的?
收到莫名消费短信,如何立即采取措施防止个人账户资金被他人继续使用?
收到莫名消费短信,如何立即采取措施防止个人账户资金被他人继续使用?