diff --git a/storage/ballot_queue.go b/storage/ballot_queue.go index 5eacedc..214bba2 100644 --- a/storage/ballot_queue.go +++ b/storage/ballot_queue.go @@ -1,9 +1,11 @@ package storage import ( + "encoding/hex" "errors" "fmt" + "github.com/vocdoni/vocdoni-z-sandbox/log" "go.vocdoni.io/dvote/db/prefixeddb" ) @@ -23,6 +25,8 @@ func (s *Storage) PushBallot(b *Ballot) error { } // NextBallot returns the next non-reserved ballot, creates a reservation, and returns it. +// It returns the ballot, the key, and an error. If no ballots are available, returns ErrNoMoreElements. +// The key is used to mark the ballot as done after processing and to pass it to the next stage. func (s *Storage) NextBallot() (*Ballot, []byte, error) { s.globalLock.Lock() defer s.globalLock.Unlock() @@ -56,9 +60,8 @@ func (s *Storage) NextBallot() (*Ballot, []byte, error) { return &b, chosenKey, nil } -// MarkBallotDone called after we have processed the ballot. We must remove reservation and possibly move it to next stage. +// MarkBallotDone called after we have processed the ballot. We push the verified ballot to the next queue. // In this scenario, next stage is verifiedBallot so we do not store the original ballot. -// The aggregator stage expects verified ballots as input. func (s *Storage) MarkBallotDone(k []byte, vb *VerifiedBallot) error { s.globalLock.Lock() defer s.globalLock.Unlock() @@ -88,24 +91,67 @@ func (s *Storage) MarkBallotDone(k []byte, vb *VerifiedBallot) error { return wTx.Commit() } -// GetVerifiedBallots returns all verified ballots for a given processID -func (s *Storage) GetVerifiedBallots(processID []byte) ([]VerifiedBallot, error) { +// PullVerifiedBallots returns a list of non-reserved verified ballots for a given processID +// and creates reservations for them. The maxCount parameter is used to limit the number of results. +// If no ballots are available, returns ErrNotFound. +func (s *Storage) PullVerifiedBallots(processID []byte, maxCount int) ([]*VerifiedBallot, [][]byte, error) { s.globalLock.Lock() defer s.globalLock.Unlock() + if maxCount == 0 { + return []*VerifiedBallot{}, nil, nil + } + rd := prefixeddb.NewPrefixedReader(s.db, verifiedBallotPrefix) - var res []VerifiedBallot + var res []*VerifiedBallot + var keys [][]byte rd.Iterate(processID, func(k, v []byte) bool { + key := append(processID, k...) + if maxCount > 0 && len(res) >= maxCount { + return false + } + // Skip if already reserved + if s.isReserved(verifiedBallotReservPrefix, key) { + return true + } var vb VerifiedBallot - if err := decodeArtifact(v, &vb); err == nil { - res = append(res, vb) + if err := decodeArtifact(v, &vb); err != nil { + log.Warnw("failed to decode verified ballot", "key", hex.EncodeToString(key), "error", err.Error()) + return true + } + // Set reservation before adding to results + if err := s.setReservation(verifiedBallotReservPrefix, key); err != nil { + log.Warnw("failed to set reservation for verified ballot", "key", hex.EncodeToString(key), "error", err.Error()) + return true } + // Make a copy of the key to avoid any potential modification + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + res = append(res, &vb) + keys = append(keys, keyCopy) return true }) + + // Return ErrNotFound if we found no ballots at all if len(res) == 0 { - return nil, ErrNotFound + return nil, nil, ErrNotFound } - return res, nil + + return res, keys, nil +} + +// CountVerifiedBallots returns the number of verified ballots for a given processID. +func (s *Storage) CountVerifiedBallots(processID []byte) int { + s.globalLock.Lock() + defer s.globalLock.Unlock() + + rd := prefixeddb.NewPrefixedReader(s.db, verifiedBallotPrefix) + count := 0 + rd.Iterate(processID, func(_, _ []byte) bool { + count++ + return true + }) + return count } // PushBallotBatch pushes an aggregated ballot batch to the aggregator queue. @@ -123,22 +169,6 @@ func (s *Storage) PushBallotBatch(abb *AggregatedBallotBatch) error { return wTx.Commit() } -// ListBallotBatch returns all aggregated ballot batches keys. -func (s *Storage) ListBallotBatch() [][]byte { - s.globalLock.Lock() - defer s.globalLock.Unlock() - - rd := prefixeddb.NewPrefixedReader(s.db, aggregBatchPrefix) - var res [][]byte - rd.Iterate(nil, func(k, v []byte) bool { - k2 := make([]byte, len(k)) - copy(k2, k) - res = append(res, k2) - return true - }) - return res -} - // NextBallotBatch returns the next aggregated ballot batch for a given processID, sets a reservation. func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []byte, error) { s.globalLock.Lock() @@ -147,10 +177,11 @@ func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []b pr := prefixeddb.NewPrefixedReader(s.db, aggregBatchPrefix) var chosenKey, chosenVal []byte pr.Iterate(processID, func(k, v []byte) bool { - if s.isReserved(aggregBatchReservPrefix, k) { + key := append(processID, k...) + if s.isReserved(aggregBatchReservPrefix, key) { return true } - chosenKey = append(processID, k...) + chosenKey = key chosenVal = v return false }) @@ -170,6 +201,24 @@ func (s *Storage) NextBallotBatch(processID []byte) (*AggregatedBallotBatch, []b return &abb, chosenKey, nil } +// MarkVerifiedBallotDone removes the reservation and the verified ballot. +func (s *Storage) MarkVerifiedBallotDone(k []byte) error { + s.globalLock.Lock() + defer s.globalLock.Unlock() + + // remove reservation + if err := s.deleteArtifact(verifiedBallotReservPrefix, k); err != nil && !errors.Is(err, ErrNotFound) { + return fmt.Errorf("delete verified ballot reservation: %w", err) + } + + // remove from verified queue + if err := s.deleteArtifact(verifiedBallotPrefix, k); err != nil && !errors.Is(err, ErrNotFound) { + return fmt.Errorf("delete verified ballot: %w", err) + } + + return nil +} + // MarkBallotBatchDone called after processing aggregator batch. For simplicity, we just remove it from aggregator queue and reservation. func (s *Storage) MarkBallotBatchDone(k []byte) error { s.globalLock.Lock() diff --git a/storage/storage.go b/storage/storage.go index 20999be..e9c84cb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -19,13 +19,14 @@ var ( ErrNoMoreElements = errors.New("no more elements") // Prefixes - ballotPrefix = []byte("b/") - ballotReservationPrefix = []byte("br/") - verifiedBallotPrefix = []byte("vb/") - aggregBatchPrefix = []byte("ag/") - aggregBatchReservPrefix = []byte("agr/") - encryptionKeyPrefix = []byte("ek/") - metadataPrefix = []byte("m/") + ballotPrefix = []byte("b/") + ballotReservationPrefix = []byte("br/") + verifiedBallotPrefix = []byte("vb/") + verifiedBallotReservPrefix = []byte("vbr/") + aggregBatchPrefix = []byte("ag/") + aggregBatchReservPrefix = []byte("agr/") + encryptionKeyPrefix = []byte("ek/") + metadataPrefix = []byte("m/") maxKeySize = 12 ) @@ -61,12 +62,13 @@ func (s *Storage) recover() error { s.globalLock.Lock() defer s.globalLock.Unlock() - // Clear ballot reservations + // Clear all reservations if err := s.clearAllReservations(ballotReservationPrefix); err != nil { return fmt.Errorf("failed to clear ballot reservations: %w", err) } - - // Clear aggregated batch reservations + if err := s.clearAllReservations(verifiedBallotReservPrefix); err != nil { + return fmt.Errorf("failed to clear verified ballot reservations: %w", err) + } if err := s.clearAllReservations(aggregBatchReservPrefix); err != nil { return fmt.Errorf("failed to clear aggregated batch reservations: %w", err) } @@ -115,14 +117,22 @@ func (s *Storage) ReleaseStaleReservations(maxAge time.Duration) error { defer s.globalLock.Unlock() now := time.Now().Unix() - err := s.releaseStaleInPrefix(ballotReservationPrefix, now, maxAge) - if err != nil { + + // Release stale ballot reservations + if err := s.releaseStaleInPrefix(ballotReservationPrefix, now, maxAge); err != nil { return err } - err = s.releaseStaleInPrefix(aggregBatchReservPrefix, now, maxAge) - if err != nil { + + // Release stale verified ballot reservations + if err := s.releaseStaleInPrefix(verifiedBallotReservPrefix, now, maxAge); err != nil { return err } + + // Release stale aggregated batch reservations + if err := s.releaseStaleInPrefix(aggregBatchReservPrefix, now, maxAge); err != nil { + return err + } + return nil } diff --git a/storage/storage_test.go b/storage/storage_test.go index 2a4bb02..77887ee 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -89,28 +89,49 @@ func TestBallotQueue(t *testing.T) { } c.Assert(st.MarkBallotDone(b2key, verified2), qt.IsNil) - // Now retrieve verified ballots for the process - // Test GetVerifiedBallots with different maxCount values + // There should be now 2 verified ballots. + c.Assert(st.CountVerifiedBallots( + processID.Marshal()), + qt.Equals, + 2, + qt.Commentf("should have 2 verified ballots"), + ) + + // Now pull verified ballots for the process + // Test PullVerifiedBallots with different maxCount values // Test maxCount = 1 should return only one ballot - vbs1, err := st.GetVerifiedBallots(processID.Marshal(), 1) - c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=1")) + vbs1, keys1, err := st.PullVerifiedBallots(processID.Marshal(), 1) + c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots with maxCount=2")) c.Assert(len(vbs1), qt.Equals, 1, qt.Commentf("should return exactly 1 ballot")) + c.Assert(len(keys1), qt.Equals, 1, qt.Commentf("should return exactly 1 key")) + + // Verify reservation was created + c.Assert(st.isReserved(verifiedBallotReservPrefix, keys1[0]), qt.IsTrue, qt.Commentf("ballot should be reserved")) - // Test maxCount = 2 should return both ballots - vbs2, err := st.GetVerifiedBallots(processID.Marshal(), 2) - c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=2")) - c.Assert(len(vbs2), qt.Equals, 2, qt.Commentf("should return exactly 2 ballots")) + // Mark first ballot as done + c.Assert(st.MarkVerifiedBallotDone(keys1[0]), qt.IsNil) + + // Now we should be able to pull the second ballot + vbs3, keys3, err := st.PullVerifiedBallots(processID.Marshal(), 2) + c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots after marking first as done")) + c.Assert(len(vbs3), qt.Equals, 1, qt.Commentf("should return exactly 1 ballot")) + c.Assert(len(keys3), qt.Equals, 1, qt.Commentf("should return exactly 1 key")) + + // Verify the second ballot is now reserved + c.Assert(st.isReserved(verifiedBallotReservPrefix, keys3[0]), qt.IsTrue, qt.Commentf("second ballot should be reserved")) // Test maxCount = 0 should return no ballots - vbs0, err := st.GetVerifiedBallots(processID.Marshal(), 0) - c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=0")) + vbs0, keys0, err := st.PullVerifiedBallots(processID.Marshal(), 0) + c.Assert(err, qt.IsNil, qt.Commentf("must pull verified ballots with maxCount=0")) c.Assert(len(vbs0), qt.Equals, 0, qt.Commentf("should return no ballots")) + c.Assert(len(keys0), qt.Equals, 0, qt.Commentf("should return no keys")) - // Test maxCount > number of available ballots should return all ballots - vbs10, err := st.GetVerifiedBallots(processID.Marshal(), 10) - c.Assert(err, qt.IsNil, qt.Commentf("must get verified ballots with maxCount=10")) - c.Assert(len(vbs10), qt.Equals, 2, qt.Commentf("should return all available ballots")) + // Test maxCount > number of available ballots should return remaining unreserved ballots + vbs10, keys10, err := st.PullVerifiedBallots(processID.Marshal(), 10) + c.Assert(err, qt.Equals, ErrNotFound, qt.Commentf("should return ErrNotFound when no unreserved ballots")) + c.Assert(vbs10, qt.IsNil) + c.Assert(keys10, qt.IsNil) // Try again NextBallot. There should be no more ballots. _, _, err = st.NextBallot() @@ -127,9 +148,10 @@ func TestBallotQueue(t *testing.T) { ChainID: 0, Nonce: 999, } - vbsEmpty, err := st.GetVerifiedBallots(anotherPID.Marshal(), 10) + vbsEmpty, keysEmpty, err := st.PullVerifiedBallots(anotherPID.Marshal(), 10) c.Assert(err, qt.Equals, ErrNotFound, qt.Commentf("no verified ballots for a new process")) c.Assert(vbsEmpty, qt.IsNil) + c.Assert(keysEmpty, qt.IsNil) } func TestBallotBatchQueue(t *testing.T) { @@ -203,8 +225,6 @@ func TestBallotBatchQueue(t *testing.T) { // Mark batch2 done and wait c.Assert(st.MarkBallotBatchDone(b2key), qt.IsNil) - c.Assert(st.ListBallotBatch(), qt.IsNil) - // Push and verify batch3 batch3 := &AggregatedBallotBatch{ ProcessID: processID.Marshal(), diff --git a/storage/types.go b/storage/types.go index eadcb24..d8cf1e0 100644 --- a/storage/types.go +++ b/storage/types.go @@ -28,8 +28,7 @@ type VerifiedBallot struct { Commitment types.HexBytes `json:"commitment"` EncryptedBallot elgamal.Ciphertext `json:"encryptedBallot"` Address types.HexBytes `json:"address"` - - Proof groth16.Proof `json:"proof"` + Proof groth16.Proof `json:"proof"` } type Ballot struct {