探秘携程
以一个新手的视觉探秘携程,慢慢透析携程的使用、深入
第一个携程
最简单的例子
fun main() {GlobalScope.launch {printMessage("第一个携程")}printMessage("主线程执行,代码在GlobalScope.launch之后")}
(当前运行线程:main) —> 主线程执行,代码在GlobalScope.launch之后
结论:
GlobalScope.launch不会阻塞主线程的执行GlobalScope.launch内部的代码并没有被执行,是因为主线程结束了,所以GlobalScope是类似主线程的守护线程
改造代码,让主线程不销毁
GlobalScope.launch {printMessage("第一个携程")}printMessage("主线程执行,代码在GlobalScope.launch之后")Thread.sleep(10)
�(当前运行线程:main) —> 主线程执行,代码在GlobalScope.launch之后 (当前运行线程:DefaultDispatcher-worker-1) —> 第一个携程
结论:
- 主线程等待10mm后,携程可以执行完毕,并打印
- 携程的默认执行线程不是主线程,而是
DefaultDispatcher-worker-1
携程运行准备时间
改为主线程只sleep 1mm
GlobalScope.launch {printMessage("第一个携程")}printMessage("主线程执行,代码在GlobalScope.launch之后")Thread.sleep(1)
(当前运行线程:main; 1625545254752) —> 主线程执行,代码在GlobalScope.launch之后
结论:
- 携程开始执行是需要一段时间的,1mm不足以携程完成初始化
- 携程有资源开销
Dispatchers 调度器
Dispatcher决定了当前携程执行所在的线程,也就是携程的上下文CoroutineContext,调度器层级实现了CoroutineContext接口
调度器系统提供的有4种
- Default: 默认实现,就是上面代码
GlobalScope.launch等的默认参数调度器,最大为核心数 - Main: 在有界面概念的空间才有的,如Android的main线程,不引入Android包报错
- Unconfined:不限制线程,在那个线程调用代码,就在哪个线程执行携程(循环嵌套不保证执行顺序,只能保证最外层执行顺序)
- IO: 阻塞线程,可以创建64或者更大的线程数,与Default共享线程池
第一个Dispatcher
fun m4(){GlobalScope.launch(Dispatchers.Unconfined) {printMessage("Dispatchers.Unconfined 携程")}printMessage("主线程执行,代码在GlobalScope.launch之后")}
�(当前运行线程:main; 1625554888325) —> Dispatchers.Unconfined 携程 距离上一次打印间隔为 3 (当前运行线程:main; 1625554888328) —> 主线程执行,代码在GlobalScope.launch之后
结论:
Dispatchers.Unconfined是在当前线程调用了携程空间,会阻塞当前线程。(思考:coroutine实质上就是线程的一部分,只是增加了调度器的概念,不让线程空闲下来;后续的挂起coroutine的线程,恢复coroutine的线程可能不是同一个,由Dispatcher来决定)- CoroutineContext如果和非携程空间是同一个线程,那么代码是顺序执行的(就是说携程空间依赖线程)
Dispatcher内部使用Dispatcher
fun m5(){GlobalScope.launch(Dispatchers.Default) {printMessage("Dispatchers.Default 携程")delay(100)GlobalScope.launch (Dispatchers.Unconfined){printMessage("Dispatchers.Unconfined 携程,内部")}}Thread.sleep(20)printMessage("主线程执行,代码在GlobalScope.launch之后")Thread.sleep(2000)}
(当前运行线程:DefaultDispatcher-worker-1; 1625556460046) —> Dispatchers.Default 携程 距离上一次打印间隔为 16 (当前运行线程:main; 1625556460062) —> 主线程执行,代码在GlobalScope.launch之后 距离上一次打印间隔为 100 (当前运行线程:DefaultDispatcher-worker-1; 1625556460162) —> Dispatchers.Unconfined 携程,内部
结论:
Dispatchers.Unconfined运行的CoroutineContext是由开启这个空间的线程决定的,两次打印的线程名称相同- 调度器Dispatcher决定执行的线程,这也是携程的核心,调度器(携程空间、上下文)控制一切
IO、Default共享线程池
fun m6(){GlobalScope.launch(Dispatchers.IO) {printMessage("携程第1部分被触发")}printMessage("主线程执行,代码在携程第一部分之后")Thread.sleep(10)GlobalScope.launch {printMessage("携程第2部分被触发")}Thread.sleep(1000)}
�(当前运行线程:main; 1625557585475) —> 主线程执行,代码在携程第一部分之后距离上一次打印间隔为 3 (当前运行线程:DefaultDispatcher-worker-1; 1625557585478) —> 携程第1部分被触发 距离上一次打印间隔为 11 (当前运行线程:DefaultDispatcher-worker-1; 1625557585489) —> 携程第2部分被触发
结论:
- 2次打印线程名称相同,确实共享线程池
Coroutine启动方式
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job
第二个参数 start即为启动方式,为什么会有启动方式呢,Java中线程只有手动调用start的方式启动
| DEFAULT | 立即执行协程体 |
|---|---|
| ATOMIC | 立即执行协程体,但在开始运行之前无法取消 |
| UNDISPATCHED | 立即在当前线程执行协程体,直到第一个 suspend 调用 |
| LAZY | 只有在需要的情况下运行 |
一个非常详细的博客:https://www.jianshu.com/p/6cf528f423f6
UNDISPATCHED 到底做了什么
这里涉及到一个自定义的拦截器,这样才能看得清拦截器是否被调用
拦截器内部:
class MyContinuation<T>(private val c : Continuation<T>) : Continuation<T> {override val context: CoroutineContextget() = c.contextoverride fun resumeWith(result: Result<T>) {log("MyContinuation拦截器被调用:$result")c.resumeWith(result)}}
拦截器:
class MyInterceptor : ContinuationInterceptor {override val key: CoroutineContext.Key<*>get() = ContinuationInterceptoroverride fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {return MyContinuation(continuation)}}
不同启动方式:
fun m11(){GlobalScope.launch(context = MyInterceptor(), start = CoroutineStart.DEFAULT) {log(1)}log(2)Thread.sleep(100)}fun m12(){GlobalScope.launch(context = MyInterceptor(), start = CoroutineStart.UNDISPATCHED) {log(1)}log(2)Thread.sleep(100)}
m11打印结果:
(当前运行线程:main; 1625764915423) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625764915424) —> 1 (当前运行线程:main; 1625764915427) —> 2
m12打印结果:
(当前运行线程:main; 1625764878878) —> 1 (当前运行线程:main; 1625764878880) —> 2
结论:
- 可以很清楚的看出,
CoroutineStart.UNDISPATCHED会取消拦截器的调用,那么拦截器的子类,调度器切换线程也不会生效 CoroutineStart.UNDISPATCHED只会取消当前第一个拦截,后续的拦截不会取消,也符合start的命名,就是开始状态不拦截
上下文 CoroutineContext
class MyContext : CoroutineContext{// 提供了一个初始类initial R,一个函数operation,需要你返回一个初始类型一样的R// 猜测:这里需要提供就是 CoroutineContext.Element,应该是由自己的环境决定是返回原始R,还是经过operation进行变换后的Roverride fun <R> fold(initial: R, operation: (R, CoroutineContext.Element) -> R): R {TODO("Not yet implemented")}// 猜测:key类型是null接口,应该是用来标记Element的类型,key可能是不同泛型E的单例// 一种key代表一种Element,这里决定当前上下文是否包含某种key的对应的Elementoverride fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {TODO("Not yet implemented")}// 猜测:当前上下文中减去某种key对应的Element,移除后返回自己override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {TODO("Not yet implemented")}}
写一个自己的上下文实现携程上下文接口,必须要实现的方法有3个,先猜测3个方法的意义,再去看接口描述
/*
Persistent context for the coroutine. It is an indexed set of [Element] instances.
An indexed set is a mix between a set and a map.
Every element in this set has a unique [Key].
*/
CoroutineContext 既是一个Element的集合,每个Element既是元素,又是CoroutineContext本身,类似组合模式,自己包含自己。每个Element对应唯一的key,这个猜测正确。
Element是单例
内部的Key<E : Element>同样是单例,一一对应的关系
EmptyCoroutineContext空携程上下文
内部实现方法很简单,因为是空实现
CombinedContext左偏列表
所有携程的组合方式,每个左偏是一个CoroutineContext ,右边是Element,遍历的时候直到左边也是Element为止
挂起函数 suspend
挂起函数不会阻塞线程的执行
delay(1000)job.await()
上面这些都是 suspend函数
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine}public fun <T> CoroutineScope.async(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> T): Deferred<T> {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyDeferredCoroutine(newContext, block) elseDeferredCoroutine<T>(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine}
launch、async中的block也是挂起函数
挂起和恢复线程可能不是同一个
携程的挂起、恢复可能不是同一个,具体取决恢复时的状态,
如果恢复时job已经执行完毕,那么将不会切换线程;
如果恢复时job没有执行完毕,由恢复携程的线程调用。
先上一个挂起函数的例子
fun m21(){GlobalScope.launch(MyInterceptor()) {log(1)val job = async {delay(500)log(4)"Hello"}log(3)val result = job.await()log(result)}log(2)Thread.sleep(1000)}
打印结果:
(当前运行线程:main; 1625767051406) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767051407) —> 1 (当前运行线程:main; 1625767051410) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767051419) —> 3 (当前运行线程:main; 1625767051423) —> 2 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051933) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051933) —> 4 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051934) —> MyContinuation拦截器被调用:Success(Hello) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767051934) —> Hello
结论:
delay会挂起函数await也会挂起函数job.await()是由delay完成后恢复的,所在线程与delay恢复后的线程一致
一个与上面差不多的例子,只是多睡了100mm
fun m22(){GlobalScope.launch(MyInterceptor()) {log(1)val job = async {delay(500)log(4)"Hello"}log(3)Thread.sleep(600) // 比上面的挂起delay挂起函数多休息100mmval result = job.await()log(result)}log(2)Thread.sleep(1000)}
除了多了一个Thread.sleep(600),没有任何区别
打印结果:
(当前运行线程:main; 1625767550618) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767550619) —> 1 (当前运行线程:main; 1625767550623) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:main; 1625767550630) —> 3 (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767551132) —> MyContinuation拦截器被调用:Success(kotlin.Unit) (当前运行线程:kotlinx.coroutines.DefaultExecutor; 1625767551132) —> 4 (当前运行线程:main; 1625767551237) —> Hello (当前运行线程:main; 1625767551237) —> 2
结论:
- 拦截器少运行了一次!
job.await()没有被拦截到 job.await()直接在main线程中执行了- 因为
Thread.sleep(600),上面的挂起函数delay(500)已经执行完毕,所以job.await()直接拿到了结果,不需要等待函数恢复(函数恢复的时候会进行一次拦截)
挂起的时机
lauch 、 async 方法虽然不是suspend方法,但是block: suspend CoroutineScope.() -> Unit是挂起函数
suspend函数结束后是否进行调度是根据当前执行结果确定的
查看suspendCancellableCoroutine **{ }**方法
@SinceKotlin("1.3")@PublishedApi // This class is Published API via serialized representation of SafeContinuation, don't rename/moveinternal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
�只有COROUTINE_SUSPENDED状态下的函数才会真正挂起函
携程取消
Coroutine开启后,只有它的所有子Job完成,该Job才算完成,属性并发结构。
Job的方法,看着与Thread + Future 很像,都是可以取消的。
其实,Job的取消方法与Thread的interrupt 类似,都是发出中断指令,只要挂起的方法才能收到该指令,自动取消
例如:Job中写一个while(true)循环是没法取消的。
父Job取消,其中的所有子Job都会取消
private var index = 0private var job1: Job? = nullprivate var job2: Job? = nullfun m1() {job1 = GlobalScope.launch (){job2 = launch {try {while (true) {delay(500)log(index++)}} catch (e: Exception) {log("循环携程被取消 $e")}}try {job1!!.join()} catch (e: Exception) {log("调用join的携程被取消 $e")}}}// 查看job1是否完成fun m2() {Thread().interrupt()log("Job1是否执行完毕:${job1?.isCompleted}; Job1是否取消:${job1?.isCancelled}")}// 查看job2是否完成fun m3() {log("Job2是否执行完毕:${job2?.isCompleted}; Job1是否取消:${job2?.isCancelled}")}// 取消job1(外部携程)fun m4(){job1?.cancel()}// 取消job2(内部携程)fun m5(){job2?.cancel()}
取消父携程
调用m3()
�2021-07-15 15:23:45.843 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:1700132毫秒) —> 232021-07-15 15:23:46.350 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:506毫秒) —> 24 2021-07-15 15:23:46.863 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:514毫秒) —> 25 2021-07-15 15:23:47.374 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:511毫秒) —> 26 2021-07-15 15:23:47.879 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:505毫秒) —> 27 2021-07-15 15:23:48.303 32335-32483/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-1;携程:null; 距离上次间隔:424毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@ec6c707 2021-07-15 15:23:48.304 32335-32484/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-2;携程:null; 距离上次间隔:0毫秒) —> 调用join的携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@ec6c707
结论:
- 取消父携程,所有的子携程都会取消(是否取消成功取决于是否有挂起状态,suspend)
- 挂起状态的携程被取消,会报一个
JobCancellationException,但是该异常不会再往父类传递,也不会引起崩溃 - join不关心内部执行状态,正常结束,异常取消都可以,只关心是否结束,即不再阻塞
- join调用的携程被取消(父携程被取消),会抛出一个
JobCancellationException
奇怪的表现
一个奇怪的地方:如果使用自己的拦截器,join调度线程不会抛出JobCancellationException异常
ob1 = GlobalScope.launch(MyInterceptor()) // 修改代码使用自己的拦截器
2021-07-15 15:44:12.993 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:381毫秒) —> MyContinuation拦截器被调用:Failure(kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946)2021-07-15 15:44:12.994 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:1毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946 2021-07-15 15:44:12.996 1498-1498/com.xxd.coroutine I/System.out: (线程:main;携程:null; 距离上次间隔:2毫秒) —> MyContinuation拦截器被调用:Success(kotlin.Unit)
缺少了一个调用join的携程被取消,取而代之的是Success(kotlin.Unit)
取消子携程
调用m4()
(线程:DefaultDispatcher-worker-2;携程:null; 距离上次间隔:47毫秒) —> 循环携程被取消 kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@fa69946
结论:
- 子携程取消不会影响父携程
- 父携程如果挂起了该子携程,被取消收不到
JobCancellationException
join、async的区别
join不关心执行的结果,只关心是否执行完毕,异常、正常都返回,异常中断也不会往上抛(join调用的携程中断会抛出一个JobCancellationException)
async关心执行的结果正常,异常状态,异常会抛出该异常,await的时候会接收到该异常
异常传播
携程中的异常有一套传播机制,必须弄明白这套机制,才能准确使用携程
处理流程
launch 会在内部出现未捕获的异常时尝试触发对父协程的取消,能否取消要看作用域的定义,如果取消成功,那么异常传递给父协程,否则传递给启动时上下文中配置的 CoroutineExceptionHandler 中,如果没有配置,会查找全局(JVM上)的 CoroutineExceptionHandler 进行处理,如果仍然没有,那么就将异常交给当前线程的 UncaughtExceptionHandler 处理;而 async 则在未捕获的异常出现时同样会尝试取消父协程,但不管是否能够取消成功都不会后其他后续的异常处理,直到用户主动调用 await 时将异常抛出。
作用域
- 通过 GlobeScope 启动的协程单独启动一个协程作用域,内部的子协程遵从默认的作用域规则。通过 GlobeScope 启动的协程“自成一派”。
- coroutineScope 是继承外部 Job 的上下文创建作用域,在其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。它更适合一系列对等的协程并发的完成一项工作,任何一个子协程异常退出,那么整体都将退出,简单来说就是”一损俱损“。这也是协程内部再启动子协程的默认作用域。
- supervisorScope 同样继承外部作用域的上下文,但其内部的取消操作是单向传播的,父协程向子协程传播,反过来则不然,这意味着子协程出了异常并不会影响父协程以及其他兄弟协程。它更适合一些独立不相干的任务,任何一个任务出问题,并不会影响其他任务的工作,简单来说就是”自作自受“,例如 UI,我点击一个按钮出了异常,其实并不会影响手机状态栏的刷新。需要注意的是,supervisorScope 内部启动的子协程内部再启动子协程,如无明确指出,则遵守默认作用域规则,也即 supervisorScope 只作用域其直接子协程。
验证GlobalScope
fun m3() {GlobalScope.launch(Dispatchers.Main + CoroutineName("3") + coroutineExceptionHandler) {log(1, this)try {GlobalScope.launch {delay(10)throw RuntimeException("异常3")}} catch (e: Exception) {log(e, this)}}}fun m4() {GlobalScope.launch(Dispatchers.Main + CoroutineName("4") + coroutineExceptionHandler) {log(1, this)GlobalScope.launch {delay(10)throw RuntimeException("异常4")}}}
以上2个方法均获取不到异常,页面崩溃,可见GlobalScope是独立的作用域,既不继承外面的作用域,也不抛出异常给外部
推广:是否所有Scope空间都是独立作用域
验证所有CoroutineScope
来一个自己的Scope
object MyScope : CoroutineScope {override val coroutineContext: CoroutineContextget() = Dispatchers.IO + CoroutineName("MyScope")}
重复上面的2个方法
fun m5() {MyScope.launch(coroutineExceptionHandler) {log(1, this)try {MyScope.launch {delay(10)throw RuntimeException("异常3")}} catch (e: Exception) {log(e, this)}}}fun m6() {MyScope.launch(coroutineExceptionHandler) {log(1, this)MyScope.launch {delay(10)throw RuntimeException("异常4")}}}
以上2个方法均获取不到异常,页面崩溃,可见MyScope是独立的作用域,既不继承外面的作用域,也不抛出异常给外部
结论:
- 所有的
CoroutineScope都是独立作用域,context独立创建,不继承
try catch能抓到异常的位置
try catch 能在携程中能像同步一样捕获异常,但是只在代码块中有效,不能作用于子携程
fun m7() {job1 = MyScope.launch() {log(1)try { // 可以抓到 RuntimeException("async 抛出来的")coroutineScope {launch {try { // 可以抓到一个 JobCancellationException 异常delay(10000)log(2)} catch (e: Exception) {log("3 $e")}}try { // 这里抓捕不到异常async {delay(10)throw RuntimeException("async 抛出来的")}} catch (e: Exception) {log("5 $e")}log(4)}} catch (e: Exception) {log("6 $e")}}}
2021-07-15 19:53:05.576 8702-8744/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-3;携程:null; 距离上次间隔:241047毫秒) —> 1 2021-07-15 19:53:05.579 8702-8744/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-3;携程:null; 距离上次间隔:4毫秒) —> 4 2021-07-15 19:53:05.594 8702-8745/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-4;携程:null; 距离上次间隔:15毫秒) —> 3 kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=ScopeCoroutine{Cancelling}@ec6c707 2021-07-15 19:53:05.595 8702-8745/com.xxd.coroutine I/System.out: (线程:DefaultDispatcher-worker-4;携程:null; 距离上次间隔:1毫秒) —> 6 java.lang.RuntimeException: async 抛出来的
抓不到5,可以抓到6
结论:
- try catch 可以抓到携程的异步异常
- try catch 只能抓到同一个携程(代码块)下的异常
- async内发生异常的时候虽然不会抛出,但是也会尝试取消父携程
- coroutineScope 不产生新的环境,不做调度,可以管理之下所有异常
�未理解的问题
lifecycleScope.launchWhenStarted后面的代码块中不能放入2个Flow的collect,否则第二个收不到消息
lifecycleScope.launchWhenStarted {activityViewModel.uiEvent.collect{when (it) {is TopicDetailUiEvent.ReloadEvent -> {reload()}else -> {}}}}lifecycleScope.launchWhenStarted {activityViewModel.uiEvent.collect{when (it) {is TopicDetailUiEvent.ReloadEvent -> {reload()}else -> {}}}}
