🪟 🐛 Apply source-defined PK and cursor instead of clearing on breaking change (#12105)
teallarson committed Apr 15, 2024
1 parent ee98d01 commit 1ddb612
Showing 2 changed files with 75 additions and 9 deletions.
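In plain terms, the change to clearBreakingFieldChanges is: when a breaking field removal affects a stream's configured primary key or cursor, a source-defined primary key or default cursor (if the source provides one) is applied instead of blindly clearing the config; only purely user-defined values are cleared. The following is a minimal TypeScript sketch of that precedence rule, not code from this commit — the types are simplified stand-ins and resolveAfterBreakingChange is a hypothetical helper (it also treats an empty source-defined value as absent, whereas the actual change checks only for the field's presence):

// Simplified stand-ins for the Airbyte catalog types touched by this commit.
interface StreamShape {
  sourceDefinedPrimaryKey?: string[][];
  defaultCursorField?: string[];
}

interface StreamConfigShape {
  primaryKey?: string[][];
  cursorField?: string[];
}

// Hypothetical helper illustrating the precedence applied when a breaking
// change removes a field used as primary key and/or cursor:
//   1. a non-empty source-defined value is applied,
//   2. otherwise an affected user-defined value is cleared,
//   3. otherwise the existing config value is kept.
const resolveAfterBreakingChange = (
  stream: StreamShape,
  config: StreamConfigShape,
  primaryKeyBroken: boolean,
  cursorBroken: boolean
): StreamConfigShape => ({
  ...config,
  primaryKey: stream.sourceDefinedPrimaryKey?.length
    ? stream.sourceDefinedPrimaryKey
    : primaryKeyBroken
    ? []
    : config.primaryKey,
  cursorField: stream.defaultCursorField?.length
    ? stream.defaultCursorField
    : cursorBroken
    ? []
    : config.cursorField,
});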
@@ -14,7 +14,7 @@ import { analyzeSyncCatalogBreakingChanges } from "./calculateInitialCatalog";
const mockSyncSchemaStream: AirbyteStreamAndConfiguration = {
  stream: {
    sourceDefinedCursor: true,
-   defaultCursorField: ["source_cursor"],
+   defaultCursorField: ["new_source_cursor"],
    sourceDefinedPrimaryKey: [["new_primary_key"]],
    jsonSchema: {},
    name: "test",
Expand All @@ -31,6 +31,26 @@ const mockSyncSchemaStream: AirbyteStreamAndConfiguration = {
  },
};

const mockSyncSchemaStreamUserDefined: AirbyteStreamAndConfiguration = {
  stream: {
    sourceDefinedCursor: true,
    defaultCursorField: [],
    sourceDefinedPrimaryKey: [],
    jsonSchema: {},
    name: "test",
    namespace: "namespace-test",
    supportedSyncModes: [],
  },
  config: {
    destinationSyncMode: DestinationSyncMode.append,
    selected: false,
    syncMode: SyncMode.full_refresh,
    cursorField: ["old_cursor"],
    primaryKey: [["old_primary_key"]],
    aliasName: "",
  },
};

describe("analyzeSyncCatalogBreakingChanges", () => {
  it("should return syncCatalog unchanged when schemaChange is no_change and catalogDiff is undefined", () => {
    const syncCatalog: AirbyteCatalog = { streams: [mockSyncSchemaStream] };
@@ -62,7 +82,7 @@ describe("analyzeSyncCatalogBreakingChanges", () => {
      ],
    };
    const result = analyzeSyncCatalogBreakingChanges(syncCatalog, catalogDiff, SchemaChange.breaking);
-   expect(result.streams[0].config?.primaryKey).toEqual([]);
+   expect(result.streams[0].config?.primaryKey).toEqual([["new_primary_key"]]);
  });

  it("should return syncCatalog with transformed streams when there are breaking changes - cursor", () => {
@@ -83,6 +103,48 @@ describe("analyzeSyncCatalogBreakingChanges", () => {
      ],
    };
    const result = analyzeSyncCatalogBreakingChanges(syncCatalog, catalogDiff, SchemaChange.breaking);
    expect(result.streams[0].config?.cursorField).toEqual(["new_source_cursor"]);
  });

  it("should return syncCatalog with transformed streams when there are breaking changes - primaryKey - user-defined", () => {
    const syncCatalog: AirbyteCatalog = { streams: [mockSyncSchemaStreamUserDefined] };
    const catalogDiff: CatalogDiff = {
      transforms: [
        {
          transformType: StreamTransformTransformType.update_stream,
          streamDescriptor: { name: "test", namespace: "namespace-test" },
          updateStream: [
            {
              breaking: true,
              transformType: FieldTransformTransformType.remove_field,
              fieldName: ["old_primary_key"],
            },
          ],
        },
      ],
    };
    const result = analyzeSyncCatalogBreakingChanges(syncCatalog, catalogDiff, SchemaChange.breaking);
    expect(result.streams[0].config?.primaryKey).toEqual([]);
  });

  it("should return syncCatalog with transformed streams when there are breaking changes - cursor - user-defined", () => {
    const syncCatalog: AirbyteCatalog = { streams: [mockSyncSchemaStreamUserDefined] };
    const catalogDiff: CatalogDiff = {
      transforms: [
        {
          transformType: StreamTransformTransformType.update_stream,
          streamDescriptor: { name: "test", namespace: "namespace-test" },
          updateStream: [
            {
              breaking: true,
              transformType: FieldTransformTransformType.remove_field,
              fieldName: ["old_cursor"],
            },
          ],
        },
      ],
    };
    const result = analyzeSyncCatalogBreakingChanges(syncCatalog, catalogDiff, SchemaChange.breaking);
    expect(result.streams[0].config?.cursorField).toEqual([]);
  });

@@ -19,41 +19,45 @@ const clearBreakingFieldChanges = (
  }

  const { primaryKey, cursorField } = nodeStream.config;
  const stream = nodeStream.stream;

  let clearPrimaryKey = false;
  let clearCursorField = false;

  for (const streamTransformation of breakingChangesByStream) {
    if (!streamTransformation.updateStream || !streamTransformation.updateStream?.length) {
      continue;
    }

    // get all of the removed field paths for this transformation
    const breakingFieldPaths = streamTransformation.updateStream
      .filter(({ breaking }) => breaking)
      .map((update) => update.fieldName);

    // if there is a primary key in the config, and any of its field paths were removed, we'll be clearing it
    if (
      !!primaryKey?.length &&
      primaryKey?.some((primaryKeyPath) => breakingFieldPaths.some((path) => isEqual(primaryKeyPath, path)))
    ) {
      clearPrimaryKey = true;
    }

    // if there is a cursor field, and any of its field path was removed, we'll be clearing it
    if (!!cursorField?.length && breakingFieldPaths.some((path) => isEqual(path, cursorField))) {
      clearCursorField = true;
    }
  }

  if (clearPrimaryKey || clearCursorField) {
    return {
      ...nodeStream,
      config: {
        ...nodeStream.config,
-       primaryKey: clearPrimaryKey ? [] : nodeStream.config.primaryKey,
-       cursorField: clearCursorField ? [] : nodeStream.config.cursorField,
+       primaryKey: stream?.sourceDefinedPrimaryKey // it's possible there's a new source-defined primary key, in which case that should take precedence
+         ? stream?.sourceDefinedPrimaryKey
+         : clearPrimaryKey
+         ? []
+         : nodeStream.config.primaryKey,
+       cursorField: nodeStream.stream?.defaultCursorField
+         ? nodeStream.stream?.defaultCursorField // likewise, a source-defined cursor should never be cleared
+         : clearCursorField
+         ? []
+         : nodeStream.config.cursorField,
      },
    };
  }
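For illustration, the same precedence applied to the two situations exercised by the new tests, using the hypothetical resolveAfterBreakingChange sketch earlier on this page (values loosely mirror the test fixtures above, and are not output from this commit's code):

// Source-defined stream: the source's own key and cursor win even though the
// configured ones were affected by the breaking change.
resolveAfterBreakingChange(
  { sourceDefinedPrimaryKey: [["new_primary_key"]], defaultCursorField: ["new_source_cursor"] },
  { primaryKey: [["old_primary_key"]], cursorField: ["old_cursor"] },
  true,
  true
);
// => { primaryKey: [["new_primary_key"]], cursorField: ["new_source_cursor"] }

// User-defined stream: nothing source-defined to fall back to, so both are cleared.
resolveAfterBreakingChange(
  {},
  { primaryKey: [["old_primary_key"]], cursorField: ["old_cursor"] },
  true,
  true
);
// => { primaryKey: [], cursorField: [] }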
