Skip to content

Commit

Permalink
Merge pull request #151 from conlanpatrek/bulk-write
Browse files Browse the repository at this point in the history
Add bulkWrite implementation
  • Loading branch information
LinusU authored Jan 18, 2022
2 parents 85dfee8 + de16675 commit 224c2f9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 1 deletion.
64 changes: 63 additions & 1 deletion lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,69 @@ module.exports = function Collection(db, state) {
get writeConcern() { NotImplemented() },

aggregate: NotImplemented,
bulkWrite: NotImplemented,
bulkWrite: function (operations, options, callback) {
const promises = []

for (const operation of operations) {
let promise

// Determine which operation to forward to
if (operation.insertOne) {
const { document } = operation.insertOne
promise = this.insertOne(document, options)
} else if (operation.updateOne) {
const { filter, update, ...opts } = operation.updateOne
promise = this.updateOne(filter, update, { ...options, ...opts })
} else if (operation.updateMany) {
const { filter, update, ...opts } = operation.updateMany
promise = this.updateMany(filter, update, { ...options, ...opts })
} else if (operation.deleteOne) {
const { filter } = operation.deleteOne
promise = this.deleteOne(filter, options)
} else if (operation.deleteMany) {
const { filter } = operation.deleteMany
promise = this.deleteMany(filter, options)
} else if (operation.replaceOne) {
const { filter, replacement, ...opts } = operation.replaceOne
promise = this.replaceOne(filter, replacement, { ...options, ...opts })
} else {
throw Error('bulkWrite only supports insertOne, updateOne, updateMany, deleteOne, deleteMany')
}

// Add the operation results to the list
promises.push(promise)
}

Promise.all(promises).then(function(values) {
// Loop through all operation results, and aggregate
// the result object
let ops = []
let n = 0
for (const value of values) {
if (value.insertedId || value.insertedIds) {
ops = [...ops, ...value.ops]
}
n += value.result.n
}

callback(null, {
ops,
connection: db,
result: {
ok: 1,
n
}
})
}).catch(function (error) {
callback(error, null)
})

if (typeof callback !== 'function') {
return new Promise(function(resolve, reject) {
callback = function(e, r) { e ? reject(e) : resolve(r); };
});
}
},
count: count,
countDocuments: count,
estimatedDocumentCount: function(options, callback){
Expand Down
50 changes: 50 additions & 0 deletions test/mock.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,56 @@ describe('mock tests', function () {
});
});
}).timeout(0);

it('should bulk write', function(done) {
const setup = Promise.all([
collection.insertOne({ test: 1989, delete: true, many: false }),
collection.insertOne({ test: 1989, delete: true, many: true }),
collection.insertOne({ test: 1989, delete: true, many: true }),
collection.insertOne({ test: 1989, update: true, many: false }),
collection.insertOne({ test: 1989, update: true, many: true }),
collection.insertOne({ test: 1989, update: true, many: true })
])

setup
.then(() => {
return collection.bulkWrite([
{
insertOne: {
document: { test: 1989, inserted: true }
}
},
{ updateOne: {
filter: { test: 1989, update: true, many: false },
update: {
$set: { foo: 'bar' }
}
} },
{ updateMany: {
filter: { test: 1989, update: true, many: true },
update: {
$set: { baz: 'bing' }
}
} },
{ deleteOne: {
filter: { test: 1989, delete: true, many: false }
} },
{ deleteMany: {
filter: { test: 1989, delete: true, many: true }
} },
]).then(results => {
results.result.n.should.equal(7)

// Clean up, and assert that there are 4 records left
return collection.deleteMany({test: 1989})
.then(results => {
results.result.n.should.equal(4)
done()
})
})
})
.catch(err => done(err))
}).timeout(0);
});

describe('cursors', function() {
Expand Down

0 comments on commit 224c2f9

Please sign in to comment.