Skip to content

Commit

Permalink
Add support for cassandra 4.0 table options
Browse files Browse the repository at this point in the history
In the PR implemented backward compatibility with previous versions,
and added new types support. To make metadata table support easier for
future Cassandra versions, hardcode scan from Cassandra were replaced
with new "parseSystemSchemaViews" method which is much easier to expand,
even if some fields were added in the middle of the table it wouldn`t be an issue anymore.

patch by Mykyta Oleksiienko; reviewed by Joao Reis CASSGO-13
  • Loading branch information
OleksiienkoMykyta committed Dec 11, 2024
1 parent 37030fb commit c171403
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Change Batch API to be consistent with Query() (CASSGO-7)

- Added Cassandra 4.0 table options support(CASSGO-13)

### Fixed

- Retry policy now takes into account query idempotency (CASSGO-27)
Expand Down
25 changes: 20 additions & 5 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ func TestViewMetadata(t *testing.T) {

func TestMaterializedViewMetadata(t *testing.T) {
if flagCassVersion.Before(3, 0, 0) {
return
t.Skip("The Cassandra version is too old")
}
session := createSession(t)
defer session.Close()
Expand All @@ -2333,14 +2333,19 @@ func TestMaterializedViewMetadata(t *testing.T) {
expectedChunkLengthInKB := "16"
expectedDCLocalReadRepairChance := float64(0)
expectedSpeculativeRetry := "99p"
expectedAdditionalWritePolicy := "99p"
expectedReadRepair := "BLOCKING"
if flagCassVersion.Before(4, 0, 0) {
expectedChunkLengthInKB = "64"
expectedDCLocalReadRepairChance = 0.1
expectedSpeculativeRetry = "99PERCENTILE"
expectedReadRepair = ""
expectedAdditionalWritePolicy = ""
}
expectedView1 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view",
AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"},
Expand All @@ -2352,12 +2357,17 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
IncludeAllColumns: false, MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0,
MinIndexInterval: 128,
ReadRepair: expectedReadRepair,
ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
}
expectedView2 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view2",
AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table2",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"},
Expand All @@ -2369,8 +2379,13 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
IncludeAllColumns: false,
MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0,
MinIndexInterval: 128,
ReadRepair: expectedReadRepair,
ReadRepairChance: 0,
SpeculativeRetry: expectedSpeculativeRetry,
}

expectedView1.BaseTableId = materializedViews[0].BaseTableId
Expand Down
238 changes: 186 additions & 52 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type ViewMetadata struct {
type MaterializedViewMetadata struct {
Keyspace string
Name string
AdditionalWritePolicy string
BaseTableId UUID
BaseTable *TableMetadata
BloomFilterFpChance float64
Expand All @@ -139,7 +140,8 @@ type MaterializedViewMetadata struct {
MaxIndexInterval int
MemtableFlushPeriodInMs int
MinIndexInterval int
ReadRepairChance float64
ReadRepair string // Only present in Cassandra 4.0+
ReadRepairChance float64 // Note: Cassandra 4.0 removed ReadRepairChance and added ReadRepair instead
SpeculativeRetry string

baseTableName string
Expand Down Expand Up @@ -999,69 +1001,201 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er
return views, nil
}

func bytesMapToStringsMap(byteData map[string][]byte) map[string]string {
extensions := make(map[string]string, len(byteData))
for key, rowByte := range byteData {
extensions[key] = string(rowByte)
}

return extensions
}

func materializedViewMetadataFromMap(currentObject map[string]interface{}, materializedView *MaterializedViewMetadata) error {
const errorMessage = "gocql.materializedViewMetadataFromMap failed to read column %s"
var ok bool
for key, value := range currentObject {
switch key {
case "keyspace_name":
materializedView.Keyspace, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "view_name":
materializedView.Name, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "additional_write_policy":
materializedView.AdditionalWritePolicy, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_id":
materializedView.BaseTableId, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "base_table_name":
materializedView.baseTableName, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "bloom_filter_fp_chance":
materializedView.BloomFilterFpChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "caching":
materializedView.Caching, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "comment":
materializedView.Comment, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compaction":
materializedView.Compaction, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "compression":
materializedView.Compression, ok = value.(map[string]string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "crc_check_chance":
materializedView.CrcCheckChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "dclocal_read_repair_chance":
materializedView.DcLocalReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "default_time_to_live":
materializedView.DefaultTimeToLive, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "extensions":
byteData, ok := value.(map[string][]byte)
if !ok {
return fmt.Errorf(errorMessage, key)
}

materializedView.Extensions = bytesMapToStringsMap(byteData)

case "gc_grace_seconds":
materializedView.GcGraceSeconds, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "id":
materializedView.Id, ok = value.(UUID)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "include_all_columns":
materializedView.IncludeAllColumns, ok = value.(bool)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "max_index_interval":
materializedView.MaxIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "memtable_flush_period_in_ms":
materializedView.MemtableFlushPeriodInMs, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "min_index_interval":
materializedView.MinIndexInterval, ok = value.(int)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair":
materializedView.ReadRepair, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "read_repair_chance":
materializedView.ReadRepairChance, ok = value.(float64)
if !ok {
return fmt.Errorf(errorMessage, key)
}

case "speculative_retry":
materializedView.SpeculativeRetry, ok = value.(string)
if !ok {
return fmt.Errorf(errorMessage, key)
}

}
}
return nil
}

func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) {
var materializedViews []MaterializedViewMetadata
s, err := iter.SliceMap()
if err != nil {
return nil, err
}

for _, row := range s {
var materializedView MaterializedViewMetadata
err = materializedViewMetadataFromMap(row, &materializedView)
if err != nil {
return nil, err
}

materializedViews = append(materializedViews, materializedView)
}

return materializedViews, nil
}

func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) {
if !session.useSystemSchema {
return nil, nil
}
var tableName = "system_schema.views"
stmt := fmt.Sprintf(`
SELECT
view_name,
base_table_id,
base_table_name,
bloom_filter_fp_chance,
caching,
comment,
compaction,
compression,
crc_check_chance,
dclocal_read_repair_chance,
default_time_to_live,
extensions,
gc_grace_seconds,
id,
include_all_columns,
max_index_interval,
memtable_flush_period_in_ms,
min_index_interval,
read_repair_chance,
speculative_retry
SELECT *
FROM %s
WHERE keyspace_name = ?`, tableName)

var materializedViews []MaterializedViewMetadata

rows := session.control.query(stmt, keyspaceName).Scanner()
for rows.Next() {
materializedView := MaterializedViewMetadata{Keyspace: keyspaceName}
err := rows.Scan(&materializedView.Name,
&materializedView.BaseTableId,
&materializedView.baseTableName,
&materializedView.BloomFilterFpChance,
&materializedView.Caching,
&materializedView.Comment,
&materializedView.Compaction,
&materializedView.Compression,
&materializedView.CrcCheckChance,
&materializedView.DcLocalReadRepairChance,
&materializedView.DefaultTimeToLive,
&materializedView.Extensions,
&materializedView.GcGraceSeconds,
&materializedView.Id,
&materializedView.IncludeAllColumns,
&materializedView.MaxIndexInterval,
&materializedView.MemtableFlushPeriodInMs,
&materializedView.MinIndexInterval,
&materializedView.ReadRepairChance,
&materializedView.SpeculativeRetry,
)
if err != nil {
return nil, err
}
materializedViews = append(materializedViews, materializedView)
}
iter := session.control.query(stmt, keyspaceName)

if err := rows.Err(); err != nil {
materializedViews, err := parseSystemSchemaViews(iter)
if err != nil {
return nil, err
}

Expand Down

0 comments on commit c171403

Please sign in to comment.