Combine By Tutorials

Márcio Oliveira
15 min readJul 4, 2023

--

After a while without writing articles and changing my career again (yeah, now I am an iOS developer 📱), I think it was about time to write something cool in this fantastic stack (for those who are interested to know more about how it was my first significant career change, check this article).

Combine is a powerful reactive framework introduced at WWDC 2019 to natively compete with the popular RxSwift and ReactiveSwift using a declarative Swift API for processing values over time where these values can represent many kinds of asynchronous events. SwiftUI already uses Combine under the hood with types like ObservableObjects and property wrappers like @Published.

”Processing values over time”. What the heck does it mean?

Functional Reactive Programming (FRP) languages allow you to process values over time. Examples of these kinds of values include network responses, user interface events, and other types of asynchronous data.

For example:

  • Once a network response is received
  • I want to map its data to a JSON model
  • And assign it to my View.

The Basics

Combine uses a very common design pattern: Observer, where we have publishers (observables) and subscribers (observers). Publishers expose values that can change on which subscribers subscribe to receive all those updates. We have already seen this pattern in APIs like NotificationCenter:

let notification = Notification.Name("MyNotification")
let center = NotificationCenter.default

let observer = center.addObserver(forName: notification,
object: nil,
queue: nil) { notification in
print("Notification received!")
}

// Cancel subscription
center.removeObserver(observer)

With Combine, we can achieve a similar approach using the sink() method. Once we’ve created a publisher, we can then attach the subscribers to it:

import Combine

let observer = ["A", "B", "C", "D"].publisher.sink {
print($0)
}
// Outputs: A, B, C, D

The Foundation framework already contains a lot of extensions to work with Combine. The first example above could be written this way (I won’t be explicitly importing the Combine library anymore in the further examples for the sake of simplicity):

let notification = Notification.Name("MyNotification")

let publisher = NotificationCenter.default.publisher(for: notification)

let subscription = publisher.sink { _ in
print("Notification received!")
}

// Cancel subscription
subscription.cancel()

So why Combine?

⚙️ The transformation power!

With the various operators provided in the Combine framework, you can easily intercept and transform the stream of data that comes from the publishers. The operators decorate a publisher and return a new publisher that publishes the transformed data. Therefore, you can easily “combine” operators in the most convenient way to tackle a problem.

For example, let’s suppose we have a publisher of ordered numbers:

let publisher = (1...100).publisher

let subscription = publisher.sink {
print($0) // Outputs: 1, 2, 3, …, 100
}

Let’s suppose, we’re now only interested in even numbers. We could just add a filter operator:

let publisher = (1...100).publisher

let subscription = publisher
.filter { $0 % 2 == 0 } // filter even numbers
.sink {
print($0) // Outputs: 2, 4, 6, …, 100
}

And now we want the same, but with numbers greater than 50:

let publisher = (1...100).publisher

let subscription = publisher
.dropFirst(50) // drop first 50 numbers
.filter { $0 % 2 == 0 } // filter even numbers
.sink {
print($0) // Outputs: 52, 54, 56, …, 100
}

And finally, we’re only interested in a total sequence of 10 numbers:

let publisher = (1...100).publisher

let subscription = publisher
.dropFirst(50) // drop first 50 numbers
.prefix(20) // Take the next 20 numbers after the initial drop and discard the remaining
.filter { $0 % 2 == 0 } // filter even numbers
.sink {
print($0) // Outputs: 52, 54, 56, 58, 60, 62, 64, 66, 68, 70
}

Therefore, this is just a slice of what we can do with the available operators. These are some of the most popular ones:

  • map() — transforms all elements according to the provided closure
  • compactMap() — same of map(), but discharge resulting nil values
  • first(where:) — publish the first element that matches the provided closure criteria
  • filter() — filter the elements according to the provided closure
  • prepend() — add elements to the beginning of the stream
  • append() — add elements to the end of the stream
  • dropFirst() — skip the first n elements
  • dropLast() — skip the last n elements
  • drop(while:) — skip the elements until the provided condition is not satisfied (the next elements are not checked anymore)
  • prefix() — take the first n elements only
  • removeDuplicates() — publishes only elements that don’t match the previous one
  • merge() — combines elements from the current publisher with the ones from another publisher of the same type
  • replaceNil() — replace nil elements with a default value

A full list can be found here.

Combine in more practical examples

As we can see, Combine offers a powerful toolset to manipulate a stream of asynchronous values. But how can we use it in more practical ways for iOS apps?

➡️ Consuming JSON APIs

URLSession also provides publishers for data tasks where we can attach a decode operator and make the whole flow in a cleaner and declarative way, for example:

struct Post: Codable {
let title: String
let body: String
}

func getPosts() -> AnyPublisher<[Post], Error> {
guard let url = URL(string: "https://jsonplaceholder.typicode.com/posts") else {
fatalError("Invalid URL")
}

return URLSession.shared.dataTaskPublisher(for: url)
.map { $0.data }
.decode(type: [Post].self, decoder: JSONDecoder())
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher()
}

let cancellable = getPosts().sink { print($0.count) }
// Outputs the number of posts fetched.

Let’s go step by step:

  1. getPosts() returns an AnyPublisher which is a more generic form of a Publisher that has no significant properties of its own and passes through elements and completion values from its upstream publisher (in this case, the one from the URLSession ). The first type corresponds to the published data ([Post]) and the second one, is the error type. We’ll see more about AnyPublishers later.
  2. URLSession.shared.dataTaskPublisher(for: url) — Here the request is started and it returns a publisher that will publish a tuple with the data and response after the request completes.
  3. .map { $0.data } — Here we’re just interested in the data of the published tuple above. So we’re transforming the tuple into a single data element. We can also use a key path, this way: .map(\.data).
  4. .decode(type: [Post].self, decoder: JSONDecoder()) — Here we’re transforming the upstream data into a decoded list of Posts.
  5. .receive(on: DispatchQueue.main) — Here we’re just dispatching the results in the main queue, so subscribers won’t have to worry about it.
  6. .eraseToAnyPublisher() — Here we’re wrapping the resulting Publisher into an AnyPublisher (we’ll cover this in detail in the next sections).
  7. let cancellable = getPosts().sink { print($0.count) } — Here we call getPosts() and subscribe to the publisher to receive the fetched data. You might have noticed that now I’m calling the observer “cancellable”. We’ll dig into it later.

Assign results to a variable

Alternatively, we can just have a variable updated with the published values instead of using sink() with a closure to handle them, to make the code tidier. In the example above we have:

let cancellable = getPosts().sink { print($0.count) }

We could refactor it with the assign() method to keep a variable updated in the code:

var posts = [Post]() {
didSet { print(posts.count) }
}

let cancellable = getPosts().assign(to: \.posts, on: self)

Observable members

Still moving from the examples above, we could re-publish the values assigned to the posts member for other classes to consume. This is very common in the MVVM pattern, especially when working with SwiftUI, where we have ViewModels with observable members and views that observe those members and react to their changes. So, let’s suppose our code above is in a ViewModel that fetches posts from an API. We can add the @Published property wrapper so that a member becomes a publisher itself.

class PostsViewModel {
@Published private(set) var posts: [Post]

init(service: PostService) {
service.getPosts().assign(to: &$posts)
}
}

💡 Note: you might have noticed that the assign() signature is a little bit different from the previous example. Don’t worry, we’ll dig into it in the next sections.

A SwiftUI view can bind this ViewModel with its own property wrappers (in this case our ViewModel needs to conform to ObservableObject protocol:

class PostsViewModel: ObservableObject { … }

//—---------------

Import SwiftUI

struct ContentView: View {
@ObservedObject var viewModel = PostsViewModel()

var body: some View {
List {
ForEach(viewModel.posts, id: \.self) { post in
Text(post.title)
}
}
}
}

And, of course, we can use the already-known sink() to create this binding as well in other places. In this case, we have to access the publisher member with a special $ operator:

viewModel.$posts.sink { print($0) }

➡️ Testing

Using Combine in tests also helps to set expectations that should be fulfilled only when a variable in the SUT changes. Still, in the same example, let’s test our PostsViewModel initialization:

func testGetPostsWhenViewModelIsInitialized() {
let mockService = MockPostService()
mockService.postsResponse = [Post()]

var actualPosts: [Post]?
let expectation = expectation(description: "init expectation")
let cancellable = PostsViewModel(service: mockService)
.$posts
.sink {
actualPosts = $0
expectation.fulfill()
}
waitForExpectations(timeout: 1)

XCTAssertEqual(actualPosts, [Post()])
}

➡️ Handling UI events

We can use Combine for capturing and intercepting events in the UI. A very common use case is when we’re typing something in a text field and we wish to process the data while typing, for example, performing a search in some API. But it should be even better if we can do it only after some pause in the typing to avoid unnecessary processing and API requests.

The solution can be tacked by setting/canceling timers while observing text fields notifications, but with Combine, we can easily achieve the solution with the debounce() operator:

class ViewController: UIViewController {
@IBOutlet weak var postsTableView: UITableView!
@IBOutlet weak var searchTextField: UITextField!

private var webService: PostService = PostService()
private var cancellable: AnyCancellable?
private var posts = [Post]() {
didSet {
self.postsTableView.reloadData()
}
}

override func viewDidLoad() {
super.viewDidLoad()
setupTableView() // I won't do that ;-)
setupPublishers()
}

private func setupPublishers() {
let publisher = NotificationCenter.default.publisher(for: UITextField.textDidChangeNotification,
object: searchTextField)
cancellable = publisher
.compactMap {
($0.object as? UITextField)?.text?.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)
}.debounce(for: .milliseconds(500), scheduler: RunLoop.main)
.flatMap { text in
return self.webService.getPosts(with: text)
.catch { _ in Just([[Post]()]) }
.map { $0 }
}
.assign(to: \.posts, on: self)
}

// Setup TableView, delegates and data source
}

Again let’s go step by step here:

1) Here we’re just setting the service, the posts member that’ll refresh the table view every time is updated, publishers, and UI

@IBOutlet weak var postsTableView: UITableView!
@IBOutlet weak var searchTextField: UITextField!

private var webService: PostService = PostService()
private var cancellable: AnyCancellable?
private var posts = [Post]() {
didSet {
self.postsTableView.reloadData()
}
}

override func viewDidLoad() {
super.viewDidLoad()
setupTableView() // I won't do that ;-)
setupPublishers()
}

2) Here we’re creating a publisher for the textDidChangeNotification events while typing in the search field:

let publisher = NotificationCenter.default.publisher(for: UITextField.textDidChangeNotification,
object: searchTextField)

3) Here we’re setting the observer (cancellable) and discarding nil events. Also, we’re just adding some encoding in the typed text to make it better handled by the API query (e.g, replacing spaces with “%20”):

cancellable = publisher
.compactMap {
($0.object as? UITextField)?.text?.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)
}

4) Here we’re actually debouncing the input. This operator intercepts the upstream of text field events and only republishes the last one after 0.5 seconds on the main thread.

.debounce(for: .milliseconds(500), scheduler: RunLoop.main)

5) Here we’re invoking the service with the debounced text and retrieving its publisher with the results. The flatMap() operator is used to flatten this nested stream and publish only the post list result, otherwise, with the regular map() we would be just publishing the service’s publisher. Also, we’re using the catch() operator to handle possible errors from the service, but in this case, we’re just republishing an empty list when they happen. This is achieved with the Just publisher, that’s a special kind of Publisher that just publishes once.

.flatMap { text in
return self.webService.getPosts(with: text)
.catch { _ in Just([Post]()) }
.map { $0 }
}

6) Finally, here we’re updating the posts member with the search results and it’ll refresh the table view:

.assign(to: \.posts, on: self)

Digging into some concepts

So far I could show practical examples for you to get the hang of it. But let’s explore some of the concepts behind it and other possibilities, like creating our custom publishers or subscribers (and explain what the heck that cancellable thing is), etc.

➡️ Subscribers

As we’ve seen before, we can use the publisher sink() or assign() operators to subscribe to it.

  • sink() — We’ve been using it in most of the examples so far but without handling failures or completion signals because the receiveCompletion closure is optional. Let’s see it in detail now:
let subscription = publisher.sink { (completion) in
switch completion {
case .finished:
print("Received finished")
case .failure(let error):
print("Received error: \(error)")
}
} receiveValue: { value in
print("Received value: \(value)")
}

💡 Note: We’ll see in the publisher’s section below how a publisher sends those completion signals, but it’s important to know that when they send them, the subscription is canceled and further values sent by the publisher will be discarded.

  • assign(to:on:) — Assigns each element from a publisher to a property on an object.
class Foo {
var bar = "A"
}

var foo = Foo()

print(foo.bar) // "A"
["B"].publisher.assign(to: \.bar, on: foo) // Needs to use KeyPath expressions
print(foo.bar) // "B"
  • assign(to:) — Republishes elements received from a publisher, by assigning them to a property marked as a publisher.
class Foo {
@Published var bar = "A"
}

var foo = Foo()

let observer = foo.$bar.sink { print($0) } // observer is subscribed to foo.bar

["B"].publisher.assign(to: &foo.$bar) // this publisher updates foo.bar

// Outputs: A, B

AnyCancellables

Both sink() and assign() operators return a cancellable object where we can use this type to provide a “cancellation token” that makes it possible for a caller to cancel a publisher.

// This is new, but don't worry. We'll cover it below.
// You just need to know this is a special publisher where we can inject values to be published
let publisher = PassthroughSubject<String, Never>()

let subscription = publisher.sink { print($0) }

publisher.send("A")
publisher.send("B")
subscription.cancel()
publisher.send("C")

// Outputs: A, B

💡 Notes:

  • Cancellables are important to keep a strong reference of the subscription, otherwise, it’ll be immediately deallocated.
  • Cancellables also call its cancel() method on deinit() , so we don’t need to explicitly do it when the host object with subscriptions alive is deallocated.

— So, is that mean we should keep one cancellable for each subscription???

— Yes and No.

Luckily, we can create a set of cancellable and use the store() operator to manage it:

var cancellables = Set<AnyCancellable>()

publisher1.sink {...}.store(in: &cancellables)
publisher2.sink {...}.store(in: &cancellables)
publisher3.sink {...}.store(in: &cancellables)
publisher4.sink {...}.store(in: &cancellables)

This is enough to keep strong references for all of the subscriptions and all of them will be automatically canceled when the set is deallocated.

Creating custom subscribers

We can create our custom subscribers by conforming to the Subscriber protocol:

class StringSubscriber: Subscriber {
typealias Input = String
typealias Failure = Never

func receive(subscription: Subscription) {
print("Received subscription")
subscription.request(.max(3)) // backpressure
}

func receive(_ input: String) -> Subscribers.Demand {
print("Received value: \(input)")
return .none
}

func receive(completion: Subscribers.Completion<Never>) {
print("Completed")
}
}

let publisher = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"].publisher
let subscriber = StringSubscriber()
publisher.subscribe(subscriber)

// Outputs:
Received subscription
Received value: A
Received value: B
Received value: C

💡 Notes:

  • The first method is called from the publisher’s side when we can use the received subscription reference to request values. In this case, we’re only interested in 3 values
  • The second method is called from the subscription’s side and just sends an input value. The returned demand is a value that increments the demand in the subscription class. As we don’t want it to increase, we return a none, which is the same as max(0). If we change to .unlimited we request as many values as the publisher can produce. Therefore, here we can control the demand on each value received if we just specify a new max(value).
  • The third method is also called from the subscription’s side and sends a completion object that notifies our subscriber that the subscription is over with a finished completion or a failure one. In this case, it’ll never fail because of the Never failure type and this method won’t even be called because we’re only requesting 3 elements from the publisher, which will keep publishing after it.

➡️ Publishers

As we’ve already covered, we can publish events in different ways:

  • use the .publisher extension for Sequence types
  • use convenience publishers like Just , Future , etc
  • use @Published property wrapper
  • Use Subject

Subjects

Subjects are publishers that expose a method for outside callers to publish elements. For example, we can create a PassthroughSubject<Output, Failure>:

let subject = PassthroughSubject<String, Never>() // it'll publish string elements and never fail

let cancellable = subject.sink { print($0) }

subject.send("A")
subject.send("B")

// Outputs in the print above: A, B

You can also use the CurrentValueSubject<Output, Failure> that is very similar to the previous one, with the difference that it should be initialized with an initial value (that’s immediately received on subscription) and publishes updates whenever this value changes or the send() is called (that also updates the value).

let subject = CurrentValueSubject<String, Never>("A")

let cancellable = subject.sink { print($0) }

subject.send("B")
subject.value = "C"

// Outputs in the print above: A, B, C

Publishers can only send a completion signal to a subscriber (it might be a finished or error signal), so no more events will be received.

subject.send("A")
subject.send(completion: .finished)
subject.send("B") // subscribers won't receive this.

AnyPublisher

As we have already seen in previous examples, AnyPublishers are a more generic form of publisher that doesn’t expose its methods or values, so outside callers can’t explicitly call its send() method, for example. Also, we can use the eraseToAnyPublisher() operator to wrap a subject into an AnyPublisher.

let subject = PassthroughSubject<String, Never>()

let publisher = subject.eraseToAnyPublisher()

let observer = publisher.sink { print($0) }

// We can call send() on the subject and the publisher will just re-publish it to the subscriber
subject.send("A") // outputs "A" in the print above

// But we can't do it on the publisher. The send() is not accessible anymore
publisher.send("B") // it'll fail to compile

@Publisher

Marking a property with it will create a publisher of this type and we can access it with the $ operator.

class Model {
@Published var name = "A"
}

let model = Model()
let observer = model.$name.sink{ print($0) }

model.test = "B"

// Outputs: A, B

💡 Notes:

  • As you might have noticed, subscribers receive the initial value on the subscription
  • The @Published attribute is class constrained. Use it with properties of classes, not with non-class types like structures.
  • When the property changes, publishing occurs in the property’s willSet block, meaning subscribers receive the new value before it’s actually set on the property. That’s something that should take into consideration, especially on test assertions.

Future

This is a publisher that eventually produces a single value and then finishes or fails. This is a very interesting publisher that allows us to retrofit existing asynchronous methods to chain them in our declarative code. It uses the same concept of promises from other asynchronous languages.

Let’s suppose we have an image processor that asynchronous returns an image in a completion block:

struct ImageProcessor {
func process(
_ image: UIImage,
then handler: @escaping (Result<UIImage, Error>) -> Void
) {
// Process the image and call the handler when done
...
}
}

We can create an extension to wrap this call with a Future publisher:

extension ImageProcessor {
func process(_ image: UIImage) -> Future<UIImage, Error> {
Future { promise in
process(image, then: promise)
}
}
}

Note that the example above takes the benefit of the original handler using a Result type. Just to illustrate, here’s what the above implementation would’ve looked like if we were to instead use a dedicated completion handler closure and then manually pass its result to our promise:

extension ImageProcessor {
func process(_ image: UIImage) -> Future<UIImage, Error> {
Future { promise in
process(image) { result in
promise(result)
}
}
}
}

Finally, we can now chain our new extension method in a cool Combine approach:

processor.process(image)
.replaceError(with: .errorIcon)
.map { $0.withRenderingMode(.alwaysTemplate) }
.receive(on: DispatchQueue.main)
.assign(to: \.image, on: imageView)
.store(in: &cancellables)

Others

Check other convenience publishers like Just, Deferred, Empty, etc., here.

Again, remember that all of the transformation operators return a new publisher that republishes the previous one.

Conclusion

As we can see, Combine is a powerful framework that takes the benefit of a declarative API to easily tackle problems by combining operators. I really recommend watching the WWDC videos that also give good insights and practical examples to demonstrate the benefits of it. But, again, this is not a silver bullet to tackle all of the problems, just one more tool to make the code simpler and maintainable.

Personally, I can see great benefits for improving features with timers (polling, for example), setting expectations for tricky asynchronous tests, and orchestrating multiple API requests or UI events.

🎬 Videos:

🌎 Sources:

--

--