Rewriting DataX's launcher to sync business-layer MySQL into Kudu: full, incremental, and multi-table merge
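This post rewrites DataX's stock `datax.py` launcher into a self-scheduling sync script: `f_m_json` generates the DataX job JSON, `run` shells out to the DataX engine and reports results to a DingTalk robot, and APScheduler fires the whole job weekly. The `whether_full` flag selects the mode: 1 = full table copy, 0 = incremental by a `create_time` high-water mark tracked in a MySQL status table, 2 = merging per-customer shard tables (e.g. `article_c*`) into one Kudu table under a synthetic `MD5(CONCAT(id, table_name))` primary key.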
# -*- coding: utf-8 -*-
# @Time : 2021/10/27 19:04
# @Author :
import sys
import logging
import os
import signal
import subprocess
import time
import re
import socket
import json
import codecs
import platform
import random
from traceback import format_exc
from datetime import datetime
from string import Template
from dingtalkchatbot.chatbot import DingtalkChatbot
from apscheduler.schedulers.blocking import BlockingScheduler
from db import mysql_helper
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
level=logging.INFO)
def isWindows():
return platform.system() == 'Windows'
DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "/usr/local/java/bin/java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
RET_STATE = {
"KILL": 143,
"FAIL": -1,
"OK": 0,
"RUN": 1,
"RETRY": 2
}
jvmParameters = '-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/datax/log'
dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token=b'  # access token redacted
# Bookkeeping (intermediate) MySQL connection that stores sync state
db_1 = mysql_helper.MysqlHelper(host='', user='user',
password="password",
database='sitemap', port=3306,
charset='utf8mb4')
class DingDingBot(object):
    # DingTalk webhook URL (access token redacted)
    def __init__(self,
                 webhook="https://oapi.dingtalk.com/robot/send?access_token=f047e"):
        # Initialize the DingTalk chatbot client
        self.xiaoding = DingtalkChatbot(webhook)
    # Send a plain-text message; is_at_all=False, so no one is @-mentioned
def send_message(self, msg):
try:
self.xiaoding.send_text(msg='{}'.format(msg), is_at_all=False)
except:
logging.error(format_exc())
def getLocalIp():
try:
return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
except:
return "Unknown"
def isUrl(path):
if not path:
return False
assert (isinstance(path, str))
m = re.match(r"^http[s]?://\S+\w*", path.lower())
if m:
return True
else:
return False
def buildStartCommand(options, args):
commandMap = {}
tempJVMCommand = DEFAULT_JVM
if options["jvmParameters"]:
tempJVMCommand = tempJVMCommand + " " + options["jvmParameters"]
if options["remoteDebug"]:
tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
if options["loglevel"]:
tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options["loglevel"]))
if options["mode"]:
commandMap["mode"] = options["mode"]
    # jobResource may be a URL or a local file path (relative or absolute)
jobResource = args[0]
if not isUrl(jobResource):
jobResource = os.path.abspath(jobResource)
if jobResource.lower().startswith("file://"):
jobResource = jobResource[len("file://"):]
    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
if options["params"]:
jobParams = jobParams + " " + options["params"]
if options["jobid"]:
commandMap["jobid"] = options["jobid"]
commandMap["jvm"] = tempJVMCommand
commandMap["params"] = jobParams
commandMap["job"] = jobResource
return Template(ENGINE_COMMAND).substitute(**commandMap)
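# For orientation, a hedged illustration of the command buildStartCommand() assembles
# for a job file /opt/datax/bin/dbname_1234567.json (hypothetical name; real runs use a
# random 7-digit suffix, and paths depend on where DataX is installed). It expands to
# roughly:
#
#   /usr/local/java/bin/java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError \
#     -XX:HeapDumpPath=/opt/datax/log -Dloglevel=error -Dfile.encoding=UTF-8 \
#     -Djava.security.egd=file:///dev/urandom -Ddatax.home=/opt/datax \
#     -Dlogback.configurationFile=/opt/datax/conf/logback.xml -classpath "/opt/datax/lib/*:." \
#     -Dlog.file.name=_dbname_1234567_json com.alibaba.datax.core.Engine \
#     -mode standalone -jobid -1 -job /opt/datax/bin/dbname_1234567.json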
def run(m_json={},
        dingding_url='https://oapi.dingtalk.com/robot/send?access_token=b9086aab568f1d0xx',
        jvmParameters='-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/datax/log'):
    """
    Dispatch one DataX run: write the job dict to a temp JSON file, launch the engine,
    and forward its output to DingTalk.
    :return: 1 on success, 0 on failure
    """
    # Debug aid (disabled): print the generated job JSON before submitting
    # print(json.dumps(m_json, indent=2))
options = {'jvmParameters': jvmParameters,
'jobid': '-1', 'mode': 'standalone', 'params': None, 'reader': None, 'writer': None,
'remoteDebug': None,
'loglevel': 'error'}
f_name = '/opt/datax/bin' + '/' + 'dbname_' + str(random.randint(1000000, 9999999)) + ".json"
    # Write the job config to a uniquely named JSON file
with open(f_name, 'w', encoding="utf-8") as f:
f.write(json.dumps(m_json, indent=2))
args = [f_name]
startCommand = buildStartCommand(options, args)
child_process = subprocess.Popen(startCommand, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
(stdout, stderr) = child_process.communicate()
stdout_decode = stdout.decode('utf-8')
DingDingBot(dingding_url).send_message(
"[ %s \n %s " % (stdout_decode, stderr.decode('utf-8')))
    # except:
    #     DingDingBot(dingding_url).send_message("[ %s " % format_exc())
    # Delete the temp job file once processing is done
if os.path.exists(f_name):
os.remove(f_name)
    if 'ERROR' in stdout_decode:
        print("DataX reported errors")
        # failure
        return 0
    else:
        # success
        return 1
def f_m_json(whether_full=1, db_saas="", dbname="", table_name="", create_time=""):
    """
    Build the DataX job JSON.
    :param whether_full: 1 = full sync, 0 = incremental sync, 2 = multi-table merge
    :param db_saas: business-layer MySQL connection (MysqlHelper)
    :param dbname: source database name
    :param table_name: source table name
    :param create_time: high-water mark for incremental sync
    :return: job dict ready to pass to run()
    """
    # Fetch the table's column names in definition order, scoped to the current schema
    # (otherwise same-named tables in other schemas would pollute the list and the
    # Kudu column indexes built below could misalign)
    information_schema_COLUMNS = db_saas.get_all(
        ("select COLUMN_NAME from information_schema.COLUMNS "
         "where table_schema = database() and table_name = '{}' "
         "order by ORDINAL_POSITION;").format(table_name))
mysql_list = []
kudu_list = []
    if whether_full == 2:
        # Synthetic primary key for the merged table
        mysql_list.append("new_saas_id")
        kudu_list.append({
            "index": 0,
            "name": "new_saas_id",
            "type": "STRING",
            "compress": "DEFAULT_COMPRESSION",
            "encoding": "AUTO_ENCODING",
            "comment": "primary key",
            "primaryKey": True
        })
    for idx, da in enumerate(information_schema_COLUMNS):
        COLUMN_NAME = da.get('COLUMN_NAME', "")
        # Backtick-quote MySQL reserved words
        if COLUMN_NAME in ('status', 'reads'):
            mysql_list.append('`%s`' % COLUMN_NAME)
        else:
            mysql_list.append(COLUMN_NAME)
        if whether_full == 2:
            # new_saas_id occupies index 0, so shift every other column by one
            idx = idx + 1
        if COLUMN_NAME == "id":
            if whether_full == 2:
                # In merge mode, id is not unique across the source tables, so it is
                # written as a plain column; new_saas_id is the primary key instead
                kudu_list.append({
                    "index": idx,
                    "name": COLUMN_NAME,
                    "type": "STRING",
                    "compress": "DEFAULT_COMPRESSION",
                    "encoding": "AUTO_ENCODING",
                    "comment": "comment",
                })
            else:
                kudu_list.append({
                    "index": idx,
                    "name": COLUMN_NAME,
                    "type": "STRING",
                    "compress": "DEFAULT_COMPRESSION",
                    "encoding": "AUTO_ENCODING",
                    "comment": "primary key",
                    "primaryKey": True
                })
        else:
            kudu_list.append({
                "index": idx,
                "name": COLUMN_NAME,
                "type": "STRING",
                "compress": "DEFAULT_COMPRESSION",
                "encoding": "AUTO_ENCODING",
                "comment": "comment"
            })
if whether_full == 1:
mysql_connection_list = [
{
"table": [
table_name
],
"jdbcUrl": [
"jdbc:mysql://:3306/{}".format(dbname)
]
}
]
else:
        if whether_full == 2:
            # Build a key that is unique across the merged per-customer tables
            querySql = "select MD5(CONCAT(c.id,'{table_name}')) as new_saas_id,c.* from {table_name} as c ;".format(
                table_name=table_name)
        else:
            querySql = "select * from {table_name} where `create_time`>'{create_time}';".format(table_name=table_name,
                                                                                                create_time=create_time)
mysql_connection_list = [
{
"querySql": [
querySql
],
"jdbcUrl": [
"jdbc:mysql://:3306/{}".format(dbname)
]
}
]
    if whether_full == 2:
        # Strip the per-customer numeric suffix so all shards share one Kudu table
        t_table_name = dbname + "_" + re.sub(r'\d', "", table_name)
    else:
        t_table_name = dbname + "_" + table_name
m_json = {
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"column": mysql_list,
"connection": mysql_connection_list
}
},
"writer": {
"name": "kuduwriter",
"parameter": {
"kuduConfig": {
"kudu.master_addresses": ":7051,:7051"
},
"table": t_table_name,
"replicaCount": 3,
"truncate": False,
"writeMode": "upsert",
"column": kudu_list,
"batchSize": 1024,
"bufferSize": 2048,
"skipFail": False,
"encoding": "UTF-8"
}
}
}
],
"setting": {
"speed": {
"channel": 5
},
"errorLimit": {
"record": 10000,
"percentage": 0.02
}
}
}
}
return m_json
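# For reference, a hedged sketch of what f_m_json(whether_full=0, ...) returns for an
# incremental run (hosts and credentials are redacted here just as in the code above;
# the column lists are abbreviated, not real output):
#
# {
#   "job": {
#     "content": [{
#       "reader": {"name": "mysqlreader",
#                  "parameter": {"username": "username", "password": "password",
#                                "column": ["id", "`status`", "create_time"],
#                                "connection": [{"querySql": ["select * from site where `create_time`>'2021-10-27 00:00:00';"],
#                                                "jdbcUrl": ["jdbc:mysql://:3306/xx_xx_qqt_site"]}]}},
#       "writer": {"name": "kuduwriter",
#                  "parameter": {"table": "xx_xx_qqt_site_site", "writeMode": "upsert",
#                                "column": [...], "batchSize": 1024}}
#     }],
#     "setting": {"speed": {"channel": 5},
#                 "errorLimit": {"record": 10000, "percentage": 0.02}}
#   }
# }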
# Full and incremental sync for a fixed list of single tables
def fixed_table():
    # Check whether the status table has been seeded yet
    xx_xx_datax_cnt = db_1.get_all("select count(1) from xx_xx_datax_status;")
    cnt = xx_xx_datax_cnt[0].get('count(1)')
    # First run: register the tables that need syncing
    if cnt == 0:
db_tb_list = [
"xx_xx_page.page_included_statistics_by_everyday",
"xx_xx_qqt_polymerize.polymerize",
"xx_xx_qqt_ga.ga_all_page",
"xx_xx_qqt_inquiry.inquiry",
"xx_xx_qqt_site.site",
]
for d in db_tb_list:
            db_tb = re.findall(r"(.*?)\.(.*)", d)
dbname = db_tb[0][0]
table_name = db_tb[0][1]
data_dict = {
"dbname": dbname,
"tbname": table_name
}
db_1.just_insert("xx_xx_datax_status", **data_dict)
    # Decide per table: full sync or incremental sync
xx_xx_datax_status = db_1.get_all("select * from `xx_xx_datax_status`;")
for xx_xx in xx_xx_datax_status:
_id = xx_xx.get("id")
dbname = xx_xx.get("dbname")
table_name = xx_xx.get("tbname")
xx_xx_status = xx_xx.get("status")
db_saas = mysql_helper.MysqlHelper(host='.x5.x4', user='user',
password="password",
database=dbname, port=3306,
charset='utf8mb4')
        # Current high-water mark: max create_time in the source table
        create_time_list = db_saas.get_all("SELECT MAX(create_time) FROM `{table_name}`;".format(table_name=table_name))
        if not create_time_list:
            # The table has no rows at all
            print("%s>>>>>>>>>table has no create_time data" % (dbname + "." + table_name))
            continue
        else:
            saas_xx_xx_data_create_time = create_time_list[0].get('MAX(create_time)')
            if not saas_xx_xx_data_create_time:
                print("%s>>>>>>>>>table has no create_time data" % (dbname + "." + table_name))
                continue
        if xx_xx_status == 0:
            # 1. Full sync *********************************
            m_json = f_m_json(whether_full=1, db_saas=db_saas, dbname=dbname,
                              table_name=table_name)
        else:
            # Incremental sync: pull rows newer than the recorded high-water mark,
            # marking the row as in-progress (status=2) before launching
            db_1.execute("UPDATE `xx_xx_datax_status` set `status`=2 where `id`=%r;" % (_id))
            m_json = f_m_json(whether_full=0, db_saas=db_saas, dbname=dbname,
                              table_name=table_name, create_time=saas_xx_xx_data_create_time)
status = run(m_json=m_json, dingding_url=dingding_url,
jvmParameters=jvmParameters, )
        if status == 0:
            # Failure: leave the status untouched so it is reported below
            pass
        else:
            # Success: record the high-water mark and set the status back to 1
            db_1.execute(
                "UPDATE `xx_xx_datax_status` set `xx_xx_data_create_time`='%s',`status`=1,`update_time`=%r where `id`=%s;" % (
                    saas_xx_xx_data_create_time, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _id))
    # Report tables stuck at status=2 (unfinished incremental syncs) via DingTalk
    status2 = db_1.get_all('SELECT dbname,tbname FROM `xx_xx_datax_status` where `status`=2;')
    if status2:
        DingDingBot(dingding_url).send_message(
            "[ tables whose incremental sync failed: %s " % (str(status2)))
# '''
# xx_xx_article.article_c*    (AI article indexing)
# xx_xx_product.product_i*    (AI products)
# xx_xx_polymerize.polymerize (AI aggregation-page indexing)
# xx_xx_page.page_category    (category indexing)
# '''
# Merge many per-customer tables into one Kudu table, full sync each time
def unfixed_table():
    # Business-layer user tables; * is the per-customer id suffix
db_tb_list = [
'x_x_article.article_c*',
'x_x_product.product_i*',
'x_x_page.page_url_i*',
]
for d in db_tb_list:
        db_tb = re.findall(r"(.*?)\.(.*)", d)
dbname = db_tb[0][0]
table_name = db_tb[0][1].replace("*", "")
db_saas = mysql_helper.MysqlHelper(host='', user='user',
password="password",
database=dbname, port=3306,
charset='utf8mb4')
        # List all tables in the current MySQL database and keep those matching the prefix
TABLES = db_saas.get_all("SHOW TABLES")
for t in TABLES:
tl_list = list(t.values())
if tl_list:
t_name = tl_list[0]
if t_name.startswith(table_name):
create_time_list = db_saas.get_all(
"SELECT MAX(create_time) FROM `{table_name}`;".format(table_name=t_name))
                    if not create_time_list:
                        # The table has no rows at all
                        print("%s>>>>>>>>>table has no create_time data" % (dbname + "." + t_name))
                        continue
                    else:
                        saas_xx_xx_data_create_time = create_time_list[0].get('MAX(create_time)')
                        if not saas_xx_xx_data_create_time:
                            continue
m_json = f_m_json(whether_full=2, db_saas=db_saas, dbname=dbname,
table_name=t_name)
status = run(m_json=m_json, dingding_url=dingding_url,
jvmParameters=jvmParameters, )
db_1.insert_or_update("xx2_datax_status", **{
"dbname": dbname,
"tbname": t_name,
"xx_xx_data_create_time": str(saas_xx_xx_data_create_time),
"status": status,
"update_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
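# Why the synthetic key works (an illustration; assumes every per-customer table has an
# integer id column, as the column handling in f_m_json implies): article_c101 and
# article_c102 can both contain id=1, but
#   MD5(CONCAT(1, 'article_c101')) != MD5(CONCAT(1, 'article_c102'))
# so upserting both rows into the shared Kudu table never collides, while re-running the
# merge for the same source row simply overwrites its earlier version (writeMode=upsert).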
def job():
    try:
        # Full / incremental sync of the fixed table list
        fixed_table()
        # Full-sync merge of the per-customer table sets
        unfixed_table()
    except KeyboardInterrupt:
        pass
    except:
        DingDingBot(dingding_url).send_message("[ data sync failed: %s " % (format_exc()))
def main():
    # Schedule the weekly sync run
    scheduler = BlockingScheduler()
    scheduler.add_job(job, "cron", day_of_week='6', hour='13', minute='41', second='30')
    scheduler.start()
if __name__ == '__main__':
    main()
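# The bookkeeping tables are not defined in this post; a hedged reconstruction of
# xx_xx_datax_status from the queries above (column types are assumptions):
#
# CREATE TABLE xx_xx_datax_status (
#     id                     INT AUTO_INCREMENT PRIMARY KEY,
#     dbname                 VARCHAR(128),
#     tbname                 VARCHAR(128),
#     status                 TINYINT DEFAULT 0,  -- 0 = never synced, 1 = synced, 2 = incremental in progress/failed
#     xx_xx_data_create_time DATETIME,
#     update_time            DATETIME
# );
#
# xx2_datax_status (written by unfixed_table) has the same shape, except its status
# column stores run()'s return value: 1 = success, 0 = failure.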