From c8efba1ac2120b18d0f3cccdec0c763e26251073 Mon Sep 17 00:00:00 2001 From: priti-pa <130400500+priti-pa@users.noreply.github.com> Date: Fri, 22 Sep 2023 16:14:21 -0400 Subject: [PATCH] RAI-15628 Add beta version of snowflake datastream apis (#100) * Add beta datastream apis * add comments * cleanup * resolve pr comments * version upgrade * update change logs --- CHANGELOG.md | 3 ++ rai/client.go | 120 ++++++++++++++++++++++++++++++++++++++----------- rai/models.go | 18 ++++++-- rai/version.go | 2 +- 4 files changed, 113 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a8a465..e9823db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## v0.5.12-alpha +* Add beta version of snowflake datastream APIs. + ## v0.5.11-alpha * Add ability to start and stop engines. diff --git a/rai/client.go b/rai/client.go index d1e9013..b181258 100644 --- a/rai/client.go +++ b/rai/client.go @@ -365,13 +365,14 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // const ( - PathDatabase = "/database" - PathEngine = "/compute" - PathIntegrations = "/integration/v1alpha1/integrations" - PathOAuthClients = "/oauth-clients" - PathTransaction = "/transaction" - PathTransactions = "/transactions" - PathUsers = "/users" + PathDatabase = "/database" + PathEngine = "/compute" + PathIntegrationsAlpha = "/integration/v1alpha1/integrations" + PathIntegrationsBeta = "/integration/v1beta1/integrations" + PathOAuthClients = "/oauth-clients" + PathTransaction = "/transaction" + PathTransactions = "/transactions" + PathUsers = "/users" ) func makePath(parts ...string) string { @@ -1650,7 +1651,7 @@ func (c *Client) CreateSnowflakeIntegration( req.Snowflake.Account = snowflakeAccount req.Snowflake.Admin = *adminCreds req.Snowflake.Proxy = *proxyCreds - if err := c.Post(PathIntegrations, nil, &req, &result); err != nil { + if err := c.Post(PathIntegrationsAlpha, nil, &req, &result); err != nil { return nil, err } return &result, nil @@ -1664,18 +1665,18 @@ func (c *Client) UpdateSnowflakeIntegration( req.Snowflake.Proxy = *proxyCreds req.RAI.ClientID = raiClientID req.RAI.ClientSecret = raiClientSecret - return c.Patch(PathIntegrations, nil, &req, &result) + return c.Patch(PathIntegrationsAlpha, nil, &req, &result) } func (c *Client) DeleteSnowflakeIntegration(name string, adminCreds *SnowflakeCredentials) error { req := deleteSnowflakeIntegrationRequest{} req.Snowflake.Admin = *adminCreds - return c.Delete(makePath(PathIntegrations, name), nil, &req, nil) + return c.Delete(makePath(PathIntegrationsAlpha, name), nil, &req, nil) } func (c *Client) GetSnowflakeIntegration(name string) (*Integration, error) { var result Integration - if err := c.Get(makePath(PathIntegrations, name), nil, nil, &result); err != nil { + if err := c.Get(makePath(PathIntegrationsAlpha, name), nil, nil, &result); err != nil { return nil, err } return &result, nil @@ -1683,7 +1684,7 @@ func (c *Client) GetSnowflakeIntegration(name string) (*Integration, error) { func (c *Client) ListSnowflakeIntegrations() ([]Integration, error) { var result []Integration - if err := c.Get(PathIntegrations, nil, nil, &result); err != nil { + if err := c.Get(PathIntegrationsAlpha, nil, nil, &result); err != nil { return nil, err } return result, nil @@ -1705,7 +1706,7 @@ func (c *Client) CreateSnowflakeDatabaseLink( integration, database, schema, role string, creds *SnowflakeCredentials, ) (*SnowflakeDatabaseLink, error) { var result SnowflakeDatabaseLink - path := makePath(PathIntegrations, integration, "database-links") + path := makePath(PathIntegrationsAlpha, integration, "database-links") req := createSnowflakeDatabaseLinkRequest{} req.Snowflake.Database = database req.Snowflake.Schema = schema @@ -1722,7 +1723,7 @@ func (c *Client) UpdateSnowflakeDatabaseLink( ) error { var result SnowflakeDatabaseLink name := fmt.Sprintf("%s.%s", database, schema) - path := makePath(PathIntegrations, integration, "database-links", name) + path := makePath(PathIntegrationsAlpha, integration, "database-links", name) req := updateSnowflakeDatabaseLinkRequest{} req.Snowflake.Role = role req.Snowflake.Credentials = *creds @@ -1733,7 +1734,7 @@ func (c *Client) DeleteSnowflakeDatabaseLink( integration, database, schema, role string, creds *SnowflakeCredentials, ) error { name := fmt.Sprintf("%s.%s", database, schema) - path := makePath(PathIntegrations, integration, "database-links", name) + path := makePath(PathIntegrationsAlpha, integration, "database-links", name) req := deleteSnowflakeDatabaseLinkRequest{} req.Snowflake.Role = role req.Snowflake.Credentials = *creds @@ -1745,7 +1746,7 @@ func (c *Client) GetSnowflakeDatabaseLink( ) (*SnowflakeDatabaseLink, error) { var result SnowflakeDatabaseLink name := fmt.Sprintf("%s.%s", database, schema) - path := makePath(PathIntegrations, integration, "database-links", name) + path := makePath(PathIntegrationsAlpha, integration, "database-links", name) if err := c.Get(path, nil, nil, &result); err != nil { return nil, err } @@ -1756,7 +1757,7 @@ func (c *Client) ListSnowflakeDatabaseLinks( integration string, ) ([]SnowflakeDatabaseLink, error) { var result []SnowflakeDatabaseLink - path := makePath(PathIntegrations, integration, "database-links") + path := makePath(PathIntegrationsAlpha, integration, "database-links") if err := c.Get(path, nil, nil, &result); err != nil { return nil, err } @@ -1767,7 +1768,8 @@ func (c *Client) ListSnowflakeDatabaseLinks( // Snowflake Data Streams // -type DataStreamOpts struct { +// alpha +type DataStreamOpts_alpha struct { RaiDatabase string Relation string ObjectName string @@ -1779,11 +1781,11 @@ type DataStreamOpts struct { // Creates a data stream to replicate data from a Snowflake table/view to a RAI relation. func (c *Client) CreateSnowflakeDataStream( - integration, dbLink string, opts *DataStreamOpts, + integration, dbLink string, opts *DataStreamOpts_alpha, ) (*SnowflakeDataStream, error) { var result SnowflakeDataStream - path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams") - req := createSnowflakeDataStreamRequest{} + path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams") + req := createSnowflakeDataStreamRequest_alpha{} req.Snowflake.Object = opts.ObjectName req.Snowflake.Role = opts.Role req.Snowflake.Warehouse = opts.Warehouse @@ -1801,7 +1803,7 @@ func (c *Client) CreateSnowflakeDataStream( func (c *Client) DeleteSnowflakeDataStream( integration, dbLink, objectName, role string, creds *SnowflakeCredentials, ) error { - path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName) + path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName) req := deleteSnowflakeDataStreamRequest{} req.Snowflake.Role = role if creds != nil { @@ -1814,7 +1816,7 @@ func (c *Client) GetSnowflakeDataStream( integration, dbLink, objectName string, ) (*SnowflakeDataStream, error) { var result SnowflakeDataStream - path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName) + path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName) if err := c.Get(path, nil, nil, &result); err != nil { return nil, err } @@ -1825,7 +1827,7 @@ func (c *Client) ListSnowflakeDataStreams( integration, dbLink string, ) ([]SnowflakeDataStream, error) { var result []SnowflakeDataStream - path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams") + path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams") if err := c.Get(path, nil, nil, &result); err != nil { return nil, err } @@ -1836,7 +1838,75 @@ func (c *Client) GetSnowflakeDataStreamStatus( integration, dbLink, objectName string, ) (*SnowflakeDataStreamStatus, error) { var result SnowflakeDataStreamStatus - path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName, "status") + path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName, "status") + if err := c.Get(path, nil, nil, &result); err != nil { + return nil, err + } + return &result, nil +} + +// beta + +type DataStreamOpts struct { + RaiDatabase string + Relation string + ObjectName string +} + +// Register snowflake datastream created by native app to replicate data from a Snowflake table/view to a RAI relation. +func (c *Client) RegisterSnowflakeDataStream( + integration string, opts *DataStreamOpts, +) (*SnowflakeDataStream, error) { + var result SnowflakeDataStream + path := makePath(PathIntegrationsBeta, integration, "data-streams") + req := createSnowflakeDataStreamRequest{} + req.Snowflake.Object = opts.ObjectName + req.RAI.Database = opts.RaiDatabase + req.RAI.Relation = opts.Relation + if err := c.Post(path, nil, &req, &result); err != nil { + return nil, err + } + return &result, nil +} + +// Unregisters a datastream deleted by native app +func (c *Client) UnregisterSnowflakeDataStream( + integration, objectName string, +) error { + path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName) + return c.Delete(path, nil, nil, nil) +} + +// Get datastream registered by native app +func (c *Client) GetRegisteredSnowflakeDataStream( + integration, objectName string, +) (*SnowflakeDataStream, error) { + var result SnowflakeDataStream + path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName) + if err := c.Get(path, nil, nil, &result); err != nil { + return nil, err + } + return &result, nil +} + +// List datastreams registered by native app associated with the integration +func (c *Client) ListRegisteredSnowflakeDataStreams( + integration string, +) ([]SnowflakeDataStream, error) { + var result []SnowflakeDataStream + path := makePath(PathIntegrationsBeta, integration, "data-streams") + if err := c.Get(path, nil, nil, &result); err != nil { + return nil, err + } + return result, nil +} + +// Get datastream status registered by native app +func (c *Client) GetRegisteredSnowflakeDataStreamStatus( + integration, objectName string, +) (*SnowflakeDataStreamStatus, error) { + var result SnowflakeDataStreamStatus + path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName, "status") if err := c.Get(path, nil, nil, &result); err != nil { return nil, err } diff --git a/rai/models.go b/rai/models.go index 84d9db0..a1632bb 100644 --- a/rai/models.go +++ b/rai/models.go @@ -440,14 +440,14 @@ type SnowflakeDataStream struct { ID string `json:"id"` Name string `json:"name"` // database.schema.object Integration string `json:"integration"` - DbLink string `json:"dbLink"` + DbLink string `json:"dbLink,omitempty"` // Deprecated after alpha CreatedBy string `json:"createdBy"` CreatedOn string `json:"createdOn"` State string `json:"state"` Snowflake struct { - Database string `json:"database"` - Schema string `json:"schema"` - Object string `json:"object"` // fully qualified object name + Database string `json:"database,omitempty"` // Deprecated after alpha"` + Schema string `json:"schema,omitempty"` // Deprecated after alpha"` + Object string `json:"object"` // fully qualified object name } `json:"snowflake"` RAI struct { Database string `json:"database"` @@ -456,6 +456,16 @@ type SnowflakeDataStream struct { } type createSnowflakeDataStreamRequest struct { + Snowflake struct { + Object string `json:"object"` // fully qualified object name + } `json:"snowflake"` + RAI struct { + Database string `json:"database"` + Relation string `json:"relation"` + } `json:"rai"` +} + +type createSnowflakeDataStreamRequest_alpha struct { Snowflake struct { Object string `json:"object"` // fully qualified object name Role string `json:"role"` diff --git a/rai/version.go b/rai/version.go index 8cb99d7..0482f58 100644 --- a/rai/version.go +++ b/rai/version.go @@ -14,4 +14,4 @@ package rai -const Version = "0.5.11-alpha" +const Version = "0.5.12-alpha"