Skip to content

Commit

Permalink
Merge pull request #8 from blinemedical/VIDEO-2016-verify-s3sink-seek…
Browse files Browse the repository at this point in the history
…-behavior

[VIDEO-2016] extend 'seek' feature set
  • Loading branch information
lyramcmillan authored Aug 16, 2022
2 parents c2aa549 + 9af8085 commit 26692dc
Show file tree
Hide file tree
Showing 20 changed files with 2,302 additions and 303 deletions.
30 changes: 18 additions & 12 deletions .github/workflows/c-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@ jobs:
strategy:
matrix:
os:
- ubuntu-18.04
- ubuntu-20.04
steps:
- name: Install dependencies (Ubuntu)
- name: Install AWS SDK (Linux)
if: runner.os == 'Linux'
run: >-
run: |
sudo apt-get update
sudo apt-get install -y curl libcurl4-openssl-dev cmake gstreamer1.0-plugins-base
libgstreamer1.0-dev python3-pip python3-setuptools
- uses: actions/checkout@v3
- run: pip3 install meson ninja
- run: 'git clone --depth 1 https://github.com/aws/aws-sdk-cpp.git -b 1.7.202'
- run: cmake -DBUILD_ONLY="s3;sts" aws-sdk-cpp
- run: make -j 4
- run: sudo make install
sudo apt-get install -y curl libcurl4-openssl-dev python3-setuptools python3-pip
pip3 install meson ninja
git clone --depth 1 --recurse-submodules https://github.com/aws/aws-sdk-cpp.git -b 1.9.210
cd aws-sdk-cpp
mkdir _build
cmake -S . -B _build -GNinja -DBUILD_ONLY="s3;sts" -DENABLE_TESTING=OFF
cmake --build _build
sudo cmake --install _build
- name: Install GStreamer (Linux)
if: runner.os == 'Linux'
uses: blinemedical/setup-gstreamer@main
with:
version: '1.20.2'
- name: Checkout Plugin
uses: actions/checkout@v3
- run: meson build
- run: ninja -C build
- run: 'export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH'
Expand Down
7 changes: 5 additions & 2 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ project('amazon-s3-gst-plugin', 'c', 'cpp',
default_options : [ 'warning_level=2',
'buildtype=debugoptimized' ])

gst_req = '>= 1.0.0'
aws_cpp_sdk_req = '>= 1.7.202'
gst_req = '>= 1.20.2'
aws_cpp_sdk_req = '>= 1.9.210'

gst_s3_version = meson.project_version()

c_args = ['-std=c17']
cpp_args = ['-std=c++17']

apiversion = '1.0'

glib_dep = dependency('glib-2.0')
Expand Down
86 changes: 70 additions & 16 deletions src/gsts3downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,42 @@

#include <gst/gst.h>

struct _GstS3Downloader {
_GstS3Downloader(const GstS3UploaderConfig *config);
namespace gst::aws::s3 {

bool _init_downloader(const GstS3UploaderConfig *config);
class Downloader {
public:
static std::unique_ptr<Downloader> create(const GstS3UploaderConfig *config)
{
auto downloader = std::unique_ptr<Downloader>(new Downloader(config));
if (!downloader->_init_downloader(config))
{
return nullptr;
}
return downloader;
}

size_t download_part(char * buffer, size_t first, size_t last);

private:
explicit Downloader(const GstS3UploaderConfig *config);

bool _init_downloader(const GstS3UploaderConfig *config);

Aws::String _bucket;
Aws::String _key;

std::shared_ptr<gst::aws::AwsApiHandle> _api_handle;
std::unique_ptr<Aws::S3::S3Client> _s3_client;
};

_GstS3Downloader::_GstS3Downloader(const GstS3UploaderConfig *config) :
Downloader::Downloader(const GstS3UploaderConfig *config) :
_bucket(std::move(get_bucket_from_config(config))),
_key(std::move(get_key_from_config(config))),
_api_handle(config->init_aws_sdk ? gst::aws::AwsApiHandle::GetHandle() : nullptr)
{
}

bool _GstS3Downloader::_init_downloader(const GstS3UploaderConfig *config)
bool Downloader::_init_downloader(const GstS3UploaderConfig *config)
{
Aws::Client::ClientConfiguration client_config;
if (!is_null_or_empty(config->ca_file))
Expand Down Expand Up @@ -101,7 +115,7 @@ bool _GstS3Downloader::_init_downloader(const GstS3UploaderConfig *config)
}

size_t
_GstS3Downloader::download_part(char* buffer, size_t first, size_t last)
Downloader::download_part(char* buffer, size_t first, size_t last)
{
char *range = g_strdup_printf("bytes=%ld-%ld", first, last);

Expand All @@ -125,31 +139,71 @@ _GstS3Downloader::download_part(char* buffer, size_t first, size_t last)
.gcount();
}

}; // gst::aws::s3

using gst::aws::s3::Downloader;

#define DOWNLOADER_(downloader) reinterpret_cast<GstS3DefaultDownloader*>(downloader);

typedef struct _GstS3DefaultDownloader GstS3DefaultDownloader;

struct _GstS3DefaultDownloader
{
GstS3Downloader base;
std::unique_ptr<Downloader> impl;

_GstS3DefaultDownloader(std::unique_ptr<Downloader> impl);
};

static void
gst_s3_default_downloader_destroy (GstS3Downloader *downloader)
{
delete DOWNLOADER_(downloader);
}

static size_t
gst_s3_default_downloader_download_part (GstS3Downloader *downloader, char* buff, size_t first, size_t last)
{
GstS3DefaultDownloader *self = DOWNLOADER_(downloader);
g_return_val_if_fail(self && self->impl, FALSE);
return self->impl->download_part (buff, first, last);
}

GstS3Downloader *
gst_s3_downloader_new (const GstS3UploaderConfig * config)
{
g_return_val_if_fail (config, NULL);

auto impl = new _GstS3Downloader(config);
auto impl = Downloader::create(config);

if (!impl->_init_downloader(config))
if (!impl)
{
delete impl;
return NULL;
}

return impl;
return reinterpret_cast <GstS3Downloader *>(new GstS3DefaultDownloader (std::move (impl)));
}

void
gst_s3_downloader_free (GstS3Downloader * downloader)
static GstS3DownloaderClass default_class = {
gst_s3_default_downloader_destroy,
gst_s3_default_downloader_download_part
};

_GstS3DefaultDownloader::_GstS3DefaultDownloader(std::unique_ptr<Downloader> impl) :
impl(std::move(impl))
{
delete reinterpret_cast<_GstS3Downloader *>(downloader);
base.klass = &default_class;
}

#define GET_CLASS_(downloader) ((GstS3Downloader*) (downloader))->klass

void
gst_s3_downloader_destroy (GstS3Downloader * downloader) {
GET_CLASS_ (downloader)->destroy(downloader);
}

gsize
gst_s3_downloader_download_part (GstS3Downloader * downloader,
gchar * buffer, gsize first, gsize last)
gst_s3_downloader_download_part (GstS3Downloader * downloader, gchar * buffer, gsize first, gsize last)
{
return reinterpret_cast<_GstS3Downloader *>(downloader)->download_part(buffer, first, last);
return GET_CLASS_ (downloader)->download_part (downloader, buffer, first, last);
}
11 changes: 10 additions & 1 deletion src/gsts3downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@ G_BEGIN_DECLS

typedef struct _GstS3Downloader GstS3Downloader;

typedef struct {
void (*destroy) (GstS3Downloader *);
size_t (*download_part) (GstS3Downloader *, gchar *buffer, size_t first, size_t last);
} GstS3DownloaderClass;

struct _GstS3Downloader {
GstS3DownloaderClass *klass;
};

GstS3Downloader *gst_s3_downloader_new (const GstS3UploaderConfig * config);

void gst_s3_downloader_free (GstS3Downloader * downloader);
void gst_s3_downloader_destroy (GstS3Downloader * downloader);

gsize gst_s3_downloader_download_part (GstS3Downloader *
downloader, gchar * buffer, gsize first, gsize last);
Expand Down
56 changes: 51 additions & 5 deletions src/gsts3multipartuploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "gstawsutils.hpp"
#include "gstawscredentials.hpp"
#include "gstawsapihandle.hpp"
#include "gsts3uploaderpartcache.hpp"

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
Expand Down Expand Up @@ -112,6 +113,11 @@ class PartStateCollection
std::lock_guard<std::mutex> l(_mtx);

int num = state.get_part_number();

// If previously uploaded, remove the old one.
if (_parts_completed.find(num) != _parts_completed.end())
_parts_completed.erase(num);

_insert(_parts_in_flight, num, std::move(state));
}

Expand Down Expand Up @@ -237,10 +243,16 @@ class MultipartUploader

~MultipartUploader();

bool upload(const char* data, size_t size);
bool upload(const char* data, size_t size, char** next, size_t *next_size);
bool upload_copy(const char* bucket, const char *key, size_t first, size_t last);
bool seek (size_t offset, char **buffer, size_t *size);
bool complete();

// AWS defines part numbers as inclusively 1-10000.
static bool part_num_range_check (int part_num) {
return (1 <= part_num && part_num <= 10000);
}

private:
explicit MultipartUploader(const GstS3UploaderConfig *config);
bool _init_uploader(const GstS3UploaderConfig * config);
Expand Down Expand Up @@ -270,6 +282,8 @@ class MultipartUploader

int _part_counter = 0;
bool _verify_hash = false;

UploaderPartCache _part_cache;
};

// TODO: There's a few things I didn't implement because they're not critical (yet), but might
Expand All @@ -286,7 +300,8 @@ MultipartUploader::MultipartUploader(const GstS3UploaderConfig *config) :
_bucket(std::move(get_bucket_from_config(config))),
_key(std::move(get_key_from_config(config))),
_api_handle(config->init_aws_sdk ? AwsApiHandle::GetHandle() : nullptr),
_part_states(std::make_shared<PartStateCollection>(false))
_part_states(std::make_shared<PartStateCollection>(false)),
_part_cache(config->cache_num_parts)
{
}

Expand Down Expand Up @@ -395,10 +410,13 @@ std::unique_ptr<Aws::IOStream> MultipartUploader::_create_stream(const char* dat
new Aws::IOStream(new Aws::Utils::Stream::PreallocatedStreamBuf(buffer, size)));
}

bool MultipartUploader::upload(const char* data, size_t size)
bool MultipartUploader::upload(const char* data, size_t size, char** next, size_t *next_size)
{
int part_number = ++_part_counter;

// Check the cache for the next part, if it exists.
_part_cache.get_copy(part_number+1, next, next_size);

std::shared_ptr<Aws::IOStream> stream = _create_stream(data, size);
Aws::S3::Model::UploadPartRequest request;
request.WithBucket(_bucket)
Expand All @@ -421,6 +439,8 @@ bool MultipartUploader::upload(const char* data, size_t size)

auto context = std::make_shared<MultipartUploaderContext>(_part_states, _buffer_manager, part_number);

_part_cache.insert_or_update(part_number, data, size);

_s3_client->UploadPartAsync(request, _handle_upload_completed, context);

return true;
Expand Down Expand Up @@ -449,11 +469,27 @@ bool MultipartUploader::upload_copy(const char* bucket, const char *key, size_t

auto context = std::make_shared<MultipartUploaderContext>(_part_states, _buffer_manager, part_number);

// Note the part in the cache but nothing was downloaded locally, so obv. we cannot cache it.
// However, knowing the size is helpful for finding other parts later, by offset.
_part_cache.insert_or_update(part_number, NULL, last - first);

_s3_client->UploadPartCopyAsync(request, _handle_upload_copy_completed, context);

return true;
}

bool MultipartUploader::seek (gsize offset, char **buffer, size_t *size)
{
int part_num = 0;
if (_part_cache.find(offset, &part_num, buffer, size) && (*buffer != NULL)) {
// set counter to previous value so the next upload() increments
// to the correct part number.
_part_counter = part_num - 1;
return true;
}
return false;
}

bool MultipartUploader::complete()
{
_part_states->wait_for_complete();
Expand Down Expand Up @@ -549,11 +585,11 @@ gst_s3_multipart_uploader_destroy (GstS3Uploader * uploader)

static gboolean
gst_s3_multipart_uploader_upload_part (GstS3Uploader *
uploader, const gchar * buffer, gsize size)
uploader, const gchar * buffer, gsize size, gchar **next, gsize *next_size)
{
GstS3MultipartUploader *self = MULTIPART_UPLOADER_ (uploader);
g_return_val_if_fail (self && self->impl, FALSE);
return self->impl->upload (buffer, size);
return self->impl->upload (buffer, size, next, next_size);
}

static gboolean
Expand All @@ -566,6 +602,15 @@ gst_s3_multipart_uploader_upload_part_copy (GstS3Uploader *
return self->impl->upload_copy (bucket, key, first, last);
}

static gboolean
gst_s3_multipart_uploader_seek (GstS3Uploader *
uploader, gsize offset, gchar **buffer, gsize *size)
{
GstS3MultipartUploader *self = MULTIPART_UPLOADER_ (uploader);
g_return_val_if_fail (self && self->impl, FALSE);
return self->impl->seek (offset, buffer, size);
}

static gboolean
gst_s3_multipart_uploader_complete (GstS3Uploader * uploader)
{
Expand All @@ -577,6 +622,7 @@ static GstS3UploaderClass default_class = {
gst_s3_multipart_uploader_destroy,
gst_s3_multipart_uploader_upload_part,
gst_s3_multipart_uploader_upload_part_copy,
gst_s3_multipart_uploader_seek,
gst_s3_multipart_uploader_complete
};

Expand Down
Loading

0 comments on commit 26692dc

Please sign in to comment.