Skip to content

Commit

Permalink
Merge branch 'coroutines' into engagement_manager_coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
wzieba committed Nov 29, 2023
2 parents 5cbea9f + 62cfa12 commit 0a526b5
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 222 deletions.
25 changes: 17 additions & 8 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,35 @@ import kotlinx.coroutines.launch
* Handles stopping and starting the flush timer. The flush timer
* controls how often we send events to Parse.ly servers.
*/
internal open class FlushManager(
private val parselyTracker: ParselyTracker,
val intervalMillis: Long,
internal interface FlushManager {
fun start()
fun stop()
val isRunning: Boolean
val intervalMillis: Long
}

internal class ParselyFlushManager(
private val onFlush: () -> Unit,
override val intervalMillis: Long,
private val coroutineScope: CoroutineScope
) {
) : FlushManager {
private var job: Job? = null

open fun start() {
override fun start() {
if (job?.isActive == true) return

job = coroutineScope.launch {
while (isActive) {
delay(intervalMillis)
parselyTracker.flushEvents()
onFlush.invoke()
}
}
}

open fun stop() = job?.cancel()
override fun stop() {
job?.cancel()
}

open val isRunning: Boolean
override val isRunning: Boolean
get() = job?.isActive ?: false
}
51 changes: 51 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.parsely.parselyandroid

import com.parsely.parselyandroid.JsonSerializer.toParselyEventsPayload
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class FlushQueue(
private val flushManager: FlushManager,
private val repository: QueueRepository,
private val restClient: RestClient,
private val scope: CoroutineScope
) {

private val mutex = Mutex()

operator fun invoke(skipSendingEvents: Boolean) {
scope.launch {
mutex.withLock {
val eventsToSend = repository.getStoredQueue()

if (eventsToSend.isEmpty()) {
flushManager.stop()
return@launch
}

if (skipSendingEvents) {
ParselyTracker.PLog("Debug mode on. Not sending to Parse.ly. Otherwise, would sent ${eventsToSend.size} events")
repository.remove(eventsToSend)
return@launch
}
ParselyTracker.PLog("Sending request with %d events", eventsToSend.size)
val jsonPayload = toParselyEventsPayload(eventsToSend)
ParselyTracker.PLog("POST Data %s", jsonPayload)
ParselyTracker.PLog("Requested %s", ParselyTracker.ROOT_URL)
restClient.send(jsonPayload)
.fold(
onSuccess = {
ParselyTracker.PLog("Pixel request success")
repository.remove(eventsToSend)
},
onFailure = {
ParselyTracker.PLog("Pixel request exception")
ParselyTracker.PLog(it.toString())
}
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlinx.coroutines.sync.withLock

internal class InMemoryBuffer(
private val coroutineScope: CoroutineScope,
private val localStorageRepository: LocalStorageRepository,
private val localStorageRepository: QueueRepository,
private val onEventAddedListener: () -> Unit,
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ import java.io.ObjectOutputStream
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal open class LocalStorageRepository(private val context: Context) {
internal interface QueueRepository {
suspend fun remove(toRemove: List<Map<String, Any?>?>)
suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?>
suspend fun insertEvents(toInsert: List<Map<String, Any?>?>)
}

internal class LocalStorageRepository(private val context: Context) : QueueRepository {

private val mutex = Mutex()

Expand All @@ -32,21 +38,7 @@ internal open class LocalStorageRepository(private val context: Context) {
}
}

open suspend fun remove(toRemove: List<Map<String, Any?>?>) {
val storedEvents = getStoredQueue()

mutex.withLock {
persistObject(storedEvents - toRemove.toSet())
}
}

/**
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?> = mutex.withLock {

private fun getInternalStoredQueue(): ArrayList<Map<String, Any?>?> {
var storedQueue: ArrayList<Map<String, Any?>?> = ArrayList()
try {
val fis = context.applicationContext.openFileInput(STORAGE_KEY)
Expand All @@ -68,15 +60,26 @@ internal open class LocalStorageRepository(private val context: Context) {
return storedQueue
}

override suspend fun remove(toRemove: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(storedEvents - toRemove.toSet())
}

/**
* Save the event queue to persistent storage.
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open suspend fun insertEvents(toInsert: List<Map<String, Any?>?>){
val storedEvents = getStoredQueue()
override suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?> = mutex.withLock {
getInternalStoredQueue()
}

mutex.withLock {
persistObject(ArrayList((toInsert + storedEvents).distinct()))
}
/**
* Save the event queue to persistent storage.
*/
override suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(ArrayList((toInsert + storedEvents).distinct()))
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package com.parsely.parselyandroid

import java.net.HttpURLConnection
import java.net.URL
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

internal open class ParselyAPIConnection(private val url: String) {
open suspend fun send(payload: String): Result<Unit> {
internal interface RestClient {
suspend fun send(payload: String): Result<Unit>
}

internal class ParselyAPIConnection(private val url: String) : RestClient {
override suspend fun send(payload: String): Result<Unit> {
var connection: HttpURLConnection? = null
try {
connection = URL(url).openConnection() as HttpURLConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.UUID;

import kotlin.Unit;
import kotlin.jvm.functions.Function0;

/**
* Tracks Parse.ly app views in Android apps
Expand Down Expand Up @@ -62,7 +63,7 @@ public class ParselyTracker {
@NonNull
private final InMemoryBuffer inMemoryBuffer;
@NonNull
private final SendEvents sendEvents;
private final FlushQueue flushQueue;

/**
* Create a new ParselyTracker instance.
Expand All @@ -71,7 +72,13 @@ protected ParselyTracker(String siteId, int flushInterval, Context c) {
context = c.getApplicationContext();
eventsBuilder = new EventsBuilder(context, siteId);
localStorageRepository = new LocalStorageRepository(context);
flushManager = new FlushManager(this, flushInterval * 1000L,
flushManager = new ParselyFlushManager(new Function0<Unit>() {
@Override
public Unit invoke() {
flushEvents();
return Unit.INSTANCE;
}
}, flushInterval * 1000L,
ParselyCoroutineScopeKt.getSdkScope());
inMemoryBuffer = new InMemoryBuffer(ParselyCoroutineScopeKt.getSdkScope(), localStorageRepository, () -> {
if (!flushTimerIsActive()) {
Expand All @@ -80,15 +87,14 @@ protected ParselyTracker(String siteId, int flushInterval, Context c) {
}
return Unit.INSTANCE;
});
sendEvents = new SendEvents(flushManager, localStorageRepository, new ParselyAPIConnection(ROOT_URL + "mobileproxy"), ParselyCoroutineScopeKt.getSdkScope());
flushQueue = new FlushQueue(flushManager, localStorageRepository, new ParselyAPIConnection(ROOT_URL + "mobileproxy"), ParselyCoroutineScopeKt.getSdkScope());
clock = new Clock();
intervalCalculator = new HeartbeatIntervalCalculator(clock);

// get the adkey straight away on instantiation
isDebug = false;

final SdkInit sdkInit = new SdkInit(ParselyCoroutineScopeKt.getSdkScope(), localStorageRepository, flushManager);
sdkInit.initialize();
flushManager.start();

ProcessLifecycleOwner.get().getLifecycle().addObserver(
(LifecycleEventObserver) (lifecycleOwner, event) -> {
Expand Down Expand Up @@ -466,7 +472,6 @@ void flushEvents() {
PLog("Network unreachable. Not flushing.");
return;
}
sendEvents.invoke(isDebug);
flushQueue.invoke(isDebug);
}

}
18 changes: 0 additions & 18 deletions parsely/src/main/java/com/parsely/parselyandroid/SdkInit.kt

This file was deleted.

57 changes: 0 additions & 57 deletions parsely/src/main/java/com/parsely/parselyandroid/SendEvents.kt

This file was deleted.

Loading

0 comments on commit 0a526b5

Please sign in to comment.