Как безболезненно мигрировать с RxJava на Kotlin Coroutines+Flow
Корутины позволяют тратить меньше системных ресурсов, чем RxJava. Кроме того, поскольку они являются частью Kotlin, Android предоставляет удобные инструменты для работы с ними — например, viewModelScope и lifecycleScope. В этой статье мы рассмотрим use cases, распространенные в Rx Java, и то, какие возможности вы получите при переходе на Flow.
Для начала сравним, как происходит переключение потоков в RxJava и Flow.
RxJava
Observable.create<Int> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.observeOn(Schedulers.io())
.map {
printThread(«map1 value = $it»)
it + it
}
.doOnNext { printThread(«after map1 -> $it») }
.observeOn(Schedulers.computation())
.map {
printThread(«map2 value = $it»)
it * it
}
.doOnNext { printThread(«after map2 -> $it») }
.observeOn(Schedulers.single())
.subscribe (
{
printThread(«On Next $it»)
},
{
printThread(«On Error»)
},
{
printThread(«On Complete»)
}
)
При этом сложение выполняется в IO шедулере, умножение — в computation шедулере, а подписка — в single.
Flow
Повторим этот же пример для Flow:
launch {
flow {
emit(1)
emit(2)
emit(3)
}
.map {
printThread(«map1 value = $it»)
it + it
}
.onEach { printThread(«after map1 -> $it») }
.flowOn(Dispatchers.IO)
.map {
printThread(«map2 value = $it»)
it * it
}
.onEach { printThread(«after map2 -> $it») }
.flowOn(Dispatchers.Default)
.onCompletion { printThread(«onCompletion») }
.collect { printThread(«received value $it») }
}
В результате можно отметить следующее:
1) observeOn переключает поток, в котором будут выполняться последующие операторы, а flowOn определяет диспетчер выполнения для предыдущих операторов.
2) Метод collect() будет выполняться в том же диспетчере, что и launch, а emit данных будет происходить в Dispatchers.IO. Метод subscribe() будет выполняться в Schedulers.single(), потому что идет после него.
3) Flow также имеет стандартные методы создания flow:
- flowOf(): в примере можно было бы использовать Observable.fromArray(1, 2, 3) и flowOf(1, 2, 3)
- extenstion function asFlow(), который превращает Iterable, Sequence, массивы во flow
- билдер flow { }
4) В данном примере Flow, как и RxJava, представляет собой cold stream данных: до вызова методов collect() и subscribe() никакой обработки происходить не будет.
5) В RxJava нужно явно вызывать emitter.onComplete(). В Flow метод onCompletion() будет автоматически вызываться после окончания блока flow { }.
6) При попытке сделать эмит данных из другого диспетчера, с помощью withContext, например, приведет к ошибке.
Exception in thread «main» java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@5df83c81, BlockingEventLoop@3383bcd],
but emission happened in [DispatchedCoroutine{Active}@7fbc37eb, Dispatchers.IO].
Please refer to ‘flow’ documentation or use ‘flowOn’ instead
В RxJava метод Observable.subscribe() возвращает объект Disposable. Он служит для отписки от источника данных, когда новые порции данных от текущего источника уже не нужны. Важно иметь доступ к этому объекту, чтобы вовремя отписываться и избегать утечек.
Для Flow ситуация схожа: так как метод collect() — suspend метод, он может быть запущен только внутри корутины. Следовательно, отписка от flow происходит в момент отмены Job корутины:
val job = scope.launch {
flow.collect { }
}
job.cancel() // тут произойдет отписка от flow
В случае же использования viewModelScope об этом заботиться не нужно: все корутины, запущенные в рамках этого scope, будут отменены, когда ViewModel будет очищена, т.е. вызовется метод ViewModel.onCleared(). Для lifecycleScope ситуация аналогична: запущенные в его рамках корутины будут отменены, когда соответствующий Lifecycle будет уничтожен.
В RxJava есть метод onError(), который будет вызван в случае возникновения какой-либо ошибки и на вход получит данные о ней. В Flow тоже есть такой метод, он называется catch(). Рассмотрим следующий пример.
RxJava
Observable.fromArray(1, 2, 3)
.map {
val divider = Random.Default.nextInt(0, 1)
it / divider
}
.subscribe(
{ value ->
println(value)
},
{ e ->
println(e)
}
)
При возникновении ArithmeticException будет срабатывать onError(), и информация об ошибке будет напечатана в консоль.
Flow
flowOf(1, 2, 3)
.map {
val divider = Random.Default.nextInt(0, 1)
it / divider
}
.catch { e -> println(e) }
.collect { println(it) }
Этот же пример, переписанный на flow, можно представить с помощью catch { }, который под капотом имеет вид привычной конструкции try/catch.
Операторы RxJava onErrorResumeNext и onErrorReturn можно представить в виде:
catch { emit(defaultValue) } // onErrorReturn
catch { emitAll(fallbackFlow) } // onErrorResumeNext
В Flow, как и в RxJava, есть операторы retry и retryWhen, позволяющие повторить операции в случае возникновения ошибки.
Рассмотрим наиболее распространенные операторы RxJava и найдем их аналоги из Flow.
Подробнее с операторами Flow можно познакомиться здесь.
Некоторые операторы Flow (например, merge) помечены как экспериментальные или отсутствующие. Их api может измениться (как, например, для flatMapMerge), или их могут задепрекейтить, то есть они станут недоступны. Это важно помнить при работе с Flow. При этом отсутствие некоторых операторов компенсируется тем, что flow всегда можно собрать в список и работать уже с ним. В стандартной библиотеке Kotlin есть множество функций для работы со списками.
Также у Flow отсутствуют отдельные операторы троттлинга и другие операторы, которые работают с временными промежутками. Это можно объяснить «молодостью» библиотеки, а также тем, что, согласно словам разработчика Kotlin Романа Елизарова, команда Jetbrains не планирует «раздувать» библиотеку множеством операторов, оставляя разработчикам возможность компоновать нужные операторы самостоятельно, предоставляя им удобные «блоки» для сборки.
Backpressure
Backpressure – это ситуация, когда производитель данных выдает элементы подписчику быстрее, чем тот их может обработать. Готовые данные, в ожидании того, как подписчик сможет их обработать, складываются в буфер Observable. Проблема такого подхода в том, что буфер может переполниться, вызвав OutOfMemoryError.
В ситуациях, когда возможен backpressure, для Observable нужно применять различные механизмы для предотвращения ошибки MissingBackpressureException.
После появления в RxJava 2 Flowable произошло разделение на источники данных с поддержкой backpressure (Flowable) и Observable, которые теперь не поддерживают backpressure. При работе с RxJava требуется правильно выбрать тип источника данных для корректной работы с ним.
У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava.
Hot streams
«Горячий» источник рассылает новые порции данных по мере их появления, вне зависимости от того, есть ли активные подписчики. Новые подписчики получат не всю сгенерированную последовательность данных с самого начала, а только те данные, что были сгенерированы после подписки. В этом отличие горячих и холодных источников: холодные не начинают генерацию данных, пока нет хотя бы одного подписчика, а новые подписчики получают всю последовательность.
Горячие источники данных полезны, например, при подписке на события от View: при этом нужно получать только новые события, нет смысла обрабатывать заново все пользовательские действия. Также мы не можем запретить пользователю нажимать на экран до тех пор, пока мы не будем готовы обрабатывать его действия. Для обработки событий от View в реактивном виде существует библиотека RxBinding, которая имеет поддержку RxJava3.
В Kotlin Flow есть свои возможности для работы с горячим flow, который производит данные вне зависимости от наличия подписчиков и выдает новые данные одновременно всем имеющимся подписчикам. Для этого можно использовать Channel, SharedFlow, чтобы отправлять новые порции данных одновременно всем подписанным сборщикам.
Кстати, для Flow тоже есть отличная библиотека для обработки событий от View – Corbind. В ней есть поддержка большинства Android-виджетов.
RxJava Subjects
Subject в RxJava – это специальный элемент, который одновременно является источником данных и подписчиком. Он может подписаться на один или несколько источников данных, получать от них порции данных и отдавать их своим подписчикам.
Аналог Subject в Flow – это Channel, в частности, BroadcastChannel. Существуют различные варианты их реализации: с буферизацией данных (ArrayBroadcastChannel), с хранением только последнего элемента (ConflatedBroadcastChannel). Но важно помнить, что, так как библиотека Kotlin Flow молода и постоянно развивается, ее части могут меняться. Так получилось и в случае с BroadcastChannel: в своей статье Роман Елизаров сообщил, что, начиная с версии 1.4 будет предложено лучшее решение – shared flows, а BroadcastChannel ждет deprecation в ближайшем будущем.
В данной статье мы сравнили RxJava и Kotlin Flow, рассмотрели их схожие моменты и аналоги частей RxJava в Flow. При этом Flow хорошо подойдет в качестве инструмента для обработки событий в реактивном стиле в проектах на Kotlin, использующих паттерн MVVM: благодаря viewModelScope и lifecycleScope запускать корутины можно быстро и удобно, не боясь утечек. В связи с тем, что популярность Kotlin и его инструментов растет, а также этот язык является приоритетным для разработки Android-приложений, в ближайшие годы связка Coroutines+Flow может заменить RxJava – скорее всего, новые проекты будут написаны именно с помощью нее. На первый взгляд, миграция с RxJava на Flow не представляется болезненной, потому что в обоих случаях есть похожие операторы и разделение общей концепции Reactive streams. Кроме того, Kotlin имеет достаточно большое комьюнити, которое постоянно развивается и помогает разработчикам в изучении новых возможностей.
А вы готовы мигрировать на корутины? Приглашаем поделиться мнениями!