0%

Swift并发, 对比Kotlin协程

Hello, 异步函数

1
2
3
4
func hello() async -> String {
try? await Task.sleep(nanoseconds: 1000_000_000)
return "hello"
}

Kotlin:

1
2
3
4
suspend fun hello(): String {
delay(1.seconds)
return "hello"
}

结构化并发

1
2
3
4
let task = Task {
await hello()
}
task.cancel()

Kotlin:

1
2
3
4
5
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch(Dispatchers.Main) {
hello()
}
scope.cancel()

Task

async标记的函数只能在async标记的函数或闭包中调用, Task的初始化器提供了async闭包环境:

1
public init(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Success)

可选使用TaskPriority参数:

1
2
3
Task(priority: .high) {
...
}

取值:

1
2
high, medium, low,
userInitiated, utility, background,

后面三种等级类似于DispatchQos.

withUnsafeCurrentTask

如果要获取当前task, 可以使用withUnsafeCurrentTask:

1
2
3
4
5
6
withUnsafeCurrentTask { task in
task?.isCancelled
task?.priority
task?.cancel()
...
}

此函数非async, 如果当前环境不存在task, task值为nil.

如果仅想检查任务优先级和任务是否取消, 可以直接使用Task.currentPriorityTask.isCancelled.

Task嵌套

对于Task嵌套的情况:

1
2
3
4
5
Task {
Task {
...
}
}

子Task会继承当前Task的环境(但仍然是独立的Task, 非结构化并发), 如果不想继承可以使用Task.detached(priority:operation:):

1
2
3
4
5
Task {
Task.detached {
...
}
}

CancelBag

task离开作用域时不会被取消, 如果需要可以仿照DisposeBag实现CancelBag(非线程安全):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class CancelBag {

private var cancels = [() -> Void]()

public func on(cancel: @escaping () -> Void) {
self.cancels.append(cancel)
}

private func cancel() {
cancels.forEach { cancel in cancel() }
}

deinit {
self.cancel()
}
}

extension Task {
func cancel(by bag: CancelBag) {
bag.on(cancel: { cancel() })
}
}

使用:

1
2
3
4
5
let bag = CancelBag()

Task {
...
}.cancel(by: bag)

要注意循环引用. 如果不想在task闭包中处理循环引用, 可以使用手动的CancelBag:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class CancelBag {

private var cancels = [() -> Void]()

public func on(cancel: @escaping () -> Void) {
self.cancels.append(cancel)
}

public func cancel() {
cancels.forEach { cancel in cancel() }
cancels.removeAll()
}
}

并在适当时机调用bag.cancel().

async let

1
2
3
async let hello = hello()
async let world = world()
print("\(await hello) \(await world)")

Kotlin:

1
2
3
val hello = async { hello() }
val world = async { world() }
println("${hello.await()} ${world.await()}")

同时执行多个async函数.
比如发送网络请求时, 如果第三个请求的参数依赖前两个请求的结果:

1
2
3
async let x = reqX()
async let y = reqY()
let z = await reqZ(await x, await y)

可以让x, y请求同时发出, 都得到响应后, 再发出z请求

for await

1
2
3
4
5
6
7
8
9
10
11
12
13
func number() async -> Int {
.random(in: 1...100)
}

await withTaskGroup(of: Int.self) { group in
for _ in 1...100 {
group.addTask { await number() }
}

for await i in group {
print(i)
}
}

Kotlin:

1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun number() = Random.nextInt(0..100)

val numbers = produce {
coroutineScope {
repeat(100) {
launch { send(number()) }
}
}
}

for (i in numbers) {
println(i)
}

for await用于迭代实现了AsyncSequence协议的类型, 如TaskGroup和下文将要提到的AsyncStream.

withTaskGroup(of:)效果类似于corountineScope, 可以添加一系列子任务并等待这些任务完成.

如果子Task会调用throws, 可以使用withThrowingTaskGroup:

1
2
3
4
5
6
7
try await withThrowingTaskGroup(of: Int.self) { group in
group.addTask { throw SomeError() }

for try await i in group {
...
}
}

await all

1
2
3
4
5
6
7
await withTaskGroup(of: Int.self) { group in
for _ in 1...100 {
group.addTask { await number() }
}

await group.reduce(into: []) { result, transition in result.append(transition) }
}
1
2
3
(1..100)
.map { async { number() } }
.awaitAll()

收集子任务结果.

AsyncStream

1
2
3
4
5
6
7
8
9
10
let stream = AsyncStream<Int> { continuation in
for i in 1...100 {
continuation.yield(i)
}
continuation.finish()
}

for await i in stream {
print(i)
}
1
2
3
4
5
6
7
8
let stream = AsyncStream<Int> {
try? await Task.sleep(nanoseconds: 1000_000_000)
return .random(in: 1...100)
}

for await i in stream {
print(i)
}

Kotlin:

1
2
3
4
5
flow {
(1..10).forEach { emit(it) }
}.collect {
println(it)
}
1
2
3
4
5
6
7
8
flow {
while (true) {
delay(1.seconds)
emit(Random.nextInt(1..10))
}
}.collect {
println(it)
}

这里AsyncStream和flow对比比较直观, 但其行为更像ReceiveChannel, 因为flow是冷的.

Actor

1
2
3
4
5
6
7
actor Counter {
var count: Int = 0

func inc() {
count += 1
}
}
1
2
3
let couter = Counter()
await couter.inc()
let count = await couter.count

Kotlin:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sealed interface CounterMsg

object IncCount : CounterMsg
class GetCount(val response: CompletableDeferred<Int> = CompletableDeferred()) : CounterMsg

fun CoroutineScope.counterActor() = actor<CounterMsg> {
var count = 0
for (msg in channel) {
when (msg) {
is GetCount -> msg.response.complete(count)
is IncCount -> ++count
}
}
}

suspend fun SendChannel<CounterMsg>.count() = GetCount().also { send(it) }.response.await()
1
2
3
4
val counter = counterActor()

counter.send(IncCount)
val count = counter.count()

Swift在语言层面支持了actor, 因此使用actor要比Kotlin方便许多.

从外部调用actor中的函数、可变属性和计算属性需要使用await. 如果确定不存在数据竞争, 可以用nonisolated修饰:

1
2
3
nonisolated func foo() -> String {
...
}

这样就可以像调用普通函数一样从外部调用.

GlobalActor

用@globalActor创建一个修饰器, 用此修饰器修饰, 可以让函数/属性/class共同使用一个actor环境:

1
2
3
4
5
6
7
8
9
10
11
12
@globalActor
actor BusActor: GlobalActor {
public static let shared = BusActor()
}

@BusActor func foo() {
...
}

@BusActor var bar = 0

@BusActor class Bus {}

MainActor

MainActor是运行于主线程的GlobalActor:

1
@globalActor final public actor MainActor : GlobalActor

可以把更新UI的函数用@MainActor修饰:

1
2
3
4
@MainActor
func update() {
...
}

以保证此函数一定在主线程执行.

Sendable

Sendable是一个mark协议, 用于标记类型为可安全在actor间移动的, 类似于Rust的Sendtrait.

1
2
public protocol Sendable {
}

以下类型是Sendable的:

  • 基本类型

  • actor类型

  • 由Sendable类型构成的struct

  • 满足以下条件的@Sendable修饰的函数或闭包:

    • 捕获变量是Sendable类型

    • 捕获变量不可变

  • 满足以下条件的显式实现Sendable的class:

    • final修饰, 即不可继承

    • 存储属性是Sendable类型

    • 存储属性不可变

    • 无父类或父类是NSObject

如果确定不符合以上条件的class可以安全Send, 可以实现@unchecked Sendable以跳过编译检查:

1
2
class Foo: @unchecked Sendable {
}

从Callback迁移

1
2
3
4
5
6
7
8
9
10
11
func request(_ completion: (String) -> Void) {
...
}

func request() async -> String {
await withCheckedContinuation { continuation in
request { response in
continuation.resume(returning: response)
}
}
}

Kotlin:

1
2
3
4
5
6
7
8
9
fun request(callback: (String) -> Unit) {
...
}

suspend fun request(): String = suspendCoroutine { continuation ->
request {
continuation.resume(it)
}
}

如果闭包内调用了throws, 可以使用withCheckedThrowingContinuation(function:_:)

如果可以确保调用且仅调用一次resume, 可以使用unsafe版本:

  • withUnsafeContinuation(_:)

  • withUnsafeThrowingContinuation(_:)