
class NetworkUserRepository(
    private val api: UserApi,
) : UserRepository {
    override suspend fun getUser(): User =
        api.getUser().toDomainUser()
}

class NetworkNewsService(
    private val newsRepo: NewsRepository,
    private val settings: SettingsRepository,
) {
    suspend fun getNews(): List<News> = newsRepo
        .getNews()
        .map { it.toDomainNews() }

    suspend fun getNewsSummary(): List<News> {
        val type = settings.getNewsSummaryType()
        return newsRepo.getNewsSummary(type)
    }
}
coroutineScope and use the async builder inside it to start each process that should run asynchronously.

suspend fun produceCurrentUser(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}
produceCurrentUserSeq and produceCurrentUserPar functions below, the only important difference is that the first one is sequential, while the second one starts two parallel processes[^401_8].

suspend fun produceCurrentUserSeq(): User {
    val profile = repo.getProfile()
    val friends = repo.getFriends()
    return User(profile, friends)
}

suspend fun produceCurrentUserPar(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}
async function; however, the same result can also be achieved if we start only one process using async and run the second one on the same coroutine. The following implementation of produceCurrentUserPar will have practically the same behavior as the previous one. Which option should be preferred? I think that most developers will prefer the first one, arguing that using async for each process we want to execute in parallel makes our code more readable. On the other hand, some developers will prefer the second option, arguing that it is more efficient because it uses fewer coroutines and creates fewer objects. It is up to you to choose which option you prefer.

suspend fun produceCurrentUserPar(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = repo.getFriends()
    User(profile.await(), friends)
}
suspend fun getArticlesForUser(
    userToken: String?,
): List<ArticleJson> = coroutineScope {
    val articles = async { articleRepository.getArticles() }
    val user = userService.getUser(userToken)
    articles.await()
        .filter { canSeeOnList(user, it) }
        .map { toArticleJson(it) }
}
async together with collection processing functions to start an asynchronous process for each list element. In such cases, it is good practice to await the results using the awaitAll function.

suspend fun getOffers(
    categories: List<Category>
): List<Offer> = coroutineScope {
    categories
        .map { async { api.requestOffers(it) } }
        .awaitAll()
        .flatten()
}
Flow and then use flatMapMerge with the concurrency parameter, which specifies how many concurrent calls you will send[^401_10].

fun getOffers(
    categories: List<Category>
): Flow<List<Offer>> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) {
        suspend { api.requestOffers(it) }.asFlow()
        // or flow { emit(api.requestOffers(it)) }
    }
mapAsync, with a concurrency parameter, to limit the number of concurrent calls.

suspend fun getOffers(
    categories: List<Category>
): List<Offer> = categories
    .mapAsync(concurrency = 20) { api.requestOffers(it) }
    .flatten()

suspend fun <T, R> Iterable<T>.mapAsync(
    concurrency: Int,
    transformation: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    this@mapAsync
        .map { async { semaphore.withPermit { transformation(it) } } }
        .awaitAll()
}
coroutineScope, remember that an exception in any child coroutine will break the coroutine created by coroutineScope, cancel all its other children, and then throw an exception. This is the behavior we typically expect, but in some cases it doesn't suit us very well. When we want to start a number of concurrent processes that are considered independent, we should instead use supervisorScope, which ignores exceptions in its children[^401_11].

suspend fun notifyAnalytics(actions: List<UserAction>) =
    supervisorScope {
        actions.forEach { action ->
            launch { notifyAnalytics(action) }
        }
    }
withTimeout or withTimeoutOrNull, both of which cancel their process if it takes longer than the time specified by the argument[^401_12]. I generally prefer withTimeoutOrNull because withTimeout throws CancellationException, which behaves differently from other exceptions.

suspend fun getUserOrNull(): User? =
    withTimeoutOrNull(5000) {
        fetchUser()
    }
map, filter, or onEach, and occasionally less common functions like scan or flatMapMerge[^401_13].

class UserStateProvider(
    private val userRepository: UserRepository
) {
    fun userStateFlow(): Flow<User> = userRepository
        .observeUserChanges()
        .filter { it.isSignificantChange }
        .scan(userRepository.currentUser()) { user, update ->
            user.with(update)
        }
        .map { it.toDomainUser() }
}
merge, zip, or combine[^401_14].

class ArticlesProvider(
    private val ktAcademy: KtAcademyRepository,
    private val kotlinBlog: KotlinBlogRepository,
) {
    fun observeArticles(): Flow<Article> = merge(
        ktAcademy.observeArticles().map { it.toArticle() },
        kotlinBlog.observeArticles().map { it.toArticle() },
    )
}

class NotificationStatusProvider(
    private val userStateProvider: UserStateProvider,
    private val notificationsProvider: NotificationsProvider,
    private val statusFactory: NotificationStatusFactory,
) {
    fun notificationStatusFlow(): Flow<NotificationStatus> =
        notificationsProvider.observeNotifications()
            .filter { it.status == Notification.UNSEEN }
            .combine(userStateProvider.userStateFlow()) { notifications, user ->
                statusFactory.produce(notifications, user)
            }
}
SharedFlow. A common way to do this is by using shareIn with a scope. To keep this flow active only when needed, use the WhileSubscribed option for the started parameter[^401_15].

class LocationService(
    locationDao: LocationDao,
    scope: CoroutineScope
) {
    private val locations = locationDao.observeLocations()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
        )

    fun observeLocations(): Flow<List<Location>> = locations
}
[^401_10]: For details, see the Flow processing chapter, section flatMapConcat, flatMapMerge, and flatMapLatest.

[^401_11]: For details, see the Coroutine Scope Functions chapter, section supervisorScope.
[^401_12]: For details, see the Coroutine Scope Functions chapter, section withTimeout.
[^401_13]: For details, see the Flow processing chapter.
[^401_14]: For details, see the Flow processing chapter, section merge, zip, and combine.

[^401_15]: For details, see the SharedFlow and StateFlow chapter.