Skip to content

Commit

Permalink
RAI-15628 Add beta version of snowflake datastream apis (#100)
Browse files Browse the repository at this point in the history
* Add beta datastream apis

* add comments

* cleanup

* resolve pr comments

* version upgrade

* update change logs
  • Loading branch information
priti-pa authored Sep 22, 2023
1 parent dbf4478 commit c8efba1
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 30 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## v0.5.12-alpha
* Add beta version of snowflake datastream APIs.

## v0.5.11-alpha
* Add ability to start and stop engines.

Expand Down
120 changes: 95 additions & 25 deletions rai/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,14 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
//

const (
PathDatabase = "/database"
PathEngine = "/compute"
PathIntegrations = "/integration/v1alpha1/integrations"
PathOAuthClients = "/oauth-clients"
PathTransaction = "/transaction"
PathTransactions = "/transactions"
PathUsers = "/users"
PathDatabase = "/database"
PathEngine = "/compute"
PathIntegrationsAlpha = "/integration/v1alpha1/integrations"
PathIntegrationsBeta = "/integration/v1beta1/integrations"
PathOAuthClients = "/oauth-clients"
PathTransaction = "/transaction"
PathTransactions = "/transactions"
PathUsers = "/users"
)

func makePath(parts ...string) string {
Expand Down Expand Up @@ -1650,7 +1651,7 @@ func (c *Client) CreateSnowflakeIntegration(
req.Snowflake.Account = snowflakeAccount
req.Snowflake.Admin = *adminCreds
req.Snowflake.Proxy = *proxyCreds
if err := c.Post(PathIntegrations, nil, &req, &result); err != nil {
if err := c.Post(PathIntegrationsAlpha, nil, &req, &result); err != nil {
return nil, err
}
return &result, nil
Expand All @@ -1664,26 +1665,26 @@ func (c *Client) UpdateSnowflakeIntegration(
req.Snowflake.Proxy = *proxyCreds
req.RAI.ClientID = raiClientID
req.RAI.ClientSecret = raiClientSecret
return c.Patch(PathIntegrations, nil, &req, &result)
return c.Patch(PathIntegrationsAlpha, nil, &req, &result)
}

func (c *Client) DeleteSnowflakeIntegration(name string, adminCreds *SnowflakeCredentials) error {
req := deleteSnowflakeIntegrationRequest{}
req.Snowflake.Admin = *adminCreds
return c.Delete(makePath(PathIntegrations, name), nil, &req, nil)
return c.Delete(makePath(PathIntegrationsAlpha, name), nil, &req, nil)
}

func (c *Client) GetSnowflakeIntegration(name string) (*Integration, error) {
var result Integration
if err := c.Get(makePath(PathIntegrations, name), nil, nil, &result); err != nil {
if err := c.Get(makePath(PathIntegrationsAlpha, name), nil, nil, &result); err != nil {
return nil, err
}
return &result, nil
}

func (c *Client) ListSnowflakeIntegrations() ([]Integration, error) {
var result []Integration
if err := c.Get(PathIntegrations, nil, nil, &result); err != nil {
if err := c.Get(PathIntegrationsAlpha, nil, nil, &result); err != nil {
return nil, err
}
return result, nil
Expand All @@ -1705,7 +1706,7 @@ func (c *Client) CreateSnowflakeDatabaseLink(
integration, database, schema, role string, creds *SnowflakeCredentials,
) (*SnowflakeDatabaseLink, error) {
var result SnowflakeDatabaseLink
path := makePath(PathIntegrations, integration, "database-links")
path := makePath(PathIntegrationsAlpha, integration, "database-links")
req := createSnowflakeDatabaseLinkRequest{}
req.Snowflake.Database = database
req.Snowflake.Schema = schema
Expand All @@ -1722,7 +1723,7 @@ func (c *Client) UpdateSnowflakeDatabaseLink(
) error {
var result SnowflakeDatabaseLink
name := fmt.Sprintf("%s.%s", database, schema)
path := makePath(PathIntegrations, integration, "database-links", name)
path := makePath(PathIntegrationsAlpha, integration, "database-links", name)
req := updateSnowflakeDatabaseLinkRequest{}
req.Snowflake.Role = role
req.Snowflake.Credentials = *creds
Expand All @@ -1733,7 +1734,7 @@ func (c *Client) DeleteSnowflakeDatabaseLink(
integration, database, schema, role string, creds *SnowflakeCredentials,
) error {
name := fmt.Sprintf("%s.%s", database, schema)
path := makePath(PathIntegrations, integration, "database-links", name)
path := makePath(PathIntegrationsAlpha, integration, "database-links", name)
req := deleteSnowflakeDatabaseLinkRequest{}
req.Snowflake.Role = role
req.Snowflake.Credentials = *creds
Expand All @@ -1745,7 +1746,7 @@ func (c *Client) GetSnowflakeDatabaseLink(
) (*SnowflakeDatabaseLink, error) {
var result SnowflakeDatabaseLink
name := fmt.Sprintf("%s.%s", database, schema)
path := makePath(PathIntegrations, integration, "database-links", name)
path := makePath(PathIntegrationsAlpha, integration, "database-links", name)
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
Expand All @@ -1756,7 +1757,7 @@ func (c *Client) ListSnowflakeDatabaseLinks(
integration string,
) ([]SnowflakeDatabaseLink, error) {
var result []SnowflakeDatabaseLink
path := makePath(PathIntegrations, integration, "database-links")
path := makePath(PathIntegrationsAlpha, integration, "database-links")
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
Expand All @@ -1767,7 +1768,8 @@ func (c *Client) ListSnowflakeDatabaseLinks(
// Snowflake Data Streams
//

type DataStreamOpts struct {
// alpha
type DataStreamOpts_alpha struct {
RaiDatabase string
Relation string
ObjectName string
Expand All @@ -1779,11 +1781,11 @@ type DataStreamOpts struct {

// Creates a data stream to replicate data from a Snowflake table/view to a RAI relation.
func (c *Client) CreateSnowflakeDataStream(
integration, dbLink string, opts *DataStreamOpts,
integration, dbLink string, opts *DataStreamOpts_alpha,
) (*SnowflakeDataStream, error) {
var result SnowflakeDataStream
path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams")
req := createSnowflakeDataStreamRequest{}
path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams")
req := createSnowflakeDataStreamRequest_alpha{}
req.Snowflake.Object = opts.ObjectName
req.Snowflake.Role = opts.Role
req.Snowflake.Warehouse = opts.Warehouse
Expand All @@ -1801,7 +1803,7 @@ func (c *Client) CreateSnowflakeDataStream(
func (c *Client) DeleteSnowflakeDataStream(
integration, dbLink, objectName, role string, creds *SnowflakeCredentials,
) error {
path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName)
path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName)
req := deleteSnowflakeDataStreamRequest{}
req.Snowflake.Role = role
if creds != nil {
Expand All @@ -1814,7 +1816,7 @@ func (c *Client) GetSnowflakeDataStream(
integration, dbLink, objectName string,
) (*SnowflakeDataStream, error) {
var result SnowflakeDataStream
path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName)
path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName)
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
Expand All @@ -1825,7 +1827,7 @@ func (c *Client) ListSnowflakeDataStreams(
integration, dbLink string,
) ([]SnowflakeDataStream, error) {
var result []SnowflakeDataStream
path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams")
path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams")
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
Expand All @@ -1836,7 +1838,75 @@ func (c *Client) GetSnowflakeDataStreamStatus(
integration, dbLink, objectName string,
) (*SnowflakeDataStreamStatus, error) {
var result SnowflakeDataStreamStatus
path := makePath(PathIntegrations, integration, "database-links", dbLink, "data-streams", objectName, "status")
path := makePath(PathIntegrationsAlpha, integration, "database-links", dbLink, "data-streams", objectName, "status")
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
return &result, nil
}

// beta

type DataStreamOpts struct {
RaiDatabase string
Relation string
ObjectName string
}

// Register snowflake datastream created by native app to replicate data from a Snowflake table/view to a RAI relation.
func (c *Client) RegisterSnowflakeDataStream(
integration string, opts *DataStreamOpts,
) (*SnowflakeDataStream, error) {
var result SnowflakeDataStream
path := makePath(PathIntegrationsBeta, integration, "data-streams")
req := createSnowflakeDataStreamRequest{}
req.Snowflake.Object = opts.ObjectName
req.RAI.Database = opts.RaiDatabase
req.RAI.Relation = opts.Relation
if err := c.Post(path, nil, &req, &result); err != nil {
return nil, err
}
return &result, nil
}

// Unregisters a datastream deleted by native app
func (c *Client) UnregisterSnowflakeDataStream(
integration, objectName string,
) error {
path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName)
return c.Delete(path, nil, nil, nil)
}

// Get datastream registered by native app
func (c *Client) GetRegisteredSnowflakeDataStream(
integration, objectName string,
) (*SnowflakeDataStream, error) {
var result SnowflakeDataStream
path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName)
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
return &result, nil
}

// List datastreams registered by native app associated with the integration
func (c *Client) ListRegisteredSnowflakeDataStreams(
integration string,
) ([]SnowflakeDataStream, error) {
var result []SnowflakeDataStream
path := makePath(PathIntegrationsBeta, integration, "data-streams")
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
return result, nil
}

// Get datastream status registered by native app
func (c *Client) GetRegisteredSnowflakeDataStreamStatus(
integration, objectName string,
) (*SnowflakeDataStreamStatus, error) {
var result SnowflakeDataStreamStatus
path := makePath(PathIntegrationsBeta, integration, "data-streams", objectName, "status")
if err := c.Get(path, nil, nil, &result); err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions rai/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,14 @@ type SnowflakeDataStream struct {
ID string `json:"id"`
Name string `json:"name"` // database.schema.object
Integration string `json:"integration"`
DbLink string `json:"dbLink"`
DbLink string `json:"dbLink,omitempty"` // Deprecated after alpha
CreatedBy string `json:"createdBy"`
CreatedOn string `json:"createdOn"`
State string `json:"state"`
Snowflake struct {
Database string `json:"database"`
Schema string `json:"schema"`
Object string `json:"object"` // fully qualified object name
Database string `json:"database,omitempty"` // Deprecated after alpha"`
Schema string `json:"schema,omitempty"` // Deprecated after alpha"`
Object string `json:"object"` // fully qualified object name
} `json:"snowflake"`
RAI struct {
Database string `json:"database"`
Expand All @@ -456,6 +456,16 @@ type SnowflakeDataStream struct {
}

type createSnowflakeDataStreamRequest struct {
Snowflake struct {
Object string `json:"object"` // fully qualified object name
} `json:"snowflake"`
RAI struct {
Database string `json:"database"`
Relation string `json:"relation"`
} `json:"rai"`
}

type createSnowflakeDataStreamRequest_alpha struct {
Snowflake struct {
Object string `json:"object"` // fully qualified object name
Role string `json:"role"`
Expand Down
2 changes: 1 addition & 1 deletion rai/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

package rai

const Version = "0.5.11-alpha"
const Version = "0.5.12-alpha"

0 comments on commit c8efba1

Please sign in to comment.