Skip to content

Commit

Permalink
Fetching from ROLIE feed
Browse files Browse the repository at this point in the history
  • Loading branch information
oxisto committed Nov 12, 2024
1 parent 694e7cf commit 7d3edad
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.github.csaf.sbom.retrieval.roles.CSAFProviderRole
import io.github.csaf.sbom.retrieval.roles.CSAFPublisherRole
import io.github.csaf.sbom.retrieval.roles.CSAFTrustedProviderRole
import io.github.csaf.sbom.schema.generated.Provider
import io.github.csaf.sbom.schema.generated.Provider.Feed
import io.github.csaf.sbom.schema.generated.ROLIEFeed
import java.util.*
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -91,7 +92,7 @@ class RetrievedProvider(val json: Provider) : Validatable {
): ReceiveChannel<Pair<Provider.Feed, Result<ROLIEFeed>>> {
val feeds = json.distributions?.mapNotNull { it.rolie }?.flatMap { it.feeds } ?: listOf()

// This channel collects up to `channelCapacity` directory indices concurrently.
// This channel collects up to `channelCapacity` feeds concurrently.
val rolieChannel =
ioScope.produce(capacity = channelCapacity) {
for (feed in feeds) {
Expand All @@ -100,8 +101,8 @@ class RetrievedProvider(val json: Provider) : Validatable {
}
// This terminal channel is a simple "rendezvous channel" for awaiting the Results.
return ioScope.produce {
for ((feed, indexDeferred) in rolieChannel) {
send(feed to indexDeferred.await())
for ((feed, feedDeferred) in rolieChannel) {
send(feed to feedDeferred.await())
}
}
}
Expand Down Expand Up @@ -146,13 +147,14 @@ class RetrievedProvider(val json: Provider) : Validatable {
channelCapacity: Int = DEFAULT_CHANNEL_CAPACITY
): ReceiveChannel<Result<RetrievedDocument>> {
val indexChannel = fetchDocumentIndices(loader, channelCapacity)
val rolieChannel = fetchRolieFeeds(loader, channelCapacity)
val rolieFeedsChannel = fetchRolieFeeds(loader, channelCapacity)

// This second channel collects up to `channelCapacity` Results concurrently, which
// represent CSAF Documents or errors from fetching or validation.
val documentJobChannel =
ioScope.produce<Deferred<Result<RetrievedDocument>>>(capacity = channelCapacity) {
fetchDirectoryBased(indexChannel, loader)
fetchDocumentsFromIndices(indexChannel, loader)
fetchDocumentsFromRolieFeeds(rolieFeedsChannel, loader)
}
// This terminal channel is a simple "rendezvous channel" for awaiting the Results.
return ioScope.produce {
Expand All @@ -162,7 +164,8 @@ class RetrievedProvider(val json: Provider) : Validatable {
}
}

private suspend fun ProducerScope<Deferred<Result<RetrievedDocument>>>.fetchDirectoryBased(
private suspend fun ProducerScope<Deferred<Result<RetrievedDocument>>>
.fetchDocumentsFromIndices(
indexChannel: ReceiveChannel<Pair<String, Result<String>>>,
loader: CsafLoader
) {
Expand Down Expand Up @@ -194,6 +197,35 @@ class RetrievedProvider(val json: Provider) : Validatable {
}
}

private suspend fun ProducerScope<Deferred<Result<RetrievedDocument>>>
.fetchDocumentsFromRolieFeeds(
rolieChannel: ReceiveChannel<Pair<Provider.Feed, Result<ROLIEFeed>>>,
loader: CsafLoader
) {
for ((feed, rolieResult) in rolieChannel) {
rolieResult.fold(
{ rolie ->
rolie.feed.entry.map { entry ->
send(
async {
RetrievedDocument.from(entry.content.src.toString(), loader, role)
}
)
}
},
{ e ->
send(
async {
Result.failure(
Exception("Failed to fetch feeds from directory at ${feed.url}", e)
)
}
)
}
)
}
}

/**
* This method provides the [Result]s of [fetchDocuments] as a Java [Stream] for usage in
* non-Kotlin environments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public void testRetrievedProviderJava() throws InterruptedException, ExecutionEx
final var documentResultsExplicit = providerExplicit.streamDocuments(loader, DEFAULT_CHANNEL_CAPACITY).toList();
final var documentResultsExplicitSlow = providerExplicit.streamDocuments(loader, 1).toList();
assertEquals(
4,
5,
documentResults.size(),
"Expected exactly 4 results: One document, two document errors, one index error"
"Expected exactly 5 results: Two documents, two document errors, one index error"
);
assertEquals(
documentResults.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,9 @@ class RetrievedProviderTest {
@Test
fun testFetchRolieFeeds() = runTest {
val provider = RetrievedProvider.from("example.com").getOrThrow()
val documentIndexResults = provider.fetchRolieFeeds().toList()
assertEquals(
1,
documentIndexResults.size,
"Expected exactly 1 results: One index.txt content and one fetch error"
)
assertTrue(documentIndexResults[0].second.isSuccess)
val rolieFeedsResults = provider.fetchRolieFeeds().toList()
assertEquals(1, rolieFeedsResults.size, "Expected exactly 1 result: One parsed ROLIE feed")
assertTrue(rolieFeedsResults[0].second.isSuccess)
}

private suspend fun providerTest(domain: String) {
Expand All @@ -88,9 +84,9 @@ class RetrievedProviderTest {
assertEquals(3, expectedDocumentCount, "Expected 3 documents")
val documentResults = provider.fetchDocuments().toList()
assertEquals(
4,
5,
documentResults.size,
"Expected exactly 4 results: One document, two document errors, one index error"
"Expected exactly 5 results: Two documents, two document errors, one index error"
)
// Check some random property on successful document
assertEquals(
Expand Down

0 comments on commit 7d3edad

Please sign in to comment.