Skip to content

Commit

Permalink
[otlp] OTLP Exporter Custom serializer - Logs (open-telemetry#5941)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajkumar-rangaraj authored Oct 31, 2024
1 parent 09b654e commit 1fc0ebc
Show file tree
Hide file tree
Showing 6 changed files with 616 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;

internal static class ProtobufOtlpLogFieldNumberConstants
{
// Resource Logs
#pragma warning disable SA1310 // Field names should not contain underscore
internal const int ResourceLogs_Resource = 1;
internal const int ResourceLogs_Scope_Logs = 2;
internal const int ResourceLogs_Schema_Url = 3;

// Resource
internal const int Resource_Attributes = 1;

// ScopeLogs
internal const int ScopeLogs_Scope = 1;
internal const int ScopeLogs_Log_Records = 2;
internal const int ScopeLogs_Schema_Url = 3;

// LogRecord
internal const int LogRecord_Time_Unix_Nano = 1;
internal const int LogRecord_Observed_Time_Unix_Nano = 11;
internal const int LogRecord_Severity_Number = 2;
internal const int LogRecord_Severity_Text = 3;
internal const int LogRecord_Body = 5;
internal const int LogRecord_Attributes = 6;
internal const int LogRecord_Dropped_Attributes_Count = 7;
internal const int LogRecord_Flags = 8;
internal const int LogRecord_Trace_Id = 9;
internal const int LogRecord_Span_Id = 10;

// SeverityNumber
internal const int Severity_Number_Unspecified = 0;
internal const int Severity_Number_Trace = 1;
internal const int Severity_Number_Trace2 = 2;
internal const int Severity_Number_Trace3 = 3;
internal const int Severity_Number_Trace4 = 4;
internal const int Severity_Number_Debug = 5;
internal const int Severity_Number_Debug2 = 6;
internal const int Severity_Number_Debug3 = 7;
internal const int Severity_Number_Debug4 = 8;
internal const int Severity_Number_Info = 9;
internal const int Severity_Number_Info2 = 10;
internal const int Severity_Number_Info3 = 11;
internal const int Severity_Number_Info4 = 12;
internal const int Severity_Number_Warn = 13;
internal const int Severity_Number_Warn2 = 14;
internal const int Severity_Number_Warn3 = 15;
internal const int Severity_Number_Warn4 = 16;
internal const int Severity_Number_Error = 17;
internal const int Severity_Number_Error2 = 18;
internal const int Severity_Number_Error3 = 19;
internal const int Severity_Number_Error4 = 20;
internal const int Severity_Number_Fatal = 21;
internal const int Severity_Number_Fatal2 = 22;
internal const int Severity_Number_Fatal3 = 23;
internal const int Severity_Number_Fatal4 = 24;

// LogRecordFlags

internal const int LogRecord_Flags_Do_Not_Use = 0;
internal const int LogRecord_Flags_Trace_Flags_Mask = 0x000000FF;

// InstrumentationScope
internal const int InstrumentationScope_Name = 1;
internal const int InstrumentationScope_Version = 2;
internal const int InstrumentationScope_Attributes = 3;
internal const int InstrumentationScope_Dropped_Attributes_Count = 4;

// KeyValue
internal const int KeyValue_Key = 1;
internal const int KeyValue_Value = 2;

// AnyValue
internal const int AnyValue_String_Value = 1;
internal const int AnyValue_Bool_Value = 2;
internal const int AnyValue_Int_Value = 3;
internal const int AnyValue_Double_Value = 4;
internal const int AnyValue_Array_Value = 5;
internal const int AnyValue_Kvlist_Value = 6;
internal const int AnyValue_Bytes_Value = 7;

internal const int ArrayValue_Value = 1;
#pragma warning restore SA1310 // Field names should not contain underscore
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;

internal static class ProtobufOtlpLogSerializer
{
private const int ReserveSizeForLength = 4;
private const int TraceIdSize = 16;
private const int SpanIdSize = 8;

private static readonly Stack<List<LogRecord>> LogsListPool = [];
private static readonly Dictionary<string, List<LogRecord>> ScopeLogsList = [];

internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
{
foreach (var logRecord in logRecordBatch)
{
var scopeName = logRecord.Logger.Name;
if (!ScopeLogsList.TryGetValue(scopeName, out var logRecords))
{
logRecords = LogsListPool.Count > 0 ? LogsListPool.Pop() : [];
ScopeLogsList[scopeName] = logRecords;
}

logRecords.Add(logRecord);
}

writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
ReturnLogRecordListToPool();

return writePosition;
}

internal static void ReturnLogRecordListToPool()
{
if (ScopeLogsList.Count != 0)
{
foreach (var entry in ScopeLogsList)
{
entry.Value.Clear();
LogsListPool.Push(entry.Value);
}

ScopeLogsList.Clear();
}
}

internal static int WriteResourceLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, Dictionary<string, List<LogRecord>> scopeLogs)
{
writePosition = ProtobufOtlpResourceSerializer.WriteResource(buffer, writePosition, resource);
writePosition = WriteScopeLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, scopeLogs);
return writePosition;
}

internal static int WriteScopeLogs(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Dictionary<string, List<LogRecord>> scopeLogs)
{
if (scopeLogs != null)
{
foreach (KeyValuePair<string, List<LogRecord>> entry in scopeLogs)
{
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.ResourceLogs_Scope_Logs, ProtobufWireType.LEN);
int resourceLogsScopeLogsLengthPosition = writePosition;
writePosition += ReserveSizeForLength;

writePosition = WriteScopeLog(buffer, writePosition, sdkLimitOptions, experimentalOptions, entry.Value[0].Logger.Name, entry.Value);
ProtobufSerializer.WriteReservedLength(buffer, resourceLogsScopeLogsLengthPosition, writePosition - (resourceLogsScopeLogsLengthPosition + ReserveSizeForLength));
}
}

return writePosition;
}

internal static int WriteScopeLog(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, string loggerName, List<LogRecord> logRecords)
{
var value = loggerName.AsSpan();
var numberOfUtf8CharsInString = ProtobufSerializer.GetNumberOfUtf8CharsInString(value);
var serializedLengthSize = ProtobufSerializer.ComputeVarInt64Size((ulong)numberOfUtf8CharsInString);

// numberOfUtf8CharsInString + tagSize + length field size.
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, numberOfUtf8CharsInString + 1 + serializedLengthSize, ProtobufOtlpLogFieldNumberConstants.ScopeLogs_Scope, ProtobufWireType.LEN);
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.InstrumentationScope_Name, numberOfUtf8CharsInString, value);

for (int i = 0; i < logRecords.Count; i++)
{
writePosition = WriteLogRecord(buffer, writePosition, sdkLimitOptions, experimentalOptions, logRecords[i]);
}

return writePosition;
}

internal static int WriteLogRecord(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, LogRecord logRecord)
{
var attributeValueLengthLimit = sdkLimitOptions.LogRecordAttributeValueLengthLimit;
var attributeCountLimit = sdkLimitOptions.LogRecordAttributeCountLimit ?? int.MaxValue;

ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState = new ProtobufOtlpTagWriter.OtlpTagWriterState
{
Buffer = buffer,
WritePosition = writePosition,
TagCount = 0,
DroppedTagCount = 0,
};

otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.ScopeLogs_Log_Records, ProtobufWireType.LEN);
int logRecordLengthPosition = otlpTagWriterState.WritePosition;
otlpTagWriterState.WritePosition += ReserveSizeForLength;

var timestamp = (ulong)logRecord.Timestamp.ToUnixTimeNanoseconds();
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteFixed64WithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Time_Unix_Nano, timestamp);
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteFixed64WithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Observed_Time_Unix_Nano, timestamp);

otlpTagWriterState.WritePosition = ProtobufSerializer.WriteEnumWithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Severity_Number, logRecord.Severity.HasValue ? (int)logRecord.Severity : 0);

if (!string.IsNullOrWhiteSpace(logRecord.SeverityText))
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteStringWithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Severity_Text, logRecord.SeverityText!);
}
else if (logRecord.Severity.HasValue)
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteStringWithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Severity_Text, logRecord.Severity.Value.ToShortName());
}

if (experimentalOptions.EmitLogEventAttributes)
{
if (logRecord.EventId.Id != default)
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, ExperimentalOptions.LogRecordEventIdAttribute, logRecord.EventId.Id, attributeCountLimit, attributeValueLengthLimit);
}

if (!string.IsNullOrEmpty(logRecord.EventId.Name))
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, ExperimentalOptions.LogRecordEventNameAttribute, logRecord.EventId.Name!, attributeCountLimit, attributeValueLengthLimit);
}
}

if (logRecord.Exception != null)
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionType, logRecord.Exception.GetType().Name, attributeCountLimit, attributeValueLengthLimit);
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionMessage, logRecord.Exception.Message, attributeCountLimit, attributeValueLengthLimit);
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, SemanticConventions.AttributeExceptionStacktrace, logRecord.Exception.ToInvariantString(), attributeCountLimit, attributeValueLengthLimit);
}

bool bodyPopulatedFromFormattedMessage = false;
bool isLogRecordBodySet = false;

if (logRecord.FormattedMessage != null)
{
otlpTagWriterState.WritePosition = WriteLogRecordBody(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, logRecord.FormattedMessage.AsSpan());
bodyPopulatedFromFormattedMessage = true;
isLogRecordBodySet = true;
}

if (logRecord.Attributes != null)
{
foreach (var attribute in logRecord.Attributes)
{
// Special casing {OriginalFormat}
// See https://github.com/open-telemetry/opentelemetry-dotnet/pull/3182
// for explanation.
if (attribute.Key.Equals("{OriginalFormat}") && !bodyPopulatedFromFormattedMessage)
{
otlpTagWriterState.WritePosition = WriteLogRecordBody(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, (attribute.Value as string).AsSpan());
isLogRecordBodySet = true;
}
else
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, attribute, attributeCountLimit, attributeValueLengthLimit);
}
}

// Supports setting Body directly on LogRecord for the Logs Bridge API.
if (!isLogRecordBodySet && logRecord.Body != null)
{
// If {OriginalFormat} is not present in the attributes,
// use logRecord.Body if it is set.
otlpTagWriterState.WritePosition = WriteLogRecordBody(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, logRecord.Body.AsSpan());
}
}

if (logRecord.TraceId != default && logRecord.SpanId != default)
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTagAndLength(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, TraceIdSize, ProtobufOtlpLogFieldNumberConstants.LogRecord_Trace_Id, ProtobufWireType.LEN);
otlpTagWriterState.WritePosition = ProtobufOtlpTraceSerializer.WriteTraceId(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, logRecord.TraceId);

otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTagAndLength(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, SpanIdSize, ProtobufOtlpLogFieldNumberConstants.LogRecord_Span_Id, ProtobufWireType.LEN);
otlpTagWriterState.WritePosition = ProtobufOtlpTraceSerializer.WriteSpanId(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, logRecord.SpanId);

otlpTagWriterState.WritePosition = ProtobufSerializer.WriteFixed32WithTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Flags, (uint)logRecord.TraceFlags);
}

/*
* TODO: Handle scopes, otlpTagWriterState needs to be passed as ref.
logRecord.ForEachScope(ProcessScope, otlpTagWriterState);
void ProcessScope(LogRecordScope scope, ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState)
{
foreach (var scopeItem in scope)
{
if (scopeItem.Key.Equals("{OriginalFormat}") || string.IsNullOrEmpty(scopeItem.Key))
{
// Ignore if the scope key is empty.
// Ignore if the scope key is {OriginalFormat}
// Attributes should not contain duplicates,
// and it is expensive to de-dup, so this
// exporter is going to pass the scope items as is.
// {OriginalFormat} is going to be the key
// if one uses formatted string for scopes
// and if there are nested scopes, this is
// guaranteed to create duplicate keys.
// Similar for empty keys, which is what the
// key is going to be if user simply
// passes a string as scope.
// To summarize this exporter only allows
// IReadOnlyList<KeyValuePair<string, object?>>
// or IEnumerable<KeyValuePair<string, object?>>.
// and expect users to provide unique keys.
// Note: It is possible that we allow users
// to override this exporter feature. So not blocking
// empty/{OriginalFormat} in the SDK itself.
}
else
{
otlpTagWriterState = AddLogAttribute(ref otlpTagWriterState, scopeItem, attributeCountLimit, attributeValueLengthLimit);
}
}
}
*/

if (otlpTagWriterState.DroppedTagCount > 0)
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Dropped_Attributes_Count, ProtobufWireType.VARINT);
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteVarInt32(buffer, otlpTagWriterState.WritePosition, (uint)otlpTagWriterState.DroppedTagCount);
}

ProtobufSerializer.WriteReservedLength(otlpTagWriterState.Buffer, logRecordLengthPosition, otlpTagWriterState.WritePosition - (logRecordLengthPosition + ReserveSizeForLength));

return otlpTagWriterState.WritePosition;
}

private static int WriteLogRecordBody(byte[] buffer, int writePosition, ReadOnlySpan<char> value)
{
var numberOfUtf8CharsInString = ProtobufSerializer.GetNumberOfUtf8CharsInString(value);
var serializedLengthSize = ProtobufSerializer.ComputeVarInt64Size((ulong)numberOfUtf8CharsInString);

// length = numberOfUtf8CharsInString + tagSize + length field size.
writePosition = ProtobufSerializer.WriteTagAndLength(buffer, writePosition, numberOfUtf8CharsInString + 1 + serializedLengthSize, ProtobufOtlpLogFieldNumberConstants.LogRecord_Body, ProtobufWireType.LEN);
writePosition = ProtobufSerializer.WriteStringWithTag(buffer, writePosition, ProtobufOtlpTraceFieldNumberConstants.AnyValue_String_Value, numberOfUtf8CharsInString, value);
return writePosition;
}

private static ProtobufOtlpTagWriter.OtlpTagWriterState AddLogAttribute(ref ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState, KeyValuePair<string, object?> attribute, int maxAttributeCount, int? maxValueLength)
{
return AddLogAttribute(ref otlpTagWriterState, attribute.Key, attribute.Value, maxAttributeCount, maxValueLength);
}

private static ProtobufOtlpTagWriter.OtlpTagWriterState AddLogAttribute(ref ProtobufOtlpTagWriter.OtlpTagWriterState otlpTagWriterState, string key, object? value, int maxAttributeCount, int? maxValueLength)
{
if (otlpTagWriterState.TagCount == maxAttributeCount)
{
otlpTagWriterState.DroppedTagCount++;
}
else
{
otlpTagWriterState.WritePosition = ProtobufSerializer.WriteTag(otlpTagWriterState.Buffer, otlpTagWriterState.WritePosition, ProtobufOtlpLogFieldNumberConstants.LogRecord_Attributes, ProtobufWireType.LEN);
int logAttributesLengthPosition = otlpTagWriterState.WritePosition;
otlpTagWriterState.WritePosition += ReserveSizeForLength;

ProtobufOtlpTagWriter.Instance.TryWriteTag(ref otlpTagWriterState, key, value, maxValueLength);

var logAttributesLength = otlpTagWriterState.WritePosition - (logAttributesLengthPosition + ReserveSizeForLength);
ProtobufSerializer.WriteReservedLength(otlpTagWriterState.Buffer, logAttributesLengthPosition, logAttributesLength);
otlpTagWriterState.TagCount++;
}

return otlpTagWriterState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ protected override void OnUnsupportedTagDropped(
internal struct OtlpTagWriterState
{
public byte[] Buffer;
public int DroppedTagCount;
public int TagCount;
public int WritePosition;
}

Expand Down
Loading

0 comments on commit 1fc0ebc

Please sign in to comment.