shadowX: A Distributed Task Scheduling Framework for Enterprise-Grade Monitoring

Date: 2021-07-26 20:03:30

An operations platform is vital to a company, and real-time monitoring is part of day-to-day operations work. A good task monitoring platform detects service anomalies promptly and accurately, which in turn guarantees reliable service: when an anomaly is found, the affected service can be degraded in time and losses kept low.

shadowX is an open-source monitoring framework written in Python. It packages base monitoring classes and monitors for third-party components, and supports extension with custom business monitors. Version 1.0 uses Celery and RabbitMQ for asynchronous tasks, executing up to 1,000 tasks per second concurrently, and supports scheduled (cron-style) and recurring tasks, user-defined timers, distributed deployment, and more.

Features:

shadowX_Server

  • Stable: V3.2.4
  • Beta: V3.2.6
  • Dispatch: V3.2.7

Server-side deployment

  1. Deploy a separate Server in each data center.
  2. Point the configuration at the current data center.
  3. Start Beat and the Workers (a hypothetical launcher sketch follows this list).
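
The post does not show the actual entry points, so here is only a minimal sketch of step 3, assuming the Beat loop and the TWorker class shown later in this article live under run/ with these made-up file names:

# Hypothetical launcher for step 3; run/beat.py and run/worker.py are
# placeholder names, not the project's actual scripts.
import subprocess

beat = subprocess.Popen(["python", "run/beat.py"])      # the Beat timer loop
worker = subprocess.Popen(["python", "run/worker.py"])  # a Celery worker
beat.wait()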

Distributed mode

  1. Run run/master.
  2. Run run/client to connect to the masters on other servers for monitoring and task-shard processing.
  3. Decentralized, with dynamic master election (one possible scheme is sketched after this list).
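
The election mechanism itself is not shown in the post; below is a minimal sketch of one common decentralized approach, where every node probes a fixed, shared list of master addresses and treats the first live one as the active master. The addresses, port, and TCP-probe protocol here are assumptions, not shadowX's actual implementation:

import socket

MASTERS = [("10.0.0.1", 8341), ("10.0.0.2", 8341), ("10.0.0.3", 8341)]

def is_alive(addr, timeout=1.0):
    """Return True if a TCP connection to the candidate master succeeds."""
    try:
        with socket.create_connection(addr, timeout=timeout):
            return True
    except OSError:
        return False

def elect_master():
    """Pick the first live node in a fixed order, so every client arrives
    at the same choice without a central coordinator."""
    for addr in MASTERS:
        if is_alive(addr):
            return addr
    raise RuntimeError("no master reachable")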

Task sharding and the timer

  1. A client polls several masters in turn; if a master fails, the client redistributes that master's tasks into the task queues of the remaining masters.
  2. The Beat timer orders the task execution timeline with a min-heap (see the sketch after this list).
  3. Tasks run asynchronously, and monitoring items are imported dynamically.
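
To make item 2 concrete, here is a minimal sketch of min-heap scheduling using Python's heapq. The task names and intervals are made up; the real Beat implementation below keeps due times in MySQL and uses heapq only to find the shortest sleep:

import heapq
import time

# Keep (next_run_timestamp, task_name) pairs in a min-heap so the soonest
# task is always at the top, then sleep exactly until it is due.
now = time.time()
heap = [(now + 30, "check_disk"), (now + 5, "check_cpu"), (now + 60, "check_mq")]
heapq.heapify(heap)

while heap:
    next_ts, name = heapq.heappop(heap)
    time.sleep(max(0, next_ts - time.time()))  # sleep until the task is due
    print("run", name)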


Project directory structure


Directory overview

config: project configuration files;

controller: Tornado server controller classes;

front: front-end pages;

master: the admin backend;

models: Tornado model classes;

run: the task timer (Beat), the task worker classes, and so on;

server: service classes invoked by the system;

task: the business classes that tasks execute


System architecture

Tornado Server -> MySQL/Redis -> shadowX Beat -> RabbitMQ -> Worker
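
In other words: the Tornado server maintains task definitions in MySQL, Beat polls that table and publishes due tasks to RabbitMQ through Celery, and the workers consume them, with results going to the Redis backend. From the fields the Beat code below reads, a row in the tasks table looks roughly like this (the schema is inferred from the code, and the task name is a hypothetical example):

task = {
    "id": 1,
    "task_name": "task.all_task:check_cpu",  # dotted path the worker resolves
    "task_type": 1,             # 1 = recurring every `sec` seconds, 2 = cron-style
    "task_status": 1,           # 1 = enabled
    "sec": 30,                  # interval in seconds for recurring tasks
    "crontab": "*/5 * * * *",   # cron expression, used when task_type == 2
    "last_run_time": "2021-07-26 20:00:00",  # actually the *next* due time, maintained by Beat
}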

Beat timer implementation:

#!/usr/bin/env python
# coding=utf-8
from datetime import datetime, timedelta
from threading import Thread
from task_worker import execute
from dbpool import db_config
from dbpool import db_instance
from task_crontab import crontab_run_nextTime
import time
import heapq

DATE_FORMATE = "%Y-%m-%d %H:%M:%S"


def run_async(f):
    '''Run the wrapped function in a background thread.'''
    def wrapper(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()
    return wrapper


def date_seconds(date1, date2):
    '''Difference between two datetimes, in whole seconds.'''
    ti = date2 - date1
    return ti.days*24*3600 + ti.seconds


def connection_pool():
    '''Get the shared connection pool instance.'''
    pool = db_instance.ConnectionPool(**db_config.mysql)
    return pool


def init_task():
    '''Initialize every task's due time to now.'''
    with connection_pool().cursor() as cursor:
        print("init ...")
        init_time = datetime.now().strftime(DATE_FORMATE)
        # Parameterized query instead of string concatenation (SQL injection safe)
        cursor.execute("update tasks set last_run_time = %s", (init_time,))


@run_async
def update_task(task_id, run_time):
    '''Asynchronously update a task's next due time.'''
    with connection_pool().cursor() as cursor:
        cursor.execute(
            "update tasks set last_run_time = %s where id = %s",
            (run_time, task_id),
        )


def execute_task(taskList):
    '''Scan the tasks, dispatch the ones that are due, and return the
    minimum sleep time.

    Runs synchronously so its return value reaches the main loop; the heavy
    work is already asynchronous (Celery dispatch, run_async DB updates).'''
    now_time = datetime.now()
    run_time = list()
    count = 0
    for task in taskList:
        dt = date_seconds(now_time, task['last_run_time'])
        # A difference of 0 means the task is due: run it and advance last_run_time
        if dt == 0:
            # Recurring task (fixed interval)
            if task['task_type'] == 1:
                if task['task_status'] == 1:
                    task_id = execute.apply_async(args=[task['task_name']])
                    run_time.append(task['sec'])
                    next_run_time = (datetime.now() + timedelta(seconds=task['sec']+1)).strftime(DATE_FORMATE)
                    update_task(task_id=task['id'], run_time=next_run_time)
                    count += 1

            # Cron-style scheduled task
            elif task['task_type'] == 2:
                if task['task_status'] == 1:
                    task_id = execute.apply_async(args=[task['task_name']])
                    print("execute task: " + task['task_name'] + "___ task_id " + str(task_id))
                    run_time.append(task['sec'])
                    # Compute the next run time from the cron expression
                    next_run_time = crontab_run_nextTime(task['crontab'])[0]
                    update_task(task_id=task['id'], run_time=next_run_time)
                    count += 1

        elif dt < 0:
            # A negative difference means the task is overdue: advance last_run_time
            if task['task_type'] == 1:
                if task['task_status'] == 1:
                    run_time.append(task['sec'])
                    next_run_time = (datetime.now() + timedelta(seconds=task['sec']+1)).strftime(DATE_FORMATE)
                    update_task(task_id=task['id'], run_time=next_run_time)

            elif task['task_type'] == 2:
                next_run_time = crontab_run_nextTime(task['crontab'])[0]
                update_task(task_id=task['id'], run_time=next_run_time)
        else:
            run_time.append(dt)

    print("dispatched %d task(s)" % count)

    # Compute the minimum sleep time from the pending intervals
    if not run_time:
        return 1
    min_time = heapq.nsmallest(1, run_time)[0]
    if min_time > 60:
        # Cap long sleeps so the loop re-polls the DB at least every 10 s
        return 10
    if min_time == 0:
        return 1
    return min_time


if __name__ == "__main__":
    init_task()
    while True:
        # Fetching all tasks every round could be optimized with a local queue
        with connection_pool().cursor() as cursor:
            cursor.execute('select * from tasks')
            tasks = sorted(cursor, key=lambda x: x['id'])
            # Dispatch due tasks and get the minimum sleep time back
            min_sleep_time = execute_task(list(tasks))
            # Put the main process to sleep until the next task is due
            print("sleeping ...%d s" % min_sleep_time)
            time.sleep(min_sleep_time)
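
The task_crontab module is not shown in the post. Judging from how crontab_run_nextTime is used above (index 0 of its result is the next run time as a formatted string), a minimal equivalent could be built on the croniter package; this is an assumption about the helper's behavior, not the project's actual implementation:

from datetime import datetime
from croniter import croniter

DATE_FORMATE = "%Y-%m-%d %H:%M:%S"

def crontab_run_nextTime(expr, count=1):
    """Return the next `count` run times of a cron expression, formatted
    the way the Beat code above expects (index 0 = next run)."""
    it = croniter(expr, datetime.now())
    return [it.get_next(datetime).strftime(DATE_FORMATE) for _ in range(count)]

print(crontab_run_nextTime("*/5 * * * *"))  # e.g. ['2021-07-26 20:05:00']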


Worker (consumer) implementation:

#!/usr/bin/env python
# coding=utf-8
from celery.bin import worker as celery_worker
from celery import Celery, platforms
from importlib import import_module, reload


class TWorker:
    def __init__(self):
        """ Celery configuration """
        # RabbitMQ broker URL
        self.BROKER = 'amqp://admin:guest@127.0.0.1:5672/'
        # Redis result backend URL
        self.BACKEND = 'redis://127.0.0.1:6379/0'
        self.CELERY_ACCEPT_CONTENT = ['json']
        self.CELERY_TASK_SERIALIZER = 'json'
        self.CELERY_RESULT_SERIALIZER = 'json'
        self.CELERY_ENABLE_UTC = False
        self.CELERY_TIMEZONE = 'Asia/Shanghai'
        platforms.C_FORCE_ROOT = True

        # Create the Celery app
        self.celery = Celery('task_worker', broker=self.BROKER, backend=self.BACKEND)
        # Apply the serializer/timezone settings above; assigning them to
        # self alone does not configure Celery
        self.celery.conf.update(
            CELERY_ACCEPT_CONTENT=self.CELERY_ACCEPT_CONTENT,
            CELERY_TASK_SERIALIZER=self.CELERY_TASK_SERIALIZER,
            CELERY_RESULT_SERIALIZER=self.CELERY_RESULT_SERIALIZER,
            CELERY_ENABLE_UTC=self.CELERY_ENABLE_UTC,
            CELERY_TIMEZONE=self.CELERY_TIMEZONE,
        )
        # Task discovery
        self.celery.conf.CELERY_IMPORTS = ['task', 'task.all_task']

    def get_celery(self):
        return self.celery

    def dync_load_task(self, import_name):
        """ Dynamically import a task given a 'module:attr' or dotted path """
        import_name = str(import_name).replace(':', '.')
        modules = import_name.split('.')
        mod = import_module(modules[0])
        for comp in modules[1:]:
            if not hasattr(mod, comp):
                reload(mod)
            mod = getattr(mod, comp)
        return mod

    def worker_start(self):
        """ Start the worker (Celery 4.x bin API) """
        worker = celery_worker.worker(app=self.celery)
        worker.run(
            broker=self.BROKER,
            concurrency=30,
            traceback=False,
            loglevel='INFO',
        )


""" Instantiate the worker """
tw = TWorker()
celery = tw.get_celery()

""" Task registration """
@celery.task
def execute(func, *args, **kwargs):
    """Resolve the task function from its dotted path and run it."""
    func = tw.dync_load_task(func)
    return func(*args, **kwargs)
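
With the worker process running (tw.worker_start()), the execute task above is the single entry point that Beat dispatches to. A usage sketch; 'task.all_task:check_cpu' is a hypothetical monitoring function name, resolved by dync_load_task on the worker side:

# What Beat's execute.apply_async(...) call amounts to, from this module:
result = execute.apply_async(args=['task.all_task:check_cpu'])
print(result.id)  # Celery task id; the return value lands in the Redis backend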

Connection pool (a singleton factory in the dbpool package, used by Beat's connection_pool() above):

from .db_connection import MySQLConnectionPool

# One pool instance per pool_name
_instances = {}


def ConnectionPool(*args, **kwargs):
    try:
        pool_name = args[0]
    except IndexError:
        pool_name = kwargs['pool_name']

    if pool_name not in _instances:
        _instances[pool_name] = MySQLConnectionPool(*args, **kwargs)
    pool = _instances[pool_name]
    assert isinstance(pool, MySQLConnectionPool)
    return pool
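
Note that the factory keys pools by pool_name, so the db_config.mysql dict that Beat passes in must carry one. A hypothetical config (all values are placeholders) showing the singleton behavior:

mysql = {
    "pool_name": "shadowx",
    "host": "127.0.0.1",
    "port": 3306,
    "user": "monitor",
    "password": "secret",
    "database": "shadowx",
}

pool = ConnectionPool(**mysql)          # first call creates the pool
assert pool is ConnectionPool(**mysql)  # later calls return the same instance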


(screenshot: 1,000 tasks executing concurrently)


GitHub: https://github.com/roancsu/shadowX

Comments and questions are welcome by email: roancsu@163.com