Skip to content

Commit

Permalink
Use different dispatchers for cache vs code (#1386)
Browse files Browse the repository at this point in the history
* Use different dispatchers for cache vs code

The cache needs to be thread-isolated.

QuickJS needs to be thread-isolated.

But they don't need to be the same threads. This makes that
possible.

This is working towards #1385

* apiDump
  • Loading branch information
squarejesse authored Jul 15, 2024
1 parent a976f61 commit b03f95c
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
4 changes: 3 additions & 1 deletion zipline-loader/api/android/zipline-loader.api
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ public final class app/cash/zipline/loader/ZiplineLoader {
public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Lapp/cash/zipline/loader/FreshnessChecker;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun setConcurrentDownloads (I)V
public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader;
public final synthetic fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;)Lapp/cash/zipline/loader/ZiplineLoader;
public static synthetic fun withCache$default (Lapp/cash/zipline/loader/ZiplineLoader;Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/Object;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withEmbedded (Lokio/FileSystem;Lokio/Path;)Lapp/cash/zipline/loader/ZiplineLoader;
public final synthetic fun withEmbedded (Lokio/Path;Lokio/FileSystem;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withEventListenerFactory (Lapp/cash/zipline/EventListener$Factory;)Lapp/cash/zipline/loader/ZiplineLoader;
Expand Down
4 changes: 3 additions & 1 deletion zipline-loader/api/jvm/zipline-loader.api
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ public final class app/cash/zipline/loader/ZiplineLoader {
public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Lapp/cash/zipline/loader/FreshnessChecker;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun loadOnce$default (Lapp/cash/zipline/loader/ZiplineLoader;Ljava/lang/String;Ljava/lang/String;Lkotlinx/serialization/modules/SerializersModule;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun setConcurrentDownloads (I)V
public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader;
public final synthetic fun withCache (Lapp/cash/zipline/loader/ZiplineCache;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withCache (Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;)Lapp/cash/zipline/loader/ZiplineLoader;
public static synthetic fun withCache$default (Lapp/cash/zipline/loader/ZiplineLoader;Lapp/cash/zipline/loader/ZiplineCache;Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/Object;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withEmbedded (Lokio/FileSystem;Lokio/Path;)Lapp/cash/zipline/loader/ZiplineLoader;
public final synthetic fun withEmbedded (Lokio/Path;Lokio/FileSystem;)Lapp/cash/zipline/loader/ZiplineLoader;
public final fun withEventListenerFactory (Lapp/cash/zipline/EventListener$Factory;)Lapp/cash/zipline/loader/ZiplineLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ import okio.Path
*
* Loader attempts to load code as quickly as possible with concurrent network downloads and code
* loading.
*
* # Concurrency
*
* This class uses two different coroutine dispatchers:
*
* - The [dispatcher] is the only dispatcher permitted to call functions on the returned [Zipline]
* instance. Each instance is thread-confined and this dispatcher must implement that
* enforcement.
* - The [cacheDispatcher] is the only dispatcher permitted to call functions on [ZiplineCache].
* The cache is also thread-confined, and the two threads may be different.
*
* In applications where multiple [Zipline] instances are used, but the instances all share a cache,
* there should be N + 1 dispatchers: one for the cache plus another for each application.
*/
class ZiplineLoader internal constructor(
private val dispatcher: CoroutineDispatcher,
Expand All @@ -64,6 +77,7 @@ class ZiplineLoader internal constructor(
private val embeddedDir: Path?,
private val embeddedFileSystem: FileSystem?,
private val cache: ZiplineCache?,
private val cacheDispatcher: CoroutineDispatcher?,
) {
constructor(
dispatcher: CoroutineDispatcher,
Expand All @@ -80,6 +94,7 @@ class ZiplineLoader internal constructor(
embeddedDir = null,
embeddedFileSystem = null,
cache = null,
cacheDispatcher = null,
)

@Deprecated(
Expand All @@ -101,9 +116,20 @@ class ZiplineLoader internal constructor(

fun withCache(
cache: ZiplineCache,
cacheDispatcher: CoroutineDispatcher? = null,
): ZiplineLoader = copy(
cache = cache,
cacheDispatcher = cacheDispatcher,
)

@Deprecated(
message = "Deprecated, will be removed in 1.16",
level = DeprecationLevel.HIDDEN,
replaceWith = ReplaceWith("withCache() that accepts a CacheDispatcher parameter"),
)
fun withCache(
cache: ZiplineCache,
): ZiplineLoader = withCache(cache, null)

fun withEventListenerFactory(
eventListenerFactory: EventListener.Factory,
Expand All @@ -115,6 +141,7 @@ class ZiplineLoader internal constructor(
embeddedDir: Path? = this.embeddedDir,
embeddedFileSystem: FileSystem? = this.embeddedFileSystem,
cache: ZiplineCache? = this.cache,
cacheDispatcher: CoroutineDispatcher? = this.cacheDispatcher,
eventListenerFactory: EventListener.Factory = this.eventListenerFactory,
): ZiplineLoader {
return ZiplineLoader(
Expand All @@ -126,6 +153,7 @@ class ZiplineLoader internal constructor(
embeddedDir = embeddedDir,
embeddedFileSystem = embeddedFileSystem,
cache = cache,
cacheDispatcher = cacheDispatcher,
)
}

Expand All @@ -149,6 +177,7 @@ class ZiplineLoader internal constructor(
private val cachingFetcher: FsCachingFetcher? = run {
FsCachingFetcher(
cache = cache ?: return@run null,
cacheDispatcher = cacheDispatcher ?: dispatcher,
delegate = httpFetcher,
)
}
Expand Down Expand Up @@ -630,7 +659,7 @@ class ZiplineLoader internal constructor(
}
}

private fun loadCachedOrEmbeddedManifest(
private suspend fun loadCachedOrEmbeddedManifest(
applicationName: String,
eventListener: EventListener,
nowEpochMs: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package app.cash.zipline.loader.internal.fetcher

import app.cash.zipline.EventListener
import app.cash.zipline.loader.ZiplineCache
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.withContext
import okio.ByteString

/**
* Fetch from the network and save to local fileSystem cache once downloaded.
*/
internal class FsCachingFetcher(
private val cache: ZiplineCache,
private val cacheDispatcher: CoroutineDispatcher,
private val delegate: Fetcher<ByteString>,
) : Fetcher<ByteString> {
override suspend fun fetch(
Expand All @@ -35,13 +38,17 @@ internal class FsCachingFetcher(
baseUrl: String?,
url: String,
): ByteString {
return cache.getOrPut(applicationName, sha256, nowEpochMs) {
delegate.fetch(applicationName, eventListener, id, sha256, nowEpochMs, baseUrl, url)
return withContext(cacheDispatcher) {
cache.getOrPut(applicationName, sha256, nowEpochMs) {
delegate.fetch(applicationName, eventListener, id, sha256, nowEpochMs, baseUrl, url)
}
}
}

fun loadPinnedManifest(applicationName: String, nowEpochMs: Long): LoadedManifest? {
return cache.getPinnedManifest(applicationName, nowEpochMs)
suspend fun loadPinnedManifest(applicationName: String, nowEpochMs: Long): LoadedManifest? {
return withContext(cacheDispatcher) {
cache.getPinnedManifest(applicationName, nowEpochMs)
}
}

/**
Expand All @@ -50,18 +57,31 @@ internal class FsCachingFetcher(
* This assumes that all artifacts in [loadedManifest] are currently pinned. Fetchers do not
* necessarily enforce this assumption.
*/
fun pin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) =
cache.pinManifest(applicationName, loadedManifest, nowEpochMs)
suspend fun pin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) {
return withContext(cacheDispatcher) {
cache.pinManifest(applicationName, loadedManifest, nowEpochMs)
}
}

/**
* Removes the pins for [applicationName] in [loadedManifest] so they may be pruned.
*/
fun unpin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) =
cache.unpinManifest(applicationName, loadedManifest, nowEpochMs)
suspend fun unpin(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) {
return withContext(cacheDispatcher) {
cache.unpinManifest(applicationName, loadedManifest, nowEpochMs)
}
}

/**
* Updates freshAt timestamp for manifests that in later network fetch is still the freshest.
*/
fun updateFreshAt(applicationName: String, loadedManifest: LoadedManifest, nowEpochMs: Long) =
cache.updateManifestFreshAt(applicationName, loadedManifest, nowEpochMs)
suspend fun updateFreshAt(
applicationName: String,
loadedManifest: LoadedManifest,
nowEpochMs: Long,
) {
return withContext(cacheDispatcher) {
cache.updateManifestFreshAt(applicationName, loadedManifest, nowEpochMs)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import app.cash.zipline.loader.ManifestVerifier.Companion.NO_SIGNATURE_CHECKS
import app.cash.zipline.loader.internal.getApplicationManifestFileName
import app.cash.zipline.loader.testing.LoaderTestFixtures
import app.cash.zipline.testing.systemFileSystem
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
Expand All @@ -37,11 +38,12 @@ import okio.FileSystem
class LoaderTester(
private val eventListenerFactory: EventListener.Factory = EventListenerNoneFactory,
private val manifestVerifier: ManifestVerifier = NO_SIGNATURE_CHECKS,
private val dispatcher: CoroutineDispatcher = UnconfinedTestDispatcher(),
private val cacheDispatcher: CoroutineDispatcher = dispatcher,
) {
val tempDir = FileSystem.SYSTEM_TEMPORARY_DIRECTORY / "okio-${randomToken().hex()}"

val httpClient = FakeZiplineHttpClient()
private val dispatcher = UnconfinedTestDispatcher()
private val cacheMaxSizeInBytes = 100 * 1024 * 1024
var nowMillis = 1_000L

Expand All @@ -68,9 +70,9 @@ class LoaderTester(
systemFileSystem.createDirectories(tempDir, mustCreate = true)
systemFileSystem.createDirectories(embeddedDir, mustCreate = true)
cache = testZiplineCache(
systemFileSystem,
cacheDir,
cacheMaxSizeInBytes.toLong(),
fileSystem = systemFileSystem,
directory = cacheDir,
maxSizeInBytes = cacheMaxSizeInBytes.toLong(),
)
loader = testZiplineLoader(
dispatcher = dispatcher,
Expand All @@ -82,7 +84,8 @@ class LoaderTester(
embeddedFileSystem = embeddedFileSystem,
embeddedDir = embeddedDir,
).withCache(
cache,
cache = cache,
cacheDispatcher = cacheDispatcher,
)
}

Expand Down

0 comments on commit b03f95c

Please sign in to comment.