Skip to content

Commit

Permalink
Discovery: opaque timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
reinkrul committed Dec 7, 2023
1 parent b2292b6 commit a8bed89
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 66 deletions.
45 changes: 43 additions & 2 deletions discovery/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,56 @@ package discovery
import (
"errors"
"github.com/nuts-foundation/go-did/vc"
"math"
"strconv"
"strings"
)

// Timestamp is value that references a point in the list.
// Tag is value that references a point in the list.
// It is used by clients to request new entries since their last query.
// It is opaque for clients: they should not try to interpret it.
// The server who issued the tag can interpret it as Lamport timestamp.
type Tag string

// Timestamp decodes the Tag into a Timestamp, which is a monotonically increasing integer value (Lamport timestamp).
// Tags should only be decoded by the server who issued it, so the server should provide the stored tag prefix.
// The tag prefix is a random value that is generated when the service is created.
// It is not a secret; it only makes sure clients receive the complete presentation list when they switch servers for a specific Discovery Service:
// servers return the complete list when the client passes a timestamp the server can't decode.
func (t Tag) Timestamp(tagPrefix string) *Timestamp {
trimmed := strings.TrimPrefix(string(t), tagPrefix)
if len(trimmed) == len(string(t)) {
// Invalid tag prefix
return nil
}
result, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil {
// Not a number
return nil
}
if result < 0 || result > math.MaxUint64 {
// Invalid uint64
return nil
}
lamport := Timestamp(result)
return &lamport
}

// Timestamp is the interpreted Tag.
// It's implemented as lamport timestamp (https://en.wikipedia.org/wiki/Lamport_timestamp);
// it is incremented when a new entry is added to the list.
// Pass 0 to start at the beginning of the list.
type Timestamp uint64

// Tag returns the Timestamp as Tag.
func (l Timestamp) Tag(serviceSeed string) Tag {
return Tag(serviceSeed + strconv.FormatUint(uint64(l), 10))
}

func (l Timestamp) Increment() Timestamp {
return l + 1
}

// ErrServiceNotFound is returned when a service (ID) is not found in the discovery service.
var ErrServiceNotFound = errors.New("discovery service not found")

Expand All @@ -43,7 +84,7 @@ type Server interface {
// If the presentation is not valid or it does not conform to the Service ServiceDefinition, it returns an error.
Add(serviceID string, presentation vc.VerifiablePresentation) error
// Get retrieves the presentations for the given service, starting at the given timestamp.
Get(serviceID string, startAt Timestamp) ([]vc.VerifiablePresentation, *Timestamp, error)
Get(serviceID string, startAt *Tag) ([]vc.VerifiablePresentation, *Tag, error)
}

// Client defines the API for Discovery Clients.
Expand Down
4 changes: 2 additions & 2 deletions discovery/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions discovery/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (m *Module) Configure(_ core.ServerConfig) error {

func (m *Module) Start() error {
var err error
m.store, err = newSQLStore(m.storageInstance.GetSQLDatabase(), m.services)
m.store, err = newSQLStore(m.storageInstance.GetSQLDatabase(), m.services, m.serverDefinitions)
if err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (m *Module) validateRetraction(serviceID string, presentation vc.Verifiable
return nil
}

func (m *Module) Get(serviceID string, startAt Timestamp) ([]vc.VerifiablePresentation, *Timestamp, error) {
func (m *Module) Get(serviceID string, startAt *Tag) ([]vc.VerifiablePresentation, *Tag, error) {
if _, exists := m.services[serviceID]; !exists {
return nil, nil, ErrServiceNotFound
}
Expand Down
26 changes: 17 additions & 9 deletions discovery/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ func Test_Module_Add(t *testing.T) {
err := m.Add(testServiceID, vpAlice)
require.EqualError(t, err, "presentation verification failed: failed")

_, timestamp, err := m.Get(testServiceID, 0)
_, tag, err := m.Get(testServiceID, nil)
require.NoError(t, err)
assert.Equal(t, Timestamp(0), *timestamp)
expectedTag := tagForTimestamp(t, m.store, testServiceID, 0)
assert.Equal(t, expectedTag, *tag)
})
t.Run("already exists", func(t *testing.T) {
m, presentationVerifier := setupModule(t, storageEngine)
Expand Down Expand Up @@ -114,9 +115,9 @@ func Test_Module_Add(t *testing.T) {
err := m.Add(testServiceID, vpAlice)
require.NoError(t, err)

_, timestamp, err := m.Get(testServiceID, 0)
_, tag, err := m.Get(testServiceID, nil)
require.NoError(t, err)
assert.Equal(t, Timestamp(1), *timestamp)
assert.Equal(t, "1", string(*tag)[tagPrefixLength:])
})
t.Run("valid longer than its credentials", func(t *testing.T) {
m, _ := setupModule(t, storageEngine)
Expand All @@ -136,8 +137,8 @@ func Test_Module_Add(t *testing.T) {
err := m.Add(testServiceID, otherVP)
require.ErrorContains(t, err, "presentation does not fulfill Presentation ServiceDefinition")

_, timestamp, _ := m.Get(testServiceID, 0)
assert.Equal(t, Timestamp(0), *timestamp)
_, tag, _ := m.Get(testServiceID, nil)
assert.Equal(t, "0", string(*tag)[tagPrefixLength:])
})
})
t.Run("retraction", func(t *testing.T) {
Expand Down Expand Up @@ -193,14 +194,21 @@ func Test_Module_Get(t *testing.T) {
t.Run("ok", func(t *testing.T) {
m, _ := setupModule(t, storageEngine)
require.NoError(t, m.store.add(testServiceID, vpAlice, nil))
presentations, timestamp, err := m.Get(testServiceID, 0)
presentations, tag, err := m.Get(testServiceID, nil)
assert.NoError(t, err)
assert.Equal(t, []vc.VerifiablePresentation{vpAlice}, presentations)
assert.Equal(t, Timestamp(1), *timestamp)
assert.Equal(t, "1", string(*tag)[tagPrefixLength:])
})
t.Run("ok - retrieve delta", func(t *testing.T) {
m, _ := setupModule(t, storageEngine)
require.NoError(t, m.store.add(testServiceID, vpAlice, nil))
presentations, _, err := m.Get(testServiceID, nil)
require.NoError(t, err)
require.Len(t, presentations, 1)
})
t.Run("service unknown", func(t *testing.T) {
m, _ := setupModule(t, storageEngine)
_, _, err := m.Get("unknown", 0)
_, _, err := m.Get("unknown", nil)
assert.ErrorIs(t, err, ErrServiceNotFound)
})
}
Expand Down
116 changes: 83 additions & 33 deletions discovery/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ import (
"github.com/google/uuid"
"github.com/nuts-foundation/go-did/vc"
"github.com/nuts-foundation/nuts-node/discovery/log"
credential "github.com/nuts-foundation/nuts-node/vcr/credential"
"github.com/nuts-foundation/nuts-node/vcr/credential"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
"math/rand"
"strconv"
"strings"
"sync"
"time"
)

const tagPrefixLength = 5

type serviceRecord struct {
ID string `gorm:"primaryKey"`
LamportTimestamp uint64
ID string `gorm:"primaryKey"`
LastTag Tag
TagPrefix string
}

func (s serviceRecord) TableName() string {
Expand Down Expand Up @@ -103,12 +107,16 @@ type sqlStore struct {
writeLock sync.Mutex
}

func newSQLStore(db *gorm.DB, definitions map[string]ServiceDefinition) (*sqlStore, error) {
// Creates entries in the discovery service table with initial timestamp, if they don't exist yet
for _, definition := range definitions {
func newSQLStore(db *gorm.DB, clientDefinitions map[string]ServiceDefinition, serverDefinitions map[string]ServiceDefinition) (*sqlStore, error) {
// Creates entries in the discovery service table, if they don't exist yet
for _, definition := range clientDefinitions {
currentList := serviceRecord{
ID: definition.ID,
}
// If the node is server for this discovery service, make sure the timestamp prefix is set.
if _, isServer := serverDefinitions[definition.ID]; isServer {
currentList.TagPrefix = generatePrefix()
}
if err := db.FirstOrCreate(&currentList, "id = ?", definition.ID).Error; err != nil {
return nil, err
}
Expand All @@ -120,10 +128,10 @@ func newSQLStore(db *gorm.DB, definitions map[string]ServiceDefinition) (*sqlSto
}

// Add adds a presentation to the list of presentations.
// Timestamp should be passed if the presentation was received from a remote Discovery Server, then it is stored alongside the presentation.
// Tag should be passed if the presentation was received from a remote Discovery Server, then it is stored alongside the presentation.
// If the local node is the Discovery Server and thus is responsible for the timestamping,
// nil should be passed to let the store determine the right value.
func (s *sqlStore) add(serviceID string, presentation vc.VerifiablePresentation, timestamp *Timestamp) error {
func (s *sqlStore) add(serviceID string, presentation vc.VerifiablePresentation, tag *Tag) error {
credentialSubjectID, err := credential.PresentationSigner(presentation)
if err != nil {
return err
Expand All @@ -142,7 +150,7 @@ func (s *sqlStore) add(serviceID string, presentation vc.VerifiablePresentation,
return err
}
return s.db.Transaction(func(tx *gorm.DB) error {
newTimestamp, err := s.updateTimestamp(tx, serviceID, timestamp)
newTimestamp, err := s.updateTag(tx, serviceID, tag)
if err != nil {
return err
}
Expand All @@ -166,7 +174,7 @@ func (s *sqlStore) add(serviceID string, presentation vc.VerifiablePresentation,
// - presentationRecord
// - presentationRecord.Credentials with credentialRecords of the credentials in the presentation
// - presentationRecord.Credentials.Properties of the credentialSubject properties of the credential (for s
func createPresentationRecord(serviceID string, timestamp Timestamp, presentation vc.VerifiablePresentation) (*presentationRecord, error) {
func createPresentationRecord(serviceID string, timestamp *Timestamp, presentation vc.VerifiablePresentation) (*presentationRecord, error) {
credentialSubjectID, err := credential.PresentationSigner(presentation)
if err != nil {
return nil, err
Expand All @@ -175,12 +183,14 @@ func createPresentationRecord(serviceID string, timestamp Timestamp, presentatio
newPresentation := presentationRecord{
ID: uuid.NewString(),
ServiceID: serviceID,
LamportTimestamp: uint64(timestamp),
CredentialSubjectID: credentialSubjectID.String(),
PresentationID: presentation.ID.String(),
PresentationRaw: presentation.Raw(),
PresentationExpiration: presentation.JWT().Expiration().Unix(),
}
if timestamp != nil {
newPresentation.LamportTimestamp = uint64(*timestamp)
}

for _, currCred := range presentation.VerifiableCredential {
var credentialType *string
Expand Down Expand Up @@ -221,26 +231,40 @@ func createPresentationRecord(serviceID string, timestamp Timestamp, presentatio
return &newPresentation, nil
}

// get returns all presentations, registered on the given service, starting after the given timestamp.
// It also returns the latest timestamp of the returned presentations.
// This timestamp can then be used next time to only retrieve presentations that were added after that timestamp.
func (s *sqlStore) get(serviceID string, startAt Timestamp) ([]vc.VerifiablePresentation, *Timestamp, error) {
// get returns all presentations, registered on the given service, starting after the given tag.
// It also returns the latest tag of the returned presentations.
// This tag can then be used next time to only retrieve presentations that were added after that tag.
func (s *sqlStore) get(serviceID string, tag *Tag) ([]vc.VerifiablePresentation, *Tag, error) {
var service serviceRecord
if err := s.db.Find(&service, "id = ?", serviceID).Error; err != nil {
return nil, nil, fmt.Errorf("query service '%s': %w", serviceID, err)
}
var startAfter uint64
if tag != nil {
// Decode tag
lamportTimestamp := tag.Timestamp(service.TagPrefix)
if lamportTimestamp != nil {
startAfter = uint64(*lamportTimestamp)
}
}

var rows []presentationRecord
err := s.db.Order("lamport_timestamp ASC").Find(&rows, "service_id = ? AND lamport_timestamp > ?", serviceID, int(startAt)).Error
err := s.db.Order("lamport_timestamp ASC").Find(&rows, "service_id = ? AND lamport_timestamp > ?", serviceID, startAfter).Error
if err != nil {
return nil, nil, fmt.Errorf("query service '%s': %w", serviceID, err)
}
timestamp := startAt
highestLamportClock := startAfter
presentations := make([]vc.VerifiablePresentation, 0, len(rows))
for _, row := range rows {
presentation, err := vc.ParseVerifiablePresentation(row.PresentationRaw)
if err != nil {
return nil, nil, fmt.Errorf("parse presentation '%s' of service '%s': %w", row.PresentationID, serviceID, err)
}
presentations = append(presentations, *presentation)
timestamp = Timestamp(row.LamportTimestamp)
highestLamportClock = row.LamportTimestamp
}
return presentations, &timestamp, nil
newTag := Timestamp(highestLamportClock).Tag(service.TagPrefix)
return presentations, &newTag, nil
}

// search searches for presentations, registered on the given service, matching the given query.
Expand Down Expand Up @@ -303,29 +327,44 @@ func (s *sqlStore) search(serviceID string, query map[string]string) ([]vc.Verif
return results, nil
}

// updateTimestamp updates the timestamp of the given service.
// Clients should pass the timestamp they received from the server (which simply sets it).
// Servers should pass nil (since they "own" the timestamp), which causes it to be incremented.
func (s *sqlStore) updateTimestamp(tx *gorm.DB, serviceID string, newTimestamp *Timestamp) (Timestamp, error) {
var result serviceRecord
// updateTag updates the tag of the given service.
// Clients should pass the tag they received from the server (which simply sets it).
// Servers should pass nil (since they "own" the tag), which causes it to be incremented.
// It returns
func (s *sqlStore) updateTag(tx *gorm.DB, serviceID string, newTimestamp *Tag) (*Timestamp, error) {
var service serviceRecord
// Lock (SELECT FOR UPDATE) discovery_service row to prevent concurrent updates to the same list, which could mess up the lamport timestamp.
if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
Where(serviceRecord{ID: serviceID}).
Find(&result).
Find(&service).
Error; err != nil {
return 0, err
return nil, err
}
result.ID = serviceID
service.ID = serviceID
var result *Timestamp
if newTimestamp == nil {
// Increment timestamp
result.LamportTimestamp++
// Update tag: decode current timestamp, increment it, encode it again.
currTimestamp := Timestamp(0)
if service.LastTag != "" {
// If LastTag is empty, it means the service was just created and no presentations were added yet.
ts := service.LastTag.Timestamp(service.TagPrefix)
if ts == nil {
// would be very weird
return nil, fmt.Errorf("invalid tag '%s'", service.LastTag)
}
currTimestamp = *ts
}
ts := currTimestamp.Increment()
result = &ts
service.LastTag = ts.Tag(service.TagPrefix)
} else {
result.LamportTimestamp = uint64(*newTimestamp)
// Set tag: just store it
service.LastTag = *newTimestamp
}
if err := tx.Save(&result).Error; err != nil {
return 0, err
if err := tx.Save(service).Error; err != nil {
return nil, err
}
return Timestamp(result.LamportTimestamp), nil
return result, nil
}

// exists checks whether a presentation of the given subject is registered on a service.
Expand Down Expand Up @@ -382,3 +421,14 @@ func indexJSONObject(target map[string]interface{}, jsonPaths []string, stringVa
}
return jsonPaths, stringValues
}

// generatePrefix generates a random seed for a service, consisting of 5 uppercase letters.
func generatePrefix() string {
result := make([]byte, tagPrefixLength)
lower := int('A')
upper := int('Z')
for i := 0; i < len(result); i++ {
result[i] = byte(lower + rand.Intn(upper-lower))
}
return string(result)
}
Loading

0 comments on commit a8bed89

Please sign in to comment.