diff --git a/lib/constructs/persistence/data-pipeline.ts b/lib/constructs/persistence/data-pipeline.ts
index 6ef0daf2a..7c82b398e 100644
--- a/lib/constructs/persistence/data-pipeline.ts
+++ b/lib/constructs/persistence/data-pipeline.ts
@@ -11,12 +11,16 @@ import { Construct } from 'constructs';
 import { syllabusSchedule } from '../../configs/event/schedule';
 import { prodCorsRule } from '../../configs/s3/cors';
 import { allowApiGatewayPolicy, allowLambdaPolicy } from '../../utils/s3';
-import { SyllabusScraper, SyllabusUpdateFunction } from '../common/lambda-functions';
+import {
+  SyllabusScraper,
+  SyllabusUpdateFunction,
+} from '../common/lambda-functions';
 
 export enum Worker {
   SYLLABUS,
   CAREER,
-  FEEDS
+  FEEDS,
+  THREADIMG,
 }
 
 export interface DataPipelineProps {
@@ -52,14 +56,21 @@ export class SyllabusDataPipeline extends AbstractDataPipeline {
     allowApiGatewayPolicy(this.dataWarehouse);
     allowLambdaPolicy(this.dataWarehouse);
 
-    const scraperBaseFunction: lambda.Function = new SyllabusScraper(this, 'scraper-base-function', {
-      envVars: {
-        ['BUCKET_NAME']: this.dataWarehouse.bucketName,
-        ['OBJECT_PATH']: 'syllabus/',
+    const scraperBaseFunction: lambda.Function = new SyllabusScraper(
+      this,
+      'scraper-base-function',
+      {
+        envVars: {
+          ['BUCKET_NAME']: this.dataWarehouse.bucketName,
+          ['OBJECT_PATH']: 'syllabus/',
+        },
       },
-    }).baseFunction;
+    ).baseFunction;
 
-    function getLambdaTaskInstance(schools: string[], num: string): sfn_tasks.LambdaInvoke {
+    function getLambdaTaskInstance(
+      schools: string[],
+      num: string,
+    ): sfn_tasks.LambdaInvoke {
       return new sfn_tasks.LambdaInvoke(scope, 'task-' + num, {
         lambdaFunction: scraperBaseFunction,
         comment: 'Scrape the syllabus info of school(s).',
@@ -79,9 +90,35 @@ export class SyllabusDataPipeline extends AbstractDataPipeline {
         .next(getLambdaTaskInstance(['PSE', 'G_ASE', 'LAW'], '4'))
         .next(getLambdaTaskInstance(['G_FSE', 'SOC', 'SSS'], '5'))
         .next(getLambdaTaskInstance(['G_LAS', 'G_CSE', 'G_EDU', 'HUM'], '6'))
-        .next(getLambdaTaskInstance(['SILS', 'G_HUM', 'CJL', 'SPS', 'G_WBS', 'G_PS'], '7'))
-        .next(getLambdaTaskInstance(['G_SPS', 'G_IPS', 'G_WLS', 'G_E', 'G_SSS', 'G_SC', 'G_LAW',
-          'G_SAPS', 'G_SA', 'G_SJAL', 'G_SICCS', 'G_SEEE', 'EHUM', 'ART', 'CIE', 'G_ITS'], '8'))
+        .next(
+          getLambdaTaskInstance(
+            ['SILS', 'G_HUM', 'CJL', 'SPS', 'G_WBS', 'G_PS'],
+            '7',
+          ),
+        )
+        .next(
+          getLambdaTaskInstance(
+            [
+              'G_SPS',
+              'G_IPS',
+              'G_WLS',
+              'G_E',
+              'G_SSS',
+              'G_SC',
+              'G_LAW',
+              'G_SAPS',
+              'G_SA',
+              'G_SJAL',
+              'G_SICCS',
+              'G_SEEE',
+              'EHUM',
+              'ART',
+              'CIE',
+              'G_ITS',
+            ],
+            '8',
+          ),
+        )
         .next(new sfn.Succeed(this, 'success', {})),
     });
 
@@ -141,17 +178,44 @@ export class SyllabusSyncPipeline extends AbstractDataPipeline {
     // Use existing S3 bucket
     this.dataSource = props.dataSource!;
 
-    this.processor = new SyllabusUpdateFunction(this, 'syllabus-update-function', {
-      envVars: {
-        ['BUCKET_NAME']: this.dataSource.bucketName,
-        ['TABLE_NAME']: this.dataWarehouse.tableName,
-        ['OBJECT_PATH']: 'syllabus/',
+    this.processor = new SyllabusUpdateFunction(
+      this,
+      'syllabus-update-function',
+      {
+        envVars: {
+          ['BUCKET_NAME']: this.dataSource.bucketName,
+          ['TABLE_NAME']: this.dataWarehouse.tableName,
+          ['OBJECT_PATH']: 'syllabus/',
+        },
       },
-    }).updateFunction;
+    ).updateFunction;
+
+    this.processor.addEventSource(
+      new event_sources.S3EventSource(this.dataSource, {
+        events: [s3.EventType.OBJECT_CREATED_PUT],
+        filters: [{ prefix: 'syllabus/' }],
+      }),
+    );
+  }
+}
+
+export class ThreadImgDataPipeline extends AbstractDataPipeline {
+  readonly dataSource: s3.Bucket;
+  readonly processor: lambda.Function;
+  readonly dataWarehouse: s3.Bucket;
+
+  constructor(scope: Construct, id: string, props?: DataPipelineProps) {
+    super(scope, id);
 
-    this.processor.addEventSource(new event_sources.S3EventSource(this.dataSource, {
-      events: [s3.EventType.OBJECT_CREATED_PUT],
-      filters: [{ prefix: 'syllabus/' }],
-    }));
+    // Initialize S3 bucket for storing thread images
+    this.dataSource = new s3.Bucket(this, 'thread-img-bucket', {
+      accessControl: s3.BucketAccessControl.PRIVATE,
+      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
+      bucketName: 'wasedatime-thread-img',
+      encryption: s3.BucketEncryption.S3_MANAGED,
+      publicReadAccess: false,
+      removalPolicy: RemovalPolicy.RETAIN,
+      versioned: true,
+    });
   }
 }
diff --git a/lib/stacks/persistence.ts b/lib/stacks/persistence.ts
index ae2c94a98..1c4bd362e 100644
--- a/lib/stacks/persistence.ts
+++ b/lib/stacks/persistence.ts
@@ -7,6 +7,7 @@ import {
   CareerDataPipeline,
   SyllabusDataPipeline,
   SyllabusSyncPipeline,
+  ThreadImgDataPipeline,
   Worker,
 } from '../constructs/persistence/data-pipeline';
 import { Collection, DynamoDatabase } from '../constructs/persistence/database';
@@ -44,6 +45,12 @@ export class WasedaTimePersistenceLayer extends PersistenceLayer {
       },
     );
 
+    const threadImgDataPipeline = new ThreadImgDataPipeline(
+      this,
+      'thread-img-data-pipeline',
+    );
+    this.dataPipelines[Worker.THREADIMG] = threadImgDataPipeline;
+
     this.dataInterface.setEndpoint(
       DataEndpoint.COURSE_REVIEWS,
       dynamoDatabase.tables[Collection.COURSE_REVIEW].tableName,
diff --git a/src/lambda/get-user-threads/index.py b/src/lambda/get-user-threads/index.py
index 6337a20c9..f301a36e8 100644
--- a/src/lambda/get-user-threads/index.py
+++ b/src/lambda/get-user-threads/index.py
@@ -9,7 +9,7 @@ def get_user_threads(uid=""):
 
     # Query the GSI
     response = table.query(
-        IndexName='UidbyThreadIDIndex',  # Replace with your actual GSI name
+        IndexName='UidbyThreadIDIndex',
         KeyConditionExpression=Key('uid').eq(uid),
         FilterExpression=Attr('new_comment').eq(True),
         ScanIndexForward=False  # Sorting by thread_id in descending order
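
Note: ThreadImgDataPipeline declares processor and dataWarehouse fields, but this patch only initializes dataSource, so nothing consumes uploads to the new bucket yet. Below is a minimal sketch of how a processor Lambda could later be wired up, mirroring the S3EventSource pattern used by SyllabusSyncPipeline above. The construct name, handler path, runtime, environment variable key, and aws-cdk-lib v2-style import paths are all assumptions, not part of this PR.

import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as event_sources from 'aws-cdk-lib/aws-lambda-event-sources';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

// Hypothetical follow-up construct: process each image uploaded to the
// thread-img bucket (e.g. resizing or stripping metadata).
export class ThreadImgProcessorSketch extends Construct {
  readonly processor: lambda.Function;

  constructor(scope: Construct, id: string, bucket: s3.Bucket) {
    super(scope, id);

    // Handler path, runtime, and env var name are illustrative only.
    this.processor = new lambda.Function(this, 'thread-img-processor', {
      runtime: lambda.Runtime.PYTHON_3_9,
      code: lambda.Code.fromAsset('src/lambda/process-thread-img'),
      handler: 'index.handler',
      environment: { BUCKET_NAME: bucket.bucketName },
    });

    // Same trigger pattern as SyllabusSyncPipeline: invoke on each PUT.
    this.processor.addEventSource(
      new event_sources.S3EventSource(bucket, {
        events: [s3.EventType.OBJECT_CREATED_PUT],
      }),
    );
    bucket.grantReadWrite(this.processor);
  }
}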
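
Note: get-user-threads queries the UidbyThreadIDIndex GSI with uid as the key condition and relies on ScanIndexForward=False to return threads sorted by thread_id in descending order, which implies the index keys on uid (partition) and thread_id (sort). The index definition itself is not part of this diff; the sketch below shows what a matching CDK definition could look like. The threadTable handle and the STRING attribute types are assumptions.

import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

// Assumed handle to the DynamoDB table backing forum threads.
declare const threadTable: dynamodb.Table;

// uid as partition key and thread_id as sort key lets the Lambda query by
// user and order results by thread_id. Note that the new_comment
// FilterExpression is applied after the key condition, so filtered-out
// items are still read from the index and consume read capacity.
threadTable.addGlobalSecondaryIndex({
  indexName: 'UidbyThreadIDIndex',
  partitionKey: { name: 'uid', type: dynamodb.AttributeType.STRING },
  sortKey: { name: 'thread_id', type: dynamodb.AttributeType.STRING },
  projectionType: dynamodb.ProjectionType.ALL,
});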