Python 操作数据库:读取 ClickHouse 数据并存入 CSV 文件

时间:2024-11-10 18:49:37

import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time

# Runtime configuration for the export job.
CONFIG = dict(
    # Connection settings, unpacked directly into clickhouse_driver.Client(**...).
    # These are placeholders. NOTE(review): consider loading real credentials
    # from the environment instead of hard-coding them here.
    DB=dict(
        host='xxx',
        database='xxx',
        user='xxxx',
        password='xxxx',
    ),
    BATCH_SIZE=5000,          # rows requested per LIMIT/OFFSET query
    TOTAL_RECORDS=1000000,    # batch offsets are generated over [0, TOTAL_RECORDS)
    NUM_THREADS=5,            # number of worker threads
    OUTPUT_FILE='yyxs_ck2excel_v4.csv',
    MAX_RETRIES=3,            # max attempts used by the connection/query retry loops
    RETRY_DELAY=5,            # seconds to sleep between retries
    CONNECTION_TIMEOUT=60,    # connection timeout in seconds (passed as connect_timeout)
)

# Logging setup.
# msecs is zero-padded to three digits: with a plain %(msecs)d, 5 ms would be
# printed as "...:07.5", which reads like 500 ms.
LOG_FORMAT = '%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=LOG_DATE_FORMAT,
)
logger = logging.getLogger(__name__)

class DatabaseManager:
    """Hand out one ClickHouse ``Client`` per thread.

    Each thread lazily creates its own client, stored in a ``threading.local``,
    so threads never share one. Every client created is also recorded in a
    class-level registry, so ``close_all_connections`` can reach clients owned
    by worker threads, not only the caller's.
    """

    _thread_local = threading.local()
    # Every client created so far, across all threads; guarded by _registry_lock.
    _all_clients = []
    _registry_lock = Lock()

    @classmethod
    def _create_client(cls):
        """Create a new Client, retrying up to CONFIG['MAX_RETRIES'] times.

        At least one attempt is always made. The last exception is re-raised
        if every attempt fails.
        """
        max_attempts = max(1, CONFIG['MAX_RETRIES'])
        for attempt in range(1, max_attempts + 1):
            try:
                return Client(
                    **CONFIG['DB'],
                    connect_timeout=CONFIG['CONNECTION_TIMEOUT']
                )
            except Exception as e:
                logger.error(f"Database connection error (attempt {attempt}): {str(e)}")
                if attempt >= max_attempts:
                    raise
                time.sleep(CONFIG['RETRY_DELAY'])

    @staticmethod
    def _safe_disconnect(client):
        """Disconnect *client*; log instead of raise so cleanup can continue."""
        try:
            client.disconnect()
        except Exception as e:
            logger.warning(f"Error while disconnecting database client: {str(e)}")

    @classmethod
    def _discard_current(cls):
        """Disconnect and forget the calling thread's client, if it has one."""
        client = getattr(cls._thread_local, "client", None)
        if client is None:
            return
        del cls._thread_local.client
        with cls._registry_lock:
            cls._all_clients[:] = [c for c in cls._all_clients if c is not client]
        cls._safe_disconnect(client)

    @classmethod
    @contextmanager
    def get_connection(cls):
        """Yield the calling thread's client, creating it on first use.

        If the ``with`` body raises, the client is disconnected and dropped, so
        the next call builds a fresh one. The exception is then re-raised
        unchanged; retrying the failed operation is the caller's job.

        Raises:
            Whatever client creation raises after all retries, or whatever the
            ``with`` body raised.
        """
        client = getattr(cls._thread_local, "client", None)
        if client is None:
            client = cls._create_client()
            cls._thread_local.client = client
            with cls._registry_lock:
                cls._all_clients.append(client)
            logger.info(f"Created new database connection for thread {threading.current_thread().name}")
        try:
            # Yield exactly once. The previous version yielded inside a retry
            # loop: after an error in the with-body it yielded a second time,
            # which makes contextmanager raise
            # "RuntimeError: generator didn't stop after throw()".
            yield client
        except Exception:
            logger.warning(
                f"Discarding database connection for thread {threading.current_thread().name} after error"
            )
            cls._discard_current()
            raise

    @classmethod
    def close_all_connections(cls):
        """Disconnect every client created by this manager, in any thread.

        Intended for shutdown, after worker threads have finished. It also
        clears the calling thread's cached client. A thread that keeps running
        afterwards still holds its now-disconnected client in thread-local
        storage. NOTE(review): assumes clickhouse_driver reconnects on next
        use; confirm.
        """
        with cls._registry_lock:
            clients = list(cls._all_clients)
            cls._all_clients.clear()
        for client in clients:
            cls._safe_disconnect(client)
        if hasattr(cls._thread_local, "client"):
            del cls._thread_local.client
        logger.info(f"Closed {len(clients)} database connection(s)")

class DataProcessor:
    """Fetch the export query in LIMIT/OFFSET batches and append them to one CSV."""

    def __init__(self):
        # Column names for each batch's DataFrame; must match the SELECT list below.
        self.columns = [
            "a", "b", "c", "d"
        ]
        # NOTE(review): LIMIT/OFFSET pages are only disjoint and complete if
        # ORDER BY a,b,c,d is a total order (no duplicate sort keys) and the
        # table does not change during the export. Confirm for the real table.
        self.query = '''
            SELECT
               a,b,c,d
            FROM
                table_name
            ORDER BY
               a,b,c,d 
        '''
        # Guards the output file AND the bookkeeping fields below, which
        # worker threads update concurrently.
        self.file_lock = Lock()
        self.total_rows = 0             # rows appended to the CSV so far
        self.processed_batches = set()  # start offsets of batches written successfully
        self.failed_batches = set()     # start offsets whose latest attempt failed

    def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:
        """Run the query for rows [start, start + batch_size), with retries.

        Args:
            batch_size: Number of rows to request (the LIMIT).
            start: Row offset of the batch (the OFFSET).

        Returns:
            The rows returned by ``client.execute`` for this page.

        Raises:
            The last exception if all attempts fail. At least one attempt is
            made, even if CONFIG['MAX_RETRIES'] is 0.
        """
        # int() keeps the interpolated SQL limited to plain integer literals.
        query_with_limit = f"{self.query} LIMIT {int(batch_size)} OFFSET {int(start)}"
        max_attempts = max(1, CONFIG['MAX_RETRIES'])
        for attempt in range(1, max_attempts + 1):
            try:
                with DatabaseManager.get_connection() as client:
                    result = client.execute(query_with_limit)
                logger.info(f"Fetched {len(result)} records starting from {start}.")
                return result
            except Exception as e:
                logger.error(f"Error fetching batch starting at {start} (attempt {attempt}): {str(e)}")
                if attempt >= max_attempts:
                    raise
                time.sleep(CONFIG['RETRY_DELAY'])

    def save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):
        """Append *df* to *file_name* and record *batch_start* as done.

        The header row is written only when the file is missing or empty. The
        whole check-and-append runs under ``self.file_lock``, so concurrent
        callers never interleave rows or both write a header.

        Raises:
            Any error from writing the file, after logging it.
        """
        try:
            with self.file_lock:
                file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0
                df.to_csv(
                    file_name,
                    mode='a',
                    header=not file_exists,
                    index=False
                )
                self.total_rows += len(df)
                self.processed_batches.add(batch_start)
                # A batch that failed earlier and has now been written is no
                # longer a failure. Without this, the final summary reports
                # batches that recovered on retry.
                self.failed_batches.discard(batch_start)
                logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")
        except Exception as e:
            logger.error(f"Error saving to CSV: {str(e)}")
            raise

    def process_batch(self, start: int, batch_size: int, output_file: str) -> bool:
        """Fetch one batch and append it to *output_file*.

        Returns:
            True if the batch is written (now or by an earlier call). False if
            it failed; the error is logged and *start* is recorded in
            ``failed_batches`` instead of being raised.
        """
        with self.file_lock:
            already_done = start in self.processed_batches
        if already_done:
            logger.info(f"Batch {start} already processed, skipping.")
            return True

        try:
            result_batch = self.fetch_data_batch(batch_size, start)
            df_batch = pd.DataFrame(result_batch, columns=self.columns)
            self.save_to_csv(df_batch, output_file, start)
            return True
        except Exception as e:
            logger.error(f"Error processing batch starting at {start}: {str(e)}")
            with self.file_lock:
                self.failed_batches.add(start)
            return False

def main_v1():
    """Earlier version of main(); not called by the ``__main__`` block.

    Exports with CONFIG['NUM_THREADS'] worker threads, then retries each failed
    batch once, serially, in the calling thread. Logs a summary and re-raises
    any unexpected error; database connections are closed in all cases.
    """
    try:
        processor = DataProcessor()
        output_file = CONFIG['OUTPUT_FILE']

        # Truncate (or create) the output file so the run starts from an empty file.
        with open(output_file, 'w', encoding='utf-8'):
            pass

        queue = Queue()
        retry_queue = Queue()  # offsets of batches that failed in the threaded pass
        threads = []

        def worker():
            while True:
                start = queue.get()
                try:
                    if start is None:  # shutdown sentinel
                        break
                    success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)
                    if not success:
                        retry_queue.put(start)
                except Exception as e:
                    logger.error(f"Worker thread error: {str(e)}")
                finally:
                    # Exactly one task_done() per get(). The old code also called
                    # it after each successful batch, so queue.join() could return
                    # before all batches were written, and the surplus call later
                    # raised ValueError and killed the worker.
                    queue.task_done()

        # Start the worker threads.
        for _ in range(CONFIG['NUM_THREADS']):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Enqueue one batch offset per BATCH_SIZE rows.
        for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):
            queue.put(start)

        # Wait for the threaded pass to finish.
        queue.join()

        # Retry failed batches once each, serially, in this thread.
        while not retry_queue.empty():
            start = retry_queue.get()
            logger.info(f"Retrying failed batch starting at {start}")
            if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):
                logger.info(f"Successfully retried batch {start}")
            else:
                logger.error(f"Failed to process batch {start} after retries")

        # Stop all worker threads.
        for _ in threads:
            queue.put(None)
        for t in threads:
            t.join()

        # A batch can be recorded as failed and later succeed on retry; only
        # offsets that were never written count as failed.
        still_failed = processor.failed_batches - processor.processed_batches

        # Final summary.
        logger.info(f"Processing completed. Total rows: {processor.total_rows}")
        logger.info(f"Processed batches: {len(processor.processed_batches)}")
        logger.info(f"Failed batches: {len(still_failed)}")

        if still_failed:
            logger.warning(f"Failed batches: {sorted(still_failed)}")

    except Exception as e:
        logger.error(f"Main process error: {str(e)}")
        raise
    finally:
        DatabaseManager.close_all_connections()

def _validate_output_csv(output_file: str, expected_rows: int) -> None:
    """Re-read the finished CSV, compare its row count, and strip stray header rows.

    Cells are read as strings with NA detection disabled. If the file has to
    be rewritten, cell text (leading zeros, empty strings, "NA", ...) is then
    preserved instead of being re-typed by pandas' inference. Errors are
    logged, not raised, because validation is best-effort.
    """
    try:
        df_final = pd.read_csv(output_file, dtype=str, keep_default_na=False)
        actual_rows = len(df_final)
        logger.info(f"Final CSV file contains {actual_rows} rows")

        if actual_rows != expected_rows:
            logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {expected_rows} rows")

        # Treat a data row whose first cell equals the first column name as a
        # duplicated header. NOTE: a genuine value equal to that column name
        # would also match.
        is_header_row = df_final.iloc[:, 0] == df_final.columns[0]
        if is_header_row.any():
            logger.warning(
                f"Found {int(is_header_row.sum())} duplicate headers at rows: "
                f"{df_final.index[is_header_row].tolist()}"
            )
            df_final = df_final[~is_header_row]
            df_final.to_csv(output_file, index=False)
            logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")

    except Exception as e:
        logger.error(f"Error validating final CSV file: {str(e)}")


def main():
    """Export the query to CONFIG['OUTPUT_FILE'] using a pool of worker threads.

    Steps:
    1. Truncate the output file.
    2. Enqueue one batch offset per BATCH_SIZE rows up to TOTAL_RECORDS.
    3. Let NUM_THREADS workers fetch and append the batches.
    4. Re-queue failed batches for up to CONFIG['MAX_RETRIES'] more rounds.
    5. Log a summary and validate the resulting CSV.

    Batches are appended in completion order, so rows in the CSV are not
    guaranteed to follow the query's ORDER BY.

    Raises:
        ValueError: if CONFIG['BATCH_SIZE'] is not positive.
        Any unexpected error, after logging it. Database connections are
        closed in all cases.
    """
    try:
        processor = DataProcessor()
        output_file = CONFIG['OUTPUT_FILE']
        batch_size = CONFIG['BATCH_SIZE']
        if batch_size <= 0:
            raise ValueError(f"BATCH_SIZE must be positive, got {batch_size}")

        # Truncate (or create) the output file so the run starts from an empty file.
        with open(output_file, 'w', encoding='utf-8'):
            pass

        queue = Queue()
        retry_queue = Queue()  # offsets of batches whose latest attempt failed
        threads = []

        def worker():
            while True:
                start = queue.get()
                if start is None:  # shutdown sentinel
                    queue.task_done()
                    break
                try:
                    if not processor.process_batch(start, batch_size, output_file):
                        retry_queue.put(start)
                except Exception as e:
                    logger.error(f"Error processing batch at offset {start}: {str(e)}")
                    retry_queue.put(start)
                finally:
                    # Exactly one task_done() per get(), or queue.join() misbehaves.
                    queue.task_done()

        # Start the worker threads.
        for _ in range(CONFIG['NUM_THREADS']):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Enqueue one batch offset per batch_size rows.
        total_batches = (CONFIG['TOTAL_RECORDS'] + batch_size - 1) // batch_size
        for start in range(0, CONFIG['TOTAL_RECORDS'], batch_size):
            queue.put(start)

        # Wait for the first pass to finish.
        queue.join()

        # Re-queue failed batches for up to MAX_RETRIES more rounds.
        max_rounds = CONFIG['MAX_RETRIES']
        retry_round = 0
        while not retry_queue.empty() and retry_round < max_rounds:
            retry_round += 1
            retry_size = retry_queue.qsize()
            logger.info(f"Retrying {retry_size} failed batches (attempt {retry_round})")
            for _ in range(retry_size):
                queue.put(retry_queue.get())
            queue.join()

        # Stop all worker threads.
        for _ in threads:
            queue.put(None)
        for t in threads:
            t.join()

        # A batch can be recorded as failed and later succeed on retry; only
        # offsets that were never written count as failed.
        still_failed = processor.failed_batches - processor.processed_batches

        # Final summary.
        logger.info(f"Processing completed. Total rows: {processor.total_rows}")
        logger.info(f"Expected batches: {total_batches}")
        logger.info(f"Processed batches: {len(processor.processed_batches)}")
        logger.info(f"Failed batches: {len(still_failed)}")

        if still_failed:
            logger.warning(f"Failed batches: {sorted(still_failed)}")

        _validate_output_csv(output_file, processor.total_rows)

    except Exception as e:
        logger.error(f"Main process error: {str(e)}")
        raise
    finally:
        DatabaseManager.close_all_connections()

if __name__ == "__main__":
    # Time the whole run. A failure is logged and then re-raised, so the
    # process still exits with an error.
    start_time = timeit.default_timer()
    try:
        main()
    except Exception as exc:
        logger.error(f"程序执行失败: {str(exc)}")
        raise
    else:
        elapsed_time = timeit.default_timer() - start_time
        logger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")
1. 主要类
  • DatabaseManager

管理数据库连接的线程安全类

使用 threading.local() 确保每个线程有自己的连接

包含重试机制和连接管理功能

  • DataProcessor

处理数据的核心类

定义了数据列和查询语句

处理数据批次的获取和保存

跟踪处理状态和失败的批次

2. 工作流程

  • 初始化

创建空的输出文件

初始化线程池和任务队列

  • 数据处理

将总数据量分成多个批次

多个工作线程并行处理数据批次

每个批次:

  • 从数据库获取数据
  • 转换为 DataFrame
  • 保存到 CSV 文件
  • 错误处理

失败的批次会进入重试队列

失败批次最多重新排队重试 3 轮(此外,每次数据库查询自身最多尝试 3 次)

记录所有失败的批次

  • 数据验证

检查最终 CSV 文件的行数

检查和清理重复的表头

比对 CSV 行数与程序统计的已处理行数,验证数据完整性

3. 特点

  • 线程安全

使用线程本地存储管理数据库连接

文件写入使用锁保护

  • 容错机制

数据库连接重试

批次处理重试

详细的日志记录

  • 性能优化

批量处理数据

多线程并行处理

使用队列管理任务

  • 监控和日志

详细的日志记录

处理进度跟踪

执行时间统计

这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。注意:各批次按完成顺序追加写入,因此 CSV 中的行顺序不一定与 ORDER BY 一致;另外,LIMIT/OFFSET 分页在偏移量较大时,查询开销通常会随之增加。