Skip to content

Commit

Permalink
add snowflake id implementation mvp
Browse files Browse the repository at this point in the history
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Nov 27, 2024
1 parent 80e11f0 commit 558a200
Show file tree
Hide file tree
Showing 20 changed files with 639 additions and 69 deletions.
53 changes: 51 additions & 2 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Generate struct {
// will be stored as a list within the PlanValue. New
// values will be generated based on how many were not
// supplied (NULL).

// Need to distiguish between sequence or snowflake type
Type string
Values sqltypes.PlanValue
}

Expand Down Expand Up @@ -289,6 +292,27 @@ func shouldGenerate(v sqltypes.Value) bool {
return false
}

const (
TimestampLength uint8 = 41
MachineIDLength uint8 = 10
SequenceLength uint8 = 12
MaxSequence int64 = 1<<SequenceLength - 1
MaxTimestamp int64 = 1<<TimestampLength - 1
MaxMachineID int64 = 1<<MachineIDLength - 1

machineIDMoveLength = SequenceLength
timestampMoveLength = MachineIDLength + SequenceLength
)

var (
// default starttime
SnowflakeStartTime = time.Date(2008, 11, 10, 23, 0, 0, 0, time.UTC)
)

func elapsedTime(noms int64, s time.Time) int64 {
return noms - s.UTC().UnixNano()/1e6
}

// processGenerate generates new values using a sequence if necessary.
// If no value was generated, it returns 0. Values are generated only
// for cases where none are supplied.
Expand Down Expand Up @@ -316,6 +340,7 @@ func (ins *Insert) processGenerate(vcursor VCursor, bindVars map[string]*querypb
if err != nil {
return 0, err
}
// TODO: place where to decide routing maybe for snowflake
if len(rss) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "auto sequence generation can happen through single shard only, it is getting routed to %d shards", len(rss))
}
Expand All @@ -333,6 +358,30 @@ func (ins *Insert) processGenerate(vcursor VCursor, bindVars map[string]*querypb
}

// Fill the holes where no value was supplied.
// For Snowflake
if ins.Generate.Type == vindexes.TypeSnowflake {
cur := insertID
ts := (cur >> int64(SequenceLength+MachineIDLength)) + SnowflakeStartTime.UTC().UnixNano()/1e6
sequence := cur & int64(MaxSequence)
machineID := (cur & (int64(MaxMachineID) << SequenceLength)) >> SequenceLength
for i, v := range resolved {
fmt.Println(fmt.Sprintf("Generating Snowflake, %s id %d", ins.GetTableName(), cur))

Check failure on line 368 in go/vt/vtgate/engine/insert.go

View workflow job for this annotation

GitHub Actions / Lint using golangci-lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if shouldGenerate(v) {
bindVars[SeqVarName+strconv.Itoa(i)] = sqltypes.Int64BindVariable(cur)
// calculate next id and advance ts and sequence
totalInc := sequence + 1
ts := ts + totalInc/MaxSequence
sequence = totalInc % MaxSequence
// TODO: generate next id properly for snowflake
df := elapsedTime(ts, SnowflakeStartTime)
cur = int64((uint64(df) << uint64(timestampMoveLength)) | (uint64(machineID) << uint64(machineIDMoveLength)) | uint64(sequence))
} else {
bindVars[SeqVarName+strconv.Itoa(i)] = sqltypes.ValueBindVariable(v)
}
}
return insertID, nil
}
// For Sequence
cur := insertID
for i, v := range resolved {
if shouldGenerate(v) {
Expand Down Expand Up @@ -609,8 +658,8 @@ func (ins *Insert) processUnowned(vcursor VCursor, vindexColumnsKeys [][]sqltype
return nil
}

//InsertVarName returns a name for the bind var for this column. This method is used by the planner and engine,
//to make sure they both produce the same names
// InsertVarName returns a name for the bind var for this column. This method is used by the planner and engine,
// to make sure they both produce the same names
func InsertVarName(col sqlparser.ColIdent, rowNum int) string {
return fmt.Sprintf("_%s_%d", col.CompliantName(), rowNum)
}
Expand Down
56 changes: 56 additions & 0 deletions go/vt/vtgate/engine/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,62 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

func TestInsertUnshardedSnowflakeGenerate(t *testing.T) {
ins := NewQueryInsert(
InsertUnsharded,
&vindexes.Keyspace{
Name: "ks",
Sharded: false,
},
"dummy_insert",
)
ins.Generate = &Generate{
Keyspace: &vindexes.Keyspace{
Name: "ks2",
Sharded: false,
},
Query: "dummy_generate",
Type: "snowflake",
Values: sqltypes.PlanValue{
Values: []sqltypes.PlanValue{
{Value: sqltypes.NewInt64(1)},
{Value: sqltypes.NULL},
{Value: sqltypes.NewInt64(2)},
{Value: sqltypes.NULL},
{Value: sqltypes.NewInt64(3)},
},
},
}

vc := newDMLTestVCursor("0")
vc.results = []*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"nextval",
"int64",
),
"4",
),
{InsertID: 1},
}

result, err := ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
// Fetch two sequence value.
`ResolveDestinations ks2 [] Destinations:DestinationAnyShard()`,
`ExecuteStandalone dummy_generate n: type:INT64 value:"2" ks2 0`,
// Fill those values into the insert.
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
`ExecuteMultiShard ks.0: dummy_insert {__seq0: type:INT64 value:"1" __seq1: type:INT64 value:"4" __seq2: type:INT64 value:"2" __seq3: type:INT64 value:"5" __seq4: type:INT64 value:"3"} true true`,
})

// The insert id returned by ExecuteMultiShard should be overwritten by processGenerate.
expectResult(t, "Execute", result, &sqltypes.Result{InsertID: 4})
}

func TestInsertUnsharded(t *testing.T) {
ins := NewQueryInsert(
InsertUnsharded,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func Exists(m Match, p Primitive) bool {
return Find(m, p) != nil
}

//MarshalJSON serializes the plan into a JSON representation.
// MarshalJSON serializes the plan into a JSON representation.
func (p *Plan) MarshalJSON() ([]byte, error) {
var instructions *PrimitiveDescription
if p.Instructions != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ const (
// SelectScatter is for routing a scatter query
// to all shards of a keyspace.
SelectScatter
// SelectNext is for fetching from a sequence.
// SelectNext is for fetching from a sequence or snowflake.
SelectNext
// SelectDBA is for executing a DBA statement.
SelectDBA
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (pb *primitiveBuilder) buildTablePrimitive(tableExpr *sqlparser.AliasedTabl
switch {
case vschemaTable.Type == vindexes.TypeSequence:
eroute = engine.NewSimpleRoute(engine.SelectNext, vschemaTable.Keyspace)
// TODO: snowflake
case vschemaTable.Type == vindexes.TypeSnowflake:
eroute = engine.NewSimpleRoute(engine.SelectNext, vschemaTable.Keyspace)
case vschemaTable.Type == vindexes.TypeReference:
eroute = engine.NewSimpleRoute(engine.SelectReference, vschemaTable.Keyspace)
case !vschemaTable.Keyspace.Sharded:
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,11 @@ func modifyForAutoinc(ins *sqlparser.Insert, eins *engine.Insert) error {
row[colNum] = sqlparser.NewArgument(engine.SeqVarName + strconv.Itoa(rowNum))
}

// TODO: Here query is generated for Snoflake
eins.Generate = &engine.Generate{
Keyspace: eins.Table.AutoIncrement.Sequence.Keyspace,
Query: fmt.Sprintf("select next :n values from %s", sqlparser.String(eins.Table.AutoIncrement.Sequence.Name)),
Type: eins.Table.AutoIncrement.Sequence.Type,
Values: autoIncValues,
}
return nil
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vtgate/planbuilder/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,10 @@ type (
)

/*
The greedy planner will plan a query by finding first finding the best route plan for every table.
Then, iteratively, it finds the cheapest join that can be produced between the remaining plans,
and removes the two inputs to this cheapest plan and instead adds the join.
As an optimization, it first only considers joining tables that have predicates defined between them
The greedy planner will plan a query by finding first finding the best route plan for every table.
Then, iteratively, it finds the cheapest join that can be produced between the remaining plans,
and removes the two inputs to this cheapest plan and instead adds the join.
As an optimization, it first only considers joining tables that have predicates defined between them
*/
func greedySolve(qg *abstract.QueryGraph, semTable *semantics.SemTable, vschema ContextVSchema) (joinTree, error) {
joinTrees, err := seedPlanList(qg, semTable, vschema)
Expand Down Expand Up @@ -604,6 +604,9 @@ func createRoutePlan(table *abstract.QueryTable, solves semantics.TableSet, vsch
switch {
case vschemaTable.Type == vindexes.TypeSequence:
plan.routeOpCode = engine.SelectNext
// TODO: Snowflake
case vschemaTable.Type == vindexes.TypeSnowflake:
plan.routeOpCode = engine.SelectNext
case vschemaTable.Type == vindexes.TypeReference:
plan.routeOpCode = engine.SelectReference
case !vschemaTable.Keyspace.Sharded:
Expand Down
12 changes: 10 additions & 2 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var TabletTypeSuffix = map[topodatapb.TabletType]string{
// The following constants represent table types.
const (
TypeSequence = "sequence"
TypeSnowflake = "snowflake"
TypeReference = "reference"
)

Expand Down Expand Up @@ -156,6 +157,7 @@ func (ks *KeyspaceSchema) MarshalJSON() ([]byte, error) {
}

// AutoIncrement contains the auto-inc information for a table.
// TODO: We reuse same field for Snowflake and Sequence tables.
type AutoIncrement struct {
Column sqlparser.ColIdent `json:"column"`
Sequence *Table `json:"sequence"`
Expand All @@ -177,7 +179,7 @@ func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
}

// BuildKeyspaceSchema builds the vschema portion for one keyspace.
// The build ignores sequence references because those dependencies can
// The build ignores sequence/snowflake references because those dependencies can
// go cross-keyspace.
func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceSchema, error) {
if input == nil {
Expand All @@ -199,7 +201,7 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS
}

// ValidateKeyspace ensures that the keyspace vschema is valid.
// External references (like sequence) are not validated.
// External references (like sequence/snowflake) are not validated.
func ValidateKeyspace(input *vschemapb.Keyspace) error {
_, err := BuildKeyspaceSchema(input, "")
return err
Expand Down Expand Up @@ -252,6 +254,11 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc
return fmt.Errorf("sequence table has to be in an unsharded keyspace or must be pinned: %s", tname)
}
t.Type = table.Type
case TypeSnowflake:
if keyspace.Sharded && table.Pinned == "" {
return fmt.Errorf("snowflake table has to be in an unsharded keyspace or must be pinned: %s", tname)
}
t.Type = table.Type
default:
return fmt.Errorf("unidentified table type %s", table.Type)
}
Expand Down Expand Up @@ -354,6 +361,7 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) {
if t == nil || table.AutoIncrement == nil {
continue
}
// TODO: Should I check for type here
seqks, seqtab, err := sqlparser.ParseTable(table.AutoIncrement.Sequence)
var seq *Table
if err == nil {
Expand Down
34 changes: 25 additions & 9 deletions go/vt/vttablet/tabletserver/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package planbuilder

import (
"fmt"
"strings"

"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -44,17 +45,32 @@ func analyzeSelect(sel *sqlparser.Select, tables map[string]*schema.Table) (plan

// Check if it's a NEXT VALUE statement.
if nextVal, ok := sel.SelectExprs[0].(*sqlparser.Nextval); ok {
if plan.Table == nil || plan.Table.Type != schema.Sequence {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s is not a sequence", sqlparser.ToString(sel.From))
if plan.Table == nil || (plan.Table.Type != schema.Sequence && plan.Table.Type != schema.Snowflake) {
fmt.Println("plan.Table.Type", plan.Table.Type, schema.Snowflake)
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s is not a sequence or snowflake", sqlparser.ToString(sel.From))
}
plan.PlanID = PlanNextval
v, err := sqlparser.NewPlanValue(nextVal.Expr)
if err != nil {
return nil, err

switch plan.Table.Type {
case schema.Sequence:
plan.PlanID = PlanNextval
v, err := sqlparser.NewPlanValue(nextVal.Expr)
if err != nil {
return nil, err
}
plan.NextCount = v
plan.FieldQuery = nil
plan.FullQuery = nil
case schema.Snowflake:
// should be different from sequence?
plan.PlanID = PlanNextval
v, err := sqlparser.NewPlanValue(nextVal.Expr)
if err != nil {
return nil, err
}
plan.NextCount = v
plan.FieldQuery = nil
plan.FullQuery = nil
}
plan.NextCount = v
plan.FieldQuery = nil
plan.FullQuery = nil
}
return plan, nil
}
Expand Down
50 changes: 48 additions & 2 deletions go/vt/vttablet/tabletserver/planbuilder/testdata/exec_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,52 @@
"FullQuery": "select :bv from a where 1 != 1 limit :#maxLimit"
}

# single value snowflake
"select next value from snow"
{
"PlanID": "Nextval",
"TableName": "snow",
"Permissions": [
{
"TableName": "snow",
"Role": 0
}
],
"NextCount": "1"
}

# snowflake with number
"select next 10 values from snow"
{
"PlanID": "Nextval",
"TableName": "snow",
"Permissions": [
{
"TableName": "snow",
"Role": 0
}
],
"NextCount": "10"
}

# snowflake with bindvar
"select next :a values from snow"
{
"PlanID": "Nextval",
"TableName": "snow",
"Permissions": [
{
"TableName": "snow",
"Role": 0
}
],
"NextCount": "\":a\""
}

# snowflake with bad value
"select next 12345667852342342342323423423 values from snow"
"strconv.ParseUint: parsing "12345667852342342342323423423": value out of range"

# single value sequence
"select next value from seq"
{
Expand Down Expand Up @@ -188,11 +234,11 @@

# nextval on non-sequence table
"select next value from a"
"a is not a sequence"
"a is not a sequence or snowflake"

# nextval on non-existent table
"select next value from id"
"id is not a sequence"
"id is not a sequence or snowflake"

# for update
"select eid from a for update"
Expand Down
Loading

0 comments on commit 558a200

Please sign in to comment.