index.js
#!/usr/bin/env node
/* eslint-disable no-await-in-loop, import/no-extraneous-dependencies, no-cond-assign, no-param-reassign, max-classes-per-file */
const { createReadStream } = require('fs');
const parseCSV = require('csv-parse');
const DynamoDB = require('aws-sdk/clients/dynamodb');
const stream = require('stream');
const util = require('util');
const debug = require('debug')('dynamodb-csv-export-import');
const pipeline = util.promisify(stream.pipeline);
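// Write one batch of items to the target table with a single BatchWriteItem call.
// Note that any UnprocessedItems returned by BatchWriteItem are not retried here.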
async function writeBatch(ddb, tableName, batch) {
  debug('writeBatch: writing to %s batch %O', tableName, batch);
  await ddb.batchWriteItem({
    RequestItems: {
      [tableName]: batch.map(record => ({
        PutRequest: {
          Item: record,
        },
      })),
    },
  }).promise();
}
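// Convert a parsed CSV row into a DynamoDB item. Each column header must have the
// form "AttributeName (TYPE)", e.g. "id (S)"; columns with empty values are skipped.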
const transformRecord = record => Object.entries(record)
  .reduce((output, [key, value]) => {
    const match = /(\w+) \((\w+)\)/.exec(key);
    if (!match) {
      throw new Error(`Unexpected CSV column header "${key}"; expected "name (TYPE)"`);
    }
    const [, name, type] = match;
    if (!value) {
      return output;
    }
    // List, map, and boolean values are serialized as JSON strings, so parse them back.
    const contents = (['L', 'M', 'BOOL'].includes(type)) ? JSON.parse(value) : value;
    output[name] = {
      [type]: contents,
    };
    return output;
  }, {});
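// Transform stream that maps parsed CSV rows to DynamoDB items in object mode.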
class DynamoDBCSVRecordTransformer extends stream.Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    try {
      const transformed = transformRecord(chunk);
      debug('transformRecord: transformed record %o', transformed);
      callback(null, transformed);
    } catch (error) {
      callback(error);
    }
  }
}
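// Writable stream that buffers items and flushes them to DynamoDB in batches of 25,
// the maximum number of put requests allowed per BatchWriteItem call.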
class DynamoDBWriter extends stream.Writable {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.tableName = options.tableName;
    this.ddb = options.ddb;
    this.batch = [];
  }

  async _write(chunk, encoding, callback) {
    try {
      this.batch.push(chunk);
      if (this.batch.length >= 25) {
        await writeBatch(this.ddb, this.tableName, this.batch);
        this.batch = [];
      }
      callback();
    } catch (error) {
      callback(error);
    }
  }

  async _final(callback) {
    try {
      if (this.batch.length > 0) {
        await writeBatch(this.ddb, this.tableName, this.batch);
      }
      callback();
    } catch (error) {
      callback(error);
    }
  }
}
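// CLI entry point. Usage: node index.js <csv_file> <target_table>
// Streams the CSV file through the parser, the record transformer, and the batch writer.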
(async function writePipeline() {
  const filename = process.argv[2];
  const tablename = process.argv[3];
  if (!filename || !tablename) {
    console.log(`Usage: ${process.argv[0]} ${process.argv[1]} <csv_file> <target_table>`);
    process.exitCode = 1;
    return;
  }
  const rs = createReadStream(filename);
  const parser = parseCSV({ delimiter: ',', columns: true });
  const ddb = new DynamoDB({ endpoint: process.env.DYNAMODB_ENDPOINT_URL, region: process.env.AWS_DEFAULT_REGION || 'us-east-1' });
  const transformer = new DynamoDBCSVRecordTransformer();
  const writer = new DynamoDBWriter({ tableName: tablename, ddb });
  try {
    await pipeline(rs, parser, transformer, writer);
    console.log('Import completed');
  } catch (error) {
    console.error('Fatal error running CSV transform pipeline:', error);
    process.exitCode = 1;
  }
}());