Kotlin协程指南–Kotlin中的并发编程

更新10/29/2018

介绍与动机

在本文中,您将了解Kotlin Coroutines:它们是什么,它们的外观以及工作方式。演示的代码示例已使用Kotlin 1.3.0和 kotlinx。协程 1.0.0.
Kotlin协同程序是“更大的功能”之一,如以下引自JetBrains的报价所示 博客:

我们都知道,在高负载下阻塞是不好的,轮询是不可行的,世界正变得越来越基于推送和异步。许多语言(从2012年的C#开始)都通过专用语言结构(例如async / await关键字)支持异步编程。在Kotlin中,我们推广了这个概念,以便库可以定义其构造的版本,并且async不是关键字,而只是函数。
这种设计允许集成不同的异步API:期货/承诺,回调传递等。它也足以表示惰性生成器(收益)并涵盖其他一些用例。

Kotlin团队引入了协程,为并发编程提供了简单的方法。我们中的绝大多数人可能已经使用了一些基于线程的并发工具,例如Java的并发API。我已经使用大量并发Java代码,并且大多数都同意API的成熟度。

爪哇并发与Kotlin协程

如果您仍然陷入困境 穿线并发 在Java中,我可以推荐这本书 实践中的Java并发 给你。

尽管Java的并发工具经过精心设计,但通常难以使​​用且使用起来相当繁琐。另一个问题是Java不会直接鼓励非阻塞编程。您通常会发现自己启动线程时没有想到它们的成本很高,并且会很快引入阻塞计算(由于锁,睡眠,等待等)。正在申请 非阻塞模式 否则,这确实很困难且容易出错。

另一方面,Kotlin Coroutines旨在通过隐藏大多数开发人员的复杂内容而变得更加容易,并且看起来像顺序代码。它们提供了一种运行异步代码的方式,而不必 线程,这为应用程序提供了新的可能性。而不是阻塞线程,计算正在 暂停.
许多资料将协程描述为“轻量级线程”。但是,它们不是我们从例如Java中了解到的线程。与线程相比,协程通常非常 便宜的 在创建过程中,线程自然不会产生额外的开销。原因之一是它们没有直接映射到本机线程。正如您将看到的,协程在主要由库管理的线程池中执行。
另一个关键区别是“局限性”:线程非常有限,因为它们依赖于可用的本机线程,另一端的协程几乎是免费的,并且可以一次启动数千个(尽管这取决于它们的计算能力)。

并发编程风格

各种语言中存在不同类型的异步/并发编程风格,例如:
*基于回调(JavaScript)
*基于未来/承诺(Java,JavaScript)
*基于异步/等待(C#)等

所有这些概念都可以用协程实现,因为Kotlin并没有直接规定其中的任何一个。
与基于回调的编程相反,协程还提供了另一种好处,即协程促进了一种顺序的异步编程:尽管协程可以执行多个并行计算,但是您的代码仍然可以像我们喜欢的那样看起来是顺序的。

Kotlin协程的概念

术语和概念 “协程”除了新东西。根据Wikipedia的文章,它是1958年创建的。许多现代编程语言都提供本机支持:C#,Go,Python,Ruby等。协程的实现(同样在Kotlin中)通常基于所谓的 “继续”,它们是“计算机程序控制状态的抽象表示”。我们将在本文的下一章中学习更多有关此的内容。

协程入门

要使用Kotlin协同程序设置项目,请逐步使用此步骤 参考 或只是查看我的示例存储库 的GitHub 并将其用作模板。

Kotlin协程成分

As already hinted, the Kotlin coroutine library provides an understandable high-level API that lets us start quickly. One new modifier we need to learn 是 suspend, which 是 used to mark a method as "悬浮".
我们将看一些使用的API的简单示例 kotlinx。协程 在下一节中。但首先,让我们学习一下 悬浮 功能是。

暂停功能

Coroutines rely on the suspend keyword, which 是 a modifier used to mark functions as "悬浮", i.e., that calls to such functions 可能会暂停 在任何时候. We can only call 这些 functions from coroutines or other 悬浮 functions.

suspend fun myMethod(p: String): Boolean {
    //...
}

正如我们在上面的示例中看到的那样,挂起函数看起来像一个常规函数,并添加了一个附加修饰符。请记住,从正常功能调用此类功能会导致 编译错误.

我们可以将协程看作是对常规函数和挂起函数的一系列调用。该序列可以选择在执行结束时提供结果。

动手

最后,让我们来看一些实际的协同程序。在第一个示例中,将显示基础知识:

fun main(args: Array<String>) = runBlocking { //(1)
    val job = GlobalScope.launch { //(2)
        val result = 悬浮Function() //(3)
        print("$result")
    }
    print("The result: ")
    job.join() //(4)
}
// prints "The result: 5"

在此示例中,您可以看到两个新功能, (1) runBlocking(2) launch,这两个都是 协程建设者. We can utilize various builders, 和 they all start a coroutine with different purposes: launch (fire 和 forget, can also be canceled), async (returns 诺言), runBlocking (blocks thread) 和 more. We can start coroutines in different scopes. In this example, the GlobalScope 是 used to spawn a launch coroutine that 是 as a result of this limited to the application lifecycle itself. This approach 是 fine for most examples we will see in this 文章, but it will most probably not be sufficient for your real-life applications. 跟随ing the concept of “结构化并发”,我们需要将协程限制在不同的范围内,以使其可维护和可管理。了解有关的概念 CoroutineScope 这里.

让我们观察一下这段代码的作用:内部协程由 (2) launch 做实际的工作:我们称之为 (3) 悬浮 function 和 then the coroutine prints its result. The main thread, after starting the coroutine, prints a String before the coroutine finishes.
Coroutines started 通过 launch return a Job immediately, which we can use to cancel the computation or await its completion with (4) join() as we see 这里. Since calling join() 可能会暂停, we need to wrap this call into another coroutine, which 是 why we use runBlocking as a wrapper. This concrete coroutine builder (1) "是 designed to bridge regular 块ing code to 悬浮 functions, 和 we can use it in main functions 和 tests”(引用 API)。如果我们删除了作业的加入,程序将在协程可以打印结果之前停止。

It's possible to spawn launch in the scope of the outer runBlocking coroutine directly. To do so, we change GlobalScope.launch to just launch. As a result, we can now also remove the explicit join since runBlocking won't complete before all of its child coroutines finish. This example again 是 a demonstration of 结构化并发,这是我们接下来将要详细介绍的原理。

结构化并发

As mentioned in the previous section, we can structure our coroutines in a way that they are more manageable 通过 creating a certain hierarchy between them. Imagine you work with a user interface which, for any reason, we need to terminate at a certain point due to some event. If we started coroutines which handle certain tasks in that UI, 这些 also should be terminated when the main task stops. With coroutines, it's vital to keep in mind that each coroutine can run in a different scope. We often want to group multiple coroutines 通过 a shared scope so that they can, for instance, be canceled altogether easily. Let's re-use the basic coroutine composition we saw in the first code snippet above but now with launch running in the scope of runBlocking 和 slightly different tasks:

fun main(args: Array<String>) {
    runBlocking {
        launch {
            delay(500)
            println("您好启动")
        }
        println("您好启动后来自runBlocking")
    }
    println("完成runBlocking")
}

此代码输出以下输出:

您好启动后来自runBlocking
您好启动
完成runBlocking

This output tells us that runBlocking does not complete before its child coroutine started 通过 launch finishes its work. Further, we can use this structure to easily delegate the cancellation on a certain coroutine down to its child coroutines:


fun main(args: Array<String>) { runBlocking { val outerLaunch = launch { launch { while (true) { delay(300) println("您好,第一次内部发布") } } launch { while (true) { delay(300) println("您好第二次内部发布") } } } println("外部启动后来自runBlocking的你好") delay(800) outerLaunch.cancel() } println("完成runBlocking") }

这段代码显示如下:

外部启动后来自runBlocking的你好
您好,第一次内部发布
您好第二次内部发布
您好,第一次内部发布
您好第二次内部发布
完成runBlocking

In this example, we can see a launch that creates an outer coroutine that then again launches two inner coroutines, all in the same scope. We cancel the outer coroutine which then delegates its cancelation to the inner coroutines, 和 nothing keeps running afterward. This approach also handles errors correctly since an exception happening in an arbitrary child coroutine will make all coroutines in its scope stop.

自订范围

Last but not least, we want to go a step further 和 create our very own CoroutineScope. In the last examples, we simply used the scope given 通过 runBlocking, which we only did for convenience. In real-life applications, it's necessary to create custom scopes to manage one's coroutines effectively. The API comes with a simple builder for this: coroutineScope. It's documentation 州s:

创建新的[CoroutineScope]并使用此范围调用指定的暂停块。提供的范围从外部范围继承其[coroutineContext] [CoroutineScope.coroutineContext],但覆盖上下文的[Job]。此功能是专为 平行分解 工作的。 当此范围中的任何子协程失败时,此范围将失败,并且所有其他子级都将被取消 (有关其他行为,请参见[supervisorScope])。

fun main(args: Array<String>) = runBlocking {
    coroutineScope {
        val outerLaunch = launch {
            launch {
                while (true) {
                    delay(300)
                    println("您好,第一次内部发布")
                }
            }
            launch {
                while (true) {
                    delay(300)
                    println("您好第二次内部发布")
                }
            }
        }

        println("外部启动后来自runBlocking的你好")
        delay(800)
        outerLaunch.cancel()
    }
    println("finished coroutineScope")
}

该代码看起来与我们之前看到的代码非常相似,但是现在我们在自定义范围内运行协程。有关更多参考,请阅读 这篇文章关于协程的结构化并发.

更深入

下面是一个更生动的示例:想象一下,您必须从应用程序发送电子邮件。请求收件人地址和呈现邮件正文是两项昂贵的任务,尽管它们彼此独立。聪明并且使用Kotlin,您想利用协程,同时执行两个任务。
如图所示:

suspend fun sendEmail(r: String, msg: String): Boolean { //(6)
    delay(2000)
    println("Sent '$msg' to $r")
    return true
}

suspend fun getReceiverAddressFromDatabase(): String { //(4)
    delay(1000)
    return "coroutine@kotlin.org"
}

suspend fun sendEmailSuspending(): Boolean {
    val msg = GlobalScope.async { //(3)
        delay(500)
        "The message content"
    }
    val recipient = GlobalScope.async { 
        getReceiverAddressFromDatabase() //(5)
    } 
    println("Waiting for email data")
    val sendStatus = GlobalScope.async {
        sendEmail(recipient.await(), msg.await()) //(7)
    }
    return sendStatus.await() //(8)
}

fun main(args: Array<String>) = runBlocking { //(1)
    val job = GlobalScope.launch {
        sendEmailSuspending() //(2)
        println("Email sent successfully.")
    }
    job.join() //(9)
    println("Finished")
}

首先,就像在前面的示例中看到的那样,我们使用 (1) launch builder inside a runBlocking builder so that we can (9) 等待协程的完成。此结构不是新结构,也不是新结构 (2) calling a 悬浮 function (sendEmailSuspending)。
此方法使用 (3) 内部协程,用于获取消息内容和 (4) another suspend method getReceiverAddressFromDatabase for getting the address. We execute both tasks in a separate coroutine built with (5) async. Note, that the calls to delay represent a 非阻塞, coroutine-suspending, alternative to Thread.sleep, which we use for mocking expensive computations 这里.

异步协程生成器

The async builder 是 simple 和 easy in its conception. As we know from many other languages, it returns a 诺言, which 是 of type Deferred in Kotlin. By the way, 诺言, future, deferred or delay are often used interchangeably for describing the same concept: The async method 诺言 计算我们可以随时等待或请求的值。

We can observe the "等待ing" on Kotlin’s Deferred object in (7) 暂停功能 (6) 是 called with the results of both prior computations. The method await() 是 called on instances of Deferred which suspends until the results become available. The call to sendEmail 是 also part of an async builder. Finally, we await its completion in (8) 返回结果之前。

共享可变状态

阅读本文时,您可能已经担心协程之间的同步,因为我还没有提及任何相关内容。我至少有这个担心,因为协程可以同时在共享状态下工作。很明显,意识到这一点与我们从其他语言(如Java)所知道的一样重要。我们可以利用类似的策略 线程安全的数据结构, 限制 执行到 单线程 或使用 .
除了常见的模式外,Kotlin协程还提倡“通过交流共享”的概念(请参见 质量检查)。

具体而言,“角色”可以用来表示我们在协程之间安全共享的状态。协程可以使用actor通过他们发送和接收消息。让我们看看它是如何工作的:

演员们

sealed class CounterMsg {
    object IncCounter : CounterMsg() // one-way message to increment counter
    class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with channel for reply.
}

fun counterActor() = GlobalScope.actor<CounterMsg> { //(1)
    var counter = 0 //(9) </b>actor 州, not shared
    for (msg in channel) { // handle incoming messages
        when (msg) {
            是 CounterMsg.IncCounter -> counter++ //(4)
            是 CounterMsg.GetCounter -> msg.response.send(counter) //(3)
        }
    }
}

suspend fun getCurrentCount(counter: SendChannel<CounterMsg>): Int { //(8)
    val response = Channel<Int>() //(2)
    counter.send(CounterMsg.GetCounter(response))
    val receive = response.receive()
    println("Counter = $receive")
    return receive
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor()

    GlobalScope.launch { //(5)
            while(getCurrentCount(counter) < 100){
                delay(100)
                println("sending IncCounter message")
                counter.send(CounterMsg.IncCounter) //(7)
            }
        }

    GlobalScope.launch { //(6)
        while ( getCurrentCount(counter) < 100) {
            delay(200)
        }
    }.join()
    counter.close() // shutdown the actor
}

此示例显示了 (1) Actor,它是协程本身,可以在任何环境下工作。演员拿着 (9) 相关的 of this sample application, which 是 the mutable counter variable. Another important feature we haven’t considered so far 是 a (2) Channel.

频道

频道 提供一种转移 价值流, similar to what we know as BlockingQueue (enables producer-consumer pattern) in 爪哇 but without any 块ing methods. Instead, sendreceive are 悬浮 functions used for 提供和消费 渠道中的对象,使用 先进先出 战略。

演员默认情况下连接到此类通道,其他协程 (7) can use to communicate with other coroutines. In the example, the actor iterates over the stream of messages from its channel (for works with 悬浮 calls) handling them according to their type: (4) IncCounter 消息通过增加计数器使actor更改其状态,同时 (3) GetCounter 通过发送一个 独立讯息 to the GetCounter's SendChannel.
第一协程 (5) in main launches a task that sends (7) IncCounter 只要计数器小于100,就会向actor发送消息。 (6) 一个等待,直到计数器达到100。这两个协程都使用了暂停功能 (8) getCurrentCounter, which sends a GetCounter message to the actor 和 suspends 通过 等待ing for receive to return.

如我们所见,整个可变 密闭的 特定演员协程,从而解决了 共享的可变状态 并遵循 通过交流分享.

更多功能和示例

您可以在以下位置找到更多示例和出色的文档 这些 文件。

工作原理-Kotlin协程的实现

Coroutines do not rely on features of the operating system or the JVM. Instead, the compiler transforms coroutines 和 suspend functions 和 produces a 状态机 它能够处理一般的悬浮液,并能够绕过悬浮的协程保持其状态。概念 延续 启用此功能。编译器会将连续性作为附加参数添加到每个挂起函数中。这种技术称为 “继续传球风格”.

让我们看一下下面的函数在添加了延续之后的样子:

suspend fun sampleSuspendFun(x: Int): Int {
    delay(2000)
    return x * x
}

编译器将此函数修改为如下形式:

public static final Object sampleSuspendFun(int x, @NotNull Continuation var1)

It adds another parameter, the Continuation, to it. Now, if we call this function from a coroutine, the compiler will pass the code that occurs after the sampleSuspendFun function call as a continuation. After the sampleSuspendFun completes its work, it will trigger the continuation. That's just what we know from a callback-based programming model already but hidden 通过 the compiler. To be honest, this 是 just a simplified explanation. You can learn more about the details 这里.

与Roman Elizarov(JetBrains)进行质量检查

我有机会向Roman Elizarov提出了几个问题,他为JetBrains工作,并对Kotlin协程负有高度责任。阅读他说的话:

问: 我什么时候应该使用协程,是否有仍然需要显式线程化的用例?

A: 经验法则 罗马:
协程适用于执行以下任务的异步任务 等待 在大多数情况下线程用于CPU密集型任务。

问: “轻量线程”一词在我看来有点不合适,因为它掩盖了协程的事实 依靠线程 并且我希望该库在线程池中执行它们。*我更喜欢一些简单的东西,例如正在执行,已停止的“任务”等。为什么仍然决定像线程一样描述它们?

A: 罗曼回答说,“轻量级线程”是很肤浅的,而“协程从用户的角度来看在许多方面都像线程。”

问: 如果协程类似于线程,则必须在不同协程之间同步共享状态,对吗? *

A: 罗曼告诉我,我们仍然可以使用已知的同步模式,但是强烈建议在使用协程时完全没有任何可变的共享状态。相反,协程鼓励“交流共享”风格。

结论

与Java相反,Kotlin提倡一种完全不同的并发编程风格,即 非阻塞 当然也不会使我们开始大量的本机线程。
编写并发Java代码通常意味着启动太多线程或忘记适当的线程池管理。但是,这种粗心大意会带来巨大的开销,从而导致执行速度变慢。协程,作为替代,据说是 “轻量级线程”,它描述了一个事实,即它们未映射到本机线程,因此不会拖延我们通常必须处理的所有风险和问题(死锁,饥饿等)。如我们所见,使用协程,我们通常不必担心阻塞线程。此外,只要我们追求同步,同步就更直接了,理想情况下甚至没有必要。 “通过交流分享” 原理。

协程还使我们能够处理几种不同类型的并发编程。许多已经可用,并且可以添加其他。
特别是Java开发人员,很可能会喜欢async / await样式,该样式看起来与他们通过使用期货已经知道的样式类似。它不仅是可比的替代产品,而且是对我们已经知道的重大改进。
并发Java代码附带了很多 检查异常, 防守锁定 策略与一般 样板 码。所有这些都可以通过协程进行改进,协程允许并发代码看起来 顺序的,易于管理且易于阅读。

透视

协程将最终与 Kotlin 1.3。 Kotlin团队将删除它的“实验”性质,协程将保持稳定状态。

今天(29/10/2018),Kotlin 1.3已发布,其中包括稳定版的协程ðŸ™,

请随时与我们取得联系,反馈总是很感激ðŸ™,如果您愿意,也可以看看我的 推特 如果您对更多Kotlin的东西感兴趣,请遵循。非常感谢。如果您想进一步了解Kotlin的美丽功能,我强烈推荐这本书 行动中的科特林 和我的 其他文章 给你。

西蒙

关于15条想法“Kotlin协程指南–Kotlin中的并发编程

发表评论

您的电子邮件地址不会被公开。 必需的地方已做标记 *