From 2a54285526c66c2a607a2598e8fe6c3d96028f11 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 16 Nov 2024 12:52:07 +0100 Subject: [PATCH] Bump Arcane.Framework from 0.0.35 to 0.0.47 in /src (#55) * Bump Arcane.Framework from 0.0.35 to 0.0.47 in /src Bumps [Arcane.Framework](https://github.com/SneaksAndData/arcane-framework) from 0.0.35 to 0.0.47. - [Release notes](https://github.com/SneaksAndData/arcane-framework/releases) - [Commits](https://github.com/SneaksAndData/arcane-framework/compare/v0.0.35...v0.0.47) --- updated-dependencies: - dependency-name: Arcane.Framework dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Fix incompatible changes --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Vitalii Savitskii Co-authored-by: George Zubrienko --- src/Arcane.Stream.Cdm.csproj | 2 +- src/GraphBuilder/CdmChangeFeedGraphBuilder.cs | 16 +++++++++++++--- src/Models/CdmChangeFeedStreamContext.cs | 6 +++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/Arcane.Stream.Cdm.csproj b/src/Arcane.Stream.Cdm.csproj index 9346a8a..4a401c7 100644 --- a/src/Arcane.Stream.Cdm.csproj +++ b/src/Arcane.Stream.Cdm.csproj @@ -6,7 +6,7 @@ Arcane.Stream.Cdm - + diff --git a/src/GraphBuilder/CdmChangeFeedGraphBuilder.cs b/src/GraphBuilder/CdmChangeFeedGraphBuilder.cs index c5d4271..f39cc36 100644 --- a/src/GraphBuilder/CdmChangeFeedGraphBuilder.cs +++ b/src/GraphBuilder/CdmChangeFeedGraphBuilder.cs @@ -3,8 +3,10 @@ using System.Threading.Tasks; using Akka.Streams; using Akka.Streams.Dsl; +using Akka.Util; using Arcane.Framework.Contracts; using Arcane.Framework.Services.Base; +using Arcane.Framework.Sinks.Models; using Arcane.Framework.Sinks.Parquet; using Arcane.Framework.Sources.CdmChangeFeedSource; using Arcane.Stream.Cdm.Models; @@ -19,13 +21,15 @@ public class CdmChangeFeedGraphBuilder : IStreamGraphBuilder BuildGraph(CdmChangeFeedStreamContext context) @@ -38,7 +42,11 @@ public CdmChangeFeedGraphBuilder(IBlobStorageService blobStorageService, Metrics context.SchemaUpdateInterval); var dimensions = source.GetDefaultTags().GetAsDictionary(context, context.StreamId); - var parquetSink = ParquetSinkFromContext(context, source.GetParquetSchema(), this.blobStorageWriter, context.SinkLocation); + var parquetSink = ParquetSinkFromContext(context, + source.GetParquetSchema(), + this.blobStorageWriter, + context.SinkLocation, + this.interruptionToken); return Source.FromGraph(source) .GroupedWithin(context.RowsPerGroup, context.GroupingInterval) .Select(grp => @@ -55,12 +63,14 @@ public CdmChangeFeedGraphBuilder(IBlobStorageService blobStorageService, Metrics private static ParquetSink ParquetSinkFromContext(CdmChangeFeedStreamContext streamContext, Schema schema, - IBlobStorageWriter blobStorageWriter, string sinkLocation) + IBlobStorageWriter blobStorageWriter, string sinkLocation, IInterruptionToken interruptionToken) { var parquetSink = ParquetSink.Create(parquetSchema: schema, storageWriter: blobStorageWriter, parquetFilePath: $"{sinkLocation}/{streamContext.StreamId}", rowGroupsPerFile: streamContext.GroupsPerFile, createSchemaFile: true, + interruptionToken: interruptionToken, + streamMetadata: streamContext.GetStreamMetadata().GetOrElse(new StreamMetadata(Option.None)), dataSinkPathSegment: streamContext.IsBackfilling ? "backfill" : "data", dropCompletionToken: streamContext.IsBackfilling); diff --git a/src/Models/CdmChangeFeedStreamContext.cs b/src/Models/CdmChangeFeedStreamContext.cs index b5b41cd..9b6f8b4 100644 --- a/src/Models/CdmChangeFeedStreamContext.cs +++ b/src/Models/CdmChangeFeedStreamContext.cs @@ -1,7 +1,9 @@ using System; using System.Text.Json.Serialization; +using Akka.Util; using Arcane.Framework.Configuration; using Arcane.Framework.Services.Base; +using Arcane.Framework.Sinks.Models; namespace Arcane.Stream.Cdm.Models; @@ -57,7 +59,9 @@ public class CdmChangeFeedStreamContext : IStreamContext, IStreamContextWriter [JsonConverter(typeof(SecondsToTimeSpanConverter))] [JsonPropertyName("schemaUpdateIntervalSeconds")] public TimeSpan SchemaUpdateInterval { get; set; } - + + public Option GetStreamMetadata() => new StreamMetadata(Option.None); + /// > public string StreamId { get; private set; }