An operations platform is critical for any company, and real-time monitoring is part of day-to-day operations work. A good task-monitoring platform detects service anomalies accurately and promptly, which in turn provides a reliability guarantee for normal service operation: when an anomaly is found, the service can be degraded gracefully and in time, limiting the damage.
shadowX is an open-source monitoring framework written in Python. It wraps basic monitoring classes and monitoring for third-party components, and it can be extended with custom business monitoring. Version 1.0 uses Celery and RabbitMQ for asynchronous tasks and can execute about 1,000 tasks per second concurrently. It supports cron-style scheduled tasks and recurring tasks, customizable timers, distributed deployment, and more.
Features:
shadowX_Server
- Stable: V3.2.4
- Beta: V3.2.6
- Dispatch: V3.2.7
Server deployment
- Deploy a separate Server in each data center
- Update the configuration to point to the current data center
- Start the Beat and the Worker
Distributed mode
- Run run/master
- Run run/client to connect to the masters on other servers for monitoring and task sharding
- Decentralized; the master is elected dynamically
Task sharding and the timer
- The client polls and connects to multiple masters; if a master fails, the client redistributes that master's tasks into the task queues of the other masters;
- The Beat timer orders tasks by their next execution time with a min-heap (see the sketch after this list);
- Tasks are executed asynchronously and monitoring items are imported dynamically
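A minimal sketch of the min-heap idea, using only the standard library (the task names and intervals are made up for illustration; this is not shadowX's own code): entries are keyed by their next run time, so the scheduler only needs to sleep until the earliest one is due.

```python
import heapq
import time
from datetime import datetime, timedelta

now = datetime.now()
heap = [
    (now + timedelta(seconds=3), "check_redis", 3),   # (next_run_time, task_name, interval)
    (now + timedelta(seconds=1), "check_http", 1),
    (now + timedelta(seconds=5), "check_disk", 5),
]
heapq.heapify(heap)  # order entries by next run time

for _ in range(5):   # a few iterations for illustration; a real scheduler loops forever
    next_run, name, interval = heapq.heappop(heap)    # earliest task first
    wait = (next_run - datetime.now()).total_seconds()
    if wait > 0:
        time.sleep(wait)                              # sleep only until the next task is due
    print("dispatch", name)                           # shadowX would call execute.apply_async(...) here
    heapq.heappush(heap, (next_run + timedelta(seconds=interval), name, interval))
```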
Project directory structure
Directory overview
- config: project configuration files;
- controller: Tornado server controller classes;
- front: front-end pages;
- master: admin backend;
- models: Tornado model classes;
- run: task timer, task worker classes, etc.;
- server: service classes used by the system;
- task: business classes executed as tasks
System architecture
Tornado Server -> MySQL/Redis -> shadowX Beat -> RabbitMQ -> Worker
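In this flow the Tornado server writes task definitions into MySQL, Beat polls the `tasks` table and publishes due tasks to RabbitMQ through Celery, and the workers consume them. A hypothetical sketch of the first leg (the handler name, request fields, and connection details are assumptions; only the `tasks` columns come from the Beat code below):

```python
# Hypothetical sketch: a Tornado handler that registers a task row
# which the Beat process later picks up from the `tasks` table.
import tornado.web
import pymysql


class TaskCreateHandler(tornado.web.RequestHandler):
    def post(self):
        conn = pymysql.connect(host="127.0.0.1", user="root",
                               password="", db="shadowx")
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "insert into tasks (task_name, task_type, task_status, sec, crontab, last_run_time) "
                    "values (%s, %s, %s, %s, %s, now())",
                    (self.get_argument("task_name"),       # e.g. 'task.all_task:check_http'
                     int(self.get_argument("task_type")),  # 1 = recurring, 2 = cron-style
                     1,                                     # enabled
                     int(self.get_argument("sec", "60")),  # recurrence interval in seconds
                     self.get_argument("crontab", "")))
            conn.commit()
            self.write({"code": 0})
        finally:
            conn.close()
```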
Beat timer implementation:
```python
#!/usr/bin/env python
# coding=utf-8
from datetime import datetime, timedelta
from threading import Thread
from task_worker import execute
from dbpool import db_config
from dbpool import db_instance
from task_crontab import crontab_run_nextTime
import time
import heapq

DATE_FORMATE = "%Y-%m-%d %H:%M:%S"


def run_async(f):
    '''Run the decorated function in a background thread
    (renamed from `async`, which is a reserved keyword since Python 3.7)'''
    def wrapper(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()
    return wrapper


def date_seconds(date1, date2):
    '''Difference between two datetimes, in seconds'''
    ti = date2 - date1
    return ti.days * 24 * 3600 + ti.seconds


def connection_pool():
    '''Get a connection-pool instance'''
    pool = db_instance.ConnectionPool(**db_config.mysql)
    return pool


def init_task():
    '''Initialize tasks: reset last_run_time to now'''
    with connection_pool().cursor() as cursor:
        print("init ...")
        init_time = datetime.now().strftime(DATE_FORMATE)
        sql = "update tasks set last_run_time = '" + init_time + "'"
        cursor.execute(sql)


@run_async
def update_task(task_id, run_time):
    '''Asynchronously update a task's next execution time'''
    with connection_pool().cursor() as cursor:
        sql = "update tasks set last_run_time = '" + run_time + "' where id = %d" % task_id
        cursor.execute(sql)


def execute_task(taskList):
    '''Dispatch due tasks and return the minimum sleep time.
    Runs synchronously (the original @async decorator is dropped here)
    so the main loop can actually use the returned sleep time.'''
    now_time = datetime.now()
    run_time = list()
    count = 0
    for task in taskList:
        dt = date_seconds(now_time, task['last_run_time'])
        # dt == 0: the task is due -- execute it and advance last_run_time
        if dt == 0:
            # recurring task
            if task['task_type'] == 1:
                if task['task_status'] == 1:
                    task_id = execute.apply_async(args=[task['task_name']])
                    run_time.append(task['sec'])
                    next_run_time = (datetime.now() + timedelta(seconds=task['sec'] + 1)).strftime(DATE_FORMATE)
                    update_task(task_id=task['id'], run_time=next_run_time)
                    count += 1
            # cron-style scheduled task
            elif task['task_type'] == 2:
                if task['task_status'] == 1:
                    task_id = execute.apply_async(args=[task['task_name']])
                    print("execute task: " + task['task_name'] + "___ task_id " + str(task_id))
                    run_time.append(task['sec'])
                    # compute the next run time from the crontab expression
                    next_run_time = crontab_run_nextTime(task['crontab'])[0]
                    update_task(task_id=task['id'], run_time=next_run_time)
                    count += 1
        elif dt < 0:
            # dt < 0: the task is overdue -- only push last_run_time forward
            if task['task_type'] == 1:
                if task['task_status'] == 1:
                    run_time.append(task['sec'])
                    next_run_time = (datetime.now() + timedelta(seconds=task['sec'] + 1)).strftime(DATE_FORMATE)
                    update_task(task_id=task['id'], run_time=next_run_time)
            elif task['task_type'] == 2:
                next_run_time = crontab_run_nextTime(task['crontab'])[0]
                update_task(task_id=task['id'], run_time=next_run_time)
        else:
            run_time.append(dt)
    print(count)

    # compute the minimum sleep time from the smallest pending interval
    if len(run_time) == 0:
        return 1
    min_time = heapq.nsmallest(1, run_time)[0]
    if min_time > 60:
        return 10
    if min_time == 0:
        return 1
    return min_time


if __name__ == "__main__":
    init_task()
    while True:
        # fetching all tasks on every loop could be optimized with a local queue
        with connection_pool().cursor() as cursor:
            cursor.execute('select * from tasks')
            tasks = sorted(cursor, key=lambda x: x['id'])
        # minimum sleep time until the next task is due
        min_sleep_time = execute_task(list(tasks))
        # main process sleeps
        if min_sleep_time is None:
            print("sleeping ...1 s")
            time.sleep(1)
        else:
            print("sleeping ...%d s" % min_sleep_time)
            time.sleep(min_sleep_time)
```
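`crontab_run_nextTime` is imported from `task_crontab` but its source is not shown here. As context, a stand-in sketch of what the Beat loop expects from it (a list whose first element is the next run time formatted like `DATE_FORMATE`), implemented with the third-party croniter package purely as an assumption, not shadowX's actual implementation:

```python
# Hypothetical stand-in for task_crontab.crontab_run_nextTime
# (pip install croniter); shadowX's own implementation is not shown here.
from datetime import datetime
from croniter import croniter


def crontab_run_nextTime(expression, count=1):
    """Return the next `count` run times of a 5-field crontab expression as strings."""
    it = croniter(expression, datetime.now())
    return [it.get_next(datetime).strftime("%Y-%m-%d %H:%M:%S")
            for _ in range(count)]


print(crontab_run_nextTime("*/5 * * * *"))   # e.g. ['2024-01-01 12:05:00']
```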
Worker (consumer) implementation:
```python
#!/usr/bin/env python
# coding=utf-8
from celery.bin import worker as celery_worker
from celery import Celery, platforms
from importlib import import_module, reload


class TWorker():

    def __init__(self):
        """Celery configuration"""
        # RabbitMQ broker address
        self.BROKER = 'amqp://admin:guest@127.0.0.1:5672/'
        # Redis result backend address
        self.BACKEND = 'redis://127.0.0.1:6379/0'
        self.CELERY_ACCEPT_CONTENT = ['json']
        self.CELERY_TASK_SERIALIZER = 'json'
        self.CELERY_RESULT_SERIALIZER = 'json'
        self.CELERY_ENABLE_UTC = False
        self.CELERY_TIMEZONE = 'Asia/Shanghai'
        platforms.C_FORCE_ROOT = True
        # create the Celery application
        self.celery = Celery('task_worker', broker=self.BROKER, backend=self.BACKEND)
        # task discovery
        self.celery.conf.CELERY_IMPORTS = ['task', 'task.all_task']

    def get_celery(self):
        return self.celery

    def dync_load_task(self, import_name):
        """Dynamically import a task from a 'module:attr' or dotted path"""
        import_name = str(import_name).replace(':', '.')
        modules = import_name.split('.')
        mod = import_module(modules[0])
        for comp in modules[1:]:
            if not hasattr(mod, comp):
                reload(mod)
            mod = getattr(mod, comp)
        return mod

    def worker_start(self):
        """Start the worker"""
        worker = celery_worker.worker(app=self.celery)
        worker.run(
            broker=self.BROKER,
            concurrency=30,
            traceback=False,
            loglevel='INFO',
        )


# instantiate the worker
tw = TWorker()
celery = tw.get_celery()


# register the task
@celery.task
def execute(func, *args, **kwargs):
    # note: the original called tw.load_task, which does not exist; dync_load_task is the defined method
    func = tw.dync_load_task(func)
    return func(*args, **kwargs)
```
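Any callable importable under the `task` package (which `CELERY_IMPORTS` already points at) can be dispatched this way: the Beat process passes a 'module:function' string as the task name and `dync_load_task` resolves it to the callable. A hypothetical monitoring item for illustration (the function name is made up, not one of shadowX's shipped tasks):

```python
# task/all_task.py -- hypothetical monitoring item
import socket


def check_port(host="127.0.0.1", port=6379, timeout=3):
    """Return True if the given TCP port accepts connections."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.close()
        return True
    except OSError:
        return False


# Dispatched by Beat (or any caller) as:
#   execute.apply_async(args=['task.all_task:check_port'])
# dync_load_task() turns 'task.all_task:check_port' into the callable above.
```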
Connection pool implementation (one MySQLConnectionPool instance per pool name):
```python
from .db_connection import MySQLConnectionPool

# one pool instance per pool_name
_instances = {}


def ConnectionPool(*args, **kwargs):
    try:
        pool_name = args[0]
    except IndexError:
        pool_name = kwargs['pool_name']
    if pool_name not in _instances:
        _instances[pool_name] = MySQLConnectionPool(*args, **kwargs)
    pool = _instances[pool_name]
    assert isinstance(pool, MySQLConnectionPool)
    return pool
```
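A usage sketch, assuming `db_config.mysql` is a dict of keyword arguments that includes a `pool_name` key (the exact keys are an assumption; the real values live under `config/`):

```python
# Hypothetical pool configuration -- shadowX keeps the real one in db_config.mysql
mysql = {
    "pool_name": "shadowx",
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "",
    "database": "shadowx",
}

# Repeated calls with the same pool_name return the same MySQLConnectionPool instance
pool_a = ConnectionPool(**mysql)
pool_b = ConnectionPool(**mysql)
assert pool_a is pool_b

with pool_a.cursor() as cursor:
    cursor.execute("select count(*) from tasks")
```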
1,000 tasks executing concurrently
GitHub: https://github.com/roancsu/shadowX
Feedback is welcome by email: roancsu@163.com