1 Event-Driven Architecture基础
EDA(Event-Driven Architecture) 是一种实现组件之间松耦合、易扩展的架构方式, 在本节中, 我们先介绍EDA的基础组件, 让读者对EDA设计架构方式有一个基本的认识,一个最简单的EDA设计需要包含如下几个组件。
- Events:需要被处理的数据。
- Event Handlers:处理Events的方式方法。
- Event Loop:维护Events和Event Handlers之间的交互流程。
如图所示, EventA将被HandlerA处理, 而EventB将被HandlerB处理,这一切的分配都是由EventLoop所控制的。
1.1 Events
Events是EDA中的重要角色, 一个Event至少需要包含两个属性:类型和数据, Event的类型决定了它会被哪个Handler处理, 数据是在Handler中代加工的材料, 下面写一个简单的程序,代码如所示。
/*Event 只包含了该Event所属的类型和所包含的数据*/public class Event {private final String type;private final String data;public Event(String type, String data) {this.type = type;this.data = data;}public String getType() {return type;}public String getData() {return data;}}
1.2 Event Handlers
EventHandlers主要用于处理Event,比如一些filtering或者transforming数据的操作等,下面我们写两个比较简单的方法,代码如下:
// 用于处理A类型的Eventpublic static void handleEventA(Event e) {System.out.println(e.getData().toLowerCase());}
// 用于处理B类型的Eventpublic static void hanleEventB(Event e) {System.out.println(e.getData().toUpperCase());}
1.3 Event Loop
Event Loop处理接收到的所有Event, 并且将它们分配给合适的Handler去处理, 代码如下:
Event e;while( !events.isEmpty()) {// 从消息队列中不断移除,根据不同的类型进行处理e = events.remove();switch (e.getType()) {case "A":handleEventA(e);break;case "B":hanleEventB(e);break;}}
完整代码:
import java.util.LinkedList;import java.util.Queue;public class FooEventDrivenExample {// 用于处理A类型的Eventpublic static void handleEventA(Event e) {System.out.println(e.getData().toLowerCase());}// 用于处理B类型的Eventpublic static void hanleEventB(Event e) {System.out.println(e.getData().toUpperCase());}public static void main(String[] args) {Queue<Event> events = new LinkedList<>();events.add(new Event("A", "Hello"));events.add(new Event("B", "I am Event B"));events.add(new Event("A", "I am Event A"));Event e;while( !events.isEmpty()) {// 从消息队列中不断移除,根据不同的类型进行处理e = events.remove();switch (e.getType()) {case "A":handleEventA(e);break;case "B":hanleEventB(e);break;}}}}
2 开发一个Event-Driven框架
通过1节的基础知识介绍,我们大致可以知道,一个基于事件驱动的架构设计,总体来讲会涉及如下几个重要组件:事件消息(Event) 、针对该事件的具体处理器(Handler) 、接受事件消息的通道(29.1.3节中的queue) , 以及对事件消息如何进行分配(Event Loop) 。
2.1 同步EDA框架设计
(1) Message
回顾1节基础部分的介绍, 在基于Message的系统中, 每一个Event也可以被称为Message, Message是对Event更高一个层级的抽象, 每一个Message都有一个特定的Type用于与对应的Handler做关联, 3是Message接口的定义。
public interface Message {/*返回Message的类型*/Class<? extends Message> getType();}
(2) Channel
第二个比较重要的概念就是Channels,Channel主要用于接受来自EventLoop分配的消息,每一个Channel负责处理一种类型的消息(当然这取决于你对消息如何进行分配) ,4是Channel接口的定义:
public interface Channel <E extends Message>{/*** dispatch方法用于负责Message的调度*/void dispatch(E message);}
(3) Dynamic Router
Router的作用类似于1节中的EventLoop, 其主要是帮助Event找到合适的Channel并且传送给它, Dynamic Routers代码定义如5所示。
public interface DynamicRouter<E extends Message> {/*** 针对每一种Message类型注册相关的Channel,只有找到合适的channel该Message才会被处理* @param messageType* @param channel*/void registerChannel(Class<? extends E> messageType, Channel<? extends E> channel);/*** 为相应的Channel分配Message* @param message*/void dispatch(E message);}
Router如何知道要将Message分配给哪个Channel呢?换句话说,Router需要了解到Channel的存在,因此registerChannel() 方法的作用就是将相应的Channel注册给Router,dispatch方法则是根据Message的类型进行路由匹配。
(4) Event
Event是对Message的一个最简单的实现, 在以后的使用中, 将Event直接作为其他Message的基类即可(这种做法有点类似于适配器模式) , Event接口的定义如6所示。
public class Event implements Message{@Overridepublic Class<? extends Message> getType() {return getClass();}}
(5) EventDispatcher
EventDispatcher是对DynamicRouter的一个最基本的实现,适合在单线程的情况下进行使用,因此不需要考虑线程安全的问题。EventDispatcher接口的定义如7所示。
import java.util.HashMap;import java.util.Map;/*EventDispatcher不是一个线程安全的类*/public class EventDispatcher implements DynamicRouter<Message> {// 用于保存Channel和Message之间的关系private final Map<Class<? extends Message>, Channel> routerTable;public EventDispatcher(){// 初始化RouteTable,但是在该实现中,我们使用HashMap作为路由表this.routerTable = new HashMap<>();}@Overridepublic void registerChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {this.routerTable.put(messageType, channel);}@Overridepublic void dispatch(Message message) {if ( routerTable.containsKey(message.getType())) {// 直接获取对应的Channel处理的MessagerouterTable.get(message.getType()).dispatch(message);} else {throw new MessageMatcherException("Can't match the channel for ");}}}
在EventDispatcher中有一个注册表routerTable, 主要用于存放不同类型Message对应的Channel, 如果没有与Message相对应的Channel, 则会抛出无法匹配的异常, 示例代码如8所示。
public class MessageMatcherException extends RuntimeException{public MessageMatcherException(String message) {super(message);}}
简单的测试用例
public class EventDispatcherExample {/*** InputEvent中定义了两个属性X和Y,主要用于在其他Channel中的运算*/static class InputEvent extends Event {private final int x;private final int y;InputEvent(int x, int y) {this.x = x;this.y = y;}public int getX() {return x;}public int getY() {return y;}}/*** 用于存放结果的Event*/static class ResultEvent extends Event {private final int result;public ResultEvent(int result) {this.result = result;}public int getResult() {return result;}}/*** 处理ResultEvent的Handler(Channel),只是简单地将计算结果输出到控制台*/static class ResultEventHandler implements Channel<ResultEvent> {@Overridepublic void dispatch(ResultEvent message) {System.out.println("The reuslt is :" + message.getResult());}}/*** InputEventHandler需要向Router发送Event,因此在构造的时候需要传入Dispatcher*/static class InputEventHandler implements Channel<InputEvent> {private final EventDispatcher dispatcher;InputEventHandler(EventDispatcher dispatcher) {this.dispatcher = dispatcher;}/*** 将计算的结果构造成新的Event提交给Router* @param message*/@Overridepublic void dispatch(InputEvent message) {System.out.println("test");int result = message.getX() + message.getY();dispatcher.dispatch(new ResultEvent(result));}}public static void main(String[] args) {// 构造RouterEventDispatcher dispatcher = new EventDispatcher();// 将Event 和 Handler(Channel)的绑定关系注册到Dispatcherdispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());dispatcher.dispatch(new InputEvent(1,2));}}
由于所有的类都存放于一个文件中,因此看起来测试代码比较多,其实结构还是非常清晰的, InputEvent是一个Message,它包含了两个Int类型的属性,而InputEventHandler是对InputEvent消息的处理,接收到了InputEvent消息之后,分别对X和Y进行相加操作,然后将结果封装成ResultEvent提交给EventDispatcher,ResultEvent相对比较简单,只包含了计算结果的属性,ResultEventHandler则将计算结果输出到控制台上。
通过上面这个例子的运行你会发现,不同数据的处理过程之间根本无须知道彼此的存在, 一切都由Event Dispatcher这个Router来控制, 它会给你想要的一切, 这是一种稀疏耦合(松耦合) 的设计。图29-2所示的为同步EDA架构类图。

2.2 异步EDA框架设计
在2.1节中,我们实现了一个基本的EDA框架,但是这个框架在应对高并发的情况下还是存在一些问题的,具体如下。
- Event Dispatcher不是线程安全的类, 在多线程的情况下,registerChannel方法会引起数据不一致的问题。
- 就目前言,我们实现的所有Channel都无法并发消费Message,比如 InputEventHandler只能逐个处理Message,低延迟的消息处理还会导致Dispatcher出现积压。
在本节中, 我们将对2.1节中的EDA框架进行扩充,使其可支持并发任务的执行,下面定义了一个新的AsyncChannel作为基类, 该类中提供了Message的并发处理能力, 代码如10所示。
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public abstract class AsyncChannel implements Channel<Event>{// 在AsyncChannel 中将使用ExecutorServcie多线程的方式提交给Messageprivate final ExecutorService executorService;// 默认构造函数,提交了CPU的核数*2的线程数量public AsyncChannel() {this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));}// 用户自定义的ExecutorServicepublic AsyncChannel(ExecutorService executorService) {this.executorService = executorService;}// 重写dispatch方法,并且用final修饰,避免子类重写@Overridepublic void dispatch(Event message) {executorService.submit(() -> this.handle(message));}// 提供抽象方法,供子类实现具体的Message处理protected abstract void handle(Event message);// 提供关闭ExecutorService的方法void stop() {if ( null != executorService && !executorService.isShutdown())executorService.shutdown();}}
其次,还需要提供新的EventDispatcher类AsyncEventDispatcher负责以并发的方式dispatchMessage, 其中Event对应的Channel只能是Async Channel类型,并且也对外暴露了 shutdown方法, 代码如所示。
import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public class AsyncEventDispatcher implements DynamicRouter<Event>{private final Map<Class<? extends Event>, AsyncChannel> routerTable;public AsyncEventDispatcher() {this.routerTable = new ConcurrentHashMap<>();}@Overridepublic void registerChannel(Class<? extends Event> messageType, Channel<? extends Event> channel) {// 在AsyncEventDispatcher中,channel必须是AsyncChannel类型if ( !(channel instanceof AsyncChannel )) {throw new IllegalArgumentException("The channel must be ");}this.routerTable.put(messageType, (AsyncChannel) channel);}@Overridepublic void dispatch(Event message) {if ( routerTable.containsKey(message.getType())) {routerTable.get(message.getType()).dispatch(message);} else {throw new MessageMatcherException("");}}public void shutdown() {// 关闭所有的Channel以释放资源routerTable.values().forEach(AsyncChannel::stop);}}
在AsyncEventDispatcher中,routerTable使用线程安全的Map定义,在注册Channel的时候,如果其不是AsyncChannel的类型,则会抛出异常。
异步EDA架构类图
import java.util.concurrent.TimeUnit;public class AsyncEventDispatcherExample {// 主要用于处理InputEvent, 但是需要继承AsyncChannelstatic class AsyncInputEventHandler extends AsyncChannel {private final AsyncEventDispatcher dispatcher;AsyncInputEventHandler(AsyncEventDispatcher dispatcher) {this.dispatcher = dispatcher;}// 不同于以同步的方式实现dispatch,异步的方式需要实现handle@Overrideprotected void handle(Event message) {EventDispatcherExample.InputEvent inputEvent =(EventDispatcherExample.InputEvent)message;System.out.println("test");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}int result = inputEvent.getX() + inputEvent.getY();dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));}}// 主要用于处理InputEvent,但是需要继承AsyncChannelstatic class AsyncResultEventHandler extends AsyncChannel {@Overrideprotected void handle(Event message) {EventDispatcherExample.ResultEvent resultEvent =(EventDispatcherExample.ResultEvent) message;try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("the result is :" + resultEvent.getResult());}}public static void main(String[] args) {// 定义AsyncEventDispatcherAsyncEventDispatcher dispatcher = new AsyncEventDispatcher();// 注册dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());// 提交需要处理的Messagedispatcher.dispatch(new EventDispatcherExample.InputEvent(1,2));}}
当dispatcher分配一个Event的时候,如果执行非常缓慢也不会影响下一个Event被dispatch,这主要得益于我们采用了异步的处理方式(ExecutorService本身存在的任务队列可以允许异步提交一定数量级的数据)。
3 Event-Driven的使用
在本节中,我们模拟一个简单的聊天应用程序,借助于我们在2节开发的EDA小框架,首先我们要为聊天应用程序定义如下几个类型的Event。
- User Online Event: 当用户上线时来到聊天室的Event。
- User Offline Event: 当用户下线时退出聊天室的Event。
- User Chat Event:用户在聊天室中发送聊天信息的Event。
3.1 Chat Event
首先, 我们定义一个User对象, 代表聊天室的参与者, 比较简单就是一个名字, 代码如下:
public class User {private final String name;public User(String name) {this.name = name;}public String getName() {return name;}}
下面定义一个UserOnlineEvent,代表用户上线的Event,代码如下:
public class UserOnlineEvent extends Event{private final User user;public UserOnlineEvent(User user) {this.user = user;}public User getUser() {return user;}}
下面定义一个UserOfflineEvent,代表用户下线的Event,代码如下:
public class UserOfflineEvent extends UserOnlineEvent{public UserOfflineEvent(User user) {super(user);}}
下面定义一个UserChatEvent,代表用户发送了聊天信息的Event,代码如下:
public class UserChatEvent extends UserOnlineEvent{// ChatEvent需要有聊天的信息private final String message;public UserChatEvent(User user, String message) {super(user);this.message = message;}public String getMessage() {return message;}}
UserChatEvent比其他两个Event多了代表聊天内容的message属性。
3.2 Chat Channel(Handler)
所有的Handler都非常简单, 只是将接收到的信息输出到控制台,由于是在多线程的环境下运行, 因此我们需要继承AsyncChannel。
下面定义一个UserOnlineEventChannel, 主要用于处理UserOnlineEvent事件, 代码如下:
// 用户上线的Event,简单输出用户上线即可public class UserOnlineEventChannel extends AsyncChannel{@Overrideprotected void handle(Event message) {UserOnlineEvent event = (UserOnlineEvent) message;System.out.println("The User ");}}
UserOfflineEventChannel代码:
public class UserOfflineEventChannel extends AsyncChannel{@Overrideprotected void handle(Event message) {UserOfflineEvent event = (UserOfflineEvent) message;System.out.println("offline");}}
UserChatEventChannel:
public class UserChatEventChannel extends AsyncChannel{@Overrideprotected void handle(Event message) {UserChatEvent event = (UserChatEvent) message;System.out.println(event.getMessage());}}
3.3 Chat User线程
我们定义完Event和接受Event的Channel后,现在定义一个代表聊天室参与者的User线程,代码如下:
import java.util.concurrent.TimeUnit;import static java.util.concurrent.ThreadLocalRandom.current;public class UserChatThread extends Thread{private final User user;private final AsyncEventDispatcher dispatcher;public UserChatThread(User user, AsyncEventDispatcher dispatcher) {super(user.getName());this.user = user;this.dispatcher = dispatcher;}@Overridepublic void run() {try {// User上线,发送Online Eventdispatcher.dispatch(new UserOnlineEvent(user));for(int i = 0; i < 5; i++) {// 发送User的聊天信息dispatcher.dispatch(new UserChatEvent(user, getName() + "-hello-" + i));TimeUnit.SECONDS.sleep(current().nextInt(10));}} catch (InterruptedException e) {e.printStackTrace();} finally {// User下线,发送Offline Eventdispatcher.dispatch(new UserOfflineEvent(user));}}}
当User线程启动的时候,首先发送OnlineEvent,然后发送五条聊天信息,之后下线,在下线的时候发送OfflineEvent,下面写一个简单的程序测试一下:
public class UserChatApplication {public static void main(String[] args) {// 定义异步的Routerfinal AsyncEventDispatcher dispatcher = new AsyncEventDispatcher();// 为Router注册Channel和Event之间的关系dispatcher.registerChannel(UserOnlineEvent.class, new UserOnlineEventChannel());dispatcher.registerChannel(UserOfflineEvent.class, new UserOfflineEventChannel());dispatcher.registerChannel(UserChatEvent.class, new UserChatEventChannel());// 启动三个登录聊天室的Usernew UserChatThread(new User("Leo"), dispatcher).start();new UserChatThread(new User("Alex"), dispatcher).start();new UserChatThread(new User("Tina"), dispatcher).start();}}
