feat(conditional-writes): misc updates and fixes (datahub-project#10901)
david-leifker authored Jul 12, 2024
1 parent d77d565 commit 46530f2
Showing 8 changed files with 594 additions and 60 deletions.
12 changes: 12 additions & 0 deletions docs/advanced/mcp-mcl.md
@@ -183,6 +183,9 @@ A writer can provide a header with the expected `version` when initiating the re
match the actual `version` stored in the database, the write will fail. This prevents overwriting an aspect that has
been modified by another process.

Note: If the aspect doesn't exist yet, then the `version` is `-1`. A writer can use this `version` to create
an aspect only if it doesn't already exist. Also see the _Change Types: [`CREATE`, `CREATE_ENTITY`]_ section below.
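
The version precondition described above can be sketched as follows. This is a minimal illustration, not DataHub's implementation: the class `VersionPrecondition` and the method `isSatisfied` are hypothetical names, and the stored version is modeled as a plain `Optional<Long>`. Only the `-1` sentinel and the match-or-fail rule come from the text above.

```java
import java.util.Optional;

final class VersionPrecondition {
  // Sentinel from the note above: the version reported when the aspect does not exist yet.
  static final String MISSING_ASPECT_VERSION = "-1";

  /**
   * Returns true when the write may proceed.
   *
   * @param expectedVersion version supplied by the writer, or null if none was supplied
   * @param storedVersion version currently stored, empty if the aspect does not exist
   */
  static boolean isSatisfied(String expectedVersion, Optional<Long> storedVersion) {
    if (expectedVersion == null) {
      return true; // no expectation supplied, so the write is unconditional
    }
    String actual = storedVersion.map(v -> Long.toString(v)).orElse(MISSING_ASPECT_VERSION);
    return expectedVersion.equals(actual);
  }
}
```

Under these assumptions, a writer that sends `-1` succeeds only while the aspect is absent, which gives create-if-missing behavior.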

#### If-Modified-Since / If-Unmodified-Since

A writer may also specify time-based conditions using HTTP header semantics. Similar to version-based conditional writes
@@ -194,3 +197,12 @@ A writer can specify that the aspect must NOT have been modified after a specifi

`If-Modified-Since`
A writer can specify that the aspect must have been modified after a specific time, following [If-Modified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since) HTTP header semantics.
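
The two time-based conditions reduce to a comparison between the aspect's last-modified time and the time given in the header. The sketch below assumes both values are already available as `java.time.Instant`; parsing the header value and looking up the stored timestamp are out of scope, and the class and method names are hypothetical.

```java
import java.time.Instant;

final class TimePrecondition {
  /** If-Unmodified-Since: passes only if the aspect was NOT modified after headerTime. */
  static boolean unmodifiedSince(Instant lastModified, Instant headerTime) {
    return !lastModified.isAfter(headerTime);
  }

  /** If-Modified-Since: passes only if the aspect WAS modified after headerTime. */
  static boolean modifiedSince(Instant lastModified, Instant headerTime) {
    return lastModified.isAfter(headerTime);
  }
}
```

In this sketch, a modification at exactly the header time counts as "not modified after" it, so `unmodifiedSince` passes and `modifiedSince` fails in that case.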

#### Change Types: [`CREATE`, `CREATE_ENTITY`]

Another form of conditional write considers whether an aspect or entity already exists. It uses the following Change Types.

`CREATE` - Create the aspect if it doesn't already exist.

`CREATE_ENTITY` - Create the aspect if no aspects exist for the entity.
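
The difference between the two Change Types can be sketched with an in-memory view of what is already stored. Everything here is illustrative: `CreateChangeTypes`, `mayWrite`, and the `Map<String, Set<String>>` of URN to existing aspect names are assumptions made for the example, not DataHub APIs.

```java
import java.util.Map;
import java.util.Set;

final class CreateChangeTypes {
  enum ChangeType { CREATE, CREATE_ENTITY }

  /**
   * @param existingAspects hypothetical view of storage: entity URN to names of stored aspects
   * @return true when the write is allowed under the given change type
   */
  static boolean mayWrite(
      ChangeType type, String urn, String aspectName, Map<String, Set<String>> existingAspects) {
    Set<String> aspects = existingAspects.getOrDefault(urn, Set.of());
    switch (type) {
      case CREATE:
        // Allowed only if this particular aspect is not stored yet.
        return !aspects.contains(aspectName);
      case CREATE_ENTITY:
        // Allowed only if the entity has no aspects at all.
        return aspects.isEmpty();
      default:
        throw new IllegalArgumentException("Unsupported change type: " + type);
    }
  }
}
```

With an entity that already has `globalTags`, this sketch allows a `CREATE` of `datasetProperties` but rejects a `CREATE_ENTITY` of the same aspect.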

224 changes: 223 additions & 1 deletion docs/api/openapi/openapi-usage-guide.md
@@ -588,4 +588,226 @@ public class Main {
### Conditional Writes

All the create/POST endpoints for aspects accept a `headers` map in the POST body, so that batch APIs can carry headers per item. See the
[MetadataChangeProposal](../../advanced/mcp-mcl.md) section for how these headers support conditional write semantics.

### Batch Get

Batch get endpoints of the form `/v3/entity/{entityName}/batchGet` exist for all entities. These endpoints fetch
entities and their aspects in bulk. By default they return the latest version of each aspect. Combined with the
`If-Version-Match` header, they can instead retrieve a specific version. Currently, this interface returns a single
version for each entity/aspect pair, although different versions can be requested for different entities.
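
The version-selection rule can be sketched with an in-memory history for a single aspect. This is only an illustration under stated assumptions: `AspectVersionLookup` is a hypothetical class, the history is a `TreeMap` rather than real storage, and returning an empty result for an unknown version is a choice made for the example, not documented endpoint behavior.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class AspectVersionLookup {
  // Hypothetical history for one aspect: version number to serialized value.
  private final TreeMap<Long, String> history = new TreeMap<>();

  void put(long version, String value) {
    history.put(version, value);
  }

  /**
   * @param headers per-aspect headers map from the request body, possibly empty
   * @return the value at the requested version, or the latest one when no version is requested
   */
  Optional<String> get(Map<String, String> headers) {
    String requested = headers.get("If-Version-Match");
    if (requested == null) {
      // No header: default to the highest stored version.
      return history.isEmpty() ? Optional.empty() : Optional.of(history.lastEntry().getValue());
    }
    return Optional.ofNullable(history.get(Long.parseLong(requested)));
  }
}
```

Because the header is attached to each aspect rather than to the whole request, one call can ask for version `1` of one aspect and the latest version of another.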

A few example queries are as follows:

Example Request:

Fetch the latest aspects for the given URNs, passing the URL parameter `systemMetadata=true` to view the current
versions of the aspects.

```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)",
    "globalTags": {},
    "datasetProperties": {}
  },
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "globalTags": {},
    "datasetProperties": {}
  }
]
```

Example Response:

Notice that `systemMetadata` contains `"version": "1"` for each of the aspects that exist in the system.

```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)",
    "datasetProperties": {
      "value": {
        "description": "table containing all the users deleted on a single day",
        "customProperties": {
          "encoding": "utf-8"
        },
        "tags": []
      },
      "systemMetadata": {
        "properties": {
          "clientVersion": "1!0.0.0.dev0",
          "clientId": "acryl-datahub"
        },
        "version": "1",
        "lastObserved": 1720781548776,
        "lastRunId": "file-2024_07_12-05_52_28",
        "runId": "file-2024_07_12-05_52_28"
      }
    }
  },
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "datasetProperties": {
      "value": {
        "description": "table containing all the users created on a single day",
        "customProperties": {
          "encoding": "utf-8"
        },
        "tags": []
      },
      "systemMetadata": {
        "properties": {
          "clientVersion": "1!0.0.0.dev0",
          "clientId": "acryl-datahub"
        },
        "version": "1",
        "lastObserved": 1720781548773,
        "lastRunId": "file-2024_07_12-05_52_28",
        "runId": "file-2024_07_12-05_52_28"
      }
    },
    "globalTags": {
      "value": {
        "tags": [
          {
            "tag": "urn:li:tag:NeedsDocumentation"
          }
        ]
      },
      "systemMetadata": {
        "properties": {
          "appSource": "ui"
        },
        "version": "1",
        "lastObserved": 0,
        "lastRunId": "no-run-id-provided",
        "runId": "no-run-id-provided"
      }
    }
  }
]
```

Next, let's mutate `globalTags` for the second URN by adding a new tag, which increments the version of
the `globalTags` aspect. The response then looks like the following. Notice the incremented
`"version": "2"` in `systemMetadata` for the `globalTags` aspect, and that two tags are now present, where
previously there was only `urn:li:tag:NeedsDocumentation`.

```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)",
    "datasetProperties": {
      "value": {
        "description": "table containing all the users deleted on a single day",
        "customProperties": {
          "encoding": "utf-8"
        },
        "tags": []
      },
      "systemMetadata": {
        "properties": {
          "clientVersion": "1!0.0.0.dev0",
          "clientId": "acryl-datahub"
        },
        "version": "1",
        "lastObserved": 1720781548776,
        "lastRunId": "file-2024_07_12-05_52_28",
        "runId": "file-2024_07_12-05_52_28"
      }
    }
  },
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "datasetProperties": {
      "value": {
        "description": "table containing all the users created on a single day",
        "customProperties": {
          "encoding": "utf-8"
        },
        "tags": []
      },
      "systemMetadata": {
        "properties": {
          "clientVersion": "1!0.0.0.dev0",
          "clientId": "acryl-datahub"
        },
        "version": "1",
        "lastObserved": 1720781548773,
        "lastRunId": "file-2024_07_12-05_52_28",
        "runId": "file-2024_07_12-05_52_28"
      }
    },
    "globalTags": {
      "value": {
        "tags": [
          {
            "tag": "urn:li:tag:NeedsDocumentation"
          },
          {
            "tag": "urn:li:tag:Legacy"
          }
        ]
      },
      "systemMetadata": {
        "properties": {
          "appSource": "ui"
        },
        "version": "2",
        "lastObserved": 0,
        "lastRunId": "no-run-id-provided",
        "runId": "no-run-id-provided"
      }
    }
  }
]
```

Next, we'll retrieve the previous version of `globalTags` for the URN whose aspect is now at version 2.
We can do this by setting `If-Version-Match` to `1` in the aspect's `headers` map.

Example Request:
```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "globalTags": {
      "headers": {
        "If-Version-Match": "1"
      }
    }
  }
]
```

Example Response:

The previous version `1` of the `globalTags` aspect is returned as expected with only the single tag.

```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "globalTags": {
      "value": {
        "tags": [
          {
            "tag": "urn:li:tag:NeedsDocumentation"
          }
        ]
      },
      "systemMetadata": {
        "properties": {
          "appSource": "ui"
        },
        "version": "1",
        "lastObserved": 0,
        "lastRunId": "no-run-id-provided",
        "runId": "no-run-id-provided"
      }
    }
  }
]
```
@@ -42,7 +42,7 @@
@Getter
@Accessors(chain = true)
public class ConditionalWriteValidator extends AspectPayloadValidator {
- public static final String DEFAULT_ASPECT_VERSION = "1";
+ public static final String UNVERSIONED_ASPECT_VERSION = "-1";
public static final long DEFAULT_LAST_MODIFIED_TIME = Long.MIN_VALUE;
public static final String HTTP_HEADER_IF_VERSION_MATCH = "If-Version-Match";
public static final Set<ChangeType> CREATE_CHANGE_TYPES =
@@ -130,7 +130,7 @@ private static Optional<AspectValidationException> validateVersionPrecondition(
switch (item.getChangeType()) {
case CREATE:
case CREATE_ENTITY:
- actualAspectVersion = DEFAULT_ASPECT_VERSION;
+ actualAspectVersion = UNVERSIONED_ASPECT_VERSION;
break;
default:
actualAspectVersion =
@@ -143,7 +143,7 @@ private static Optional<AspectValidationException> validateVersionPrecondition(
return String.valueOf(Math.max(1, prevSystemAspect.getVersion()));
}
})
- .orElse(DEFAULT_ASPECT_VERSION);
+ .orElse(UNVERSIONED_ASPECT_VERSION);
break;
}
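
The visible parts of this hunk suggest how the "actual" version is resolved before it is compared with `If-Version-Match`. The standalone sketch below covers only what the hunk shows. The lines elided between the two hunks are not reproduced, `ActualVersionSketch` and its local `ChangeType` enum are hypothetical stand-ins, and the previous aspect is reduced to an `Optional<Long>` version.

```java
import java.util.Optional;

final class ActualVersionSketch {
  static final String UNVERSIONED_ASPECT_VERSION = "-1";

  // Illustrative subset of change types; UPSERT stands in for "any non-create type".
  enum ChangeType { CREATE, CREATE_ENTITY, UPSERT }

  /**
   * @param previousVersion stored version of the previous aspect, empty if none exists
   */
  static String actualVersion(ChangeType changeType, Optional<Long> previousVersion) {
    switch (changeType) {
      case CREATE:
      case CREATE_ENTITY:
        // Create-style writes are always compared against the "does not exist" sentinel.
        return UNVERSIONED_ASPECT_VERSION;
      default:
        // Otherwise use the previous version, floored at 1 as in the visible line,
        // and fall back to the sentinel when there is no previous aspect.
        return previousVersion
            .map(v -> String.valueOf(Math.max(1L, v)))
            .orElse(UNVERSIONED_ASPECT_VERSION);
    }
  }
}
```

Under this reading, replacing the old default of `"1"` with `"-1"` lets a writer distinguish a missing aspect from one at version 1.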
