From ad5fa113da84a2a1aeaa275643cd586126e6f603 Mon Sep 17 00:00:00 2001 From: Jakub Michalak Date: Mon, 30 Sep 2024 16:44:31 +0200 Subject: [PATCH] feat: Upgrade stream sdk (#3105) - generate object assertions - add helpers client - rename methods in table helpers client for consistency - adjust and regenerate sdk - remove fields that are not present in Snowflake - add LastQueryId function - it's needed to test stream's AT|BEFORE - add missing tests - clean up integration tests - add some external table helper functions ## Test Plan * [x] integration tests * [x] unit tests ## References https://docs.snowflake.com/en/sql-reference/sql/create-stream ## TODO Adjust SHOW field types. Add new resources for each type of stream. Adjust stream data source. --- .../assert/objectassert/gen/sdk_object_def.go | 5 + .../objectassert/stream_snowflake_ext.go | 40 ++ .../objectassert/stream_snowflake_gen.go | 230 ++++++++++ .../stream_show_output_gen.go | 122 +++++ .../helpers/external_table_client.go | 23 + pkg/acceptance/helpers/stream_client.go | 69 +++ .../{streamlit.go => streamlit_client.go} | 0 pkg/acceptance/helpers/table_client.go | 41 +- pkg/acceptance/helpers/test_client.go | 2 + pkg/datasources/streams.go | 6 +- pkg/resources/stream.go | 30 +- pkg/resources/view_acceptance_test.go | 6 +- pkg/schemas/stream_gen.go | 7 - pkg/sdk/context_functions.go | 15 + pkg/sdk/external_tables_dto.go | 4 + pkg/sdk/streams_def.go | 12 +- pkg/sdk/streams_dto_builders_gen.go | 194 ++++---- pkg/sdk/streams_dto_gen.go | 10 +- pkg/sdk/streams_gen.go | 26 +- pkg/sdk/streams_gen_test.go | 68 ++- pkg/sdk/streams_impl_gen.go | 58 +-- pkg/sdk/streams_validations_gen.go | 52 ++- pkg/sdk/tables_dto.go | 4 + .../context_functions_integration_test.go | 8 + ...cortex_search_services_integration_test.go | 4 +- .../testint/dynamic_table_integration_test.go | 8 +- .../testint/event_tables_integration_test.go | 2 +- pkg/sdk/testint/grants_integration_test.go | 30 +- ...materialized_views_gen_integration_test.go | 4 +- pkg/sdk/testint/pipes_integration_test.go | 14 +- pkg/sdk/testint/schemas_integration_test.go | 2 +- .../testint/streams_gen_integration_test.go | 419 +++++++++--------- .../system_functions_integration_test.go | 4 +- pkg/sdk/testint/tables_integration_test.go | 16 +- pkg/sdk/testint/views_gen_integration_test.go | 4 +- v1-preparations/ESSENTIAL_GA_OBJECTS.MD | 2 +- 36 files changed, 1092 insertions(+), 449 deletions(-) create mode 100644 pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_ext.go create mode 100644 pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_gen.go create mode 100644 pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/stream_show_output_gen.go create mode 100644 pkg/acceptance/helpers/stream_client.go rename pkg/acceptance/helpers/{streamlit.go => streamlit_client.go} (100%) diff --git a/pkg/acceptance/bettertestspoc/assert/objectassert/gen/sdk_object_def.go b/pkg/acceptance/bettertestspoc/assert/objectassert/gen/sdk_object_def.go index 9fe810dfbe..5596f476a6 100644 --- a/pkg/acceptance/bettertestspoc/assert/objectassert/gen/sdk_object_def.go +++ b/pkg/acceptance/bettertestspoc/assert/objectassert/gen/sdk_object_def.go @@ -62,6 +62,11 @@ var allStructs = []SdkObjectDef{ ObjectType: sdk.ObjectTypeTask, ObjectStruct: sdk.Task{}, }, + { + IdType: "sdk.SchemaObjectIdentifier", + ObjectType: sdk.ObjectTypeStream, + ObjectStruct: sdk.Stream{}, + }, } func GetSdkObjectDetails() []genhelpers.SdkObjectDetails { diff --git a/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_ext.go b/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_ext.go new file mode 100644 index 0000000000..cb474d651e --- /dev/null +++ b/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_ext.go @@ -0,0 +1,40 @@ +package objectassert + +import ( + "fmt" + "testing" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +func (s *StreamAssert) HasTableId(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.TableName == nil { + return fmt.Errorf("expected table name to have value; got: nil") + } + gotTableId, err := sdk.ParseSchemaObjectIdentifier(*o.TableName) + if err != nil { + return err + } + if gotTableId.FullyQualifiedName() != expected { + return fmt.Errorf("expected table name: %v; got: %v", expected, *o.TableName) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasStageName(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.TableName == nil { + return fmt.Errorf("expected table name to have value; got: nil") + } + if *o.TableName != expected { + return fmt.Errorf("expected table name: %v; got: %v", expected, *o.TableName) + } + return nil + }) + return s +} diff --git a/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_gen.go b/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_gen.go new file mode 100644 index 0000000000..95b71b3f0f --- /dev/null +++ b/pkg/acceptance/bettertestspoc/assert/objectassert/stream_snowflake_gen.go @@ -0,0 +1,230 @@ +// Code generated by assertions generator; DO NOT EDIT. + +package objectassert + +import ( + "fmt" + "testing" + "time" + + acc "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +type StreamAssert struct { + *assert.SnowflakeObjectAssert[sdk.Stream, sdk.SchemaObjectIdentifier] +} + +func Stream(t *testing.T, id sdk.SchemaObjectIdentifier) *StreamAssert { + t.Helper() + return &StreamAssert{ + assert.NewSnowflakeObjectAssertWithProvider(sdk.ObjectTypeStream, id, acc.TestClient().Stream.Show), + } +} + +func StreamFromObject(t *testing.T, stream *sdk.Stream) *StreamAssert { + t.Helper() + return &StreamAssert{ + assert.NewSnowflakeObjectAssertWithObject(sdk.ObjectTypeStream, stream.ID(), stream), + } +} + +func (s *StreamAssert) HasCreatedOn(expected time.Time) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.CreatedOn != expected { + return fmt.Errorf("expected created on: %v; got: %v", expected, o.CreatedOn) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasName(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Name != expected { + return fmt.Errorf("expected name: %v; got: %v", expected, o.Name) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasDatabaseName(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.DatabaseName != expected { + return fmt.Errorf("expected database name: %v; got: %v", expected, o.DatabaseName) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasSchemaName(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.SchemaName != expected { + return fmt.Errorf("expected schema name: %v; got: %v", expected, o.SchemaName) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasOwner(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Owner == nil { + return fmt.Errorf("expected owner to have value; got: nil") + } + if *o.Owner != expected { + return fmt.Errorf("expected owner: %v; got: %v", expected, *o.Owner) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasComment(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Comment == nil { + return fmt.Errorf("expected comment to have value; got: nil") + } + if *o.Comment != expected { + return fmt.Errorf("expected comment: %v; got: %v", expected, *o.Comment) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasTableName(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.TableName == nil { + return fmt.Errorf("expected table name to have value; got: nil") + } + if *o.TableName != expected { + return fmt.Errorf("expected table name: %v; got: %v", expected, *o.TableName) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasSourceType(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.SourceType == nil { + return fmt.Errorf("expected source type to have value; got: nil") + } + if *o.SourceType != expected { + return fmt.Errorf("expected source type: %v; got: %v", expected, *o.SourceType) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasBaseTables(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.BaseTables == nil { + return fmt.Errorf("expected base tables to have value; got: nil") + } + if *o.BaseTables != expected { + return fmt.Errorf("expected base tables: %v; got: %v", expected, *o.BaseTables) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasType(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Type == nil { + return fmt.Errorf("expected type to have value; got: nil") + } + if *o.Type != expected { + return fmt.Errorf("expected type: %v; got: %v", expected, *o.Type) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasStale(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Stale == nil { + return fmt.Errorf("expected stale to have value; got: nil") + } + if *o.Stale != expected { + return fmt.Errorf("expected stale: %v; got: %v", expected, *o.Stale) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasMode(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.Mode == nil { + return fmt.Errorf("expected mode to have value; got: nil") + } + if *o.Mode != expected { + return fmt.Errorf("expected mode: %v; got: %v", expected, *o.Mode) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasStaleAfter(expected time.Time) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.StaleAfter == nil { + return fmt.Errorf("expected stale after to have value; got: nil") + } + if *o.StaleAfter != expected { + return fmt.Errorf("expected stale after: %v; got: %v", expected, *o.StaleAfter) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasInvalidReason(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.InvalidReason == nil { + return fmt.Errorf("expected invalid reason to have value; got: nil") + } + if *o.InvalidReason != expected { + return fmt.Errorf("expected invalid reason: %v; got: %v", expected, *o.InvalidReason) + } + return nil + }) + return s +} + +func (s *StreamAssert) HasOwnerRoleType(expected string) *StreamAssert { + s.AddAssertion(func(t *testing.T, o *sdk.Stream) error { + t.Helper() + if o.OwnerRoleType == nil { + return fmt.Errorf("expected owner role type to have value; got: nil") + } + if *o.OwnerRoleType != expected { + return fmt.Errorf("expected owner role type: %v; got: %v", expected, *o.OwnerRoleType) + } + return nil + }) + return s +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/stream_show_output_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/stream_show_output_gen.go new file mode 100644 index 0000000000..32394c1dce --- /dev/null +++ b/pkg/acceptance/bettertestspoc/assert/resourceshowoutputassert/stream_show_output_gen.go @@ -0,0 +1,122 @@ +// Code generated by assertions generator; DO NOT EDIT. + +package resourceshowoutputassert + +import ( + "testing" + "time" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" +) + +// to ensure sdk package is used +var _ = sdk.Object{} + +type StreamShowOutputAssert struct { + *assert.ResourceAssert +} + +func StreamShowOutput(t *testing.T, name string) *StreamShowOutputAssert { + t.Helper() + + s := StreamShowOutputAssert{ + ResourceAssert: assert.NewResourceAssert(name, "show_output"), + } + s.AddAssertion(assert.ValueSet("show_output.#", "1")) + return &s +} + +func ImportedStreamShowOutput(t *testing.T, id string) *StreamShowOutputAssert { + t.Helper() + + s := StreamShowOutputAssert{ + ResourceAssert: assert.NewImportedResourceAssert(id, "show_output"), + } + s.AddAssertion(assert.ValueSet("show_output.#", "1")) + return &s +} + +//////////////////////////// +// Attribute value checks // +//////////////////////////// + +func (s *StreamShowOutputAssert) HasCreatedOn(expected time.Time) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("created_on", expected.String())) + return s +} + +func (s *StreamShowOutputAssert) HasName(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("name", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasDatabaseName(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("database_name", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasSchemaName(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("schema_name", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasTableOn(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("table_on", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasOwner(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("owner", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasComment(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("comment", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasTableName(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("table_name", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasSourceType(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("source_type", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasBaseTables(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("base_tables", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasType(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("type", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasStale(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("stale", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasMode(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("mode", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasStaleAfter(expected time.Time) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("stale_after", expected.String())) + return s +} + +func (s *StreamShowOutputAssert) HasInvalidReason(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("invalid_reason", expected)) + return s +} + +func (s *StreamShowOutputAssert) HasOwnerRoleType(expected string) *StreamShowOutputAssert { + s.AddAssertion(assert.ResourceShowOutputValueSet("owner_role_type", expected)) + return s +} diff --git a/pkg/acceptance/helpers/external_table_client.go b/pkg/acceptance/helpers/external_table_client.go index 950bd1e416..3766a360d5 100644 --- a/pkg/acceptance/helpers/external_table_client.go +++ b/pkg/acceptance/helpers/external_table_client.go @@ -32,3 +32,26 @@ func (c *ExternalTableClient) PublishDataToStage(t *testing.T, stageId sdk.Schem _, err := c.context.client.ExecForTests(ctx, fmt.Sprintf(`copy into @%s/external_tables_test_data/test_data from (select parse_json('%s')) overwrite = true`, stageId.FullyQualifiedName(), string(data))) require.NoError(t, err) } + +func (c *ExternalTableClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateExternalTableRequest) (*sdk.ExternalTable, func()) { + t.Helper() + ctx := context.Background() + + err := c.client().Create(ctx, req) + require.NoError(t, err) + + stream, err := c.client().ShowByID(ctx, req.GetName()) + require.NoError(t, err) + + return stream, c.DropFunc(t, req.GetName()) +} + +func (c *ExternalTableClient) DropFunc(t *testing.T, id sdk.SchemaObjectIdentifier) func() { + t.Helper() + ctx := context.Background() + + return func() { + err := c.client().Drop(ctx, sdk.NewDropExternalTableRequest(id).WithIfExists(true)) + require.NoError(t, err) + } +} diff --git a/pkg/acceptance/helpers/stream_client.go b/pkg/acceptance/helpers/stream_client.go new file mode 100644 index 0000000000..7a32a464b5 --- /dev/null +++ b/pkg/acceptance/helpers/stream_client.go @@ -0,0 +1,69 @@ +package helpers + +import ( + "context" + "testing" + + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" + "github.com/stretchr/testify/require" +) + +type StreamClient struct { + context *TestClientContext + ids *IdsGenerator +} + +func NewStreamClient(context *TestClientContext, idsGenerator *IdsGenerator) *StreamClient { + return &StreamClient{ + context: context, + ids: idsGenerator, + } +} + +func (c *StreamClient) client() sdk.Streams { + return c.context.client.Streams +} + +func (c *StreamClient) CreateOnTable(t *testing.T, tableId sdk.SchemaObjectIdentifier) (*sdk.Stream, func()) { + t.Helper() + + return c.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(c.ids.RandomSchemaObjectIdentifier(), tableId)) +} + +func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnTableStreamRequest) (*sdk.Stream, func()) { + t.Helper() + ctx := context.Background() + + err := c.client().CreateOnTable(ctx, req) + require.NoError(t, err) + + stream, err := c.client().ShowByID(ctx, req.GetName()) + require.NoError(t, err) + + return stream, c.DropFunc(t, req.GetName()) +} + +func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) { + t.Helper() + ctx := context.Background() + + err := c.client().Alter(ctx, request) + require.NoError(t, err) +} + +func (c *StreamClient) DropFunc(t *testing.T, id sdk.SchemaObjectIdentifier) func() { + t.Helper() + ctx := context.Background() + + return func() { + err := c.client().Drop(ctx, sdk.NewDropStreamRequest(id).WithIfExists(true)) + require.NoError(t, err) + } +} + +func (c *StreamClient) Show(t *testing.T, id sdk.SchemaObjectIdentifier) (*sdk.Stream, error) { + t.Helper() + ctx := context.Background() + + return c.client().ShowByID(ctx, id) +} diff --git a/pkg/acceptance/helpers/streamlit.go b/pkg/acceptance/helpers/streamlit_client.go similarity index 100% rename from pkg/acceptance/helpers/streamlit.go rename to pkg/acceptance/helpers/streamlit_client.go diff --git a/pkg/acceptance/helpers/table_client.go b/pkg/acceptance/helpers/table_client.go index 7daab72be8..273fbe2e57 100644 --- a/pkg/acceptance/helpers/table_client.go +++ b/pkg/acceptance/helpers/table_client.go @@ -27,36 +27,36 @@ func (c *TableClient) client() sdk.Tables { return c.context.client.Tables } -func (c *TableClient) CreateTable(t *testing.T) (*sdk.Table, func()) { +func (c *TableClient) Create(t *testing.T) (*sdk.Table, func()) { t.Helper() - return c.CreateTableInSchema(t, c.ids.SchemaId()) + return c.CreateInSchema(t, c.ids.SchemaId()) } -func (c *TableClient) CreateTableWithName(t *testing.T, name string) (*sdk.Table, func()) { +func (c *TableClient) CreateWithName(t *testing.T, name string) (*sdk.Table, func()) { t.Helper() columns := []sdk.TableColumnRequest{ *sdk.NewTableColumnRequest("id", sdk.DataTypeNumber), } - return c.CreateTableWithIdAndColumns(t, c.ids.NewSchemaObjectIdentifier(name), columns) + return c.CreateWithRequest(t, sdk.NewCreateTableRequest(c.ids.NewSchemaObjectIdentifier(name), columns)) } -func (c *TableClient) CreateTableInSchema(t *testing.T, schemaId sdk.DatabaseObjectIdentifier) (*sdk.Table, func()) { +func (c *TableClient) CreateInSchema(t *testing.T, schemaId sdk.DatabaseObjectIdentifier) (*sdk.Table, func()) { t.Helper() columns := []sdk.TableColumnRequest{ *sdk.NewTableColumnRequest("id", sdk.DataTypeNumber), } - return c.CreateTableWithIdAndColumns(t, c.ids.RandomSchemaObjectIdentifierInSchema(schemaId), columns) + return c.CreateWithRequest(t, sdk.NewCreateTableRequest(c.ids.RandomSchemaObjectIdentifierInSchema(schemaId), columns)) } -func (c *TableClient) CreateTableWithColumns(t *testing.T, columns []sdk.TableColumnRequest) (*sdk.Table, func()) { +func (c *TableClient) CreateWithColumns(t *testing.T, columns []sdk.TableColumnRequest) (*sdk.Table, func()) { t.Helper() - return c.CreateTableWithIdAndColumns(t, c.ids.RandomSchemaObjectIdentifier(), columns) + return c.CreateWithRequest(t, sdk.NewCreateTableRequest(c.ids.RandomSchemaObjectIdentifier(), columns)) } -func (c *TableClient) CreateTableWithPredefinedColumns(t *testing.T) (*sdk.Table, func()) { +func (c *TableClient) CreateWithPredefinedColumns(t *testing.T) (*sdk.Table, func()) { t.Helper() columns := []sdk.TableColumnRequest{ @@ -65,24 +65,33 @@ func (c *TableClient) CreateTableWithPredefinedColumns(t *testing.T) (*sdk.Table *sdk.NewTableColumnRequest("some_other_text_column", "VARCHAR"), } - return c.CreateTableWithIdAndColumns(t, c.ids.RandomSchemaObjectIdentifier(), columns) + return c.CreateWithRequest(t, sdk.NewCreateTableRequest(c.ids.RandomSchemaObjectIdentifier(), columns)) } -func (c *TableClient) CreateTableWithIdAndColumns(t *testing.T, id sdk.SchemaObjectIdentifier, columns []sdk.TableColumnRequest) (*sdk.Table, func()) { +func (c *TableClient) CreateWithChangeTracking(t *testing.T) (*sdk.Table, func()) { + t.Helper() + + columns := []sdk.TableColumnRequest{ + *sdk.NewTableColumnRequest("id", "NUMBER"), + } + + return c.CreateWithRequest(t, sdk.NewCreateTableRequest(c.ids.RandomSchemaObjectIdentifier(), columns).WithChangeTracking(sdk.Pointer(true))) +} + +func (c *TableClient) CreateWithRequest(t *testing.T, req *sdk.CreateTableRequest) (*sdk.Table, func()) { t.Helper() ctx := context.Background() - dbCreateRequest := sdk.NewCreateTableRequest(id, columns) - err := c.client().Create(ctx, dbCreateRequest) + err := c.client().Create(ctx, req) require.NoError(t, err) - table, err := c.client().ShowByID(ctx, id) + table, err := c.client().ShowByID(ctx, req.GetName()) require.NoError(t, err) - return table, c.DropTableFunc(t, id) + return table, c.DropFunc(t, req.GetName()) } -func (c *TableClient) DropTableFunc(t *testing.T, id sdk.SchemaObjectIdentifier) func() { +func (c *TableClient) DropFunc(t *testing.T, id sdk.SchemaObjectIdentifier) func() { t.Helper() ctx := context.Background() diff --git a/pkg/acceptance/helpers/test_client.go b/pkg/acceptance/helpers/test_client.go index 975c74a061..90fb4f7e8f 100644 --- a/pkg/acceptance/helpers/test_client.go +++ b/pkg/acceptance/helpers/test_client.go @@ -51,6 +51,7 @@ type TestClient struct { SessionPolicy *SessionPolicyClient Share *ShareClient Stage *StageClient + Stream *StreamClient Streamlit *StreamlitClient Table *TableClient Tag *TagClient @@ -115,6 +116,7 @@ func NewTestClient(c *sdk.Client, database string, schema string, warehouse stri SecurityIntegration: NewSecurityIntegrationClient(context, idsGenerator), SessionPolicy: NewSessionPolicyClient(context, idsGenerator), Share: NewShareClient(context, idsGenerator), + Stream: NewStreamClient(context, idsGenerator), Streamlit: NewStreamlitClient(context, idsGenerator), Stage: NewStageClient(context, idsGenerator), Table: NewTableClient(context, idsGenerator), diff --git a/pkg/datasources/streams.go b/pkg/datasources/streams.go index 23f3ddbc22..cde2bcbcda 100644 --- a/pkg/datasources/streams.go +++ b/pkg/datasources/streams.go @@ -69,8 +69,10 @@ func ReadStreams(d *schema.ResourceData, meta interface{}) error { schemaName := d.Get("schema").(string) currentStreams, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest(). - WithIn(&sdk.In{ - Schema: sdk.NewDatabaseObjectIdentifier(databaseName, schemaName), + WithIn(sdk.ExtendedIn{ + In: sdk.In{ + Schema: sdk.NewDatabaseObjectIdentifier(databaseName, schemaName), + }, })) if err != nil { log.Printf("[DEBUG] streams in schema (%s) not found", d.Id()) diff --git a/pkg/resources/stream.go b/pkg/resources/stream.go index 8af0f25be8..7a11058005 100644 --- a/pkg/resources/stream.go +++ b/pkg/resources/stream.go @@ -142,27 +142,27 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { } if table.IsExternal { - req := sdk.NewCreateStreamOnExternalTableRequest(id, tableId) + req := sdk.NewCreateOnExternalTableStreamRequest(id, tableId) if insertOnly { - req.WithInsertOnly(sdk.Bool(true)) + req.WithInsertOnly(true) } if v, ok := d.GetOk("comment"); ok { - req.WithComment(sdk.String(v.(string))) + req.WithComment(v.(string)) } err := client.Streams.CreateOnExternalTable(ctx, req) if err != nil { return fmt.Errorf("error creating stream %v err = %w", name, err) } } else { - req := sdk.NewCreateStreamOnTableRequest(id, tableId) + req := sdk.NewCreateOnTableStreamRequest(id, tableId) if appendOnly { - req.WithAppendOnly(sdk.Bool(true)) + req.WithAppendOnly(true) } if showInitialRows { - req.WithShowInitialRows(sdk.Bool(true)) + req.WithShowInitialRows(true) } if v, ok := d.GetOk("comment"); ok { - req.WithComment(sdk.String(v.(string))) + req.WithComment(v.(string)) } err := client.Streams.CreateOnTable(ctx, req) if err != nil { @@ -181,15 +181,15 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { return err } - req := sdk.NewCreateStreamOnViewRequest(id, viewId) + req := sdk.NewCreateOnViewStreamRequest(id, viewId) if appendOnly { - req.WithAppendOnly(sdk.Bool(true)) + req.WithAppendOnly(true) } if showInitialRows { - req.WithShowInitialRows(sdk.Bool(true)) + req.WithShowInitialRows(true) } if v, ok := d.GetOk("comment"); ok { - req.WithComment(sdk.String(v.(string))) + req.WithComment(v.(string)) } err = client.Streams.CreateOnView(ctx, req) if err != nil { @@ -208,9 +208,9 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error { if findStagePropertyValueByName(stageProperties, "ENABLE") != "true" { return fmt.Errorf("directory must be enabled on stage") } - req := sdk.NewCreateStreamOnDirectoryTableRequest(id, stageId) + req := sdk.NewCreateOnDirectoryTableStreamRequest(id, stageId) if v, ok := d.GetOk("comment"); ok { - req.WithComment(sdk.String(v.(string))) + req.WithComment(v.(string)) } err = client.Streams.CreateOnDirectoryTable(ctx, req) if err != nil { @@ -290,12 +290,12 @@ func UpdateStream(d *schema.ResourceData, meta interface{}) error { if d.HasChange("comment") { comment := d.Get("comment").(string) if comment == "" { - err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithUnsetComment(sdk.Bool(true))) + err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithUnsetComment(true)) if err != nil { return fmt.Errorf("error unsetting stream comment on %v", d.Id()) } } else { - err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithSetComment(sdk.String(comment))) + err := client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithSetComment(comment)) if err != nil { return fmt.Errorf("error setting stream comment on %v", d.Id()) } diff --git a/pkg/resources/view_acceptance_test.go b/pkg/resources/view_acceptance_test.go index e643348cd1..52b0780ccb 100644 --- a/pkg/resources/view_acceptance_test.go +++ b/pkg/resources/view_acceptance_test.go @@ -54,7 +54,7 @@ func TestAcc_View_basic(t *testing.T) { id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() resourceId := helpers.EncodeResourceIdentifier(id) - table, tableCleanup := acc.TestClient().Table.CreateTableWithColumns(t, []sdk.TableColumnRequest{ + table, tableCleanup := acc.TestClient().Table.CreateWithColumns(t, []sdk.TableColumnRequest{ *sdk.NewTableColumnRequest("id", sdk.DataTypeNumber), *sdk.NewTableColumnRequest("foo", sdk.DataTypeNumber), }) @@ -505,7 +505,7 @@ func TestAcc_View_complete(t *testing.T) { acc.TestAccPreCheck(t) id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() resourceId := helpers.EncodeResourceIdentifier(id) - table, tableCleanup := acc.TestClient().Table.CreateTableWithColumns(t, []sdk.TableColumnRequest{ + table, tableCleanup := acc.TestClient().Table.CreateWithColumns(t, []sdk.TableColumnRequest{ *sdk.NewTableColumnRequest("id", sdk.DataTypeNumber), *sdk.NewTableColumnRequest("foo", sdk.DataTypeNumber), }) @@ -666,7 +666,7 @@ func TestAcc_View_columns(t *testing.T) { acc.TestAccPreCheck(t) id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() - table, tableCleanup := acc.TestClient().Table.CreateTableWithColumns(t, []sdk.TableColumnRequest{ + table, tableCleanup := acc.TestClient().Table.CreateWithColumns(t, []sdk.TableColumnRequest{ *sdk.NewTableColumnRequest("id", sdk.DataTypeNumber), *sdk.NewTableColumnRequest("foo", sdk.DataTypeNumber), *sdk.NewTableColumnRequest("bar", sdk.DataTypeNumber), diff --git a/pkg/schemas/stream_gen.go b/pkg/schemas/stream_gen.go index 21ad574b81..841f6fb79b 100644 --- a/pkg/schemas/stream_gen.go +++ b/pkg/schemas/stream_gen.go @@ -25,10 +25,6 @@ var ShowStreamSchema = map[string]*schema.Schema{ Type: schema.TypeString, Computed: true, }, - "table_on": { - Type: schema.TypeString, - Computed: true, - }, "owner": { Type: schema.TypeString, Computed: true, @@ -83,9 +79,6 @@ func StreamToSchema(stream *sdk.Stream) map[string]any { streamSchema["name"] = stream.Name streamSchema["database_name"] = stream.DatabaseName streamSchema["schema_name"] = stream.SchemaName - if stream.TableOn != nil { - streamSchema["table_on"] = stream.TableOn - } if stream.Owner != nil { streamSchema["owner"] = stream.Owner } diff --git a/pkg/sdk/context_functions.go b/pkg/sdk/context_functions.go index f853417fd3..1983189eb0 100644 --- a/pkg/sdk/context_functions.go +++ b/pkg/sdk/context_functions.go @@ -20,6 +20,7 @@ type ContextFunctions interface { CurrentSession(ctx context.Context) (string, error) CurrentUser(ctx context.Context) (AccountObjectIdentifier, error) CurrentSessionDetails(ctx context.Context) (*CurrentSessionDetails, error) + LastQueryId(ctx context.Context) (string, error) // Session Object functions. CurrentDatabase(ctx context.Context) (string, error) @@ -257,3 +258,17 @@ func (c *contextFunctions) IsRoleInSession(ctx context.Context, role AccountObje } return s.IsRoleInSession, nil } + +func (c *contextFunctions) LastQueryId(ctx context.Context) (string, error) { + s := &struct { + LastQueryId sql.NullString `db:"LAST_QUERY_ID"` + }{} + err := c.client.queryOne(ctx, s, "SELECT LAST_QUERY_ID() as LAST_QUERY_ID") + if err != nil { + return "", err + } + if !s.LastQueryId.Valid { + return "", nil + } + return s.LastQueryId.String, nil +} diff --git a/pkg/sdk/external_tables_dto.go b/pkg/sdk/external_tables_dto.go index 77a5caaa36..bdbeaf6f81 100644 --- a/pkg/sdk/external_tables_dto.go +++ b/pkg/sdk/external_tables_dto.go @@ -35,6 +35,10 @@ type CreateExternalTableRequest struct { tag []*TagAssociationRequest } +func (r *CreateExternalTableRequest) GetName() SchemaObjectIdentifier { + return r.name +} + func (s *CreateExternalTableRequest) GetColumns() []*ExternalTableColumnRequest { return s.columns } diff --git a/pkg/sdk/streams_def.go b/pkg/sdk/streams_def.go index 135ecbc767..535a4ac5db 100644 --- a/pkg/sdk/streams_def.go +++ b/pkg/sdk/streams_def.go @@ -11,9 +11,9 @@ var ( QueryStructField( "Statement", g.NewQueryStruct("OnStreamStatement"). - OptionalTextAssignment("TIMESTAMP", g.ParameterOptions().ArrowEquals()). + OptionalTextAssignment("TIMESTAMP", g.ParameterOptions().ArrowEquals().SingleQuotes()). OptionalTextAssignment("OFFSET", g.ParameterOptions().ArrowEquals()). - OptionalTextAssignment("STATEMENT", g.ParameterOptions().ArrowEquals()). + OptionalTextAssignment("STATEMENT", g.ParameterOptions().ArrowEquals().SingleQuotes()). OptionalTextAssignment("STREAM", g.ParameterOptions().ArrowEquals().SingleQuotes()). WithValidation(g.ExactlyOneValueSet, "Timestamp", "Offset", "Statement", "Stream"), g.ListOptions().Parentheses(), @@ -25,7 +25,6 @@ var ( Field("name", "string"). Field("database_name", "string"). Field("schema_name", "string"). - Field("tableOn", "sql.NullString"). Field("owner", "sql.NullString"). Field("comment", "sql.NullString"). Field("table_name", "sql.NullString"). @@ -43,7 +42,6 @@ var ( Field("Name", "string"). Field("DatabaseName", "string"). Field("SchemaName", "string"). - Field("TableOn", "*string"). Field("Owner", "*string"). Field("Comment", "*string"). Field("TableName", "*string"). @@ -70,6 +68,7 @@ var ( SQL("STREAM"). IfNotExists(). Name(). + OptionalTags(). OptionalCopyGrants(). SQL("ON TABLE"). Identifier("TableId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()). @@ -90,6 +89,7 @@ var ( SQL("STREAM"). IfNotExists(). Name(). + OptionalTags(). OptionalCopyGrants(). SQL("ON EXTERNAL TABLE"). Identifier("ExternalTableId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()). @@ -109,6 +109,7 @@ var ( SQL("STREAM"). IfNotExists(). Name(). + OptionalTags(). OptionalCopyGrants(). SQL("ON STAGE"). Identifier("StageId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()). @@ -126,6 +127,7 @@ var ( SQL("STREAM"). IfNotExists(). Name(). + OptionalTags(). OptionalCopyGrants(). SQL("ON VIEW"). Identifier("ViewId", g.KindOfT[SchemaObjectIdentifier](), g.IdentifierOptions().Required()). @@ -182,7 +184,7 @@ var ( Terse(). SQL("STREAMS"). OptionalLike(). - OptionalIn(). + OptionalExtendedIn(). OptionalStartsWith(). OptionalLimit(), ). diff --git a/pkg/sdk/streams_dto_builders_gen.go b/pkg/sdk/streams_dto_builders_gen.go index 7e32402fab..9c58c840ed 100644 --- a/pkg/sdk/streams_dto_builders_gen.go +++ b/pkg/sdk/streams_dto_builders_gen.go @@ -2,7 +2,9 @@ package sdk -func NewCreateStreamOnTableRequest( +import () + +func NewCreateOnTableStreamRequest( name SchemaObjectIdentifier, TableId SchemaObjectIdentifier, ) *CreateOnTableStreamRequest { @@ -12,38 +14,43 @@ func NewCreateStreamOnTableRequest( return &s } -func (s *CreateOnTableStreamRequest) WithOrReplace(OrReplace *bool) *CreateOnTableStreamRequest { - s.OrReplace = OrReplace +func (s *CreateOnTableStreamRequest) WithOrReplace(OrReplace bool) *CreateOnTableStreamRequest { + s.OrReplace = &OrReplace + return s +} + +func (s *CreateOnTableStreamRequest) WithIfNotExists(IfNotExists bool) *CreateOnTableStreamRequest { + s.IfNotExists = &IfNotExists return s } -func (s *CreateOnTableStreamRequest) WithIfNotExists(IfNotExists *bool) *CreateOnTableStreamRequest { - s.IfNotExists = IfNotExists +func (s *CreateOnTableStreamRequest) WithTag(Tag []TagAssociation) *CreateOnTableStreamRequest { + s.Tag = Tag return s } -func (s *CreateOnTableStreamRequest) WithCopyGrants(CopyGrants *bool) *CreateOnTableStreamRequest { - s.CopyGrants = CopyGrants +func (s *CreateOnTableStreamRequest) WithCopyGrants(CopyGrants bool) *CreateOnTableStreamRequest { + s.CopyGrants = &CopyGrants return s } -func (s *CreateOnTableStreamRequest) WithOn(On *OnStreamRequest) *CreateOnTableStreamRequest { - s.On = On +func (s *CreateOnTableStreamRequest) WithOn(On OnStreamRequest) *CreateOnTableStreamRequest { + s.On = &On return s } -func (s *CreateOnTableStreamRequest) WithAppendOnly(AppendOnly *bool) *CreateOnTableStreamRequest { - s.AppendOnly = AppendOnly +func (s *CreateOnTableStreamRequest) WithAppendOnly(AppendOnly bool) *CreateOnTableStreamRequest { + s.AppendOnly = &AppendOnly return s } -func (s *CreateOnTableStreamRequest) WithShowInitialRows(ShowInitialRows *bool) *CreateOnTableStreamRequest { - s.ShowInitialRows = ShowInitialRows +func (s *CreateOnTableStreamRequest) WithShowInitialRows(ShowInitialRows bool) *CreateOnTableStreamRequest { + s.ShowInitialRows = &ShowInitialRows return s } -func (s *CreateOnTableStreamRequest) WithComment(Comment *string) *CreateOnTableStreamRequest { - s.Comment = Comment +func (s *CreateOnTableStreamRequest) WithComment(Comment string) *CreateOnTableStreamRequest { + s.Comment = &Comment return s } @@ -51,13 +58,13 @@ func NewOnStreamRequest() *OnStreamRequest { return &OnStreamRequest{} } -func (s *OnStreamRequest) WithAt(At *bool) *OnStreamRequest { - s.At = At +func (s *OnStreamRequest) WithAt(At bool) *OnStreamRequest { + s.At = &At return s } -func (s *OnStreamRequest) WithBefore(Before *bool) *OnStreamRequest { - s.Before = Before +func (s *OnStreamRequest) WithBefore(Before bool) *OnStreamRequest { + s.Before = &Before return s } @@ -70,27 +77,27 @@ func NewOnStreamStatementRequest() *OnStreamStatementRequest { return &OnStreamStatementRequest{} } -func (s *OnStreamStatementRequest) WithTimestamp(Timestamp *string) *OnStreamStatementRequest { - s.Timestamp = Timestamp +func (s *OnStreamStatementRequest) WithTimestamp(Timestamp string) *OnStreamStatementRequest { + s.Timestamp = &Timestamp return s } -func (s *OnStreamStatementRequest) WithOffset(Offset *string) *OnStreamStatementRequest { - s.Offset = Offset +func (s *OnStreamStatementRequest) WithOffset(Offset string) *OnStreamStatementRequest { + s.Offset = &Offset return s } -func (s *OnStreamStatementRequest) WithStatement(Statement *string) *OnStreamStatementRequest { - s.Statement = Statement +func (s *OnStreamStatementRequest) WithStatement(Statement string) *OnStreamStatementRequest { + s.Statement = &Statement return s } -func (s *OnStreamStatementRequest) WithStream(Stream *string) *OnStreamStatementRequest { - s.Stream = Stream +func (s *OnStreamStatementRequest) WithStream(Stream string) *OnStreamStatementRequest { + s.Stream = &Stream return s } -func NewCreateStreamOnExternalTableRequest( +func NewCreateOnExternalTableStreamRequest( name SchemaObjectIdentifier, ExternalTableId SchemaObjectIdentifier, ) *CreateOnExternalTableStreamRequest { @@ -100,37 +107,42 @@ func NewCreateStreamOnExternalTableRequest( return &s } -func (s *CreateOnExternalTableStreamRequest) WithOrReplace(OrReplace *bool) *CreateOnExternalTableStreamRequest { - s.OrReplace = OrReplace +func (s *CreateOnExternalTableStreamRequest) WithOrReplace(OrReplace bool) *CreateOnExternalTableStreamRequest { + s.OrReplace = &OrReplace + return s +} + +func (s *CreateOnExternalTableStreamRequest) WithIfNotExists(IfNotExists bool) *CreateOnExternalTableStreamRequest { + s.IfNotExists = &IfNotExists return s } -func (s *CreateOnExternalTableStreamRequest) WithIfNotExists(IfNotExists *bool) *CreateOnExternalTableStreamRequest { - s.IfNotExists = IfNotExists +func (s *CreateOnExternalTableStreamRequest) WithTag(Tag []TagAssociation) *CreateOnExternalTableStreamRequest { + s.Tag = Tag return s } -func (s *CreateOnExternalTableStreamRequest) WithCopyGrants(CopyGrants *bool) *CreateOnExternalTableStreamRequest { - s.CopyGrants = CopyGrants +func (s *CreateOnExternalTableStreamRequest) WithCopyGrants(CopyGrants bool) *CreateOnExternalTableStreamRequest { + s.CopyGrants = &CopyGrants return s } -func (s *CreateOnExternalTableStreamRequest) WithOn(On *OnStreamRequest) *CreateOnExternalTableStreamRequest { - s.On = On +func (s *CreateOnExternalTableStreamRequest) WithOn(On OnStreamRequest) *CreateOnExternalTableStreamRequest { + s.On = &On return s } -func (s *CreateOnExternalTableStreamRequest) WithInsertOnly(InsertOnly *bool) *CreateOnExternalTableStreamRequest { - s.InsertOnly = InsertOnly +func (s *CreateOnExternalTableStreamRequest) WithInsertOnly(InsertOnly bool) *CreateOnExternalTableStreamRequest { + s.InsertOnly = &InsertOnly return s } -func (s *CreateOnExternalTableStreamRequest) WithComment(Comment *string) *CreateOnExternalTableStreamRequest { - s.Comment = Comment +func (s *CreateOnExternalTableStreamRequest) WithComment(Comment string) *CreateOnExternalTableStreamRequest { + s.Comment = &Comment return s } -func NewCreateStreamOnDirectoryTableRequest( +func NewCreateOnDirectoryTableStreamRequest( name SchemaObjectIdentifier, StageId SchemaObjectIdentifier, ) *CreateOnDirectoryTableStreamRequest { @@ -140,27 +152,32 @@ func NewCreateStreamOnDirectoryTableRequest( return &s } -func (s *CreateOnDirectoryTableStreamRequest) WithOrReplace(OrReplace *bool) *CreateOnDirectoryTableStreamRequest { - s.OrReplace = OrReplace +func (s *CreateOnDirectoryTableStreamRequest) WithOrReplace(OrReplace bool) *CreateOnDirectoryTableStreamRequest { + s.OrReplace = &OrReplace + return s +} + +func (s *CreateOnDirectoryTableStreamRequest) WithIfNotExists(IfNotExists bool) *CreateOnDirectoryTableStreamRequest { + s.IfNotExists = &IfNotExists return s } -func (s *CreateOnDirectoryTableStreamRequest) WithIfNotExists(IfNotExists *bool) *CreateOnDirectoryTableStreamRequest { - s.IfNotExists = IfNotExists +func (s *CreateOnDirectoryTableStreamRequest) WithTag(Tag []TagAssociation) *CreateOnDirectoryTableStreamRequest { + s.Tag = Tag return s } -func (s *CreateOnDirectoryTableStreamRequest) WithCopyGrants(CopyGrants *bool) *CreateOnDirectoryTableStreamRequest { - s.CopyGrants = CopyGrants +func (s *CreateOnDirectoryTableStreamRequest) WithCopyGrants(CopyGrants bool) *CreateOnDirectoryTableStreamRequest { + s.CopyGrants = &CopyGrants return s } -func (s *CreateOnDirectoryTableStreamRequest) WithComment(Comment *string) *CreateOnDirectoryTableStreamRequest { - s.Comment = Comment +func (s *CreateOnDirectoryTableStreamRequest) WithComment(Comment string) *CreateOnDirectoryTableStreamRequest { + s.Comment = &Comment return s } -func NewCreateStreamOnViewRequest( +func NewCreateOnViewStreamRequest( name SchemaObjectIdentifier, ViewId SchemaObjectIdentifier, ) *CreateOnViewStreamRequest { @@ -170,38 +187,43 @@ func NewCreateStreamOnViewRequest( return &s } -func (s *CreateOnViewStreamRequest) WithOrReplace(OrReplace *bool) *CreateOnViewStreamRequest { - s.OrReplace = OrReplace +func (s *CreateOnViewStreamRequest) WithOrReplace(OrReplace bool) *CreateOnViewStreamRequest { + s.OrReplace = &OrReplace + return s +} + +func (s *CreateOnViewStreamRequest) WithIfNotExists(IfNotExists bool) *CreateOnViewStreamRequest { + s.IfNotExists = &IfNotExists return s } -func (s *CreateOnViewStreamRequest) WithIfNotExists(IfNotExists *bool) *CreateOnViewStreamRequest { - s.IfNotExists = IfNotExists +func (s *CreateOnViewStreamRequest) WithTag(Tag []TagAssociation) *CreateOnViewStreamRequest { + s.Tag = Tag return s } -func (s *CreateOnViewStreamRequest) WithCopyGrants(CopyGrants *bool) *CreateOnViewStreamRequest { - s.CopyGrants = CopyGrants +func (s *CreateOnViewStreamRequest) WithCopyGrants(CopyGrants bool) *CreateOnViewStreamRequest { + s.CopyGrants = &CopyGrants return s } -func (s *CreateOnViewStreamRequest) WithOn(On *OnStreamRequest) *CreateOnViewStreamRequest { - s.On = On +func (s *CreateOnViewStreamRequest) WithOn(On OnStreamRequest) *CreateOnViewStreamRequest { + s.On = &On return s } -func (s *CreateOnViewStreamRequest) WithAppendOnly(AppendOnly *bool) *CreateOnViewStreamRequest { - s.AppendOnly = AppendOnly +func (s *CreateOnViewStreamRequest) WithAppendOnly(AppendOnly bool) *CreateOnViewStreamRequest { + s.AppendOnly = &AppendOnly return s } -func (s *CreateOnViewStreamRequest) WithShowInitialRows(ShowInitialRows *bool) *CreateOnViewStreamRequest { - s.ShowInitialRows = ShowInitialRows +func (s *CreateOnViewStreamRequest) WithShowInitialRows(ShowInitialRows bool) *CreateOnViewStreamRequest { + s.ShowInitialRows = &ShowInitialRows return s } -func (s *CreateOnViewStreamRequest) WithComment(Comment *string) *CreateOnViewStreamRequest { - s.Comment = Comment +func (s *CreateOnViewStreamRequest) WithComment(Comment string) *CreateOnViewStreamRequest { + s.Comment = &Comment return s } @@ -215,13 +237,13 @@ func NewCloneStreamRequest( return &s } -func (s *CloneStreamRequest) WithOrReplace(OrReplace *bool) *CloneStreamRequest { - s.OrReplace = OrReplace +func (s *CloneStreamRequest) WithOrReplace(OrReplace bool) *CloneStreamRequest { + s.OrReplace = &OrReplace return s } -func (s *CloneStreamRequest) WithCopyGrants(CopyGrants *bool) *CloneStreamRequest { - s.CopyGrants = CopyGrants +func (s *CloneStreamRequest) WithCopyGrants(CopyGrants bool) *CloneStreamRequest { + s.CopyGrants = &CopyGrants return s } @@ -233,18 +255,18 @@ func NewAlterStreamRequest( return &s } -func (s *AlterStreamRequest) WithIfExists(IfExists *bool) *AlterStreamRequest { - s.IfExists = IfExists +func (s *AlterStreamRequest) WithIfExists(IfExists bool) *AlterStreamRequest { + s.IfExists = &IfExists return s } -func (s *AlterStreamRequest) WithSetComment(SetComment *string) *AlterStreamRequest { - s.SetComment = SetComment +func (s *AlterStreamRequest) WithSetComment(SetComment string) *AlterStreamRequest { + s.SetComment = &SetComment return s } -func (s *AlterStreamRequest) WithUnsetComment(UnsetComment *bool) *AlterStreamRequest { - s.UnsetComment = UnsetComment +func (s *AlterStreamRequest) WithUnsetComment(UnsetComment bool) *AlterStreamRequest { + s.UnsetComment = &UnsetComment return s } @@ -266,8 +288,8 @@ func NewDropStreamRequest( return &s } -func (s *DropStreamRequest) WithIfExists(IfExists *bool) *DropStreamRequest { - s.IfExists = IfExists +func (s *DropStreamRequest) WithIfExists(IfExists bool) *DropStreamRequest { + s.IfExists = &IfExists return s } @@ -275,28 +297,28 @@ func NewShowStreamRequest() *ShowStreamRequest { return &ShowStreamRequest{} } -func (s *ShowStreamRequest) WithTerse(Terse *bool) *ShowStreamRequest { - s.Terse = Terse +func (s *ShowStreamRequest) WithTerse(Terse bool) *ShowStreamRequest { + s.Terse = &Terse return s } -func (s *ShowStreamRequest) WithLike(Like *Like) *ShowStreamRequest { - s.Like = Like +func (s *ShowStreamRequest) WithLike(Like Like) *ShowStreamRequest { + s.Like = &Like return s } -func (s *ShowStreamRequest) WithIn(In *In) *ShowStreamRequest { - s.In = In +func (s *ShowStreamRequest) WithIn(In ExtendedIn) *ShowStreamRequest { + s.In = &In return s } -func (s *ShowStreamRequest) WithStartsWith(StartsWith *string) *ShowStreamRequest { - s.StartsWith = StartsWith +func (s *ShowStreamRequest) WithStartsWith(StartsWith string) *ShowStreamRequest { + s.StartsWith = &StartsWith return s } -func (s *ShowStreamRequest) WithLimit(Limit *LimitFrom) *ShowStreamRequest { - s.Limit = Limit +func (s *ShowStreamRequest) WithLimit(Limit LimitFrom) *ShowStreamRequest { + s.Limit = &Limit return s } diff --git a/pkg/sdk/streams_dto_gen.go b/pkg/sdk/streams_dto_gen.go index ad0a868b05..25201a7d11 100644 --- a/pkg/sdk/streams_dto_gen.go +++ b/pkg/sdk/streams_dto_gen.go @@ -18,6 +18,7 @@ type CreateOnTableStreamRequest struct { OrReplace *bool IfNotExists *bool name SchemaObjectIdentifier // required + Tag []TagAssociation CopyGrants *bool TableId SchemaObjectIdentifier // required On *OnStreamRequest @@ -26,6 +27,10 @@ type CreateOnTableStreamRequest struct { Comment *string } +func (r *CreateOnTableStreamRequest) GetName() SchemaObjectIdentifier { + return r.name +} + type OnStreamRequest struct { At *bool Before *bool @@ -43,6 +48,7 @@ type CreateOnExternalTableStreamRequest struct { OrReplace *bool IfNotExists *bool name SchemaObjectIdentifier // required + Tag []TagAssociation CopyGrants *bool ExternalTableId SchemaObjectIdentifier // required On *OnStreamRequest @@ -54,6 +60,7 @@ type CreateOnDirectoryTableStreamRequest struct { OrReplace *bool IfNotExists *bool name SchemaObjectIdentifier // required + Tag []TagAssociation CopyGrants *bool StageId SchemaObjectIdentifier // required Comment *string @@ -63,6 +70,7 @@ type CreateOnViewStreamRequest struct { OrReplace *bool IfNotExists *bool name SchemaObjectIdentifier // required + Tag []TagAssociation CopyGrants *bool ViewId SchemaObjectIdentifier // required On *OnStreamRequest @@ -95,7 +103,7 @@ type DropStreamRequest struct { type ShowStreamRequest struct { Terse *bool Like *Like - In *In + In *ExtendedIn StartsWith *string Limit *LimitFrom } diff --git a/pkg/sdk/streams_gen.go b/pkg/sdk/streams_gen.go index 301c9afeed..0471d47595 100644 --- a/pkg/sdk/streams_gen.go +++ b/pkg/sdk/streams_gen.go @@ -16,7 +16,7 @@ type Streams interface { Drop(ctx context.Context, request *DropStreamRequest) error Show(ctx context.Context, request *ShowStreamRequest) ([]Stream, error) ShowByID(ctx context.Context, id SchemaObjectIdentifier) (*Stream, error) - Describe(ctx context.Context, request *DescribeStreamRequest) (*Stream, error) + Describe(ctx context.Context, id SchemaObjectIdentifier) (*Stream, error) } // CreateOnTableStreamOptions is based on https://docs.snowflake.com/en/sql-reference/sql/create-stream. @@ -26,6 +26,7 @@ type CreateOnTableStreamOptions struct { stream bool `ddl:"static" sql:"STREAM"` IfNotExists *bool `ddl:"keyword" sql:"IF NOT EXISTS"` name SchemaObjectIdentifier `ddl:"identifier"` + Tag []TagAssociation `ddl:"keyword,parentheses" sql:"TAG"` CopyGrants *bool `ddl:"keyword" sql:"COPY GRANTS"` onTable bool `ddl:"static" sql:"ON TABLE"` TableId SchemaObjectIdentifier `ddl:"identifier"` @@ -42,9 +43,9 @@ type OnStream struct { } type OnStreamStatement struct { - Timestamp *string `ddl:"parameter,arrow_equals" sql:"TIMESTAMP"` + Timestamp *string `ddl:"parameter,single_quotes,arrow_equals" sql:"TIMESTAMP"` Offset *string `ddl:"parameter,arrow_equals" sql:"OFFSET"` - Statement *string `ddl:"parameter,arrow_equals" sql:"STATEMENT"` + Statement *string `ddl:"parameter,single_quotes,arrow_equals" sql:"STATEMENT"` Stream *string `ddl:"parameter,single_quotes,arrow_equals" sql:"STREAM"` } @@ -55,6 +56,7 @@ type CreateOnExternalTableStreamOptions struct { stream bool `ddl:"static" sql:"STREAM"` IfNotExists *bool `ddl:"keyword" sql:"IF NOT EXISTS"` name SchemaObjectIdentifier `ddl:"identifier"` + Tag []TagAssociation `ddl:"keyword,parentheses" sql:"TAG"` CopyGrants *bool `ddl:"keyword" sql:"COPY GRANTS"` onExternalTable bool `ddl:"static" sql:"ON EXTERNAL TABLE"` ExternalTableId SchemaObjectIdentifier `ddl:"identifier"` @@ -70,6 +72,7 @@ type CreateOnDirectoryTableStreamOptions struct { stream bool `ddl:"static" sql:"STREAM"` IfNotExists *bool `ddl:"keyword" sql:"IF NOT EXISTS"` name SchemaObjectIdentifier `ddl:"identifier"` + Tag []TagAssociation `ddl:"keyword,parentheses" sql:"TAG"` CopyGrants *bool `ddl:"keyword" sql:"COPY GRANTS"` onStage bool `ddl:"static" sql:"ON STAGE"` StageId SchemaObjectIdentifier `ddl:"identifier"` @@ -83,6 +86,7 @@ type CreateOnViewStreamOptions struct { stream bool `ddl:"static" sql:"STREAM"` IfNotExists *bool `ddl:"keyword" sql:"IF NOT EXISTS"` name SchemaObjectIdentifier `ddl:"identifier"` + Tag []TagAssociation `ddl:"keyword,parentheses" sql:"TAG"` CopyGrants *bool `ddl:"keyword" sql:"COPY GRANTS"` onView bool `ddl:"static" sql:"ON VIEW"` ViewId SchemaObjectIdentifier `ddl:"identifier"` @@ -124,13 +128,13 @@ type DropStreamOptions struct { // ShowStreamOptions is based on https://docs.snowflake.com/en/sql-reference/sql/show-streams. type ShowStreamOptions struct { - show bool `ddl:"static" sql:"SHOW"` - Terse *bool `ddl:"keyword" sql:"TERSE"` - streams bool `ddl:"static" sql:"STREAMS"` - Like *Like `ddl:"keyword" sql:"LIKE"` - In *In `ddl:"keyword" sql:"IN"` - StartsWith *string `ddl:"parameter,no_equals,single_quotes" sql:"STARTS WITH"` - Limit *LimitFrom `ddl:"keyword" sql:"LIMIT"` + show bool `ddl:"static" sql:"SHOW"` + Terse *bool `ddl:"keyword" sql:"TERSE"` + streams bool `ddl:"static" sql:"STREAMS"` + Like *Like `ddl:"keyword" sql:"LIKE"` + In *ExtendedIn `ddl:"keyword" sql:"IN"` + StartsWith *string `ddl:"parameter,single_quotes,no_equals" sql:"STARTS WITH"` + Limit *LimitFrom `ddl:"keyword" sql:"LIMIT"` } type showStreamsDbRow struct { @@ -138,7 +142,6 @@ type showStreamsDbRow struct { Name string `db:"name"` DatabaseName string `db:"database_name"` SchemaName string `db:"schema_name"` - TableOn sql.NullString `db:"tableOn"` Owner sql.NullString `db:"owner"` Comment sql.NullString `db:"comment"` TableName sql.NullString `db:"table_name"` @@ -157,7 +160,6 @@ type Stream struct { Name string DatabaseName string SchemaName string - TableOn *string Owner *string Comment *string TableName *string diff --git a/pkg/sdk/streams_gen_test.go b/pkg/sdk/streams_gen_test.go index c06691129b..76963db251 100644 --- a/pkg/sdk/streams_gen_test.go +++ b/pkg/sdk/streams_gen_test.go @@ -64,6 +64,65 @@ func TestStreams_CreateOnTable(t *testing.T) { assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s", id.FullyQualifiedName(), tableId.FullyQualifiedName()) }) + t.Run("at timestamp", func(t *testing.T) { + timestamp := "2024-09-25 06:16:10.359 -0700" + opts := defaultOpts() + opts.On = &OnStream{ + At: Bool(true), + Statement: OnStreamStatement{ + Timestamp: String(timestamp), + }, + } + assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s AT (TIMESTAMP => '%s')", id.FullyQualifiedName(), tableId.FullyQualifiedName(), timestamp) + }) + + t.Run("at offset", func(t *testing.T) { + opts := defaultOpts() + opts.On = &OnStream{ + At: Bool(true), + Statement: OnStreamStatement{ + Offset: String("-10"), + }, + } + assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s AT (OFFSET => -10)", id.FullyQualifiedName(), tableId.FullyQualifiedName()) + }) + + t.Run("at statement", func(t *testing.T) { + queryId := "0111447d-0905-8a5c-0062-f3820281547a" + opts := defaultOpts() + opts.On = &OnStream{ + At: Bool(true), + Statement: OnStreamStatement{ + Statement: String(queryId), + }, + } + assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s AT (STATEMENT => '%s')", id.FullyQualifiedName(), tableId.FullyQualifiedName(), queryId) + }) + + t.Run("at stream", func(t *testing.T) { + streamId := randomSchemaObjectIdentifier() + opts := defaultOpts() + opts.On = &OnStream{ + At: Bool(true), + Statement: OnStreamStatement{ + Stream: String(streamId.FullyQualifiedName()), + }, + } + assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s AT (STREAM => '%s')", id.FullyQualifiedName(), tableId.FullyQualifiedName(), temporaryReplace(streamId)) + }) + + t.Run("before timestamp", func(t *testing.T) { + timestamp := "2024-09-25 06:16:10.359 -0700" + opts := defaultOpts() + opts.On = &OnStream{ + Before: Bool(true), + Statement: OnStreamStatement{ + Timestamp: String(timestamp), + }, + } + assertOptsValidAndSQLEquals(t, opts, "CREATE STREAM %s ON TABLE %s BEFORE (TIMESTAMP => '%s')", id.FullyQualifiedName(), tableId.FullyQualifiedName(), timestamp) + }) + t.Run("all options", func(t *testing.T) { opts := defaultOpts() opts.OrReplace = Bool(true) @@ -153,7 +212,7 @@ func TestStreams_CreateOnExternalTable(t *testing.T) { } opts.InsertOnly = Bool(true) opts.Comment = String("some comment") - assertOptsValidAndSQLEquals(t, opts, `CREATE STREAM IF NOT EXISTS %s COPY GRANTS ON EXTERNAL TABLE %s AT (STATEMENT => 123) INSERT_ONLY = true COMMENT = 'some comment'`, id.FullyQualifiedName(), externalTableId.FullyQualifiedName()) + assertOptsValidAndSQLEquals(t, opts, `CREATE STREAM IF NOT EXISTS %s COPY GRANTS ON EXTERNAL TABLE %s AT (STATEMENT => '123') INSERT_ONLY = true COMMENT = 'some comment'`, id.FullyQualifiedName(), externalTableId.FullyQualifiedName()) }) } @@ -415,6 +474,11 @@ func TestStreams_Drop(t *testing.T) { assertOptsInvalidJoinedErrors(t, opts, ErrInvalidObjectIdentifier) }) + t.Run("basic", func(t *testing.T) { + opts := defaultOpts() + assertOptsValidAndSQLEquals(t, opts, `DROP STREAM %s`, id.FullyQualifiedName()) + }) + t.Run("all options", func(t *testing.T) { opts := defaultOpts() opts.IfExists = Bool(true) @@ -443,7 +507,7 @@ func TestStreams_Show(t *testing.T) { opts.Terse = Bool(true) opts.Like = &Like{Pattern: String("pattern")} schemaId := randomDatabaseObjectIdentifier() - opts.In = &In{Schema: schemaId} + opts.In = &ExtendedIn{In: In{Schema: schemaId}} opts.StartsWith = String("starts with pattern") opts.Limit = &LimitFrom{Rows: Int(123), From: String("from pattern")} assertOptsValidAndSQLEquals(t, opts, `SHOW TERSE STREAMS LIKE 'pattern' IN SCHEMA %s STARTS WITH 'starts with pattern' LIMIT 123 FROM 'from pattern'`, schemaId.FullyQualifiedName()) diff --git a/pkg/sdk/streams_impl_gen.go b/pkg/sdk/streams_impl_gen.go index 4b96f017ce..64b1b7e81a 100644 --- a/pkg/sdk/streams_impl_gen.go +++ b/pkg/sdk/streams_impl_gen.go @@ -59,19 +59,21 @@ func (v *streams) Show(ctx context.Context, request *ShowStreamRequest) ([]Strea func (v *streams) ShowByID(ctx context.Context, id SchemaObjectIdentifier) (*Stream, error) { streams, err := v.Show(ctx, NewShowStreamRequest(). - WithIn(&In{ - Schema: id.SchemaId(), + WithIn(ExtendedIn{ + In: In{ + Schema: id.SchemaId(), + }, }). - WithLike(&Like{Pattern: String(id.Name())})) + WithLike(Like{Pattern: String(id.Name())})) if err != nil { return nil, err } return collections.FindFirst(streams, func(r Stream) bool { return r.Name == id.Name() }) } -func (v *streams) Describe(ctx context.Context, request *DescribeStreamRequest) (*Stream, error) { +func (v *streams) Describe(ctx context.Context, id SchemaObjectIdentifier) (*Stream, error) { opts := &DescribeStreamOptions{ - name: request.name, + name: id, } result, err := validateAndQueryOne[showStreamsDbRow](v.client, ctx, opts) if err != nil { @@ -85,6 +87,7 @@ func (r *CreateOnTableStreamRequest) toOpts() *CreateOnTableStreamOptions { OrReplace: r.OrReplace, IfNotExists: r.IfNotExists, name: r.name, + Tag: r.Tag, CopyGrants: r.CopyGrants, TableId: r.TableId, @@ -96,12 +99,13 @@ func (r *CreateOnTableStreamRequest) toOpts() *CreateOnTableStreamOptions { opts.On = &OnStream{ At: r.On.At, Before: r.On.Before, - Statement: OnStreamStatement{ - Timestamp: r.On.Statement.Timestamp, - Offset: r.On.Statement.Offset, - Statement: r.On.Statement.Statement, - Stream: r.On.Statement.Stream, - }, + } + + opts.On.Statement = OnStreamStatement{ + Timestamp: r.On.Statement.Timestamp, + Offset: r.On.Statement.Offset, + Statement: r.On.Statement.Statement, + Stream: r.On.Statement.Stream, } } return opts @@ -112,6 +116,7 @@ func (r *CreateOnExternalTableStreamRequest) toOpts() *CreateOnExternalTableStre OrReplace: r.OrReplace, IfNotExists: r.IfNotExists, name: r.name, + Tag: r.Tag, CopyGrants: r.CopyGrants, ExternalTableId: r.ExternalTableId, @@ -122,12 +127,13 @@ func (r *CreateOnExternalTableStreamRequest) toOpts() *CreateOnExternalTableStre opts.On = &OnStream{ At: r.On.At, Before: r.On.Before, - Statement: OnStreamStatement{ - Timestamp: r.On.Statement.Timestamp, - Offset: r.On.Statement.Offset, - Statement: r.On.Statement.Statement, - Stream: r.On.Statement.Stream, - }, + } + + opts.On.Statement = OnStreamStatement{ + Timestamp: r.On.Statement.Timestamp, + Offset: r.On.Statement.Offset, + Statement: r.On.Statement.Statement, + Stream: r.On.Statement.Stream, } } return opts @@ -138,6 +144,7 @@ func (r *CreateOnDirectoryTableStreamRequest) toOpts() *CreateOnDirectoryTableSt OrReplace: r.OrReplace, IfNotExists: r.IfNotExists, name: r.name, + Tag: r.Tag, CopyGrants: r.CopyGrants, StageId: r.StageId, Comment: r.Comment, @@ -150,6 +157,7 @@ func (r *CreateOnViewStreamRequest) toOpts() *CreateOnViewStreamOptions { OrReplace: r.OrReplace, IfNotExists: r.IfNotExists, name: r.name, + Tag: r.Tag, CopyGrants: r.CopyGrants, ViewId: r.ViewId, @@ -161,12 +169,13 @@ func (r *CreateOnViewStreamRequest) toOpts() *CreateOnViewStreamOptions { opts.On = &OnStream{ At: r.On.At, Before: r.On.Before, - Statement: OnStreamStatement{ - Timestamp: r.On.Statement.Timestamp, - Offset: r.On.Statement.Offset, - Statement: r.On.Statement.Statement, - Stream: r.On.Statement.Stream, - }, + } + + opts.On.Statement = OnStreamStatement{ + Timestamp: r.On.Statement.Timestamp, + Offset: r.On.Statement.Offset, + Statement: r.On.Statement.Statement, + Stream: r.On.Statement.Stream, } } return opts @@ -223,9 +232,6 @@ func (r showStreamsDbRow) convert() *Stream { if r.StaleAfter.Valid { s.StaleAfter = &r.StaleAfter.Time } - if r.TableOn.Valid { - s.TableOn = &r.TableOn.String - } if r.Owner.Valid { s.Owner = &r.Owner.String } diff --git a/pkg/sdk/streams_validations_gen.go b/pkg/sdk/streams_validations_gen.go index fb2e53fba6..14fb378c7a 100644 --- a/pkg/sdk/streams_validations_gen.go +++ b/pkg/sdk/streams_validations_gen.go @@ -1,7 +1,5 @@ package sdk -import "errors" - var ( _ validatable = new(CreateOnTableStreamOptions) _ validatable = new(CreateOnExternalTableStreamOptions) @@ -16,7 +14,7 @@ var ( func (opts *CreateOnTableStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { @@ -29,21 +27,21 @@ func (opts *CreateOnTableStreamOptions) validate() error { errs = append(errs, errOneOf("CreateOnTableStreamOptions", "IfNotExists", "OrReplace")) } if valueSet(opts.On) { - if ok := exactlyOneValueSet(opts.On.At, opts.On.Before); !ok { + if !exactlyOneValueSet(opts.On.At, opts.On.Before) { errs = append(errs, errExactlyOneOf("CreateOnTableStreamOptions.On", "At", "Before")) } if valueSet(opts.On.Statement) { - if ok := exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream); !ok { + if !exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream) { errs = append(errs, errExactlyOneOf("CreateOnTableStreamOptions.On.Statement", "Timestamp", "Offset", "Statement", "Stream")) } } } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *CreateOnExternalTableStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { @@ -56,21 +54,21 @@ func (opts *CreateOnExternalTableStreamOptions) validate() error { errs = append(errs, errOneOf("CreateOnExternalTableStreamOptions", "IfNotExists", "OrReplace")) } if valueSet(opts.On) { - if ok := exactlyOneValueSet(opts.On.At, opts.On.Before); !ok { + if !exactlyOneValueSet(opts.On.At, opts.On.Before) { errs = append(errs, errExactlyOneOf("CreateOnExternalTableStreamOptions.On", "At", "Before")) } if valueSet(opts.On.Statement) { - if ok := exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream); !ok { + if !exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream) { errs = append(errs, errExactlyOneOf("CreateOnExternalTableStreamOptions.On.Statement", "Timestamp", "Offset", "Statement", "Stream")) } } } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *CreateOnDirectoryTableStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { @@ -82,12 +80,12 @@ func (opts *CreateOnDirectoryTableStreamOptions) validate() error { if everyValueSet(opts.IfNotExists, opts.OrReplace) { errs = append(errs, errOneOf("CreateOnDirectoryTableStreamOptions", "IfNotExists", "OrReplace")) } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *CreateOnViewStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { @@ -100,32 +98,32 @@ func (opts *CreateOnViewStreamOptions) validate() error { errs = append(errs, errOneOf("CreateOnViewStreamOptions", "IfNotExists", "OrReplace")) } if valueSet(opts.On) { - if ok := exactlyOneValueSet(opts.On.At, opts.On.Before); !ok { + if !exactlyOneValueSet(opts.On.At, opts.On.Before) { errs = append(errs, errExactlyOneOf("CreateOnViewStreamOptions.On", "At", "Before")) } if valueSet(opts.On.Statement) { - if ok := exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream); !ok { + if !exactlyOneValueSet(opts.On.Statement.Timestamp, opts.On.Statement.Offset, opts.On.Statement.Statement, opts.On.Statement.Stream) { errs = append(errs, errExactlyOneOf("CreateOnViewStreamOptions.On.Statement", "Timestamp", "Offset", "Statement", "Stream")) } } } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *CloneStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { errs = append(errs, ErrInvalidObjectIdentifier) } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *AlterStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { @@ -134,38 +132,38 @@ func (opts *AlterStreamOptions) validate() error { if everyValueSet(opts.IfExists, opts.UnsetTags) { errs = append(errs, errOneOf("AlterStreamOptions", "IfExists", "UnsetTags")) } - if ok := exactlyOneValueSet(opts.SetComment, opts.UnsetComment, opts.SetTags, opts.UnsetTags); !ok { + if !exactlyOneValueSet(opts.SetComment, opts.UnsetComment, opts.SetTags, opts.UnsetTags) { errs = append(errs, errExactlyOneOf("AlterStreamOptions", "SetComment", "UnsetComment", "SetTags", "UnsetTags")) } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *DropStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { errs = append(errs, ErrInvalidObjectIdentifier) } - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *ShowStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error - return errors.Join(errs...) + return JoinErrors(errs...) } func (opts *DescribeStreamOptions) validate() error { if opts == nil { - return errors.Join(ErrNilOptions) + return ErrNilOptions } var errs []error if !ValidObjectIdentifier(opts.name) { errs = append(errs, ErrInvalidObjectIdentifier) } - return errors.Join(errs...) + return JoinErrors(errs...) } diff --git a/pkg/sdk/tables_dto.go b/pkg/sdk/tables_dto.go index 65b82e1359..c11e0d65cd 100644 --- a/pkg/sdk/tables_dto.go +++ b/pkg/sdk/tables_dto.go @@ -76,6 +76,10 @@ type CreateTableRequest struct { Comment *string } +func (r *CreateTableRequest) GetName() SchemaObjectIdentifier { + return r.name +} + type RowAccessPolicyRequest struct { Name SchemaObjectIdentifier // required On []string // required diff --git a/pkg/sdk/testint/context_functions_integration_test.go b/pkg/sdk/testint/context_functions_integration_test.go index 8767e051d8..f934160054 100644 --- a/pkg/sdk/testint/context_functions_integration_test.go +++ b/pkg/sdk/testint/context_functions_integration_test.go @@ -204,3 +204,11 @@ func TestInt_RolesUseSecondaryRoles(t *testing.T) { require.NoError(t, err) }) } + +func TestInt_LastQueryId(t *testing.T) { + client := testClient(t) + ctx := testContext(t) + lastQueryId, err := client.ContextFunctions.LastQueryId(ctx) + require.NoError(t, err) + require.NotEmpty(t, lastQueryId) +} diff --git a/pkg/sdk/testint/cortex_search_services_integration_test.go b/pkg/sdk/testint/cortex_search_services_integration_test.go index 4f8368f1c3..14e22c2e06 100644 --- a/pkg/sdk/testint/cortex_search_services_integration_test.go +++ b/pkg/sdk/testint/cortex_search_services_integration_test.go @@ -28,7 +28,7 @@ func TestInt_CortexSearchServices(t *testing.T) { createCortexSearchService := func(t *testing.T, id sdk.SchemaObjectIdentifier) *sdk.CortexSearchService { t.Helper() - table, tableCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableCleanup) err := client.CortexSearchServices.Create(ctx, sdk.NewCreateCortexSearchServiceRequest(id, on, warehouseId, targetLag, buildQuery(table.ID()))) @@ -42,7 +42,7 @@ func TestInt_CortexSearchServices(t *testing.T) { } t.Run("create: test complete", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableCleanup) name := testClientHelper().Ids.RandomSchemaObjectIdentifier() diff --git a/pkg/sdk/testint/dynamic_table_integration_test.go b/pkg/sdk/testint/dynamic_table_integration_test.go index b72148210d..778334f561 100644 --- a/pkg/sdk/testint/dynamic_table_integration_test.go +++ b/pkg/sdk/testint/dynamic_table_integration_test.go @@ -14,7 +14,7 @@ import ( func TestInt_DynamicTableCreateAndDrop(t *testing.T) { client := testClient(t) - tableTest, tableCleanup := testClientHelper().Table.CreateTable(t) + tableTest, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) ctx := context.Background() @@ -108,7 +108,7 @@ func TestInt_DynamicTableDescribe(t *testing.T) { client := testClient(t) ctx := context.Background() - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) dynamicTable, dynamicTableCleanup := testClientHelper().DynamicTable.CreateDynamicTable(t, table.ID()) @@ -129,7 +129,7 @@ func TestInt_DynamicTableAlter(t *testing.T) { client := testClient(t) ctx := context.Background() - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) t.Run("alter with suspend or resume", func(t *testing.T) { @@ -215,7 +215,7 @@ func TestInt_DynamicTablesShowByID(t *testing.T) { createDynamicTableHandle := func(t *testing.T, id sdk.SchemaObjectIdentifier) { t.Helper() - tableTest, tableCleanup := testClientHelper().Table.CreateTable(t) + tableTest, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) targetLag := sdk.TargetLag{ MaximumDuration: sdk.String("2 minutes"), diff --git a/pkg/sdk/testint/event_tables_integration_test.go b/pkg/sdk/testint/event_tables_integration_test.go index 7fd440bb3c..f8771a2e6f 100644 --- a/pkg/sdk/testint/event_tables_integration_test.go +++ b/pkg/sdk/testint/event_tables_integration_test.go @@ -216,7 +216,7 @@ func TestInt_EventTables(t *testing.T) { rowAccessPolicy2, rowAccessPolicy2Cleanup := testClientHelper().RowAccessPolicy.CreateRowAccessPolicy(t) t.Cleanup(rowAccessPolicy2Cleanup) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) // add policy diff --git a/pkg/sdk/testint/grants_integration_test.go b/pkg/sdk/testint/grants_integration_test.go index 309a2001bc..1b36724659 100644 --- a/pkg/sdk/testint/grants_integration_test.go +++ b/pkg/sdk/testint/grants_integration_test.go @@ -158,7 +158,7 @@ func TestInt_GrantAndRevokePrivilegesToAccountRole(t *testing.T) { t.Run("on schema object", func(t *testing.T) { roleTest, roleCleanup := testClientHelper().Role.CreateRole(t) t.Cleanup(roleCleanup) - tableTest, tableTestCleanup := testClientHelper().Table.CreateTable(t) + tableTest, tableTestCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableTestCleanup) privileges := &sdk.AccountRoleGrantPrivileges{ SchemaObjectPrivileges: []sdk.SchemaObjectPrivilege{sdk.SchemaObjectPrivilegeSelect}, @@ -198,7 +198,7 @@ func TestInt_GrantAndRevokePrivilegesToAccountRole(t *testing.T) { t.Run("on schema object: cortex search service", func(t *testing.T) { roleTest, roleCleanup := testClientHelper().Role.CreateRole(t) t.Cleanup(roleCleanup) - table, tableTestCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableTestCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableTestCleanup) cortex, cortexCleanup := testClientHelper().CortexSearchService.CreateCortexSearchService(t, table.ID()) t.Cleanup(cortexCleanup) @@ -241,7 +241,7 @@ func TestInt_GrantAndRevokePrivilegesToAccountRole(t *testing.T) { t.Run("on all: cortex search service", func(t *testing.T) { roleTest, roleCleanup := testClientHelper().Role.CreateRole(t) t.Cleanup(roleCleanup) - table, tableTestCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableTestCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableTestCleanup) cortex, cortexCleanup := testClientHelper().CortexSearchService.CreateCortexSearchService(t, table.ID()) t.Cleanup(cortexCleanup) @@ -324,7 +324,7 @@ func TestInt_GrantAndRevokePrivilegesToAccountRole(t *testing.T) { }) t.Run("grant and revoke on all pipes", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -381,7 +381,7 @@ func TestInt_GrantAndRevokePrivilegesToAccountRole(t *testing.T) { }) t.Run("grant and revoke on all pipes with multiple errors", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -573,7 +573,7 @@ func TestInt_GrantAndRevokePrivilegesToDatabaseRole(t *testing.T) { t.Cleanup(databaseRoleCleanup) databaseRoleId := testClientHelper().Ids.NewDatabaseObjectIdentifier(databaseRole.Name) - table, _ := testClientHelper().Table.CreateTable(t) + table, _ := testClientHelper().Table.Create(t) privileges := &sdk.DatabaseRoleGrantPrivileges{ SchemaObjectPrivileges: []sdk.SchemaObjectPrivilege{sdk.SchemaObjectPrivilegeSelect}, @@ -626,7 +626,7 @@ func TestInt_GrantAndRevokePrivilegesToDatabaseRole(t *testing.T) { databaseRole, databaseRoleCleanup := testClientHelper().DatabaseRole.CreateDatabaseRole(t) t.Cleanup(databaseRoleCleanup) databaseRoleId := testClientHelper().Ids.NewDatabaseObjectIdentifier(databaseRole.Name) - table, tableTestCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableTestCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableTestCleanup) cortex, cortexCleanup := testClientHelper().CortexSearchService.CreateCortexSearchService(t, table.ID()) t.Cleanup(cortexCleanup) @@ -723,7 +723,7 @@ func TestInt_GrantAndRevokePrivilegesToDatabaseRole(t *testing.T) { }) t.Run("grant and revoke on all pipes", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -780,7 +780,7 @@ func TestInt_GrantAndRevokePrivilegesToDatabaseRole(t *testing.T) { }) t.Run("grant and revoke on all pipes with multiple errors", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -874,7 +874,7 @@ func TestInt_GrantPrivilegeToShare(t *testing.T) { t.Run("with options", func(t *testing.T) { grantShareOnDatabase(t, shareTest) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) err := client.Grants.GrantPrivilegeToShare(ctx, []sdk.ObjectPrivilege{sdk.ObjectPrivilegeSelect}, &sdk.ShareGrantOn{ @@ -958,7 +958,7 @@ func TestInt_GrantPrivilegeToShare(t *testing.T) { shareTest, shareCleanup := testClientHelper().Share.CreateShareWithIdentifier(t, testClientHelper().Ids.RandomAccountObjectIdentifierContaining(".foo.bar")) t.Cleanup(shareCleanup) grantShareOnDatabase(t, shareTest) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) err := client.Grants.GrantPrivilegeToShare(ctx, []sdk.ObjectPrivilege{sdk.ObjectPrivilegeSelect}, &sdk.ShareGrantOn{ @@ -1015,7 +1015,7 @@ func TestInt_GrantOwnership(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -1221,7 +1221,7 @@ func TestInt_GrantOwnership(t *testing.T) { t.Cleanup(databaseRoleCleanup) databaseRoleId := testClientHelper().Ids.NewDatabaseObjectIdentifier(databaseRole.Name) - table, _ := testClientHelper().Table.CreateTable(t) + table, _ := testClientHelper().Table.Create(t) on := sdk.OwnershipGrantOn{ Object: &sdk.Object{ @@ -1362,7 +1362,7 @@ func TestInt_GrantOwnership(t *testing.T) { t.Run("on cortex - with ownership", func(t *testing.T) { role, roleCleanup := testClientHelper().Role.CreateRole(t) t.Cleanup(roleCleanup) - table, tableTestCleanup := testClientHelper().Table.CreateTableWithPredefinedColumns(t) + table, tableTestCleanup := testClientHelper().Table.CreateWithPredefinedColumns(t) t.Cleanup(tableTestCleanup) testClientHelper().Schema.UseDefaultSchema(t) cortex, cortexCleanup := testClientHelper().CortexSearchService.CreateCortexSearchService(t, table.ID()) @@ -2015,7 +2015,7 @@ func TestInt_ShowGrants(t *testing.T) { t.Run("handles unquoted granted object names", func(t *testing.T) { // This name is returned as unquoted from Snowflake name := "G6TM2" - table, tableCleanup := testClientHelper().Table.CreateTableWithName(t, name) + table, tableCleanup := testClientHelper().Table.CreateWithName(t, name) t.Cleanup(tableCleanup) role, roleCleanup := testClientHelper().Role.CreateRole(t) diff --git a/pkg/sdk/testint/materialized_views_gen_integration_test.go b/pkg/sdk/testint/materialized_views_gen_integration_test.go index 7bfa215288..f0759ba400 100644 --- a/pkg/sdk/testint/materialized_views_gen_integration_test.go +++ b/pkg/sdk/testint/materialized_views_gen_integration_test.go @@ -16,7 +16,7 @@ func TestInt_MaterializedViews(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) sql := fmt.Sprintf("SELECT id FROM %s", table.ID().FullyQualifiedName()) @@ -407,7 +407,7 @@ func TestInt_MaterializedViewsShowByID(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) sql := fmt.Sprintf("SELECT id FROM %s", table.ID().FullyQualifiedName()) diff --git a/pkg/sdk/testint/pipes_integration_test.go b/pkg/sdk/testint/pipes_integration_test.go index 8ccfed7d1d..2f8701e802 100644 --- a/pkg/sdk/testint/pipes_integration_test.go +++ b/pkg/sdk/testint/pipes_integration_test.go @@ -28,7 +28,7 @@ func TestInt_CreatePipeWithStrangeSchemaName(t *testing.T) { schema, schemaCleanup := testClientHelper().Schema.CreateSchemaWithName(t, schemaIdentifier.Name()) t.Cleanup(schemaCleanup) - table, tableCleanup := testClientHelper().Table.CreateTableInSchema(t, schema.ID()) + table, tableCleanup := testClientHelper().Table.CreateInSchema(t, schema.ID()) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -66,10 +66,10 @@ func TestInt_CreatePipeWithStrangeSchemaName(t *testing.T) { } func TestInt_PipesShowAndDescribe(t *testing.T) { - table1, table1Cleanup := testClientHelper().Table.CreateTable(t) + table1, table1Cleanup := testClientHelper().Table.Create(t) t.Cleanup(table1Cleanup) - table2, table2Cleanup := testClientHelper().Table.CreateTable(t) + table2, table2Cleanup := testClientHelper().Table.Create(t) t.Cleanup(table2Cleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -145,7 +145,7 @@ func TestInt_PipesShowAndDescribe(t *testing.T) { } func TestInt_PipeCreate(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -213,7 +213,7 @@ func TestInt_PipeCreate(t *testing.T) { } func TestInt_PipeDrop(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -237,7 +237,7 @@ func TestInt_PipeDrop(t *testing.T) { } func TestInt_PipeAlter(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) @@ -339,7 +339,7 @@ func TestInt_PipesShowByID(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStage(t) t.Cleanup(stageCleanup) diff --git a/pkg/sdk/testint/schemas_integration_test.go b/pkg/sdk/testint/schemas_integration_test.go index 151088c943..0031f8a353 100644 --- a/pkg/sdk/testint/schemas_integration_test.go +++ b/pkg/sdk/testint/schemas_integration_test.go @@ -280,7 +280,7 @@ func TestInt_Schemas(t *testing.T) { swapSchema, cleanupSwapSchema := testClientHelper().Schema.CreateSchema(t) t.Cleanup(cleanupSwapSchema) - table, _ := testClientHelper().Table.CreateTableInSchema(t, schema.ID()) + table, _ := testClientHelper().Table.CreateInSchema(t, schema.ID()) t.Cleanup(func() { newId := sdk.NewSchemaObjectIdentifierInSchema(swapSchema.ID(), table.Name) err := client.Tables.Drop(ctx, sdk.NewDropTableRequest(newId)) diff --git a/pkg/sdk/testint/streams_gen_integration_test.go b/pkg/sdk/testint/streams_gen_integration_test.go index 29fa5fd243..3c53684ca1 100644 --- a/pkg/sdk/testint/streams_gen_integration_test.go +++ b/pkg/sdk/testint/streams_gen_integration_test.go @@ -1,10 +1,11 @@ package testint import ( - "errors" "fmt" "testing" + assertions "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert" + "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/acceptance/bettertestspoc/assert/objectassert" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/internal/collections" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/sdk" "github.com/stretchr/testify/assert" @@ -18,37 +19,93 @@ func TestInt_Streams(t *testing.T) { databaseId := testClientHelper().Ids.DatabaseId() schemaId := testClientHelper().Ids.SchemaId() - assertStream := func(t *testing.T, s *sdk.Stream, id sdk.SchemaObjectIdentifier, sourceType string, mode string) { - t.Helper() - assert.NotNil(t, s) - assert.Nil(t, s.TableOn) - assert.Equal(t, id.Name(), s.Name) - assert.Equal(t, databaseId.Name(), s.DatabaseName) - assert.Equal(t, schemaId.Name(), s.SchemaName) - assert.Equal(t, "some comment", *s.Comment) - assert.Equal(t, sourceType, *s.SourceType) - assert.Equal(t, mode, *s.Mode) - } - + // There is no way to check at/before fields in show and describe. That's why in Create tests we try creating with these values, but do not assert them. t.Run("CreateOnTable", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateWithChangeTracking(t) t.Cleanup(cleanupTable) + tableId := table.ID() + + tag, tagCleanup := testClientHelper().Tag.CreateTag(t) + t.Cleanup(tagCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOn(*sdk.NewOnStreamRequest().WithAt(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithOffset("0"))). + WithAppendOnly(true). + WithShowInitialRows(true). + WithComment("some comment"). + WithTag([]sdk.TagAssociation{ + { + Name: tag.ID(), + Value: "v1", + }, + }) err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) - s, err := client.Streams.ShowByID(ctx, id) + tag1Value, err := client.SystemFunctions.GetTag(ctx, tag.ID(), id, sdk.ObjectTypeStream) + require.NoError(t, err) + assert.Equal(t, "v1", tag1Value) + + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("Table"). + HasMode("APPEND_ONLY"). + HasTableId(tableId.FullyQualifiedName()), + ) + + // at stream + req = sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOrReplace(true). + WithOn(*sdk.NewOnStreamRequest().WithAt(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithStream(id.FullyQualifiedName()))) + err = client.Streams.CreateOnTable(ctx, req) + require.NoError(t, err) + + // at statement + _, err = testClient(t).ExecForTests(ctx, fmt.Sprintf("INSERT INTO %s VALUES(1);", table.ID().FullyQualifiedName())) + require.NoError(t, err) + + lastQueryId, err := testClient(t).ContextFunctions.LastQueryId(ctx) + require.NoError(t, err) + + req = sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOrReplace(true). + WithOn(*sdk.NewOnStreamRequest().WithAt(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithStatement(lastQueryId))) + err = client.Streams.CreateOnTable(ctx, req) + require.NoError(t, err) + + // before offset + req = sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOrReplace(true). + WithOn(*sdk.NewOnStreamRequest().WithBefore(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithOffset("0"))) + err = client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - // TODO [SNOW-1348112]: make nicer during the stream rework - assert.Equal(t, table.ID().FullyQualifiedName(), sdk.NewSchemaObjectIdentifierFromFullyQualifiedName(*s.TableName).FullyQualifiedName()) - assertStream(t, s, id, "Table", "DEFAULT") + // before stream + req = sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOrReplace(true). + WithOn(*sdk.NewOnStreamRequest().WithBefore(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithStream(id.FullyQualifiedName()))) + err = client.Streams.CreateOnTable(ctx, req) + require.NoError(t, err) + + // before statement + _, err = testClient(t).ExecForTests(ctx, fmt.Sprintf("INSERT INTO %s VALUES(1);", table.ID().FullyQualifiedName())) + require.NoError(t, err) + + lastQueryId, err = testClient(t).ContextFunctions.LastQueryId(ctx) + require.NoError(t, err) + + req = sdk.NewCreateOnTableStreamRequest(id, tableId). + WithOrReplace(true). + WithOn(*sdk.NewOnStreamRequest().WithBefore(true).WithStatement(*sdk.NewOnStreamStatementRequest().WithStatement(lastQueryId))) + err = client.Streams.CreateOnTable(ctx, req) + require.NoError(t, err) + + // TODO(SNOW-1689111): test timestamps }) t.Run("CreateOnExternalTable", func(t *testing.T) { @@ -58,28 +115,25 @@ func TestInt_Streams(t *testing.T) { t.Cleanup(stageCleanup) externalTableId := testClientHelper().Ids.RandomSchemaObjectIdentifier() - err := client.ExternalTables.Create(ctx, sdk.NewCreateExternalTableRequest(externalTableId, stageLocation).WithFileFormat(*sdk.NewExternalTableFileFormatRequest().WithFileFormatType(sdk.ExternalTableFileFormatTypeJSON))) - require.NoError(t, err) - t.Cleanup(func() { - err := client.ExternalTables.Drop(ctx, sdk.NewDropExternalTableRequest(externalTableId)) - require.NoError(t, err) - }) + externalTableReq := sdk.NewCreateExternalTableRequest(externalTableId, stageLocation).WithFileFormat(*sdk.NewExternalTableFileFormatRequest().WithFileFormatType(sdk.ExternalTableFileFormatTypeJSON)) + _, externalTableCleanup := testClientHelper().ExternalTable.CreateOnTableWithRequest(t, externalTableReq) + t.Cleanup(externalTableCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnExternalTableRequest(id, externalTableId).WithInsertOnly(sdk.Bool(true)).WithComment(sdk.String("some comment")) - err = client.Streams.CreateOnExternalTable(ctx, req) - require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) - - s, err := client.Streams.ShowByID(ctx, id) - require.NoError(t, err) - - // TODO [SNOW-1348112]: make nicer during the stream rework - assert.Equal(t, externalTableId.FullyQualifiedName(), sdk.NewSchemaObjectIdentifierFromFullyQualifiedName(*s.TableName).FullyQualifiedName()) - assertStream(t, s, id, "External Table", "INSERT_ONLY") + req := sdk.NewCreateOnExternalTableStreamRequest(id, externalTableId).WithInsertOnly(true).WithComment("some comment") + err := client.Streams.CreateOnExternalTable(ctx, req) + require.NoError(t, err) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) + + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("External Table"). + HasMode("INSERT_ONLY"). + HasTableId(externalTableId.FullyQualifiedName()), + ) }) t.Run("CreateOnDirectoryTable", func(t *testing.T) { @@ -87,83 +141,84 @@ func TestInt_Streams(t *testing.T) { t.Cleanup(cleanupStage) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnDirectoryTableRequest(id, stage.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnDirectoryTableStreamRequest(id, stage.ID()).WithComment("some comment") err := client.Streams.CreateOnDirectoryTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) - - s, err := client.Streams.ShowByID(ctx, id) - require.NoError(t, err) - - assertStream(t, s, id, "Stage", "DEFAULT") + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) + + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("Stage"). + HasMode("DEFAULT"). + HasStageName(stage.ID().Name()), + ) }) t.Run("CreateOnView", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) view, cleanupView := testClientHelper().View.CreateView(t, fmt.Sprintf("SELECT id FROM %s", table.ID().FullyQualifiedName())) t.Cleanup(cleanupView) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnViewRequest(id, view.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnViewStreamRequest(id, view.ID()). + WithAppendOnly(true). + WithShowInitialRows(true). + WithComment("some comment") err := client.Streams.CreateOnView(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) - - s, err := client.Streams.ShowByID(ctx, id) - require.NoError(t, err) - - assertStream(t, s, id, "View", "DEFAULT") + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) + + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("View"). + HasMode("APPEND_ONLY"). + HasTableId(view.ID().FullyQualifiedName()), + ) }) t.Run("Clone", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) cloneId := testClientHelper().Ids.RandomSchemaObjectIdentifier() - err = client.Streams.Clone(ctx, sdk.NewCloneStreamRequest(cloneId, id)) - require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(cloneId)) - require.NoError(t, err) - }) - - s, err := client.Streams.ShowByID(ctx, cloneId) - require.NoError(t, err) - - assertStream(t, s, cloneId, "Table", "DEFAULT") - // TODO [SNOW-1348112]: make nicer during the stream rework - assert.Equal(t, table.ID().FullyQualifiedName(), sdk.NewSchemaObjectIdentifierFromFullyQualifiedName(*s.TableName).FullyQualifiedName()) + err = client.Streams.Clone(ctx, sdk.NewCloneStreamRequest(cloneId, id).WithCopyGrants(true)) + require.NoError(t, err) + t.Cleanup(testClientHelper().Stream.DropFunc(t, cloneId)) + + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("Table"). + HasMode("DEFAULT"). + HasTableId(table.ID().FullyQualifiedName()), + ) }) t.Run("Alter tags", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()) err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) tag, cleanupTag := testClientHelper().Tag.CreateTag(t) t.Cleanup(cleanupTag) @@ -194,30 +249,27 @@ func TestInt_Streams(t *testing.T) { }) t.Run("Alter comment", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()) err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) s, err := client.Streams.ShowByID(ctx, id) require.NoError(t, err) assert.Equal(t, "", *s.Comment) - err = client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithSetComment(sdk.String("some_comment"))) + err = client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithSetComment("some_comment")) require.NoError(t, err) s, err = client.Streams.ShowByID(ctx, id) require.NoError(t, err) assert.Equal(t, "some_comment", *s.Comment) - err = client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithUnsetComment(sdk.Bool(true))) + err = client.Streams.Alter(ctx, sdk.NewAlterStreamRequest(id).WithUnsetComment(true)) require.NoError(t, err) s, err = client.Streams.ShowByID(ctx, id) @@ -229,11 +281,11 @@ func TestInt_Streams(t *testing.T) { }) t.Run("Drop", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) @@ -248,19 +300,16 @@ func TestInt_Streams(t *testing.T) { }) t.Run("Show terse", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) - s, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest().WithTerse(sdk.Bool(true))) + s, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest().WithTerse(true)) require.NoError(t, err) stream, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) @@ -270,191 +319,157 @@ func TestInt_Streams(t *testing.T) { assert.Equal(t, id.Name(), stream.Name) assert.Equal(t, databaseId.Name(), stream.DatabaseName) assert.Equal(t, schemaId.Name(), stream.SchemaName) - assert.Equal(t, table.Name, *stream.TableOn) assert.Nil(t, stream.Comment) assert.Nil(t, stream.SourceType) assert.Nil(t, stream.Mode) }) t.Run("Show single with options", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) s, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest(). - WithTerse(sdk.Bool(false)). - WithIn(&sdk.In{ - Schema: schemaId, + WithTerse(false). + WithIn(sdk.ExtendedIn{ + In: sdk.In{ + Schema: schemaId, + }, }). - WithLike(&sdk.Like{ + WithLike(sdk.Like{ Pattern: sdk.String(id.Name()), }). - WithStartsWith(sdk.String(id.Name())). - WithLimit(&sdk.LimitFrom{ + WithStartsWith(id.Name()). + WithLimit(sdk.LimitFrom{ Rows: sdk.Int(1), })) require.NoError(t, err) assert.Equal(t, 1, len(s)) - stream, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) + _, err = collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) require.NoError(t, err) - assertStream(t, stream, id, "Table", "DEFAULT") + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("Table"). + HasMode("DEFAULT"). + HasTableId(table.ID().FullyQualifiedName()), + ) }) t.Run("Show multiple", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) id2 := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req2 := sdk.NewCreateStreamOnTableRequest(id2, table.ID()).WithComment(sdk.String("some comment")) + req2 := sdk.NewCreateOnTableStreamRequest(id2, table.ID()).WithComment("some comment") err = client.Streams.CreateOnTable(ctx, req2) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id2)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id2)) s, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest()) require.NoError(t, err) assert.Equal(t, 2, len(s)) - stream, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) + _, err = collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) require.NoError(t, err) - stream2, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id2.Name() == stream.Name }) + _, err = collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id2.Name() == stream.Name }) require.NoError(t, err) - - assertStream(t, stream, id, "Table", "DEFAULT") - assertStream(t, stream2, id2, "Table", "DEFAULT") }) t.Run("Show multiple with options", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) idPrefix := "stream_show_" id := testClientHelper().Ids.RandomSchemaObjectIdentifierWithPrefix(idPrefix) - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) id2 := testClientHelper().Ids.RandomSchemaObjectIdentifierWithPrefix(idPrefix) - req2 := sdk.NewCreateStreamOnTableRequest(id2, table.ID()).WithComment(sdk.String("some comment")) + req2 := sdk.NewCreateOnTableStreamRequest(id2, table.ID()).WithComment("some comment") err = client.Streams.CreateOnTable(ctx, req2) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id2)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id2)) s, err := client.Streams.Show(ctx, sdk.NewShowStreamRequest(). - WithTerse(sdk.Bool(false)). - WithIn(&sdk.In{ - Schema: schemaId, + WithTerse(false). + WithIn(sdk.ExtendedIn{ + In: sdk.In{ + Schema: schemaId, + }, }). - WithStartsWith(sdk.String(idPrefix)). - WithLimit(&sdk.LimitFrom{ + WithStartsWith(idPrefix). + WithLimit(sdk.LimitFrom{ Rows: sdk.Int(2), })) require.NoError(t, err) assert.Equal(t, 2, len(s)) - stream, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) + _, err = collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id.Name() == stream.Name }) require.NoError(t, err) - stream2, err := collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id2.Name() == stream.Name }) + _, err = collections.FindFirst[sdk.Stream](s, func(stream sdk.Stream) bool { return id2.Name() == stream.Name }) require.NoError(t, err) - - assertStream(t, stream, id, "Table", "DEFAULT") - assertStream(t, stream2, id2, "Table", "DEFAULT") }) t.Run("Describe", func(t *testing.T) { - table, cleanupTable := testClientHelper().Table.CreateTableInSchema(t, schemaId) + table, cleanupTable := testClientHelper().Table.CreateInSchema(t, schemaId) t.Cleanup(cleanupTable) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() - req := sdk.NewCreateStreamOnTableRequest(id, table.ID()).WithComment(sdk.String("some comment")) + req := sdk.NewCreateOnTableStreamRequest(id, table.ID()).WithComment("some comment") err := client.Streams.CreateOnTable(ctx, req) require.NoError(t, err) - t.Cleanup(func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - require.NoError(t, err) - }) + t.Cleanup(testClientHelper().Stream.DropFunc(t, id)) - s, err := client.Streams.Describe(ctx, sdk.NewDescribeStreamRequest(id)) + s, err := client.Streams.Describe(ctx, id) require.NoError(t, err) assert.NotNil(t, s) - assert.Equal(t, id.Name(), s.Name) - assert.Equal(t, databaseId.Name(), s.DatabaseName) - assert.Equal(t, schemaId.Name(), s.SchemaName) - assert.Nil(t, s.TableOn) - assert.Equal(t, "some comment", *s.Comment) - // TODO [SNOW-1348112]: make nicer during the stream rework - assert.Equal(t, table.ID().FullyQualifiedName(), sdk.NewSchemaObjectIdentifierFromFullyQualifiedName(*s.TableName).FullyQualifiedName()) - assert.Equal(t, "Table", *s.SourceType) - assert.Equal(t, "DEFAULT", *s.Mode) + assertions.AssertThatObject(t, objectassert.Stream(t, id). + HasName(id.Name()). + HasDatabaseName(id.DatabaseName()). + HasSchemaName(id.SchemaName()). + HasComment("some comment"). + HasSourceType("Table"). + HasMode("DEFAULT"). + HasTableId(table.ID().FullyQualifiedName()), + ) }) -} - -func TestInt_StreamsShowByID(t *testing.T) { - client := testClient(t) - ctx := testContext(t) - - table, cleanupTable := testClientHelper().Table.CreateTable(t) - t.Cleanup(cleanupTable) - - cleanupStreamHandle := func(id sdk.SchemaObjectIdentifier) func() { - return func() { - err := client.Streams.Drop(ctx, sdk.NewDropStreamRequest(id)) - if errors.Is(err, sdk.ErrObjectNotExistOrAuthorized) { - return - } - require.NoError(t, err) - } - } - - createStreamHandle := func(t *testing.T, id sdk.SchemaObjectIdentifier) { - t.Helper() - - err := client.Streams.CreateOnTable(ctx, sdk.NewCreateStreamOnTableRequest(id, table.ID())) - require.NoError(t, err) - t.Cleanup(cleanupStreamHandle(id)) - } t.Run("show by id - same name in different schemas", func(t *testing.T) { schema, schemaCleanup := testClientHelper().Schema.CreateSchema(t) t.Cleanup(schemaCleanup) + table, cleanupTable := testClientHelper().Table.Create(t) + t.Cleanup(cleanupTable) + id1 := testClientHelper().Ids.RandomSchemaObjectIdentifier() id2 := testClientHelper().Ids.NewSchemaObjectIdentifierInSchema(id1.Name(), schema.ID()) - createStreamHandle(t, id1) - createStreamHandle(t, id2) + _, stream1Cleanup := testClientHelper().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id1, table.ID())) + t.Cleanup(stream1Cleanup) + _, stream2Cleanup := testClientHelper().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id2, table.ID())) + t.Cleanup(stream2Cleanup) e1, err := client.Streams.ShowByID(ctx, id1) require.NoError(t, err) diff --git a/pkg/sdk/testint/system_functions_integration_test.go b/pkg/sdk/testint/system_functions_integration_test.go index 4ae2311ef5..4b60bc016b 100644 --- a/pkg/sdk/testint/system_functions_integration_test.go +++ b/pkg/sdk/testint/system_functions_integration_test.go @@ -53,7 +53,7 @@ func TestInt_PipeStatus(t *testing.T) { schema, schemaCleanup := testClientHelper().Schema.CreateSchema(t) t.Cleanup(schemaCleanup) - table, tableCleanup := testClientHelper().Table.CreateTableInSchema(t, schema.ID()) + table, tableCleanup := testClientHelper().Table.CreateInSchema(t, schema.ID()) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStageInSchema(t, schema.ID()) @@ -102,7 +102,7 @@ func TestInt_PipeForceResume(t *testing.T) { schema, schemaCleanup := testClientHelper().Schema.CreateSchema(t) t.Cleanup(schemaCleanup) - table, tableCleanup := testClientHelper().Table.CreateTableInSchema(t, schema.ID()) + table, tableCleanup := testClientHelper().Table.CreateInSchema(t, schema.ID()) t.Cleanup(tableCleanup) stage, stageCleanup := testClientHelper().Stage.CreateStageInSchema(t, schema.ID()) diff --git a/pkg/sdk/testint/tables_integration_test.go b/pkg/sdk/testint/tables_integration_test.go index aaedb0a307..9071c4fcc4 100644 --- a/pkg/sdk/testint/tables_integration_test.go +++ b/pkg/sdk/testint/tables_integration_test.go @@ -94,7 +94,7 @@ func TestInt_Table(t *testing.T) { t.Run("create table: complete optionals", func(t *testing.T) { maskingPolicy, maskingPolicyCleanup := testClientHelper().MaskingPolicy.CreateMaskingPolicy(t) t.Cleanup(maskingPolicyCleanup) - table2, table2Cleanup := testClientHelper().Table.CreateTable(t) + table2, table2Cleanup := testClientHelper().Table.Create(t) t.Cleanup(table2Cleanup) comment := random.Comment() @@ -245,7 +245,7 @@ func TestInt_Table(t *testing.T) { *sdk.NewTableColumnRequest("col2", "VARCHAR"), *sdk.NewTableColumnRequest("col3", "BOOLEAN"), } - sourceTable, sourceTableCleanup := testClientHelper().Table.CreateTableWithColumns(t, columns) + sourceTable, sourceTableCleanup := testClientHelper().Table.CreateWithColumns(t, columns) t.Cleanup(sourceTableCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() @@ -276,7 +276,7 @@ func TestInt_Table(t *testing.T) { *sdk.NewTableColumnRequest("col2", "VARCHAR"), *sdk.NewTableColumnRequest("col3", "BOOLEAN"), } - sourceTable, sourceTableCleanup := testClientHelper().Table.CreateTableWithColumns(t, columns) + sourceTable, sourceTableCleanup := testClientHelper().Table.CreateWithColumns(t, columns) t.Cleanup(sourceTableCleanup) id := testClientHelper().Ids.RandomSchemaObjectIdentifier() @@ -897,7 +897,7 @@ func TestInt_Table(t *testing.T) { }) t.Run("drop table", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) err := client.Tables.Drop(ctx, sdk.NewDropTableRequest(table.ID()).WithIfExists(sdk.Bool(true))) if err != nil { t.Cleanup(tableCleanup) @@ -909,9 +909,9 @@ func TestInt_Table(t *testing.T) { }) t.Run("show tables", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) - table2, table2Cleanup := testClientHelper().Table.CreateTable(t) + table2, table2Cleanup := testClientHelper().Table.Create(t) t.Cleanup(table2Cleanup) tables, err := client.Tables.Show(ctx, sdk.NewShowTableRequest()) @@ -927,7 +927,7 @@ func TestInt_Table(t *testing.T) { }) t.Run("with terse", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) tables, err := client.Tables.Show(ctx, sdk.NewShowTableRequest().WithTerse(sdk.Bool(true)).WithLikePattern(table.ID().Name())) @@ -938,7 +938,7 @@ func TestInt_Table(t *testing.T) { }) t.Run("with starts with", func(t *testing.T) { - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) tables, err := client.Tables.Show(ctx, sdk.NewShowTableRequest().WithStartsWith(sdk.String(table.Name))) diff --git a/pkg/sdk/testint/views_gen_integration_test.go b/pkg/sdk/testint/views_gen_integration_test.go index 17786e41c8..45a5fdb790 100644 --- a/pkg/sdk/testint/views_gen_integration_test.go +++ b/pkg/sdk/testint/views_gen_integration_test.go @@ -21,7 +21,7 @@ func TestInt_Views(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) sql := fmt.Sprintf("SELECT id FROM %s", table.ID().FullyQualifiedName()) @@ -751,7 +751,7 @@ func TestInt_ViewsShowByID(t *testing.T) { client := testClient(t) ctx := testContext(t) - table, tableCleanup := testClientHelper().Table.CreateTable(t) + table, tableCleanup := testClientHelper().Table.Create(t) t.Cleanup(tableCleanup) sql := fmt.Sprintf("SELECT id FROM %s", table.ID().FullyQualifiedName()) diff --git a/v1-preparations/ESSENTIAL_GA_OBJECTS.MD b/v1-preparations/ESSENTIAL_GA_OBJECTS.MD index 46d0122e53..70f0c6fa1c 100644 --- a/v1-preparations/ESSENTIAL_GA_OBJECTS.MD +++ b/v1-preparations/ESSENTIAL_GA_OBJECTS.MD @@ -29,7 +29,7 @@ newer provider versions. We will address these while working on the given object | ROW ACCESS POLICY | 🚀 | [#2053](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2053), [#1600](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1600), [#1151](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1151) | | SCHEMA | 🚀 | issues in the older versions: [resources](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues?q=label%3Aresource%3Aschema+) and [datasources](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues?q=label%3Adata_source%3Aschemas+) | | STAGE | ❌ | [#2995](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2995), [#2818](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2818), [#2505](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2505), [#1911](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1911), [#1903](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1903), [#1795](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1795), [#1705](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1705), [#1544](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1544), [#1491](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1491), [#1087](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1087), [#265](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/265) | -| STREAM | ❌ | [#2975](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2975), [#2413](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2413), [#2201](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2201), [#1150](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1150) | +| STREAM | 👨‍💻 | [#2975](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2975), [#2413](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2413), [#2201](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2201), [#1150](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1150) | | STREAMLIT | 🚀 | - | | TABLE | ❌ | [#2997](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2997), [#2844](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2844), [#2839](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2839), [#2735](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2735), [#2733](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2733), [#2683](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2683), [#2676](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2676), [#2674](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2674), [#2629](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2629), [#2418](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2418), [#2415](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2415), [#2406](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2406), [#2236](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2236), [#2035](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2035), [#1823](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1823), [#1799](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1799), [#1764](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1764), [#1600](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1600), [#1387](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1387), [#1272](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1272), [#1271](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1271), [#1248](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1248), [#1241](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1241), [#1146](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1146), [#1032](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1032), [#420](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/420) | | TAG | ❌ | [#2943](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2902), [#2598](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/2598), [#1910](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1910), [#1909](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1909), [#1862](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1862), [#1806](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1806), [#1657](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1657), [#1496](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1496), [#1443](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1443), [#1394](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1394), [#1372](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1372), [#1074](https://github.com/Snowflake-Labs/terraform-provider-snowflake/issues/1074) |