Skip to content

Commit

Permalink
chore: add primary key updates to schema change notifications (#14555)
Browse files Browse the repository at this point in the history
  • Loading branch information
teallarson committed Nov 14, 2024
1 parent 68cfa4e commit 0dbd990
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.common.StreamDescriptorUtils;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.StreamAttributeTransform;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.envvar.EnvVar;
import io.airbyte.commons.json.Jsons;
Expand All @@ -26,6 +28,7 @@
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -341,40 +344,66 @@ static ObjectNode buildSchemaChangeJson(final SchemaUpdateNotification notificat
final ObjectNode changesNode = MAPPER.createObjectNode();
messageDataNode.set("changes", changesNode);

final var diff = notification.getCatalogDiff();
final var newStreams = diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.ADD_STREAM).toList();
final CatalogDiff diff = notification.getCatalogDiff();

final List<StreamTransform> newStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.ADD_STREAM).toList();
LOGGER.info("Notify schema changes on new streams: {}", newStreams);
final ArrayNode newStreamsNodes = MAPPER.createArrayNode();
changesNode.set("new_streams", newStreamsNodes);
for (final var stream : newStreams) {
for (final StreamTransform stream : newStreams) {
newStreamsNodes.add(StreamDescriptorUtils.buildFullyQualifiedName(stream.getStreamDescriptor()));
}

final var deletedStreams =
final List<StreamTransform> deletedStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.REMOVE_STREAM).toList();
LOGGER.info("Notify schema changes on deleted streams: {}", deletedStreams);
final ArrayNode deletedStreamsNodes = MAPPER.createArrayNode();
changesNode.set("deleted_streams", deletedStreamsNodes);
for (final var stream : deletedStreams) {
for (final StreamTransform stream : deletedStreams) {
deletedStreamsNodes.add(StreamDescriptorUtils.buildFullyQualifiedName(stream.getStreamDescriptor()));
}

final var alteredStreams =
final List<StreamTransform> alteredStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.UPDATE_STREAM).toList();
LOGGER.info("Notify schema changes on altered streams: {}", alteredStreams);
final ObjectNode modifiedStreamsNodes = MAPPER.createObjectNode();
changesNode.set("modified_streams", modifiedStreamsNodes);
for (final var stream : alteredStreams) {

final var streamNode = MAPPER.createObjectNode();
for (final StreamTransform stream : alteredStreams) {

final ObjectNode streamNode = MAPPER.createObjectNode();
modifiedStreamsNodes.set(StreamDescriptorUtils.buildFullyQualifiedName(stream.getStreamDescriptor()), streamNode);
final ArrayNode newFields = MAPPER.createArrayNode();
final ArrayNode deletedFields = MAPPER.createArrayNode();
final ArrayNode modifiedFields = MAPPER.createArrayNode();
final ArrayNode updatedPrimaryKeyInfo = MAPPER.createArrayNode();

streamNode.set("new", newFields);
streamNode.set("deleted", deletedFields);
streamNode.set("altered", modifiedFields);
streamNode.set("updated_primary_key", updatedPrimaryKeyInfo);

final Optional<StreamAttributeTransform> primaryKeyChangeOptional = stream.getUpdateStream().getStreamAttributeTransforms().stream()
.filter(t -> t.getTransformType().equals(StreamAttributeTransform.TransformTypeEnum.UPDATE_PRIMARY_KEY))
.findFirst();

if (primaryKeyChangeOptional.isPresent()) {
final StreamAttributeTransform primaryKeyChange = primaryKeyChangeOptional.get();
final List<List<String>> oldPrimaryKey = primaryKeyChange.getUpdatePrimaryKey().getOldPrimaryKey();
final String oldPrimaryKeyString = formatPrimaryKeyString(oldPrimaryKey);

final List<List<String>> newPrimaryKey = primaryKeyChange.getUpdatePrimaryKey().getNewPrimaryKey();
final String newPrimaryKeyString = formatPrimaryKeyString(newPrimaryKey);

if (!oldPrimaryKeyString.isEmpty() && newPrimaryKeyString.isEmpty()) {
updatedPrimaryKeyInfo.add(String.format("%s removed as primary key", oldPrimaryKeyString));
} else if (oldPrimaryKeyString.isEmpty() && !newPrimaryKeyString.isEmpty()) {
updatedPrimaryKeyInfo.add(String.format("%s added as primary key", newPrimaryKeyString));
} else if (!oldPrimaryKeyString.isEmpty()) {
updatedPrimaryKeyInfo.add(String.format("Primary key changed (%s -> %s)", oldPrimaryKeyString, newPrimaryKeyString));
}
}

for (final var fieldChange : stream.getUpdateStream().getFieldTransforms()) {
final String fieldName = StreamDescriptorUtils.buildFieldName(fieldChange.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.notification;

import io.airbyte.api.common.StreamDescriptorUtils;
import io.airbyte.config.ActorDefinitionBreakingChange;
import io.airbyte.config.ActorType;
import io.airbyte.notification.messages.SchemaUpdateNotification;
Expand Down Expand Up @@ -57,4 +58,14 @@ public abstract boolean notifySchemaDiffToApply(final SchemaUpdateNotification n

public abstract String getNotificationClientType();

/**
 * Renders a primary key as a short, human-readable string for schema change notifications.
 *
 * <p>
 * Every entry of {@code primaryKey} is converted to a name with
 * {@link StreamDescriptorUtils#buildFieldName} and the names are joined with {@code ", "}. A key
 * made of more than one entry is wrapped in square brackets; a single-entry key is returned bare.
 *
 * @param primaryKey the key entries (presumably one inner list of path segments per key field —
 *        confirm against the API model); may be {@code null} or empty
 * @return the rendered key, or an empty string when there is nothing to render
 */
static String formatPrimaryKeyString(List<List<String>> primaryKey) {
  // Treat an unset key like an empty one: callers only test the result with isEmpty(), so an
  // empty string is the natural "no key" answer and avoids a NullPointerException here.
  if (primaryKey == null || primaryKey.isEmpty()) {
    return "";
  }

  final String primaryKeyString = String.join(", ", primaryKey.stream().map(StreamDescriptorUtils::buildFieldName).toList());

  if (primaryKeyString.isEmpty()) {
    return "";
  }

  // Decide on brackets from the number of key entries rather than from the rendered text, so a
  // single field whose name happens to contain a comma is not presented as a composite key.
  return primaryKey.size() > 1 ? "[" + primaryKeyString + "]" : primaryKeyString;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.api.common.StreamDescriptorUtils;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.StreamAttributeTransform;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.resources.MoreResources;
Expand Down Expand Up @@ -321,42 +322,74 @@ public boolean notifySchemaDiffToApply(final SchemaUpdateNotification notificati
protected static String buildSummary(final CatalogDiff diff) {
final StringBuilder summaryBuilder = new StringBuilder();

final var newStreams =
final List<StreamTransform> newStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.ADD_STREAM)
.sorted(Comparator.comparing(o -> StreamDescriptorUtils.buildFullyQualifiedName(o.getStreamDescriptor()))).toList();
final var deletedStreams =
final List<StreamTransform> deletedStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.REMOVE_STREAM)
.sorted(Comparator.comparing(o -> StreamDescriptorUtils.buildFullyQualifiedName(o.getStreamDescriptor()))).toList();
if (!newStreams.isEmpty() || !deletedStreams.isEmpty()) {
summaryBuilder.append(String.format(" • Streams (+%d/-%d)\n", newStreams.size(), deletedStreams.size()));
for (final var stream : newStreams) {
final List<StreamTransform> streamsWithPkChanges =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.UPDATE_STREAM)
.filter(t -> t.getUpdateStream().getStreamAttributeTransforms().stream()
.anyMatch(a -> a.getTransformType().equals(StreamAttributeTransform.TransformTypeEnum.UPDATE_PRIMARY_KEY)))
.sorted(Comparator.comparing(o -> StreamDescriptorUtils.buildFullyQualifiedName(o.getStreamDescriptor()))).toList();

if (!newStreams.isEmpty() || !deletedStreams.isEmpty() || !streamsWithPkChanges.isEmpty()) {
summaryBuilder.append(String.format(" • Streams (+%d/-%d/~%d)\n", newStreams.size(), deletedStreams.size(), streamsWithPkChanges.size()));
for (final StreamTransform stream : newStreams) {
final StreamDescriptor descriptor = stream.getStreamDescriptor();
final String fullyQualifiedStreamName = StreamDescriptorUtils.buildFullyQualifiedName(descriptor);
summaryBuilder.append(String.format(" + %s\n", fullyQualifiedStreamName));
}
for (final var stream : deletedStreams) {
for (final StreamTransform stream : deletedStreams) {
final StreamDescriptor descriptor = stream.getStreamDescriptor();
final String fullyQualifiedStreamName = StreamDescriptorUtils.buildFullyQualifiedName(descriptor);
summaryBuilder.append(String.format(" - %s\n", fullyQualifiedStreamName));
}
for (final StreamTransform stream : streamsWithPkChanges) {
final StreamDescriptor descriptor = stream.getStreamDescriptor();
final String fullyQualifiedStreamName = StreamDescriptorUtils.buildFullyQualifiedName(descriptor);
final Optional<StreamAttributeTransform> primaryKeyChange = stream.getUpdateStream().getStreamAttributeTransforms().stream()
.filter(t -> t.getTransformType().equals(StreamAttributeTransform.TransformTypeEnum.UPDATE_PRIMARY_KEY))
.findFirst();
if (primaryKeyChange.isPresent()) {
final String oldPrimaryKeyString = formatPrimaryKeyString(primaryKeyChange.get().getUpdatePrimaryKey().getOldPrimaryKey());
final String newPrimaryKeyString = formatPrimaryKeyString(primaryKeyChange.get().getUpdatePrimaryKey().getNewPrimaryKey());

summaryBuilder.append(String.format(" ~ %s\n", fullyQualifiedStreamName));

if (!oldPrimaryKeyString.isEmpty() && newPrimaryKeyString.isEmpty()) {
summaryBuilder.append(String.format(" • %s removed as primary key\n", oldPrimaryKeyString));
} else if (oldPrimaryKeyString.isEmpty() && !newPrimaryKeyString.isEmpty()) {
summaryBuilder.append(String.format(" • %s added as primary key\n", newPrimaryKeyString));
} else if (!oldPrimaryKeyString.isEmpty()) {
summaryBuilder.append(String.format(" • Primary key changed (%s -> %s)\n", oldPrimaryKeyString, newPrimaryKeyString));
}
}
}
}

final var alteredStreams =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.UPDATE_STREAM)
final List<StreamTransform> streamsWithFiledChanges =
diff.getTransforms().stream().filter((t) -> t.getTransformType() == StreamTransform.TransformTypeEnum.UPDATE_STREAM
&& t.getUpdateStream().getStreamAttributeTransforms().stream()
.noneMatch(a -> a.getTransformType().equals(StreamAttributeTransform.TransformTypeEnum.UPDATE_PRIMARY_KEY)))
.sorted(Comparator.comparing(o -> StreamDescriptorUtils.buildFullyQualifiedName(o.getStreamDescriptor()))).toList();
if (!alteredStreams.isEmpty()) {
final var newFieldCount = alteredStreams.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
if (!streamsWithFiledChanges.isEmpty()) {
final long newFieldCount = streamsWithFiledChanges.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
.filter(t -> t.getTransformType().equals(FieldTransform.TransformTypeEnum.ADD_FIELD)).count();
final var deletedFieldsCount = alteredStreams.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
final long deletedFieldsCount = streamsWithFiledChanges.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
.filter(t -> t.getTransformType().equals(FieldTransform.TransformTypeEnum.REMOVE_FIELD)).count();
final var alteredFieldsCount = alteredStreams.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
final long alteredFieldsCount = streamsWithFiledChanges.stream().flatMap(t -> t.getUpdateStream().getFieldTransforms().stream())
.filter(t -> t.getTransformType().equals(FieldTransform.TransformTypeEnum.UPDATE_FIELD_SCHEMA)).count();

summaryBuilder.append(String.format(" • Fields (+%d/~%d/-%d)\n", newFieldCount, alteredFieldsCount, deletedFieldsCount));
for (final var stream : alteredStreams) {

for (final StreamTransform stream : streamsWithFiledChanges) {
final StreamDescriptor descriptor = stream.getStreamDescriptor();
final String fullyQualifiedStreamName = StreamDescriptorUtils.buildFullyQualifiedName(descriptor);
summaryBuilder.append(String.format(" • %s\n", fullyQualifiedStreamName));
for (final var fieldChange : stream.getUpdateStream().getFieldTransforms().stream().sorted((o1, o2) -> {

for (final FieldTransform fieldChange : stream.getUpdateStream().getFieldTransforms().stream().sorted((o1, o2) -> {
if (o1.getTransformType().equals(o2.getTransformType())) {
return StreamDescriptorUtils.buildFieldName(o1.getFieldName())
.compareTo(StreamDescriptorUtils.buildFieldName(o2.getFieldName()));
Expand All @@ -380,8 +413,8 @@ protected static String buildSummary(final CatalogDiff diff) {
}
}
}

return summaryBuilder.toString();

}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.StreamAttributePrimaryKeyUpdate;
import io.airbyte.api.model.generated.StreamAttributeTransform;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
Expand Down Expand Up @@ -217,7 +219,16 @@ void testBuildSchemaNotificationMessageData() {
.addTransformsItem(
new StreamTransform().transformType(StreamTransform.TransformTypeEnum.ADD_STREAM).streamDescriptor(new StreamDescriptor().name("foo")))
.addTransformsItem(new StreamTransform().transformType(StreamTransform.TransformTypeEnum.REMOVE_STREAM)
.streamDescriptor(new StreamDescriptor().name("removed")));
.streamDescriptor(new StreamDescriptor().name("removed")))
.addTransformsItem(new StreamTransform().transformType(StreamTransform.TransformTypeEnum.UPDATE_STREAM)
.updateStream(new StreamTransformUpdateStream()
.streamAttributeTransforms(List.of(
new StreamAttributeTransform()
.transformType(StreamAttributeTransform.TransformTypeEnum.UPDATE_PRIMARY_KEY)
.updatePrimaryKey(
new StreamAttributePrimaryKeyUpdate()
.newPrimaryKey(List.of(List.of("new_pk")))))))
.streamDescriptor(new StreamDescriptor().name("stream_with_added_pk")));
String recipient = "[email protected]";
String transactionMessageId = "455";
SchemaUpdateNotification notification = SchemaUpdateNotification.builder()
Expand All @@ -239,6 +250,7 @@ void testBuildSchemaNotificationMessageData() {
assertTrue(node.get("message_data").get("changes").get("deleted_streams").isArray());
assertEquals(1, node.get("message_data").get("changes").get("deleted_streams").size());
assertTrue(node.get("message_data").get("changes").get("modified_streams").isObject());
assertEquals(1, node.get("message_data").get("changes").get("deleted_streams").size());
assertEquals(1, node.get("message_data").get("changes").get("modified_streams").get("updatedStream").get("deleted").size());
}

Expand Down
Loading

0 comments on commit 0dbd990

Please sign in to comment.