Skip to content

Commit

Permalink
Implement retry for subscriptions in PrimalApiClient
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksandarIlic committed Feb 22, 2024
1 parent fe857f6 commit 7cff367
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PrimalApiClient @Inject constructor(
@Throws(WssException::class)
suspend fun query(message: PrimalCacheFilter): PrimalQueryResult {
val queryResult = runCatching {
retrySendMessage(MAX_QUERY_RETRIES) {
retrySendMessage(MAX_RETRIES) {
val subscriptionId = UUID.randomUUID()
val deferredQueryResult = scope.async { collectQueryResult(subscriptionId) }
sendMessageAndAwaitForResultOrThrow(
Expand All @@ -134,21 +134,26 @@ class PrimalApiClient @Inject constructor(
deferredQueryResult: Deferred<PrimalQueryResult>,
): PrimalQueryResult {
ensureSocketClientConnection()
return when (socketClient.sendREQ(subscriptionId = subscriptionId, data = data)) {
true -> {
try {
deferredQueryResult.await()
} catch (error: CancellationException) {
throw error.cause ?: error
}
}
false -> {
deferredQueryResult.cancel(CancellationException("Unable to send socket message."))
throw SocketSendMessageException(message = "Unable to send socket message.")
}

try {
sendMessageOrThrow(subscriptionId = subscriptionId, data = data)
} catch (error: SocketSendMessageException) {
deferredQueryResult.cancel(CancellationException("Unable to send socket message."))
throw error
}

return try {
deferredQueryResult.await()
} catch (error: CancellationException) {
throw error.cause ?: error
}
}

private fun sendMessageOrThrow(subscriptionId: UUID, data: JsonObject) {
val success = socketClient.sendREQ(subscriptionId = subscriptionId, data = data)
if (!success) throw SocketSendMessageException(message = "Unable to send socket message.")
}

private fun Throwable?.takeAsWssException(): WssException {
return when (this) {
is WssException -> this
Expand All @@ -160,12 +165,14 @@ class PrimalApiClient @Inject constructor(

suspend fun subscribe(subscriptionId: UUID, message: PrimalCacheFilter): Flow<NostrIncomingMessage> {
ensureSocketClientConnection()
val success = socketClient.sendREQ(
subscriptionId = subscriptionId,
data = message.toPrimalJsonObject(),
)
if (!success) throw WssException(message = "Api unreachable at the moment.")

try {
retrySendMessage(MAX_RETRIES) {
sendMessageOrThrow(subscriptionId = subscriptionId, data = message.toPrimalJsonObject())
}
} catch (error: SocketSendMessageException) {
Timber.w(error)
throw WssException(message = "Api unreachable at the moment.", cause = error)
}
return socketClient.incomingMessages.filterBySubscriptionId(id = subscriptionId)
}

Expand Down Expand Up @@ -218,7 +225,7 @@ class PrimalApiClient @Inject constructor(
private class SocketSendMessageException(override val message: String?) : RuntimeException()

companion object {
const val MAX_QUERY_RETRIES = 3
const val MAX_RETRIES = 3
private const val RETRY_DELAY_MILLIS = 1_000L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class PrimalApiClientTest {
primalClient.query(message = PrimalCacheFilter(primalVerb = PrimalVerb.IMPORT_EVENTS))
} catch (_: WssException) {}

verify(exactly = 1 + PrimalApiClient.MAX_QUERY_RETRIES) {
verify(exactly = 1 + PrimalApiClient.MAX_RETRIES) {
mockNostrSocketClient.sendREQ(any(), any())
}
}
Expand Down

0 comments on commit 7cff367

Please sign in to comment.