1.场景描述
虽然Thread为我们提供了可获取状态, 以及判断是否alive的方法, 但是这些方法均是针对线程本身的, 而我们提交的任务Runnable在运行过程中所处的状态如何是无法直接获得的, 比如它什么时候开始, 什么时候结束, 最不好的一种体验是无法获得Runnable任务执行后的结果。一般情况下想要获得最终结果, 我们不得不为Thread或者Runnable传入共享变量,但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。
2.当观察者模式遇到Thread
当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源, 也就是引发状态改变的源头, 很明显Thread负责执行任务的逻辑单元,它最清楚整个过程的始末周期,而事件的接收者则是通知接受者一方,严格意义上的观察者模式是需要Observer的集合的, 我们在这里不需要完全遵守这样的规则,只需将执行任务的每一个阶段都通知给观察者即可。
2.1接口定义
1.Observable接口定义
Observable接口定义的代码如所示
public interface Observable {// 任务生命周期的枚举类型enum Cycle{STARTED, RUNNING, DONE, ERROR}// 获取当前任务的生命周期状态Cycle getCycle();// 定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法void start();// 定义线程的打断方法,作用与start方法一样,也是为了屏蔽Thread的其他方法void interrupt();}
该接口主要是暴露给调用者使用的,其中四个枚举类型分别代表了当前任务执行生命周期的各个阶段,具体如下
- getCycle() 方法用于获取当前任务处于哪个执行阶段。
- start() 方法的目的主要是为了屏蔽Thread类其他的API, 可通过Observable的start对线程进行启动。
- interrupt(方法的作用与start一样, 可通过Observable的interrupt对当前线程进行中断。
2.TaskLifecycle接口定义
public interface TaskLifecycle<T> {// 任务启动时会触发onStart方法void onStart(Thread thread);// 任务正在运行时会触发onRunning方法void onRunning(Thread thread);// 任务运行结束时会触发onFinish方法,其中result是任务执行结束后的结果void onFinish(Thread thread, T result);// 任务执行报错时会触发onError方法void onError(Thread thread, Exception e);//生命周期接口的空实现(Adapter)class EmptyLifecycle<T> implements TaskLifecycle<T> {@Overridepublic void onStart(Thread thread) {}@Overridepublic void onRunning(Thread thread) {}@Overridepublic void onFinish(Thread thread, T result) {}@Overridepublic void onError(Thread thread, Exception e) {}}}
3.Task函数接口定义
@FunctionalInterfacepublic interface Task<T> {// 任务执行接口,该接口允许有返回值T call();}
2.2 ObservableThread实现

public class ObservableThread<T> extends Thread implements Observable {private final TaskLifecycle<T> lifecycle;private final Task<T> task;private Cycle cycle;// 指定Task的实现,默认情况下使用EmptyLifecyclepublic ObservableThread(Task<T> task) throws IllegalAccessException {this(new TaskLifecycle.EmptyLifecycle<>(), task);}// 指定TaskLifecycle的同时指定Taskpublic ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) throws IllegalAccessException {super();// Task不允许为nullif ( task == null )throw new IllegalAccessException("The task is required.");this.lifecycle = lifecycle;this.task = task;}@Overridepublic void run() {// 在执行线程逻辑单元的时候,分别触发相应的事件this.update(Cycle.STARTED, null,null);try {this.update(Cycle.RUNNING, null, null);T result = this.task.call();this.update(Cycle.DONE, result, null);}catch (Exception e) {this.update(Cycle.ERROR, null, e);}}private void update(Cycle cycle, T result, Exception e) {this.cycle = cycle;if ( lifecycle == null )return;try {switch (cycle) {case STARTED:this.lifecycle.onError(currentThread());break;case RUNNING:this.lifecycle.onRunning(currentThread());break;case DONE:this.lifecycle.onFinish(currentThread(), result);break;case ERROR:this.lifecycle.onError(currentThread(), e);break;}}catch (Exception ex) {if ( cycle == Cycle.ERROR ) {throw ex;}}}@Overridepublic Cycle getCycle() {return this.cycle;}}
重写父类的run方法, 并且将其修饰为final类型, 不允许子类再次对其进行重写, run方法在线程的运行期间,可监控任务在执行过程中的各个生命周期阶段,任务每经过一个阶段相当于发生了一次事件。
update方法用于通知时间的监听者, 此时任务在执行过程中发生了什么, 最主要的通知是异常的处理。如果监听者也就是Task Lifecycle, 在响应某个事件的过程中出现了意外,则会导致任务的正常执行受到影响,因此需要进行异常捕获,并忽略这些异常信息以保证Task Lifecycle的实现不影响任务的正确执行, 但是如果任务执行过程中出现错误并且抛出了异常, 那么update方法就不能忽略该异常, 需要继续抛出异常, 保持与call方法同样的意图。
2.3 测试用例代码实现
测试代码01:
public class TaskClient {public static void main(String[] args) throws IllegalAccessException {Observable observable = new ObservableThread<>(() -> {try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("finished done.");return null;});observable.start();}}
测试代码02:
import java.util.concurrent.TimeUnit;public class TaskClient1 {public static void main(String[] args) throws IllegalAccessException {final TaskLifecycle<String> lifecycle = new TaskLifecycle.EmptyLifecycle<String>(){@Overridepublic void onFinish(Thread thread, String result) {System.out.println("The result is " + result);}};Observable observable = new ObservableThread<>(lifecycle, () ->{try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(" finished done.");return "Hello Observer";});}}
