Skip to content

Commit

Permalink
Cloudwatch to s3 lambda (#2305)
Browse files Browse the repository at this point in the history
* Add aws_s3_bucket_logging resource so that changes to cloudwatch_bucket are saved to logging bucket.

* Update lambda to export based on log group's stage tag.

* Add additional console and error logs.

* Add todo for delay.
  • Loading branch information
Matthew-Grayson authored Oct 13, 2023
1 parent b8f2717 commit 2e6cdc5
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
68 changes: 43 additions & 25 deletions backend/src/tasks/cloudwatchToS3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
CloudWatchLogsClient,
DescribeLogGroupsCommand,
DescribeLogGroupsRequest,
ListTagsForResourceCommand,
LogGroup,
CreateExportTaskCommand
} from '@aws-sdk/client-cloudwatch-logs';
Expand All @@ -19,46 +20,64 @@ export const handler = async () => {
const args: DescribeLogGroupsRequest = {};
let logGroups: LogGroup[] = [];
const logBucketName = process.env.CLOUDWATCH_BUCKET_NAME;
const stage = process.env.STAGE;

if (!logBucketName) {
console.error('Error: logBucketName not defined');
console.log(`logBucketName=${logBucketName}, stage=${stage}`);

if (!logBucketName || !stage) {
console.error(`Error: logBucketName or stage not defined`);
return;
}

console.log('--> logBucketName=' + logBucketName);

while (true) {
const response = await logs.send(new DescribeLogGroupsCommand(args));
logGroups = logGroups.concat(response.logGroups!);
if (!response.nextToken) {
const describeLogGroupsResponse = await logs.send(
new DescribeLogGroupsCommand(args)
);
logGroups = logGroups.concat(describeLogGroupsResponse.logGroups!);
if (!describeLogGroupsResponse.nextToken) {
break;
}
args.nextToken = response.nextToken;
args.nextToken = describeLogGroupsResponse.nextToken;
}

for (const logGroup of logGroups) {
const listTagsResponse = await logs.send(
new ListTagsForResourceCommand({
resourceArn: logGroup.arn
})
);
console.log(`listTagsResponse: ${JSON.stringify(listTagsResponse)}`);
const logGroupTags = listTagsResponse.tags || {};
if (logGroupTags.Stage !== stage) {
console.log(
`Skipping log group: ${logGroup.logGroupName} (no ${stage} tag)`
);
continue;
}
const logGroupName = logGroup.logGroupName!;
console.log('Processing log group: ' + logGroupName);
const ssmParameterName = (
'/log-exporter-last-export/' + logGroupName
).replace('//', '/');
console.log(`Processing log group: ${logGroupName}`);
const ssmParameterName = `last-export-to-s3/${logGroupName}`.replace(
'//',
'/'
);
let ssmValue = '0';

try {
const ssmResponse = await ssm.send(
new GetParameterCommand({ Name: ssmParameterName })
);
console.log(`ssmResponse: ${JSON.stringify(ssmResponse)}`);
ssmValue = ssmResponse.Parameter?.Value || '0';
} catch (error) {
if (error.name !== 'ParameterNotFound') {
console.error('Error fetching SSM parameter: ' + error.message);
console.error(`Error fetching SSM parameter: ${JSON.stringify(error)}`);
}
console.error(`error: ${error.message}`);
console.error(`ssm.send error: ${JSON.stringify(error)}`);
}

const exportTime = Math.round(Date.now());

console.log('--> Exporting ' + logGroupName + ' to ' + logBucketName);
console.log(`--> Exporting ${logGroupName} to ${logBucketName}`);

if (exportTime - parseInt(ssmValue) < 24 * 60 * 60 * 1000) {
console.log(
Expand All @@ -68,28 +87,25 @@ export const handler = async () => {
}

try {
const response = await logs.send(
const exportTaskResponse = await logs.send(
new CreateExportTaskCommand({
logGroupName: logGroupName,
from: parseInt(ssmValue),
to: exportTime,
destination: logBucketName,
destinationPrefix: logGroupName.replace(/^\//, '').replace(/\/$/, '')
destinationPrefix: logGroupName.replace(/^\/|\/$/g, '')
})
);

console.log(' Task created: ' + response.taskId);
console.log(`exportTaskResponse: ${JSON.stringify(exportTaskResponse)}`);
console.log(`Task created: ${exportTaskResponse.taskId}`);
await new Promise((resolve) => setTimeout(resolve, 5000));
} catch (error) {
if (error.name === 'LimitExceededException') {
console.log(error.message);
console.log(JSON.stringify(error));
return;
}
console.error(
'Error exporting ' +
logGroupName +
': ' +
(error.message || JSON.stringify(error))
`Error exporting ${logGroupName}: ${JSON.stringify(error)}`
);
continue;
}
Expand All @@ -102,6 +118,8 @@ export const handler = async () => {
Overwrite: true
})
);
console.log(`SSM parameter updated: ${ssmParameterName}`);
}
await delay(10 * 1000); // prevents LimitExceededException (AWS allows only one export task at a time)
// TODO: reevaluate the delay time after the first set of exports
await delay(30 * 1000); // mitigates LimitExceededException (AWS allows only one export task at a time)
};
6 changes: 6 additions & 0 deletions infrastructure/cloudwatch.tf
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,10 @@ resource "aws_s3_bucket_policy" "cloudwatch_bucket" {
}
]
})
}

resource "aws_s3_bucket_logging" "cloudwatch_bucket" {
bucket = aws_s3_bucket.cloudwatch_bucket.id
target_bucket = aws_s3_bucket.logging_bucket.id
target_prefix = "cloudwatch_bucket"
}

0 comments on commit 2e6cdc5

Please sign in to comment.