diff --git a/apihandler.py b/apihandler.py index 1643b77..1024573 100644 --- a/apihandler.py +++ b/apihandler.py @@ -700,9 +700,14 @@ def api_gettransaction(self, socket_handler, db_handler, peers): # and format format = connections.receive(socket_handler) # raw tx details - db_handler.execute_param(db_handler.h, - "SELECT * FROM transactions WHERE substr(signature,1,4)=substr(?1,1,4) and signature like ?1", - (transaction_id+'%',)) + if self.config.old_sqlite: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE signature like ?1", + (transaction_id + '%',)) + else: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE substr(signature,1,4)=substr(?1,1,4) and signature like ?1", + (transaction_id+'%',)) raw = db_handler.h.fetchone() if not format: connections.send(socket_handler, raw) @@ -757,9 +762,14 @@ def api_gettransactionbysignature(self, socket_handler, db_handler, peers): # and format format = connections.receive(socket_handler) # raw tx details - db_handler.execute_param(db_handler.h, - "SELECT * FROM transactions WHERE substr(signature,1,4)=substr(?1,1,4) and signature = ?1", - (signature,)) + if self.config.old_sqlite: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE signature = ?1", + (signature,)) + else: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE substr(signature,1,4)=substr(?1,1,4) and signature = ?1", + (signature,)) raw = db_handler.h.fetchone() if not format: connections.send(socket_handler, raw) @@ -836,9 +846,16 @@ def api_gettransaction_for_recipients(self, socket_handler, db_handler, peers): format = connections.receive(socket_handler) recipients = json.dumps(addresses).replace("[", "(").replace(']', ')') # format as sql # raw tx details - db_handler.execute_param(db_handler.h, - "SELECT * FROM transactions WHERE recipient IN {} AND substr(signature,1,4)=substr(?1,1,4) and signature LIKE ?1".format(recipients), - (transaction_id + '%', )) + if self.config.old_sqlite: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE recipient IN {} AND signature LIKE ?1".format(recipients), + (transaction_id + '%', )) + else: + db_handler.execute_param(db_handler.h, + "SELECT * FROM transactions WHERE recipient IN {} AND substr(signature,1,4)=substr(?1,1,4) and signature LIKE ?1".format( + recipients), + (transaction_id + '%',)) + raw = db_handler.h.fetchone() if not format: connections.send(socket_handler, raw) diff --git a/digest.py b/digest.py index cac828a..0875564 100644 --- a/digest.py +++ b/digest.py @@ -108,17 +108,26 @@ def check_signature(block): if entry_signature: # prevent empty signature database retry hack signature_list.append(entry_signature) # reject block with transactions which are already in the ledger ram + if node.old_sqlite: + db_handler.execute_param(db_handler.h, "SELECT block_height FROM transactions WHERE signature = ?1;", + (entry_signature,)) + else: + db_handler.execute_param(db_handler.h, + "SELECT block_height FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1;", + (entry_signature,)) - db_handler.execute_param(db_handler.h, "SELECT block_height FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1;", - (entry_signature,)) tx_presence_check = db_handler.h.fetchone() if tx_presence_check: # print(node.last_block) raise ValueError(f"That transaction {entry_signature[:10]} is already in our ledger, " f"block_height {tx_presence_check[0]}") - - db_handler.execute_param(db_handler.c, "SELECT block_height FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1;", - (entry_signature,)) + if node.old_sqlite: + db_handler.execute_param(db_handler.c, "SELECT block_height FROM transactions WHERE signature = ?1;", + (entry_signature,)) + else: + db_handler.execute_param(db_handler.c, + "SELECT block_height FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1;", + (entry_signature,)) tx_presence_check = db_handler.c.fetchone() if tx_presence_check: # print(node.last_block) diff --git a/mempool.py b/mempool.py index 67c7ced..5701c18 100644 --- a/mempool.py +++ b/mempool.py @@ -61,9 +61,11 @@ # Check for presence of a given tx signature SQL_SIG_CHECK = 'SELECT timestamp FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1' +SQL_SIG_CHECK_OLD = 'SELECT timestamp FROM transactions WHERE signature = ?1' # delete a single tx SQL_DELETE_TX = 'DELETE FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) and signature = ?1' +SQL_DELETE_TX_OLD = 'DELETE FROM transactions WHERE signature = ?1' # Selects all tx from mempool - list fields so we don't send mergedts and keep compatibility SQL_SELECT_ALL_TXS = 'SELECT timestamp, address, recipient, amount, signature, public_key, operation, openfield FROM transactions' @@ -289,7 +291,10 @@ def delete_transaction(self, signature): :return: """ with self.lock: - self.execute(SQL_DELETE_TX, (signature,)) + if self.config.old_sqlite: + self.execute(SQL_DELETE_TX_OLD, (signature,)) + else: + self.execute(SQL_DELETE_TX, (signature,)) self.commit() def sig_check(self, signature): @@ -298,7 +303,10 @@ def sig_check(self, signature): :param signature: :return: boolean """ - return bool(self.fetchone(SQL_SIG_CHECK, (signature,))) + if self.config.old_sqlite: + return bool(self.fetchone(SQL_SIG_CHECK_OLD, (signature,))) + else: + return bool(self.fetchone(SQL_SIG_CHECK, (signature,))) def status(self): """ @@ -578,14 +586,22 @@ def merge(self, data: list, peer_ip: str, c, size_bypass: bool=False, wait: bool # reject transactions which are already in the ledger # TODO: not clean, will need to have ledger as a module too. # TODO: need better txid index, this is very sloooooooow - essentials.execute_param_c(c, "SELECT timestamp FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) AND signature = ?1", - (mempool_signature_enc,), self.app_log) + if self.config.old_sqlite: + essentials.execute_param_c(c, "SELECT timestamp FROM transactions WHERE signature = ?1", + (mempool_signature_enc,), self.app_log) + else: + essentials.execute_param_c(c, + "SELECT timestamp FROM transactions WHERE substr(signature,1,4) = substr(?1,1,4) AND signature = ?1", + (mempool_signature_enc,), self.app_log) ledger_in = bool(c.fetchone()) # remove from mempool if it's in both ledger and mempool already if mempool_in and ledger_in: try: # Do not lock, we already have the lock for the whole merge. - self.execute(SQL_DELETE_TX, (mempool_signature_enc,)) + if self.config.old_sqlite: + self.execute(SQL_DELETE_TX_OLD, (mempool_signature_enc,)) + else: + self.execute(SQL_DELETE_TX, (mempool_signature_enc,)) self.commit() mempool_result.append("Mempool: Transaction deleted from our mempool") except: # experimental try and except diff --git a/node.py b/node.py index 3865fd4..fb45762 100644 --- a/node.py +++ b/node.py @@ -1974,15 +1974,20 @@ def add_indices(db_handler: dbhandler.DbHandler): node.logger.app_log.warning("Creating indices") # ledger.db - db_handler.execute(db_handler.h, CREATE_TXID4_INDEX_IF_NOT_EXISTS) + if not node.old_sqlite: + db_handler.execute(db_handler.h, CREATE_TXID4_INDEX_IF_NOT_EXISTS) + else: + node.logger.app_log.warning("Setting old_sqlite is True, lookups will be slower.") db_handler.execute(db_handler.h, CREATE_MISC_BLOCK_HEIGHT_INDEX_IF_NOT_EXISTS) # hyper.db - db_handler.execute(db_handler.h2, CREATE_TXID4_INDEX_IF_NOT_EXISTS) + if not node.old_sqlite: + db_handler.execute(db_handler.h2, CREATE_TXID4_INDEX_IF_NOT_EXISTS) db_handler.execute(db_handler.h2, CREATE_MISC_BLOCK_HEIGHT_INDEX_IF_NOT_EXISTS) # RAM or hyper.db - db_handler.execute(db_handler.c, CREATE_TXID4_INDEX_IF_NOT_EXISTS) + if not node.old_sqlite: + db_handler.execute(db_handler.c, CREATE_TXID4_INDEX_IF_NOT_EXISTS) db_handler.execute(db_handler.c, CREATE_MISC_BLOCK_HEIGHT_INDEX_IF_NOT_EXISTS) node.logger.app_log.warning("Finished creating indices") @@ -2009,6 +2014,7 @@ def add_indices(db_handler: dbhandler.DbHandler): # or just do node.config = config # and use node.config.port... aso + # TODO: Simplify. Just do node.config = config, then use node.config.required_option node.version = config.version node.debug_level = config.debug_level node.port = config.port @@ -2032,6 +2038,7 @@ def add_indices(db_handler: dbhandler.DbHandler): node.full_ledger = config.full_ledger node.trace_db_calls = config.trace_db_calls node.heavy3_path = config.heavy3_path + node.old_sqlite = config.old_sqlite node.logger.app_log = log.log("node.log", node.debug_level, node.terminal_output) node.logger.app_log.warning("Configuration settings loaded") diff --git a/options.py b/options.py index 360a87b..fefa919 100644 --- a/options.py +++ b/options.py @@ -1,37 +1,38 @@ +import json import os.path as path from sys import exit -import json + class Get: # "param_name":["type"] or "param_name"=["type","property_name"] - vars={ - "port":["str"], - "verify":["bool","verify"], - "testnet":["bool"], - "regnet":["bool"], - "version":["str","version"], - "version_allow":["list"], - "thread_limit":["int","thread_limit"], - "rebuild_db":["bool","rebuild_db"], - "debug":["bool","debug"], - "purge":["bool","purge"], - "pause":["int","pause"], - "ledger_path":["str","ledger_path"], - "hyper_path":["str","hyper_path"], - "hyper_recompress":["bool","hyper_recompress"], - "full_ledger":["bool","full_ledger"], - "ban_threshold":["int"], - "tor":["bool","tor"], - "debug_level":["str","debug_level"], - "allowed":["str","allowed"], - "ram":["bool","ram"], - "node_ip":["str","node_ip"], - "light_ip":["dict"], - "reveal_address":["bool"], - "accept_peers":["bool"], - "banlist":["list"], - "whitelist":["list"], - "nodes_ban_reset":["int"], + vars = { + "port": ["str"], + "verify": ["bool", "verify"], + "testnet": ["bool"], + "regnet": ["bool"], + "version": ["str", "version"], + "version_allow": ["list"], + "thread_limit": ["int", "thread_limit"], + "rebuild_db": ["bool", "rebuild_db"], + "debug": ["bool", "debug"], + "purge": ["bool", "purge"], + "pause": ["int", "pause"], + "ledger_path": ["str", "ledger_path"], + "hyper_path": ["str", "hyper_path"], + "hyper_recompress": ["bool", "hyper_recompress"], + "full_ledger": ["bool", "full_ledger"], + "ban_threshold": ["int"], + "tor": ["bool", "tor"], + "debug_level": ["str", "debug_level"], + "allowed": ["str", "allowed"], + "ram": ["bool", "ram"], + "node_ip": ["str", "node_ip"], + "light_ip": ["dict"], + "reveal_address": ["bool"], + "accept_peers": ["bool"], + "banlist": ["list"], + "whitelist": ["list"], + "nodes_ban_reset": ["int"], "mempool_allowed": ["list"], "terminal_output": ["bool"], "gui_scaling": ["str"], @@ -40,6 +41,7 @@ class Get: "trace_db_calls": ["bool"], "heavy3_path": ["str"], "mempool_path": ["str"], + "old_sqlite": ["bool"], } # Optional default values so we don't bug if they are not in the config. @@ -51,16 +53,19 @@ class Get: "mempool_ram": True, "heavy3_path": "./heavy3a.bin", "mempool_path": "./mempool.db", + "old_sqlite": False, } def load_file(self, filename): - #print("Loading",filename) + # print("Loading",filename) with open(filename) as fp: for line in fp: - if '=' in line: - left,right = map(str.strip,line.rstrip("\n").split("=")) + if "=" in line: + left, right = map(str.strip, line.rstrip("\n").split("=")) if "mempool_ram_conf" == left: - print("Inconsistent config, param is now mempool_ram in config.txt") + print( + "Inconsistent config, param is now mempool_ram in config.txt" + ) exit() if not left in self.vars: # Warn for unknown param? @@ -71,7 +76,7 @@ def load_file(self, filename): elif params[0] == "dict": try: right = json.loads(right) - except: #compatibility + except: # compatibility right = [item.strip() for item in right.split(",")] elif params[0] == "list": right = [item.strip() for item in right.split(",")] @@ -94,7 +99,7 @@ def load_file(self, filename): if key not in self.__dict__: setattr(self, key, default) - #print(self.__dict__) + # print(self.__dict__) def read(self): # first of all, load from default config so we have all needed params diff --git a/rpcconnections.py b/rpcconnections.py new file mode 100644 index 0000000..3adb0a7 --- /dev/null +++ b/rpcconnections.py @@ -0,0 +1,166 @@ +""" +Bismuth default/legacy connection layer. +Json over sockets +This file is no more compatible with the Bismuth code, it's been converted to a class +EggPool 2018 +""" + +import json +import socket +import time +import threading + +# Logical timeout +LTIMEOUT = 45 +# Fixed header length +SLEN = 10 + + +__version__ = '0.1.7' + + +class Connection(object): + """Connection to a Bismuth Node. Handles auto reconnect when needed""" + + __slots__ = ('ipport', 'verbose', 'sdef', 'stats', 'last_activity', 'command_lock', 'raw') + + def __init__(self, ipport, verbose=False, raw=False): + """ipport is an (ip, port) tuple""" + self.ipport = ipport + self.verbose = verbose + self.raw = raw + self.sdef = None + self.last_activity = 0 + self.command_lock = threading.Lock() + self.check_connection() + + def check_connection(self): + """Check connection state and reconnect if needed.""" + if not self.sdef: + try: + if self.verbose: + print("Connecting to", self.ipport) + self.sdef = socket.socket() + self.sdef.connect(self.ipport) + self.last_activity = time.time() + except Exception as e: + self.sdef = None + raise RuntimeError("Connections: {}".format(e)) + + def _send(self, data, slen=SLEN, retry=True): + """Sends something to the server""" + self.check_connection() + try: + self.sdef.settimeout(LTIMEOUT) + # Make sure the packet is sent in one call + sdata = str(json.dumps(data)) + res = self.sdef.sendall(str(len(sdata)).encode("utf-8").zfill(slen)+sdata.encode("utf-8")) + if self.raw: + print("sending raw:") + print(str(len(sdata)).encode("utf-8").zfill(slen)+sdata.encode("utf-8")) + self.last_activity = time.time() + # res is always 0 on linux + if self.verbose: + print("send ", data) + return True + except Exception as e: + # send failed, try to reconnect + # TODO: handle tries # + self.sdef = None + if retry: + if self.verbose: + print("Send failed ({}), trying to reconnect".format(e)) + self.check_connection() + else: + if self.verbose: + print("Send failed ({}), not retrying.".format(e)) + return False + try: + self.sdef.settimeout(LTIMEOUT) + # Make sure the packet is sent in one call + self.sdef.sendall(str(len(str(json.dumps(data)))).encode("utf-8").zfill(slen)+str(json.dumps(data)).encode("utf-8")) + return True + except Exception as e: + self.sdef = None + raise RuntimeError("Connections: {}".format(e)) + + def _receive(self, slen=SLEN): + """Wait for an answer, for LTIMEOUT sec.""" + self.check_connection() + self.sdef.settimeout(LTIMEOUT) + if self.raw: + print("getting raw:") + try: + data = self.sdef.recv(slen) + if self.raw: + raw = data + if not data: + raise RuntimeError("Socket EOF") + data = int(data) # receive length + except socket.timeout as e: + self.sdef = None + return "" + try: + chunks = [] + bytes_recd = 0 + while bytes_recd < data: + chunk = self.sdef.recv(min(data - bytes_recd, 2048)) + if not chunk: + raise RuntimeError("Socket EOF2") + chunks.append(chunk) + bytes_recd = bytes_recd + len(chunk) + self.last_activity = time.time() + if self.raw: + print(raw + b''.join(chunks)) + segments = b''.join(chunks).decode("utf-8") + return json.loads(segments) + except Exception as e: + """ + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + print(exc_type, fname, exc_tb.tb_lineno) + """ + self.sdef = None + raise RuntimeError("Connections: {}".format(e)) + + def command(self, command, options=None): + """ + Sends a command and return it's raw result. + options has to be a list. + Each item of options will be sent separately. So If you want to send a list, pass a list of list. + """ + with self.command_lock: + try: + self._send(command) + if options: + for option in options: + self._send(option, retry=False) + ret = self._receive() + return ret + except Exception as e: + """ + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + print(exc_type, fname, exc_tb.tb_lineno) + """ + # TODO : better handling of tries and delay between + if self.verbose: + print("Error <{}> sending command, trying to reconnect.".format(e)) + self.check_connection() + self._send(command) + if options: + for option in options: + self._send(option, retry=False) + ret = self._receive() + return ret + + def close(self): + """Close the socket""" + try: + self.sdef.close() + except Exception as e: + pass + + +if __name__ == "__main__": + print("I'm a module, can't run!")