Skip to content

Commit

Permalink
fix: Concurrent messages received (#334)
Browse files Browse the repository at this point in the history
Fixes #329
  • Loading branch information
sdsantos authored Sep 7, 2023
1 parent ba90108 commit 6dc0c80
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import tech.relaycorp.relaynet.bindings.pdc.ServerException
import tech.relaycorp.relaynet.messages.control.PrivateNodeRegistration
import tech.relaycorp.relaynet.messages.control.PrivateNodeRegistrationRequest
import java.security.KeyPair
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
Expand All @@ -45,6 +46,7 @@ internal constructor(
// Gateway

private var gwServiceInteractor: ServiceInteractor? = null
private val isReceivingMessages = AtomicBoolean(false)

/**
* Bind to the gateway to be able to communicate with it.
Expand Down Expand Up @@ -183,6 +185,9 @@ internal constructor(
}
}

if (isReceivingMessages.get()) return@withContext
isReceivingMessages.set(true)

try {
receiveMessages
.receive()
Expand All @@ -195,6 +200,8 @@ internal constructor(
logger.log(Level.SEVERE, "Could not receive new messages", exp)
}

isReceivingMessages.set(false)

if (!wasAlreadyBound) unbind()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.nhaarman.mockitokotlin2.times
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
Expand Down Expand Up @@ -40,6 +41,7 @@ import tech.relaycorp.relaynet.testing.pdc.RegisterNodeCall
import tech.relaycorp.relaynet.testing.pki.KeyPairSet
import tech.relaycorp.relaynet.testing.pki.PDACertPath
import java.time.ZonedDateTime
import kotlin.time.Duration.Companion.seconds

@RunWith(RobolectricTestRunner::class)
internal class GatewayClientImplTest : MockContextTestCase() {
Expand Down Expand Up @@ -284,4 +286,19 @@ internal class GatewayClientImplTest : MockContextTestCase() {

gatewayClient.checkForNewMessages()
}

@Test
fun checkForNewMessages_doesStartSimultaneousReceiveMessages() = coroutineScope.runTest {
whenever(receiveMessages.receive()).thenReturn(flow { delay(1.seconds) })

repeat(10) {
coroutineScope.launch {
gatewayClient.checkForNewMessages()
}
}

delay(1.seconds)

verify(receiveMessages, times(1)).receive()
}
}

0 comments on commit 6dc0c80

Please sign in to comment.