Skip to content

Commit

Permalink
Merge pull request #17 from blinemedical/VIDEO-2517-Fix-uploader-race
Browse files Browse the repository at this point in the history
[VIDEO-2517] Fix Uploader Race Condition
  • Loading branch information
lyramcmillan authored Sep 18, 2023
2 parents b05a522 + 344eec9 commit 7f1a030
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ if host_system == 'windows'
cpp_args = ['/std:c++20']
else
c_args = ['-std=c17']
cpp_args = ['-std=c++17']
cpp_args = ['-std=c++17', '-fexceptions']
endif

apiversion = '1.0'
Expand Down
52 changes: 42 additions & 10 deletions src/gsts3multipartuploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* Boston, MA 02110-1301, USA.
*/

#include <exception>

#include "gsts3multipartuploader.h"

#include "gstawsutils.hpp"
Expand Down Expand Up @@ -126,23 +128,39 @@ class PartStateCollection
void mark_part_as_completed(int part_number, const Aws::String& etag)
{
std::unique_lock<std::mutex> l(_mtx);

PartState state = std::move(_parts_in_flight.at(part_number));
_parts_in_flight.erase(part_number);
state.set_etag(etag);
_insert(_parts_completed, part_number, std::move(state));

// this is a super defensive just in case
// we added the recursive retry to the complete call in an attempt
// to avoid clearing the part states too early but we can't make
// a guarantee
try {
PartState state = std::move(_parts_in_flight.at(part_number));
_parts_in_flight.erase(part_number);
state.set_etag(etag);
_insert(_parts_completed, part_number, std::move(state));
}
catch (std::exception& ex) {
// no op - we don't have logging or a return value to set
// but this prevents an unhandled exception crash
}
l.unlock();
_upload_completed_cv.notify_one();
}

void mark_part_as_failed(int part_number)
{
std::unique_lock<std::mutex> l(_mtx);

_insert(_parts_failed, part_number, std::move(_parts_in_flight.at(part_number)));
_parts_in_flight.erase(part_number);

// this is a super defensive just in case
// we added the recursive retry to the complete call in an attempt
// to avoid clearing the part states too early but we can't make
// a guarantee
try {
_insert(_parts_failed, part_number, std::move(_parts_in_flight.at(part_number)));
_parts_in_flight.erase(part_number);
}
catch (std::exception& ex) {
// no op - we don't have logging or a return value to set
// but this prevents an unhandled exception crash
}
l.unlock();
_upload_completed_cv.notify_one();
}
Expand Down Expand Up @@ -183,6 +201,11 @@ class PartStateCollection
_parts_failed.clear();
}

bool parts_in_flight()
{
return !_parts_in_flight.empty();
}

private:
static void _insert(PartStateMap& map, int number, PartState part)
{
Expand Down Expand Up @@ -519,6 +542,15 @@ bool MultipartUploader::complete()
}

size_t parts_failed_count = _part_states->get_failed_parts_count();

// check to see if a new part was added while updating parts
// race condition on completion of stream due to
// lost connection and existing buffer flushing along with EOS
// if new part added - recursively retry the complete
// to avoid clearing part states to early and causing exceptions
if (_part_states->parts_in_flight()) {
return complete();
}
_part_states->clear();

Aws::S3::Model::CompleteMultipartUploadRequest upload_request;
Expand Down

0 comments on commit 7f1a030

Please sign in to comment.