Hello, 异步函数
func hello() async -> String {
try? await Task.sleep(nanoseconds: 1000_000_000)
return "hello"
}
Kotlin:
suspend fun hello(): String {
delay(1.seconds)
return "hello"
}
结构化并发
let task = Task {
await hello()
}
task.cancel()
Kotlin:
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch(Dispatchers.Main) {
hello()
}
scope.cancel()
Task
async标记的函数只能在async标记的函数或闭包中调用, Task的初始化器提供了async闭包环境:
public init(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Success)
可选使用TaskPriority参数:
Task(priority: .high) {
...
}
取值:
high, medium, low,
userInitiated, utility, background,
后面三种等级类似于DispatchQos
.
withUnsafeCurrentTask
如果要获取当前task, 可以使用withUnsafeCurrentTask
:
withUnsafeCurrentTask { task in
task?.isCancelled
task?.priority
task?.cancel()
...
}
此函数非async, 如果当前环境不存在task, task值为nil.
如果仅想检查任务优先级和任务是否取消, 可以直接使用Task.currentPriority
和Task.isCancelled
.
Task嵌套
对于Task嵌套的情况:
Task {
Task {
...
}
}
子Task会继承当前Task的环境(但仍然是独立的Task, 非结构化并发), 如果不想继承可以使用Task.detached(priority:operation:)
:
Task {
Task.detached {
...
}
}
CancelBag
task离开作用域时不会被取消, 如果需要可以仿照DisposeBag实现CancelBag(非线程安全):
public final class CancelBag {
private var cancels = [() -> Void]()
public func on(cancel: @escaping () -> Void) {
self.cancels.append(cancel)
}
private func cancel() {
cancels.forEach { cancel in cancel() }
}
deinit {
self.cancel()
}
}
extension Task {
func cancel(by bag: CancelBag) {
bag.on(cancel: { cancel() })
}
}
使用:
let bag = CancelBag()
Task {
...
}.cancel(by: bag)
要注意循环引用. 如果不想在task闭包中处理循环引用, 可以使用手动的CancelBag:
public final class CancelBag {
private var cancels = [() -> Void]()
public func on(cancel: @escaping () -> Void) {
self.cancels.append(cancel)
}
public func cancel() {
cancels.forEach { cancel in cancel() }
cancels.removeAll()
}
}
并在适当时机调用bag.cancel()
.
async let
async let hello = hello()
async let world = world()
print("\(await hello) \(await world)")
Kotlin:
val hello = async { hello() }
val world = async { world() }
println("${hello.await()} ${world.await()}")
同时执行多个async函数. 比如发送网络请求时, 如果第三个请求的参数依赖前两个请求的结果:
async let x = reqX()
async let y = reqY()
let z = await reqZ(await x, await y)
可以让x, y请求同时发出, 都得到响应后, 再发出z请求
for await
func number() async -> Int {
.random(in: 1...100)
}
await withTaskGroup(of: Int.self) { group in
for _ in 1...100 {
group.addTask { await number() }
}
for await i in group {
print(i)
}
}
Kotlin:
suspend fun number() = Random.nextInt(0..100)
val numbers = produce {
coroutineScope {
repeat(100) {
launch { send(number()) }
}
}
}
for (i in numbers) {
println(i)
}
for await
用于迭代实现了AsyncSequence
协议的类型, 如TaskGroup
和下文将要提到的AsyncStream
.
withTaskGroup(of:)
效果类似于corountineScope
, 可以添加一系列子任务并等待这些任务完成.
如果子Task会调用throws, 可以使用withThrowingTaskGroup
:
try await withThrowingTaskGroup(of: Int.self) { group in
group.addTask { throw SomeError() }
for try await i in group {
...
}
}
await all
await withTaskGroup(of: Int.self) { group in
for _ in 1...100 {
group.addTask { await number() }
}
await group.reduce(into: []) { result, transition in result.append(transition) }
}
(1..100)
.map { async { number() } }
.awaitAll()
收集子任务结果.
AsyncStream
let stream = AsyncStream<Int> { continuation in
for i in 1...100 {
continuation.yield(i)
}
continuation.finish()
}
for await i in stream {
print(i)
}
let stream = AsyncStream<Int> {
try? await Task.sleep(nanoseconds: 1000_000_000)
return .random(in: 1...100)
}
for await i in stream {
print(i)
}
Kotlin:
flow {
(1..10).forEach { emit(it) }
}.collect {
println(it)
}
flow {
while (true) {
delay(1.seconds)
emit(Random.nextInt(1..10))
}
}.collect {
println(it)
}
这里AsyncStream和flow对比比较直观, 但其行为更像ReceiveChannel, 因为flow是冷的.
Actor
actor Counter {
var count: Int = 0
func inc() {
count += 1
}
}
let couter = Counter()
await couter.inc()
let count = await couter.count
Kotlin:
sealed interface CounterMsg
object IncCount : CounterMsg
class GetCount(val response: CompletableDeferred<Int> = CompletableDeferred()) : CounterMsg
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var count = 0
for (msg in channel) {
when (msg) {
is GetCount -> msg.response.complete(count)
is IncCount -> ++count
}
}
}
suspend fun SendChannel<CounterMsg>.count() = GetCount().also { send(it) }.response.await()
val counter = counterActor()
counter.send(IncCount)
val count = counter.count()
Swift在语言层面支持了actor, 因此使用actor要比Kotlin方便许多.
从外部调用actor中的函数、可变属性和计算属性需要使用await. 如果确定不存在数据竞争, 可以用nonisolated
修饰:
nonisolated func foo() -> String {
...
}
这样就可以像调用普通函数一样从外部调用.
GlobalActor
用@globalActor创建一个修饰器, 用此修饰器修饰, 可以让函数/属性/class共同使用一个actor环境:
@globalActor
actor BusActor: GlobalActor {
public static let shared = BusActor()
}
@BusActor func foo() {
...
}
@BusActor var bar = 0
@BusActor class Bus {}
MainActor
MainActor是运行于主线程的GlobalActor:
@globalActor final public actor MainActor : GlobalActor
可以把更新UI的函数用@MainActor修饰:
@MainActor
func update() {
...
}
以保证此函数一定在主线程执行.
Sendable
Sendable是一个mark协议, 用于标记类型为可安全在actor间移动的, 类似于Rust的Send
trait.
public protocol Sendable {
}
以下类型是Sendable的:
-
基本类型
-
actor类型
-
由Sendable类型构成的struct
-
满足以下条件的@Sendable修饰的函数或闭包:
-
捕获变量是Sendable类型
-
捕获变量不可变
-
-
满足以下条件的显式实现Sendable的class:
-
final修饰, 即不可继承
-
存储属性是Sendable类型
-
存储属性不可变
-
无父类或父类是NSObject
-
如果确定不符合以上条件的class可以安全Send, 可以实现@unchecked Sendable以跳过编译检查:
class Foo: @unchecked Sendable {
}
从Callback迁移
func request(_ completion: (String) -> Void) {
...
}
func request() async -> String {
await withCheckedContinuation { continuation in
request { response in
continuation.resume(returning: response)
}
}
}
Kotlin:
fun request(callback: (String) -> Unit) {
...
}
suspend fun request(): String = suspendCoroutine { continuation ->
request {
continuation.resume(it)
}
}
如果闭包内调用了throws, 可以使用withCheckedThrowingContinuation(function:_:)
如果可以确保调用且仅调用一次resume
, 可以使用unsafe版本:
-
withUnsafeContinuation(_:)
-
withUnsafeThrowingContinuation(_:)