Merge branch 'master' into 2209-footer-cutting-off-export-users-page

courtneyc1 authored Oct 13, 2023
2 parents 2b28d8d + 2e6cdc5 commit 576c367
Showing 13 changed files with 422 additions and 99 deletions.
2 changes: 2 additions & 0 deletions backend/env.yml
@@ -42,6 +42,7 @@ staging:
   PE_API_URL: ${ssm:/crossfeed/staging/PE_API_URL}
   REPORTS_BUCKET_NAME: cisa-crossfeed-staging-reports
   CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-staging-cloudwatch
+  SQS_QUEUE_URL: { Ref: WorkerQueue }

 prod:
   DB_DIALECT: 'postgres'
@@ -78,6 +79,7 @@ prod:
   PE_API_URL: ${ssm:/crossfeed/staging/PE_API_URL}
   REPORTS_BUCKET_NAME: cisa-crossfeed-prod-reports
   CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-prod-cloudwatch
+  SQS_QUEUE_URL: { Ref: WorkerQueue }

 dev-vpc:
   securityGroupIds:
33 changes: 32 additions & 1 deletion backend/serverless.yml
@@ -44,7 +44,6 @@ provider:
         - ecs:RunTask
         - ecs:ListTasks
         - iam:PassRole
-        - logs:GetLogEvents
       Resource: '*'
     - Effect: Allow
       Action:
@@ -56,19 +55,51 @@
         - s3:GetObjectAcl
         - s3:PutObject
         - s3:PutObjectAcl
+        - s3:PutBucketAcl
+        - s3:GetBucketAcl
       Resource: '*'
     - Effect: Allow
       Action:
         - sts:AssumeRole
       Resource: '*'
+    - Effect: Allow
+      Action:
+        - sqs:ReceiveMessage
+        - sqs:SendMessage
+      Resource: '*'
+    - Effect: Allow
+      Action:
+        - logs:CreateExportTask
+        - logs:CreateLogStream
+        - logs:Describe*
+        - logs:Get*
+        - logs:List*
+        - logs:PutLogEvents
+        - logs:StartQuery
+        - logs:StopQuery
+        - logs:TestMetricFilter
+        - logs:FilterLogEvents
+        - logs:StartLiveTail
+        - logs:StopLiveTail
+      Resource: '*'
     - Effect: Allow
       Action:
         - ssm:DescribeParameters
         - ssm:GetParameter
         - ssm:GetParameters
        - ssm:GetParametersByPath
        - ssm:PutParameter
       Resource: '*'
+
+resources:
+  Resources:
+    WorkerQueue:
+      Type: AWS::SQS::Queue
+      Properties:
+        QueueName: ${self:provider.stage}-worker-queue
+        VisibilityTimeout: 300 # Should match or exceed function timeout
+        MaximumMessageSize: 262144 # 256 KB
+        MessageRetentionPeriod: 604800 # 7 days

 functions:
   - ${file(./src/tasks/functions.yml)}
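Because Ref on an AWS::SQS::Queue resolves to the queue URL, the SQS_QUEUE_URL values added to env.yml above receive the full URL of WorkerQueue at deploy time. As a minimal sketch of how a producer could enqueue work for this queue (the enqueueScan helper is hypothetical; sendMessage is the standard aws-sdk v2 call used elsewhere in this commit):

import * as AWS from 'aws-sdk';

const sqs = new AWS.SQS();

// Hypothetical producer helper: push a scan command onto the worker queue.
// SQS_QUEUE_URL is injected through env.yml ({ Ref: WorkerQueue }).
export const enqueueScan = async (command: string) => {
  const result = await sqs
    .sendMessage({
      QueueUrl: process.env.SQS_QUEUE_URL!,
      MessageBody: command // e.g. 'SHODAN', handled by scanExecution below
    })
    .promise();
  console.log('Enqueued message:', result.MessageId);
};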
129 changes: 62 additions & 67 deletions backend/src/tasks/cloudwatchToS3.ts
@@ -2,8 +2,8 @@ import {
   CloudWatchLogsClient,
   DescribeLogGroupsCommand,
   DescribeLogGroupsRequest,
-  LogGroup,
   ListTagsForResourceCommand,
+  LogGroup,
   CreateExportTaskCommand
 } from '@aws-sdk/client-cloudwatch-logs';
 import {
@@ -14,117 +14,112 @@ import {
 const logs = new CloudWatchLogsClient({});
 const ssm = new SSMClient({});
-const region = process.env.AWS_REGION || 'us-east-1';
-const accountId = process.env.AWS_ACCOUNT_ID || '957221700844';
+const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

 export const handler = async () => {
-  const extra_args: DescribeLogGroupsRequest = {};
-  let log_groups: LogGroup[] = [];
-  const log_groups_to_export: string[] = [];
+  const args: DescribeLogGroupsRequest = {};
+  let logGroups: LogGroup[] = [];
+  const logBucketName = process.env.CLOUDWATCH_BUCKET_NAME;
+  const stage = process.env.STAGE;
+
+  console.log(`logBucketName=${logBucketName}, stage=${stage}`);

-  if (!process.env.CLOUDWATCH_BUCKET_NAME) {
-    console.error('Error: CLOUDWATCH_BUCKET_NAME not defined');
+  if (!logBucketName || !stage) {
+    console.error(`Error: logBucketName or stage not defined`);
     return;
   }

-  console.log(
-    '--> CLOUDWATCH_BUCKET_NAME=' + process.env.CLOUDWATCH_BUCKET_NAME
-  );
-
   while (true) {
-    const response = await logs.send(new DescribeLogGroupsCommand(extra_args));
-    log_groups = log_groups.concat(response.logGroups!);
-
-    if (!response.nextToken) {
+    const describeLogGroupsResponse = await logs.send(
+      new DescribeLogGroupsCommand(args)
+    );
+    logGroups = logGroups.concat(describeLogGroupsResponse.logGroups!);
+    if (!describeLogGroupsResponse.nextToken) {
       break;
     }
-    extra_args.nextToken = response.nextToken;
+    args.nextToken = describeLogGroupsResponse.nextToken;
   }

-  for (const log_group of log_groups) {
-    const command = new ListTagsForResourceCommand({
-      resourceArn: `arn:aws:logs:${region}:${accountId}:log-group:${log_group.logGroupName}`
-    });
-    const response = await logs.send(command);
-    const log_group_tags = response.tags || {};
-
-    if (log_group_tags.ExportToS3 === 'true') {
-      log_groups_to_export.push(log_group.logGroupName!);
-    }
-  }
-
-  for (const log_group_name of log_groups_to_export) {
-    const ssm_parameter_name = (
-      '/log-exporter-last-export/' + log_group_name
-    ).replace('//', '/');
-    let ssm_value = '0';
+  for (const logGroup of logGroups) {
+    const listTagsResponse = await logs.send(
+      new ListTagsForResourceCommand({
+        resourceArn: logGroup.arn
+      })
+    );
+    console.log(`listTagsResponse: ${JSON.stringify(listTagsResponse)}`);
+    const logGroupTags = listTagsResponse.tags || {};
+    if (logGroupTags.Stage !== stage) {
+      console.log(
+        `Skipping log group: ${logGroup.logGroupName} (no ${stage} tag)`
+      );
+      continue;
+    }
+    await delay(10 * 1000); // prevents LimitExceededException (AWS allows only one export task at a time)
+    const logGroupName = logGroup.logGroupName!;
+    console.log(`Processing log group: ${logGroupName}`);
+    const ssmParameterName = `last-export-to-s3/${logGroupName}`.replace(
+      '//',
+      '/'
+    );
+    let ssmValue = '0';

     try {
-      const ssm_response = await ssm.send(
-        new GetParameterCommand({ Name: ssm_parameter_name })
+      const ssmResponse = await ssm.send(
+        new GetParameterCommand({ Name: ssmParameterName })
       );
-      ssm_value = ssm_response.Parameter?.Value || '0';
+      console.log(`ssmResponse: ${JSON.stringify(ssmResponse)}`);
+      ssmValue = ssmResponse.Parameter?.Value || '0';
     } catch (error) {
       if (error.name !== 'ParameterNotFound') {
-        console.error('Error fetching SSM parameter: ' + error.message);
+        console.error(`Error fetching SSM parameter: ${JSON.stringify(error)}`);
       }
+      console.error(`ssm.send error: ${JSON.stringify(error)}`);
     }

-    const export_to_time = Math.round(Date.now());
+    const exportTime = Math.round(Date.now());

-    console.log(
-      '--> Exporting ' +
-        log_group_name +
-        ' to ' +
-        process.env.CLOUDWATCH_BUCKET_NAME
-    );
+    console.log(`--> Exporting ${logGroupName} to ${logBucketName}`);

-    if (export_to_time - parseInt(ssm_value) < 24 * 60 * 60 * 1000) {
-      // Haven't been 24hrs from the last export of this log group
-      console.log(' Skipped until 24hrs from last export is completed');
+    if (exportTime - parseInt(ssmValue) < 24 * 60 * 60 * 1000) {
+      console.log(
+        'Skipped: log group was already exported in the last 24 hours'
+      );
       continue;
     }

     try {
-      const response = await logs.send(
+      const exportTaskResponse = await logs.send(
         new CreateExportTaskCommand({
-          logGroupName: log_group_name,
-          from: parseInt(ssm_value),
-          to: export_to_time,
-          destination: process.env.CLOUDWATCH_BUCKET_NAME,
-          destinationPrefix: log_group_name
-            .replace(/^\//, '')
-            .replace(/\/$/, '')
+          logGroupName: logGroupName,
+          from: parseInt(ssmValue),
+          to: exportTime,
+          destination: logBucketName,
+          destinationPrefix: logGroupName.replace(/^\/|\/$/g, '')
         })
       );
-
-      console.log(' Task created: ' + response.taskId);
+      console.log(`exportTaskResponse: ${JSON.stringify(exportTaskResponse)}`);
+      console.log(`Task created: ${exportTaskResponse.taskId}`);
       await new Promise((resolve) => setTimeout(resolve, 5000));
     } catch (error) {
       if (error.name === 'LimitExceededException') {
-        console.log(
-          ' Need to wait until all tasks are finished (LimitExceededException). Continuing later...'
-        );
+        console.log(JSON.stringify(error));
         return;
       }
       console.error(
-        ' Error exporting ' +
-          log_group_name +
-          ': ' +
-          (error.message || JSON.stringify(error))
+        `Error exporting ${logGroupName}: ${JSON.stringify(error)}`
       );
       continue;
     }

     await ssm.send(
       new PutParameterCommand({
-        Name: ssm_parameter_name,
+        Name: ssmParameterName,
         Type: 'String',
-        Value: export_to_time.toString(),
+        Value: exportTime.toString(),
         Overwrite: true
       })
     );
+    console.log(`SSM parameter updated: ${ssmParameterName}`);
   }
+  // TODO: reevaluate the delay time after the first set of exports
+  await delay(30 * 1000); // mitigates LimitExceededException (AWS allows only one export task at a time)
 };
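The exporter now selects log groups by their Stage tag and records each group's last export time in SSM, so a group is exported at most once per 24 hours. As a minimal sketch of how a log group could be tagged so the exporter picks it up (the tagForExport helper, the ARN, and the stage value are illustrative; TagResourceCommand is the v3 API matching the imports above):

import {
  CloudWatchLogsClient,
  TagResourceCommand
} from '@aws-sdk/client-cloudwatch-logs';

const logs = new CloudWatchLogsClient({});

// Illustrative: tag a log group so the exporter above will include it.
export const tagForExport = async (logGroupArn: string, stage: string) => {
  await logs.send(
    new TagResourceCommand({
      resourceArn: logGroupArn,
      tags: { Stage: stage } // must match process.env.STAGE in the exporter
    })
  );
};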
13 changes: 13 additions & 0 deletions backend/src/tasks/functions.yml
@@ -29,6 +29,19 @@ bastion:
 makeGlobalAdmin:
   handler: src/tasks/makeGlobalAdmin.handler

+scanExecution:
+  handler: src/tasks/scanExecution.handler
+  timeout: 300 # 5 minutes
+  environment:
+    SQS_QUEUE_NAME: ${self:provider.stage}-worker-queue
+  events:
+    - sqs:
+        arn:
+          Fn::GetAtt:
+            - WorkerQueue
+            - Arn
+        batchSize: 5 # Max number of messages the Lambda picks up per batch while Fargate tasks are still running
+
 updateScanTaskStatus:
   handler: src/tasks/updateScanTaskStatus.handler
   events:
57 changes: 57 additions & 0 deletions backend/src/tasks/scanExecution.ts
@@ -0,0 +1,57 @@
import { Handler, SQSRecord } from 'aws-lambda';
import * as AWS from 'aws-sdk';

const ecs = new AWS.ECS();
const sqs = new AWS.SQS();

export const handler: Handler = async (event) => {
  try {
    // Get the SQS record and message body
    const sqsRecord: SQSRecord = event.Records[0];
    const body: string = sqsRecord.body;

    console.log(body);

    let commandOptions;
    if (body === 'SHODAN') {
      commandOptions = './worker/shodan.sh';
    } else {
      commandOptions = body;
    }
    // Run command in queue message in Fargate
    const params: AWS.ECS.RunTaskRequest = {
      cluster: process.env.FARGATE_CLUSTER_NAME!,
      taskDefinition: process.env.FARGATE_TASK_DEFINITION_NAME!,
      launchType: 'FARGATE',
      networkConfiguration: {
        awsvpcConfiguration: {
          assignPublicIp: 'ENABLED',
          securityGroups: [process.env.FARGATE_SG_ID!],
          subnets: [process.env.FARGATE_SUBNET_ID!]
        }
      },
      platformVersion: '1.4.0',
      overrides: {
        containerOverrides: [
          {
            name: 'main', // from task definition
            command: [commandOptions] // Pass the command options as an array
          }
        ]
      }
    };
    const data = await ecs.runTask(params).promise();
    console.log('Fargate task started:', data);

    return {
      statusCode: 200,
      body: JSON.stringify('Fargate task started and message sent to SQS queue')
    };
  } catch (error) {
    console.error('Error starting Fargate task:', error);
    return {
      statusCode: 500,
      body: JSON.stringify('Error starting Fargate task')
    };
  }
};
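Since functions.yml sets batchSize: 5, the event source mapping can deliver up to five records per invocation, while the handler above reads only event.Records[0]. A hedged sketch of a per-record variant (handlerPerRecord is hypothetical; it reuses the same runTask parameters in a loop):

import { Handler, SQSRecord } from 'aws-lambda';
import * as AWS from 'aws-sdk';

const ecs = new AWS.ECS();

// Sketch: process every record in the batch, not just Records[0].
export const handlerPerRecord: Handler = async (event) => {
  const records: SQSRecord[] = event.Records;
  for (const record of records) {
    const body = record.body;
    const commandOptions = body === 'SHODAN' ? './worker/shodan.sh' : body;
    const data = await ecs
      .runTask({
        cluster: process.env.FARGATE_CLUSTER_NAME!,
        taskDefinition: process.env.FARGATE_TASK_DEFINITION_NAME!,
        launchType: 'FARGATE',
        networkConfiguration: {
          awsvpcConfiguration: {
            assignPublicIp: 'ENABLED',
            securityGroups: [process.env.FARGATE_SG_ID!],
            subnets: [process.env.FARGATE_SUBNET_ID!]
          }
        },
        platformVersion: '1.4.0',
        overrides: {
          containerOverrides: [{ name: 'main', command: [commandOptions] }]
        }
      })
      .promise();
    console.log('Fargate task started:', data);
  }
  return { statusCode: 200, body: JSON.stringify('Batch processed') };
};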
38 changes: 38 additions & 0 deletions backend/src/tasks/test/scanExecution.test.ts
@@ -0,0 +1,38 @@
import { handler } from '../scanExecution';
import { SQSRecord } from 'aws-lambda';

// Mock the AWS SDK clients with jest.mock
jest.mock('aws-sdk', () => {
  return {
    ECS: jest.fn(() => ({
      runTask: jest.fn().mockReturnThis(),
      promise: jest.fn()
    })),
    SQS: jest.fn(() => ({
      sendMessage: jest.fn().mockReturnThis(),
      promise: jest.fn()
    }))
  };
});

describe('Scan Execution', () => {
  process.env.SQS_QUEUE_URL = 'YOUR_SQS_QUEUE_URL';
  it('should handle the event', async () => {
    const event = {
      Records: [
        {
          body: 'test command',
          eventSourceARN: 'SQSQueueARN'
        } as SQSRecord
      ]
    };

    const result = await handler(event, {} as any, () => void 0);

    // Add your assertions here
    expect(result.statusCode).toEqual(200);
    expect(result.body).toContain(
      'Fargate task started and message sent to SQS queue'
    );
  });
});
11 changes: 11 additions & 0 deletions backend/worker/shodan.sh
@@ -0,0 +1,11 @@
#!/bin/bash

set -e

cd /app/pe-reports

echo "Starting Shodan"

pe-source shodan --orgs=DHS --soc_med_included

echo "Done"