diff --git a/CHANGELOG.md b/CHANGELOG.md index 67c88a141..d5af49219 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Change Batch API to be consistent with Query() (CASSGO-7) +- Added Cassandra 4.0 table options support(CASSGO-13) + ### Fixed - Retry policy now takes into account query idempotency (CASSGO-27) diff --git a/cassandra_test.go b/cassandra_test.go index ec6969190..c1cf25fb9 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2314,7 +2314,7 @@ func TestViewMetadata(t *testing.T) { func TestMaterializedViewMetadata(t *testing.T) { if flagCassVersion.Before(3, 0, 0) { - return + t.Skip("The Cassandra version is too old") } session := createSession(t) defer session.Close() @@ -2333,14 +2333,19 @@ func TestMaterializedViewMetadata(t *testing.T) { expectedChunkLengthInKB := "16" expectedDCLocalReadRepairChance := float64(0) expectedSpeculativeRetry := "99p" + expectedAdditionalWritePolicy := "99p" + expectedReadRepair := "BLOCKING" if flagCassVersion.Before(4, 0, 0) { expectedChunkLengthInKB = "64" expectedDCLocalReadRepairChance = 0.1 expectedSpeculativeRetry = "99PERCENTILE" + expectedReadRepair = "" + expectedAdditionalWritePolicy = "" } expectedView1 := MaterializedViewMetadata{ Keyspace: "gocql_test", Name: "view_view", + AdditionalWritePolicy: expectedAdditionalWritePolicy, baseTableName: "view_table", BloomFilterFpChance: 0.01, Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"}, @@ -2352,12 +2357,17 @@ func TestMaterializedViewMetadata(t *testing.T) { DefaultTimeToLive: 0, Extensions: map[string]string{}, GcGraceSeconds: 864000, - IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0, - SpeculativeRetry: expectedSpeculativeRetry, + IncludeAllColumns: false, MaxIndexInterval: 2048, + MemtableFlushPeriodInMs: 0, + MinIndexInterval: 128, + ReadRepair: expectedReadRepair, + ReadRepairChance: 0, + SpeculativeRetry: expectedSpeculativeRetry, } expectedView2 := MaterializedViewMetadata{ Keyspace: "gocql_test", Name: "view_view2", + AdditionalWritePolicy: expectedAdditionalWritePolicy, baseTableName: "view_table2", BloomFilterFpChance: 0.01, Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"}, @@ -2369,8 +2379,13 @@ func TestMaterializedViewMetadata(t *testing.T) { DefaultTimeToLive: 0, Extensions: map[string]string{}, GcGraceSeconds: 864000, - IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0, - SpeculativeRetry: expectedSpeculativeRetry, + IncludeAllColumns: false, + MaxIndexInterval: 2048, + MemtableFlushPeriodInMs: 0, + MinIndexInterval: 128, + ReadRepair: expectedReadRepair, + ReadRepairChance: 0, + SpeculativeRetry: expectedSpeculativeRetry, } expectedView1.BaseTableId = materializedViews[0].BaseTableId diff --git a/metadata.go b/metadata.go index 6eb798f8a..1d165003f 100644 --- a/metadata.go +++ b/metadata.go @@ -122,6 +122,7 @@ type ViewMetadata struct { type MaterializedViewMetadata struct { Keyspace string Name string + AdditionalWritePolicy string BaseTableId UUID BaseTable *TableMetadata BloomFilterFpChance float64 @@ -139,7 +140,8 @@ type MaterializedViewMetadata struct { MaxIndexInterval int MemtableFlushPeriodInMs int MinIndexInterval int - ReadRepairChance float64 + ReadRepair string // Only present in Cassandra 4.0+ + ReadRepairChance float64 // Note: Cassandra 4.0 removed ReadRepairChance and added ReadRepair instead SpeculativeRetry string baseTableName string @@ -999,69 +1001,201 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er return views, nil } +func bytesMapToStringsMap(byteData map[string][]byte) map[string]string { + extensions := make(map[string]string, len(byteData)) + for key, rowByte := range byteData { + extensions[key] = string(rowByte) + } + + return extensions +} + +func materializedViewMetadataFromMap(currentObject map[string]interface{}, materializedView *MaterializedViewMetadata) error { + const errorMessage = "gocql.materializedViewMetadataFromMap failed to read column %s" + var ok bool + for key, value := range currentObject { + switch key { + case "keyspace_name": + materializedView.Keyspace, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "view_name": + materializedView.Name, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "additional_write_policy": + materializedView.AdditionalWritePolicy, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "base_table_id": + materializedView.BaseTableId, ok = value.(UUID) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "base_table_name": + materializedView.baseTableName, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "bloom_filter_fp_chance": + materializedView.BloomFilterFpChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "caching": + materializedView.Caching, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "comment": + materializedView.Comment, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "compaction": + materializedView.Compaction, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "compression": + materializedView.Compression, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "crc_check_chance": + materializedView.CrcCheckChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "dclocal_read_repair_chance": + materializedView.DcLocalReadRepairChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "default_time_to_live": + materializedView.DefaultTimeToLive, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "extensions": + byteData, ok := value.(map[string][]byte) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + materializedView.Extensions = bytesMapToStringsMap(byteData) + + case "gc_grace_seconds": + materializedView.GcGraceSeconds, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "id": + materializedView.Id, ok = value.(UUID) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "include_all_columns": + materializedView.IncludeAllColumns, ok = value.(bool) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "max_index_interval": + materializedView.MaxIndexInterval, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "memtable_flush_period_in_ms": + materializedView.MemtableFlushPeriodInMs, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "min_index_interval": + materializedView.MinIndexInterval, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "read_repair": + materializedView.ReadRepair, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "read_repair_chance": + materializedView.ReadRepairChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "speculative_retry": + materializedView.SpeculativeRetry, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + } + } + return nil +} + +func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) { + var materializedViews []MaterializedViewMetadata + s, err := iter.SliceMap() + if err != nil { + return nil, err + } + + for _, row := range s { + var materializedView MaterializedViewMetadata + err = materializedViewMetadataFromMap(row, &materializedView) + if err != nil { + return nil, err + } + + materializedViews = append(materializedViews, materializedView) + } + + return materializedViews, nil +} + func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) { if !session.useSystemSchema { return nil, nil } var tableName = "system_schema.views" stmt := fmt.Sprintf(` - SELECT - view_name, - base_table_id, - base_table_name, - bloom_filter_fp_chance, - caching, - comment, - compaction, - compression, - crc_check_chance, - dclocal_read_repair_chance, - default_time_to_live, - extensions, - gc_grace_seconds, - id, - include_all_columns, - max_index_interval, - memtable_flush_period_in_ms, - min_index_interval, - read_repair_chance, - speculative_retry + SELECT * FROM %s WHERE keyspace_name = ?`, tableName) var materializedViews []MaterializedViewMetadata - rows := session.control.query(stmt, keyspaceName).Scanner() - for rows.Next() { - materializedView := MaterializedViewMetadata{Keyspace: keyspaceName} - err := rows.Scan(&materializedView.Name, - &materializedView.BaseTableId, - &materializedView.baseTableName, - &materializedView.BloomFilterFpChance, - &materializedView.Caching, - &materializedView.Comment, - &materializedView.Compaction, - &materializedView.Compression, - &materializedView.CrcCheckChance, - &materializedView.DcLocalReadRepairChance, - &materializedView.DefaultTimeToLive, - &materializedView.Extensions, - &materializedView.GcGraceSeconds, - &materializedView.Id, - &materializedView.IncludeAllColumns, - &materializedView.MaxIndexInterval, - &materializedView.MemtableFlushPeriodInMs, - &materializedView.MinIndexInterval, - &materializedView.ReadRepairChance, - &materializedView.SpeculativeRetry, - ) - if err != nil { - return nil, err - } - materializedViews = append(materializedViews, materializedView) - } + iter := session.control.query(stmt, keyspaceName) - if err := rows.Err(); err != nil { + materializedViews, err := parseSystemSchemaViews(iter) + if err != nil { return nil, err }