
CoroutineContext determines which thread a certain coroutine will run on. Dispatchers in Kotlin Coroutines are a similar concept to RxJava Schedulers.
If you do not set any dispatcher, the one chosen by default is Dispatchers.Default, which is designed to run CPU-intensive operations. It has a pool of threads whose size is equal to the number of cores in the machine your code is running on (but not less than two). At least theoretically, this is the optimal number of threads, assuming you are using them efficiently, i.e., performing CPU-intensive calculations and not blocking threads. To see this dispatcher in action, run the following code:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.random.Random

suspend fun main(): Unit = coroutineScope {
    repeat(1000) {
        launch { // or launch(Dispatchers.Default) {
            // To make it busy
            List(1_000_000) { Random.nextLong() }.maxOrNull()

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
```

Running on thread: DefaultDispatcher-worker-1
Running on thread: DefaultDispatcher-worker-5
Running on thread: DefaultDispatcher-worker-7
Running on thread: DefaultDispatcher-worker-6
Running on thread: DefaultDispatcher-worker-11
Running on thread: DefaultDispatcher-worker-2
Running on thread: DefaultDispatcher-worker-10
Running on thread: DefaultDispatcher-worker-4
...
Warning: runBlocking sets its own dispatcher if no other one is set; so, inside its scope, Dispatchers.Default is not the one that is chosen automatically. If we used runBlocking instead of coroutineScope in the above example, all coroutines would be running on "main".
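
The warning can be checked with a minimal sketch. It assumes kotlinx-coroutines-core on the JVM; the helper names threadInRunBlocking and threadInDefault are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: the thread used by a child coroutine started
// inside runBlocking when no dispatcher is set.
fun threadInRunBlocking(): Thread = runBlocking {
    async { Thread.currentThread() }.await()
}

// Hypothetical helper: the thread used after explicitly switching
// to Dispatchers.Default.
fun threadInDefault(): Thread = runBlocking {
    withContext(Dispatchers.Default) { Thread.currentThread() }
}

fun main() {
    println("caller:      ${Thread.currentThread().name}")
    println("runBlocking: ${threadInRunBlocking().name}")
    println("Default:     ${threadInDefault().name}")
}
```

The first two lines should report the same thread, because the child coroutine inherits the dispatcher set by runBlocking, while the last one should report a worker thread of the default dispatcher.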
An expensive process might use all Dispatchers.Default threads and starve other coroutines using the same dispatcher. In such cases, we can use limitedParallelism on Dispatchers.Default to make a dispatcher that runs on the same threads but is limited to using not more than a certain number of them at the same time.

```kotlin
private val dispatcher = Dispatchers.Default
    .limitedParallelism(5)
```

In case you have seen limitedParallelism before, I should warn you that this function has completely different behavior for Dispatchers.Default than for Dispatchers.IO. We will discuss it later.

limitedParallelism was introduced in kotlinx-coroutines version 1.6, so it is quite a new feature and you won't find it being used in older projects.
To access the Main thread, we use Dispatchers.Main.

```kotlin
suspend fun showUserName(name: String) =
    withContext(Dispatchers.Main) {
        userNameTextView.text = name
    }
```

Dispatchers.Main is available on Android if we use the kotlinx-coroutines-android artifact. Similarly, it's available on JavaFX if we use kotlinx-coroutines-javafx, and on Swing if we use kotlinx-coroutines-swing. If you do not have a dependency that defines the main dispatcher, it is not available and cannot be used.

In unit tests, Dispatchers.Main is not defined. To be able to use it, you need to set a dispatcher using Dispatchers.setMain(dispatcher) from kotlinx-coroutines-test.

```kotlin
class SomeTest {

    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    @Before
    fun setup() {
        Dispatchers.setMain(dispatcher)
    }

    @After
    fun tearDown() {
        // reset the Main dispatcher to
        // the original Main dispatcher
        Dispatchers.resetMain()
        dispatcher.close()
    }

    // Explicit Unit return type, so the test method does not return a Job
    @Test
    fun testSomeUI(): Unit = runBlocking {
        launch(Dispatchers.Main) {
            // ...
        }
    }
}
```

UI updates run on Dispatchers.Main. If you do some CPU-intensive operations, you should run them on Dispatchers.Default. These two are enough for many applications, but what if you need to block the thread because, for example, you need to perform long I/O operations (e.g., read big files) or use a library with blocking functions? You cannot block the Main thread because your application would freeze. If you block your default dispatcher, you risk blocking all the threads in the thread pool, in which case you won't be able to do any calculations. This is why we need a different dispatcher for such a situation, and this dispatcher is Dispatchers.IO.

Dispatchers.IO is designed to be used when we block threads with I/O operations, such as when we read/write files or call blocking functions. The code below takes around 1 second because Dispatchers.IO allows more than 50 active threads at the same time.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.system.measureTimeMillis

suspend fun main() {
    val time = measureTimeMillis {
        coroutineScope {
            repeat(50) {
                launch(Dispatchers.IO) {
                    Thread.sleep(1000)
                }
            }
        }
    }
    println(time) // ~1000
}
```

Dispatchers.IO is only needed if you have an API that blocks threads. If you use suspending functions, you can use any dispatcher. You do not need to use Dispatchers.IO if you want to use a network or database library that provides suspending functions. In many projects, this means you might not need to use Dispatchers.IO at all.
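
To illustrate that suspending calls do not need Dispatchers.IO, here is a minimal sketch in which 100 coroutines suspend with delay on a dispatcher limited to a single thread. The function name runSuspending and the chosen numbers are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical helper: starts `count` coroutines that only suspend
// (no thread is blocked) and returns the total time in milliseconds.
suspend fun runSuspending(count: Int, delayMillis: Long): Long {
    // A view of Dispatchers.Default that uses at most one thread at a time
    val singleThread = Dispatchers.Default.limitedParallelism(1)
    return measureTimeMillis {
        coroutineScope {
            repeat(count) {
                launch(singleThread) {
                    delay(delayMillis) // suspends; the thread stays free
                }
            }
        }
    }
}

fun main() = runBlocking {
    val time = runSuspending(count = 100, delayMillis = 200)
    // Expected to be close to a single delay, not 100 delays in a row
    println("Took $time ms")
}
```

Replacing delay with Thread.sleep in this sketch would make the coroutines run one after another, which is the situation in which a dispatcher with more threads starts to matter.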
Dispatchers.Default is limited by the number of cores in your processor. The limit of Dispatchers.IO is 64 (or the number of cores if there are more).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    repeat(1000) {
        launch(Dispatchers.IO) {
            Thread.sleep(200)

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
// Running on thread: DefaultDispatcher-worker-1
//...
// Running on thread: DefaultDispatcher-worker-53
// Running on thread: DefaultDispatcher-worker-14
```

Dispatchers.Default and Dispatchers.IO share the same pool of threads. This is an important optimization. Threads are reused, and redispatching is often not needed. For instance, let's say you are running on Dispatchers.Default and then execution reaches withContext(Dispatchers.IO) { ... }. Most often, you will stay on the same thread[^207_4], but what changes is that this thread counts towards not the Dispatchers.Default limit but the Dispatchers.IO limit. Their limits are independent, so they will never starve each other.

```kotlin
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    launch(Dispatchers.Default) {
        println(Thread.currentThread().name)
        withContext(Dispatchers.IO) {
            println(Thread.currentThread().name)
        }
    }
}
// DefaultDispatcher-worker-2
// DefaultDispatcher-worker-2
```

Imagine that you use both Dispatchers.Default and Dispatchers.IO to the maximum. As a result, your number of active threads will be the sum of their limits. If you allow 64 threads in Dispatchers.IO and you have 8 cores, you will have 72 active threads in the shared pool. This means we have efficient thread reuse and both dispatchers have strong independence.

The most typical case in which we use Dispatchers.IO is when we need to call blocking functions from libraries. The best practice is to wrap them with withContext(Dispatchers.IO) to make them suspending functions, which can be used without any special care: they can be treated like all other properly implemented suspending functions.

```kotlin
class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(Dispatchers.IO) {
            UserData(discReader.read("userName"))
        }
}
```

To see why Dispatchers.IO must have a limit, imagine that you have a periodic task that needs to start a large number of coroutines, each of which needs to block a thread. Your task might be sending a newsletter using a blocking API, like SendGrid. If Dispatchers.IO had no limit, this process would start as many threads as there are emails to send: if you have 100,000 emails to send, it will try to start 100,000 threads, which would require 100 GB of RAM, so it would crash your application. This is why we need to set a limit for Dispatchers.IO.

```kotlin
class NewsletterService {
    private val sendGrid = SendGrid(API_KEY)

    suspend fun sendNewsletter(
        newsletter: Newsletter,
        emails: List<Email>
    ) = withContext(Dispatchers.IO) {
        emails.forEach { email ->
            launch {
                sendGrid.api(createNewsletter(email, newsletter))
            }
        }
    }

    // ...
}
```

With Dispatchers.IO limited to 64 threads, it will take nearly 3 minutes to send all emails. The problem with Dispatchers.IO is that it has one limit for the whole application, so one service might block another. Imagine that in the same application you have another service that needs to send registration confirmation emails. If both services used Dispatchers.IO, then users trying to register would wait in a queue for threads until all newsletter emails have been sent. This should never happen, so we need to create dispatchers with custom independent limits.

Dispatchers.IO has a special behavior defined for the limitedParallelism function that creates a new dispatcher with an independent thread limit. For example, imagine you start 100 coroutines, each of which blocks a thread for a second. If you run these coroutines on Dispatchers.IO, it will take 2 seconds. If you run them on Dispatchers.IO with limitedParallelism set to 100 threads, it will take 1 second. The execution time of both dispatchers can be measured at the same time because the limits of these two dispatchers are independent anyway.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun main(): Unit = coroutineScope {
    launch {
        printCoroutinesTime(Dispatchers.IO)
        // Dispatchers.IO took: 2074
    }

    launch {
        val dispatcher = Dispatchers.IO
            .limitedParallelism(100)
        printCoroutinesTime(dispatcher)
        // LimitedDispatcher@XXX took: 1082
    }
}

suspend fun printCoroutinesTime(
    dispatcher: CoroutineDispatcher
) {
    val test = measureTimeMillis {
        coroutineScope {
            repeat(100) {
                launch(dispatcher) {
                    Thread.sleep(1000)
                }
            }
        }
    }
    println("$dispatcher took: $test")
}
```

There is an unlimited pool of threads that is shared by Dispatchers.Default and Dispatchers.IO, but they both have limited access to these threads. When we use limitedParallelism on Dispatchers.IO, we create a new dispatcher with an independent thread limit (completely independent of the Dispatchers.IO limit). If we use limitedParallelism on Dispatchers.Default or any other dispatcher, we create a dispatcher with an additional limit that is still limited, just like the original dispatcher.

```
// Dispatcher with an unlimited pool of threads
private val pool = ...

Dispatchers.IO = pool limited to 64
Dispatchers.IO.limitedParallelism(x) = pool limited to x
Dispatchers.Default = pool limited to coresNum
Dispatchers.Default.limitedParallelism(x) =
    Dispatchers.Default limited to x
```

Dispatchers.Default is limited to the number of cores. Dispatchers.IO is limited to 64 (or the number of cores). Using limitedParallelism on Dispatchers.Default makes a dispatcher with an additional limit to Dispatchers.Default, whereas using it on Dispatchers.IO makes a dispatcher with a limit independent of Dispatchers.IO. However, all these dispatchers share the same infinite pool of threads.
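
The effect of these limits can be observed by counting how many blocking tasks run at the same moment. The following is a minimal sketch, not a benchmark; the helper maxConcurrency and the task counts are assumptions introduced for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs `tasks` blocking tasks on `dispatcher` and
// returns the highest number of tasks observed running at the same time.
suspend fun maxConcurrency(
    dispatcher: CoroutineDispatcher,
    tasks: Int = 50,
): Int {
    val running = AtomicInteger(0)
    val max = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch(dispatcher) {
                val now = running.incrementAndGet()
                max.updateAndGet { maxOf(it, now) }
                Thread.sleep(20) // simulate a blocking call
                running.decrementAndGet()
            }
        }
    }
    return max.get()
}

fun main() = runBlocking {
    val five = Dispatchers.IO.limitedParallelism(5)
    val two = Dispatchers.Default.limitedParallelism(2)
    println("IO limited to 5:      ${maxConcurrency(five)}")
    println("Default limited to 2: ${maxConcurrency(two)}")
}
```

The reported values should never exceed the limit passed to limitedParallelism, regardless of how many coroutines are started.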
limitedParallelism has different behavior for Dispatchers.IO, which has nothing to do with the Dispatchers.IO limit. I think we can make this much more intuitive by just adding a simple function to name the creation of a dispatcher that has an independent thread limit:

```kotlin
fun limitedDispatcher(threadLimit: Int) = Dispatchers.IO
    .limitedParallelism(threadLimit)
```

A dispatcher created this way has a limit that is independent of Dispatchers.IO and other dispatchers' limits; therefore, one service will not block another.

```kotlin
class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(5)

    override suspend fun getUser(): UserData =
        withContext(dispatcher) {
            UserData(discReader.read("userName"))
        }
}
```

```kotlin
class NewsletterService {
    private val dispatcher = Dispatchers.IO.limitedParallelism(5)
    private val sendGrid = SendGrid(API_KEY)

    suspend fun sendNewsletter(
        newsletter: Newsletter,
        emails: List<Email>
    ) = withContext(dispatcher) {
        emails.forEach { email ->
            launch {
                sendGrid.api(createNewsletter(email, newsletter))
            }
        }
    }

    // ...
}

class AuthorizationService {
    private val dispatcher = Dispatchers.IO.limitedParallelism(50)
    // Declared here as well, so that sendAuthEmail can use it
    private val sendGrid = SendGrid(API_KEY)

    suspend fun sendAuthEmail(
        user: User
    ) = withContext(dispatcher) {
        sendGrid.api(createConfirmationEmail(user))
    }

    // ...
}
```

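We can also create dispatchers with independent pools of threads using the Executors class. These pools implement the ExecutorService or Executor interfaces, which we can transform into a dispatcher using the asCoroutineDispatcher function.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.Executors

private val NUMBER_OF_THREADS = 20

val dispatcher = Executors
    .newFixedThreadPool(NUMBER_OF_THREADS)
    .asCoroutineDispatcher()
```
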
limitedParallelism was introduced in kotlinx-coroutines version 1.6; in previous versions, we often created dispatchers with independent pools of threads using the Executors class.
A similar dispatcher can be created with newFixedThreadPoolContext.

```kotlin
private val NUMBER_OF_THREADS = 20

val dispatcher = newFixedThreadPoolContext(
    nThreads = NUMBER_OF_THREADS,
    name = "background-thread"
)
```

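
A dispatcher backed by its own executor holds its threads until it is closed. As a minimal sketch, and assuming the dispatcher is needed only for a bounded piece of work, Kotlin's use function can close it automatically, since ExecutorCoroutineDispatcher is Closeable on the JVM. The function name runOnOwnPool is hypothetical.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: runs `block` on a dispatcher backed by `pool` and
// shuts the pool down afterwards, even if `block` throws.
fun <T> runOnOwnPool(
    pool: ExecutorService,
    block: () -> T,
): T = pool.asCoroutineDispatcher().use { dispatcher ->
    runBlocking {
        withContext(dispatcher) { block() }
    }
}

fun main() {
    val pool = Executors.newFixedThreadPool(4)
    val threadName = runOnOwnPool(pool) { Thread.currentThread().name }
    println("Ran on: $threadName")
    println("Pool shut down: ${pool.isShutdown}")
}
```

This pattern does not fit a dispatcher that lives as long as a service; in that case, close has to be called from the service's own shutdown logic.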
A dispatcher created from an executor needs to be closed with the close function. Developers often forget about this, which leads to leaking threads. Another problem is that when you create a fixed pool of threads, you are not using them efficiently because you will keep unused threads alive without sharing them with other services.

In the example below, 10,000 coroutines each increment i by 1. So, its value should be 10,000, but it is a smaller number. This is a result of a shared state (i property) modification on multiple threads at the same time.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

var i = 0

suspend fun main(): Unit = coroutineScope {
    repeat(10_000) {
        launch(Dispatchers.IO) { // or Default
            i++
        }
    }
    delay(1000)
    println(i) // ~9930
}
```

One way to solve this problem is to use a dispatcher limited to a single thread. The classic approach is to create such a dispatcher using Executors. The problem with this is that this dispatcher keeps an extra thread active that needs to be closed when it is not used anymore. A modern solution is to use Dispatchers.Default or Dispatchers.IO (if we block threads) with parallelism limited to 1.

```kotlin
val dispatcher = Dispatchers.IO
    .limitedParallelism(1)

// previously:
// val dispatcher = Executors.newSingleThreadExecutor()
//     .asCoroutineDispatcher()
```

```kotlin
import kotlinx.coroutines.*

var i = 0

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    repeat(10000) {
        launch(dispatcher) {
            i++
        }
    }
    delay(1000)
    println(i) // 10000
}
```

Because such a dispatcher uses only one thread at a time, blocking calls made on it are handled sequentially:

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    val job = launch(dispatcher) {
        repeat(5) {
            launch {
                Thread.sleep(1000)
            }
        }
    }
    val time = measureTimeMillis { job.join() }
    println("Took $time") // Took 5006
}
```

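A dispatcher based on Project Loom's virtual threads can be used in place of Dispatchers.IO, where we cannot avoid blocking threads[^207_6].

On JVM versions where virtual threads are still a preview feature, we need to enable them using the --enable-preview flag. Then, we should be able to create an executor using newVirtualThreadPerTaskExecutor from Executors and transform it into a coroutine dispatcher.

```kotlin
val LoomDispatcher = Executors
    .newVirtualThreadPerTaskExecutor()
    .asCoroutineDispatcher()
```
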
Alternatively, we can implement ExecutorCoroutineDispatcher ourselves:

```kotlin
object LoomDispatcher : ExecutorCoroutineDispatcher() {

    override val executor: Executor = Executor { command ->
        Thread.startVirtualThread(command)
    }

    override fun dispatch(
        context: CoroutineContext,
        block: Runnable
    ) {
        executor.execute(block)
    }

    override fun close() {
        error("Cannot be invoked on Dispatchers.LOOM")
    }
}
```

To use this dispatcher like the others, we can define an extension property on the Dispatchers object. This should also help this dispatcher's discoverability.

```kotlin
val Dispatchers.Loom: CoroutineDispatcher
    get() = LoomDispatcher
```

Starting 100,000 coroutines, each blocked for a second, on Dispatchers.Loom took a bit more than two seconds.

```kotlin
suspend fun main(): Unit = measureTimeMillis {
    coroutineScope {
        repeat(100_000) {
            launch(Dispatchers.Loom) {
                Thread.sleep(1000)
            }
        }
    }
}.let(::println) // 2 273
```

Comparing this result with plain Dispatchers.IO would not be fair as it is limited to 64 threads, and running this function on it would take over 26 minutes. We should increase the thread limit to the number of coroutines. When I did that, the code took over 23 seconds to execute, so about ten times longer. Of course, it consumed much more memory and processor time than the Dispatchers.Loom version. Whenever possible, we should use Dispatchers.Loom instead of Dispatchers.IO.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun main(): Unit = measureTimeMillis {
    val dispatcher = Dispatchers.IO
        .limitedParallelism(100_000)
    coroutineScope {
        repeat(100_000) {
            launch(dispatcher) {
                Thread.sleep(1000)
            }
        }
    }
}.let(::println) // 23 803
```

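
The 26-minute figure follows from simple batching arithmetic: with a fixed thread limit, blocking tasks of equal length run in batches of at most that many tasks. Here is a small sketch of that estimate; estimateSeconds is a hypothetical helper, and it gives a lower bound that ignores scheduling overhead.

```kotlin
// Hypothetical helper: lower bound on the time needed to run `tasks`
// blocking calls of `taskSeconds` each on at most `threads` threads.
fun estimateSeconds(tasks: Int, threads: Int, taskSeconds: Int = 1): Int {
    val batches = (tasks + threads - 1) / threads // ceiling division
    return batches * taskSeconds
}

fun main() {
    val seconds = estimateSeconds(tasks = 100_000, threads = 64)
    println("$seconds s, about ${seconds / 60} min") // 1563 s, about 26 min
    println(estimateSeconds(tasks = 100, threads = 64)) // 2
    println(estimateSeconds(tasks = 100, threads = 100)) // 1
}
```

The last two lines match the earlier measurement of 100 blocking coroutines: around 2 seconds on Dispatchers.IO and around 1 second with the limit raised to 100.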
Dispatchers.Loom is a good candidate to replace Dispatchers.IO. However, you will likely not need it explicitly in the future, as the Kotlin Coroutines team has expressed their willingness to use virtual threads by default once Project Loom is stable. I hope this happens soon.

Another dispatcher is Dispatchers.Unconfined, which is different from all the other dispatchers as it does not change any threads. When it is started, it runs on the thread on which it was started. If it is resumed, it runs on the thread that resumed it.

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume

fun main() {
    var continuation: Continuation<Unit>? = null

    thread(name = "Thread1") {
        runBlocking(Dispatchers.Unconfined) {
            println(Thread.currentThread().name) // Thread1

            suspendCancellableCoroutine {
                continuation = it
            }

            println(Thread.currentThread().name) // Thread2

            delay(1000)

            println(Thread.currentThread().name)
            // kotlinx.coroutines.DefaultExecutor
            // (used by delay)
        }
    }

    thread(name = "Thread2") {
        Thread.sleep(1000)
        continuation?.resume(Unit)
    }
}
```

In older projects, you can find Dispatchers.Unconfined used in unit tests. Imagine that you need to test a function that calls launch, for which synchronizing the time might not be easy. One solution is to use Dispatchers.Unconfined instead of all other dispatchers. If it is used in all scopes, everything runs on the same thread, and we can more easily control the order of operations. This trick is not needed anymore, as we have much better tools for testing coroutines. We will discuss this later in the book.

But what if a blocking call is made by accident while we are running on the Main thread? This could lead to blocking the entire application. This is why we avoid using Dispatchers.Unconfined in production code, except for some special cases.

When no dispatcher is set for runBlocking, it creates a special dispatcher that makes this coroutine and its children run on the thread that called runBlocking. This is why in the example below, all prints show Thread1, even after resuming the coroutine from another thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume

fun main() {
    var continuation: Continuation<Unit>? = null

    thread(name = "Thread1") {
        runBlocking {
            println(Thread.currentThread().name) // Thread1

            suspendCancellableCoroutine {
                continuation = it
            }

            println(Thread.currentThread().name) // Thread1

            delay(1000)

            println(Thread.currentThread().name) // Thread1
        }
    }

    thread(name = "Thread2") {
        Thread.sleep(1000)
        continuation?.resume(Unit)
    }
}
```

This is why the examples here use coroutineScope instead of runBlocking in main functions. This way, we avoid this special dispatcher and its surprising behavior.

Dispatching has a cost: when withContext is called, the coroutine needs to be suspended, possibly wait in a queue, and then be resumed. This is a small but unnecessary cost if we are already on this thread. Look at the function below:

```kotlin
suspend fun showUser(user: User) =
    withContext(Dispatchers.Main) {
        userNameElement.text = user.name
        // ...
    }
```

If this function is called when we are already on the main dispatcher, then because of withContext, the user data might be shown after some delay (this coroutine would need to wait for other coroutines to do their job first). To prevent this, there is Dispatchers.Main.immediate, which dispatches only if it is needed. So, if the function below is called on the Main thread, it won't be re-dispatched: it will be called immediately.

```kotlin
suspend fun showUser(user: User) =
    withContext(Dispatchers.Main.immediate) {
        userNameElement.text = user.name
        // ...
    }
```

We prefer Dispatchers.Main.immediate as the withContext argument whenever this function might have already been called from the main dispatcher. Currently, the other dispatchers do not support immediate dispatching.

Dispatching is based on the ContinuationInterceptor context element, whose interceptContinuation method is used to modify a continuation when a coroutine is suspended[^207_3]. It also has a releaseInterceptedContinuation method that is called when a continuation is ended.

```kotlin
public interface ContinuationInterceptor :
    CoroutineContext.Element {

    companion object Key :
        CoroutineContext.Key<ContinuationInterceptor>

    fun <T> interceptContinuation(
        continuation: Continuation<T>
    ): Continuation<T>

    fun releaseInterceptedContinuation(
        continuation: Continuation<*>
    ) {
    }

    //...
}
```

Dispatchers use interceptContinuation to wrap a continuation with DispatchedContinuation, which runs on a specific pool of threads. This is how dispatchers work.

The same context key is also used by testing tools, such as runTest from kotlinx-coroutines-test. Each element in a context has to have a unique key, which is why we sometimes inject dispatchers into unit tests to replace them with test dispatchers. We will get back to this topic in the chapter dedicated to coroutine testing.

```kotlin
class DiscUserRepository(
    private val discReader: DiscReader,
    private val dispatcher: CoroutineContext = Dispatchers.IO,
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(dispatcher) {
            UserData(discReader.read("userName"))
        }
}

class UserReaderTests {

    @Test
    fun `some test`() = runTest {
        // given
        val discReader = FakeDiscReader()
        val repo = DiscUserRepository(
            discReader,
            // one of coroutines testing practices
            this.coroutineContext[ContinuationInterceptor]!!
        )
        //...
    }
}
```

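
Since a dispatcher is the context element stored under the ContinuationInterceptor key, it can be read back from the current coroutine context. The sketch below shows this lookup; currentInterceptor is a hypothetical helper introduced for this illustration, and it assumes kotlinx-coroutines-core on the JVM.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.coroutineContext

// Hypothetical helper: returns the element stored under the
// ContinuationInterceptor key in the current coroutine context.
suspend fun currentInterceptor(): ContinuationInterceptor? =
    coroutineContext[ContinuationInterceptor]

fun main() = runBlocking {
    val onDefault = withContext(Dispatchers.Default) { currentInterceptor() }
    val onIo = withContext(Dispatchers.IO) { currentInterceptor() }
    println(onDefault === Dispatchers.Default) // true
    println(onIo === Dispatchers.IO) // true
}
```

Because a context holds only one element per key, setting a different dispatcher replaces the previous one, and a test dispatcher replaces a production dispatcher in the same way.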
|                     | Suspending | Blocking | CPU    | Memory |
|---------------------|------------|----------|--------|--------|
| Single thread | 1 002 | 100 003 | 39 103 | 94 358 |
| Default (8 threads) | 1 002 | 13 003 | 8 473 | 21 461 |
| IO (64 threads) | 1 002 | 2 003 | 9 893 | 20 776 |
| 100 threads | 1 002 | 1 003 | 10 379 | 21 004 |
- When we are just suspending, it doesn't really matter how many threads we are using.
- When we are blocking, the more threads we are using, the faster all these coroutines will be finished.
- With CPU-intensive operations, Dispatchers.Default is the best option[^207_2].
- If we are dealing with a memory-intensive problem, more threads might provide some (but not significant) improvement.
The tested functions look as follows:

```kotlin
fun cpu(order: Order): Coffee {
    var i = Int.MAX_VALUE
    while (i > 0) {
        i -= if (i % 2 == 0) 1 else 2
    }
    return Coffee(order.copy(customer = order.customer + i))
}

fun memory(order: Order): Coffee {
    val list = List(1_000) { it }
    val list2 = List(1_000) { list }
    val list3 = List(1_000) { list2 }
    return Coffee(
        order.copy(
            customer = order.customer + list3.hashCode()
        )
    )
}

fun blocking(order: Order): Coffee {
    Thread.sleep(1000)
    return Coffee(order)
}

suspend fun suspending(order: Order): Coffee {
    delay(1000)
    return Coffee(order)
}
```

To summarize, the most important dispatchers are:

- Dispatchers.Default, which we use for CPU-intensive operations;
- Dispatchers.Main, which we use to access the Main thread on Android, Swing, or JavaFX;
- Dispatchers.Main.immediate, which runs on the same thread as Dispatchers.Main but is not redispatched if it is not necessary;
- Dispatchers.IO, which we use when we need to do some blocking operations;
- Dispatchers.IO with limited parallelism or a custom dispatcher with a pool of threads, which we use when we might have many blocking calls;
- Dispatchers.Default or Dispatchers.IO with parallelism limited to 1, or a custom dispatcher with a single thread, which is used when we need to secure shared state modifications;
- Dispatchers.Unconfined, which does not change threads and is used in some special cases.

[^207_2]: The main reason is that the more threads we use, the more time the processor needs to spend switching between them, thus it has less time to do meaningful operations. Also, Dispatchers.IO should not be used for CPU-intensive operations because it is used for blocking operations, and some other process might block all its threads.
[^207_3]: Wrapping needs to happen only once per continuation thanks to the caching mechanism.
[^207_4]: This mechanism is not deterministic.
[^207_5]: We will discuss this in the Testing Kotlin Coroutines chapter.
[^207_6]: The solution was inspired by the article Running Kotlin coroutines on Project Loom's virtual threads by Jan Vladimir Mostert.
[^207_7]: The stable version of Project Loom was released in September 2023 along with Java 21. Java 21 officially added the main features of Project Loom such as Virtual Threads (JEP 444) and Structured Concurrency (JEP 453).