Skip to content

Commit

Permalink
Merge pull request #12 from outoftheboxplugins/event-batching
Browse files Browse the repository at this point in the history
Event batching
  • Loading branch information
goenning authored Nov 17, 2023
2 parents 5de35d0 + e5d76d7 commit a3fda71
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 31 deletions.
128 changes: 99 additions & 29 deletions Source/Aptabase/Private/AptabaseAnalyticsProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <HttpModule.h>
#include <Interfaces/IHttpResponse.h>
#include <Interfaces/IPluginManager.h>
#include <JsonObjectConverter.h>
#include <Kismet/GameplayStatics.h>
#include <Kismet/KismetInternationalizationLibrary.h>

Expand All @@ -13,13 +12,51 @@
#include "AptabaseSettings.h"
#include "ExtendedAnalyticsEventAttribute.h"

namespace
{
UGameInstance* GetCurrentGameInstance()
{
for (const FWorldContext& Context : GEngine->GetWorldContexts())
{
if (Context.WorldType == EWorldType::PIE || Context.WorldType == EWorldType::Game)
{
if (UGameInstance* GameInstance = Context.OwningGameInstance)
{
return GameInstance;
}
}
}

return nullptr;
}

bool IsInReleaseMode()
{
// TODO: This should be something more extensible/customizable.
// Developers should be able to decide on the fly if they are running Debug/Release

return UE_BUILD_SHIPPING;
}
} // namespace

void FAptabaseAnalyticsProvider::RecordExtendedEvent(const FString& EventName, const TArray<FExtendedAnalyticsEventAttribute>& Attributes)
{
RecordEventInternal(EventName, Attributes);
}

bool FAptabaseAnalyticsProvider::StartSession(const TArray<FAnalyticsEventAttribute>& Attributes)
{
const UGameInstance* GameInstance = GetCurrentGameInstance();
if (!ensure(GameInstance))
{
UE_LOG(LogAptabase, Warning, TEXT("Cannot start session: Invalid game instance."));
return false;
}

const UAptabaseSettings* Settings = GetDefault<UAptabaseSettings>();
const float SendInterval = IsInReleaseMode() ? Settings->SendInterval : Settings->DebugSendInterval;
GameInstance->GetTimerManager().SetTimer(BatchEventTimerHandle, FTimerDelegate::CreateRaw(this, &FAptabaseAnalyticsProvider::FlushEvents), SendInterval, true);

const int64 EpochInSeconds = FDateTime::UtcNow().ToUnixTimestamp();
const int Random = FMath::RandRange(0, 99999999);
const FString RandomString = FString::Printf(TEXT("%08d"), Random);
Expand All @@ -31,6 +68,17 @@ bool FAptabaseAnalyticsProvider::StartSession(const TArray<FAnalyticsEventAttrib

void FAptabaseAnalyticsProvider::EndSession()
{
if (BatchEventTimerHandle.IsValid())
{
if (const UGameInstance* GameInstance = GetCurrentGameInstance())
{
GameInstance->GetTimerManager().ClearTimer(BatchEventTimerHandle);
}
}

// Send any leftover events if any before closing the active session
FlushEvents();

bHasActiveSession = false;
}

Expand All @@ -47,7 +95,22 @@ bool FAptabaseAnalyticsProvider::SetSessionID(const FString& InSessionID)

void FAptabaseAnalyticsProvider::FlushEvents()
{
UE_LOG(LogAptabase, Log, TEXT("Aptabase implementation doesn't cache events"));
UE_LOG(LogAptabase, Verbose, TEXT("Flushing %s batched events."), *LexToString(BatchedEvents.Num()));

TArrayView<FAptabaseEventPayload> EventsToProcess = BatchedEvents;

while (!EventsToProcess.IsEmpty())
{
constexpr int32 NumEventsPerRequest = 25;

TArray<FAptabaseEventPayload> CurrentBatch;
CurrentBatch.Append(EventsToProcess.Left(NumEventsPerRequest));

EventsToProcess.RightChopInline(NumEventsPerRequest);
SendEventsNow(CurrentBatch);
}

BatchedEvents.Empty();
}

void FAptabaseAnalyticsProvider::SetUserID(const FString& InUserID)
Expand Down Expand Up @@ -83,74 +146,81 @@ void FAptabaseAnalyticsProvider::RecordEventInternal(const FString& EventName, c
return;
}

const UAptabaseSettings* Settings = GetDefault<UAptabaseSettings>();
const TSharedPtr<IPlugin> AptabasePlugin = IPluginManager::Get().FindPlugin("Aptabase");

FAptabaseEventPayload EventPayload;
EventPayload.EventName = EventName;
EventPayload.SessionId = SessionId;
EventPayload.EventAttributes = Attributes;
EventPayload.TimeStamp = FDateTime::UtcNow().ToIso8601();
EventPayload.SystemProps.Locale = UKismetInternationalizationLibrary::GetCurrentLocale();
EventPayload.SystemProps.AppVersion = GetDefault<UGeneralProjectSettings>()->ProjectVersion;
EventPayload.SystemProps.SdkVersion = FString::Printf(TEXT("aptabase-unreal@%s"), *AptabasePlugin->GetDescriptor().VersionName);
EventPayload.SystemProps.OsName = UGameplayStatics::GetPlatformName();
EventPayload.SystemProps.OsVersion = FPlatformMisc::GetOSVersion();
EventPayload.SystemProps.IsDebug = !UE_BUILD_SHIPPING;
EventPayload.SystemProps.IsDebug = !IsInReleaseMode();

const TSharedPtr<FJsonObject> Props = MakeShared<FJsonObject>();
UE_LOG(LogAptabase, Verbose, TEXT("Batching event (%s) for next flush."), *EventName);
BatchedEvents.Emplace(EventPayload);
}

void FAptabaseAnalyticsProvider::SendEventsNow(const TArray<FAptabaseEventPayload>& EventPayloads)
{
TArray<TSharedPtr<FJsonValue>> Events;

for (const FExtendedAnalyticsEventAttribute& Attribute : Attributes)
UE_LOG(LogAptabase, VeryVerbose, TEXT("Sending batch containing:"));
for (const FAptabaseEventPayload& EventPayload : EventPayloads)
{
const auto& AttributeValue = Attribute.Value;
UE_LOG(LogAptabase, VeryVerbose, TEXT("Event: %s"), *EventPayload.EventName);
const TSharedPtr<FJsonObject>& JsonPayload = EventPayload.ToJsonObject();

if (AttributeValue.IsType<double>())
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueNumber>(AttributeValue.Get<double>()));
}
else if (AttributeValue.IsType<float>())
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueNumber>(AttributeValue.Get<float>()));
}
else
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueString>(AttributeValue.Get<FString>()));
}
Events.Add(MakeShared<FJsonValueObject>(JsonPayload));
}

const FString RequestUrl = FString::Printf(TEXT("%s/api/v0/event"), *Settings->GetApiUrl());
const UAptabaseSettings* Settings = GetDefault<UAptabaseSettings>();

const FString RequestUrl = FString::Printf(TEXT("%s/api/v0/events"), *Settings->GetApiUrl());

FString RequestJsonPayload;
const TSharedPtr<FJsonObject> Payload = FJsonObjectConverter::UStructToJsonObject(EventPayload);
Payload->SetObjectField("props", Props);

const TSharedRef<TJsonWriter<>> JsonWriter = TJsonWriterFactory<>::Create(&RequestJsonPayload, 0);
FJsonSerializer::Serialize(Payload.ToSharedRef(), JsonWriter);

UE_LOG(LogAptabase, Verbose, TEXT("Sending event: %s to %s Payload: %s"), *EventName, *RequestUrl, *RequestJsonPayload);
FJsonSerializer::Serialize(Events, JsonWriter);

const FHttpRequestRef HttpRequest = FHttpModule::Get().CreateRequest();
HttpRequest->SetVerb("POST");
HttpRequest->SetContentAsString(RequestJsonPayload);
HttpRequest->SetHeader(TEXT("App-Key"), Settings->AppKey);
HttpRequest->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
HttpRequest->SetURL(RequestUrl);
HttpRequest->OnProcessRequestComplete().BindRaw(this, &FAptabaseAnalyticsProvider::OnEventRecoded);
HttpRequest->OnProcessRequestComplete().BindRaw(this, &FAptabaseAnalyticsProvider::OnEventsRecoded, EventPayloads);
HttpRequest->ProcessRequest();
}

void FAptabaseAnalyticsProvider::OnEventRecoded(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful)
void FAptabaseAnalyticsProvider::OnEventsRecoded(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful, TArray<FAptabaseEventPayload> OriginalEvents)
{
if (!bWasSuccessful)
{
UE_LOG(LogAptabase, Error, TEXT("Request to record the event was unsuccessful."));
return;
}

if (!EHttpResponseCodes::IsOk(Response->GetResponseCode()))
const auto ResponseCode = Response->GetResponseCode();
if (!EHttpResponseCodes::IsOk(ResponseCode))
{
UE_LOG(LogAptabase, Error, TEXT("Request to record the event received unexpected code: %s"), *LexToString(Response->GetResponseCode()));

if (ResponseCode >= 400 && ResponseCode < 500)
{
UE_LOG(LogAptabase, Error, TEXT("Data was sent in the wrong format. Event will be skipped."))
}
else if (ResponseCode >= 500)
{
UE_LOG(LogAptabase, Error, TEXT("Server-side issue. Event will be re-queued."))
BatchedEvents.Append(OriginalEvents);
}

return;
}

UE_LOG(LogAptabase, Verbose, TEXT("Event recorded successfully."));
UE_LOG(LogAptabase, VeryVerbose, TEXT("Event recorded successfully."));
}
18 changes: 16 additions & 2 deletions Source/Aptabase/Private/AptabaseAnalyticsProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
#include <Interfaces/IAnalyticsProvider.h>
#include <Interfaces/IHttpRequest.h>

#include "AptabaseData.h"

struct FExtendedAnalyticsEventAttribute;

/**
* Implementation of Aptabase Analytics provider
*/
class FAptabaseAnalyticsProvider : public IAnalyticsProvider
class FAptabaseAnalyticsProvider final : public IAnalyticsProvider
{
public:
/**
Expand All @@ -27,14 +29,18 @@ class FAptabaseAnalyticsProvider : public IAnalyticsProvider
virtual FString GetUserID() const override;
virtual void RecordEvent(const FString& EventName, const TArray<FAnalyticsEventAttribute>& Attributes) override;
// End IAnalyticsProvider Interface
/**
* @brief Instantly sends an Aptabase event to the backend
*/
void SendEventsNow(const TArray<FAptabaseEventPayload>& EventPayloads);
/**
* Internal function for common code in recording events
*/
void RecordEventInternal(const FString& EventName, const TArray<FExtendedAnalyticsEventAttribute>& Attributes);
/**
* @brief Callback executed when an event is successfully recoded by the analytics backend.
*/
void OnEventRecoded(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful);
void OnEventsRecoded(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful, TArray<FAptabaseEventPayload> OriginalEvents);
/**
* @brief Current Id of the user, required by the IAnalyticsProvider interface
* @warning Aptabase is a privacy-first solution and will NOT send the UserId to the backend.
Expand All @@ -48,4 +54,12 @@ class FAptabaseAnalyticsProvider : public IAnalyticsProvider
* @brief Indicates if the user has an active session running.
*/
bool bHasActiveSession = false;
/**
* @brief Timer Delegate handle for the periodically flushing of the batched events.
*/
FTimerHandle BatchEventTimerHandle;
/**
* @brief Events we recoded but haven't sent to the backend yet. Waiting for next flush.
*/
TArray<FAptabaseEventPayload> BatchedEvents;
};
30 changes: 30 additions & 0 deletions Source/Aptabase/Private/AptabaseData.cpp
Original file line number Diff line number Diff line change
@@ -1 +1,31 @@
#include "AptabaseData.h"

#include <JsonObjectConverter.h>

TSharedPtr<FJsonObject> FAptabaseEventPayload::ToJsonObject() const
{
const TSharedPtr<FJsonObject> Props = MakeShared<FJsonObject>();

for (const FExtendedAnalyticsEventAttribute& Attribute : EventAttributes)
{
const auto& AttributeValue = Attribute.Value;

if (AttributeValue.IsType<double>())
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueNumber>(AttributeValue.Get<double>()));
}
else if (AttributeValue.IsType<float>())
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueNumber>(AttributeValue.Get<float>()));
}
else
{
Props->SetField(Attribute.Key, MakeShared<FJsonValueString>(AttributeValue.Get<FString>()));
}
}

const TSharedPtr<FJsonObject> Payload = FJsonObjectConverter::UStructToJsonObject(*this);
Payload->SetObjectField("props", Props);

return Payload;
}
12 changes: 12 additions & 0 deletions Source/Aptabase/Private/AptabaseData.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "ExtendedAnalyticsEventAttribute.h"

#include "AptabaseData.generated.h"

/**
Expand Down Expand Up @@ -80,4 +82,14 @@ struct FAptabaseEventPayload
*/
UPROPERTY()
FAptabaseSystemProperties SystemProps;

/**
* @brief Additional Event attributes to be sent along-side the main properties
*/
TArray<FExtendedAnalyticsEventAttribute> EventAttributes;

/**
* @brief Converts the current data to a JSON payload for the backend HTTP requests
*/
TSharedPtr<FJsonObject> ToJsonObject() const;
};
12 changes: 12 additions & 0 deletions Source/Aptabase/Private/AptabaseSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ class APTABASE_API UAptabaseSettings : public UDeveloperSettings
*/
UPROPERTY(Config, EditAnywhere, Category = "Aptabase Analytics", meta = (EditCondition = "Host == EAptabaseHost::SH", EditConditionHides))
FString CustomHost;
/**
* @brief How often the analytics provider will send the currently batched events to the backend
* @note in seconds
*/
UPROPERTY(Config, EditAnywhere, Category = "Aptabase Analytics", meta = (Unit = "s"))
float SendInterval = 60.0f;
/**
* @brief **DEBUG MODE**: How often the analytics provider will send the currently batched events to the backend
* @note in seconds
*/
UPROPERTY(Config, EditAnywhere, Category = "Aptabase Analytics", meta = (Unit = "s"))
float DebugSendInterval = 2.0f;

private:
// Begin UDeveloperSettings interface
Expand Down

0 comments on commit a3fda71

Please sign in to comment.