Wrapping Kotlin Flow with Swift Combine Publisher in a Kotlin Multiplatform project

One area of “friction” when developing an iOS client that uses Kotlin Multiplatform code is consuming APIs from shared code that expose a Kotlin Flow. This has typically required providing one API to start collecting the flow (passing data back to the iOS client through a provided closure/lambda) and another to cancel it (through the associated job). A related question is how this process can be integrated into the particular reactive framework being used on iOS. One example of such a framework is RxSwift, and Russell Wolf wrote an article (and published associated code) last year describing how to manage the integration with that framework. I had started to use Swift’s Combine framework in a number of projects, and this article describes my exploration of using Russell’s approach with it.


Current Implementation

The example I’m going to use is the polling for the position of the International Space Station (ISS) that’s performed in shared code in the PeopleInSpace project. The following were the methods previously exposed from PeopleInSpaceRepository in the shared code.

var issPositionJob: Job? = null

fun startObservingISSPosition(success: (IssPosition) -> Unit) {
    issPositionJob = coroutineScope.launch {
        pollISSPosition().collect {
            success(it)
        }
    }
}

fun stopObservingISSPosition() {
    issPositionJob?.cancel()
}

These were invoked by the following functions in our Swift view model. Updates to the position are passed back to the Swift code through the provided closure, which in turn updates the issPosition variable.

@Published var issPosition = IssPosition(latitude: 0.0, longitude: 0.0)

func startObservingISSPosition() {
    repository.startObservingISSPosition(success: { data in
        self.issPosition = data
    })
}

func stopObservingISSPosition() {
    repository.stopObservingISSPosition()
}

In our SwiftUI code we then start and stop observing the position data in the onAppear and onDisappear callbacks respectively.

.onAppear {
    self.peopleInSpaceViewModel.startObservingISSPosition()
}.onDisappear {
    self.peopleInSpaceViewModel.stopObservingISSPosition()
}

We then show the current position in the UI using the issPosition value published from the view model.

let issPosition = peopleInSpaceViewModel.issPosition
let issPositionString = String(format: "ISS Position = (%f, %f)", issPosition.latitude, issPosition.longitude )
HStack {
    Text(issPositionString)
}


Replacing with Swift Combine Publisher

Swift’s Combine framework, announced at WWDC 2019, is described as:

a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.

Combine’s Publisher maps pretty closely to Kotlin Flows and as such I thought it would be interesting to see what would be involved in creating one that encapsulates access to the Kotlin Flows exposed from shared Kotlin Multiplatform code.

To start with, I added the following KotlinNativeFlowWrapper class to the shared code. This is pretty much the same code that Russell used in his RxSwift example. In this project the flow runs on the main thread and invokes “main-safe” suspend functions provided by Ktor. If the flow does need to run on a background thread then you’ll need to update this (as per Russell’s article) to invoke freeze() in the appropriate places (there’s more information about the Kotlin/Native concurrency model that makes this necessary in this article).

class KotlinNativeFlowWrapper<T>(private val flow: Flow<T>) {
    fun subscribe(
        scope: CoroutineScope,
        onEach: (item: T) -> Unit,
        onComplete: () -> Unit,
        onThrow: (error: Throwable) -> Unit
    ) = flow
        .onEach { onEach(it) }
        .catch { onThrow(it) }
        .onCompletion { onComplete() }
        .launchIn(scope)
}

We can then use that to wrap pollISSPosition() (this is the function that returns the flow and is currently consumed directly by the Android client).

// invoked directly by Android client
fun pollISSPosition(): Flow<IssPosition> = flow {
    while (true) {
        val position = peopleInSpaceApi.fetchISSPosition().iss_position
        emit(position)
        delay(POLL_INTERVAL)
    }
}


// following invoked from iOS client
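// the context is created once so that every access shares the same SupervisorJob
// (a custom getter here would build a new job each time the context is read)
val iosScope: CoroutineScope = object : CoroutineScope {
    override val coroutineContext: CoroutineContext = SupervisorJob() + Dispatchers.Main
}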

fun iosPollISSPosition() = KotlinNativeFlowWrapper<IssPosition>(pollISSPosition())

With that in place we can now create IssPositionPublisher, our Combine Publisher. This in turn exposes IssPositionSubscription, which calls subscribe() on the flow wrapper returned by repository.iosPollISSPosition() (using the iosScope coroutine scope that is also exposed from PeopleInSpaceRepository). We store the returned job and use it to cancel the flow when the associated Combine Subscription is cancelled.

import Combine
import common


public struct IssPositionPublisher: Publisher {
    public typealias Output = IssPosition
    public typealias Failure = Never
    
    private let repository: PeopleInSpaceRepository
    public init(repository: PeopleInSpaceRepository) {
        self.repository = repository
    }

    public func receive<S: Subscriber>(subscriber: S) where S.Input == IssPosition, S.Failure == Failure {
        let subscription = IssPositionSubscription(repository: repository, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
    
    final class IssPositionSubscription<S: Subscriber>: Subscription where S.Input == IssPosition, S.Failure == Failure {
        private var subscriber: S?
        private var job: Kotlinx_coroutines_coreJob? = nil

        private let repository: PeopleInSpaceRepository

        init(repository: PeopleInSpaceRepository, subscriber: S) {
            self.repository = repository
            self.subscriber = subscriber
          
            job = repository.iosPollISSPosition().subscribe(
                scope: repository.iosScope,
                onEach: { position in
                    subscriber.receive(position!)
                },
                onComplete: { subscriber.receive(completion: .finished) },
                onThrow: { error in debugPrint(error) }
            )
        }
      
        func cancel() {
            subscriber = nil
            job?.cancel(cause: nil)
        }

        func request(_ demand: Subscribers.Demand) {}
    }
}

Now in our Swift view model we can call the following, which will update issPosition (the same variable we used in the original code) whenever there’s an update.

subscription = IssPositionPublisher(repository: repository)
    .assign(to: \.issPosition, on: self)
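
The value returned by assign(to:on:) is an AnyCancellable, and cancelling it is what ends up calling cancel() on the subscription (and so cancels the Kotlin job). The following is a minimal sketch of how the start/stop functions in the view model might look with this approach. To keep it self-contained it uses a PassthroughSubject and a hypothetical Position struct as stand-ins for IssPositionPublisher and IssPosition; those names, and PositionViewModel itself, are assumptions for illustration and not part of the project.

```swift
import Combine

// Hypothetical stand-in for the shared IssPosition type.
struct Position: Equatable {
    let latitude: Double
    let longitude: Double
}

final class PositionViewModel: ObservableObject {
    @Published var issPosition = Position(latitude: 0.0, longitude: 0.0)

    // Holding on to the AnyCancellable keeps the subscription alive.
    private var subscription: AnyCancellable?

    // `source` stands in for IssPositionPublisher(repository: repository).
    func startObservingISSPosition<P: Publisher>(_ source: P)
        where P.Output == Position, P.Failure == Never {
        subscription = source.assign(to: \.issPosition, on: self)
    }

    func stopObservingISSPosition() {
        // Cancelling propagates to the underlying Subscription's cancel().
        subscription?.cancel()
        subscription = nil
    }
}

let source = PassthroughSubject<Position, Never>()
let viewModel = PositionViewModel()

viewModel.startObservingISSPosition(source)
source.send(Position(latitude: 1.0, longitude: 2.0)) // applied to issPosition

viewModel.stopObservingISSPosition()
source.send(Position(latitude: 3.0, longitude: 4.0)) // ignored after cancel
```

Note that assign(to:on:) keeps a strong reference to the target object until the subscription is cancelled, so calling the stop function (for example from onDisappear) matters here.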

As with flows, a benefit of having our stream of data represented by a Publisher like this is that we can start to, say, combine multiple sources or apply operators like map and flatMap. For example, we can update our view model to include the following so that a formatted version of the information is exposed to the UI.

subscription = IssPositionPublisher(repository: repository)
    .map { position in String(format: "ISS Position = (%f, %f)", position.latitude, position.longitude ) }
    .assign(to: \.issPositionString, on: self)
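
Other operators can be chained in the same way. As a small sketch (not code from the project), the following drops consecutive duplicate positions with removeDuplicates() before formatting them. It again uses a PassthroughSubject and a hypothetical Position struct in place of IssPositionPublisher and IssPosition, and the formattedLabels function is only there to make the example runnable on its own.

```swift
import Combine
import Foundation

// Hypothetical stand-in for the shared IssPosition type.
struct Position: Equatable {
    let latitude: Double
    let longitude: Double
}

// Feeds the given positions through the operator chain and returns what came out.
func formattedLabels(for input: [Position]) -> [String] {
    // Stand-in for IssPositionPublisher(repository: repository).
    let positions = PassthroughSubject<Position, Never>()
    var labels: [String] = []

    let cancellable = positions
        .removeDuplicates() // skip a value equal to the previous one
        .map { String(format: "ISS Position = (%.1f, %.1f)", $0.latitude, $0.longitude) }
        .sink { labels.append($0) }

    input.forEach { positions.send($0) }
    cancellable.cancel()
    return labels
}

let labels = formattedLabels(for: [
    Position(latitude: 10.0, longitude: 20.0),
    Position(latitude: 10.0, longitude: 20.0), // duplicate, dropped
    Position(latitude: 11.5, longitude: 21.5)
])
// labels contains two entries, one per distinct position
```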

Note that the code shown here has been pushed to the PeopleInSpace repo.

Next Steps

At this point this is primarily a proof of concept. A more robust version would of course first need to build out proper error handling (e.g. mapping any errors returned in onThrow to something exposed from the Publisher). It also feels like it should be possible to automatically generate code from the original flow function in the shared code (perhaps using a custom annotation), generating both the Kotlin wrapper function and the Swift Publisher. Much of what’s described here also applies to “normal” suspend functions (as the original article discussed in the RxSwift case).
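
To give a rough idea of what the error-handling part could look like, here is a sketch of a generic Publisher whose Failure type is Error and which forwards onThrow as a .failure completion. It is not the project’s code: FlowSource, FlowJob, FlowPublisher, FlowSubscription, FlowError, NoopJob and FailingSource are all hypothetical names, and the FlowSource protocol only mimics the shape of KotlinNativeFlowWrapper.subscribe() so that the example runs without the Kotlin framework. It also assumes the Kotlin error has already been converted to a Swift Error, and it makes no attempt at thread safety or at honouring demand.

```swift
import Combine

// Hypothetical stand-ins for what the Kotlin framework would expose.
protocol FlowJob { func cancel() }

protocol FlowSource {
    associatedtype Item
    func subscribe(onEach: @escaping (Item) -> Void,
                   onComplete: @escaping () -> Void,
                   onThrow: @escaping (Error) -> Void) -> FlowJob
}

struct FlowPublisher<Source: FlowSource>: Publisher {
    typealias Output = Source.Item
    typealias Failure = Error
    let source: Source

    func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
        subscriber.receive(subscription: FlowSubscription(source: source, subscriber: subscriber))
    }
}

final class FlowSubscription<Source: FlowSource, S: Subscriber>: Subscription
    where S.Input == Source.Item, S.Failure == Error {
    private let source: Source
    private var subscriber: S?
    private var job: FlowJob?
    private var started = false

    init(source: Source, subscriber: S) {
        self.source = source
        self.subscriber = subscriber
    }

    // Start collecting on the first request; the demand value itself is ignored.
    func request(_ demand: Subscribers.Demand) {
        guard !started else { return }
        started = true
        job = source.subscribe(
            onEach: { [weak self] item in _ = self?.subscriber?.receive(item) },
            onComplete: { [weak self] in self?.finish(.finished) },
            onThrow: { [weak self] error in self?.finish(.failure(error)) }
        )
    }

    func cancel() {
        subscriber = nil
        job?.cancel()
        job = nil
    }

    // Deliver at most one completion, even if a second callback arrives later.
    private func finish(_ completion: Subscribers.Completion<Error>) {
        let target = subscriber
        subscriber = nil
        target?.receive(completion: completion)
    }
}

// Fake source used only to exercise the sketch.
struct FlowError: Error { let message: String }
struct NoopJob: FlowJob { func cancel() {} }

struct FailingSource: FlowSource {
    typealias Item = Int
    func subscribe(onEach: @escaping (Int) -> Void,
                   onComplete: @escaping () -> Void,
                   onThrow: @escaping (Error) -> Void) -> FlowJob {
        onEach(1)
        onEach(2)
        onThrow(FlowError(message: "network down"))
        onComplete() // a completion callback arriving after the error
        return NoopJob()
    }
}

func runDemo() -> (values: [Int], completions: [String]) {
    var values: [Int] = []
    var completions: [String] = []
    let cancellable = FlowPublisher(source: FailingSource()).sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                completions.append("finished")
            case .failure(let error):
                completions.append("failure: " + ((error as? FlowError)?.message ?? "unknown"))
            }
        },
        receiveValue: { values.append($0) }
    )
    cancellable.cancel()
    return (values, completions)
}

let result = runDemo()
```

The subscriber here sees the two values followed by a single failure completion; the later completion callback is dropped because the subscriber reference has already been cleared.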


Featured in Kotlin Weekly Issue #232