Skip to content

Commit

Permalink
Don't return the primary key/cursor field for all connections (#12094)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonsSpaghetti committed Apr 16, 2024
1 parent 24493d1 commit ba4c105
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ class ConnectionConfigurationProblem private constructor(message: String) : Abst
)
}

fun primaryKeyAlreadyDefined(streamName: String): ConnectionConfigurationProblem {
fun primaryKeyAlreadyDefined(
streamName: String,
allowedPrimaryKey: List<List<String>>,
): ConnectionConfigurationProblem {
return ConnectionConfigurationProblem(
"Primary key for stream: $streamName is already pre-defined. Please do NOT include a primary key configuration for this stream.",
"Primary key for stream: $streamName is already pre-defined. Please remove the primaryKey or provide the value as $allowedPrimaryKey.",
)
}

Expand All @@ -88,6 +91,15 @@ class ConnectionConfigurationProblem private constructor(message: String) : Abst
)
}

fun duplicatePrimaryKey(
streamName: String,
key: List<List<String?>?>,
): ConnectionConfigurationProblem {
return ConnectionConfigurationProblem(
"Duplicate primary key detected for stream: $streamName, please don't provide the same column more than once. Key: $key",
)
}

fun invalidCronExpressionUnderOneHour(cronExpression: String): ConnectionConfigurationProblem {
return ConnectionConfigurationProblem(
"The cron expression " + cronExpression +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,8 @@ object AirbyteCatalogHelper {
airbyteStream: AirbyteStream,
streamConfiguration: StreamConfiguration,
): List<List<String>>? {
// if no source defined primary key
return if (airbyteStream.sourceDefinedPrimaryKey == null || airbyteStream.sourceDefinedPrimaryKey!!.isEmpty()) {
return (airbyteStream.sourceDefinedPrimaryKey ?: emptyList()).ifEmpty {
streamConfiguration.primaryKey
} else if (streamConfiguration.primaryKey == null || streamConfiguration.primaryKey.isEmpty()) {
airbyteStream.sourceDefinedPrimaryKey
} else {
listOf()
}
}

Expand Down Expand Up @@ -325,26 +320,33 @@ object AirbyteCatalogHelper {
primaryKey: List<List<String>>?,
airbyteStream: AirbyteStream,
) {
// if no source defined primary key
if (airbyteStream.sourceDefinedPrimaryKey == null || airbyteStream.sourceDefinedPrimaryKey!!.isEmpty()) {
if (!primaryKey.isNullOrEmpty()) {
// validate primary key
// Validate that if a source defined primary key exists, that's the one we use.
// Currently, UI only supports this and there's likely assumptions baked into the platform that mean this needs to be true.
val sourceDefinedPrimaryKeyExists = !airbyteStream.sourceDefinedPrimaryKey.isNullOrEmpty()
val configuredPrimaryKeyExists = !primaryKey.isNullOrEmpty()

val validPrimaryKey: List<List<String>> = getStreamFields(airbyteStream.jsonSchema!!)
if (sourceDefinedPrimaryKeyExists && configuredPrimaryKeyExists) {
if (airbyteStream.sourceDefinedPrimaryKey != primaryKey) {
throw ConnectionConfigurationProblem.primaryKeyAlreadyDefined(airbyteStream.name, airbyteStream.sourceDefinedPrimaryKey)
}
}

// todo maybe check that they don't provide the same primary key twice?
for (singlePrimaryKey in primaryKey) {
if (!validPrimaryKey.contains(singlePrimaryKey)) { // todo double check if the .contains() for list of strings works as intended
throw ConnectionConfigurationProblem.invalidPrimaryKey(airbyteStream.name, validPrimaryKey)
}
}
} else {
throw ConnectionConfigurationProblem.missingPrimaryKey(airbyteStream.name)
// Ensure that we've passed at least some kind of primary key
val noPrimaryKey = !configuredPrimaryKeyExists && !sourceDefinedPrimaryKeyExists
if (noPrimaryKey) {
throw ConnectionConfigurationProblem.missingPrimaryKey(airbyteStream.name)
}

// Validate the actual key passed in
val validPrimaryKey: List<List<String>> = getStreamFields(airbyteStream.jsonSchema!!)

for (singlePrimaryKey in primaryKey!!) {
if (!validPrimaryKey.contains(singlePrimaryKey)) { // todo double check if the .contains() for list of strings works as intended
throw ConnectionConfigurationProblem.invalidPrimaryKey(airbyteStream.name, validPrimaryKey)
}
} else {
// source defined primary key exists
if (!primaryKey.isNullOrEmpty()) {
throw ConnectionConfigurationProblem.primaryKeyAlreadyDefined(airbyteStream.name)

if (singlePrimaryKey.distinct() != singlePrimaryKey) {
throw ConnectionConfigurationProblem.duplicatePrimaryKey(airbyteStream.name, primaryKey)
}
}
}
Expand Down

0 comments on commit ba4c105

Please sign in to comment.