前言
最近在看线程池,就全局搜了之前公司代码,关键字 Executors,一下子就看见了在使用guava的AysncEventBus,在我使用的感觉来就是单个jvm里面的异步执行框架。它并没有每次都让我们显式让我们直接调用线程池的执行任务,而是帮助我们有封装了一层。
简单案例
package org.example.eventbus;/*** @author huskyui*/public class EventBusDemo {public static void main(String[] args) {EventHandler eventHandler = new EventHandler();EventBusRegisterCenter.register(eventHandler);for (int i = 0; i < 10; i++) {EventBusRegisterCenter.post(i + "");}for (int i = 0; i < 5; i++) {EventBusRegisterCenter.post(i);}}}package org.example.eventbus;import com.google.common.eventbus.AsyncEventBus;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Executors;/*** @author huskyui*/@Slf4jpublic class EventBusRegisterCenter {private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());public static void register(Object object) {System.out.println("register" + object);eventBus.register(object);}public static void unregister(Object obj) {eventBus.unregister(obj);}public static void post(Object event) {log.info("post event "+event);eventBus.post(event);}}package org.example.eventbus;import com.google.common.eventbus.DeadEvent;import com.google.common.eventbus.Subscribe;import lombok.extern.slf4j.Slf4j;/*** @author huskyui*/@Slf4jpublic class EventHandler {@Subscribepublic void handlerString(String msg){log.info("处理string类型数据 {}",msg);}@Subscribepublic void handlerDeadEvent(DeadEvent deadEvent){log.info("dead event {}",deadEvent.getEvent());}}
解析
构造函数
private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
主要是将我们自定义的线程池赋值给EventBus成员变量
private final Executor executor;EventBus(String identifier, Executor executor, Dispatcher dispatcher,SubscriberExceptionHandler exceptionHandler) {this.identifier = checkNotNull(identifier);this.executor = checkNotNull(executor);this.dispatcher = checkNotNull(dispatcher);this.exceptionHandler = checkNotNull(exceptionHandler);}
subscribe
EventBusRegisterCenter.register(eventHandler);
通过反射,遍历对应类中方法标注了@Subscribe注解方法,并加入到一个guava实现的map中,key是一个对象,处理特定任务,value是对应方法的相关信息
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =Maps.newConcurrentMap();/*** The event bus this registry belongs to.*/@Weak private final EventBus bus;SubscriberRegistry(EventBus bus) {this.bus = checkNotNull(bus);}/*** Registers all subscriber methods on the given listener object.*/void register(Object listener) {// 当前类 @Subscribe注解的method 列表Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {// method的parameter ,AysncEventBus必须是方法是单个Object,你可以将多个参数封装到一个类里面,然后使用这个封装类Class<?> eventType = entry.getKey();// method相关信息,以便后续调用反射Collection<Subscriber> eventMethodsInListener = entry.getValue();// cow 好像是获取快照CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();eventSubscribers = MoreObjects.firstNonNull(// 如果为空,新建一个cow set加入map中subscribers.putIfAbsent(eventType, newSet), newSet);}// 不为空,执行在cow set中加入eventSubscribers.addAll(eventMethodsInListener);}}
post
EventBusRegisterCenter.post(i + "");
根据传递给这个参数类型,在subscribers里面找到对应的处理类,遍历执行,可能有多个subscribe同一个类型的
public void post(Object event) {Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);if (eventSubscribers.hasNext()) {// 直接将迭代器传过去dispatcher.dispatch(event, eventSubscribers);} else if (!(event instanceof DeadEvent)) {// the event had no subscribers and was not itself a DeadEventpost(new DeadEvent(this, event));}}@Overridevoid dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);// queue的类型是ConcurrentLinkedQueue,主要是保存任务的while (subscribers.hasNext()) {queue.add(new EventWithSubscriber(event, subscribers.next()));}EventWithSubscriber e;while ((e = queue.poll()) != null) {// 开始进入和线程池相关的操作e.subscriber.dispatchEvent(e.event);}}// 这边其实是新建一个Runnable,提交给我们刚开始创建自定义线程池final void dispatchEvent(final Object event) {// 线程池execute 无返回值任务executor.execute(new Runnable() {@Overridepublic void run() {try {// 反射形式调用方法invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});}/*** Invokes the subscriber method. This method can be overridden to make the invocation* synchronized.*/@VisibleForTestingvoid invokeSubscriberMethod(Object event) throws InvocationTargetException {try {// target是实例,method是反射相关的// 这里也验证了之前说的,只能是一个参数method.invoke(target, checkNotNull(event));} catch (IllegalArgumentException e) {throw new Error("Method rejected target/argument: " + event, e);} catch (IllegalAccessException e) {throw new Error("Method became inaccessible: " + event, e);} catch (InvocationTargetException e) {if (e.getCause() instanceof Error) {throw (Error) e.getCause();}throw e;}}
AsyncEventBus和Spring结合使用
先举个例子
@Componentclass ServiceA{@Autowriedprivate ServiceB serviceB;@Subscribepublic void handle(CouponEvent event) {serviceB.do(event);}}
使用几乎一样。但是其中有一个需要考虑,我们处理方法的时候,应该会调用一些Spring容器里面的SpringBean
这里就会牵扯到SpringBean的生命周期
我们如果直接写 eventBusCenter.register(new ServiceA())我们就会发现,ServiceA里面的变量serviceB没有赋值
我们需要在properties set之后,进行注入
这边牵扯到@PostConstruct注解,这个注解会在bean的成员变量加载后,执行该方法
所以最终的大致样子
@Componentclass ServiceA{@Autowriedprivate ServiceB serviceB;@Subscribepublic void handle(CouponEvent event) {serviceB.do(event);}@PostConstructpublic void register() {AsyncEventBusCenter.register(this);}}public class AsyncEventBusCenter {private final static AsyncEventBus ASYNC_EVENT_BUS = new AsyncEventBus(Executors.newCachedThreadPool());public static void register(Object handler) {ASYNC_EVENT_BUS.register(handler);}}
