From b03f95cb24156876a5906a13a3af734bcb467e4f Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Mon, 15 Jul 2024 17:04:18 -0400 Subject: [PATCH] Use different dispatchers for cache vs code (#1386) * Use different dispatchers for cache vs code The cache needs to be thread-isolated. QuickJS needs to be thread-isolated. But they don't need to be the same threads. This makes that possible. This is working towards https://github.com/cashapp/zipline/issues/1385 * apiDump --- zipline-loader/api/android/zipline-loader.api | 4 +- zipline-loader/api/jvm/zipline-loader.api | 4 +- .../app/cash/zipline/loader/ZiplineLoader.kt | 31 +++++++++++++- .../internal/fetcher/FsCachingFetcher.kt | 40 ++++++++++++++----- .../app/cash/zipline/loader/LoaderTester.kt | 13 +++--- 5 files changed, 74 insertions(+), 18 deletions(-) diff --git a/zipline-loader/api/android/zipline-loader.api b/zipline-loader/api/android/zipline-loader.api index e4a536023f..3c73c2804d 100644 --- a/zipline-loader/api/android/zipline-loader.api +++ b/zipline-loader/api/android/zipline-loader.api @@ -155,7 +155,9 @@ public final class app/cash/zipline/loader/ZiplineLoader { public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Lapp/cash/zipline/loader/FreshnessChecker;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun setConcurrentDownloads (I)V - public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader; + public final synthetic fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader; + public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;)Lapp/cash/zipline/loader/ZiplineLoader; + public static synthetic fun withCache$default (Lapp/cash/zipline/loader/ZiplineLoader;Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/Object;)Lapp/cash/zipline/loader/ZiplineLoader; public final fun withEmbedded (Lokio/FileSystem;Lokio/Path;)Lapp/cash/zipline/loader/ZiplineLoader; public final synthetic fun withEmbedded (Lokio/Path;Lokio/FileSystem;)Lapp/cash/zipline/loader/ZiplineLoader; public final fun withEventListenerFactory (Lapp/cash/zipline/EventListener$Factory;)Lapp/cash/zipline/loader/ZiplineLoader; diff --git a/zipline-loader/api/jvm/zipline-loader.api b/zipline-loader/api/jvm/zipline-loader.api index 2edfdd6206..ebf560eb52 100644 --- a/zipline-loader/api/jvm/zipline-loader.api +++ b/zipline-loader/api/jvm/zipline-loader.api @@ -155,7 +155,9 @@ public final class app/cash/zipline/loader/ZiplineLoader { public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Lapp/cash/zipline/loader/FreshnessChecker;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public final fun setConcurrentDownloads (I)V - public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader; + public final synthetic fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader; + public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;)Lapp/cash/zipline/loader/ZiplineLoader; + public static synthetic fun withCache$default (Lapp/cash/zipline/loader/ZiplineLoader;Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/Object;)Lapp/cash/zipline/loader/ZiplineLoader; public final fun withEmbedded (Lokio/FileSystem;Lokio/Path;)Lapp/cash/zipline/loader/ZiplineLoader; public final synthetic fun withEmbedded (Lokio/Path;Lokio/FileSystem;)Lapp/cash/zipline/loader/ZiplineLoader; public final fun withEventListenerFactory (Lapp/cash/zipline/EventListener$Factory;)Lapp/cash/zipline/loader/ZiplineLoader; diff --git a/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/ZiplineLoader.kt b/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/ZiplineLoader.kt index ee4a2d0fcf..86e07b1cc7 100644 --- a/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/ZiplineLoader.kt +++ b/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/ZiplineLoader.kt @@ -54,6 +54,19 @@ import okio.Path * * Loader attempts to load code as quickly as possible with concurrent network downloads and code * loading. + * + * # Concurrency + * + * This class uses two different coroutine dispatchers: + * + * - The [dispatcher] is the only dispatcher permitted to call functions on the returned [Zipline] + * instance. Each instance is thread-confined and this dispatcher must implement that + * enforcement. + * - The [cacheDispatcher] is the only dispatcher permitted to call functions on [ZiplineCache]. + * The cache is also thread-confined, and the two threads may be different. + * + * In applications where multiple [Zipline] instances are used, but the instances all share a cache, + * there should be N + 1 dispatchers: one for the cache plus another for each application. */ class ZiplineLoader internal constructor( private val dispatcher: CoroutineDispatcher, @@ -64,6 +77,7 @@ class ZiplineLoader internal constructor( private val embeddedDir: Path?, private val embeddedFileSystem: FileSystem?, private val cache: ZiplineCache?, + private val cacheDispatcher: CoroutineDispatcher?, ) { constructor( dispatcher: CoroutineDispatcher, @@ -80,6 +94,7 @@ class ZiplineLoader internal constructor( embeddedDir = null, embeddedFileSystem = null, cache = null, + cacheDispatcher = null, ) @Deprecated( @@ -101,9 +116,20 @@ class ZiplineLoader internal constructor( fun withCache( cache: ZiplineCache, + cacheDispatcher: CoroutineDispatcher? = null, ): ZiplineLoader = copy( cache = cache, + cacheDispatcher = cacheDispatcher, + ) + + @Deprecated( + message = "Deprecated, will be removed in 1.16", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("withCache() that accepts a CacheDispatcher parameter"), ) + fun withCache( + cache: ZiplineCache, + ): ZiplineLoader = withCache(cache, null) fun withEventListenerFactory( eventListenerFactory: EventListener.Factory, @@ -115,6 +141,7 @@ class ZiplineLoader internal constructor( embeddedDir: Path? = this.embeddedDir, embeddedFileSystem: FileSystem? = this.embeddedFileSystem, cache: ZiplineCache? = this.cache, + cacheDispatcher: CoroutineDispatcher? = this.cacheDispatcher, eventListenerFactory: EventListener.Factory = this.eventListenerFactory, ): ZiplineLoader { return ZiplineLoader( @@ -126,6 +153,7 @@ class ZiplineLoader internal constructor( embeddedDir = embeddedDir, embeddedFileSystem = embeddedFileSystem, cache = cache, + cacheDispatcher = cacheDispatcher, ) } @@ -149,6 +177,7 @@ class ZiplineLoader internal constructor( private val cachingFetcher: FsCachingFetcher? = run { FsCachingFetcher( cache = cache ?: return@run null, + cacheDispatcher = cacheDispatcher ?: dispatcher, delegate = httpFetcher, ) } @@ -630,7 +659,7 @@ class ZiplineLoader internal constructor( } } - private fun loadCachedOrEmbeddedManifest( + private suspend fun loadCachedOrEmbeddedManifest( applicationName: String, eventListener: EventListener, nowEpochMs: Long, diff --git a/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/internal/fetcher/FsCachingFetcher.kt b/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/internal/fetcher/FsCachingFetcher.kt index 078d0c3d14..a94e019dcc 100644 --- a/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/internal/fetcher/FsCachingFetcher.kt +++ b/zipline-loader/src/commonMain/kotlin/app/cash/zipline/loader/internal/fetcher/FsCachingFetcher.kt @@ -17,6 +17,8 @@ package app.cash.zipline.loader.internal.fetcher import app.cash.zipline.EventListener import app.cash.zipline.loader.ZiplineCache +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.withContext import okio.ByteString /** @@ -24,6 +26,7 @@ import okio.ByteString */ internal class FsCachingFetcher( private val cache: ZiplineCache, + private val cacheDispatcher: CoroutineDispatcher, private val delegate: Fetcher, ) : Fetcher { override suspend fun fetch( @@ -35,13 +38,17 @@ internal class FsCachingFetcher( baseUrl: String?, url: String, ): ByteString { - return cache.getOrPut(applicationName, sha256, nowEpochMs) { - delegate.fetch(applicationName, eventListener, id, sha256, nowEpochMs, baseUrl, url) + return withContext(cacheDispatcher) { + cache.getOrPut(applicationName, sha256, nowEpochMs) { + delegate.fetch(applicationName, eventListener, id, sha256, nowEpochMs, baseUrl, url) + } } } - fun loadPinnedManifest(applicationName: String, nowEpochMs: Long): LoadedManifest? { - return cache.getPinnedManifest(applicationName, nowEpochMs) + suspend fun loadPinnedManifest(applicationName: String, nowEpochMs: Long): LoadedManifest? { + return withContext(cacheDispatcher) { + cache.getPinnedManifest(applicationName, nowEpochMs) + } } /** @@ -50,18 +57,31 @@ internal class FsCachingFetcher( * This assumes that all artifacts in [loadedManifest] are currently pinned. Fetchers do not * necessarily enforce this assumption. */ - fun pin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) = - cache.pinManifest(applicationName, loadedManifest, nowEpochMs) + suspend fun pin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) { + return withContext(cacheDispatcher) { + cache.pinManifest(applicationName, loadedManifest, nowEpochMs) + } + } /** * Removes the pins for [applicationName] in [loadedManifest] so they may be pruned. */ - fun unpin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) = - cache.unpinManifest(applicationName, loadedManifest, nowEpochMs) + suspend fun unpin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) { + return withContext(cacheDispatcher) { + cache.unpinManifest(applicationName, loadedManifest, nowEpochMs) + } + } /** * Updates freshAt timestamp for manifests that in later network fetch is still the freshest. */ - fun updateFreshAt(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) = - cache.updateManifestFreshAt(applicationName, loadedManifest, nowEpochMs) + suspend fun updateFreshAt( + applicationName: String, + loadedManifest: LoadedManifest, + nowEpochMs: Long, + ) { + return withContext(cacheDispatcher) { + cache.updateManifestFreshAt(applicationName, loadedManifest, nowEpochMs) + } + } } diff --git a/zipline-loader/src/commonTest/kotlin/app/cash/zipline/loader/LoaderTester.kt b/zipline-loader/src/commonTest/kotlin/app/cash/zipline/loader/LoaderTester.kt index 0394bba122..44d9101a65 100644 --- a/zipline-loader/src/commonTest/kotlin/app/cash/zipline/loader/LoaderTester.kt +++ b/zipline-loader/src/commonTest/kotlin/app/cash/zipline/loader/LoaderTester.kt @@ -22,6 +22,7 @@ import app.cash.zipline.loader.ManifestVerifier.Companion.NO_SIGNATURE_CHECKS import app.cash.zipline.loader.internal.getApplicationManifestFileName import app.cash.zipline.loader.testing.LoaderTestFixtures import app.cash.zipline.testing.systemFileSystem +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow @@ -37,11 +38,12 @@ import okio.FileSystem class LoaderTester( private val eventListenerFactory: EventListener.Factory = EventListenerNoneFactory, private val manifestVerifier: ManifestVerifier = NO_SIGNATURE_CHECKS, + private val dispatcher: CoroutineDispatcher = UnconfinedTestDispatcher(), + private val cacheDispatcher: CoroutineDispatcher = dispatcher, ) { val tempDir = FileSystem.SYSTEM_TEMPORARY_DIRECTORY / "okio-${randomToken().hex()}" val httpClient = FakeZiplineHttpClient() - private val dispatcher = UnconfinedTestDispatcher() private val cacheMaxSizeInBytes = 100 * 1024 * 1024 var nowMillis = 1_000L @@ -68,9 +70,9 @@ class LoaderTester( systemFileSystem.createDirectories(tempDir, mustCreate = true) systemFileSystem.createDirectories(embeddedDir, mustCreate = true) cache = testZiplineCache( - systemFileSystem, - cacheDir, - cacheMaxSizeInBytes.toLong(), + fileSystem = systemFileSystem, + directory = cacheDir, + maxSizeInBytes = cacheMaxSizeInBytes.toLong(), ) loader = testZiplineLoader( dispatcher = dispatcher, @@ -82,7 +84,8 @@ class LoaderTester( embeddedFileSystem = embeddedFileSystem, embeddedDir = embeddedDir, ).withCache( - cache, + cache = cache, + cacheDispatcher = cacheDispatcher, ) }