Flask项目中PostgreSQL 和 Elasticsearch (ES) 的批量更新

时间:2024-11-13 18:51:02

在 Flask 项目中实现 PostgreSQL 和 Elasticsearch (ES) 的同步更新可以通过以下几步完成,结合了批量更新、定时刷新和手动触发,以确保同步高效、灵活并尽可能一致。以下是完整的设计和代码实现:

方案设计

  1. 批量更新数据库:对数据库的数据进行批量更新操作,将成功更新的记录加入到 Elasticsearch 缓冲区。
  2. 定时任务刷新缓冲区:使用 APScheduler 配置定时任务,每隔一段时间检查缓冲区,若缓冲区有数据则将其批量更新到 Elasticsearch。
  3. 手动更新接口:提供手动触发刷新缓冲区的接口,以便在数据未达到批量更新的数量要求时,也可以随时触发缓冲区更新。

这种方案确保在数据量大时批量更新,减少对 Elasticsearch 的频繁写入。同时,定时任务和手动触发可以保证在数据量不足的情况下也能及时更新。

代码实现

以下是完整的代码实现,包含 Flask 路由、批量更新、定时任务和手动触发更新。

1. 初始化 PostgreSQL、Elasticsearch、缓冲区和定时任务

from flask import Flask, request, jsonify
import psycopg2
from elasticsearch import Elasticsearch, helpers
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

app = Flask(__name__)

# 初始化 PostgreSQL 客户端
db_conn = psycopg2.connect(
    dbname="your_db",
    user="your_user",
    password="your_password",
    host="localhost",
    port="5432"
)

# 初始化 Elasticsearch 客户端
es = Elasticsearch("http://localhost:9200")

# 缓冲区和批量大小
es_buffer = []
BULK_SIZE = 100

# 配置定时任务
scheduler = BackgroundScheduler()
scheduler.start()

def add_to_es_buffer(record):
    """
    将记录加入缓冲区
    """
    es_buffer.append({
        "_op_type": "update",
        "_index": "your_index",
        "_id": record["id"],
        "doc": {
            "field1": record["field1"],
            "field2": record["field2"]
        }
    })

def flush_es_buffer():
    """
    检查缓冲区,若有数据则批量更新 Elasticsearch
    """
    global es_buffer
    if es_buffer:
        try:
            helpers.bulk(es, es_buffer)
            print(f"{datetime.now()}: ES buffer flushed successfully with {len(es_buffer)} records.")
            es_buffer = []  # 清空缓冲区
        except Exception as e:
            print(f"{datetime.now()}: Error flushing ES buffer: {e}")

# 定时任务:每隔 5 分钟检查缓冲区
scheduler.add_job(flush_es_buffer, 'interval', minutes=5)

2. 批量更新 PostgreSQL 并将数据加入 ES 缓冲区

@app.route('/bulk_update', methods=['POST'])
def bulk_update():
    """
    批量更新 PostgreSQL 并将数据加入 Elasticsearch 缓冲区
    """
    data = request.json
    records = data.get("records", [])

    try:
        # Step 1: 批量更新 PostgreSQL 数据
        with db_conn:
            with db_conn.cursor() as cursor:
                update_query = """
                UPDATE your_table
                SET field1 = data.field1, field2 = data.field2
                FROM (VALUES %s) AS data (id, field1, field2)
                WHERE your_table.id = data.id
                """
                values = [(record["id"], record["field1"], record["field2"]) for record in records]
                psycopg2.extras.execute_values(cursor, update_query, values)

        # Step 2: 将数据加入 Elasticsearch 缓冲区
        for record in records:
            add_to_es_buffer(record)

        # Step 3: 如果缓冲区达到 BULK_SIZE,立即更新
        if len(es_buffer) >= BULK_SIZE:
            flush_es_buffer()

        return jsonify({"status": "success", "message": f"{len(records)} records updated in PG and buffered for ES"}), 200

    except Exception as e:
        db_conn.rollback()
        return jsonify({"status": "error", "message": str(e)}), 500

3. 手动触发缓冲区更新的 API

@app.route('/manual_flush', methods=['POST'])
def manual_flush():
    """
    手动触发,将缓冲区中的数据立即写入 Elasticsearch
    """
    if not es_buffer:
        return jsonify({"status": "success", "message": "No records in ES buffer to flush"}), 200

    try:
        flush_es_buffer()
        return jsonify({"status": "success", "message": "ES buffer flushed manually"}), 200
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

测试请求

批量更新测试

curl -X POST http://localhost:5000/bulk_update -H "Content-Type: application/json" -d '{
    "records": [
        {"id": "1", "field1": "new_value1", "field2": "new_value2"},
        {"id": "2", "field1": "new_value3", "field2": "new_value4"}
    ]
}'

手动更新测试

curl -X POST http://localhost:5000/manual_flush

方案特点

  • 自动与手动结合:定时任务自动刷新缓冲区,手动更新接口则可在必要时立即触发。
  • 批量操作与数据一致性:数据库批量更新与 Elasticsearch 批量更新确保高效写入,避免频繁写入操作。
  • 事务处理与错误回滚:确保 PostgreSQL 数据库的事务完整性,若数据库更新出错则进行回滚。