From cdd41e69d9df15ce4d56dc5d5784d3bc8101585f Mon Sep 17 00:00:00 2001 From: practice Date: Mon, 23 Dec 2024 02:05:45 +0800 Subject: [PATCH] Add UpstreamAggregator to handle upstream stamping submit --- otsd | 18 ++++++++- otsserver/calendar.py | 86 +++++++++++++++++++++++++++++++++++++++++++ otsserver/rpc.py | 13 +++++-- 3 files changed, 113 insertions(+), 4 deletions(-) diff --git a/otsd b/otsd index 230fb74..348a19a 100755 --- a/otsd +++ b/otsd @@ -27,6 +27,8 @@ import otsserver.calendar import otsserver.rpc import otsserver.stamper +import opentimestamps.calendar + parser = argparse.ArgumentParser(description="OpenTimestamps Server") parser.add_argument("-q", "--quiet", action="count", default=0, @@ -92,6 +94,11 @@ parser.add_argument("--btc-max-fee", metavar='FEE', type=float, parser.add_argument("--btc-full-rbf", action='store_true', default=False, help="Do full-RBF replacements") +parser.add_argument("--upstream", type=str, + help="Calling another upstream server for timestamping, while performing a transaction by itself if the upstream is not available") + +parser.add_argument("--upstream-timeout", type=int, default=100, + help="If the upstream don't upgrade after the timeout, proceed timestamping by its own") btc_net_group = parser.add_mutually_exclusive_group() btc_net_group.add_argument('--btc-testnet', dest='btc_net', action='store_const', @@ -149,6 +156,13 @@ with open(calendar_path + '/donation_addr', "r") as fd: calendar = otsserver.calendar.Calendar(calendar_path) aggregator = otsserver.calendar.Aggregator(calendar, exit_event) +upstream_calendar = None +upstream_aggregator = None + +if args.upstream: + upstream_calendar = opentimestamps.calendar.RemoteCalendar(args.upstream, + user_agent="OpenTimestamps-Server/%s" % otsserver.__version__) + upstream_aggregator = otsserver.calendar.UpstreamAggregator(upstream_calendar, calendar, exit_event, upstream_timeout=args.upstream_timeout) stamper = otsserver.stamper.Stamper(calendar, exit_event, args.btc_conf_target, @@ -166,7 +180,9 @@ server = otsserver.rpc.StampServer((args.rpc_address, args.rpc_port), calendar, args.lightning_invoice_file, donation_address, - args.explorer_url) + args.explorer_url, + upstream_aggregator=upstream_aggregator, + upstream_calendar=upstream_calendar) try: server.serve_forever() diff --git a/otsserver/calendar.py b/otsserver/calendar.py index 8c6be68..be3e53d 100644 --- a/otsserver/calendar.py +++ b/otsserver/calendar.py @@ -309,3 +309,89 @@ def submit(self, msg): done_event.wait() return timestamp + +class UpstreamAggregator: + def __init__(self, upstream_calendar, local_calendar, exit_event, upstream_timeout=60): + """ + Args: + calendar: Local calendar for fallback stamping. + upstream_calendar: Remote calendar for upstream stamping. + exit_event: Threading event for graceful shutdown. + timeout: Timeout in seconds for upstream proof upgrade. + """ + self.local_calendar = local_calendar + self.upstream_calendar = upstream_calendar + self.exit_event = exit_event + self.timeout = upstream_timeout + self.pending_upgrades = {} # {digest_str: (Timestamp, submission_time)} + self.upgrade_lock = threading.Lock() + self.thread = threading.Thread(target=self.check_proof_upgrade) + self.thread.start() + + def submit(self, msg): + """Submit message to upstream server and handle fallback""" + timestamp = Timestamp(msg) + + # Send to upstream server + try: + upstream_timestamp = self.upstream_calendar.submit(msg, timeout=self.timeout) + except Exception as exc: + logging.warning("Upstream submission failed: %s. Falling back to local stamping.", exc) + return self.fallback_to_local_stamping(msg) + + # Check if upstream provided attestations + if upstream_timestamp and upstream_timestamp.attestations: + logging.info("Upstream provided attestations.") + timestamp.merge(upstream_timestamp) + else: + # Add PendingAttestation and monitor for upgrades + logging.info("Adding PendingAttestation and monitoring for upgrades.") + timestamp.attestations.add(PendingAttestation(self.upstream_calendar.url)) + digest_str = b2x(msg) + with self.upgrade_lock: + self.pending_upgrades[digest_str] = (timestamp, time.time()) + + return timestamp + + def fallback_to_local_stamping(self, msg): + """Perform local stamping as a fallback""" + logging.info("Performing local stamping for %s", b2x(msg)) + timestamp = Timestamp(msg) + self.calendar.submit(timestamp) + return timestamp + + def check_proof_upgrade(self): + """Monitor for upstream proof upgrades and fallback if needed""" + logging.info("Starting proof upgrade checker thread") + + while not self.exit_event.is_set(): + with self.upgrade_lock: + current_time = time.time() + to_remove = [] + + for digest_str, (timestamp, submission_time) in self.pending_upgrades.items(): + elapsed_time = current_time - submission_time + + # Check if upstream proof has been upgraded + try: + upgraded_timestamp = self.upstream_calendar.get_timestamp(bytes.fromhex(digest_str)) + if upgraded_timestamp.attestations: + logging.info("Proof upgraded for %s", digest_str) + timestamp.merge(upgraded_timestamp) + to_remove.append(digest_str) + continue + except Exception as exc: + logging.debug("Failed to fetch upgrade for %s: %s", digest_str, exc) + + # Check if timeout has been exceeded + if elapsed_time > self.timeout: + logging.warning("Upstream timeout for %s. Falling back to local stamping.", digest_str) + self.fallback_to_local_stamping(bytes.fromhex(digest_str)) + to_remove.append(digest_str) + + # Remove completed or timed-out entries + for digest_str in to_remove: + del self.pending_upgrades[digest_str] + + # Sleep to avoid excessive resource usage + time.sleep(5) diff --git a/otsserver/rpc.py b/otsserver/rpc.py index 35d09d0..be50a6c 100644 --- a/otsserver/rpc.py +++ b/otsserver/rpc.py @@ -67,7 +67,12 @@ def post_digest(self): digest = self.rfile.read(content_length) - timestamp = self.aggregator.submit(digest) + timestamp = None + + if self.upstream_calendar is not None: + timestamp = self.upstream_aggregator.submit(digest) + else: + timestamp = self.aggregator.submit(digest) self.send_response(200) self.send_header('Content-type', 'application/octet-stream') @@ -336,12 +341,14 @@ def do_GET(self): class StampServer(socketserver.ThreadingMixIn, http.server.HTTPServer): - def __init__(self, server_address, aggregator, calendar, lightning_invoice_file, donation_addr, explorer_url): + def __init__(self, server_address, aggregator, calendar, lightning_invoice_file, donation_addr, explorer_url, upstream_aggregator=None, upstream_calendar=None): class rpc_request_handler(RPCRequestHandler): pass rpc_request_handler.aggregator = aggregator rpc_request_handler.calendar = calendar + rpc_request_handler.upstream_aggregator = upstream_aggregator + rpc_request_handler.upstream_calendar = upstream_calendar rpc_request_handler.lightning_invoice_file = lightning_invoice_file rpc_request_handler.donation_addr = donation_addr rpc_request_handler.explorer_url = explorer_url @@ -350,7 +357,7 @@ class rpc_request_handler(RPCRequestHandler): rpc_request_handler.backup = Backup(journal, calendar, calendar.path + '/backup_cache') super().__init__(server_address, rpc_request_handler) - + def serve_forever(self): super().serve_forever()