-
Notifications
You must be signed in to change notification settings - Fork 96
/
tally.py
459 lines (375 loc) · 14.5 KB
/
tally.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# pylint: disable=unnecessary-comprehension
from dataclasses import dataclass, field
from typing import Iterable, Optional, List, Dict, Set, Tuple, Any
from collections.abc import Container, Sized
from .ballot import (
BallotBoxState,
CiphertextBallotSelection,
SubmittedBallot,
CiphertextSelection,
)
from .data_store import DataStore
from .ballot_validator import ballot_is_valid_for_election
from .decryption_share import CiphertextDecryptionSelection
from .election import CiphertextElectionContext
from .election_object_base import ElectionObjectBase, OrderedObjectBase
from .elgamal import ElGamalCiphertext, elgamal_add
from .group import ElementModQ, ONE_MOD_P, ElementModP
from .logs import log_warning
from .manifest import InternalManifest
from .scheduler import Scheduler
from .type import BallotId, ContestId, SelectionId
@dataclass
class PlaintextTallySelection(ElectionObjectBase):
    """
    The decrypted form of a single contest selection in a tally.
    """

    # The plaintext count for this selection.
    tally: int

    # g^tally, referred to as M in the spec.
    value: ElementModP

    # NOTE(review): presumably the accumulated ciphertext this result was
    # decrypted from -- confirm against the decryption code.
    message: ElGamalCiphertext

    # NOTE(review): presumably the decryption shares that were combined to
    # recover `value` -- confirm against the decryption code.
    shares: List[CiphertextDecryptionSelection]
@dataclass
class CiphertextTallySelection(ElectionObjectBase, CiphertextSelection):
    """
    A CiphertextTallySelection is a homomorphic accumulation of all of the
    CiphertextBallotSelection instances for a specific selection in an election.
    """

    sequence_order: int
    """Order of the selection."""

    description_hash: ElementModQ
    """
    The SelectionDescription hash
    """

    # Build the starting ciphertext per instance instead of sharing one object
    # created at class-definition time: a shared default would be aliased by
    # every selection, and dataclasses on Python 3.11+ refuses unhashable
    # default values outright.
    # NOTE(review): (ONE_MOD_P, ONE_MOD_P) is presumably the neutral element
    # for `elgamal_add` -- confirm against the elgamal module.
    ciphertext: ElGamalCiphertext = field(
        default_factory=lambda: ElGamalCiphertext(ONE_MOD_P, ONE_MOD_P)
    )
    """
    The encrypted representation of the total of all ballots for this selection
    """

    def elgamal_accumulate(
        self, elgamal_ciphertext: ElGamalCiphertext
    ) -> ElGamalCiphertext:
        """
        Homomorphically add the specified value to the running total.

        :param elgamal_ciphertext: the ciphertext to combine with the current total
        :return: the updated total, which is also stored on `self.ciphertext`
        """
        # Rebind rather than mutate in place, so a ciphertext object that is
        # referenced elsewhere is never modified by this call.
        self.ciphertext = elgamal_add(self.ciphertext, elgamal_ciphertext)
        return self.ciphertext
@dataclass
class PlaintextTallyContest(ElectionObjectBase):
    """
    The decrypted form of a contest: a collection of plaintext selections.
    """

    # Plaintext selections of this contest, keyed by selection id.
    selections: Dict[SelectionId, PlaintextTallySelection]
@dataclass
class CiphertextTallyContest(OrderedObjectBase):
    """
    A CiphertextTallyContest is a container for associating a collection of
    CiphertextTallySelection to a specific ContestDescription
    """

    description_hash: ElementModQ
    """
    The ContestDescription hash
    """

    selections: Dict[SelectionId, CiphertextTallySelection]
    """
    A collection of CiphertextTallySelection mapped by SelectionDescription.object_id
    """

    def accumulate_contest(
        self,
        contest_selections: List[CiphertextBallotSelection],
        scheduler: Optional[Scheduler] = None,
    ) -> bool:
        """
        Accumulate the contest selections of an individual ballot into this tally.

        :param contest_selections: the ballot's selections for this contest
        :param scheduler: scheduler used to run the per-selection accumulation;
            a new `Scheduler` is created when omitted
        :return: True when every tally selection was accumulated. False when no
            selections were supplied, when a tally selection has no matching
            non-placeholder selection on the ballot, or when a scheduled
            accumulation reports a missing selection.
        """
        if not contest_selections:
            log_warning(
                f"accumulate cannot add missing selections for contest {self.object_id}"
            )
            return False

        # Validate the input data by comparing the selection id's provided
        # to the valid selection id's for this tally contest
        selection_ids = {
            selection.object_id
            for selection in contest_selections
            if not selection.is_placeholder_selection
        }

        # Test whether any tally selection id is left over. Wrapping the
        # difference in `any()` would test the truthiness of the ids
        # themselves and let a missing falsy id (e.g. "") pass validation.
        missing_ids = set(self.selections).difference(selection_ids)
        if missing_ids:
            log_warning(
                f"accumulate cannot add mismatched selections for contest {self.object_id}"
            )
            return False

        if scheduler is None:
            scheduler = Scheduler()

        # iterate through the tally selections and add the new value to the total
        results: List[
            Tuple[SelectionId, Optional[ElGamalCiphertext]]
        ] = scheduler.schedule(
            self._accumulate_selections,
            [
                (key, selection_tally, contest_selections)
                for (key, selection_tally) in self.selections.items()
            ],
        )

        # Store each returned total back on the matching tally selection.
        # Selections handled before a failed entry keep their new ciphertext.
        for key, ciphertext in results:
            if ciphertext is None:
                return False
            self.selections[key].ciphertext = ciphertext

        return True

    @staticmethod
    def _accumulate_selections(
        key: SelectionId,
        selection_tally: CiphertextTallySelection,
        contest_selections: List[CiphertextBallotSelection],
    ) -> Tuple[SelectionId, Optional[ElGamalCiphertext]]:
        """
        Add the ballot selection whose id equals `key` to `selection_tally`.

        :param key: id of the tally selection being accumulated
        :param selection_tally: the running total for that selection
        :param contest_selections: the ballot's selections for the contest
        :return: `(key, new_total)`, or `(key, None)` when the ballot carries no
            selection with that id
        """
        use_selection = next(
            (
                selection
                for selection in contest_selections
                if key == selection.object_id
            ),
            None,
        )

        # a selection on the ballot that is required was not found
        # this should never happen when using the `CiphertextTally`
        # but sanity check anyway
        if use_selection is None:
            log_warning(f"add cannot accumulate for missing selection {key}")
            return key, None

        return key, selection_tally.elgamal_accumulate(use_selection.ciphertext)
@dataclass
class PlaintextTally(ElectionObjectBase):
    """
    All contests of the election in their plaintext representation.
    """

    # Plaintext contests, keyed by contest id.
    contests: Dict[ContestId, PlaintextTallyContest]
@dataclass
class PublishedCiphertextTally(ElectionObjectBase):
    """
    The ciphertext tally in its published form.
    """

    # Ciphertext contests, keyed by contest id.
    contests: Dict[ContestId, CiphertextTallyContest]
@dataclass
class CiphertextTally(ElectionObjectBase, Container, Sized):
    """
    A `CiphertextTally` accepts cast and spoiled ballots and accumulates a tally on the cast ballots
    """

    _internal_manifest: InternalManifest
    _encryption: CiphertextElectionContext

    cast_ballot_ids: Set[BallotId] = field(default_factory=set)
    """A local cache of ballots id's that have already been cast"""

    spoiled_ballot_ids: Set[BallotId] = field(default_factory=set)
    """A local cache of ballot id's that have been recorded as spoiled"""

    contests: Dict[ContestId, CiphertextTallyContest] = field(init=False)
    """
    A collection of each contest and selection in an election.
    Retains an encrypted representation of a tally for each selection
    """

    def __post_init__(self) -> None:
        """Build the empty contest/selection structure from the manifest."""
        object.__setattr__(
            self, "contests", self._build_tally_collection(self._internal_manifest)
        )

    def __len__(self) -> int:
        """Return the number of ballots recorded, cast and spoiled combined."""
        return len(self.cast_ballot_ids) + len(self.spoiled_ballot_ids)

    def __contains__(self, item: object) -> bool:
        """
        Report whether `item` is a `SubmittedBallot` whose id has already been
        recorded as cast or spoiled. Any other kind of object is never contained.
        """
        if not isinstance(item, SubmittedBallot):
            return False
        return (
            item.object_id in self.cast_ballot_ids
            or item.object_id in self.spoiled_ballot_ids
        )

    def append(
        self,
        ballot: SubmittedBallot,
        should_validate: bool,
        scheduler: Optional[Scheduler] = None,
    ) -> bool:
        """
        Append a ballot to the tally and recalculate the tally.

        :param ballot: the submitted ballot to add
        :param should_validate: forwarded to `ballot_is_valid_for_election`
        :param scheduler: optional scheduler used when accumulating a cast ballot
        :return: True when the ballot was recorded. False when its state is
            UNKNOWN, it is already in the tally, it fails validation, or it
            could not be accumulated.
        """
        if ballot.state == BallotBoxState.UNKNOWN:
            log_warning(f"append cannot add {ballot.object_id} with invalid state")
            return False

        if ballot in self:
            log_warning(f"append cannot add {ballot.object_id} that is already tallied")
            return False

        if not ballot_is_valid_for_election(
            ballot, self._internal_manifest, self._encryption, should_validate
        ):
            return False

        if ballot.state == BallotBoxState.CAST:
            return self._add_cast(ballot, scheduler)

        if ballot.state == BallotBoxState.SPOILED:
            return self._add_spoiled(ballot)

        log_warning(f"append cannot add {ballot.object_id}")
        return False

    def batch_append(
        self,
        ballots: Iterable[Tuple[Any, SubmittedBallot]],
        should_validate: bool,
        scheduler: Optional[Scheduler] = None,
    ) -> bool:
        """
        Append a collection of Ballots to the tally and recalculate.

        Ballots that are already in the tally, that repeat an id accepted
        earlier in the same batch, or that fail validation are skipped.
        `ballots` is iterated exactly once, so a one-shot iterable is fine.

        :param ballots: `(key, ballot)` pairs; only the ballot at index 1 is used
        :param should_validate: forwarded to `ballot_is_valid_for_election`
        :param scheduler: optional scheduler used for the accumulation
        :return: the result of accumulating the collected cast selections
        """
        cast_ballot_selections: Dict[
            SelectionId, Dict[BallotId, ElGamalCiphertext]
        ] = {}
        # ids of the cast ballots actually accepted from this batch; only these
        # are cached afterwards, never the rejected or skipped ones
        accepted_cast_ids: Set[BallotId] = set()

        for entry in ballots:
            # get the value of the dict
            ballot_value = entry[1]

            # The membership test must receive the ballot itself: the
            # (key, ballot) pair is never a SubmittedBallot, so testing the
            # pair would always report "not contained".
            if ballot_value in self or ballot_value.object_id in accepted_cast_ids:
                continue

            if not ballot_is_valid_for_election(
                ballot_value, self._internal_manifest, self._encryption, should_validate
            ):
                continue

            if ballot_value.state == BallotBoxState.CAST:
                accepted_cast_ids.add(ballot_value.object_id)
                # collect the selections so they can be accumulated in parallel
                for contest in ballot_value.contests:
                    for selection in contest.ballot_selections:
                        cast_ballot_selections.setdefault(selection.object_id, {})[
                            ballot_value.object_id
                        ] = selection.ciphertext
            elif ballot_value.state == BallotBoxState.SPOILED:
                # just append the spoiled ballots
                self._add_spoiled(ballot_value)

        # cache the cast ballot id's so they are not double counted
        if self._execute_accumulate(cast_ballot_selections, scheduler):
            self.cast_ballot_ids.update(accepted_cast_ids)
            return True

        return False

    def cast(self) -> int:
        """
        Get a count of the cast ballots
        """
        return len(self.cast_ballot_ids)

    def spoiled(self) -> int:
        """
        Get a count of the spoiled ballots
        """
        return len(self.spoiled_ballot_ids)

    def publish(self) -> PublishedCiphertextTally:
        """
        Wrap this tally's id and contests in a `PublishedCiphertextTally`.

        The contests mapping is passed through as-is, not copied.
        """
        return PublishedCiphertextTally(self.object_id, self.contests)

    @staticmethod
    def _accumulate(
        id: str, ballot_selections: Dict[BallotId, ElGamalCiphertext]
    ) -> Tuple[str, ElGamalCiphertext]:
        """
        Combine every ciphertext in `ballot_selections` with `elgamal_add`.

        :param id: the selection id the ciphertexts belong to (returned unchanged)
        :param ballot_selections: ciphertexts for that selection keyed by ballot id
        :return: `(id, combined_ciphertext)`
        """
        return id, elgamal_add(*ballot_selections.values())

    def _add_cast(
        self, ballot: SubmittedBallot, scheduler: Optional[Scheduler] = None
    ) -> bool:
        """
        Add a cast ballot to the tally, synchronously

        :return: True when every contest on the ballot was accumulated; False as
            soon as a contest is unknown or fails to accumulate. Contests handled
            before the failure keep their updated totals.
        """
        # iterate through the contests and elgamal add
        for contest in ballot.contests:
            # This should never happen since the ballot is validated against the election metadata
            # but it's possible the local dictionary was modified so we double check.
            if contest.object_id not in self.contests:
                log_warning(
                    f"add cast missing contest in valid set {contest.object_id}"
                )
                return False

            # accumulate_contest updates the stored contest object in place
            use_contest = self.contests[contest.object_id]
            if not use_contest.accumulate_contest(contest.ballot_selections, scheduler):
                return False

        self.cast_ballot_ids.add(ballot.object_id)
        return True

    def _add_spoiled(self, ballot: SubmittedBallot) -> bool:
        """
        Add a spoiled ballot: only its id is recorded, nothing is accumulated.

        :return: always True
        """
        self.spoiled_ballot_ids.add(ballot.object_id)
        return True

    @staticmethod
    def _build_tally_collection(
        internal_manifest: InternalManifest,
    ) -> Dict[ContestId, CiphertextTallyContest]:
        """
        Build the object graph for the tally from the InternalManifest

        :param internal_manifest: source of the contests and their selections
        :return: one `CiphertextTallyContest` per manifest contest, keyed by the
            contest id, each holding a freshly created tally selection for every
            entry in the contest's `ballot_selections`
        """
        cast_collection: Dict[str, CiphertextTallyContest] = {}
        for contest in internal_manifest.contests:
            # build a collection of valid selections for the contest description
            # note: we explicitly ignore the Placeholder Selections.
            contest_selections: Dict[str, CiphertextTallySelection] = {
                selection.object_id: CiphertextTallySelection(
                    selection.object_id,
                    selection.sequence_order,
                    selection.crypto_hash(),
                )
                for selection in contest.ballot_selections
            }
            cast_collection[contest.object_id] = CiphertextTallyContest(
                contest.object_id,
                contest.sequence_order,
                contest.crypto_hash(),
                contest_selections,
            )
        return cast_collection

    def _execute_accumulate(
        self,
        ciphertext_selections_by_selection_id: Dict[
            str, Dict[BallotId, ElGamalCiphertext]
        ],
        scheduler: Optional[Scheduler] = None,
    ) -> bool:
        """
        Combine the collected ciphertexts per selection and fold each combined
        value into the matching tally selection.

        :param ciphertext_selections_by_selection_id: for every selection id, the
            ciphertexts to add keyed by ballot id
        :param scheduler: scheduler used to run `_accumulate` per selection; a
            new `Scheduler` is created when omitted
        :return: always True
        """
        result_set: List[Tuple[SelectionId, ElGamalCiphertext]]
        if not scheduler:
            scheduler = Scheduler()

        result_set = scheduler.schedule(
            self._accumulate,
            [
                (selection_id, selections)
                for (
                    selection_id,
                    selections,
                ) in ciphertext_selections_by_selection_id.items()
            ],
        )

        result_dict = dict(result_set)

        # ids that are not part of the tally (e.g. placeholders) are ignored
        for contest in self.contests.values():
            for selection_id, selection in contest.selections.items():
                if selection_id in result_dict:
                    selection.elgamal_accumulate(result_dict[selection_id])

        return True
def tally_ballot(
    ballot: SubmittedBallot, tally: CiphertextTally
) -> Optional[CiphertextTally]:
    """
    Add a single Cast or Spoiled ballot to an existing tally.

    The ballot is appended with validation enabled.

    :param ballot: the submitted ballot to add
    :param tally: the tally to add it to; it is mutated in place
    :return: The mutated CiphertextTally or None if there is an error
    """
    state_is_unknown = ballot.state == BallotBoxState.UNKNOWN
    if state_is_unknown:
        log_warning(
            f"tally ballots error tallying unknown state for ballot {ballot.object_id}"
        )
        return None

    was_appended = tally.append(ballot, True)
    return tally if was_appended else None
def tally_ballots(
    store: DataStore,
    internal_manifest: InternalManifest,
    context: CiphertextElectionContext,
) -> Optional[CiphertextTally]:
    """
    Create a new tally and batch-append every ballot in the ballot store.

    The ballots are appended with validation enabled.

    :param store: the ballot store, passed as-is to `batch_append`
    :param internal_manifest: manifest the new tally is built from
    :param context: encryption context handed to the new tally
    :return: a CiphertextTally or None if there is an error
    """
    # TODO: ISSUE #14: unique Id for the tally
    new_tally: CiphertextTally = CiphertextTally(
        "election-results", internal_manifest, context
    )

    batch_succeeded = new_tally.batch_append(store, True)
    return new_tally if batch_succeeded else None