Skip to content

Commit

Permalink
add maxCount to GetVerifiedBallots and CountVerifiedBallots
Browse files Browse the repository at this point in the history
Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed Dec 19, 2024
1 parent 96da053 commit 438739e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 60 deletions.
103 changes: 76 additions & 27 deletions storage/ballot_queue.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package storage

import (
"encoding/hex"
"errors"
"fmt"

"github.com/vocdoni/vocdoni-z-sandbox/log"
"go.vocdoni.io/dvote/db/prefixeddb"
)

Expand All @@ -23,6 +25,8 @@ func (s *Storage) PushBallot(b *Ballot) error {
}

// NextBallot returns the next non-reserved ballot, creates a reservation, and returns it.
// It returns the ballot, the key, and an error. If no ballots are available, returns ErrNoMoreElements.
// The key is used to mark the ballot as done after processing and to pass it to the next stage.
func (s *Storage) NextBallot() (*Ballot, []byte, error) {
s.globalLock.Lock()
defer s.globalLock.Unlock()
Expand Down Expand Up @@ -56,9 +60,8 @@ func (s *Storage) NextBallot() (*Ballot, []byte, error) {
return &b, chosenKey, nil
}

// MarkBallotDone called after we have processed the ballot. We must remove reservation and possibly move it to next stage.
// MarkBallotDone called after we have processed the ballot. We push the verified ballot to the next queue.
// In this scenario, next stage is verifiedBallot so we do not store the original ballot.
// The aggregator stage expects verified ballots as input.
func (s *Storage) MarkBallotDone(k []byte, vb *VerifiedBallot) error {
s.globalLock.Lock()
defer s.globalLock.Unlock()
Expand Down Expand Up @@ -88,24 +91,67 @@ func (s *Storage) MarkBallotDone(k []byte, vb *VerifiedBallot) error {
return wTx.Commit()
}

// GetVerifiedBallots returns all verified ballots for a given processID
func (s *Storage) GetVerifiedBallots(processID []byte) ([]VerifiedBallot, error) {
// PullVerifiedBallots returns a list of non-reserved verified ballots for a given processID
// and creates reservations for them. The maxCount parameter is used to limit the number of results.
// If no ballots are available, returns ErrNotFound.
func (s *Storage) PullVerifiedBallots(processID []byte, maxCount int) ([]*VerifiedBallot, [][]byte, error) {
s.globalLock.Lock()
defer s.globalLock.Unlock()

if maxCount == 0 {
return []*VerifiedBallot{}, nil, nil
}

rd := prefixeddb.NewPrefixedReader(s.db, verifiedBallotPrefix)
var res []VerifiedBallot
var res []*VerifiedBallot
var keys [][]byte
rd.Iterate(processID, func(k, v []byte) bool {
key := append(processID, k...)
if maxCount > 0 && len(res) >= maxCount {
return false
}
// Skip if already reserved
if s.isReserved(verifiedBallotReservPrefix, key) {
return true
}
var vb VerifiedBallot
if err := decodeArtifact(v, &vb); err == nil {
res = append(res, vb)
if err := decodeArtifact(v, &vb); err != nil {
log.Warnw("failed to decode verified ballot", "key", hex.EncodeToString(key), "error", err.Error())
return true
}
// Set reservation before adding to results
if err := s.setReservation(verifiedBallotReservPrefix, key); err != nil {
log.Warnw("failed to set reservation for verified ballot", "key", hex.EncodeToString(key), "error", err.Error())
return true
}
// Make a copy of the key to avoid any potential modification
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
res = append(res, &vb)
keys = append(keys, keyCopy)
return true
})

// Return ErrNotFound if we found no ballots at all
if len(res) == 0 {
return nil, ErrNotFound
return nil, nil, ErrNotFound
}
return res, nil

return res, keys, nil
}

// CountVerifiedBallots returns the number of verified ballots for a given processID.
func (s *Storage) CountVerifiedBallots(processID []byte) int {
s.globalLock.Lock()
defer s.globalLock.Unlock()

rd := prefixeddb.NewPrefixedReader(s.db, verifiedBallotPrefix)
count := 0
rd.Iterate(processID, func(_, _ []byte) bool {
count++
return true
})
return count
}

// PushBallotBatch pushes an aggregated ballot batch to the aggregator queue.
Expand All @@ -123,22 +169,6 @@ func (s *Storage) PushBallotBatch(abb *AggregatedBallotBatch) error {
return wTx.Commit()
}

// ListBallotBatch returns all aggregated ballot batches keys.
func (s *Storage) ListBallotBatch() [][]byte {
s.globalLock.Lock()
defer s.globalLock.Unlock()

rd := prefixeddb.NewPrefixedReader(s.db, aggregBatchPrefix)
var res [][]byte
rd.Iterate(nil, func(k, v []byte) bool {
k2 := make([]byte, len(k))
copy(k2, k)
res = append(res, k2)
return true
})
return res
}

// NextBallotBatch returns the next aggregated ballot batch for a given processID, sets a reservation.
func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []byte, error) {
s.globalLock.Lock()
Expand All @@ -147,10 +177,11 @@ func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []b
pr := prefixeddb.NewPrefixedReader(s.db, aggregBatchPrefix)
var chosenKey, chosenVal []byte
pr.Iterate(processID, func(k, v []byte) bool {
if s.isReserved(aggregBatchReservPrefix, k) {
key := append(processID, k...)
if s.isReserved(aggregBatchReservPrefix, key) {
return true
}
chosenKey = append(processID, k...)
chosenKey = key
chosenVal = v
return false
})
Expand All @@ -170,6 +201,24 @@ func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []b
return &abb, chosenKey, nil
}

// MarkVerifiedBallotDone removes the reservation and the verified ballot.
func (s *Storage) MarkVerifiedBallotDone(k []byte) error {
s.globalLock.Lock()
defer s.globalLock.Unlock()

// remove reservation
if err := s.deleteArtifact(verifiedBallotReservPrefix, k); err != nil && !errors.Is(err, ErrNotFound) {
return fmt.Errorf("delete verified ballot reservation: %w", err)
}

// remove from verified queue
if err := s.deleteArtifact(verifiedBallotPrefix, k); err != nil && !errors.Is(err, ErrNotFound) {
return fmt.Errorf("delete verified ballot: %w", err)
}

return nil
}

// MarkBallotBatchDone called after processing aggregator batch. For simplicity, we just remove it from aggregator queue and reservation.
func (s *Storage) MarkBallotBatchDone(k []byte) error {
s.globalLock.Lock()
Expand Down
38 changes: 24 additions & 14 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ var (
ErrNoMoreElements = errors.New("no more elements")

// Prefixes
ballotPrefix = []byte("b/")
ballotReservationPrefix = []byte("br/")
verifiedBallotPrefix = []byte("vb/")
aggregBatchPrefix = []byte("ag/")
aggregBatchReservPrefix = []byte("agr/")
encryptionKeyPrefix = []byte("ek/")
metadataPrefix = []byte("m/")
ballotPrefix = []byte("b/")
ballotReservationPrefix = []byte("br/")
verifiedBallotPrefix = []byte("vb/")
verifiedBallotReservPrefix = []byte("vbr/")
aggregBatchPrefix = []byte("ag/")
aggregBatchReservPrefix = []byte("agr/")
encryptionKeyPrefix = []byte("ek/")
metadataPrefix = []byte("m/")

maxKeySize = 12
)
Expand Down Expand Up @@ -61,12 +62,13 @@ func (s *Storage) recover() error {
s.globalLock.Lock()
defer s.globalLock.Unlock()

// Clear ballot reservations
// Clear all reservations
if err := s.clearAllReservations(ballotReservationPrefix); err != nil {
return fmt.Errorf("failed to clear ballot reservations: %w", err)
}

// Clear aggregated batch reservations
if err := s.clearAllReservations(verifiedBallotReservPrefix); err != nil {
return fmt.Errorf("failed to clear verified ballot reservations: %w", err)
}
if err := s.clearAllReservations(aggregBatchReservPrefix); err != nil {
return fmt.Errorf("failed to clear aggregated batch reservations: %w", err)
}
Expand Down Expand Up @@ -115,14 +117,22 @@ func (s *Storage) ReleaseStaleReservations(maxAge time.Duration) error {
defer s.globalLock.Unlock()

now := time.Now().Unix()
err := s.releaseStaleInPrefix(ballotReservationPrefix, now, maxAge)
if err != nil {

// Release stale ballot reservations
if err := s.releaseStaleInPrefix(ballotReservationPrefix, now, maxAge); err != nil {
return err
}
err = s.releaseStaleInPrefix(aggregBatchReservPrefix, now, maxAge)
if err != nil {

// Release stale verified ballot reservations
if err := s.releaseStaleInPrefix(verifiedBallotReservPrefix, now, maxAge); err != nil {
return err
}

// Release stale aggregated batch reservations
if err := s.releaseStaleInPrefix(aggregBatchReservPrefix, now, maxAge); err != nil {
return err
}

return nil
}

Expand Down
54 changes: 37 additions & 17 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,49 @@ func TestBallotQueue(t *testing.T) {
}
c.Assert(st.MarkBallotDone(b2key, verified2), qt.IsNil)

// Now retrieve verified ballots for the process
// Test GetVerifiedBallots with different maxCount values
// There should be now 2 verified ballots.
c.Assert(st.CountVerifiedBallots(
processID.Marshal()),
qt.Equals,
2,
qt.Commentf("should have 2 verified ballots"),
)

// Now pull verified ballots for the process
// Test PullVerifiedBallots with different maxCount values

// Test maxCount = 1 should return only one ballot
vbs1, err := st.GetVerifiedBallots(processID.Marshal(), 1)
c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=1"))
vbs1, keys1, err := st.PullVerifiedBallots(processID.Marshal(), 1)
c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots with maxCount=2"))
c.Assert(len(vbs1), qt.Equals, 1, qt.Commentf("should return exactly 1 ballot"))
c.Assert(len(keys1), qt.Equals, 1, qt.Commentf("should return exactly 1 key"))

// Verify reservation was created
c.Assert(st.isReserved(verifiedBallotReservPrefix, keys1[0]), qt.IsTrue, qt.Commentf("ballot should be reserved"))

// Test maxCount = 2 should return both ballots
vbs2, err := st.GetVerifiedBallots(processID.Marshal(), 2)
c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=2"))
c.Assert(len(vbs2), qt.Equals, 2, qt.Commentf("should return exactly 2 ballots"))
// Mark first ballot as done
c.Assert(st.MarkVerifiedBallotDone(keys1[0]), qt.IsNil)

// Now we should be able to pull the second ballot
vbs3, keys3, err := st.PullVerifiedBallots(processID.Marshal(), 2)
c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots after marking first as done"))
c.Assert(len(vbs3), qt.Equals, 1, qt.Commentf("should return exactly 1 ballot"))
c.Assert(len(keys3), qt.Equals, 1, qt.Commentf("should return exactly 1 key"))

// Verify the second ballot is now reserved
c.Assert(st.isReserved(verifiedBallotReservPrefix, keys3[0]), qt.IsTrue, qt.Commentf("second ballot should be reserved"))

// Test maxCount = 0 should return no ballots
vbs0, err := st.GetVerifiedBallots(processID.Marshal(), 0)
c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=0"))
vbs0, keys0, err := st.PullVerifiedBallots(processID.Marshal(), 0)
c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots with maxCount=0"))
c.Assert(len(vbs0), qt.Equals, 0, qt.Commentf("should return no ballots"))
c.Assert(len(keys0), qt.Equals, 0, qt.Commentf("should return no keys"))

// Test maxCount > number of available ballots should return all ballots
vbs10, err := st.GetVerifiedBallots(processID.Marshal(), 10)
c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=10"))
c.Assert(len(vbs10), qt.Equals, 2, qt.Commentf("should return all available ballots"))
// Test maxCount > number of available ballots should return remaining unreserved ballots
vbs10, keys10, err := st.PullVerifiedBallots(processID.Marshal(), 10)
c.Assert(err, qt.Equals, ErrNotFound, qt.Commentf("should return ErrNotFound when no unreserved ballots"))
c.Assert(vbs10, qt.IsNil)
c.Assert(keys10, qt.IsNil)

// Try again NextBallot. There should be no more ballots.
_, _, err = st.NextBallot()
Expand All @@ -127,9 +148,10 @@ func TestBallotQueue(t *testing.T) {
ChainID: 0,
Nonce: 999,
}
vbsEmpty, err := st.GetVerifiedBallots(anotherPID.Marshal(), 10)
vbsEmpty, keysEmpty, err := st.PullVerifiedBallots(anotherPID.Marshal(), 10)
c.Assert(err, qt.Equals, ErrNotFound, qt.Commentf("no verified ballots for a new process"))
c.Assert(vbsEmpty, qt.IsNil)
c.Assert(keysEmpty, qt.IsNil)
}

func TestBallotBatchQueue(t *testing.T) {
Expand Down Expand Up @@ -203,8 +225,6 @@ func TestBallotBatchQueue(t *testing.T) {
// Mark batch2 done and wait
c.Assert(st.MarkBallotBatchDone(b2key), qt.IsNil)

c.Assert(st.ListBallotBatch(), qt.IsNil)

// Push and verify batch3
batch3 := &AggregatedBallotBatch{
ProcessID: processID.Marshal(),
Expand Down
3 changes: 1 addition & 2 deletions storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type VerifiedBallot struct {
Commitment types.HexBytes `json:"commitment"`
EncryptedBallot elgamal.Ciphertext `json:"encryptedBallot"`
Address types.HexBytes `json:"address"`

Proof groth16.Proof `json:"proof"`
Proof groth16.Proof `json:"proof"`
}

type Ballot struct {
Expand Down

0 comments on commit 438739e

Please sign in to comment.