Skip to content

Commit

Permalink
Multi updates
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Sep 23, 2024
1 parent d8c305a commit 73a2d09
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 149 deletions.
2 changes: 1 addition & 1 deletion platform/common/core/generic/vault/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (i *Inspector) SetStateMetadata(driver.Namespace, driver.PKey, driver.Metad
panic("programming error: the rwset inspector is read-only")
}

func (i *Inspector) SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.Metadata, block driver.BlockNum, txnum driver.TxNum) map[driver.PKey]error {
func (i *Inspector) SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.VersionedMetadataValue) map[driver.PKey]error {
panic("programming error: the rwset inspector is read-only")
}

Expand Down
12 changes: 12 additions & 0 deletions platform/common/core/generic/vault/txidstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type cache[V driver.ValidationCode] interface {
type txidStore[V driver.ValidationCode] interface {
Get(txID driver.TxID) (V, string, error)
Set(txID driver.TxID, code V, message string) error
SetMultiple(txs []driver.ByNum[V]) error
Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error)
}

Expand Down Expand Up @@ -88,6 +89,17 @@ func (s *CachedStore[V]) Set(txID string, code V, message string) error {
return nil
}

func (s *CachedStore[V]) SetMultiple(txs []driver.ByNum[V]) error {
s.logger.Debugf("Set values for %d txs into backed and cache", len(txs))
if err := s.backed.SetMultiple(txs); err != nil {
return err
}
for _, tx := range txs {
s.cache.Add(tx.TxID, &Entry[V]{ValidationCode: tx.Code, ValidationMessage: tx.Message})
}
return nil
}

func (s *CachedStore[V]) Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error) {
return s.backed.Iterator(pos)
}
145 changes: 85 additions & 60 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TXIDStoreReader[V driver.ValidationCode] interface {
type TXIDStore[V driver.ValidationCode] interface {
TXIDStoreReader[V]
Set(txID driver.TxID, code V, message string) error
SetMultiple(txs []driver.ByNum[V]) error
Invalidate(txID driver.TxID)
}

Expand Down Expand Up @@ -289,94 +290,110 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
return errors.Wrapf(err, "begin update in store for txid %v failed", inputs)
}

for _, input := range inputs {
span := trace.SpanFromContext(input.ctx)
if _, err := db.setStatuses(inputs, db.vcProvider.Busy()); err != nil {
return err
}

span.AddEvent("set_tx_busy")
if err := db.txIDStore.Set(input.txID, db.vcProvider.Busy(), ""); err != nil {
if !errors.HasCause(err, UniqueKeyViolation) {
return err
db.logger.Debugf("parse writes")
writes := make(map[driver.Namespace]map[driver.PKey]VersionedValue)
for _, input := range inputs {
for ns, ws := range input.rws.Writes {
vals := versionedValues(ws, input.block, input.indexInBloc)
if nsWrites, ok := writes[ns]; !ok {
writes[ns] = vals
} else {
collections.CopyMap(nsWrites, vals)
}
}
}

db.logger.Debugf("parse writes [%s]", input.txID)
span.AddEvent("store_writes")
if discarded, err := db.storeWrites(input.ctx, input.rws.Writes, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing writes as duplicates. Skipping...")
if errs := db.storeAllWrites(writes); len(errs) == 0 {
db.logger.Debugf("Successfully stored writes for %d namespaces", len(writes))
} else if discarded, err := db.discard("", 0, 0, errs); err != nil {
return errors.Wrapf(err, "failed storing writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing writes as duplicates. Skipping...")
for _, input := range inputs {
db.txIDStore.Invalidate(input.txID)
return nil
}
return nil
}

db.logger.Debugf("parse meta writes [%s]", input.txID)
span.AddEvent("store_meta_writes")
if discarded, err := db.storeMetaWrites(input.ctx, input.rws.MetaWrites, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing meta writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing meta writes as duplicates. Skipping...")
db.logger.Debugf("parse meta writes")
metaWrites := make(map[driver.Namespace]map[driver.PKey]driver.VersionedMetadataValue)
for _, input := range inputs {
for ns, ws := range input.rws.MetaWrites {
vals := versionedMetaValues(ws, input.block, input.indexInBloc)
if nsWrites, ok := metaWrites[ns]; !ok {
metaWrites[ns] = vals
} else {
collections.CopyMap(nsWrites, vals)
}
}
}
if errs := db.storeAllMetaWrites(metaWrites); len(errs) == 0 {
db.logger.Debugf("Successfully stored meta writes for %d namespaces", len(metaWrites))
} else if discarded, err := db.discard("", 0, 0, errs); err != nil {
return errors.Wrapf(err, "failed storing meta writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing meta writes as duplicates. Skipping...")
for _, input := range inputs {
db.txIDStore.Invalidate(input.txID)
return nil
}
return nil
}

db.logger.Debugf("set state to valid [%s]", input.txID)
span.AddEvent("set_tx_valid")
if discarded, err := db.setTxValid(input.txID); err != nil {
return errors.Wrapf(err, "failed setting tx state to valid")
} else if discarded {
db.logger.Infof("Discarded changes while setting tx state to valid as duplicates. Skipping...")
return nil
if discarded, err := db.setStatuses(inputs, db.vcProvider.Valid()); err != nil {
if err1 := db.store.Discard(); err1 != nil {
db.logger.Errorf("got error %s; discarding caused %s", err.Error(), err1.Error())
}

for _, input := range inputs {
db.txIDStore.Invalidate(input.txID)
}
return errors.Wrapf(err, "failed setting tx state to valid")
} else if discarded {
if err1 := db.store.Discard(); err1 != nil {
db.logger.Errorf("got unique key violation; discarding caused %s", err1.Error())
}
db.logger.Infof("Discarded changes while setting tx state to valid as duplicates. Skipping...")
return nil
}

for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("commit_update")
}
if err := db.store.Commit(); err != nil {
return errors.Wrapf(err, "committing tx for txid in store [%v] failed", inputs)
}

return nil
}

func (db *Vault[V]) setTxValid(txID driver.TxID) (bool, error) {
err := db.txIDStore.Set(txID, db.vcProvider.Valid(), "")
if err == nil {
return false, nil
func (db *Vault[V]) setStatuses(inputs []commitInput, v V) (bool, error) {
txs := make([]driver.ByNum[V], len(inputs))
for i, input := range inputs {
txs[i] = driver.ByNum[V]{TxID: input.txID, Code: v}
}

if err1 := db.store.Discard(); err1 != nil {
db.logger.Errorf("got error %s; discarding caused %s", err.Error(), err1.Error())
}

if !errors.HasCause(err, UniqueKeyViolation) {
db.txIDStore.Invalidate(txID)
return true, errors.Wrapf(err, "error setting tx valid")
if err := db.txIDStore.SetMultiple(txs); err == nil {
return false, nil
} else if !errors.HasCause(err, UniqueKeyViolation) {
return true, err
} else {
return true, nil
}
return true, nil
}

func (db *Vault[V]) storeMetaWrites(ctx context.Context, writes NamespaceKeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) (bool, error) {
span := trace.SpanFromContext(ctx)
for ns, keyMap := range writes {
span.AddEvent("set_tx_metadata_state")
if errs := db.store.SetStateMetadatas(ns, keyMap, block, indexInBloc); len(errs) > 0 {
return db.discard(ns, block, indexInBloc, errs)
}
func (db *Vault[V]) storeAllWrites(writes map[driver.Namespace]map[driver.PKey]VersionedValue) map[driver.PKey]error {
errs := make(map[driver.PKey]error)
for ns, vals := range writes {
collections.CopyMap(errs, db.store.SetStates(ns, vals))
}
return false, nil
return errs
}

func (db *Vault[V]) storeWrites(ctx context.Context, writes Writes, block driver.BlockNum, indexInBloc driver.TxNum) (bool, error) {
span := trace.SpanFromContext(ctx)
for ns, keyMap := range writes {
span.AddEvent("set_tx_states")
if errs := db.store.SetStates(ns, versionedValues(keyMap, block, indexInBloc)); len(errs) > 0 {
return db.discard(ns, block, indexInBloc, errs)
}
func (db *Vault[V]) storeAllMetaWrites(metaWrites map[driver.Namespace]map[driver.PKey]driver.VersionedMetadataValue) map[driver.PKey]error {
errs := make(map[driver.PKey]error)
for ns, vals := range metaWrites {
collections.CopyMap(errs, db.store.SetStateMetadatas(ns, vals))
}
return false, nil
return errs
}

func versionedValues(keyMap NamespaceWrites, block driver.BlockNum, indexInBloc driver.TxNum) map[driver.PKey]VersionedValue {
Expand All @@ -387,6 +404,14 @@ func versionedValues(keyMap NamespaceWrites, block driver.BlockNum, indexInBloc
return vals
}

func versionedMetaValues(keyMap KeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) map[driver.PKey]driver.VersionedMetadataValue {
vals := make(map[driver.PKey]driver.VersionedMetadataValue, len(keyMap))
for pkey, val := range keyMap {
vals[pkey] = driver.VersionedMetadataValue{Metadata: val, Block: block, TxNum: indexInBloc}
}
return vals
}

func (db *Vault[V]) discard(ns driver.Namespace, block driver.BlockNum, indexInBloc driver.TxNum, errs map[driver.PKey]error) (bool, error) {
if err1 := db.store.Discard(); err1 != nil {
db.logger.Errorf("got error %v; discarding caused %s", errors2.Join(collections.Values(errs)...), err1.Error())
Expand Down
6 changes: 6 additions & 0 deletions platform/common/driver/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ type VersionedRead struct {
TxNum TxNum
}

type VersionedMetadataValue struct {
Block BlockNum
TxNum TxNum
Metadata Metadata
}

type VersionedResultsIterator = collections.Iterator[*VersionedRead]

type QueryExecutor interface {
Expand Down
1 change: 1 addition & 0 deletions platform/fabric/driver/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ type TXIDStore interface {
Iterator(pos interface{}) (TxIDIterator, error)
Get(txid string) (ValidationCode, string, error)
Set(txID string, code ValidationCode, message string) error
SetMultiple(txs []driver.ByNum[ValidationCode]) error
}
4 changes: 2 additions & 2 deletions platform/view/services/db/driver/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ func (db *DB) SetStateMetadata(namespace driver2.Namespace, key driver2.PKey, me
return nil
}

func (db *DB) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) map[driver2.PKey]error {
func (db *DB) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error {
errs := make(map[driver2.PKey]error)
for pkey, value := range kvs {
if err := db.SetStateMetadata(ns, pkey, value, block, txnum); err != nil {
if err := db.SetStateMetadata(ns, pkey, value.Metadata, value.Block, value.TxNum); err != nil {
errs[pkey] = err
}
}
Expand Down
4 changes: 3 additions & 1 deletion platform/view/services/db/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type VersionedValue struct {
TxNum driver.TxNum
}

type VersionedMetadataValue = driver.VersionedMetadataValue

type UnversionedRead struct {
Key driver.PKey
Raw driver.RawValue
Expand Down Expand Up @@ -90,7 +92,7 @@ type VersionedPersistence interface {
// SetStateMetadata sets the given metadata for the given namespace, key, and version
SetStateMetadata(namespace driver.Namespace, key driver.PKey, metadata driver.Metadata, block driver.BlockNum, txnum driver.TxNum) error
// SetStateMetadatas sets the given metadata for the given namespace, keys, and version
SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.Metadata, block driver.BlockNum, txnum driver.TxNum) map[driver.PKey]error
SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.VersionedMetadataValue) map[driver.PKey]error
}

type WriteTransaction interface {
Expand Down
4 changes: 2 additions & 2 deletions platform/view/services/db/driver/notifier/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func (db *VersionedPersistenceNotifier[P]) SetStateMetadata(namespace driver2.Na
return db.Persistence.SetStateMetadata(namespace, key, metadata, block, txnum)
}

func (db *VersionedPersistenceNotifier[P]) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.Metadata, block driver2.BlockNum, txnum driver2.TxNum) map[driver2.PKey]error {
return db.Persistence.SetStateMetadatas(ns, kvs, block, txnum)
func (db *VersionedPersistenceNotifier[P]) SetStateMetadatas(ns driver2.Namespace, kvs map[driver2.PKey]driver2.VersionedMetadataValue) map[driver2.PKey]error {
return db.Persistence.SetStateMetadatas(ns, kvs)
}

func (db *VersionedPersistenceNotifier[P]) GetStateRangeScanIterator(namespace driver2.Namespace, startKey, endKey driver2.PKey) (driver.VersionedResultsIterator, error) {
Expand Down
20 changes: 17 additions & 3 deletions platform/view/services/db/driver/sql/common/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type readScanner[V any] interface {
ReadValue(scannable) (V, error)
}

type valueScanner[V any] interface {
type ValueScanner[V any] interface {
readScanner[V]
// WriteValue writes the values of the V struct in the order given by the Columns method
WriteValue(V) []any
Expand All @@ -47,12 +47,12 @@ type BasePersistence[V any, R any] struct {
table string

readScanner readScanner[R]
ValueScanner valueScanner[V]
ValueScanner ValueScanner[V]
errorWrapper driver.SQLErrorWrapper
ci Interpreter
}

func NewBasePersistence[V any, R any](writeDB *sql.DB, readDB *sql.DB, table string, readScanner readScanner[R], valueScanner valueScanner[V], errorWrapper driver.SQLErrorWrapper, ci Interpreter, newTransaction func() (*sql.Tx, error)) *BasePersistence[V, R] {
func NewBasePersistence[V any, R any](writeDB *sql.DB, readDB *sql.DB, table string, readScanner readScanner[R], valueScanner ValueScanner[V], errorWrapper driver.SQLErrorWrapper, ci Interpreter, newTransaction func() (*sql.Tx, error)) *BasePersistence[V, R] {
return &BasePersistence[V, R]{
BaseDB: common.NewBaseDB[*sql.Tx](func() (*sql.Tx, error) { return newTransaction() }),
readDB: readDB,
Expand Down Expand Up @@ -207,6 +207,20 @@ func (db *BasePersistence[V, R]) SetStateWithTx(tx *sql.Tx, ns driver2.Namespace
val = append([]byte(nil), val...)
values[valIndex] = val

return db.UpsertStateWithTx(tx, ns, pkey, keys, values)
}

func (db *BasePersistence[V, R]) UpsertStates(ns driver2.Namespace, valueKeys []string, vals map[driver2.PKey][]any) map[driver2.PKey]error {
errs := make(map[driver2.PKey]error)
for pkey, val := range vals {
if err := db.UpsertStateWithTx(db.Txn, ns, pkey, valueKeys, val); err != nil {
errs[pkey] = err
}
}
return errs
}

func (db *BasePersistence[V, R]) UpsertStateWithTx(tx *sql.Tx, ns driver2.Namespace, pkey driver2.PKey, keys []string, values []any) error {
// Portable upsert
exists, err := db.exists(tx, ns, pkey)
if err != nil {
Expand Down
Loading

0 comments on commit 73a2d09

Please sign in to comment.