定时任务管理中心(dubbo+spring)-我们到底能走多远系列47

时间:2023-03-08 17:27:26
定时任务管理中心(dubbo+spring)-我们到底能走多远系列47

我们到底能走多远系列47

扯淡:

  又是一年新年时,不知道上一年你付出了多少,收获了多少呢?也许你正想着老板会发多少奖金,也许你正想着明年去哪家公司投靠。

  这个时间点好好整理一下,思考总结一下,的确是个非常好的机会。

  年终的时候各个公司总会评一下绩效,拉出各位的成绩单,你是不是想说:去你妈的成绩单,我不是你的学生,老子努力工作不是为了看你脸色!当然啦,你想说这话的前提是:你很牛b,如果不是也可以想想,然后默默去变牛b。

  我大多数的朋友同事都是漂在城市里的人,我们努力的活得更好,想过自己想过的生活,打心里佩服我们自己,选择这个行业,正尝试改变着世界。

  所以,加油,各位!

  另外,程序员过什么新年?写bug的时间都不够呢!

  最后还是祝看到这个文字的朋友:身体健康,阖家欢乐,鸡年大吉,公司上市。

主题:

  一般,开一个定时任务很简单,spring写个注解就能跑了,或者单应用的定时任务还有很多其他丰富jar支持。

常规的一个场景:
  一个系统一般都会有很多业务模块组成,这些业务模块被封装成一个个独立部署的应用拆分出去,独立维护。各个业务模块都会有自己定制的定时任务要跑,一般都会依赖自己业务数据和逻辑,很自然的写在鸽子应用中。
那么定时任务管理中心要做的是统一管理这些散落在各个业务模块中的定时任务。
  
统一管理的好处是:
1,全系统定时任务一目了然,便于排查
2,任务执行相关信息统一到一起,比如日志,而任务业务代码开发和任务配置解耦 
3,针对任务功能的开发升级集中到一个应用中了
这里粗略设计一个定时任务管理系统抛砖引玉。
大致划分以下三个部分:
1,任务管理系统
2,任务调度系统
3,业务实现的任务逻辑
定时任务管理中心(dubbo+spring)-我们到底能走多远系列47任务管理系统用于配置任务的一些信息,任务调度使用这些信息实现对业务系统进行调度,实现定时任务。
拆解后各个组件的关系如下:
定时任务管理中心(dubbo+spring)-我们到底能走多远系列47
当然,为了跟好的描述这个系统,以上图是一个简化的设计图。
如何实现呢?
这里提供一个代码的方案,实际开发中结合实际场景和当时技术遗产还有很多的技术方案可以设计,还可以深度挖掘。
首先我们给每个应用提供任务管理中心的jar包,在业务应用启动的时候我们要把任务service收集起来,放入一个map,然后统一提供出一个dubbo接口,用dubbo的group区分各个应用。当任务调度需要调用到这个应用的某个任务service时,再从map中拿出spring bean执行任务方法。
以上功能的jar的核心代码如下:
public class TaskSupport implements BeanPostProcessor, ApplicationListener<ApplicationContextEvent>,
ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(TaskSupport.class);
private ApplicationContext applicationContext;
private RegistryConfig registryConfig;
private ApplicationConfig applicationConfig;
private ProtocolConfig protocolConfig;
// 存储任务bean
private Map<String, Object> taskBeanMap = new HashMap<String, Object>();
// dubbo config
private ServiceConfig<Object> serviceConfig; @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
collectTaskBean(bean, beanName);
return bean;
} private Object getTarget(Object bean) {
Object target = bean;
while (target instanceof Advised) {
try {
target = ((Advised) bean).getTargetSource().getTarget();
} catch (Exception e) {
target = null;
break;
}
}
return target;
} private void collectTaskBean(Object bean, String beanName) {
Object target = getTarget(bean);
if (target != null) {
Class<?> clazz = target.getClass();
if (!clazz.isAnnotationPresent(Service.class) || !clazz.isAnnotationPresent(Task.class)) {
return;
}
if (!taskBeanMap.containsKey(beanName)) {
logger.info("add task bean {}", beanName);
taskBeanMap.put(beanName, bean);
}
}
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (isCurrentApplicationContextRefresh(event)) {
exportTaskDispatcher();
}
} @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} /**
* 向Task暴露任务分发器服务
*/
protected void exportTaskDispatcher() {
if (serviceConfig != null && serviceConfig.isExported()) {
return;
}
applicationConfig = applicationContext.getBean(ApplicationConfig.class);
registryConfig = applicationContext.getBean("soaRegistryConfig", RegistryConfig.class);
protocolConfig = applicationContext.getBean(ProtocolConfig.class);
TaskDispatcherImpl taskServiceProxyImpl = wireTaskServiceProxy();
exportServiceConfig(taskServiceProxyImpl);
} protected void unexportTaskDispatcher() {
if (serviceConfig != null && serviceConfig.isExported()) {
serviceConfig.unexport();
}
} private TaskDispatcherImpl wireTaskServiceProxy() {
AutowireCapableBeanFactory beanFactory = applicationContext.getAutowireCapableBeanFactory();
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
TaskDispatcherImpl taskServiceProxy = new TaskDispatcherImpl();
// ITaskDispatcher的实现bean载入到spring bean容器中,等待dubbo接口暴露
taskServiceProxy = (TaskDispatcherImpl) beanFactory.initializeBean(taskServiceProxy, "taskServiceProxy");
// 这里把筛选出的taskBeanMap注入到TaskDispatcherImpl,直接可以使用
taskServiceProxy.setTaskBeanMap(taskBeanMap);
taskServiceProxy.setApplicationConfig(applicationConfig);
taskServiceProxy.setRegistryConfig(registryConfig);
defaultListableBeanFactory.registerSingleton("taskServiceProxy", taskServiceProxy);
return taskServiceProxy;
}
/**
* dubbo接口暴露给任务调度系统
*
*/
private void exportServiceConfig(Object proxy) {
serviceConfig = new ServiceConfig<Object>();
serviceConfig.setApplication(applicationConfig);
serviceConfig.setRegistry(registryConfig);
serviceConfig.setProtocol(protocolConfig);
// 把这个接口暴露出去
serviceConfig.setInterface(ITaskDispatcher.class);
serviceConfig.setRef(proxy);
serviceConfig.setRetries(0);
// 各个业务系统的group不同,这里充分利用了dubbo的属性
serviceConfig.setGroup(applicationConfig.getName());
serviceConfig.export();
} /**
* 是否是当前上下文,防止重复加载和过早加载
*
* @param event
* @return
*/
private boolean isCurrentApplicationContextRefresh(ApplicationEvent event) {
return event instanceof ContextRefreshedEvent
&& ((ContextRefreshedEvent) event).getApplicationContext() == applicationContext;
}
}

这样一来,所有应用都会暴露一个ITaskDispatcher 类的方法出去,但是各个group不一样。ITaskDispatcher定义的方法:

public interface ITaskDispatcher {
public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto);
}

dispatch方法是调度中心调度触发启动任务的方法,根据TaskInvokeInfoDto这个参数里的定义,需要定位到哪一个应用的哪一个类的那一个方法,这个方法的参数是什么,定位到后执行它,这就是dispatch要实现的功能。

先看一下TaskInvokeInfoDto的定义:

private String appName;//定位到哪个应用,dubbo的group区分
private String beanName;//定位到哪个类
private String methodName;// 定位到哪个方法
private String[] parameterTypes;//方法的参数类型,有重载的情况
private String[] args;//参数值

那么dispatch的核心代码:

public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto) {
try {
Method method = findMethod(taskInvokeInfoDto);
Class<?>[] parameterClazzs = method.getParameterTypes();
if (parameterClazzs.length == 0) {
ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName()));
} else {
Object[] parameterObjs = new Object[parameterClazzs.length];
for (int i = 0; i < parameterClazzs.length; i++) {
parameterObjs[i] = Jackson.base().readValue(taskInvokeInfoDto.getArgs()[i], parameterClazzs[i]);
}
ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName()), parameterObjs);
}
} catch (Exception e) {
logger.error("execute error...", e);
}
}
// 上面将的定位逻辑
private Method findMethod(TaskInvokeInfoDto taskInvokeInfoDto) {
Object bean = taskBeanMap.get(taskInvokeInfoDto.getBeanName());
Method method = null;
if (ArrayUtils.isEmpty(taskInvokeInfoDto.getParameterTypes())) {
method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName());
} else {
final int paramCount = taskInvokeInfoDto.getParameterTypes().length;
Class<?>[] clazzArray = new Class<?>[paramCount];
for (int i = 0; i < paramCount; i++) {
try {
clazzArray[i] = ClassUtils.getClass(taskInvokeInfoDto.getParameterTypes()[i]);
} catch (ClassNotFoundException e) {
logger.info("根据参数类型的字符串创建class对象时失败", e);
return null;
}
}
method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName(), clazzArray);
}
return method;
}

以上只要在调度中心处调用dubbo来控制任务执行就可以实现整个任务中心的核心功能。

当然,这里只是简单的尝试性的实现,还有很多优化和扩展可以做,比如任务日志打印收集,任务应用存活状态心跳监控,等等。

以前看到过一篇去哪网的吹b文章,吹了半天,仔细看了他提到的功能和没实现的功能,搞过的人都会觉得做一个其实不难,只是人家分享的时候感觉很厉害,其实他自己心里清楚自己这个系统也是处处是坑。虽然吹b,不过也会给我们各种启发。

总结:

1,代码中利用spring的BeanPostProcessor,筛选出自己需要的bean的方式又是一种新的技巧,我在《请求路由到业务方法设计(2)》中需要筛选bean map用了另一种方式。不知道网友还有其他的想法吗?

2,反射相关的api还可以继续深入学习。

让我们继续前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不会成功。