Skip to content

Commit

Permalink
Merge pull request #91 from Parsely/coroutines-queue-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
wzieba authored Nov 9, 2023
2 parents 5e590c2 + 8c4ce70 commit 845947b
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 189 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/readme.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ jobs:
if: always()
with:
name: artifact
path: ./parsely/build/reports/*
path: |
./parsely/build/reports/*
./parsely/build/outputs/androidTest-results
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ class FunctionalTests {
}

/**
* In this scenario, the consumer application tracks more than 50 events-threshold during a flush interval.
* In this scenario, the consumer application tracks 51 events-threshold during a flush interval.
* The SDK will save the events to disk and send them in the next flush interval.
* At the end, when all events are sent, the SDK will delete the content of local storage file.
*/
@Test
fun appTracksEventsAboveQueueSizeLimit() {
fun appTracksEventsDuringTheFlushInterval() {
ActivityScenario.launch(SampleActivity::class.java).use { scenario ->
scenario.onActivity { activity: Activity ->
beforeEach(activity)
Expand Down Expand Up @@ -154,6 +154,42 @@ class FunctionalTests {
}
}

/**
* In this scenario we "stress test" the concurrency model to see if we have any conflict during
*
* - Unexpectedly high number of recorded events in small intervals (I/O locking)
* - Scenario in which a request is sent at the same time as new events are recorded
*/
@Test
fun stressTest() {
val eventsToSend = 500

ActivityScenario.launch(SampleActivity::class.java).use { scenario ->
scenario.onActivity { activity: Activity ->
beforeEach(activity)
server.enqueue(MockResponse().setResponseCode(200))
parselyTracker = initializeTracker(activity)

repeat(eventsToSend) {
parselyTracker.trackPageview("url", null, null, null)
}
}

// Wait some time to give events chance to be saved in local data storage
Thread.sleep((defaultFlushInterval * 2).inWholeMilliseconds)

// Catch up to 10 requests. We don't know how many requests the device we test on will
// perform. It's probably more like 1-2, but we're on safe (not flaky) side here.
val requests = (1..10).mapNotNull {
runCatching { server.takeRequest(100, TimeUnit.MILLISECONDS) }.getOrNull()
}.flatMap {
it.toMap()["events"]!!
}

assertThat(requests).hasSize(eventsToSend)
}
}

private fun RecordedRequest.toMap(): Map<String, List<Event>> {
val listType: TypeReference<Map<String, List<Event>>> =
object : TypeReference<Map<String, List<Event>>>() {}
Expand Down
44 changes: 44 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/InMemoryBuffer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.parsely.parselyandroid

import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class InMemoryBuffer(
private val coroutineScope: CoroutineScope,
private val localStorageRepository: LocalStorageRepository,
private val onEventAddedListener: () -> Unit,
) {

private val mutex = Mutex()
private val buffer = mutableListOf<Map<String, Any?>>()

init {
coroutineScope.launch {
while (isActive) {
mutex.withLock {
if (buffer.isNotEmpty()) {
ParselyTracker.PLog("Persisting ${buffer.size} events")
localStorageRepository.insertEvents(buffer)
buffer.clear()
}
}
delay(1.seconds)
}
}
}

fun add(event: Map<String, Any>) {
coroutineScope.launch {
mutex.withLock {
ParselyTracker.PLog("Event added to buffer")
buffer.add(event)
onEventAddedListener()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ import java.io.EOFException
import java.io.FileNotFoundException
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal open class LocalStorageRepository(private val context: Context) {

private val mutex = Mutex()

/**
* Persist an object to storage.
*
Expand All @@ -21,6 +27,7 @@ internal open class LocalStorageRepository(private val context: Context) {
val oos = ObjectOutputStream(fos)
oos.writeObject(o)
oos.close()
fos.close()
} catch (ex: Exception) {
ParselyTracker.PLog("Exception thrown during queue serialization: %s", ex.toString())
}
Expand All @@ -33,6 +40,10 @@ internal open class LocalStorageRepository(private val context: Context) {
persistObject(ArrayList<Map<String, Any>>())
}

fun remove(toRemove: List<Map<String, Any>>) {
persistObject(getStoredQueue() - toRemove.toSet())
}

/**
* Get the stored event queue from persistent storage.
*
Expand All @@ -46,6 +57,7 @@ internal open class LocalStorageRepository(private val context: Context) {
@Suppress("UNCHECKED_CAST")
storedQueue = ois.readObject() as ArrayList<Map<String, Any?>?>
ois.close()
fis.close()
} catch (ex: EOFException) {
// Nothing to do here.
} catch (ex: FileNotFoundException) {
Expand All @@ -70,10 +82,8 @@ internal open class LocalStorageRepository(private val context: Context) {
/**
* Save the event queue to persistent storage.
*/
@Synchronized
open fun persistQueue(inMemoryQueue: List<Map<String, Any?>?>) {
ParselyTracker.PLog("Persisting event queue")
persistObject((inMemoryQueue + getStoredQueue()).distinct())
open suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
persistObject(ArrayList((toInsert + getStoredQueue()).distinct()))
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.parsely.parselyandroid

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob

val sdkScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
internal val sdkScope =
CoroutineScope(SupervisorJob() + Dispatchers.IO + CoroutineName("Parse.ly SDK Scope"))
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.UUID;

import kotlin.Unit;

/**
* Tracks Parse.ly app views in Android apps
* <p>
Expand All @@ -53,7 +53,6 @@ public class ParselyTracker {
@SuppressWarnings("StringOperationCanBeSimplified")
// private static final String ROOT_URL = "http://10.0.2.2:5001/".intern(); // emulator localhost
private static final String ROOT_URL = "https://p1.parsely.com/".intern();
private final ArrayList<Map<String, Object>> eventQueue;
private boolean isDebug;
private final Context context;
private final Timer timer;
Expand All @@ -67,6 +66,8 @@ public class ParselyTracker {
private final HeartbeatIntervalCalculator intervalCalculator = new HeartbeatIntervalCalculator(new Clock());
@NonNull
private final LocalStorageRepository localStorageRepository;
@NonNull
private final InMemoryBuffer inMemoryBuffer;

/**
* Create a new ParselyTracker instance.
Expand All @@ -75,16 +76,20 @@ protected ParselyTracker(String siteId, int flushInterval, Context c) {
context = c.getApplicationContext();
eventsBuilder = new EventsBuilder(context, siteId);
localStorageRepository = new LocalStorageRepository(context);
flushManager = new FlushManager(this, flushInterval * 1000L,
ParselyCoroutineScopeKt.getSdkScope());
inMemoryBuffer = new InMemoryBuffer(ParselyCoroutineScopeKt.getSdkScope(), localStorageRepository, () -> {
if (!flushTimerIsActive()) {
startFlushTimer();
PLog("Flush flushTimer set to %ds", (flushManager.getIntervalMillis() / 1000));
}
return Unit.INSTANCE;
});

// get the adkey straight away on instantiation
timer = new Timer();
isDebug = false;

eventQueue = new ArrayList<>();

flushManager = new FlushManager(this, flushInterval * 1000L,
ParselyCoroutineScopeKt.getSdkScope());

if (localStorageRepository.getStoredQueue().size() > 0) {
startFlushTimer();
}
Expand All @@ -98,10 +103,6 @@ protected ParselyTracker(String siteId, int flushInterval, Context c) {
);
}

List<Map<String, Object>> getInMemoryQueue() {
return eventQueue;
}

/**
* Singleton instance accessor. Note: This must be called after {@link #sharedInstance(String, Context)}
*
Expand Down Expand Up @@ -410,19 +411,12 @@ public void resetVideo() {
* <p>
* Place a data structure representing the event into the in-memory queue for later use.
* <p>
* **Note**: Events placed into this queue will be discarded if the size of the persistent queue
* store exceeds {@link QueueManager#STORAGE_SIZE_LIMIT}.
*
* @param event The event Map to enqueue.
*/
void enqueueEvent(Map<String, Object> event) {
// Push it onto the queue
eventQueue.add(event);
new QueueManager(this, localStorageRepository).execute();
if (!flushTimerIsActive()) {
startFlushTimer();
PLog("Flush flushTimer set to %ds", (flushManager.getIntervalMillis() / 1000));
}
inMemoryBuffer.add(event);
}

/**
Expand Down Expand Up @@ -475,7 +469,6 @@ private boolean isReachable() {
}

void purgeEventsQueue() {
eventQueue.clear();
localStorageRepository.purgeStoredQueue();
}

Expand Down Expand Up @@ -536,7 +529,7 @@ private String generatePixelId() {
* @return The number of events waiting to be flushed to Parsely.
*/
public int queueSize() {
return eventQueue.size();
return localStorageRepository.getStoredQueue().size();
}

/**
Expand All @@ -553,24 +546,19 @@ private class FlushQueue extends AsyncTask<Void, Void, Void> {
@Override
protected synchronized Void doInBackground(Void... params) {
ArrayList<Map<String, Object>> storedQueue = localStorageRepository.getStoredQueue();
PLog("%d events in queue, %d stored events", eventQueue.size(), storedEventsCount());
PLog("%d events in stored queue", storedEventsCount());
// in case both queues have been flushed and app quits, don't crash
if ((eventQueue == null || eventQueue.size() == 0) && storedQueue.size() == 0) {
if (storedQueue.isEmpty()) {
stopFlushTimer();
return null;
}
if (!isReachable()) {
PLog("Network unreachable. Not flushing.");
return null;
}
HashSet<Map<String, Object>> hs = new HashSet<>();
ArrayList<Map<String, Object>> newQueue = new ArrayList<>();

hs.addAll(eventQueue);
hs.addAll(storedQueue);
newQueue.addAll(hs);
PLog("Flushing queue");
sendBatchRequest(newQueue);
sendBatchRequest(storedQueue);
return null;
}
}
Expand Down
30 changes: 0 additions & 30 deletions parsely/src/main/java/com/parsely/parselyandroid/QueueManager.kt

This file was deleted.

Loading

0 comments on commit 845947b

Please sign in to comment.