我们到底能走多远系列47
扯淡:
又是一年新年时,不知道上一年你付出了多少,收获了多少呢?也许你正想着老板会发多少奖金,也许你正想着明年去哪家公司投靠。
这个时间点好好整理一下,思考总结一下,的确是个非常好的机会。
年终的时候各个公司总会评一下绩效,拉出各位的成绩单,你是不是想说:去你妈的成绩单,我不是你的学生,老子努力工作不是为了看你脸色!当然啦,你想说这话的前提是:你很牛b,如果不是也可以想想,然后默默去变牛b。
我大多数的朋友同事都是漂在城市里的人,我们努力的活得更好,想过自己想过的生活,打心里佩服我们自己,选择这个行业,正尝试改变着世界。
所以,加油,各位!
另外,程序员过什么新年?写bug的时间都不够呢!
最后还是祝看到这个文字的朋友:身体健康,阖家欢乐,鸡年大吉,公司上市。
主题:
一般,开一个定时任务很简单,spring写个注解就能跑了,或者单应用的定时任务还有很多其他丰富jar支持。
任务管理系统用于配置任务的一些信息,任务调度使用这些信息实现对业务系统进行调度,实现定时任务。
public class TaskSupport implements BeanPostProcessor, ApplicationListener<ApplicationContextEvent>,
ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(TaskSupport.class);
private ApplicationContext applicationContext;
private RegistryConfig registryConfig;
private ApplicationConfig applicationConfig;
private ProtocolConfig protocolConfig;
// 存储任务bean
private Map<String, Object> taskBeanMap = new HashMap<String, Object>();
// dubbo config
private ServiceConfig<Object> serviceConfig; @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
collectTaskBean(bean, beanName);
return bean;
} private Object getTarget(Object bean) {
Object target = bean;
while (target instanceof Advised) {
try {
target = ((Advised) bean).getTargetSource().getTarget();
} catch (Exception e) {
target = null;
break;
}
}
return target;
} private void collectTaskBean(Object bean, String beanName) {
Object target = getTarget(bean);
if (target != null) {
Class<?> clazz = target.getClass();
if (!clazz.isAnnotationPresent(Service.class) || !clazz.isAnnotationPresent(Task.class)) {
return;
}
if (!taskBeanMap.containsKey(beanName)) {
logger.info("add task bean {}", beanName);
taskBeanMap.put(beanName, bean);
}
}
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (isCurrentApplicationContextRefresh(event)) {
exportTaskDispatcher();
}
} @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} /**
* 向Task暴露任务分发器服务
*/
protected void exportTaskDispatcher() {
if (serviceConfig != null && serviceConfig.isExported()) {
return;
}
applicationConfig = applicationContext.getBean(ApplicationConfig.class);
registryConfig = applicationContext.getBean("soaRegistryConfig", RegistryConfig.class);
protocolConfig = applicationContext.getBean(ProtocolConfig.class);
TaskDispatcherImpl taskServiceProxyImpl = wireTaskServiceProxy();
exportServiceConfig(taskServiceProxyImpl);
} protected void unexportTaskDispatcher() {
if (serviceConfig != null && serviceConfig.isExported()) {
serviceConfig.unexport();
}
} private TaskDispatcherImpl wireTaskServiceProxy() {
AutowireCapableBeanFactory beanFactory = applicationContext.getAutowireCapableBeanFactory();
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
TaskDispatcherImpl taskServiceProxy = new TaskDispatcherImpl();
// ITaskDispatcher的实现bean载入到spring bean容器中,等待dubbo接口暴露
taskServiceProxy = (TaskDispatcherImpl) beanFactory.initializeBean(taskServiceProxy, "taskServiceProxy");
// 这里把筛选出的taskBeanMap注入到TaskDispatcherImpl,直接可以使用
taskServiceProxy.setTaskBeanMap(taskBeanMap);
taskServiceProxy.setApplicationConfig(applicationConfig);
taskServiceProxy.setRegistryConfig(registryConfig);
defaultListableBeanFactory.registerSingleton("taskServiceProxy", taskServiceProxy);
return taskServiceProxy;
}
/**
* dubbo接口暴露给任务调度系统
*
*/
private void exportServiceConfig(Object proxy) {
serviceConfig = new ServiceConfig<Object>();
serviceConfig.setApplication(applicationConfig);
serviceConfig.setRegistry(registryConfig);
serviceConfig.setProtocol(protocolConfig);
// 把这个接口暴露出去
serviceConfig.setInterface(ITaskDispatcher.class);
serviceConfig.setRef(proxy);
serviceConfig.setRetries(0);
// 各个业务系统的group不同,这里充分利用了dubbo的属性
serviceConfig.setGroup(applicationConfig.getName());
serviceConfig.export();
} /**
* 是否是当前上下文,防止重复加载和过早加载
*
* @param event
* @return
*/
private boolean isCurrentApplicationContextRefresh(ApplicationEvent event) {
return event instanceof ContextRefreshedEvent
&& ((ContextRefreshedEvent) event).getApplicationContext() == applicationContext;
}
}
这样一来,所有应用都会暴露一个ITaskDispatcher 类的方法出去,但是各个group不一样。ITaskDispatcher定义的方法:
public interface ITaskDispatcher {
public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto);
}
dispatch方法是调度中心调度触发启动任务的方法,根据TaskInvokeInfoDto这个参数里的定义,需要定位到哪一个应用的哪一个类的那一个方法,这个方法的参数是什么,定位到后执行它,这就是dispatch要实现的功能。
先看一下TaskInvokeInfoDto的定义:
private String appName;//定位到哪个应用,dubbo的group区分
private String beanName;//定位到哪个类
private String methodName;// 定位到哪个方法
private String[] parameterTypes;//方法的参数类型,有重载的情况
private String[] args;//参数值
那么dispatch的核心代码:
public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto) {
try {
Method method = findMethod(taskInvokeInfoDto);
Class<?>[] parameterClazzs = method.getParameterTypes();
if (parameterClazzs.length == 0) {
ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName()));
} else {
Object[] parameterObjs = new Object[parameterClazzs.length];
for (int i = 0; i < parameterClazzs.length; i++) {
parameterObjs[i] = Jackson.base().readValue(taskInvokeInfoDto.getArgs()[i], parameterClazzs[i]);
}
ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName()), parameterObjs);
}
} catch (Exception e) {
logger.error("execute error...", e);
}
}
// 上面将的定位逻辑
private Method findMethod(TaskInvokeInfoDto taskInvokeInfoDto) {
Object bean = taskBeanMap.get(taskInvokeInfoDto.getBeanName());
Method method = null;
if (ArrayUtils.isEmpty(taskInvokeInfoDto.getParameterTypes())) {
method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName());
} else {
final int paramCount = taskInvokeInfoDto.getParameterTypes().length;
Class<?>[] clazzArray = new Class<?>[paramCount];
for (int i = 0; i < paramCount; i++) {
try {
clazzArray[i] = ClassUtils.getClass(taskInvokeInfoDto.getParameterTypes()[i]);
} catch (ClassNotFoundException e) {
logger.info("根据参数类型的字符串创建class对象时失败", e);
return null;
}
}
method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName(), clazzArray);
}
return method;
}
以上只要在调度中心处调用dubbo来控制任务执行就可以实现整个任务中心的核心功能。
当然,这里只是简单的尝试性的实现,还有很多优化和扩展可以做,比如任务日志打印收集,任务应用存活状态心跳监控,等等。
以前看到过一篇去哪网的吹b文章,吹了半天,仔细看了他提到的功能和没实现的功能,搞过的人都会觉得做一个其实不难,只是人家分享的时候感觉很厉害,其实他自己心里清楚自己这个系统也是处处是坑。虽然吹b,不过也会给我们各种启发。
总结:
1,代码中利用spring的BeanPostProcessor,筛选出自己需要的bean的方式又是一种新的技巧,我在《请求路由到业务方法设计(2)》中需要筛选bean map用了另一种方式。不知道网友还有其他的想法吗?
2,反射相关的api还可以继续深入学习。
让我们继续前行
----------------------------------------------------------------------
努力不一定成功,但不努力肯定不会成功。