feat: init forum image table (#327)
* feat: init forum image table

* feat: instantiating forum thread image data pipeline

* feat: threadimgdatapipeline
JasonNotJson authored Sep 22, 2023
1 parent f462005 commit 0a813a7
Showing 3 changed files with 93 additions and 22 deletions.
lib/constructs/persistence/data-pipeline.ts (85 additions, 21 deletions)
@@ -11,12 +11,16 @@ import { Construct } from 'constructs';
import { syllabusSchedule } from '../../configs/event/schedule';
import { prodCorsRule } from '../../configs/s3/cors';
import { allowApiGatewayPolicy, allowLambdaPolicy } from '../../utils/s3';
-import { SyllabusScraper, SyllabusUpdateFunction } from '../common/lambda-functions';
+import {
+  SyllabusScraper,
+  SyllabusUpdateFunction,
+} from '../common/lambda-functions';

export enum Worker {
SYLLABUS,
CAREER,
-  FEEDS
+  FEEDS,
+  THREADIMG,
}

export interface DataPipelineProps {
@@ -52,14 +56,21 @@ export class SyllabusDataPipeline extends AbstractDataPipeline {
allowApiGatewayPolicy(this.dataWarehouse);
allowLambdaPolicy(this.dataWarehouse);

-    const scraperBaseFunction: lambda.Function = new SyllabusScraper(this, 'scraper-base-function', {
-      envVars: {
-        ['BUCKET_NAME']: this.dataWarehouse.bucketName,
-        ['OBJECT_PATH']: 'syllabus/',
-      },
-    }).baseFunction;
+    const scraperBaseFunction: lambda.Function = new SyllabusScraper(
+      this,
+      'scraper-base-function',
+      {
+        envVars: {
+          ['BUCKET_NAME']: this.dataWarehouse.bucketName,
+          ['OBJECT_PATH']: 'syllabus/',
+        },
+      },
+    ).baseFunction;

-    function getLambdaTaskInstance(schools: string[], num: string): sfn_tasks.LambdaInvoke {
+    function getLambdaTaskInstance(
+      schools: string[],
+      num: string,
+    ): sfn_tasks.LambdaInvoke {
return new sfn_tasks.LambdaInvoke(scope, 'task-' + num, {
lambdaFunction: scraperBaseFunction,
comment: 'Scrape the syllabus info of school(s).',
@@ -79,9 +90,35 @@ export class SyllabusDataPipeline extends AbstractDataPipeline {
.next(getLambdaTaskInstance(['PSE', 'G_ASE', 'LAW'], '4'))
.next(getLambdaTaskInstance(['G_FSE', 'SOC', 'SSS'], '5'))
.next(getLambdaTaskInstance(['G_LAS', 'G_CSE', 'G_EDU', 'HUM'], '6'))
-      .next(getLambdaTaskInstance(['SILS', 'G_HUM', 'CJL', 'SPS', 'G_WBS', 'G_PS'], '7'))
-      .next(getLambdaTaskInstance(['G_SPS', 'G_IPS', 'G_WLS', 'G_E', 'G_SSS', 'G_SC', 'G_LAW',
-        'G_SAPS', 'G_SA', 'G_SJAL', 'G_SICCS', 'G_SEEE', 'EHUM', 'ART', 'CIE', 'G_ITS'], '8'))
+      .next(
+        getLambdaTaskInstance(
+          ['SILS', 'G_HUM', 'CJL', 'SPS', 'G_WBS', 'G_PS'],
+          '7',
+        ),
+      )
+      .next(
+        getLambdaTaskInstance(
+          [
+            'G_SPS',
+            'G_IPS',
+            'G_WLS',
+            'G_E',
+            'G_SSS',
+            'G_SC',
+            'G_LAW',
+            'G_SAPS',
+            'G_SA',
+            'G_SJAL',
+            'G_SICCS',
+            'G_SEEE',
+            'EHUM',
+            'ART',
+            'CIE',
+            'G_ITS',
+          ],
+          '8',
+        ),
+      )
.next(new sfn.Succeed(this, 'success', {})),
});

@@ -141,17 +178,44 @@ export class SyllabusSyncPipeline extends AbstractDataPipeline {
// Use existing S3 bucket
this.dataSource = props.dataSource!;

-    this.processor = new SyllabusUpdateFunction(this, 'syllabus-update-function', {
-      envVars: {
-        ['BUCKET_NAME']: this.dataSource.bucketName,
-        ['TABLE_NAME']: this.dataWarehouse.tableName,
-        ['OBJECT_PATH']: 'syllabus/',
-      },
-    }).updateFunction;
+    this.processor = new SyllabusUpdateFunction(
+      this,
+      'syllabus-update-function',
+      {
+        envVars: {
+          ['BUCKET_NAME']: this.dataSource.bucketName,
+          ['TABLE_NAME']: this.dataWarehouse.tableName,
+          ['OBJECT_PATH']: 'syllabus/',
+        },
+      },
+    ).updateFunction;

-    this.processor.addEventSource(new event_sources.S3EventSource(this.dataSource, {
-      events: [s3.EventType.OBJECT_CREATED_PUT],
-      filters: [{ prefix: 'syllabus/' }],
-    }));
+    this.processor.addEventSource(
+      new event_sources.S3EventSource(this.dataSource, {
+        events: [s3.EventType.OBJECT_CREATED_PUT],
+        filters: [{ prefix: 'syllabus/' }],
+      }),
+    );
  }
}

+export class ThreadImgDataPipeline extends AbstractDataPipeline {
+  readonly dataSource: s3.Bucket;
+  readonly processor: lambda.Function;
+  readonly dataWarehouse: s3.Bucket;
+
+  constructor(scope: Construct, id: string, props?: DataPipelineProps) {
+    super(scope, id);
+
+    // Initialize S3 bucket for storing thread images
+    this.dataSource = new s3.Bucket(this, 'thread-img-bucket', {
+      accessControl: s3.BucketAccessControl.PRIVATE,
+      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
+      bucketName: 'wasedatime-thread-img',
+      encryption: s3.BucketEncryption.S3_MANAGED,
+      publicReadAccess: false,
+      removalPolicy: RemovalPolicy.RETAIN,
+      versioned: true,
+    });
+  }
+}
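
Note: ThreadImgDataPipeline declares processor and dataWarehouse members, but this commit only provisions the source bucket; the Lambda side is left unassigned. Below is a minimal sketch, not part of the commit, of how a processor could later be subscribed to the bucket in the same style as the SyllabusSyncPipeline wiring above. The function name, handler path, and runtime are assumptions.

import {
  aws_lambda as lambda,
  aws_lambda_event_sources as event_sources,
  aws_s3 as s3,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

// Hypothetical follow-up wiring, not part of commit 0a813a7.
export function wireThreadImgProcessor(
  scope: Construct,
  bucket: s3.Bucket,
): lambda.Function {
  // Assumed handler location and runtime; adjust to the repo's conventions.
  const processor = new lambda.Function(scope, 'thread-img-processor', {
    runtime: lambda.Runtime.PYTHON_3_9,
    handler: 'index.handler',
    code: lambda.Code.fromAsset('src/lambda/process-thread-img'),
    environment: { BUCKET_NAME: bucket.bucketName },
  });
  // Invoke the processor whenever an object is PUT into the bucket,
  // mirroring the S3EventSource subscription used by SyllabusSyncPipeline.
  processor.addEventSource(
    new event_sources.S3EventSource(bucket, {
      events: [s3.EventType.OBJECT_CREATED_PUT],
    }),
  );
  bucket.grantRead(processor);
  return processor;
}
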
lib/stacks/persistence.ts (7 additions, 0 deletions)
@@ -7,6 +7,7 @@ import {
CareerDataPipeline,
SyllabusDataPipeline,
SyllabusSyncPipeline,
+  ThreadImgDataPipeline,
Worker,
} from '../constructs/persistence/data-pipeline';
import { Collection, DynamoDatabase } from '../constructs/persistence/database';
@@ -44,6 +45,12 @@ export class WasedaTimePersistenceLayer extends PersistenceLayer {
},
);

+    const threadImgDataPipeline = new ThreadImgDataPipeline(
+      this,
+      'thread-img-data-pipeline',
+    );
+    this.dataPipelines[Worker.THREADIMG] = threadImgDataPipeline;

this.dataInterface.setEndpoint(
DataEndpoint.COURSE_REVIEWS,
dynamoDatabase.tables[Collection.COURSE_REVIEW].tableName,
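
As a usage sketch (illustrative only; the App and Stack below are stand-ins for the real WasedaTime stacks, and the import path assumes the file layout shown in this commit), the new pipeline can be instantiated and keyed by Worker.THREADIMG exactly as the stack does above:

import { App, Stack } from 'aws-cdk-lib';
import {
  ThreadImgDataPipeline,
  Worker,
} from '../constructs/persistence/data-pipeline';

const app = new App();
const stack = new Stack(app, 'sandbox-persistence-stack'); // illustrative name

// Register the pipeline under the new Worker.THREADIMG key, as
// WasedaTimePersistenceLayer does in the hunk above.
const pipelines: Partial<Record<Worker, ThreadImgDataPipeline>> = {};
pipelines[Worker.THREADIMG] = new ThreadImgDataPipeline(
  stack,
  'thread-img-data-pipeline',
);
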
src/lambda/get-user-threads/index.py (1 addition, 1 deletion)
@@ -9,7 +9,7 @@ def get_user_threads(uid=""):

# Query the GSI
response = table.query(
-        IndexName='UidbyThreadIDIndex',  # Replace with your actual GSI name
+        IndexName='UidbyThreadIDIndex',
KeyConditionExpression=Key('uid').eq(uid),
FilterExpression=Attr('new_comment').eq(True),
ScanIndexForward=False # Sorting by thread_id in descending order
