Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UpstreamAggregator to handle upstream stamping submit #108

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion otsd
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import otsserver.calendar
import otsserver.rpc
import otsserver.stamper

import opentimestamps.calendar

parser = argparse.ArgumentParser(description="OpenTimestamps Server")

parser.add_argument("-q", "--quiet", action="count", default=0,
Expand Down Expand Up @@ -92,6 +94,11 @@ parser.add_argument("--btc-max-fee", metavar='FEE', type=float,
parser.add_argument("--btc-full-rbf", action='store_true',
default=False,
help="Do full-RBF replacements")
parser.add_argument("--upstream", type=str,
help="Calling another upstream server for timestamping, while performing a transaction by itself if the upstream is not available")

parser.add_argument("--upstream-timeout", type=int, default=100,
help="If the upstream don't upgrade after the timeout, proceed timestamping by its own")

btc_net_group = parser.add_mutually_exclusive_group()
btc_net_group.add_argument('--btc-testnet', dest='btc_net', action='store_const',
Expand Down Expand Up @@ -149,6 +156,13 @@ with open(calendar_path + '/donation_addr', "r") as fd:

calendar = otsserver.calendar.Calendar(calendar_path)
aggregator = otsserver.calendar.Aggregator(calendar, exit_event)
upstream_calendar = None
upstream_aggregator = None

if args.upstream:
upstream_calendar = opentimestamps.calendar.RemoteCalendar(args.upstream,
user_agent="OpenTimestamps-Server/%s" % otsserver.__version__)
upstream_aggregator = otsserver.calendar.UpstreamAggregator(upstream_calendar, calendar, exit_event, upstream_timeout=args.upstream_timeout)

stamper = otsserver.stamper.Stamper(calendar, exit_event,
args.btc_conf_target,
Expand All @@ -166,7 +180,9 @@ server = otsserver.rpc.StampServer((args.rpc_address, args.rpc_port),
calendar,
args.lightning_invoice_file,
donation_address,
args.explorer_url)
args.explorer_url,
upstream_aggregator=upstream_aggregator,
upstream_calendar=upstream_calendar)

try:
server.serve_forever()
Expand Down
86 changes: 86 additions & 0 deletions otsserver/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,89 @@ def submit(self, msg):
done_event.wait()

return timestamp

class UpstreamAggregator:
def __init__(self, upstream_calendar, local_calendar, exit_event, upstream_timeout=60):
"""
Args:
calendar: Local calendar for fallback stamping.
upstream_calendar: Remote calendar for upstream stamping.
exit_event: Threading event for graceful shutdown.
timeout: Timeout in seconds for upstream proof upgrade.
"""
self.local_calendar = local_calendar
self.upstream_calendar = upstream_calendar
self.exit_event = exit_event
self.timeout = upstream_timeout
self.pending_upgrades = {} # {digest_str: (Timestamp, submission_time)}
self.upgrade_lock = threading.Lock()
self.thread = threading.Thread(target=self.check_proof_upgrade)
self.thread.start()

def submit(self, msg):
"""Submit message to upstream server and handle fallback"""
timestamp = Timestamp(msg)

# Send to upstream server
try:
upstream_timestamp = self.upstream_calendar.submit(msg, timeout=self.timeout)
except Exception as exc:
logging.warning("Upstream submission failed: %s. Falling back to local stamping.", exc)
return self.fallback_to_local_stamping(msg)

# Check if upstream provided attestations
if upstream_timestamp and upstream_timestamp.attestations:
logging.info("Upstream provided attestations.")
timestamp.merge(upstream_timestamp)
else:
# Add PendingAttestation and monitor for upgrades
logging.info("Adding PendingAttestation and monitoring for upgrades.")
timestamp.attestations.add(PendingAttestation(self.upstream_calendar.url))
digest_str = b2x(msg)
with self.upgrade_lock:
self.pending_upgrades[digest_str] = (timestamp, time.time())

return timestamp

def fallback_to_local_stamping(self, msg):
"""Perform local stamping as a fallback"""
logging.info("Performing local stamping for %s", b2x(msg))
timestamp = Timestamp(msg)
self.calendar.submit(timestamp)
return timestamp

def check_proof_upgrade(self):
"""Monitor for upstream proof upgrades and fallback if needed"""
logging.info("Starting proof upgrade checker thread")

while not self.exit_event.is_set():
with self.upgrade_lock:
current_time = time.time()
to_remove = []

for digest_str, (timestamp, submission_time) in self.pending_upgrades.items():
elapsed_time = current_time - submission_time

# Check if upstream proof has been upgraded
try:
upgraded_timestamp = self.upstream_calendar.get_timestamp(bytes.fromhex(digest_str))
if upgraded_timestamp.attestations:
logging.info("Proof upgraded for %s", digest_str)
timestamp.merge(upgraded_timestamp)
to_remove.append(digest_str)
continue
except Exception as exc:
logging.debug("Failed to fetch upgrade for %s: %s", digest_str, exc)

# Check if timeout has been exceeded
if elapsed_time > self.timeout:
logging.warning("Upstream timeout for %s. Falling back to local stamping.", digest_str)
self.fallback_to_local_stamping(bytes.fromhex(digest_str))
to_remove.append(digest_str)

# Remove completed or timed-out entries
for digest_str in to_remove:
del self.pending_upgrades[digest_str]

# Sleep to avoid excessive resource usage
time.sleep(5)
13 changes: 10 additions & 3 deletions otsserver/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ def post_digest(self):

digest = self.rfile.read(content_length)

timestamp = self.aggregator.submit(digest)
timestamp = None

if self.upstream_calendar is not None:
timestamp = self.upstream_aggregator.submit(digest)
else:
timestamp = self.aggregator.submit(digest)

self.send_response(200)
self.send_header('Content-type', 'application/octet-stream')
Expand Down Expand Up @@ -336,12 +341,14 @@ def do_GET(self):


class StampServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
def __init__(self, server_address, aggregator, calendar, lightning_invoice_file, donation_addr, explorer_url):
def __init__(self, server_address, aggregator, calendar, lightning_invoice_file, donation_addr, explorer_url, upstream_aggregator=None, upstream_calendar=None):

class rpc_request_handler(RPCRequestHandler):
pass
rpc_request_handler.aggregator = aggregator
rpc_request_handler.calendar = calendar
rpc_request_handler.upstream_aggregator = upstream_aggregator
rpc_request_handler.upstream_calendar = upstream_calendar
rpc_request_handler.lightning_invoice_file = lightning_invoice_file
rpc_request_handler.donation_addr = donation_addr
rpc_request_handler.explorer_url = explorer_url
Expand All @@ -350,7 +357,7 @@ class rpc_request_handler(RPCRequestHandler):
rpc_request_handler.backup = Backup(journal, calendar, calendar.path + '/backup_cache')

super().__init__(server_address, rpc_request_handler)

def serve_forever(self):
super().serve_forever()