1341 字
7 分钟
Swift 并发, 对比 Kotlin协程

Hello, 异步函数#

func hello() async -> String {
    try? await Task.sleep(nanoseconds: 1000_000_000)
    return "hello"
}

Kotlin:

suspend fun hello(): String {
    delay(1.seconds)
    return "hello"
}

结构化并发#

let task = Task {
    await hello()
}
task.cancel()

Kotlin:

val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch(Dispatchers.Main) {
    hello()
}
scope.cancel()

Task#

async标记的函数只能在async标记的函数或闭包中调用, Task的初始化器提供了async闭包环境:

public init(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Success)

可选使用TaskPriority参数:

Task(priority: .high) {
    ...
}

取值:

high, medium, low,
userInitiated, utility, background,

后面三种等级类似于DispatchQos.

withUnsafeCurrentTask#

如果要获取当前task, 可以使用withUnsafeCurrentTask:

withUnsafeCurrentTask { task in
    task?.isCancelled
    task?.priority
    task?.cancel()
    ...
}

此函数非async, 如果当前环境不存在task, task值为nil.

如果仅想检查任务优先级和任务是否取消, 可以直接使用Task.currentPriorityTask.isCancelled.

Task嵌套#

对于Task嵌套的情况:

Task {
    Task {
        ...
    }
}

子Task会继承当前Task的环境(但仍然是独立的Task, 非结构化并发), 如果不想继承可以使用Task.detached(priority:operation:):

Task {
    Task.detached {
        ...
    }
}

CancelBag#

task离开作用域时不会被取消, 如果需要可以仿照DisposeBag实现CancelBag(非线程安全):

public final class CancelBag {

    private var cancels = [() -> Void]()

    public func on(cancel: @escaping () -> Void) {
        self.cancels.append(cancel)
    }

    private func cancel() {
        cancels.forEach { cancel in cancel() }
    }
    
    deinit {
        self.cancel()
    }
}

extension Task {
    func cancel(by bag: CancelBag) {
        bag.on(cancel: { cancel() })
    }
}

使用:

let bag = CancelBag()

Task {
    ...
}.cancel(by: bag)

要注意循环引用. 如果不想在task闭包中处理循环引用, 可以使用手动的CancelBag:

public final class CancelBag {

    private var cancels = [() -> Void]()

    public func on(cancel: @escaping () -> Void) {
        self.cancels.append(cancel)
    }

    public func cancel() {
        cancels.forEach { cancel in cancel() }
        cancels.removeAll()
    }
}

并在适当时机调用bag.cancel().

async let#

async let hello = hello()
async let world = world()
print("\(await hello) \(await world)")

Kotlin:

val hello = async { hello() }
val world = async { world() }
println("${hello.await()} ${world.await()}")

同时执行多个async函数. 比如发送网络请求时, 如果第三个请求的参数依赖前两个请求的结果:

async let x = reqX()
async let y = reqY()
let z = await reqZ(await x, await y)

可以让x, y请求同时发出, 都得到响应后, 再发出z请求

for await#

func number() async -> Int {
    .random(in: 1...100)
}

await withTaskGroup(of: Int.self) { group in
    for _ in 1...100 {
        group.addTask { await number() }
    }
    
    for await i in group {
        print(i)
    }
}

Kotlin:

suspend fun number() = Random.nextInt(0..100)

val numbers = produce {
    coroutineScope {
        repeat(100) {
            launch { send(number()) }
        }
    }
}

for (i in numbers) {
    println(i)
}

for await用于迭代实现了AsyncSequence协议的类型, 如TaskGroup和下文将要提到的AsyncStream.

withTaskGroup(of:)效果类似于corountineScope, 可以添加一系列子任务并等待这些任务完成.

如果子Task会调用throws, 可以使用withThrowingTaskGroup:

try await withThrowingTaskGroup(of: Int.self) { group in
    group.addTask { throw SomeError() }

    for try await i in group {
	...
    }
}

await all#

await withTaskGroup(of: Int.self) { group in
    for _ in 1...100 {
        group.addTask { await number() }
    }
    
    await group.reduce(into: []) { result, transition in result.append(transition) }
}
(1..100)
    .map { async { number() } }
    .awaitAll()

收集子任务结果.

AsyncStream#

let stream = AsyncStream<Int> { continuation in
    for i in 1...100 {
        continuation.yield(i)
    }
    continuation.finish()
}

for await i in stream {
    print(i)
}
let stream = AsyncStream<Int> {
    try? await Task.sleep(nanoseconds: 1000_000_000)
    return .random(in: 1...100)
}

for await i in stream {
    print(i)
}

Kotlin:

flow {
    (1..10).forEach { emit(it) }
}.collect {
    println(it)
}
flow {
    while (true) {
        delay(1.seconds)
        emit(Random.nextInt(1..10))
    }
}.collect {
    println(it)
}

这里AsyncStream和flow对比比较直观, 但其行为更像ReceiveChannel, 因为flow是冷的.

Actor#

actor Counter {
    var count: Int = 0
    
    func inc() {
        count += 1
    }
}
let couter = Counter()
await couter.inc()
let count = await couter.count

Kotlin:

sealed interface CounterMsg

object IncCount : CounterMsg
class GetCount(val response: CompletableDeferred<Int> = CompletableDeferred()) : CounterMsg

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var count = 0
    for (msg in channel) {
        when (msg) {
            is GetCount -> msg.response.complete(count)
            is IncCount -> ++count
        }
    }
}

suspend fun SendChannel<CounterMsg>.count() = GetCount().also { send(it) }.response.await()
val counter = counterActor()

counter.send(IncCount)
val count = counter.count()

Swift在语言层面支持了actor, 因此使用actor要比Kotlin方便许多.

从外部调用actor中的函数、可变属性和计算属性需要使用await. 如果确定不存在数据竞争, 可以用nonisolated修饰:

nonisolated func foo() -> String {
    ...
}

这样就可以像调用普通函数一样从外部调用.

GlobalActor#

用@globalActor创建一个修饰器, 用此修饰器修饰, 可以让函数/属性/class共同使用一个actor环境:

@globalActor
actor BusActor: GlobalActor {
    public static let shared = BusActor()
}

@BusActor func foo() {
    ...
}

@BusActor var bar = 0

@BusActor class Bus {}

MainActor#

MainActor是运行于主线程的GlobalActor:

@globalActor final public actor MainActor : GlobalActor

可以把更新UI的函数用@MainActor修饰:

@MainActor
func update() {
    ...
}

以保证此函数一定在主线程执行.

Sendable#

Sendable是一个mark协议, 用于标记类型为可安全在actor间移动的, 类似于Rust的Sendtrait.

public protocol Sendable {
}

以下类型是Sendable的:

  • 基本类型

  • actor类型

  • 由Sendable类型构成的struct

  • 满足以下条件的@Sendable修饰的函数或闭包:

    • 捕获变量是Sendable类型

    • 捕获变量不可变

  • 满足以下条件的显式实现Sendable的class:

    • final修饰, 即不可继承

    • 存储属性是Sendable类型

    • 存储属性不可变

    • 无父类或父类是NSObject

如果确定不符合以上条件的class可以安全Send, 可以实现@unchecked Sendable以跳过编译检查:

class Foo: @unchecked Sendable {
}

从Callback迁移#

func request(_ completion: (String) -> Void) {
    ...
}

func request() async -> String {
    await withCheckedContinuation { continuation in
        request { response in
            continuation.resume(returning: response)
        }
    }
}

Kotlin:

fun request(callback: (String) -> Unit) {
    ...
}

suspend fun request(): String = suspendCoroutine { continuation ->
    request {
        continuation.resume(it)
    }
}

如果闭包内调用了throws, 可以使用withCheckedThrowingContinuation(function:_:)

如果可以确保调用且仅调用一次resume, 可以使用unsafe版本:

  • withUnsafeContinuation(_:)

  • withUnsafeThrowingContinuation(_:)

Swift 并发, 对比 Kotlin协程
https://shsuco.com/posts/swift并发-对比kotlin协程/
作者
shsuco
发布于
2022-07-27
许可协议
CC BY-NC-SA 4.0