Messages can be sent through four channels:
1. io
2. bus
3. stage
4. public
The initialization of EasyManager was covered earlier.
public class EasyManager implements ApplicationContextAware {
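// Implementing ApplicationContextAware gives this class access to the applicationContext
private String scanPackage;
private boolean load; // whether to fetch the worker bean from the Spring context
private ApplicationContext applicationContext;
private Map<String, EasyResolver> resolvers = new HashMap<>();
private List<String> cmdList = new ArrayList<>();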
public void init() {
// start scanning for classes annotated with @EasyWorker
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AnnotationTypeFilter(EasyWorker.class));
Set<BeanDefinition> candidates = provider.findCandidateComponents(scanPackage);
for (BeanDefinition candidate : candidates) {
try {
String className = candidate.getBeanClassName();
Class<?> cls = Class.forName(className);
EasyWorker easyWorker = cls.getAnnotation(EasyWorker.class);
if (null != easyWorker) {
try {
Object target = load ? applicationContext.getBean(cls) : null;
Method[] methods = cls.getDeclaredMethods();
for (Method m : methods) {
EasyMapping commandMapping = m.getAnnotation(EasyMapping.class);
if (null != commandMapping) {
resolvers.put(commandMapping.mapping(), new EasyResolver(easyWorker.group(), easyWorker.module(), commandMapping.mapping(), m, target));
cmdList.add(commandMapping.mapping());
}
}
} catch (Exception e) {
throw new ServiceException("error in analyzeClass", e);
}
}
} catch (ClassNotFoundException e) {
// ignored: a candidate whose class cannot be loaded is skipped
}
}
}
// execute the handler registered for a command
public void execute(String command, Message message) {
long start = System.nanoTime();
EasyResolver resolver = getResolver(command);
if (null != resolver) {
resolver.execute(message); //method.invoke(target,msg);
}
}
public EasyResolver getResolver(String command) {
return resolvers.get(command);
}
public List<String> getCmdList() {
return cmdList;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
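The registration and dispatch steps above can be sketched without Spring. This is a minimal illustration, not the original implementation: `DemoMapping`, `DemoWorker`, and `DemoRegistry` are hypothetical stand-ins for `EasyMapping`, an `@EasyWorker` class, and `EasyManager`, and the classpath scan is replaced by an explicit `register` call.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for EasyMapping: marks a method as the handler of one command.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoMapping {
    String mapping();
}

// Hypothetical worker; in the original this would be a Spring bean annotated with @EasyWorker.
class DemoWorker {
    final StringBuilder log = new StringBuilder();

    @DemoMapping(mapping = "stage.enter")
    public void enter(String message) {
        log.append("enter:").append(message);
    }
}

// Simplified registry: command string -> (method, target), filled once by reflection.
class DemoRegistry {
    private final Map<String, Method> methods = new HashMap<>();
    private final Map<String, Object> targets = new HashMap<>();

    void register(Object target) {
        for (Method m : target.getClass().getDeclaredMethods()) {
            DemoMapping mapping = m.getAnnotation(DemoMapping.class);
            if (mapping != null) {
                m.setAccessible(true);
                methods.put(mapping.mapping(), m);
                targets.put(mapping.mapping(), target);
            }
        }
    }

    // Returns false when no handler is registered for the command.
    boolean execute(String command, String message) {
        Method m = methods.get(command);
        if (m == null) {
            return false;
        }
        try {
            m.invoke(targets.get(command), message);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler failed for " + command, e);
        }
        return true;
    }
}
```

After `registry.register(new DemoWorker())`, calling `registry.execute("stage.enter", "hello")` invokes `DemoWorker.enter` reflectively, which mirrors the `method.invoke(target, msg)` step noted in the listing.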
## Message handling for stage ##
## Call flow diagram of BalanceBusinessExector ##
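Let's take a closer look at BalanceBusinessExector.
Before a group executor is used, a RouteInfo(group, info) is created first. The group is mainly used to locate the group executor, while info is mainly used to locate the Executor inside that group.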
- BalanceBusinessExector
Creates several group executors according to the configuration.
Map<String, ExecutorPoolGroup> groups = new HashMap<>();
public BalanceBusinessExector(long time, Map<String, Integer> groupConfigMap) {
for (String key : groupConfigMap.keySet()) {
groups.put(key, new ExecutorPoolGroup(groupConfigMap.get(key)));
}
}
public void execute(IRunnable runnable, RouteInfo routeInfo) {
groups.get(routeInfo.getGroup()).execute(routeInfo.getInfo(), runnable);
}
- ExecutorPoolGroup
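Creates several executors according to the configuration, then uses a Guava cache to look up the Executor that belongs to a key. (ThreadLocalRandom.current() is thread-local: each thread has its own generator, so threads do not contend with each other. On a cache miss it picks an Executor at random.)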
LoadingCache<String, Executor> routeCacheMap ;
Executor[] executors;
public ExecutorPoolGroup(int size) {
executors = new Executor[size];
this.routeCacheMap = CacheBuilder.newBuilder()
.expireAfterAccess(clean_gap, TimeUnit.MILLISECONDS)
.build(new CacheLoader<String, Executor>() {
public Executor load(String key) throws Exception {
// cache miss: pick an executor at random; the cache keeps the key bound to it afterwards
int threadIndex = ThreadLocalRandom.current().nextInt(executors.length);
return executors[threadIndex];
}
});
}
public void execute(String info, IRunnable runnable) {
this.routeCacheMap.getUnchecked(info).execute(runnable);
}
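The key-to-executor lookup can be illustrated with the JDK alone. The sketch below swaps the Guava `LoadingCache` for `ConcurrentHashMap.computeIfAbsent`, so it has no expiry (the original uses `expireAfterAccess`); `StickyRouter` and `slotFor` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Simplified illustration: maps a routing key to one of `size` slots, sticky per key.
class StickyRouter {
    private final int size;
    private final Map<String, Integer> routes = new ConcurrentHashMap<>();

    StickyRouter(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // The first call for a key picks a random slot; later calls return the same slot.
    int slotFor(String key) {
        return routes.computeIfAbsent(key, k -> ThreadLocalRandom.current().nextInt(size));
    }
}
```

Because the random choice happens only on the first lookup, a key such as a stage id keeps hitting the same slot, while different keys are spread across slots at random.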
- Executor
Puts the IRunnable to be executed into a queue; a dedicated thread then picks it up and runs it.
private final IMsgQueue<IRunnable> queue;
private Executor(final String name) {
this.name = name;
this.queue = MsgQueueFactory.getInstance().createQueue(name).start();
}
public void execute(IRunnable command) {
queue.add(command);
}
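Summary. At initialization: several group executors are created first, and each group executor in turn creates several executors.
At call time: e.g. when sending to a scene, the group executor is first looked up by key (stage). The matching Executor is then looked up in the cache by stageId, and the IRunnable is put into its queue. This is what is meant by load balancing here.
With this approach, keys are spread fairly evenly across the Executors.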
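One consequence of this scheme is that all tasks for one stageId run on a single thread, in submission order. The sketch below shows the two-level lookup end to end under simplifying assumptions: `TwoLevelDispatcher` is a hypothetical name, JDK single-thread executors stand in for the queue-backed `Executor`, and a plain map replaces the expiring cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical two-level dispatcher: group name -> pool of executors -> sticky executor per key.
class TwoLevelDispatcher {
    private final Map<String, ExecutorService[]> groups = new HashMap<>();
    private final Map<String, Integer> routes = new ConcurrentHashMap<>();

    TwoLevelDispatcher(Map<String, Integer> groupSizes) {
        for (Map.Entry<String, Integer> entry : groupSizes.entrySet()) {
            ExecutorService[] pool = new ExecutorService[entry.getValue()];
            for (int i = 0; i < pool.length; i++) {
                pool[i] = Executors.newSingleThreadExecutor();
            }
            groups.put(entry.getKey(), pool);
        }
    }

    // Level 1: find the pool by group. Level 2: find the sticky executor by info.
    void execute(String group, String info, Runnable task) {
        ExecutorService[] pool = groups.get(group);
        if (pool == null) {
            throw new IllegalArgumentException("unknown group: " + group);
        }
        int index = routes.computeIfAbsent(group + "/" + info,
                k -> ThreadLocalRandom.current().nextInt(pool.length));
        pool[index].execute(task);
    }

    // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService[] pool : groups.values()) {
            for (ExecutorService executor : pool) {
                executor.shutdown();
            }
        }
        try {
            for (ExecutorService[] pool : groups.values()) {
                for (ExecutorService executor : pool) {
                    if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        return false;
                    }
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Per-key ordering is the reason to make the route sticky rather than picking a random executor on every call: game logic for one scene never runs concurrently with itself, so it needs no extra locking.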
Next, let's look at the two kinds of queue.
- MsgBlockingQueue
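This is simply built on a BlockingQueue, which is first-in, first-out. When the queue is empty the consuming thread blocks, and it is only woken up once data arrives.
It mainly starts one thread that takes items off the queue and then invokes the action via Java reflection.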
private final BlockingQueue<IRunnable> queue = new LinkedBlockingQueue<>();
// add to the queue
public boolean add(IRunnable e) {
return queue.add(e);
}
// start the consumer thread
public void start(final String name) {
Thread t = new Thread(name) {
public void run() {
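while (true) {
try {
IRunnable task = queue.take(); // blocks while the queue is empty
task.run(); //method.invoke(target,msg);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt(); // restore the flag and stop consuming
break;
}
}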
}
};
t.start();
}
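The consumer loop can be tried out in isolation. The following is a minimal sketch using plain `Runnable` in place of `IRunnable`; the `STOP` sentinel and the `stopAndJoin` method are assumptions added here so that the thread can be shut down cleanly, and are not part of the original class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of a single consumer thread draining a FIFO blocking queue.
class BlockingQueueWorker {
    // Sentinel task that tells the consumer thread to exit (an assumption of this sketch).
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread;

    BlockingQueueWorker(String name) {
        this.thread = new Thread(this::drain, name);
    }

    BlockingQueueWorker start() {
        thread.start();
        return this;
    }

    private void drain() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks while the queue is empty
                if (task == STOP) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    boolean add(Runnable task) {
        return queue.add(task);
    }

    // Enqueues the sentinel behind all pending tasks, then waits for the thread to finish.
    void stopAndJoin() {
        queue.add(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the sentinel goes through the same FIFO queue, every task added before `stopAndJoin` still runs, in the order it was added.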
- MsgDisruptorQueue
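Using a thread pool allows resources to be reused efficiently; when the pool has no idle thread, a new one is created. For an introduction to Disruptor, see http://ifeve.com/disruptor/ . It handles concurrency without locks and is very fast.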
http://ifeve.com/sharing-data-among-threads-without-contention/
private Disruptor<DisruptorEvent> disruptor;
private RingBuffer<DisruptorEvent> ringBuffer;
private RunnableEventHandler runnableEventHandler = new RunnableEventHandler();
private ExecutorService executor = Executors.newCachedThreadPool(new GameThreadFactory(name));
private final int BUFFER_SIZE = Util.ceilingNextPowerOfTwo(1024*64);
@Override
public boolean add(final IRunnable e) {
long remainingCapacity = ringBuffer.remainingCapacity();
if( remainingCapacity < 10){
execute(e);
return true;
}
// claim the next sequence, fill that slot, then publish it
long next = ringBuffer.next();
try{
DisruptorEvent runnable = ringBuffer.get(next);
runnable.copy(e);
}finally{
ringBuffer.publish(next);
}
return true;
}
// bypass the ring buffer and run directly on the thread pool
private void execute(IRunnable e) {
executor.submit(new Runnable() {
public void run() {
DisruptorEvent runnable = new DisruptorEvent(name);
runnable.copy(e);
runnableEventHandler.onEvent(runnable, 1, true);
}
});
}
public void start(String name) {
disruptor = new Disruptor<>(new RunnableEventFactory(name), BUFFER_SIZE, executor);
disruptor.handleEventsWith(runnableEventHandler);
ringBuffer = disruptor.start();
}
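To make the ring-buffer idea more concrete, here is a heavily simplified single-producer, single-consumer buffer written with the JDK only. It is not the LMAX Disruptor and omits its batching, wait strategies, and padding. `SpscRingBuffer`, `offer`, and `poll` are hypothetical names, and the `ceilingNextPowerOfTwo` helper is written from scratch here rather than taken from the library's `Util` class. It shows why the size is a power of two (index = sequence & mask) and what `remainingCapacity` measures.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified lock-free ring buffer for exactly one producer thread and one consumer thread.
class SpscRingBuffer<T> {
    private final Object[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(0); // next sequence the producer writes
    private final AtomicLong consumed = new AtomicLong(0);  // next sequence the consumer reads

    SpscRingBuffer(int requestedCapacity) {
        int capacity = ceilingNextPowerOfTwo(requestedCapacity);
        this.slots = new Object[capacity];
        this.mask = capacity - 1; // power-of-two size lets "& mask" replace "% capacity"
    }

    // Smallest power of two that is >= x, for 1 <= x <= 2^30.
    static int ceilingNextPowerOfTwo(int x) {
        if (x < 1 || x > (1 << 30)) {
            throw new IllegalArgumentException("x out of range: " + x);
        }
        return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
    }

    long remainingCapacity() {
        return slots.length - (published.get() - consumed.get());
    }

    // Producer side: returns false instead of blocking when the buffer is full.
    boolean offer(T value) {
        long seq = published.get();
        if (seq - consumed.get() == slots.length) {
            return false;
        }
        slots[(int) (seq & mask)] = value;
        published.set(seq + 1); // volatile write makes the slot visible to the consumer
        return true;
    }

    // Consumer side: returns null when nothing new has been published.
    @SuppressWarnings("unchecked")
    T poll() {
        long seq = consumed.get();
        if (seq == published.get()) {
            return null;
        }
        int index = (int) (seq & mask);
        T value = (T) slots[index];
        slots[index] = null;
        consumed.set(seq + 1); // frees the slot for the producer
        return value;
    }
}
```

The `add` method in the listing checks `remainingCapacity()` before claiming a sequence and falls back to the thread pool when fewer than 10 slots are free, presumably because `ringBuffer.next()` would otherwise wait on a full buffer; the `offer` method above expresses the same "do not block when full" idea by returning false.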