Skip to content

Commit

Permalink
feat: transformer: sideline partially erroring lines (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
Samrose-Ahmed authored Apr 11, 2023
1 parent c6ec2ff commit 682376e
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 35 deletions.
1 change: 1 addition & 0 deletions infra/bin/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const dpMainStack = new DPMainStack(app, "DPMainStack", {
realtimeBucket: dpCommonStack.realtimeBucket,
realtimeBucketTopic: dpCommonStack.realtimeBucketTopic,
matanoAthenaResultsBucket: dpCommonStack.matanoAthenaResultsBucket,
transformerSidelineBucket: dpCommonStack.transformerSidelineBucket,
integrationsStore: dpCommonStack.integrationsStore,
alertTrackerTable: dpCommonStack.alertTrackerTable,
});
Expand Down
3 changes: 3 additions & 0 deletions infra/lib/transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as lambda from "aws-cdk-lib/aws-lambda";
import { RustFunctionLayer } from "./rust-function-layer";

interface TransformerProps {
sidelineBucket: s3.IBucket;
realtimeBucket: s3.IBucket;
realtimeTopic: sns.Topic;
matanoSourcesBucket: s3.IBucket;
Expand Down Expand Up @@ -39,6 +40,7 @@ export class Transformer extends Construct {
MATANO_REALTIME_BUCKET_NAME: props.realtimeBucket.bucketName,
MATANO_REALTIME_TOPIC_ARN: props.realtimeTopic.topicArn,
SQS_METADATA: props.sqsMetadata,
MATANO_SIDELINE_BUCKET: props.sidelineBucket.bucketName,
},
layers: [this.rustFunctionLayer.layer],
timeout: cdk.Duration.seconds(100),
Expand All @@ -56,6 +58,7 @@ export class Transformer extends Construct {
],
});

props.sidelineBucket.grantReadWrite(this.transformerLambda);
props.matanoSourcesBucket.grantRead(this.transformerLambda);
props.realtimeBucket.grantWrite(this.transformerLambda);
props.realtimeTopic.grantPublish(this.transformerLambda);
Expand Down
9 changes: 9 additions & 0 deletions infra/src/DPCommonStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class DPCommonStack extends MatanoStack {
integrationsStore: IntegrationsStore;
alertTrackerTable: ddb.Table;
matanoAthenaResultsBucket: s3.Bucket;
transformerSidelineBucket: s3.Bucket;

constructor(scope: Construct, id: string, props: DPCommonStackProps) {
super(scope, id, props);
Expand Down Expand Up @@ -135,6 +136,8 @@ export class DPCommonStack extends MatanoStack {
this.integrationsStore = new IntegrationsStore(this, "MatanoIntegrationsStore", {});
}

this.transformerSidelineBucket = new Bucket(this, "MatanoTransformerSidelineBucket");

this.humanCfnOutput("MatanoIngestionS3BucketName", {
value: this.matanoIngestionBucket.bucket.bucketName,
description:
Expand All @@ -147,8 +150,14 @@ export class DPCommonStack extends MatanoStack {
"The name of the S3 Bucket used for long term storage backing your data lake. See https://www.matano.dev/docs/tables/querying",
});

this.humanCfnOutput("MatanoTransformerSidelineS3BucketName", {
value: this.transformerSidelineBucket.bucketName,
description: "The name of the S3 Bucket where erroring lines are sidelined.",
});

// important: to prevent output deletion
this.exportValue(this.matanoIngestionBucket.topic.topicArn);
this.exportValue(this.transformerSidelineBucket.bucketArn);
this.exportValue(this.matanoIngestionBucket.bucket.bucketArn);
this.exportValue(this.matanoAthenaResultsBucket.bucketArn);
}
Expand Down
2 changes: 2 additions & 0 deletions infra/src/DPMainStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ interface DPMainStackProps extends MatanoStackProps {
realtimeBucket: Bucket;
realtimeBucketTopic: sns.Topic;
matanoAthenaResultsBucket: s3.Bucket;
transformerSidelineBucket: s3.Bucket;
integrationsStore: IntegrationsStore;
alertTrackerTable: Table;
}
Expand Down Expand Up @@ -194,6 +195,7 @@ export class DPMainStack extends MatanoStack {
realtimeBucket: props.realtimeBucket,
realtimeTopic: props.realtimeBucketTopic,
matanoSourcesBucket: props.matanoSourcesBucket.bucket,
sidelineBucket: props.transformerSidelineBucket,
logSourcesConfigurationPath: path.join(this.configTempDir, "config"), // TODO: weird fix later (@shaeq)
sqsMetadata: sqsSources.sqsMetadata,
});
Expand Down
5 changes: 3 additions & 2 deletions lib/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/rust/lake_writer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
shared = { path = "../shared", features = ["avro"] }
rayon = "1.5.3"
zstd = "0.11.2"
zstd = "0.12.1"
base64 = "0.13.1"
bytes = "1.2.1"
tokio = { version = "1.17.0", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion lib/rust/log_puller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async-trait = "0.1.58"
enum_dispatch = "0.3.8"
regex = "1"
async-stream = "0.3.3"
zstd = "0.11.2"
zstd = "0.12.1"
walkdir = "2.3.2"
zip = "0.6.3"
config = { version = "0.13.1", features = ["yaml"] }
Expand Down
8 changes: 8 additions & 0 deletions lib/rust/shared/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,11 @@ pub fn convert_json_array_str_to_ndjson(s: &str) -> Result<String> {

Ok(res)
}

/// Converts a `Result<Option<T>, E>` into an `Option<Result<T, E>>`.
///
/// The mapping is:
/// - `Ok(Some(v))` becomes `Some(Ok(v))`
/// - `Ok(None)` becomes `None`
/// - `Err(e)` becomes `Some(Err(e))`
///
/// This is a thin named wrapper around the standard library's
/// [`Result::transpose`], kept so existing callers continue to compile.
pub fn result_to_option<T, E>(r: Result<Option<T>, E>) -> Option<Result<T, E>> {
    r.transpose()
}
1 change: 1 addition & 0 deletions lib/rust/transformer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async-compression = { version = "0.3.14", default-features = false, features = [
"zstd",
"stream",
] }
zstd = "0.12.1"
infer = "0.12.0"
regex = "1.5.4"
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", features = [
Expand Down
Loading

0 comments on commit 682376e

Please sign in to comment.