Skip to content

Commit

Permalink
KMM :: Internal :: Make InMemoryDataSource thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
franmontiel committed Oct 2, 2023
1 parent b0de0ff commit 789cdaf
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,53 @@ import com.harmony.kotlin.data.query.KeyQuery
import com.harmony.kotlin.data.query.Query
import com.harmony.kotlin.error.DataNotFoundException
import com.harmony.kotlin.error.notSupportedQuery
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class InMemoryDataSource<V> : GetDataSource<V>, PutDataSource<V>, DeleteDataSource {

private val objects: MutableMap<String, V> = mutableMapOf()
private val mutex = Mutex()

override suspend fun get(query: Query): V =
when (query) {
is KeyQuery -> {
objects[query.key].run {
this ?: throw DataNotFoundException()
mutex.withLock {
when (query) {
is KeyQuery -> {
objects[query.key].run {
this ?: throw DataNotFoundException()
}
}

else -> notSupportedQuery()
}
else -> notSupportedQuery()
}

override suspend fun put(query: Query, value: V?): V =
when (query) {
is KeyQuery -> {
value?.let {
objects.put(query.key, value).run { value }
} ?: throw IllegalArgumentException("InMemoryDataSource: value must be not null")
mutex.withLock {
when (query) {
is KeyQuery -> {
value?.let {
objects.put(query.key, value).run { value }
} ?: throw IllegalArgumentException("InMemoryDataSource: value must be not null")
}

else -> notSupportedQuery()
}
else -> notSupportedQuery()
}

override suspend fun delete(query: Query) {
when (query) {
is AllObjectsQuery -> {
objects.clear()
}
is KeyQuery -> {
clearAll(query.key)
mutex.withLock {
when (query) {
is AllObjectsQuery -> {
objects.clear()
}

is KeyQuery -> {
clearAll(query.key)
}

else -> notSupportedQuery()
}
else -> notSupportedQuery()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@

package com.harmony.kotlin.data.datasource.memory

import arrow.atomic.AtomicInt
import com.harmony.kotlin.common.BaseTest
import com.harmony.kotlin.common.randomString
import com.harmony.kotlin.data.query.KeyQuery
import com.harmony.kotlin.data.query.VoidQuery
import com.harmony.kotlin.error.DataNotFoundException
import com.harmony.kotlin.error.QueryNotSupportedException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.fail

class InMemoryDataSourceTests : BaseTest() {

@Test
fun `should throw DataNotFoundException if value is missing`() = runTest {
assertFailsWith<DataNotFoundException> {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val query = KeyQuery(randomString())

inMemoryDataSource.get(query)
Expand All @@ -27,7 +34,7 @@ class InMemoryDataSourceTests : BaseTest() {
@Test
fun `should throw QueryNotSupportedException if query is invalid when get function is called`() = runTest {
assertFailsWith<QueryNotSupportedException> {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val invalidQuery = VoidQuery

inMemoryDataSource.get(invalidQuery)
Expand Down Expand Up @@ -57,7 +64,7 @@ class InMemoryDataSourceTests : BaseTest() {
@Test
fun `should throw QueryNotSupportedException if query is invalid when put function is called`() = runTest {
assertFailsWith<QueryNotSupportedException> {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val invalidQuery = VoidQuery

inMemoryDataSource.put(invalidQuery, randomString())
Expand All @@ -67,7 +74,7 @@ class InMemoryDataSourceTests : BaseTest() {
@Test
fun `should throw IllegalArgumentException if the value is null when put function is called`() = runTest {
assertFailsWith<IllegalArgumentException> {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val query = KeyQuery(randomString())

inMemoryDataSource.put(query, null)
Expand All @@ -76,7 +83,7 @@ class InMemoryDataSourceTests : BaseTest() {

@Test
fun `should store value if query is valid when put function is called`() = runTest {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val query = KeyQuery(randomString())
val expectedValue = randomString()

Expand All @@ -89,7 +96,7 @@ class InMemoryDataSourceTests : BaseTest() {
@Test
fun `should throw QueryNotSupportedException if query is invalid when delete function is called`() = runTest {
assertFailsWith<QueryNotSupportedException> {
val inMemoryDataSource = givenInMemoryDataSource()
val inMemoryDataSource = givenInMemoryDataSource<String>()
val invalidQuery = VoidQuery

inMemoryDataSource.delete(query = invalidQuery)
Expand Down Expand Up @@ -120,10 +127,44 @@ class InMemoryDataSourceTests : BaseTest() {
scope.inMemoryDataSource.put(pair.first, pair.second)
}

private suspend fun givenInMemoryDataSource(
insertValue: Pair<KeyQuery, String>? = null,
): InMemoryDataSource<String> {
val inMemoryDataSource = InMemoryDataSource<String>()
@Test
@Suppress("SwallowedException")
fun `should not have concurrency problems`() = kotlinx.coroutines.test.runTest {
val key = AtomicInt(0)
val inMemoryDataSource = givenInMemoryDataSource<Int>(Pair(KeyQuery(key.get().toString()), 0))

val numThreads = 10
val numIterationsPerThread = 1000

val jobs = List(numThreads) {
launch(Dispatchers.IO) {
for (i in 0 until numIterationsPerThread) {
try {
inMemoryDataSource.put(KeyQuery(key.incrementAndGet().toString()), 0)
} catch (e: Exception) {
fail("Concurrency problem: putting element ${key.get()} failed with ${e.stackTraceToString()}")
}
}
}
}

jobs.joinAll()

// Ensure that the final state of the hashmap is consistent with expectations
val numberOfElements = numThreads * numIterationsPerThread
for (currentKey in 0..numberOfElements) {
try {
inMemoryDataSource.get(KeyQuery(currentKey.toString()))
} catch (e: DataNotFoundException) {
fail("Concurrency problem: element $currentKey not found because it was not inserted in the first place")
}
}
}

private suspend fun <T> givenInMemoryDataSource(
insertValue: Pair<KeyQuery, T>? = null,
): InMemoryDataSource<T> {
val inMemoryDataSource = InMemoryDataSource<T>()

insertValue?.let {
inMemoryDataSource.put(it.first, it.second)
Expand Down

0 comments on commit 789cdaf

Please sign in to comment.