Skip to content

Commit

Permalink
Add S3 ca path support
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoebus Mak authored and phoebusm committed Mar 22, 2024
1 parent 3f4a18d commit 8a2e29c
Show file tree
Hide file tree
Showing 15 changed files with 276 additions and 37 deletions.
6 changes: 5 additions & 1 deletion cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ void register_bindings(py::module& storage) {
.def_property("bucket_name", &S3Override::bucket_name, &S3Override::set_bucket_name)
.def_property("region", &S3Override::region, &S3Override::set_region)
.def_property(
"use_virtual_addressing", &S3Override::use_virtual_addressing, &S3Override::set_use_virtual_addressing);
"use_virtual_addressing", &S3Override::use_virtual_addressing, &S3Override::set_use_virtual_addressing)
.def_property("ca_cert_path", &S3Override::ca_cert_path, &S3Override::set_ca_cert_path)
.def_property("ca_cert_dir", &S3Override::ca_cert_dir, &S3Override::set_ca_cert_dir)
.def_property("https", &S3Override::https, &S3Override::set_https)
.def_property("ssl", &S3Override::ssl, &S3Override::set_ssl);

py::class_<AzureOverride>(storage, "AzureOverride")
.def(py::init<>())
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ auto get_s3_config(const ConfigType& conf) {
const bool verify_ssl = ConfigsMap::instance()->get_int("S3Storage.VerifySSL", conf.ssl());
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Verify ssl: {}", verify_ssl);
client_configuration.verifySSL = verify_ssl;
if (client_configuration.verifySSL && (!conf.ca_cert_path().empty() || !conf.ca_cert_dir().empty())) {
client_configuration.caFile = conf.ca_cert_path();
client_configuration.caPath = conf.ca_cert_dir();
}
client_configuration.maxConnections = conf.max_connections() == 0 ?
ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", 16) :
conf.max_connections();
Expand Down
41 changes: 41 additions & 0 deletions cpp/arcticdb/storage/storage_override.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ class S3Override {
std::string endpoint_;
std::string bucket_name_;
std::string region_;
std::string ca_cert_path_;
std::string ca_cert_dir_;
bool use_virtual_addressing_ = false;
bool https_;
bool ssl_;

public:
std::string credential_name() const {
Expand Down Expand Up @@ -65,6 +69,39 @@ class S3Override {
use_virtual_addressing_ = use_virtual_addressing;
}

std::string ca_cert_path() const {
return ca_cert_path_;
}

void set_ca_cert_path(std::string_view ca_cert_path){
ca_cert_path_ = ca_cert_path;
}


std::string ca_cert_dir() const {
return ca_cert_dir_;
}

void set_ca_cert_dir(std::string_view ca_cert_dir){
ca_cert_dir_ = ca_cert_dir;
}

bool https() const {
return https_;
}

void set_https(bool https){
https_ = https;
}

bool ssl() const {
return ssl_;
}

void set_ssl(bool ssl){
ssl_ = ssl;
}

void modify_storage_config(arcticdb::proto::storage::VariantStorage& storage) const {
if(storage.config().Is<arcticdb::proto::s3_storage::Config>()) {
arcticdb::proto::s3_storage::Config s3_storage;
Expand All @@ -76,6 +113,10 @@ class S3Override {
s3_storage.set_endpoint(endpoint_);
s3_storage.set_region(region_);
s3_storage.set_use_virtual_addressing(use_virtual_addressing_);
s3_storage.set_ca_cert_path(ca_cert_path_);
s3_storage.set_ca_cert_dir(ca_cert_dir_);
s3_storage.set_https(https_);
s3_storage.set_ssl(ssl_);

util::pack_to_any(s3_storage, *storage.mutable_config());
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/proto/arcticc/pb2/nfs_backed_storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ message Config {
bool https = 10;
string region = 11;
bool use_virtual_addressing = 12;
string ca_cert_path = 13;
string ca_cert_dir = 14;
}
2 changes: 2 additions & 0 deletions cpp/proto/arcticc/pb2/s3_storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ message Config {
string region = 11;
bool use_virtual_addressing = 12;
bool use_mock_storage_for_testing = 13;
string ca_cert_path = 14;
string ca_cert_dir = 15;
}
3 changes: 2 additions & 1 deletion environment_unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies:
- prometheus-cpp
- libprotobuf < 4
- openssl
- libcurl
- libcurl==8.5.0 #Until https://github.com/conda-forge/curl-feedstock/issues/135 addressed and release build available
- bitmagic
- spdlog
- azure-core-cpp
Expand Down Expand Up @@ -80,3 +80,4 @@ dependencies:
- asv
- pymongo
- pytest
- trustme
35 changes: 35 additions & 0 deletions python/arcticdb/adapters/s3_library_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import re
import time
from typing import Optional
import ssl
import platform

from arcticdb.options import LibraryOptions
from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap, LibraryConfig
Expand Down Expand Up @@ -45,6 +47,13 @@ class ParsedQuery:
# DEPRECATED - see https://github.com/man-group/ArcticDB/pull/833
force_uri_lib_config: Optional[bool] = True

# winhttp is used as s3 backend support on Winodws by default; winhttp itself mainatains ca cert.
# The options has no effect on Windows
CA_cert_path: Optional[str] = "" # CURLOPT_CAINFO in curl
CA_cert_dir: Optional[str] = "" # CURLOPT_CAPATH in curl

ssl: Optional[bool] = False


class S3LibraryAdapter(ArcticLibraryAdapter):
REGEX = r"s3(s)?://(?P<endpoint>.*):(?P<bucket>[-_a-zA-Z0-9.]+)(?P<query>\?.*)?"
Expand Down Expand Up @@ -73,6 +82,18 @@ def __init__(self, uri: str, encoding_version: EncodingVersion, *args, **kwargs)

self._https = uri.startswith("s3s")
self._encoding_version = encoding_version
if platform.system() != "Linux" and (self._query_params.CA_cert_path or self._query_params.CA_cert_dir):
raise ValueError("You have provided `ca_cert_path` or `ca_cert_dir` in the URI which is only supported on Linux. " \
"Remove the setting in the connection URI and use your operating system defaults.")
self._ca_cert_path = self._query_params.CA_cert_path
self._ca_cert_dir = self._query_params.CA_cert_dir
if not self._ca_cert_path and not self._ca_cert_dir and platform.system() == "Linux":
if ssl.get_default_verify_paths().cafile is not None:
self._ca_cert_path = ssl.get_default_verify_paths().cafile
if ssl.get_default_verify_paths().capath is not None:
self._ca_cert_dir = ssl.get_default_verify_paths().capath

self._ssl = self._query_params.ssl

if "amazonaws" in self._endpoint:
self._configure_aws()
Expand Down Expand Up @@ -103,6 +124,9 @@ def config_library(self):
with_prefix=with_prefix,
region=self._query_params.region,
use_virtual_addressing=self._query_params.use_virtual_addressing,
ca_cert_path=self._ca_cert_path,
ca_cert_dir=self._ca_cert_dir,
ssl=self._ssl,
)

lib = NativeVersionStore.create_store_from_config(
Expand Down Expand Up @@ -160,6 +184,14 @@ def get_storage_override(self) -> StorageOverride:
s3_override.endpoint = self._endpoint
if self._bucket:
s3_override.bucket_name = self._bucket
if self._https:
s3_override.https = self._https
if self._ca_cert_path:
s3_override.ca_cert_path = self._ca_cert_path
if self._ca_cert_dir:
s3_override.ca_cert_dir = self._ca_cert_dir
if self._ssl:
s3_override.ssl = self._ssl

s3_override.use_virtual_addressing = self._query_params.use_virtual_addressing

Expand Down Expand Up @@ -198,6 +230,9 @@ def get_library_config(self, name, library_options: LibraryOptions):
env_name=_DEFAULT_ENV,
region=self._query_params.region,
use_virtual_addressing=self._query_params.use_virtual_addressing,
ca_cert_path=self._ca_cert_path,
ca_cert_dir=self._ca_cert_dir,
ssl=self._ssl,
)

library_options.encoding_version = (
Expand Down
90 changes: 71 additions & 19 deletions python/arcticdb/storage_fixtures/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
import os
import re
import sys
import time
import trustme
import subprocess
import platform
from tempfile import mkdtemp


import requests
from typing import NamedTuple, Optional, Any, Type

from .api import *
from .utils import get_ephemeral_port, GracefulProcessUtils, wait_for_server_to_come_up
from .utils import get_ephemeral_port, GracefulProcessUtils, wait_for_server_to_come_up, safer_rmtree
from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap
from arcticdb.version_store.helper import add_s3_library_to_env

Expand Down Expand Up @@ -57,6 +61,12 @@ def __init__(self, factory: "BaseS3StorageFixtureFactory", bucket: str):
self.arctic_uri += f"&port={port}"
if factory.default_prefix:
self.arctic_uri += f"&path_prefix={factory.default_prefix}"
if factory.ssl:
self.arctic_uri += "&ssl=True"
if platform.system() == "Linux":
if factory.client_cert_file:
self.arctic_uri += f"&CA_cert_path={self.factory.client_cert_file}"
# client_cert_dir is skipped on purpose; It will be test manually in other tests

def __exit__(self, exc_type, exc_value, traceback):
if self.factory.clean_bucket_on_fixture_exit:
Expand All @@ -81,7 +91,9 @@ def create_test_cfg(self, lib_name: str) -> EnvironmentConfigsMap:
is_https=self.factory.endpoint.startswith("s3s:"),
region=self.factory.region,
use_mock_storage_for_testing=self.factory.use_mock_storage_for_testing,
)
ssl=self.factory.ssl,
ca_cert_path=self.factory.client_cert_file,
)# client_cert_dir is skipped on purpose; It will be test manually in other tests
return cfg

def set_permission(self, *, read: bool, write: bool):
Expand Down Expand Up @@ -124,6 +136,11 @@ class BaseS3StorageFixtureFactory(StorageFixtureFactory):
clean_bucket_on_fixture_exit = True
use_mock_storage_for_testing = None # If set to true allows error simulation

def __init__(self):
self.client_cert_file = ""
self.client_cert_dir = ""
self.ssl = False

def __str__(self):
return f"{type(self).__name__}[{self.default_bucket or self.endpoint}]"

Expand All @@ -137,7 +154,8 @@ def _boto(self, service: str, key: Key, api="client"):
region_name=self.region,
aws_access_key_id=key.id,
aws_secret_access_key=key.secret,
)
verify=self.client_cert_file if self.client_cert_file else False,
) # verify=False cannot skip verification on buggy boto3 in py3.6

def create_fixture(self) -> S3Bucket:
return S3Bucket(self, self.default_bucket)
Expand Down Expand Up @@ -194,8 +212,12 @@ class MotoS3StorageFixtureFactory(BaseS3StorageFixtureFactory):
_bucket_id = 0
_live_buckets: List[S3Bucket] = []

def __init__(self, use_ssl: bool):
self.http_protocol = "https" if use_ssl else "http"


@staticmethod
def run_server(port):
def run_server(port, key_file, cert_file):
import werkzeug
from moto.server import DomainDispatcherApplication, create_backend_app

Expand Down Expand Up @@ -237,38 +259,68 @@ def __call__(self, environ, start_response):
self._reqs_till_rate_limit -= 1

return super().__call__(environ, start_response)

werkzeug.run_simple(
"0.0.0.0",
port,
_HostDispatcherApplication(create_backend_app),
threaded=True,
ssl_context=None,
ssl_context=(cert_file, key_file) if cert_file and key_file else None,
)

def _start_server(self):
port = self.port = get_ephemeral_port(2)
self.endpoint = f"http://{self.host}:{port}"
self._iam_endpoint = f"http://127.0.0.1:{port}"

p = self._p = multiprocessing.Process(target=self.run_server, args=(port,))
p.start()
wait_for_server_to_come_up(self.endpoint, "moto", p)
self.endpoint = f"{self.http_protocol}://{self.host}:{port}"
self.working_dir = mkdtemp(suffix="MotoS3StorageFixtureFactory")
self._iam_endpoint = f"{self.http_protocol}://localhost:{port}"

self.ssl = self.http_protocol == "https" # In real world, using https protocol doesn't necessarily mean ssl will be verified
if self.http_protocol == "https":
self.key_file = os.path.join(self.working_dir, "key.pem")
self.cert_file = os.path.join(self.working_dir, "cert.pem")
self.client_cert_file = os.path.join(self.working_dir, "client.pem")
ca = trustme.CA()
server_cert = ca.issue_cert("localhost")
server_cert.private_key_pem.write_to_path(self.key_file)
server_cert.cert_chain_pems[0].write_to_path(self.cert_file)
ca.cert_pem.write_to_path(self.client_cert_file)
self.client_cert_dir = self.working_dir
# Create the sym link for curl CURLOPT_CAPATH option; rehash only available on openssl >=1.1.1
subprocess.run(
f'ln -s "{self.client_cert_file}" "$(openssl x509 -hash -noout -in "{self.client_cert_file}")".0',
cwd=self.working_dir,
shell=True,
)
else:
self.key_file = ""
self.cert_file = ""
self.client_cert_file = ""
self.client_cert_dir = ""

self._p = multiprocessing.Process(
target=self.run_server,
args=(port,
self.key_file if self.http_protocol == "https" else None,
self.cert_file if self.http_protocol == "https" else None,
)
)
self._p.start()
wait_for_server_to_come_up(self.endpoint, "moto", self._p)

def _safe_enter(self):
for attempt in range(3): # For unknown reason, Moto, when running in pytest-xdist, will randomly fail to start
for _ in range(3): # For unknown reason, Moto, when running in pytest-xdist, will randomly fail to start
try:
self._start_server()
break
except AssertionError as e: # Thrown by wait_for_server_to_come_up
sys.stderr.write(repr(e))
GracefulProcessUtils.terminate(self._p)

self._s3_admin = self._boto("s3", self.default_key)
self._s3_admin = self._boto(service="s3", key=self.default_key)
return self

def __exit__(self, exc_type, exc_value, traceback):
GracefulProcessUtils.terminate(self._p)
safer_rmtree(self, self.working_dir)

def _create_user_get_key(self, user: str, iam=None):
iam = iam or self._iam_admin
Expand All @@ -286,7 +338,7 @@ def enforcing_permissions(self, enforcing: bool):
if enforcing == self._enforcing_permissions:
return
if enforcing and not self._iam_admin:
iam = self._boto("iam", self.default_key)
iam = self._boto(service="iam", key=self.default_key)

def _policy(*statements):
return json.dumps({"Version": "2012-10-17", "Statement": statements})
Expand All @@ -302,8 +354,8 @@ def _policy(*statements):

key = self._create_user_get_key("admin", iam)
iam.attach_user_policy(UserName="admin", PolicyArn=policy_arn)
self._iam_admin = self._boto("iam", key)
self._s3_admin = self._boto("s3", key)
self._iam_admin = self._boto(service="iam", key=key)
self._s3_admin = self._boto(service="s3", key=key)

# The number is the remaining requests before permission checks kick in
requests.post(self._iam_endpoint + "/moto-api/reset-auth", "0" if enforcing else "inf")
Expand All @@ -324,7 +376,7 @@ def cleanup_bucket(self, b: S3Bucket):
b.slow_cleanup(failure_consequence="The following delete bucket call will also fail. ")
self._s3_admin.delete_bucket(Bucket=b.bucket)
else:
requests.post(self._iam_endpoint + "/moto-api/reset")
requests.post(self._iam_endpoint + "/moto-api/reset", verify=False) # If CA cert verify fails, it will take ages for this line to finish
self._iam_admin = None


Expand Down
3 changes: 2 additions & 1 deletion python/arcticdb/storage_fixtures/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Union, Any
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
import trustme

_WINDOWS = platform.system() == "Windows"
_DEBUG = os.getenv("ACTIONS_RUNNER_DEBUG", default=None) in (1, "True")
Expand Down Expand Up @@ -97,7 +98,7 @@ def wait_for_server_to_come_up(url: str, service: str, process: ProcessUnion, *,
assert alive(), service + " process died shortly after start up"
time.sleep(sleep)
try:
response = requests.get(url, timeout=req_timeout) # head() confuses Mongo
response = requests.get(url, timeout=req_timeout, verify=False) # head() confuses Mongo
if response.status_code < 500: # We might not have permission, so not requiring 2XX response
break
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
Expand Down
Loading

0 comments on commit 8a2e29c

Please sign in to comment.