
    suspend fun processProject(projectName: String): ProjectDetails {
        val projectId = getProjectId(projectName)
        val details = getProjectDetails(projectId)
        val tasks = getRelatedTasks(projectId)
        return ProjectDetails(
            projectId = projectId,
            details = details,
            tasks = tasks
        )
    }
In Reactor, the same logic requires flatMap calls, but since we need data from previous steps, our calls must be nested. This is how it looks in Reactor:

    fun processProject(projectName: String): Mono<ProjectDetails> {
        return getProjectId(projectName)
            .flatMap { projectId ->
                getProjectDetails(projectId)
                    .flatMap { details ->
                        getRelatedTasks(projectId)
                            .map { tasks ->
                                ProjectDetails(
                                    projectId = projectId,
                                    details = details,
                                    tasks = tasks
                                )
                            }
                    }
            }
    }
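To fetch the details and the tasks concurrently in Kotlin Coroutines, we can use coroutineScope and async/await:

    suspend fun processProject(projectName: String): ProjectDetails = coroutineScope {
        val projectId = getProjectId(projectName)
        // Both calls start immediately and run concurrently
        val details = async { getProjectDetails(projectId) }
        val tasks = async { getRelatedTasks(projectId) }
        // The last expression is the result of coroutineScope;
        // a bare `return` is not allowed inside this lambda
        ProjectDetails(
            projectId = projectId,
            details = details.await(),
            tasks = tasks.await()
        )
    }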
In Reactor, we use zip to combine the results of the two asynchronous calls:

    fun processProject(projectName: String): Mono<ProjectDetails> {
        return getProjectId(projectName)
            .flatMap { projectId ->
                Mono.zip(
                    getProjectDetails(projectId),
                    getRelatedTasks(projectId)
                ).map { tuple ->
                    ProjectDetails(
                        projectId = projectId,
                        details = tuple.t1,
                        tasks = tuple.t2
                    )
                }
            }
    }
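To make the concurrent coroutine version easier to try out, here is a minimal runnable sketch. The three suspending functions and the ProjectDetails class are hypothetical stand-ins that simulate I/O with delay; they are not the real services discussed in this article, and the sketch assumes the kotlinx-coroutines-core library is on the classpath.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical data holder, only for this sketch
data class ProjectDetails(
    val projectId: String,
    val details: String,
    val tasks: List<String>
)

// Hypothetical stand-ins for remote calls; each simulates 100 ms of I/O
suspend fun getProjectId(projectName: String): String {
    delay(100)
    return "id-$projectName"
}

suspend fun getProjectDetails(projectId: String): String {
    delay(100)
    return "details of $projectId"
}

suspend fun getRelatedTasks(projectId: String): List<String> {
    delay(100)
    return listOf("task-1", "task-2")
}

suspend fun processProject(projectName: String): ProjectDetails = coroutineScope {
    val projectId = getProjectId(projectName)
    // Both async blocks start right away, so their delays overlap
    val details = async { getProjectDetails(projectId) }
    val tasks = async { getRelatedTasks(projectId) }
    ProjectDetails(projectId, details.await(), tasks.await())
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println(processProject("demo"))
    }
    println("Took about $elapsed ms")
}
```

Because the two async calls overlap, the measured time should be close to two simulated delays rather than three, although the exact number depends on the machine.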
async/await is intuitive and widely used in many programming languages, suspending functions act just like regular functions, and everything is not only readable but also easy to debug. In contrast, Reactor completely changes the way we write code, making it more complex and harder to read. It requires us to learn plenty of new operators, concepts, and patterns that are not intuitive for most developers.

Threading is managed with Schedulers in Reactor, and using them is not simple. Consider this simple code in Kotlin Coroutines:

    fun onCreate() {
        viewModelScope.launch {
            val user = fetchUser()
            displayUser(user)
            val posts = fetchPosts(user)
            displayPosts(posts)
        }
    }
    fun onCreate() {
        disposables += fetchUser()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext { displayUser(it) }
            .flatMap { fetchPosts(it).subscribeOn(Schedulers.io()) }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ displayPosts(it) })
    }
We needed to use subscribeOn and observeOn many times to juggle between UI and IO threads. Those operators had to be used very precisely; otherwise, we would end up blocking the UI thread or running code on the wrong thread. We also needed to explicitly add the resulting disposable to the disposables collection; otherwise, we would leak memory. In Kotlin Coroutines, we just use viewModelScope.launch, and everything is handled for us. We also needed to use operators like doOnNext and flatMap, which are not intuitive for most developers. In Kotlin Coroutines, we just use regular function calls, which are much easier to understand.

    class ChatService(
        private val messageRepository: MessageRepository
    ) {
        private val newMessagesFlow = MutableSharedFlow<Message>()

        suspend fun sendMessage(messageText: String) {
            // Assumes addMessage returns the stored Message,
            // as it does in the Reactor version
            val message = messageRepository.addMessage(messageText)
            newMessagesFlow.emit(message)
        }

        suspend fun markMessagesAsRead(messageIds: List<String>) {
            messageRepository.markAsRead(messageIds)
        }

        // Simplified implementation
        fun observeMessages(userId: String): Flow<Message> = newMessagesFlow
            .filter { it.receiverId == userId }
            .onStart {
                emitAll(messageRepository.getUnreadMessages(userId).asFlow())
            }
            .distinctUntilChanged()
    }
    class ChatService(
        private val messageRepository: MessageRepository
    ) {
        private val newMessagesSink = Sinks.many()
            .multicast()
            .onBackpressureBuffer<Message>()

        private val newMessagesFlux = newMessagesSink.asFlux()
            .share()

        fun sendMessage(messageText: String): Mono<Message> =
            messageRepository.addMessage(messageText)
                .doOnNext { newMessagesSink.tryEmitNext(it) }

        fun markMessagesAsRead(messageIds: List<String>): Mono<Void> =
            messageRepository.markAsRead(messageIds)

        fun observeMessages(userId: String): Flux<Message> =
            Flux.concat(
                messageRepository.getUnreadMessages(userId),
                newMessagesFlux
            )
                .filter { it.receiverId == userId }
                .distinctUntilChanged()
    }
Flow's filter method is implemented under the hood with just a couple of lines of code, and it is very easy to understand. Reactor's Flux and Mono and their functions are complex like hell, and we can only understand them from their documentation or other documents describing their behavior.

    // Flow's filter operator implementation after inlining transform
    fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T> =
        flow {
            collect {
                if (predicate(it)) {
                    emit(it)
                }
            }
        }
    // Reactor's filter operator implementation
    public final Flux<T> filter(Predicate<? super T> p) {
        if (this instanceof Fuseable) {
            return onAssembly(new FluxFilterFuseable<>(this, p));
        }
        return onAssembly(new FluxFilter<>(this, p));
    }
    // To show it, I would need to insert not only the implementation of `onAssembly`
    // but also the implementation of `FluxFilter` and `FluxFilterFuseable`, which
    // are both very long and complex classes.

FluxFilter implementation
For Mono or Flux, I could only see custom operators implemented using other operators, because implementing something custom from scratch is too complex or even impossible.

    fun <T> Flow<T>.distinct(): Flow<T> = flow {
        val seen = mutableSetOf<T>()
        collect {
            if (seen.add(it)) emit(it)
        }
    }
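As a quick check of how little is needed to use such a custom operator, here is a small runnable sketch that applies the distinct operator shown above to a flow with repeated values. The input values and the distinctValues helper are illustrative assumptions, and the sketch assumes kotlinx-coroutines-core 1.6 or newer.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same operator as above: emits each value only the first time it is seen
fun <T> Flow<T>.distinct(): Flow<T> = flow {
    val seen = mutableSetOf<T>()
    collect {
        if (seen.add(it)) emit(it)
    }
}

// Hypothetical helper for this sketch: collects the distinct values into a list
suspend fun distinctValues(): List<Int> =
    flowOf(1, 2, 2, 3, 1, 4).distinct().toList()

fun main() = runBlocking {
    println(distinctValues()) // [1, 2, 3, 4]
}
```

The set of seen values is created inside the flow builder, so each collection of the resulting flow starts with a fresh, empty set.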
    // Collection processing
    val list = (1..100)
        .take(10)
        .filter { it % 2 == 0 }
        .map { it * 2 }
        .onEach { println(it) }

    // Flow processing
    val flow = (1..100).asFlow()
        .take(10)
        .filter { it % 2 == 0 }
        .map { it * 2 }
        .onEach { println(it) }
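The similarity between the two pipelines above can be checked with a short runnable sketch. The function names processList and processFlow are hypothetical, the onEach steps are left out, and the flow is collected into a list with toList so that both results can be compared directly. It assumes kotlinx-coroutines-core is available.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collection version: every step runs eagerly and returns a new list
fun processList(): List<Int> = (1..100)
    .take(10)
    .filter { it % 2 == 0 }
    .map { it * 2 }

// Flow version: the same operators, but nothing runs until toList() collects
suspend fun processFlow(): List<Int> = (1..100).asFlow()
    .take(10)
    .filter { it % 2 == 0 }
    .map { it * 2 }
    .toList()

fun main() = runBlocking {
    println(processList()) // [4, 8, 12, 16, 20]
    println(processFlow()) // [4, 8, 12, 16, 20]
}
```

One difference worth remembering is that the flow in the snippet above prints nothing on its own, because onEach is an intermediate operator and a flow is cold until a terminal operator such as collect or toList is called.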
In simple cases, Kotlin Coroutines offer suspending functions and async/await for asynchronous operations. In more complex cases, they provide a powerful, easy-to-use, and easy-to-extend Flow API that is based on Kotlin Coroutines, which offers performance benefits, structured concurrency, and good cancellation support.