改写datax实现业务层mysql 全量,增量,多表合并同步kudu

时间:2025-03-23 07:15:08
# -*- coding: utf-8 -*- # @Time : 2021/10/27 19:04 # @Author : import sys import logging import os import signal import subprocess import time import re import socket import json import codecs import platform import random from traceback import format_exc from datetime import datetime from string import Template from dingtalkchatbot.chatbot import DingtalkChatbot from apscheduler.schedulers.blocking import BlockingScheduler from db import mysql_helper logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO) def isWindows(): return platform.system() == 'Windows' DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DATAX_VERSION = 'DATAX-OPENSOURCE-3.0' if isWindows(): codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None) CLASS_PATH = ("%s/lib/*") % (DATAX_HOME) else: CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME) LOGBACK_FILE = ("%s/conf/") % (DATAX_HOME) DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME) DEFAULT_PROPERTY_CONF = "-=UTF-8 -= -=file:///dev/urandom -=%s -=%s" % ( DATAX_HOME, LOGBACK_FILE) ENGINE_COMMAND = "/usr/local/java/bin/java -server ${jvm} %s -classpath %s ${params} -mode ${mode} -jobid ${jobid} -job ${job}" % ( DEFAULT_PROPERTY_CONF, CLASS_PATH) REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999" RET_STATE = { "KILL": 143, "FAIL": -1, "OK": 0, "RUN": 1, "RETRY": 2 } jvmParameters = '-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/datax/log' dingding_url = '/robot/send?access_token=b' # 中间表 db_1 = mysql_helper.MysqlHelper(host='', user='user', password="password", database='sitemap', port=3306, charset='utf8mb4') class DingDingBot(object): # WebHook地址 def __init__(self, webhook="/robot/send?access_token=f047e"): # 初始化机器人小丁DingDingBot self.xiaoding = DingtalkChatbot(webhook) # Text消息@所有人 def send_message(self, msg): try: self.xiaoding.send_text(msg='{}'.format(msg), is_at_all=False) except: logging.error(format_exc()) def getLocalIp(): try: return socket.gethostbyname(socket.getfqdn(socket.gethostname())) except: return "Unknown" def isUrl(path): if not path: return False assert (isinstance(path, str)) m = re.match(r"^http[s]?://\S+\w*", path.lower()) if m: return True else: return False def buildStartCommand(options, args): commandMap = {} tempJVMCommand = DEFAULT_JVM if options["jvmParameters"]: tempJVMCommand = tempJVMCommand + " " + options["jvmParameters"] if options["remoteDebug"]: tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG if options["loglevel"]: tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options["loglevel"])) if options["mode"]: commandMap["mode"] = options["mode"] # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对) jobResource = args[0] if not isUrl(jobResource): jobResource = os.path.abspath(jobResource) if jobResource.lower().startswith("file://"): jobResource = jobResource[len("file://"):] jobParams = ("-=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_')) if options["params"]: jobParams = jobParams + " " + options["params"] if options["jobid"]: commandMap["jobid"] = options["jobid"] commandMap["jvm"] = tempJVMCommand commandMap["params"] = jobParams commandMap["job"] = jobResource return Template(ENGINE_COMMAND).substitute(**commandMap) def run(m_json={}, dingding_url='/robot/send?access_token=b9086aab568f1d0xx', jvmParameters='-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/datax/log', ): """ datax调度 :return: """ # try: # 打印json数据 # print("发送任务") # print((m_json, indent=2)) # print("**********************") options = {'jvmParameters': jvmParameters, 'jobid': '-1', 'mode': 'standalone', 'params': None, 'reader': None, 'writer': None, 'remoteDebug': None, 'loglevel': 'error'} f_name = '/opt/datax/bin' + '/' + 'dbname_' + str(random.randint(1000000, 9999999)) + ".json" # 生成存放json配置文件 with open(f_name, 'w', encoding="utf-8") as f: f.write(json.dumps(m_json, indent=2)) args = [f_name] startCommand = buildStartCommand(options, args) child_process = subprocess.Popen(startCommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) (stdout, stderr) = child_process.communicate() stdout_decode = stdout.decode('utf-8') DingDingBot(dingding_url).send_message( "[ %s \n %s " % (stdout_decode, stderr.decode('utf-8'))) # except: # # DingDingBot(dingding_url).send_message( # "[ %s " % format_exc()) # 数据处理完毕删除该文件 if os.path.exists(f_name): os.remove(f_name) if 'ERROR' in stdout_decode: print("数据有异常") # 失败 return 0 else: # 成功 return 1 def f_m_json(whether_full=1, db_saas="", dbname="", table_name="", create_time=""): """ :param whether_full: 1.全量 0.增量 2.多表合并 :param db_saas: 业务层mysql :param dbname: :param table_name: :param create_time: :return: """ # 获取表对应字段 information_schema_COLUMNS = db_saas.get_all( "select COLUMN_NAME from information_schema.COLUMNS where table_name = '{}';".format( table_name)) mysql_list = [] kudu_list = [] if whether_full ==2: mysql_list.append("new_saas_id") kudu_list.append({ "index": 0, "name": "new_saas_id", "type": "STRING", "compress": "DEFAULT_COMPRESSION", "encoding": "AUTO_ENCODING", "comment": "主键", "primaryKey": True }) for idx, da in enumerate(information_schema_COLUMNS): COLUMN_NAME = da.get('COLUMN_NAME', "") # 特殊关键字处理 if COLUMN_NAME == 'status': mysql_list.append('`status`') elif COLUMN_NAME == 'reads': mysql_list.append('`reads`') else: mysql_list.append(COLUMN_NAME) if whether_full == 2: idx = idx + 1 if COLUMN_NAME == "id": if whether_full == 2: kudu_list.append({ "index": idx, "name": COLUMN_NAME, "type": "STRING", "compress": "DEFAULT_COMPRESSION", "encoding": "AUTO_ENCODING", "comment": "注解", }) else: kudu_list.append({ "index": idx, "name": COLUMN_NAME, "type": "STRING", "compress": "DEFAULT_COMPRESSION", "encoding": "AUTO_ENCODING", "comment": "主键", "primaryKey": True }) else: kudu_list.append({ "index": idx, "name": COLUMN_NAME, "type": "STRING", "compress": "DEFAULT_COMPRESSION", "encoding": "AUTO_ENCODING", "comment": "注解" }) pass if whether_full == 1: mysql_connection_list = [ { "table": [ table_name ], "jdbcUrl": [ "jdbc:mysql://:3306/{}".format(dbname) ] } ] else: if whether_full == 2: querySql = "select MD5(CONCAT(,'{table_name}')) as new_saas_id,c.* from {table_name} as c ;".format( table_name=table_name) else: querySql = "select * from {table_name} where `create_time`>'{create_time}';".format(table_name=table_name, create_time=create_time) mysql_connection_list = [ { "querySql": [ querySql ], "jdbcUrl": [ "jdbc:mysql://:3306/{}".format(dbname) ] } ] if whether_full == 2: t_table_name = dbname + "_" + re.sub('\d', "", table_name) else: t_table_name = dbname + "_" + table_name m_json = { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "username", "password": "password", "column": mysql_list, "connection": mysql_connection_list } }, "writer": { "name": "kuduwriter", "parameter": { "kuduConfig": { "kudu.master_addresses": ":7051,:7051" }, "table": t_table_name, "replicaCount": 3, "truncate": False, "writeMode": "upsert", "column": kudu_list, "batchSize": 1024, "bufferSize": 2048, "skipFail": False, "encoding": "UTF-8" } } } ], "setting": { "speed": { "channel": 5 }, "errorLimit": { "record": 10000, "percentage": 0.02 } } } } return m_json # 单表全量与增量同步 def fixed_table(): # 判断当前表是否为空 xx_xx_datax_cnt = db_1.get_all("select count(1) from xx_xx_datax_status;") cnt = xx_xx_datax_cnt[0].get('count(1)') # 空表没有数据 if cnt == 0: db_tb_list = [ "xx_xx_page.page_included_statistics_by_everyday", "xx_xx_qqt_polymerize.polymerize", "xx_xx_qqt_ga.ga_all_page", "xx_xx_qqt_inquiry.inquiry", "xx_xx_qqt_site.site", ] for d in db_tb_list: db_tb = re.findall("(.*?)\.(.*)", d) dbname = db_tb[0][0] table_name = db_tb[0][1] data_dict = { "dbname": dbname, "tbname": table_name } db_1.just_insert("xx_xx_datax_status", **data_dict) # 是否需要全量同步 xx_xx_datax_status = db_1.get_all("select * from `xx_xx_datax_status`;") for xx_xx in xx_xx_datax_status: _id = xx_xx.get("id") dbname = xx_xx.get("dbname") table_name = xx_xx.get("tbname") xx_xx_status = xx_xx.get("status") db_saas = mysql_helper.MysqlHelper(host='.x5.x4', user='user', password="password", database=dbname, port=3306, charset='utf8mb4') # 获取最大时间 create_time_list = db_saas.get_all("SELECT MAX(create_time) FROM `{table_name}`;".format(table_name=table_name)) if not create_time_list: # 当前表没有数据 print("%s>>>>>>>>>当前表没有create_time数据" % (dbname + "." + table_name)) continue else: saas_xx_xx_data_create_time = create_time_list[0].get('MAX(create_time)') if not saas_xx_xx_data_create_time or saas_xx_xx_data_create_time is None: print("%s>>>>>>>>>当前表没有create_time数据" % (dbname + "." + table_name)) continue if xx_xx_status == 0: ## 一.全量同步********************************* m_json = f_m_json(whether_full=1, db_saas=db_saas, dbname=dbname, table_name=table_name) pass else: # 如果不为空就增量同步数据 # 获取当前最大时间并记录 ## 增量同步********************************* # 将状态更新为2 db_1.execute("UPDATE `xx_xx_datax_status` set `status`=2 where `id`=%r;" % (_id)) m_json = f_m_json(whether_full=0, db_saas=db_saas, dbname=dbname, table_name=table_name, create_time=saas_xx_xx_data_create_time) pass status = run(m_json=m_json, dingding_url=dingding_url, jvmParameters=jvmParameters, ) if status == 0: pass else: # 首次全量同步表并记录对应表的创建时间,修改状态为1 db_1.execute( "UPDATE `xx_xx_datax_status` set `xx_xx_data_create_time`='%s',`status`=1,`update_time`=%r where `id`=%s;" % ( saas_xx_xx_data_create_time, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _id)) # 统计状态为2的未完成的增量表并发送钉钉消息 status2 = db_1.get_all('SELECT dbname,tbname FROM `xx_xx_datax_status` where `status`=2;') if status2: DingDingBot(dingding_url).send_message( "[ 增量同步失败的表%s " % (str(status2))) # ''' # xx_xx_article.article_c*(AI文章收录) # xx_xx_product.product_i*(AI产品) # xx_xx_polymerize.polymerize(AI聚合页收录) # xx_xx_page.page_category(栏目收录) # xx_xx_product.product_i*(AI产品) # ''' # 多表合并同一张表,全表同步 def unfixed_table(): # 业务层用户表 *为每个客户对应id db_tb_list = [ 'x_x_article.article_c*', 'x_x_product.product_i*', 'x_x_page.page_url_i*', ] for d in db_tb_list: db_tb = re.findall("(.*?)\.(.*)", d) dbname = db_tb[0][0] table_name = db_tb[0][1].replace("*", "") db_saas = mysql_helper.MysqlHelper(host='', user='user', password="password", database=dbname, port=3306, charset='utf8mb4') # 获取当前mysql库所有表,并筛选需要同步表 TABLES = db_saas.get_all("SHOW TABLES") for t in TABLES: tl_list = list(t.values()) if tl_list: t_name = tl_list[0] if t_name.startswith(table_name): create_time_list = db_saas.get_all( "SELECT MAX(create_time) FROM `{table_name}`;".format(table_name=t_name)) if not create_time_list: # 当前表没有数据 print("%s>>>>>>>>>当前表没有create_time数据" % (dbname + "." + t_name)) continue else: saas_xx_xx_data_create_time = create_time_list[0].get('MAX(create_time)') if not saas_xx_xx_data_create_time or saas_xx_xx_data_create_time is None: continue m_json = f_m_json(whether_full=2, db_saas=db_saas, dbname=dbname, table_name=t_name) status = run(m_json=m_json, dingding_url=dingding_url, jvmParameters=jvmParameters, ) db_1.insert_or_update("xx2_datax_status", **{ "dbname": dbname, "tbname": t_name, "xx_xx_data_create_time": str(saas_xx_xx_data_create_time), "status": status, "update_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) pass def job(): try: # 全量,增量同步 fixed_table() # 全量多表合并同步设计 unfixed_table() except KeyboardInterrupt: pass except: DingDingBot(dingding_url).send_message("[ 同步数据失败%s " % (format_exc())) pass def main(): # 设置相应定时任务 scheduler = BlockingScheduler() scheduler.add_job(job, "cron", day_of_week='6', hour='13', minute='41', second='30') scheduler.start()