-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
200 lines (186 loc) · 5.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
const pump = require('pump')
const WorkBook = require('./lib/workbook.js')
const { Writable, Transform } = require('stream')
const FileType = require('stream-file-type')
const objectChunker = require('object-chunker')
const debug = require('debug')('salmon')
const Promise = require('bluebird')
// True for values a spreadsheet cell treats as "blank": undefined, null, or
// the empty string. Note 0 and false are NOT blank.
const isEmpty = (cell) => cell == null || cell === ''

// Default column mapper: names each column after its positional index.
const byIndex = (_cell, index) => index

// Sentinel symbol (not referenced elsewhere in this file's visible code).
const END = Symbol('END')
/**
 * Processes a stream containing an XLSX file.
 * Calls the provided async `processor` function.
 * The processor function must handle its own errors; unhandled exceptions
 * will cause the processing operation to fail.
 * Returns a promise that resolves to the number of processed rows.
 */
module.exports = ({
  inputStream,                  // readable stream carrying the raw XLSX (zip) bytes
  mapColumns = byIndex,         // (headerValue, index) => column key; defaults to the positional index
  onLineCount = () => {},       // called once with the sheet's declared row count (when a dimension is present)
  returnFormats = false,        // when true, rows are emitted as { values, formats } instead of plain objects
  formatting = true,            // forwarded to the WorkBook reader
  chunkSize = 1,                // >1 groups emitted rows into arrays via object-chunker
  hasHeaders = true,            // when true, the first non-empty row supplies the column names
  lenientColumnMapping = false  // when true, values without a column name are dropped instead of throwing
}) => {
  // State shared between the returned `stream` and `processor` methods.
  let cols = null       // column keys; stays null until the first (header) row is seen
  let detected = false  // flips to true once a worksheet event actually fires
  let stream            // the object-mode Transform that emits mapped rows
  let detector          // file-type sniffer sitting in front of the workbook reader
  let reader            // current worksheet reader (kept so onErr can detach its 'row' listeners)

  // Row transformer used when `returnFormats` is set: returns both the mapped
  // values and a parallel map of cell formats, keyed by the same column names.
  const withFormats = (row, cols) => {
    // Drop the first entry so formats line up with the shifted values —
    // presumably a 1-based placeholder from the sheet parser; verify against WorkBook.
    row.formats.shift()
    const formats = {}
    for (let i = 0; i < cols.length; i++) {
      formats[cols[i]] = row.formats[i]
    }
    return {
      values: transformRow(row, cols),
      formats
    }
  }

  // Maps a row's positional values onto an object keyed by the names in `cols`.
  // A value with no matching column name throws, unless `lenientColumnMapping`
  // is set — then that value and all following values are silently dropped.
  const transformRow = (row, cols) => {
    const rowObj = {}
    for (let i = 0; i < row.values.length; i++) {
      if (cols[i] === undefined) {
        if (lenientColumnMapping) {
          return rowObj
        }
        throw new Error(`Missing column name at index ${i}`)
      }
      rowObj[cols[i]] = row.values[i]
    }
    return rowObj
  }

  // Central failure path: logs and tears down every stream involved.
  // pump() also invokes its callback with no argument on clean completion,
  // hence the guard on a falsy `err`.
  const onErr = err => {
    if (!err) return
    debug(err)
    inputStream.destroy()
    detector.destroy()
    if (reader) { reader.removeAllListeners('row') }
    if (stream) { stream.destroy(err) }
  }

  const rowTransformer = returnFormats ? withFormats : transformRow

  return {
    /**
     * Returns an object-mode stream of mapped rows (or arrays of rows when
     * chunkSize > 1). Only the first worksheet is read. `limit` caps how many
     * rows are consumed from the sheet before the output is ended.
     */
    stream (limit = Infinity) {
      let i = 0 // rows seen so far; used for the `limit` cut-off below
      detector = new FileType()
      stream = new Transform({
        objectMode: true,
        transform (chunk, enc, cb) {
          try {
            this.push(rowTransformer(chunk, cols))
          } catch (e) {
            // NOTE(review): cb is never called on this path — onErr destroys
            // the stream instead, which terminates the pipeline.
            return onErr(e)
          }
          cb()
        }
      })

      // Once the sniffer has identified the mime type, either bail out (an
      // XLSX is a zip container, so anything else is invalid) or start
      // feeding the workbook reader.
      const checkAndPipe = fileType => {
        if (!fileType || fileType.mime !== 'application/zip') {
          onErr(new Error('Invalid file type'))
        } else {
          pump(detector, workBookReader, onErr)
          // A zip that ends without ever yielding a worksheet is not a valid
          // XLSX either.
          workBookReader.on('end', () => {
            if (!detected) {
              onErr(new Error('Invalid file type'))
            }
          })
        }
      }

      // Wires a worksheet reader (an event emitter) up to `stream`, applying
      // manual backpressure via pause()/resume() on the underlying sheet stream.
      const readSheet = workSheetReader => {
        detected = true
        reader = workSheetReader
        workSheetReader.workSheetStream.on('error', onErr)
        // read only the first worksheet for now
        if (workSheetReader.id > 1) { workSheetReader.skip(); return }
        // worksheet reader is an event emitter - we have to convert it to a read stream
        // signal stream end when the event emitter is finished
        workSheetReader.on('end', async () => {
          stream.end()
        })
        workSheetReader.process()
        // Backpressure: resume the sheet stream once the output drains.
        stream.on('drain', () => {
          debug('resume stream')
          workSheetReader.workSheetStream.resume()
        })
        workSheetReader.on('row', row => {
          try {
            if (row.values.every(isEmpty)) {
              // Blank rows are skipped; a blank first row when headers are
              // expected is fatal.
              if (!cols && hasHeaders) { throw new Error('Header row is empty') } else { return }
            }
            if (i++ > limit) {
              // Past the row limit: abort everything and end the output.
              // NOTE(review): with `i++ > limit` one extra row beyond `limit`
              // is processed before aborting — confirm whether intended.
              workBookReader.abort()
              detector.destroy()
              inputStream.destroy()
              workSheetReader.workSheetStream.destroy()
              stream.push(null)
              return
            }
            // Drop the first entry — presumably a 1-based placeholder from
            // the sheet parser; verify against WorkBook.
            row.values.shift()
            debug('row received')
            if (!cols) {
              // First non-empty row: report the declared row count (taken from
              // the trailing digits of the dimension ref) and derive column keys.
              if (workSheetReader.sheetData.dimension) {
                const lines = workSheetReader.sheetData.dimension[0].attributes.ref.match(/\d+$/)
                onLineCount(parseInt(lines, 10) - 1) // minus one — presumably excluding the header row
              }
              cols = row.values.map(mapColumns)
              if (!hasHeaders) {
                // No header row: the first row is data too, so emit it.
                stream.write(row)
              } else if (row.values.every(isEmpty)) {
                // NOTE(review): looks unreachable — fully-empty rows already
                // returned or threw above; confirm before removing.
                throw new Error('Empty header row')
              }
            } else {
              const o = stream.write(row)
              if (!o) {
                // Output buffer is full: pause the sheet stream until 'drain'.
                debug('pausing stream')
                workSheetReader.workSheetStream.pause()
              }
            }
          } catch (err) {
            onErr(err)
          }
        })
      }

      const workBookReader = new WorkBook({ formatting, returnFormats })
      workBookReader.on('error', onErr)
      workBookReader.on('worksheet', readSheet)
      detector.on('file-type', checkAndPipe)
      pump(inputStream, detector, onErr)
      if (chunkSize > 1) {
        // pump returns its last stream argument, so this returns the chunker.
        const chunker = objectChunker(chunkSize)
        return pump(stream, chunker, e => chunker.emit('error', e))
      }
      return stream
    },

    /**
     * Consumes the stream, awaiting `onRow(rowOrChunk, index)` for each
     * emitted item. Resolves with the number of processed rows (the index
     * advances by chunkSize per item); rejects on any stream or handler error.
     */
    processor ({ onRow, limit }) {
      let i = 0
      const readStream = this.stream(limit)
      return new Promise((resolve, reject) => {
        const processRow = new Writable({
          objectMode: true,
          async write (row, encoding, cb) {
            // if this fails, pump will cleanup the broken streams.
            try {
              await onRow(row, i)
              i += chunkSize // NOTE(review): may overcount when the final chunk is partial — confirm
            } catch (err) {
              return cb(err)
            }
            return cb(null)
          }
        })
        pump(readStream, processRow, err => {
          onErr(err)
          if (err) {
            reject(err)
          } else {
            resolve(i)
          }
        })
      })
    }
  }
}