
Retrofit supports the `suspend` modifier to make its request definition functions suspending instead of blocking.

    // Retrofit
    interface GithubApi {
        @GET("orgs/{organization}/repos?per_page=100")
        suspend fun getOrganizationRepos(
            @Path("organization") organization: String
        ): List<Repo>
    }
Room supports the `suspend` modifier to make its functions suspending, as well as `Flow` for observing table states.

    // Room
    @Dao
    interface LocationDao {
        @Insert
        suspend fun insertLocation(location: Location)

        @Query("DELETE FROM location_table")
        suspend fun deleteLocations()

        @Query("SELECT * FROM location_table ORDER BY time")
        fun observeLocations(): Flow<List<Location>>
    }
To turn a callback-based function into a suspending one, use `suspendCancellableCoroutine`[^401_0]. When the callback is called, the coroutine should be resumed using the `resume` method on `Continuation`. If the callback-based call is cancellable, it should be cancelled inside the `invokeOnCancellation` lambda expression[^401_1].

    suspend fun requestNews(): News {
        return suspendCancellableCoroutine<News> { cont ->
            val call = requestNewsApi { news ->
                cont.resume(news)
            }
            cont.invokeOnCancellation {
                call.cancel()
            }
        }
    }
When the callback API can also report a failure, one option is to return `Result`, and resume our coroutine with either `Result.success` or `Result.failure`.

    suspend fun requestNews(): Result<News> {
        // The continuation type must match what we resume with: Result<News>
        return suspendCancellableCoroutine<Result<News>> { cont ->
            val call = requestNewsApi(
                onSuccess = { news -> cont.resume(Result.success(news)) },
                onError = { e -> cont.resume(Result.failure(e)) }
            )
            cont.invokeOnCancellation {
                call.cancel()
            }
        }
    }
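Another option is to return a nullable type and resume our coroutine with a `null` value on failure.

    suspend fun requestNews(): News? {
        // The continuation type must be nullable, because we resume with null on error
        return suspendCancellableCoroutine<News?> { cont ->
            val call = requestNewsApi(
                onSuccess = { news -> cont.resume(news) },
                onError = { e -> cont.resume(null) }
            )
            cont.invokeOnCancellation {
                call.cancel()
            }
        }
    }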
    suspend fun requestNews(): News {
        return suspendCancellableCoroutine<News> { cont ->
            val call = requestNewsApi(
                onSuccess = { news -> cont.resume(news) },
                onError = { e -> cont.resumeWithException(e) }
            )
            cont.invokeOnCancellation {
                call.cancel()
            }
        }
    }
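The snippets above rely on a `requestNewsApi` function that is not shown. The following is a minimal runnable sketch of the same pattern, assuming the `kotlinx-coroutines-core` dependency is available. `Call`, `requestGreetingApi`, and `requestGreeting` are hypothetical names invented for illustration; the fake API simply runs its work on a background thread and reports through callbacks.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical handle returned by the callback API; cancel() interrupts the worker.
class Call(private val worker: Thread) {
    fun cancel() = worker.interrupt()
}

// Hypothetical callback-based API standing in for requestNewsApi.
fun requestGreetingApi(
    name: String,
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Call {
    val worker = thread {
        try {
            Thread.sleep(50) // simulate latency
            if (name.isBlank()) onError(IllegalArgumentException("Blank name"))
            else onSuccess("Hello, $name")
        } catch (e: InterruptedException) {
            // The call was cancelled, so neither callback is invoked.
        }
    }
    return Call(worker)
}

// Suspending wrapper: resumes with the value or with the exception.
suspend fun requestGreeting(name: String): String =
    suspendCancellableCoroutine { cont ->
        val call = requestGreetingApi(
            name = name,
            onSuccess = { cont.resume(it) },
            onError = { cont.resumeWithException(it) },
        )
        // If the coroutine is cancelled while waiting, cancel the underlying call.
        cont.invokeOnCancellation { call.cancel() }
    }

fun main() = runBlocking {
    println(requestGreeting("Ada"))
    val failure = runCatching { requestGreeting("") }.exceptionOrNull()
    println(failure?.message)
}
```

With this wrapper, callers handle errors with ordinary `try`/`catch`, and cancelling the calling coroutine also cancels the pending request.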
If we block the thread from `Dispatchers.Main` on Android, our whole application freezes. If we block a thread from `Dispatchers.Default`, we can forget about efficient processor use. This is why we should never make a blocking call without first specifying the dispatcher[^401_3].

To specify the dispatcher, use `withContext`. In most cases, when we implement repositories in applications, it is enough to use `Dispatchers.IO`[^401_4].

    class DiscSaveRepository(
        private val discReader: DiscReader
    ) : SaveRepository {

        override suspend fun loadSave(name: String): SaveData =
            withContext(Dispatchers.IO) {
                discReader.read("save/$name")
            }
    }
`Dispatchers.IO` is limited by default to 64 threads (or the number of cores, if that is larger), which might not be enough on the backend and Android. If every request needs to make a blocking call, and you have thousands of requests per second, the queue for these 64 threads might start growing quickly. In such a situation, you might consider using `limitedParallelism` on `Dispatchers.IO` to make a new dispatcher with an independent limit that is greater than 64 threads[^401_5].

    class LibraryGoogleAccountVerifier : GoogleAccountVerifier {
        private val dispatcher = Dispatchers.IO
            .limitedParallelism(100)

        private val verifier =
            GoogleIdTokenVerifier.Builder(..., ...)
                .setAudience(...)
                .build()

        override suspend fun getUserData(
            googleToken: String
        ): GoogleUserData? = withContext(dispatcher) {
            verifier.verify(googleToken)
                ?.payload
                ?.let {
                    GoogleUserData(
                        email = it.email,
                        name = it.getString("given_name"),
                        surname = it.getString("family_name"),
                        imageUrl = it.getString("picture"),
                    )
                }
        }
    }
A dispatcher with a limit that is independent of `Dispatchers.IO` should be used whenever we suspect that our function might be called by so many coroutines that they might use a significant number of threads. In such cases, we do not want to block threads from `Dispatchers.IO` because we do not know which processes will wait until our process is finished.

    class CertificateGenerator {
        private val dispatcher = Dispatchers.IO
            .limitedParallelism(5)

        suspend fun generate(data: CertificateData): UserData =
            withContext(dispatcher) {
                Runtime.getRuntime()
                    .exec("generateCertificate " + data.toArgs())
            }
    }
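To make the effect of the limit visible, here is a small sketch that counts how many blocking tasks run at the same time on a limited dispatcher. It assumes the `kotlinx-coroutines-core` dependency; `certificateDispatcher` and `measurePeakConcurrency` are hypothetical names used only for this illustration, and `Thread.sleep` stands in for any blocking call.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// The opt-in is only needed on older kotlinx.coroutines versions,
// where limitedParallelism was still marked experimental.
@OptIn(ExperimentalCoroutinesApi::class)
val certificateDispatcher: CoroutineDispatcher =
    Dispatchers.IO.limitedParallelism(5)

// Starts `tasks` blocking jobs on the dispatcher and returns the highest
// number of jobs that were observed running at the same time.
suspend fun measurePeakConcurrency(
    dispatcher: CoroutineDispatcher,
    tasks: Int,
): Int {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch(dispatcher) {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) }
                Thread.sleep(20) // stand-in for a blocking call
                running.decrementAndGet()
            }
        }
    }
    return peak.get()
}

fun main() = runBlocking {
    val peak = measurePeakConcurrency(certificateDispatcher, tasks = 50)
    println("Peak concurrency: $peak") // never exceeds the limit of 5
}
```

However many coroutines call into the limited dispatcher, at most five of them block a thread at once, so the remaining threads of `Dispatchers.IO` stay available to other parts of the application.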
We should also make sure that all CPU-intensive processes run on `Dispatchers.Default`, and all processes that modify the main view run on `Dispatchers.Main.immediate`. For that, `withContext` might also be useful.

    suspend fun calculateModel() =
        withContext(Dispatchers.Default) {
            model.fit(
                dataset = newTrain,
                epochs = 10,
                batchSize = 100,
                verbose = false
            )
        }
    suspend fun setUserName(name: String) =
        withContext(Dispatchers.Main.immediate) {
            userNameView.text = name
        }
Suspending functions suit processes that produce a single value; when we expect multiple values, we should use `Flow` instead. We've seen one example already: in the Room library, we use suspend functions to perform a single database operation, and we use the `Flow` type to observe changes in a table.

    // Room
    @Dao
    interface LocationDao {
        @Insert(onConflict = OnConflictStrategy.IGNORE)
        suspend fun insertLocation(location: Location)

        @Query("DELETE FROM location_table")
        suspend fun deleteLocations()

        @Query("SELECT * FROM location_table ORDER BY time")
        fun observeLocations(): Flow<List<Location>>
    }
When a callback can be invoked many times, for instance to deliver events, a suspending function is not enough and we should return `Flow` instead. To create such a flow (if the library we use does not support returning one), we should use `callbackFlow` (or `channelFlow`). Remember to end your builder with `awaitClose`[^401_6].

    fun listenMessages(): Flow<List<Message>> = callbackFlow {
        socket.on("NewMessage") { args ->
            trySendBlocking(args.toMessage())
        }
        awaitClose()
    }
A common use of `Flow` is observing UI events, like button clicks or text changes.

    fun EditText.listenTextChange(): Flow<String> = callbackFlow {
        val watcher = doAfterTextChanged {
            trySendBlocking(it.toString())
        }
        awaitClose { removeTextChangedListener(watcher) }
    }
    // T is a placeholder for the value type delivered by the callback API
    fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
        val callback = object : Callback {
            override fun onNextValue(value: T) {
                trySendBlocking(value)
            }
            override fun onApiError(cause: Throwable) {
                // Cancel the flow and propagate the cause
                cancel(CancellationException("API Error", cause))
            }
            // Closing the channel completes the flow normally
            override fun onCompleted() = channel.close()
        }
        api.register(callback)
        awaitClose { api.unregister(callback) }
    }
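The generic template above does not compile on its own because `CallbackBasedApi` and `Callback` are placeholders. As a minimal runnable sketch of the same structure, assuming the `kotlinx-coroutines-core` dependency, the following uses a hypothetical `Ticker` class that delivers a few numbers through a listener from a background thread. `Ticker`, `Listener`, and `ticks` are invented for this illustration.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical callback-based source: emits 1..count, then signals completion.
class Ticker(private val count: Int) {
    interface Listener {
        fun onTick(value: Int)
        fun onDone()
    }

    @Volatile
    private var listener: Listener? = null

    fun register(l: Listener) {
        listener = l
        thread {
            for (i in 1..count) listener?.onTick(i)
            listener?.onDone()
        }
    }

    fun unregister() {
        listener = null
    }
}

// Adapts the listener API to a Flow.
fun Ticker.ticks(): Flow<Int> = callbackFlow {
    val listener = object : Ticker.Listener {
        override fun onTick(value: Int) {
            trySendBlocking(value) // waits if the buffer is full
        }

        override fun onDone() {
            channel.close() // completes the flow normally
        }
    }
    register(listener)
    // Suspends until the channel is closed or the collector is cancelled,
    // then unregisters the listener.
    awaitClose { unregister() }
}

fun main() = runBlocking {
    println(Ticker(3).ticks().toList()) // [1, 2, 3]
}
```

Without `awaitClose`, the builder block would return immediately and the flow would complete before the listener had delivered anything.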
If a flow needs to run on a specific dispatcher, use `flowOn` on the produced flow[^401_7].

    fun fibonacciFlow(): Flow<BigDecimal> = flow {
        var a = BigDecimal.ZERO
        var b = BigDecimal.ONE
        emit(a)
        emit(b)
        while (true) {
            val temp = a
            a = b
            b += temp
            emit(b)
        }
    }.flowOn(Dispatchers.Default)

    fun filesContentFlow(path: String): Flow<String> =
        channelFlow {
            File(path).takeIf { it.exists() }
                ?.listFiles()
                ?.forEach {
                    send(it.readText())
                }
        }.flowOn(Dispatchers.IO)
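A quick way to see what `flowOn` changes is to compare the thread on which the builder runs with the thread on which the flow is collected. The sketch below assumes the `kotlinx-coroutines-core` dependency; `producerThreadFlow` is a hypothetical helper that emits the name of the thread executing the builder.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Emits the name of the thread that runs the flow builder.
// flowOn affects only the code upstream of it, that is, the builder.
fun producerThreadFlow(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    val collectorThread = Thread.currentThread().name
    val producerThread = producerThreadFlow().first()
    println("Builder ran on:   $producerThread")
    println("Collector ran on: $collectorThread")
}
```

The builder runs on a `Dispatchers.Default` worker thread, while collection stays in the caller's context, so the function that creates the flow decides where its own work executes and callers do not need to switch dispatchers themselves.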
[^401_1]: For details, see the Cancellation chapter, section invokeOnCompletion.
[^401_2]: For details, see the How does suspension work? chapter, section Resume with an exception.
[^401_3]: For details, see the Dispatchers chapter.
[^401_4]: For details, see the Dispatchers chapter, section IO dispatcher.
[^401_5]: For details, see the Dispatchers chapter, section IO dispatcher with a custom pool of threads.
[^401_6]: For details, see the Flow building chapter, section callbackFlow.
[^401_7]: For details, see the Flow lifecycle functions chapter, section flowOn.