Skip to content

Commit

Permalink
[HOPSWORKS-1972] Featurestore connect using API key value (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Dowling authored Aug 31, 2020
1 parent 7aff980 commit 96934b7
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class HopsworksConnection implements Closeable {
@Getter
private String apiKeyFilePath;

@Getter
private String apiKeyValue;

private FeatureStoreApi featureStoreApi = new FeatureStoreApi();
private ProjectApi projectApi = new ProjectApi();
Expand All @@ -70,7 +72,8 @@ public class HopsworksConnection implements Closeable {
@Builder
public HopsworksConnection(String host, int port, String project, Region region, SecretStore secretStore,
boolean hostnameVerification, String trustStorePath,
String certPath, String apiKeyFilePath) throws IOException, FeatureStoreException {
String certPath, String apiKeyFilePath, String apiKeyValue)
throws IOException, FeatureStoreException {
this.host = host;
this.port = port;
this.project = project;
Expand All @@ -80,9 +83,10 @@ public HopsworksConnection(String host, int port, String project, Region region,
this.trustStorePath = trustStorePath;
this.certPath = certPath;
this.apiKeyFilePath = apiKeyFilePath;
this.apiKeyValue = apiKeyValue;

HopsworksClient hopsworksClient = HopsworksClient.setupHopsworksClient(host, port, region, secretStore,
hostnameVerification, trustStorePath, this.apiKeyFilePath);
hostnameVerification, trustStorePath, this.apiKeyFilePath, this.apiKeyValue);
projectObj = getProject();
hopsworksClient.downloadCredentials(projectObj, certPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static void setInstance(HopsworksClient instance) {

public static synchronized HopsworksClient setupHopsworksClient(String host, int port, Region region,
SecretStore secretStore, boolean hostnameVerification,
String trustStorePath, String apiKeyFilePath)
String trustStorePath, String apiKeyFilePath,
String apiKeyValue)
throws FeatureStoreException {
if (hopsworksClientInstance != null) {
return hopsworksClientInstance;
Expand All @@ -75,7 +76,7 @@ public static synchronized HopsworksClient setupHopsworksClient(String host, int
hopsworksHttpClient = new HopsworksInternalClient();
} else {
hopsworksHttpClient = new HopsworksExternalClient(host, port, region,
secretStore, hostnameVerification, trustStorePath, apiKeyFilePath);
secretStore, hostnameVerification, trustStorePath, apiKeyFilePath, apiKeyValue);
}
} catch (Exception e) {
throw new FeatureStoreException("Could not setup Hopsworks client", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,38 @@ public class HopsworksExternalClient implements HopsworksHttpClient {

private String apiKey = "";

public HopsworksExternalClient(String host, int port, Region region,
public HopsworksExternalClient(String host, int port, String apiKeyFilepath,
boolean hostnameVerification, String trustStorePath)
throws IOException, FeatureStoreException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, KeyManagementException {
this(host, port, null, null, hostnameVerification, trustStorePath, apiKeyFilepath, null);
}

public HopsworksExternalClient(String host, int port, boolean hostnameVerification,
String trustStorePath, Region region, SecretStore secretStore)
throws IOException, FeatureStoreException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, KeyManagementException {
this(host, port, region, secretStore, hostnameVerification, trustStorePath, null, null);
}


public HopsworksExternalClient(String host, int port, boolean hostnameVerification,
String trustStorePath, String apiKeyValue)
throws IOException, FeatureStoreException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, KeyManagementException {
this(host, port, null, null, hostnameVerification, trustStorePath, null, apiKeyValue);
}

public HopsworksExternalClient(CloseableHttpClient httpClient, HttpHost httpHost) {
this.httpClient = httpClient;
this.httpHost = httpHost;
}

HopsworksExternalClient(String host, int port, Region region,
SecretStore secretStore, boolean hostnameVerification,
String trustStorePath, String apiKeyFilepath)
throws IOException, FeatureStoreException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, KeyManagementException {
String trustStorePath, String apiKeyFilepath, String apiKeyValue)
throws IOException, FeatureStoreException, KeyStoreException, CertificateException,
NoSuchAlgorithmException, KeyManagementException {

httpHost = new HttpHost(host, port, "https");

Expand All @@ -86,16 +113,15 @@ public HopsworksExternalClient(String host, int port, Region region,
connectionPool.setDefaultMaxPerRoute(10);

httpClient = HttpClients.custom()
.setConnectionManager(connectionPool)
.setKeepAliveStrategy((httpResponse, httpContext) -> 30 * 1000)
.build();

apiKey = readApiKey(secretStore, region, apiKeyFilepath);
}
.setConnectionManager(connectionPool)
.setKeepAliveStrategy((httpResponse, httpContext) -> 30 * 1000)
.build();

public HopsworksExternalClient(CloseableHttpClient httpClient, HttpHost httpHost) {
this.httpClient = httpClient;
this.httpHost = httpHost;
if (!Strings.isNullOrEmpty(apiKeyValue)) {
this.apiKey = apiKeyValue;
} else {
this.apiKey = readApiKey(secretStore, region, apiKeyFilepath);
}
}

private Registry<ConnectionSocketFactory> createConnectionFactory(HttpHost httpHost, boolean hostnameVerification,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ public class TestHopsworksExternalClient {
.willReturn(success().body(HttpBodyConverter.json(credentials)))
));

// @Test
// public void testReadAPIKey() throws IOException, FeatureStoreException {
// CloseableHttpClient httpClient = HttpClients.createSystem();
// try {
// HopsworksConnection hc = HopsworksConnection.builder().host("35.241.253.100").hostnameVerification(false)
// .project("demo_featurestore_admin000")
// .apiKeyValue("ovVQksgJezSckjyK.ftO2YywCI6gZp4btlvWRnSDjSgyAQgCTRAoQTTSXBxPRMo0Dq029eAf3HVq3I6JO").build();
// System.out.println("Connected");
// FeatureStore fs = hc.getFeatureStore();
// Assert.assertTrue(fs != null);
// } catch (Exception e) {
// // Do not assert an error as this unit test method needs an external cluster
// }
// }

@Test
public void testReadAPIKeyFromFile() throws IOException, FeatureStoreException {
Path apiFilePath = Paths.get(System.getProperty("java.io.tmpdir"), "test.api");
Expand Down
2 changes: 2 additions & 0 deletions python/hsfs/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def init(
trust_store_path=None,
cert_folder=None,
api_key_file=None,
api_key_value=None,
):
global _client
if not _client:
Expand All @@ -46,6 +47,7 @@ def init(
trust_store_path,
cert_folder,
api_key_file,
api_key_value,
)


Expand Down
9 changes: 6 additions & 3 deletions python/hsfs/client/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
trust_store_path,
cert_folder,
api_key_file,
api_key_value,
):
"""Initializes a client in an external environment such as AWS Sagemaker."""
if not host:
Expand All @@ -55,9 +56,11 @@ def __init__(
self._cert_folder_base = cert_folder
self._cert_folder = os.path.join(cert_folder, host, project)

self._auth = auth.ApiKeyAuth(
self._get_secret(secrets_store, "api-key", api_key_file)
)
if api_key_value is not None:
api_key = api_key_value
else:
api_key = self._get_secret(secrets_store, "api-key", api_key_file)
self._auth = auth.ApiKeyAuth(api_key)

self._session = requests.session()
self._connected = True
Expand Down
43 changes: 43 additions & 0 deletions python/hsfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,32 @@
# limitations under the License.
#

"""
A feature store connection object. Clients in external clusters need to connect to the Hopsworks Feature Store using an API key. The API key is generated inside the Hopsworks platform, and requires at least the "project" and "featurestore" scopes to be able to access a feature store.
>>> # External Clusters (Sagemaker, Databricks, KubeFlow, etc)
>>> #
>>> from hsfs import feature_store as fs
>>> # You can connect to the feature store using an API key supplied by:
>>> # (1) a shared secret service
>>> # (2) a file that contains the API key
>>> # (3) a string containing the API key (insecure)
>>> #
>>> # Connect by downloading the API key from a shared secret service using an IAM Role (AWS Managed Secrets Service)
>>> fs.connect(host="abc.hopsworks.ai", project="prod_fs", region_name="eu-west-1", secrets_store="secretsmanager",
>>> hostname_verification=True)
>>>
>>> # Connecting using an API key stored in a local file
>>> fs.connect(host="abc.hopsworks.ai", project="prod_fs", region_name="eu-west-1", api_key_file="/home/john/.secrets/hsfs-api-key.txt",
>>> hostname_verification=True)
>>>
>>> # Connecting using an API key stored in a local file
>>> fs.connect(host="abc.hopsworks.ai", project="prod_fs", region_name="eu-west-1",
>>> api_key_value="PFcy3dZ6wLXYglRd.ydcdq5jH878IdG7xlL9lHVqrS8v3sBUqQgyR4xbpUgDnB5ZpYro6OxNnAzJ7RV6H", hostname_verification=True)
>>>
"""


import os
from requests.exceptions import ConnectionError

Expand All @@ -40,6 +66,7 @@ def __init__(
trust_store_path=None,
cert_folder=None,
api_key_file=None,
api_key_value=None,
):
self._host = host
self._port = port or self.HOPSWORKS_PORT_DEFAULT
Expand All @@ -52,6 +79,7 @@ def __init__(
self._trust_store_path = trust_store_path
self._cert_folder = cert_folder or self.CERT_FOLDER_DEFAULT
self._api_key_file = api_key_file
self._api_key_value = api_key_value
self._connected = False

self.connect()
Expand All @@ -68,6 +96,7 @@ def connection(
trust_store_path=None,
cert_folder=None,
api_key_file=None,
api_key_value=None,
):
return cls(
host,
Expand All @@ -79,6 +108,7 @@ def connection(
trust_store_path,
cert_folder,
api_key_file,
api_key_value,
)

@classmethod
Expand All @@ -93,6 +123,7 @@ def setup_databricks(
hostname_verification=True,
trust_store_path=None,
api_key_file=None,
api_key_value=None,
):
connection = cls(
host,
Expand All @@ -104,6 +135,7 @@ def setup_databricks(
trust_store_path,
cert_folder,
api_key_file,
api_key_value,
)

dbfs_folder = client.get_instance()._cert_folder_base
Expand Down Expand Up @@ -140,6 +172,7 @@ def connect(self):
os.path.join("/dbfs", self._api_key_file)
if self._api_key_file is not None
else None,
self._api_key_value,
)
engine.init("spark")
else:
Expand All @@ -155,6 +188,7 @@ def connect(self):
self._trust_store_path,
self._cert_folder,
self._api_key_file,
self._api_key_value,
)
engine.init(
"hive",
Expand Down Expand Up @@ -355,11 +389,20 @@ def cert_folder(self, cert_folder):
def api_key_file(self):
return self._api_key_file

@property
def api_key_value(self):
return self._api_key_value

@api_key_file.setter
@not_connected
def api_key_file(self, api_key_file):
self._api_key_file = api_key_file

@api_key_value.setter
@not_connected
def api_key_value(self, api_key_value):
self._api_key_value = api_key_value

def __enter__(self):
self.connect()
return self
Expand Down

0 comments on commit 96934b7

Please sign in to comment.