Combine
文中写一些 Swift 方法签名时,会带上 label,如
subscribe(_ subscriber:)
,正常作为 Selector 的写法时会忽略掉 label,只写作subscribe(_:)
,本文特意带上 label 以使含义更清晰。
Combine Framework
Overview
在 App 运行过程中会发生各种各样的异步事件,如网络请求的返回,Notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。Combine 框架提供了一种声明式的 Swift API,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。
Combine 框架采用 publisher-subscriber 模式:
- 协议
Publisher
表示一种能够随时间产生一系列值的类型。Combine 还为这些 Publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。 - 在这个处理链的最末端,是由协议
Subscriber
表示的订阅者类型,接收并处理发送给它的值。 - 当一个 Subscriber 订阅到一个 Publisher 上时,会接收到一个新生成的由协议
Subscription
表示的类型对象,Subscriber 通过该对象来向 Publisher 请求值,而 Publisher 也只有在接收到 Subscriber 的显式请求时才会分发值。
Publisher 和 Subscriber 的交互流程
- 首先,subscriber 调用 publisher 的
subscribe(_ subscriber:)
方法,将自己作为参数传过去;subscribe(_ subscriber:)
会接着调用 publisher 的receive(subscriber:)
方法。 - 方法
receive(subscriber:)
是Publisher
协议的要求方法,所有遵循 Publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法subscribe(_ subscriber:)
来发起订阅。 - Publisher 接受订阅之后,会创建一个新的 Subscription 对象,然后调用 subscriber 的
receive(subscription:)
方法。协议 Subscription 约束了一个方法request(_ demand:)
,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。 - Publisher 通过调用 subscriber 的
receive(_ input:)
方法来向其发布值,并在结束时调用 subscriber 的receive(completion:)
来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。Anyway,实现 Subscriber 协议的类型必须实现这两个方法(以及开头的receive(subscription:)
方法),在这些方法中处理从 publisher 那里接收到的值。
Combine 中的 Publishers
内置 publishers
Combine 框架提供了许多内置的 publisher 类型供我们使用,如:
- 为 Sequence 类型实现的 publisher 扩展;
- 为 NotificationCenter 实现的
publisher(for:object:)
扩展; - URLSession 的
dataTaskPublisher(for:)
扩展; - …
Subject
Subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 Combine 提供了一个强大的工具。Subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的 send(_ value:)
方法来发布一个值。Combine 提供了两种 subject:
- CurrentValueSubject:如其名,会维护一个当前值,初始化时需要传入一个初始值作为当前值,后续通过调用
send(_ value:)
来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。 - PassthroughSubject:不同于 CurrentValueSubject,内部没有缓存状态,每次调用
send(_ value:)
时才会向下游发布值。
@Published
该 property wrapper 修饰 Class 的某个属性,为其生成一个 publisher,使用方通过 $
加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willSet 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。
Combine 中的 Subscribers
Combine 提供了两个内置的 subscribers:Subscribers.Sink 和 Subscribers.Assign,但一般不直接创建这两个的实例,而是通过 Publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 AnyCancellable 对象。
Sink
Sink subscriber 创建的时候会立即调用 subscription 对象的 request(.unlimited)
,后面会详细介绍 Demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,Sink 就会持续地接收到新值,直至被 cancel。
Publisher 有两个 sink 扩展方法:
sink(receiveCompletion:receiveValue:)
:两个闭包的含义和用法不必多解释;sink(receiveValue:)
:只有当 Publisher 的 FAIlure associated type 是 Never 时才可以使用该方法。
Assign
Assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 Published publisher 上,它对 publisher 的 demand 也是 .unlimited。
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
:- 因为 keyPath 类型是 ReferenceWritableKeyPath,所以 object 只能是一个类实例;
- 注意该 Assign subscriber 会强持有 object 对象,除非上游发布了一个 completion。
assign(to published: inout Published<Self.Output>.Publisher)
:- 使用该 subscriber 可以将上游的值通过一个 @Published 修饰的属性重新发布;
- 该方法没有返回值,当关联的 Published 实例析构时会自动地 cancel 掉订阅。
Publisher 的 operators
Combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 Publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。
具体的 operator 及其用法详见文档。
Connectable
常用的 Subject 如 sink(receiveValue:)
会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。
Combine 提供一个 ConnectablePublisher
协议来支持手动控制开始发布值的时机,遵循该协议的 Publisher 只有在显示调用 connect()
方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。
使用 Publisher 的 makeConnectable()
operator 来将一个已有的 publisher 包装成一个 Publishers.MakeConnectable
实例,该实例便是一个 ConnectablePublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。
Combine 中有些 publisher 已经实现了 ConnectablePublisher 协议,如 Publishers.Multicast
,Timer.TimerPublisher
等,有时在一些使用这些 publisher 的简单场景中,显式调用 connect()
反而显得繁琐,因此 ConnectablePublisher 又提供了一个 autoconnect()
operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。
Demand and Back Pressure
在 Combine 中,一个 Publisher 只有在被 Subscriber 订阅并且发起要求的时候才会产生值。Subscriber 有下面两种方式来发起要求:
- 通过调用 Subscription 对象的
request(_ demand:)
方法,Subscription 对象在发起订阅时由receive(subscription:)
方法传入; - 每次
receive(_ input:)
调用时,可以返回一个新的 Demand。
Demand 表示 subscriber 需要多少值,Combine 提供的 API 里面,有 .none
, .unlimited
, 以及指定具体数目的 .max(Int)
。Demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 Demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。
自定义 Subscriber
内置的 Subscriber Sink 和 Assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 Subscriber,如:
class MySubscriber: Subscriber {
typealias Input = Date
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
print("published received")
self.subscription = subscription
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
subscription.request(.max(3))
}
}
func receive(_ input: Date) -> Subscribers.Demand {
print("\(input) \(Date())")
return Subscribers.Demand.none
}
func receive(completion: Subscribers.Completion<Never>) {
print ("--done--")
}
}
自定义 Subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 Subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。
Back-Pressure 操作符
除了自定义 Subscriber,Combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:
-
buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;
-
debounce:设定一个 dueTime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 + dueTime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 + dueTime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:
let bounces:[(Int,TimeInterval)] = [ (0, 0), (1, 0.25), // 0.25s interval since last index (2, 1), // 0.75s interval since last index (3, 1.25), // 0.25s interval since last index (4, 1.5), // 0.25s interval since last index (5, 2) // 0.5s interval since last index ] let subject = PassthroughSubject<Int, Never>() cancellable = subject .debounce(for: .seconds(0.5), scheduler: RunLoop.main) .sink { index in print ("Received index \(index)") } for bounce in bounces { DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) { subject.send(bounce.0) } } // Prints: // Received index 1 // Received index 4 // Received index 5 // Here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator: // Time 0: Send index 0. (republish task0 at: 0.5) // Time 0.25: Send index 1. (task0 is discarded, republish task1 at 0.25 + 0.5 = 0.75) // Time 0.75: Debounce period ends, publish index 1. (execute task1) // Time 1: Send index 2. (republish task2 at: 1 + 0.5 = 1.5) // Time 1.25: Send index 3. (task2 is discarded, republish task3 at 1.25 + 0.5 = 1.75) // Time 1.5: Send index 4. (task3 is discarded, republish task4 at 1.5 + 0.5 = 2.0) // Time 2: Debounce period ends, publish index 4. Also, send index 5. (execute task4. republish task5 at: 2 + 0.5 = 2.5) // Time 2.5: Debounce period ends, publish index 5. (execute task5)
-
throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。
-
collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。
许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。
开源实现:OpenCombine
Apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 Combine 要求 iOS 13 以上才能使用,而且一些 API 更新可能会要求更新的 OS 版本。然而有位大佬 Sergej Jaskiewicz 开发了 Combine 的开源实现:OpenCombine,完全兼容 Combine 的 API,可以运行在老的 iOS 和 macOS 版本上,甚至支持 Windows、Linux 和 WASM。
我们来简单看一下内置的 Publisher 之一的 PassthroughSubject 的开源实现。
先从订阅流程看起,可以结合上面的图作为参照。首先是 Publisher 的扩展方法 subscribe(_ subscribe)
,其实就是简单地调用了一下具体 Publisher 类型的协议约束方法,传入参数 subscriber: receive(subscriber: subscriber)
。PassthroughSubject 的协议方法实现为:
public func receive<Downstream: Subscriber>(subscriber: Downstream)
where Output == Downstream.Input, Failure == Downstream.Failure
{
lock.lock()
if active {
let conduit = Conduit(parent: self, downstream: subscriber) // a.
downstreams.insert(conduit) // b.
lock.unlock()
subscriber.receive(subscription: conduit) // c.
} else {
let completion = self.completion!
lock.unlock()
subscriber.receive(subscription: Subscriptions.empty) // d.
subscriber.receive(completion: completion) // e.
}
}
先看 c 处,最终调用了 subscriber 的 receive(subscription:)
,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。
我们上面多次介绍到 Subscription 类型,但 Combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 Conduit 便是 PassthroughSubject 的 Subscription 实现类。我们接着看 PassthroughSubject 的另一个协议约束方法:
// PassthroughSubject 的 send(_ input:) 实现
public func send(_ input: Output) {
lock.lock()
guard active else {
lock.unlock()
return
}
let downstreams = self.downstreams
lock.unlock()
for conduit in downstreams {
conduit.offer(input)
}
}
// Conduit 的 offer(_ output:) 实现
override func offer(_ output: Output) {
lock.lock()
guard demand > 0, let downstream = downstream else {
lock.unlock()
return
}
demand -= 1 // a.
lock.unlock()
downstreamLock.lock()
let newDemand = downstream.receive(output) // b.
downstreamLock.unlock()
guard newDemand > 0 else { return }
lock.lock()
demand += newDemand // c.
lock.unlock()
}
PassthroughSubject 的 send(_ input:)
会调用 Conduit 的 offer(_ output:)
,在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newDemand 加到现有 demand 上面,和前文描述的逻辑完全一致。
其他方法实现、Subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。