diff --git a/docs/onion-message-channels.md b/docs/onion-message-channels.md new file mode 100644 index 000000000..deb5af6cb --- /dev/null +++ b/docs/onion-message-channels.md @@ -0,0 +1,173 @@ +# HOW TO SETUP ONION MESSAGE CHANNELS IN JOINMARKET + +### Contents + +1. [Overview](#overview) + +2. [Testing, configuring for signet](#testing) + +4. [Directory nodes](#directory) + + + +## Overview + +This is a new way for Joinmarket bots to communicate, namely by serving and connecting to Tor onion services. This does not +introduce any new requirements to your Joinmarket installation, technically, because the use of Payjoin already required the need +to service such onion services, and connecting to IRC used a SOCKS5 proxy (by default, and used by almost all users) over Tor to +a remote onion service. + +The purpose of this new type of message channel is as follows: + +* less reliance on any service external to Joinmarket +* most of the transaction negotiation will be happening directly peer to peer, not passed over a central server ( +albeit it was and remains E2E encrypted data, in either case) +* the above can lead to better scalability at large numbers +* a substantial increase in the speed of transaction negotiation; this is mostly related to the throttling of high bursts of traffic on IRC + +The configuration for a user is simple; in their `joinmarket.cfg` they will add a messaging section like this: + +``` +[MESSAGING:onion1] +type = onion +onion_serving_port = 8082 +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port +directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80 +``` + +Here, I have deliberately omitted the several other settings in this section which will almost always be fine as default; +see `jmclient/jmclient/configure.py` for what those defaults are, and the extensive comments explaining. + +The main point is the list of **directory nodes** (the one shown here is one being run on signet, right now), which will +be comma separated if multiple directory nodes are configured (we expect there will be 2 or 3 as a normal situation). +The `onion_serving_port` is on which port on the local machine the onion service is served. +The `type` field must always be `onion` in this case, and distinguishes it from IRC message channels and others. + +### Can/should I still run IRC message channels? + +In short, yes. + +### Do I need to configure Tor, and if so, how? + +These message channels use both outbound and inbound connections to onion services (or "hidden services"). + +As previously mentioned, both of these features were already in use in Joinmarket. If you never served an +onion service before, it should work fine as long as you have the Tor service running in the background, +and the default control port 9051 (if not, change that value in the `joinmarket.cfg`, see above. + +#### Why not use Lightning based onions? + +(*Feel free to skip this section if you don't know what "Lightning based onions" refers to!*). The reason this architecture is +proposed as an alternative to the previously suggested Lightning-node-based network (see +[this PR](https://github.com/JoinMarket-Org/joinmarket-clientserver/pull/1000)), is mostly that: + +* the latter has a bunch of extra installation and maintenance dependencies (just one example: pyln-client requires coincurve, which we just +removed) +* the latter requires establishing a new node "identity" which can be refreshed, but that creates more concern +* longer term ideas to integrate Lightning payments to the coinjoin workflow (and vice versa!) are not realizable yet +* using multi-hop onion messaging in the LN network itself is also a way off, and a bit problematic + +So the short version is: the Lightning based alternative is certainly feasible, but has a lot more baggage that can't really be justified +unless we're actually using it for something. + + + + +## Testing, and configuring for signet. + +This testing section focuses on signet since that will be the less troublesome way of getting involved in tests for +the non-hardcore JM developer :) + +(For the latter, please use the regtest setup by running `test/e2e-coinjoin-test.py` under `pytest`, +and pay attention to the settings in `regtest_joinmarket.cfg`.) + +There is no separate/special configuration for signet other than the configuration that is already needed for running +Joinmarket against a signet backend (so e.g. RPC port of 38332). + +Add the `[MESSAGING:onion1]` message channel section to your `joinmarket.cfg`, as listed above, including the +signet directory node listed above (rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80), and, +for the simplest test, remove the other `[MESSAGING:*]` sections that you have. + +Then just make sure your bot has some signet coins and try running as maker or taker or both. + + + +## Directory nodes + +**This last section is for people with a lot of technical knowledge in this area, +who would like to help by running a directory node. You can ignore it if that does not apply.**. + +This requires a long running bot. It should be on a server you can keep running permanently, so perhaps a VPS, +but in any case, very high uptime. For reliability it also makes sense to configure to run as a systemd service. + +A note: in this early stage, the usage of Lightning is only really network-layer stuff, and the usage of bitcoin, is none; feel free to add elements that remove any need for a backend bitcoin blockchain, but beware: future upgrades *could* mean that the directory node really does need the bitcoin backend. + +#### Joinmarket-specific configuration + +Add `hidden_service_dir` to your `[MESSAGING:onion1]` with a directory accessible to your user. You may want to lock this down +a bit! +The point to understand is: Joinmarket's `jmbase.JMHiddenService` will, if configured with a non-empty `hidden_service_dir` +field, actually start an *independent* instance of Tor specifically for serving this, under the current user. +(our tor interface library `txtorcon` needs read access to the Tor HS dir, so it's troublesome to do this another way). + +##### Question: How to configure the `directory-nodes` list in our `joinmarket.cfg` for this directory node bot? + +Answer: **you must only enter your own node in this list!** (otherwise you may find your bot infinitely rebroadcasting messages). + + +#### Suggested setup of a service: + +You will need two components: bitcoind, and Joinmarket itself, which you can run as a yg. +Since this task is going to be attempted by someone with significant technical knowledge, +only an outline is provided here; several details will need to be filled in. +Here is a sketch of how the systemd service files can be set up for signet: + +If someone wants to put together a docker setup of this for a more "one-click install", that would be great. + +1. bitcoin-signet.service + +``` +[Unit] +Description=bitcoind signet +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart=/usr/local/bin/bitcoind -signet +User=user + +[Install] +WantedBy=multi-user.target +``` + +This is deliberately a super-basic setup (see above). Don't forget to setup your `bitcoin.conf` as usual, +for the bitcoin user, and make it match (specifically in terms of RPC) what you set up for Lightning below. + + +2. + +``` +[Unit] +Description=joinmarket directory node on signet +Requires=bitcoin-signet.service +After=bitcoin-signet.service + +[Service] +Type=simple +ExecStart=/bin/bash -c 'cd /path/to/joinmarket-clientserver && source jmvenv/bin/activate && cd scripts && echo -n "password" | python yg-privacyenhanced.py --wallet-password-stdin --datadir=/custom/joinmarket-datadir some-signet-wallet.jmdat' +User=user + +[Install] +WantedBy=multi-user.target +``` + +To state the obvious, the idea here is that this second service will run the JM directory node and have a dependency on the previous one, +to ensure they start up in the correct order. + +Re: password echo, obviously this kind of password entry is bad; +for now we needn't worry as these nodes don't need to carry any real coins (and it's better they don't!). +Later we may need to change that (though of course you can use standard measures to protect the box). + +TODO: add some material on network hardening/firewalls here, I guess. diff --git a/jmbase/jmbase/twisted_utils.py b/jmbase/jmbase/twisted_utils.py index f7e2f287b..b7594d181 100644 --- a/jmbase/jmbase/twisted_utils.py +++ b/jmbase/jmbase/twisted_utils.py @@ -128,16 +128,23 @@ def config_to_hs_ports(virtual_port, host, port): class JMHiddenService(object): """ Wrapper class around the actions needed to create and serve on a hidden service; an object of - type Resource must be provided in the constructor, - which does the HTTP serving actions (GET, POST serving). + type either Resource or server.ProtocolFactory must + be provided in the constructor, which does the HTTP + (GET, POST) or other protocol serving actions. """ - def __init__(self, resource, info_callback, error_callback, - onion_hostname_callback, tor_control_host, + def __init__(self, proto_factory_or_resource, info_callback, + error_callback, onion_hostname_callback, tor_control_host, tor_control_port, serving_host, serving_port, - virtual_port = None, - shutdown_callback = None): - self.site = Site(resource) - self.site.displayTracebacks = False + virtual_port=None, + shutdown_callback=None, + hidden_service_dir=""): + if isinstance(proto_factory_or_resource, Resource): + # TODO bad naming, in this case it doesn't start + # out as a protocol factory; a Site is one, a Resource isn't. + self.proto_factory = Site(proto_factory_or_resource) + self.proto_factory.displayTracebacks = False + else: + self.proto_factory = proto_factory_or_resource self.info_callback = info_callback self.error_callback = error_callback # this has a separate callback for convenience, it should @@ -155,6 +162,13 @@ def __init__(self, resource, info_callback, error_callback, # config object, so no default here: self.serving_host = serving_host self.serving_port = serving_port + # this is used to serve an onion from the filesystem, + # NB: Because of how txtorcon is set up, this option + # uses a *separate tor instance* owned by the owner of + # this script (because txtorcon needs to read the + # HS dir), whereas if this option is "", we set up + # an ephemeral HS on the global or pre-existing tor. + self.hidden_service_dir = hidden_service_dir def start_tor(self): """ This function executes the workflow @@ -162,19 +176,31 @@ def start_tor(self): """ self.info_callback("Attempting to start onion service on port: {} " "...".format(self.virtual_port)) - if str(self.tor_control_host).startswith('unix:'): - control_endpoint = UNIXClientEndpoint(reactor, - self.tor_control_host[5:]) + if self.hidden_service_dir == "": + if str(self.tor_control_host).startswith('unix:'): + control_endpoint = UNIXClientEndpoint(reactor, + self.tor_control_host[5:]) + else: + control_endpoint = TCP4ClientEndpoint(reactor, + self.tor_control_host, self.tor_control_port) + d = txtorcon.connect(reactor, control_endpoint) + d.addCallback(self.create_onion_ep) + d.addErrback(self.setup_failed) + # TODO: add errbacks to the next two calls in + # the chain: + d.addCallback(self.onion_listen) + d.addCallback(self.print_host) else: - control_endpoint = TCP4ClientEndpoint(reactor, - self.tor_control_host, self.tor_control_port) - d = txtorcon.connect(reactor, control_endpoint) - d.addCallback(self.create_onion_ep) - d.addErrback(self.setup_failed) - # TODO: add errbacks to the next two calls in - # the chain: - d.addCallback(self.onion_listen) - d.addCallback(self.print_host) + ep = "onion:" + str(self.virtual_port) + ":localPort=" + ep += str(self.serving_port) + # endpoints.TCPHiddenServiceEndpoint creates version 2 by + # default for backwards compat (err, txtorcon needs to update that ...) + ep += ":version=3" + ep += ":hiddenServiceDir="+self.hidden_service_dir + onion_endpoint = serverFromString(reactor, ep) + d = onion_endpoint.listen(self.proto_factory) + d.addCallback(self.print_host_filesystem) + def setup_failed(self, arg): # Note that actions based on this failure are deferred to callers: @@ -195,7 +221,8 @@ def onion_listen(self, onion): serverstring = "tcp:{}:interface={}".format(self.serving_port, self.serving_host) onion_endpoint = serverFromString(reactor, serverstring) - return onion_endpoint.listen(self.site) + print("created the onion endpoint, now calling listen") + return onion_endpoint.listen(self.proto_factory) def print_host(self, ep): """ Callback fired once the HS is available @@ -204,6 +231,14 @@ def print_host(self, ep): """ self.onion_hostname_callback(self.onion.hostname) + def print_host_filesystem(self, port): + """ As above but needed to respect slightly different + callback chain for this case (where we start our own tor + instance for the filesystem-based onion). + """ + self.onion = port.onion_service + self.onion_hostname_callback(self.onion.hostname) + def shutdown(self): self.tor_connection.protocol.transport.loseConnection() self.info_callback("Hidden service shutdown complete") diff --git a/jmclient/jmclient/__init__.py b/jmclient/jmclient/__init__.py index e410638b6..ca91ace64 100644 --- a/jmclient/jmclient/__init__.py +++ b/jmclient/jmclient/__init__.py @@ -24,7 +24,7 @@ TYPE_P2PKH, TYPE_P2SH_P2WPKH, TYPE_P2WPKH, detect_script_type) from .configure import (load_test_config, process_shutdown, load_program_config, jm_single, get_network, update_persist_config, - validate_address, is_burn_destination, get_irc_mchannels, + validate_address, is_burn_destination, get_mchannels, get_blockchain_interface_instance, set_config, is_segwit_mode, is_native_segwit_mode, JMPluginService, get_interest_rate, get_bondless_makers_allowance) from .blockchaininterface import (BlockchainInterface, diff --git a/jmclient/jmclient/client_protocol.py b/jmclient/jmclient/client_protocol.py index 68ea865ac..01b00b8e7 100644 --- a/jmclient/jmclient/client_protocol.py +++ b/jmclient/jmclient/client_protocol.py @@ -15,7 +15,7 @@ import sys from jmbase import (get_log, EXIT_FAILURE, hextobin, bintohex, utxo_to_utxostr, bdict_sdict_convert) -from jmclient import (jm_single, get_irc_mchannels, +from jmclient import (jm_single, get_mchannels, RegtestBitcoinCoreInterface, SNICKERReceiver, process_shutdown) import jmbitcoin as btc @@ -434,7 +434,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + irc_configs = self.factory.get_mchannels() #only here because Init message uses this field; not used by makers TODO minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -601,7 +601,7 @@ def clientStart(self): "blockchain_source") #needed only for channel naming convention network = jm_single().config.get("BLOCKCHAIN", "network") - irc_configs = get_irc_mchannels() + irc_configs = self.factory.get_mchannels() minmakers = jm_single().config.getint("POLICY", "minimum_makers") maker_timeout_sec = jm_single().maker_timeout_sec @@ -795,6 +795,14 @@ def getClient(self): def buildProtocol(self, addr): return self.protocol(self, self.client) + def get_mchannels(self): + """ A transparent wrapper that allows override, + so that a script can return a customised set of + message channel configs; currently used for testing + multiple bots on regtest. + """ + return get_mchannels() + def start_reactor(host, port, factory=None, snickerfactory=None, bip78=False, jm_coinjoin=True, ish=True, daemon=False, rs=True, gui=False): #pragma: no cover diff --git a/jmclient/jmclient/configure.py b/jmclient/jmclient/configure.py index b12e194bf..0c3ff6b10 100644 --- a/jmclient/jmclient/configure.py +++ b/jmclient/jmclient/configure.py @@ -140,6 +140,9 @@ def jm_single(): ## SERVER 1/3) Darkscience IRC (Tor, IP) ################################################################################ [MESSAGING:server1] +# by default the legacy format without a `type` field is +# understood to be IRC, but you can, optionally, add it: +# type = irc channel = joinmarket-pit port = 6697 usessl = true @@ -154,24 +157,47 @@ def jm_single(): #socks5_host = localhost #socks5_port = 9050 -## SERVER 2/3) hackint IRC (Tor, IP) -################################################################################ -[MESSAGING:server2] -channel = joinmarket-pit +[MESSAGING:onion1] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion -# For traditional IP (default): -host = irc.hackint.org -port = 6697 -usessl = true -socks5 = false +socks5_host = localhost +socks5_port = 9050 -# For Tor (recommended as clearnet alternative): -#host = ncwkrwxpq2ikcngxq3dy2xctuheniggtqeibvgofixpzvrwpa77tozqd.onion -#port = 6667 -#usessl = false -#socks5 = true -#socks5_host = localhost -#socks5_port = 9050 +# the tor control configuration. +# for most people running the tor daemon +# on Linux, no changes are required here: +tor_control_host = localhost +# or, to use a UNIX socket +# tor_control_host = unix:/var/run/tor/control +tor_control_port = 9051 + +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 + +# directory node configuration +# +# This is mandatory for directory nodes (who must also set their +# own *.onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# (note the default is no value, don't replace it with ""). +hidden_service_dir = +# +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port ; both are required, though port will +# be 80 if created in this code. +directory_nodes = rr6f6qtleiiwic45bby4zwmiwjrj3jsbmcvutwpqxjziaydjydkk5iad.onion:80 + +# This setting is ONLY for developer regtest setups, +# running multiple bots at once. Don't alter it otherwise +regtest_count = 0,0 ## SERVER 3/3) ILITA IRC (Tor - disabled by default) ################################################################################ @@ -484,7 +510,7 @@ def set_config(cfg, bcint=None): global_singleton.bc_interface = bcint -def get_irc_mchannels(): +def get_mchannels(): SECTION_NAME = 'MESSAGING' # FIXME: remove in future release if jm_single().config.has_section(SECTION_NAME): @@ -495,16 +521,30 @@ def get_irc_mchannels(): return _get_irc_mchannels_old() SECTION_NAME += ':' - irc_sections = [] + sections = [] for s in jm_single().config.sections(): if s.startswith(SECTION_NAME): - irc_sections.append(s) - assert irc_sections + sections.append(s) + assert sections - req_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str)] + irc_fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), + ("socks5", str), ("socks5_host", str), ("socks5_port", str)] + onion_fields = [("type", str), ("directory_nodes", str), ("regtest_count", str), + ("socks5_host", str), ("socks5_port", int), + ("tor_control_host", str), ("tor_control_port", int), + ("onion_serving_host", str), ("onion_serving_port", int), + ("hidden_service_dir", str)] configs = [] - for section in irc_sections: + + # processing the IRC sections: + for section in sections: + if jm_single().config.has_option(section, "type"): + # legacy IRC configs do not have "type" but just + # in case, we'll allow the "irc" type: + if not jm_single().config.get(section, "type").lower( + ) == "irc": + break server_data = {} # check if socks5 is enabled for tor and load relevant config if so @@ -516,13 +556,30 @@ def get_irc_mchannels(): server_data["socks5_host"] = jm_single().config.get(section, "socks5_host") server_data["socks5_port"] = jm_single().config.get(section, "socks5_port") - for option, otype in req_fields: + for option, otype in irc_fields: val = jm_single().config.get(section, option) server_data[option] = otype(val) server_data['btcnet'] = get_network() configs.append(server_data) - return configs + # processing the onion sections: + for section in sections: + if not jm_single().config.has_option(section, "type") or \ + not jm_single().config.get(section, "type").lower() == "onion": + continue + onion_data = {} + for option, otype in onion_fields: + try: + val = jm_single().config.get(section, option) + except NoOptionError: + continue + onion_data[option] = otype(val) + onion_data['btcnet'] = get_network() + # Just to allow a dynamic set of var: + onion_data["section-name"] = section + configs.append(onion_data) + + return configs def _get_irc_mchannels_old(): fields = [("host", str), ("port", int), ("channel", str), ("usessl", str), @@ -651,28 +708,6 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): "settings and restart joinmarket.", "info") sys.exit(EXIT_FAILURE) - #These are left as sanity checks but currently impossible - #since any edits are overlays to the default, these sections/options will - #always exist. - # FIXME: This check is a best-effort attempt. Certain incorrect section - # names can pass and so can non-first invalid sections. - for s in required_options: #pragma: no cover - # check for sections - avail = None - if not global_singleton.config.has_section(s): - for avail in global_singleton.config.sections(): - if avail.startswith(s): - break - else: - raise Exception( - "Config file does not contain the required section: " + s) - # then check for specific options - k = avail or s - for o in required_options[s]: - if not global_singleton.config.has_option(k, o): - raise Exception("Config file does not contain the required " - "option '{}' in section '{}'.".format(o, k)) - loglevel = global_singleton.config.get("LOGGING", "console_log_level") try: set_logging_level(loglevel) @@ -742,6 +777,11 @@ def load_program_config(config_path="", bs=None, plugin_services=[]): if not os.path.exists(plogsdir): os.makedirs(plogsdir) p.set_log_dir(plogsdir) + # Check if a onion message channel was configured, and if so, + # check there is only 1; multiple directory nodes will be inside the config. + chans = get_mchannels() + onion_chans = [x for x in chans if "type" in x and x["type"] == "onion"] + assert len(onion_chans) < 2 def load_test_config(**kwargs): if "config_path" not in kwargs: diff --git a/jmclient/jmclient/wallet_rpc.py b/jmclient/jmclient/wallet_rpc.py index 4597cf475..68c8f895d 100644 --- a/jmclient/jmclient/wallet_rpc.py +++ b/jmclient/jmclient/wallet_rpc.py @@ -159,6 +159,9 @@ def __init__(self, port, wss_port, tls=True): # can be shut down cleanly: self.coinjoin_connection = None + def get_client_factory(self): + return JMClientProtocolFactory(self.taker) + def activate_coinjoin_state(self, state): """ To be set when a maker or taker operation is initialized; they cannot @@ -420,7 +423,8 @@ def dummy_restart_callback(msg): walletname=self.wallet_name, token=self.cookie) - def taker_finished(self, res, fromtx=False, waittime=0.0, txdetails=None): + def taker_finished(self, res, fromtx=False, + waittime=0.0, txdetails=None): # This is a slimmed down version compared with what is seen in # the CLI code, since that code encompasses schedules with multiple # entries; for now, the RPC only supports single joins. @@ -1003,13 +1007,13 @@ def dummy_user_callback(rel, abs): self.taker = Taker(self.services["wallet"], schedule, max_cj_fee = max_cj_fee, callbacks=(self.filter_orders_callback, - None, self.taker_finished)) + None, self.taker_finished)) # TODO ; this makes use of a pre-existing hack to allow # selectively disabling the stallMonitor function that checks # if transactions went through or not; here we want to cleanly # destroy the Taker after an attempt is made, successful or not. self.taker.testflag = True - self.clientfactory = JMClientProtocolFactory(self.taker) + self.clientfactory = self.get_client_factory() dhost, dport = self.check_daemon_ready() diff --git a/jmdaemon/jmdaemon/__init__.py b/jmdaemon/jmdaemon/__init__.py index 384b5f720..fc1c4070b 100644 --- a/jmdaemon/jmdaemon/__init__.py +++ b/jmdaemon/jmdaemon/__init__.py @@ -4,6 +4,7 @@ from .enc_wrapper import as_init_encryption, decode_decrypt, \ encrypt_encode, init_keypair, init_pubkey, get_pubkey, NaclError from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase.support import get_log from .message_channel import MessageChannel, MessageChannelCollection from .orderbookwatch import OrderbookWatch diff --git a/jmdaemon/jmdaemon/daemon_protocol.py b/jmdaemon/jmdaemon/daemon_protocol.py index b20a55107..d84bbb514 100644 --- a/jmdaemon/jmdaemon/daemon_protocol.py +++ b/jmdaemon/jmdaemon/daemon_protocol.py @@ -7,8 +7,9 @@ from .protocol import (COMMAND_PREFIX, ORDER_KEYS, NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION, JOINMARKET_NICK_HEADER, COMMITMENT_PREFIXES) -from .irc import IRCMessageChannel +from .irc import IRCMessageChannel +from .onionmc import OnionMessageChannel from jmbase import (is_hs_uri, get_tor_agent, JMHiddenService, get_nontor_agent, BytesProducer, wrapped_urlparse, bdict_sdict_convert, JMHTTPResource) @@ -527,10 +528,15 @@ def on_JM_INIT(self, bcsource, network, irc_configs, minmakers, self.mc_shutdown() self.irc_configs = irc_configs self.restart_mc_required = True - mcs = [IRCMessageChannel(c, - daemon=self, - realname='btcint=' + bcsource) - for c in self.irc_configs] + mcs = [] + for c in self.irc_configs: + if "type" in c and c["type"] == "onion": + mcs.append(OnionMessageChannel(c, daemon=self)) + else: + # default is IRC; TODO allow others + mcs.append(IRCMessageChannel(c, + daemon=self, + realname='btcint=' + bcsource)) self.mcc = MessageChannelCollection(mcs) OrderbookWatch.set_msgchan(self, self.mcc) #register taker-specific msgchan callbacks here @@ -947,6 +953,7 @@ def init_connections(self, nick): incomplete transaction is wiped. """ self.jm_state = 0 #uninited + self.mcc.set_nick(nick) if self.restart_mc_required: self.mcc.run() self.restart_mc_required = False @@ -954,7 +961,6 @@ def init_connections(self, nick): #if we are not restarting the MC, #we must simulate the on_welcome message: self.on_welcome() - self.mcc.set_nick(nick) def transfer_commitment(self, commit): """Send this commitment via privmsg to one (random) diff --git a/jmdaemon/jmdaemon/message_channel.py b/jmdaemon/jmdaemon/message_channel.py index 96be37ec6..9549f193d 100644 --- a/jmdaemon/jmdaemon/message_channel.py +++ b/jmdaemon/jmdaemon/message_channel.py @@ -263,9 +263,9 @@ def privmsg(self, nick, cmd, message, mc=None): #is supposed to be sent. There used to be an exception raise. #to prevent a crash (especially in makers), we just inform #the user about it for now - log.error("Tried to communicate on this IRC server but " + log.error("Tried to communicate on this message channel but " "failed: " + str(mc)) - log.error("You might have to comment out this IRC server " + log.error("You might have to comment out this message channel" "in joinmarket.cfg and restart.") log.error("No action needed for makers / yield generators!") # todo: add logic to continue on other available mc @@ -444,7 +444,7 @@ def on_welcome_trigger(self, mc): if (not self.on_welcome_announce_id) and self.on_welcome: self.on_welcome_announce_id = reactor.callLater(60, self.on_welcome_setup_finished,) else: - log.info("All IRC servers connected, starting execution.") + log.info("All message channels connected, starting execution.") if self.on_welcome_announce_id: self.on_welcome_announce_id.cancel() self.on_welcome_setup_finished() diff --git a/jmdaemon/jmdaemon/onionmc.py b/jmdaemon/jmdaemon/onionmc.py new file mode 100644 index 000000000..a426674bb --- /dev/null +++ b/jmdaemon/jmdaemon/onionmc.py @@ -0,0 +1,1179 @@ +from jmdaemon.message_channel import MessageChannel +from jmdaemon.protocol import COMMAND_PREFIX, JM_VERSION +from jmbase import get_log, JM_APP_NAME, JMHiddenService +import json +import copy +from typing import Callable, Union +from twisted.internet import reactor, task, protocol +from twisted.protocols import basic +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.address import IPv4Address, IPv6Address +from txtorcon.socks import TorSocksEndpoint + +log = get_log() + +def network_addr_to_string(location: Union[IPv4Address, IPv4Address]) -> str: + if isinstance(location, (IPv4Address, IPv6Address)): + host = location.host + port = location.port + else: + # TODO handle other addr types + assert False + return host + ":" + str(port) + +# module-level var to control whether we use Tor or not +# (specifically for tests): +testing_mode = False +def set_testing_mode(configdata: dict) -> None: + """ Toggles testing mode which enables non-Tor + network setup: + """ + global testing_mode + if not "regtest_count" in configdata: + log.debug("Onion message channel is not using regtest mode.") + testing_mode = False + return + try: + s, e = [int(x) for x in configdata["regtest_count"].split(",")] + except Exception as e: + log.info("Failed to get regtest count settings, error: {}".format(repr(e))) + testing_mode = False + return + if s == 0 and e == 0: + testing_mode = False + return + testing_mode = True + +""" +Messaging protocol (which wraps the underlying Joinmarket +messaging protocol) used here is documented in: +Joinmarket-Docs/onion-messaging.md +""" + +LOCAL_CONTROL_MESSAGE_TYPES = {"connect": 785, "disconnect": 787, "connect-in": 797} +CONTROL_MESSAGE_TYPES = {"peerlist": 789, "getpeerlist": 791, + "handshake": 793, "dn-handshake": 795, + "ping": 797, "pong": 799, "disconnect": 801} +JM_MESSAGE_TYPES = {"privmsg": 685, "pubmsg": 687} + +# Used for some control message construction, as detailed below. +NICK_PEERLOCATOR_SEPARATOR = ";" + +# location_string and nick must be set before sending, +# otherwise invalid: +client_handshake_json = {"app-name": JM_APP_NAME, + "directory": False, + "location-string": "", + "proto-ver": JM_VERSION, + "features": {}, + "nick": "" +} + +# default acceptance false; code must switch it on: +server_handshake_json = {"app-name": JM_APP_NAME, + "directory": True, + "proto-ver-min": JM_VERSION, + "proto-ver-max": JM_VERSION, + "features": {}, + "accepted": False, + "nick": "", + "motd": "Default MOTD, replace with information for the directory." + } + +# states that keep track of relationship to a peer +PEER_STATUS_UNCONNECTED, PEER_STATUS_CONNECTED, PEER_STATUS_HANDSHAKED, \ + PEER_STATUS_DISCONNECTED = range(4) + + +class OnionPeerError(Exception): + pass + +class OnionPeerDirectoryWithoutHostError(OnionPeerError): + pass + +class OnionPeerConnectionError(OnionPeerError): + pass + +class OnionCustomMessageDecodingError(Exception): + pass + +class OnionCustomMessage(object): + """ Encapsulates the messages passed over the wire + to and from other onion peers + """ + def __init__(self, text: str, msgtype: int): + self.text = text + self.msgtype = msgtype + + def encode(self) -> str: + self.encoded = json.dumps({"type": self.msgtype, + "line": self.text}).encode("utf-8") + return self.encoded + + @classmethod + def from_string_decode(cls, msg: str) -> 'OnionCustomMessage': + """ Build a custom message from a json-ified string. + """ + try: + msg_obj = json.loads(msg) + text = msg_obj["line"] + msgtype = msg_obj["type"] + except: + raise OnionCustomMessageDecodingError + return cls(text, msgtype) + +class OnionLineProtocol(basic.LineReceiver): + def connectionMade(self): + self.factory.register_connection(self) + + def connectionLost(self, reason): + self.factory.register_disconnection(self) + + def lineReceived(self, line: str) -> None: + #print("received", repr(line)) + try: + msg = OnionCustomMessage.from_string_decode(line) + except OnionCustomMessageDecodingError: + log.debug("Received invalid message, dropping connection.") + self.transport.loseConnection() + return + self.factory.receive_message(msg, self) + + def message(self, message: OnionCustomMessage) -> None: + #log.info("in OnionLineProtocol, about to send message: {} to peer {}".format(message.encode(), self.transport.getPeer())) + self.transport.write(message.encode() + self.delimiter) + +class OnionLineProtocolFactory(protocol.ServerFactory): + """ This factory allows us to start up instances + of the LineReceiver protocol that are instantiated + towards us. + As such, it is responsible for keeping track + """ + protocol = OnionLineProtocol + + def __init__(self, client: 'OnionMessageChannel'): + self.client = client + self.peers = {} + + def register_connection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the new connection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.client.register_connection(peer_location, direction=0) + self.peers[peer_location] = p + + def register_disconnection(self, p: OnionLineProtocol) -> None: + # make a local control message registering + # the new connection + peer_location = network_addr_to_string(p.transport.getPeer()) + self.client.register_disconnection(peer_location) + if not peer_location in self.peers: + log.warn("Disconnection event registered for non-existent peer.") + return + del self.peers[peer_location] + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.client.receive_msg(message, network_addr_to_string( + p.transport.getPeer())) + + def send(self, message: OnionCustomMessage, destination: str) -> bool: + #print("trying to send in OnionLineProtocolFactory.") + #print("message: {}, destination: {}".format(message.encode(), destination)) + if not (destination in self.peers): + print("sending message {}, destination {} was not in peers {}".format(message.encode(), destination, self.peers)) + return False + proto = self.peers[destination] + proto.message(message) + return True + +class OnionClientFactory(protocol.ServerFactory): + """ We define a distinct protocol factory for outbound connections. + Notably, this factory supports only *one* protocol instance at a time. + """ + protocol = OnionLineProtocol + + def __init__(self, message_receive_callback: Callable, + connection_callback: Callable, + disconnection_callback: Callable): + self.proto_client = None + # callback takes OnionCustomMessage as arg and returns None + self.message_receive_callback = message_receive_callback + # connection callback, no args, returns None + self.connection_callback = connection_callback + # disconnection the same + self.disconnection_callback = disconnection_callback + + def register_connection(self, p: OnionLineProtocol) -> None: + #print("in OnionClientFactory, registered a connection, proto instance: ", p) + self.proto_client = p + self.connection_callback() + + def register_disconnection(self, p: OnionLineProtocol) -> None: + self.proto_client = None + self.disconnection_callback() + + def send(self, msg: OnionCustomMessage) -> bool: + self.proto_client.message(msg) + + def receive_message(self, message: OnionCustomMessage, + p: OnionLineProtocol) -> None: + self.message_receive_callback(message) + + """ + def clientConnectionLost(self, connector, reason): + log.debug('Connection to peer lost: {}, reason: {}'.format(connector, reason)) + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionLost( + self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + log.debug('Connection to peer failed: {}, reason: {}'.format( + connector, reason)) + if reactor.running: + log.info('Attempting to reconnect...') + protocol.ReconnectingClientFactory.clientConnectionFailed( + self, connector, reason) + """ + +class OnionPeer(object): + + def __init__(self, messagechannel: 'OnionMessageChannel', + socks5_host: str, socks5_port: int, + hostname: str=None, port: int=-1, + directory: bool=False, nick: str="", + handshake_callback: Callable=None): + # reference to the managing OnionMessageChannel instance is + # needed so that we know where to send the messages received + # from this peer: + self.messagechannel = messagechannel + self.nick = nick + # client side net config: + self.socks5_host = socks5_host + self.socks5_port = socks5_port + # remote net config: + self.hostname = hostname + self.port = port + if directory and not (self.hostname): + raise OnionPeerDirectoryWithoutHostError() + self.directory = directory + self._status = PEER_STATUS_UNCONNECTED + #A function to be called to initiate a handshake; + # it should take a single argument, an OnionPeer object, + #and return None. + self.handshake_callback = handshake_callback + # Keep track of the protocol factory used to connect + # to the remote peer. Note that this won't always be used, + # if we have an inbound connection from this peer: + self.factory = None + # alternate location strings are used for inbound + # connections for this peer (these will be used first + # and foremost by directories, sending messages backwards + # on a connection created towards them). + self.alternate_location = "" + + def set_alternate_location(self, location_string: str): + self.alternate_location = location_string + + def update_status(self, destn_status: int) -> None: + """ Wrapping state updates to enforce: + (a) that the handshake is triggered by connection + outwards, and (b) to ensure no illegal state transitions. + """ + assert destn_status in range(4) + ignored_updates = [] + if self._status == PEER_STATUS_UNCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED, + PEER_STATUS_DISCONNECTED] + elif self._status == PEER_STATUS_CONNECTED: + # updates from connected->connected are harmless + allowed_updates = [PEER_STATUS_CONNECTED, + PEER_STATUS_DISCONNECTED, + PEER_STATUS_HANDSHAKED] + elif self._status == PEER_STATUS_HANDSHAKED: + allowed_updates = [PEER_STATUS_DISCONNECTED] + ignored_updates = [PEER_STATUS_CONNECTED] + elif self._status == PEER_STATUS_DISCONNECTED: + allowed_updates = [PEER_STATUS_CONNECTED] + ignored_updates = [PEER_STATUS_DISCONNECTED] + if destn_status in ignored_updates: + # TODO: this happens sometimes from 2->1; why? + log.debug("Attempt to update status of peer from {} " + "to {} ignored.".format(self._status, destn_status)) + return + assert destn_status in allowed_updates, ("couldn't update state " + "from {} to {}".format(self._status, destn_status)) + self._status = destn_status + # the handshakes are always initiated by a client: + if destn_status == PEER_STATUS_CONNECTED: + log.info("We, {}, are calling the handshake callback as client.".format(self.messagechannel.self_as_peer.peer_location())) + self.handshake_callback(self) + + def status(self) -> int: + """ Simple getter function for the wrapped _status: + """ + return self._status + + def set_nick(self, nick: str) -> None: + self.nick = nick + + def get_nick_peerlocation_ser(self) -> str: + if not self.nick: + raise OnionPeerError("Cannot serialize " + "identifier string without nick.") + return self.nick + NICK_PEERLOCATOR_SEPARATOR + \ + self.peer_location() + + @classmethod + def from_location_string(cls, mc: 'OnionMessageChannel', + location: str, + socks5_host: str, + socks5_port: int, + directory: bool=False, + handshake_callback: Callable=None) -> 'OnionPeer': + """ Allows construction of an OnionPeer from the + connection information given by the network interface. + TODO: special handling for inbound is needed. + """ + host, port = location.split(":") + return cls(mc, socks5_host, socks5_port, hostname=host, + port=int(port), directory=directory, + handshake_callback=handshake_callback) + + def set_host_port(self, hostname: str, port: int) -> None: + """ If the connection info is discovered + after this peer was already added to our list, + we can set it with this method. + """ + self.hostname = hostname + self.port = port + + def set_location(self, location_string: str) -> bool: + """ Allows setting location from an unchecked + input string argument; if the string does not have + the required format, + will return False, otherwise self.hostname, self.port are + updated for future `peer_location` calls, and True is returned. + """ + try: + host, port = location_string.split(":") + portint = int(port) + assert portint > 0 + except Exception as e: + log.debug("Failed to update host and port of this peer, " + "error: {}".format(repr(e))) + return False + self.hostname = host + self.port = portint + return True + + def peer_location(self) -> str: + assert (self.hostname and self.port > 0) + return self.hostname + ":" + str(self.port) + + def send(self, message: OnionCustomMessage) -> bool: + """ If the message can be sent on either an inbound or + outbound connection, True is returned, else False. + """ + if not self.factory: + #print("We are: {}. peer, wich was directory {}, did not have factory, so we send via mc".format( + # self.messagechannel.self_as_peer.peer_location(), self.directory)) + # we try to send via the overall message channel serving + # protocol, i.e. we assume the connection was made inbound: + #print("and to this location: ", self.peer_location()) + return self.messagechannel.proto_factory.send(message, self.alternate_location) + #print("peer which was directory {} did have factory {}, we send via that".format(self.directory, self.factory)) + return self.factory.send(message) + + def receive_message(self, message: OnionCustomMessage) -> None: + self.messagechannel.receive_msg(message, self.peer_location()) + + def connect(self) -> None: + """ This method is called to connect, over Tor, to the remote + peer at the given onion host/port. + The connection is 'persistent' in the sense that we use a + ReconnectingClientFactory. + """ + if self._status in [PEER_STATUS_HANDSHAKED, PEER_STATUS_CONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot connect without host, port info") + + self.factory = OnionClientFactory(self.receive_message, + self.register_connection, self.register_disconnection) + if testing_mode: + print("{} is making a tcp connection to {}, {}, {},".format( + self.messagechannel.self_as_peer.peer_location(), self.hostname, self.port, self.factory)) + self.tcp_connector = reactor.connectTCP(self.hostname, self.port, self.factory) + else: + torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host, self.socks5_port) + onionEndpoint = TorSocksEndpoint(torEndpoint, self.hostname, self.port) + onionEndpoint.connect(self.factory) + + def register_connection(self) -> None: + self.messagechannel.register_connection(self.peer_location(), direction=1) + + def register_disconnection(self) -> None: + self.messagechannel.register_disconnection(self.peer_location()) + + def try_to_connect(self) -> None: + """ This method wraps OnionPeer.connect and accepts + any error if that fails. + """ + try: + self.connect() + except OnionPeerConnectionError as e: + log.debug("Tried to connect but failed: {}".format(repr(e))) + except Exception as e: + log.warn("Got unexpected exception in connect attempt: {}".format( + repr(e))) + + def disconnect(self) -> None: + if self._status in [PEER_STATUS_UNCONNECTED, PEER_STATUS_DISCONNECTED]: + return + if not (self.hostname and self.port > 0): + raise OnionPeerConnectionError( + "Cannot disconnect without host, port info") + d = self.reconnecting_service.stopService() + d.addCallback(self.complete_disconnection) + d.addErrback(log.warn, "Failed to disconnect from peer {}.".format( + self.peer_location())) + + def complete_disconnection(self): + log.debug("Disconnected from peer: {}".format(self.peer_location())) + self.update_status(PEER_STATUS_DISCONNECTED) + self.factory = None + +class OnionDirectoryPeer(OnionPeer): + delay = 4.0 + def try_to_connect(self) -> None: + # Delay deliberately expands out to very + # long times as yg-s tend to be very long + # running bots: + self.delay *= 1.5 + if self.delay > 10000: + log.warn("Cannot connect to directory node peer: {} " + "after 20 attempts, giving up.".format(self.peer_location())) + return + try: + self.connect() + except OnionPeerConnectionError: + reactor.callLater(self.delay, self.try_to_connect) + +class OnionMessageChannel(MessageChannel): + """ Receives messages via a Torv3 hidden/onion service. + Sends messages to other nodes of the same type over Tor + via SOCKS5. + Uses one or more configured "directory nodes" + to access a list of current active nodes, and updates + dynamically from messages seen. + """ + + def __init__(self, + configdata, + daemon=None): + MessageChannel.__init__(self, daemon=daemon) + # hostid is a feature to avoid replay attacks across message channels; + # TODO investigate, but for now, treat onion-based as one "server". + self.hostid = "onion-network" + self.tor_control_host = configdata["tor_control_host"] + self.tor_control_port = int(configdata["tor_control_port"]) + self.onion_serving_host=configdata["onion_serving_host"] + self.onion_serving_port=int(configdata["onion_serving_port"]) + self.hidden_service_dir = configdata["hidden_service_dir"] + # client side config: + self.socks5_host = "127.0.0.1" + self.socks5_port = 9050 + # we use the setting in the config sent over from + # the client, to decide whether to set up our connections + # over localhost (if testing), without Tor: + set_testing_mode(configdata) + log.info("after call to testing_mode, it is: {}".format(testing_mode)) + # keep track of peers. the list will be instances + # of OnionPeer: + self.peers = set() + for dn in configdata["directory_nodes"].split(","): + # note we don't use a nick for directories: + self.peers.add(OnionDirectoryPeer.from_location_string( + self, dn, self.socks5_host, self.socks5_port, + directory=True, handshake_callback=self.handshake_as_client)) + # we can direct messages via the protocol factory, which + # will index protocol connections by peer location: + self.proto_factory = OnionLineProtocolFactory(self) + if testing_mode: + # we serve over TCP: + self.testing_serverconn = reactor.listenTCP(self.onion_serving_port, + self.proto_factory, interface="localhost") + self.onion_hostname = "127.0.0.1" + else: + self.hs = JMHiddenService(self.proto_factory, + self.info_callback, + self.setup_error_callback, + self.onion_hostname_callback, + self.tor_control_host, + self.tor_control_port, + self.onion_serving_host, + self.onion_serving_port, + shutdown_callback=self.shutdown_callback, + hidden_service_dir=self.hidden_service_dir) + # this call will start bringing up the HS; when it's finished, + # it will fire the `onion_hostname_callback`, or if it fails, + # it'll fire the `setup_error_callback`. + self.hs.start_tor() + + # This will serve as our unique identifier, indicating + # that we are ready to communicate (in both directions) over Tor. + self.onion_hostname = None + + # intended to represent the special case of 'we are the + # only directory node known', however for now dns don't interact + # so this has no role. TODO probably remove it. + self.genesis_node = False + + # waiting loop for all directories to have + # connected (note we could use a deferred but + # the rpc connection calls are not using twisted) + self.wait_for_directories_loop = None + + def info_callback(self, msg): + log.info(msg) + + def setup_error_callback(self, msg): + log.error(msg) + + def shutdown_callback(self, msg): + log.info("in shutdown callback: {}".format(msg)) + + def onion_hostname_callback(self, hostname): + """ This entrypoint marks the start of the OnionMessageChannel + running, since we need this unique identifier as our name + before we can start working (we need to compare it with the + configured directory nodes). + """ + print("hostname: ", hostname) + print("type: ", type(hostname)) + log.info("setting onion hostname to : {}".format(hostname)) + self.onion_hostname = hostname + +# ABC implementation section + def run(self) -> None: + self.hs_up_loop = task.LoopingCall(self.check_onion_hostname) + self.hs_up_loop.start(0.5) + + def get_pubmsg(self, msg:str, source_nick:str ="") -> str: + """ Converts a message into the known format for + pubmsgs; if we are not sending this (because we + are a directory, forwarding it), `source_nick` must be set. + Note that pubmsg does NOT prefix the *message* with COMMAND_PREFIX. + """ + nick = source_nick if source_nick else self.nick + return nick + COMMAND_PREFIX + "PUBLIC" + msg + + def get_privmsg(self, nick: str, cmd: str, message: str, + source_nick=None) -> None: + """ See `get_pubmsg` for comment on `source_nick`. + """ + from_nick = source_nick if source_nick else self.nick + return from_nick + COMMAND_PREFIX + nick + COMMAND_PREFIX + \ + cmd + " " + message + + def _pubmsg(self, msg:str) -> None: + """ Best effort broadcast of message `msg`: + send the message to every known directory node, + with the PUBLIC message type and nick. + """ + peerids = self.get_directory_peers() + msg = OnionCustomMessage(self.get_pubmsg(msg), + JM_MESSAGE_TYPES["pubmsg"]) + for peerid in peerids: + # currently a directory node can send its own + # pubmsgs (act as maker or taker); this will + # probably be removed but is useful in testing: + if peerid == self.self_as_peer.peer_location(): + self.receive_msg(msg, "00") + else: + self._send(self.get_peer_by_id(peerid), msg) + + def _privmsg(self, nick: str, cmd: str, msg:str) -> None: + log.debug("Privmsging to: {}, {}, {}".format(nick, cmd, msg)) + encoded_privmsg = OnionCustomMessage(self.get_privmsg(nick, cmd, msg), + JM_MESSAGE_TYPES["privmsg"]) + peerid = self.get_peerid_by_nick(nick) + if peerid: + peer = self.get_peer_by_id(peerid) + # notice the order matters here!: + if not peerid or not peer or not peer.status() == PEER_STATUS_HANDSHAKED: + # If we are trying to message a peer via their nick, we + # may not yet have a connection; then we just + # forward via directory nodes. + log.debug("Privmsg peer: {} but don't have peerid; " + "sending via directory.".format(nick)) + try: + # TODO change this to redundant or switching? + peer = self.get_connected_directory_peers()[0] + except Exception as e: + log.warn("Failed to send privmsg because no " + "directory peer is connected. Error: {}".format(repr(e))) + return + self._send(peer, encoded_privmsg) + + def _announce_orders(self, offerlist: list) -> None: + for offer in offerlist: + self._pubmsg(offer) + +# End ABC implementation section + + def check_onion_hostname(self): + if not self.onion_hostname: + return + self.hs_up_loop.stop() + # now our hidden service is up, we must check our peer status + # then set up directories. + self.get_our_peer_info() + # at this point the only peers added are directory + # nodes from config; we try to connect to all. + # We will get other peers to add to our list once they + # start sending us messages. + reactor.callLater(0.0, self.connect_to_directories) + + def get_our_peer_info(self) -> None: + """ Create a special OnionPeer object, + outside of our peerlist, to refer to ourselves. + """ + dp = self.get_directory_peers() + self_dir = False + # only for publically exposed onion does the 'virtual port' exist; + # for local tests we always connect to an actual machine port: + port_to_check = 80 if not testing_mode else self.onion_serving_port + my_location_str = self.onion_hostname + ":" + str(port_to_check) + log.info("To check if we are genesis, we compare {} with {}".format(my_location_str, dp)) + if [my_location_str] == dp: + log.info("This is the genesis node: {}".format(self.onion_hostname)) + self.genesis_node = True + self_dir = True + elif my_location_str in dp: + # Here we are just one of many directory nodes, + # which should be fine, we should just be careful + # to not query ourselves. + self_dir = True + self.self_as_peer = OnionPeer(self, self.socks5_host, self.socks5_port, + self.onion_hostname, self.onion_serving_port, + self_dir, nick=self.nick, + handshake_callback=None) + + def connect_to_directories(self) -> None: + if self.genesis_node: + # we are a directory and we have no directory peers; + # just start. + self.on_welcome(self) + return + # the remaining code is only executed by non-directories: + for p in self.peers: + log.info("Trying to connect to node: {}".format(p.peer_location())) + try: + p.connect() + except OnionPeerConnectionError: + pass + # do not trigger on_welcome event until all directories + # configured are ready: + self.on_welcome_sent = False + self.wait_for_directories_loop = task.LoopingCall( + self.wait_for_directories) + self.wait_for_directories_loop.start(10.0) + + def handshake_as_client(self, peer: OnionPeer) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + if self.self_as_peer.directory: + log.debug("Not sending client handshake to {} because we are directory.".format(peer.peer_location())) + return + our_hs = copy.deepcopy(client_handshake_json) + our_hs["location-string"] = self.self_as_peer.peer_location() + our_hs["nick"] = self.nick + # We fire and forget the handshake; successful setting + # of the `is_handshaked` var in the Peer object will depend + # on a valid/success return via the custommsg hook in the plugin. + log.info("Sending this handshake: {} to peer {}".format(json.dumps(our_hs), peer.peer_location())) + self._send(peer, OnionCustomMessage(json.dumps(our_hs), + CONTROL_MESSAGE_TYPES["handshake"])) + + def handshake_as_directory(self, peer: OnionPeer, our_hs: dict) -> None: + assert peer.status() == PEER_STATUS_CONNECTED + log.info("Sending this handshake as directory: {}".format(json.dumps(our_hs))) + self._send(peer, OnionCustomMessage(json.dumps(our_hs), + CONTROL_MESSAGE_TYPES["dn-handshake"])) + + def get_directory_peers(self) -> list: + return [ p.peer_location() for p in self.peers if p.directory is True] + + def get_peerid_by_nick(self, nick:str) -> Union[OnionPeer, None]: + for p in self.get_all_connected_peers(): + if p.nick == nick: + return p.peer_location() + return None + + def _send(self, peer: OnionPeer, message: OnionCustomMessage) -> bool: + try: + return peer.send(message) + except Exception as e: + # This can happen when a peer disconnects, depending + # on the timing: + log.warn("Failed to send message to: {}, error: {}".format( + peer.peer_location(), repr(e))) + return False + + def shutdown(self): + """ TODO + """ + + def receive_msg(self, message: OnionCustomMessage, peer_location: str) -> None: + """ Messages from peers and also connection related control + messages. These messages either come via OnionPeer or via + the main OnionLineProtocolFactory instance that handles all + inbound connections. + """ + if self.self_as_peer.directory: + print("received message as directory: ", message.encode()) + peer = self.get_peer_by_id(peer_location) + if not peer: + log.warn("Received message but could not find peer: {}".format(peer_location)) + return + msgtype = message.msgtype + msgval = message.text + if msgtype in LOCAL_CONTROL_MESSAGE_TYPES.values(): + self.process_control_message(peer_location, msgtype, msgval) + # local control messages are processed first. + # TODO this is a historical artifact, we can simplify. + return + + if self.process_control_message(peer_location, msgtype, msgval): + # will return True if it is, elsewise, a control message. + return + + # ignore non-JM messages: + if not msgtype in JM_MESSAGE_TYPES.values(): + log.debug("Invalid message type, ignoring: {}".format(msgtype)) + return + + # real JM message; should be: from_nick, to_nick, cmd, message + try: + nicks_msgs = msgval.split(COMMAND_PREFIX) + from_nick, to_nick = nicks_msgs[:2] + msg = COMMAND_PREFIX + COMMAND_PREFIX.join(nicks_msgs[2:]) + if to_nick == "PUBLIC": + #log.debug("A pubmsg is being processed by {} from {}; it " + # "is {}".format(self.self_as_peer.nick, from_nick, msg)) + self.on_pubmsg(from_nick, msg) + if self.self_as_peer.directory: + self.forward_pubmsg_to_peers(msg, from_nick) + elif to_nick != self.nick: + if not self.self_as_peer.directory: + log.debug("Ignoring message, not for us: {}".format(msg)) + else: + self.forward_privmsg_to_peer(to_nick, msg, from_nick) + else: + self.on_privmsg(from_nick, msg) + except Exception as e: + log.debug("Invalid joinmarket message: {}, error was: {}".format( + msgval, repr(e))) + return + + def forward_pubmsg_to_peers(self, msg: str, from_nick: str) -> None: + """ Used by directory nodes currently. Takes a received + message that was PUBLIC and broadcasts it to the non-directory + peers. + """ + assert self.self_as_peer.directory + pubmsg = self.get_pubmsg(msg, source_nick=from_nick) + msgtype = JM_MESSAGE_TYPES["pubmsg"] + # NOTE!: Specifically with forwarding/broadcasting, + # we introduce the danger of infinite re-broadcast, + # if there is more than one party forwarding. + # For now we are having directory nodes not talk to + # each other (i.e. they are *required* to only configure + # themselves, not other dns). But this could happen by + # accident. + encoded_msg = OnionCustomMessage(pubmsg, msgtype) + for peer in self.get_connected_nondirectory_peers(): + # don't loop back to the sender: + if peer.nick == from_nick: + continue + log.debug("Sending {}:{} to nondir peer {}".format( + msgtype, pubmsg, peer.peer_location())) + self._send(peer, encoded_msg) + + def forward_privmsg_to_peer(self, nick: str, message: str, + from_nick: str) -> None: + assert self.self_as_peer.directory + peerid = self.get_peerid_by_nick(nick) + if not peerid: + log.debug("We were asked to send a message from {} to {}, " + "but {} is not connected.".format(from_nick, nick, nick)) + return + # The `message` passed in has format COMMAND_PREFIX||command||" "||msg + # we need to parse out cmd, message for sending. + _, cmdmsg = message.split(COMMAND_PREFIX) + cmdmsglist = cmdmsg.split(" ") + cmd = cmdmsglist[0] + msg = " ".join(cmdmsglist[1:]) + privmsg = self.get_privmsg(nick, cmd, msg, source_nick=from_nick) + #log.debug("Sending out privmsg: {} to peer: {}".format(privmsg, peerid)) + encoded_msg = OnionCustomMessage(privmsg, + JM_MESSAGE_TYPES["privmsg"]) + self._send(self.get_peer_by_id(peerid), encoded_msg) + # If possible, we forward the from-nick's network location + # to the to-nick peer, so they can just talk directly next time. + peerid_from = self.get_peerid_by_nick(from_nick) + if not peerid_from: + return + peer_to = self.get_peer_by_id(peerid) + self.send_peers(peer_to, peerid_filter=[peerid_from]) + + def process_control_message(self, peerid: str, msgtype: int, + msgval: str) -> bool: + """ Triggered by a directory node feeding us + peers, or by a connect/disconnect hook; this is our housekeeping + to try to create, and keep track of, useful connections. + """ + all_ctrl = list(LOCAL_CONTROL_MESSAGE_TYPES.values( + )) + list(CONTROL_MESSAGE_TYPES.values()) + if msgtype not in all_ctrl: + return False + # this is too noisy, but TODO, investigate allowing + # some kind of control message monitoring e.g. default-off + # log-to-file (we don't currently have a 'TRACE' level debug). + #log.debug("received control message: {},{}".format(msgtype, msgval)) + if msgtype == CONTROL_MESSAGE_TYPES["peerlist"]: + # This is the base method of seeding connections; + # a directory node can send this any time. We may well + # need to control this; for now it just gets processed, + # whereever it came from: + try: + peerlist = msgval.split(",") + for peer in peerlist: + # defaults mean we just add the peer, not + # add or alter its connection status: + self.add_peer(peer, with_nick=True) + except Exception as e: + log.debug("Incorrectly formatted peer list: {}, " + "ignoring, {}".format(msgval, e)) + # returning True either way, because although it was an + # invalid message, it *was* a control message, and should + # not be processed as something else. + return True + elif msgtype == CONTROL_MESSAGE_TYPES["getpeerlist"]: + # getpeerlist must be accompanied by a full node + # locator, and nick; + # add that peer before returning our peer list. + p = self.add_peer(msgval, connection=True, + overwrite_connection=True, with_nick=True) + try: + self.send_peers(p) + except OnionPeerConnectionError: + pass + # comment much as above; if we can't connect, it's none + # of our business. + return True + elif msgtype == CONTROL_MESSAGE_TYPES["handshake"]: + # sent by non-directory peers on startup + self.process_handshake(peerid, msgval) + return True + elif msgtype == CONTROL_MESSAGE_TYPES["dn-handshake"]: + self.process_handshake(peerid, msgval, dn=True) + return True + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["connect-in"]: + self.add_peer(msgval, connection=True, + overwrite_connection=True) + elif msgtype == LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]: + log.debug("We got a disconnect event: {}".format(msgval)) + if msgval in [x.peer_location() for x in self.get_connected_directory_peers()]: + # we need to use the full peer locator string, so that + # add_peer knows it can try to reconnect: + msgval = self.get_peer_by_id(msgval).peer_location() + self.add_peer(msgval, connection=False, + overwrite_connection=True) + else: + assert False + # If we got here it is *not* a non-local control message; + # so we must process it as a Joinmarket message. + return False + + + def process_handshake(self, peerid: str, message: str, + dn: bool=False) -> None: + peer = self.get_peer_by_id(peerid) + if not peer: + # rando sent us a handshake? + log.warn("Unexpected handshake from unknown peer: {}, " + "ignoring.".format(peerid)) + return + assert isinstance(peer, OnionPeer) + if not peer.status() == PEER_STATUS_CONNECTED: + # we were not waiting for it: + log.warn("Unexpected handshake from peer: {}, " + "ignoring. Peer's current status is: {}".format( + peerid, peer.status())) + return + if dn: + print("We, {}, are processing a handshake with dn {} from peer {}".format(self.self_as_peer.peer_location(), dn, peerid)) + # it means, we are a non-dn and we are expecting + # a returned `dn-handshake` message: + # (currently dns don't talk to other dns): + assert not self.self_as_peer.directory + if not peer.directory: + # got dn-handshake from non-dn: + log.warn("Unexpected dn-handshake from non-dn " + "node: {}, ignoring.".format(peerid)) + return + # we got the right message from the right peer; + # check it is formatted correctly and represents + # acceptance of the connection + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_min = handshake_json["proto-ver-min"] + proto_max = handshake_json["proto-ver-max"] + features = handshake_json["features"] + accepted = handshake_json["accepted"] + nick = handshake_json["nick"] + assert isinstance(proto_max, int) + assert isinstance(proto_min, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + except Exception as e: + log.warn("Invalid handshake message from: {}, exception: {}, message: {}," + "ignoring".format(peerid, repr(e), message)) + return + # currently we are not using any features, but the intention + # is forwards compatibility, so we don't check its contents + # at all. + if not accepted: + log.warn("Directory: {} rejected our handshake.".format(peerid)) + return + if not (app_name == JM_APP_NAME and is_directory and JM_VERSION \ + <= proto_max and JM_VERSION >= proto_min and accepted): + log.warn("Handshake from directory is incompatible or " + "rejected: {}".format(handshake_json)) + return + # We received a valid, accepting dn-handshake. Update the peer. + peer.update_status(PEER_STATUS_HANDSHAKED) + peer.set_nick(nick) + else: + print("We, {}, are processing a handshake with dn {} from peer {}".format(self.self_as_peer.peer_location(), dn, peerid)) + # it means, we are receiving an initial handshake + # message from a 'client' (non-dn) peer. + # dns don't talk to each other: + assert not peer.directory + accepted = True + try: + handshake_json = json.loads(message) + app_name = handshake_json["app-name"] + is_directory = handshake_json["directory"] + proto_ver = handshake_json["proto-ver"] + features = handshake_json["features"] + full_location_string = handshake_json["location-string"] + nick = handshake_json["nick"] + assert isinstance(proto_ver, int) + assert isinstance(features, dict) + assert isinstance(nick, str) + except Exception as e: + log.warn("(not dn) Invalid handshake message from: {}, exception: {}, message: {}," + "ignoring".format(peerid, repr(e), message)) + accepted = False + if not (app_name == JM_APP_NAME and proto_ver == JM_VERSION \ + and not is_directory): + log.warn("Invalid handshake name/version data: {}, from peer: " + "{}, rejecting.".format(message, peerid)) + accepted = False + # If accepted, we should update the peer to have the full + # location which in general will not yet be present, so as to + # allow publishing their location via `getpeerlist`: + if not peer.set_location(full_location_string): + accepted = False + if not peerid == full_location_string: + print("we are reading a handshake from location {} but they sent" + "us full location string {}, setting an alternate".format( + peerid, full_location_string)) + peer.set_alternate_location(peerid) + peer.set_nick(nick) + # client peer's handshake message was valid; send ours, and + # then mark this peer as successfully handshaked: + our_hs = copy.deepcopy(server_handshake_json) + our_hs["nick"] = self.nick + our_hs["accepted"] = accepted + if self.self_as_peer.directory: + self.handshake_as_directory(peer, our_hs) + if accepted: + peer.update_status(PEER_STATUS_HANDSHAKED) + + def get_peer_by_id(self, p: str) -> Union[OnionPeer, bool]: + """ Returns the OnionPeer with peer location p, + if it is in self.peers, otherwise returns False. + """ + if p == "00": + return self.self_as_peer + for x in self.peers: + if x.peer_location() == p: + return x + if x.alternate_location == p: + return x + return False + + def register_connection(self, peer_location: str, direction: int) -> None: + """ We send ourselves a local control message indicating + the new connection. + If the connection is inbound, direction == 0, else 1. + """ + assert direction in range(2) + if direction == 1: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect"] + else: + msgtype = LOCAL_CONTROL_MESSAGE_TYPES["connect-in"] + msg = OnionCustomMessage(peer_location, msgtype) + self.receive_msg(msg, "00") + + def register_disconnection(self, peer_location: str) -> None: + """ We send ourselves a local control message indicating + the disconnection. + """ + msg = OnionCustomMessage(peer_location, + LOCAL_CONTROL_MESSAGE_TYPES["disconnect"]) + self.receive_msg(msg, "00") + + def add_peer(self, peerdata: str, connection: bool=False, + overwrite_connection: bool=False, with_nick=False) -> None: + """ add non-directory peer from (nick, peer) serialization `peerdata`, + where "peer" is host:port; + return the created OnionPeer object. Or, with_nick=False means + that `peerdata` has only the peer location. + If the peer is already in our peerlist it can be updated in + one of these ways: + * the nick can be added + * it can be marked as 'connected' if it was previously unconnected, + with this conditional on whether the flag `overwrite_connection` is + set. Note that this peer removal, unlike the peer addition above, + can also occur for directory nodes, if we lose connection (and then + we persistently try to reconnect; see OnionDirectoryPeer). + """ + if with_nick: + try: + nick, peer = peerdata.split(NICK_PEERLOCATOR_SEPARATOR) + except Exception as e: + # TODO: as of now, this is not an error, but expected. + # Don't log? Do something else? + log.debug("Received invalid peer identifier string: {}, {}".format( + peerdata, e)) + return + else: + peer = peerdata + + # assumed that it's passing a full string + try: + temp_p = OnionPeer.from_location_string(self, peer, + self.socks5_host, self.socks5_port, + handshake_callback=self.handshake_as_client) + except Exception as e: + # There are currently a few ways the location + # parsing and Peer object construction can fail; + # TODO specify exception types. + log.warn("Failed to add peer: {}, exception: {}".format(peer, repr(e))) + return + if not self.get_peer_by_id(temp_p.peer_location()): + if connection: + log.info("Updating status of peer: {} to connected.".format(temp_p.peer_location())) + temp_p.update_status(PEER_STATUS_CONNECTED) + else: + temp_p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + temp_p.set_nick(nick) + self.peers.add(temp_p) + if not connection: + # Here, we are not currently connected. We + # try to connect asynchronously. We don't pay attention + # to any return. This attempt is one-shot and opportunistic, + # for non-dns, but will retry with exp-backoff for dns. + # Notice this is only possible for non-dns to other non-dns, + # since dns will never reach this point without an active + # connection. + reactor.callLater(0.0, temp_p.try_to_connect) + return temp_p + else: + p = self.get_peer_by_id(temp_p.peer_location()) + if overwrite_connection: + if connection: + log.info("Updating status to connected for peer {}.".format(temp_p.peer_location())) + p.update_status(PEER_STATUS_CONNECTED) + else: + p.update_status(PEER_STATUS_DISCONNECTED) + if with_nick: + p.set_nick(nick) + return p + + def get_all_connected_peers(self) -> list: + return self.get_connected_directory_peers() + \ + self.get_connected_nondirectory_peers() + + def get_connected_directory_peers(self) -> list: + return [p for p in self.peers if p.directory and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def get_connected_nondirectory_peers(self) -> list: + return [p for p in self.peers if (not p.directory) and p.status() == \ + PEER_STATUS_HANDSHAKED] + + def wait_for_directories(self) -> None: + # Notice this is checking for *handshaked* dps; + # the handshake will have been initiated once a + # connection was seen: + log.warn("in the wait for directories loop, this is the connected dps: {}".format(self.get_connected_directory_peers())) + if len(self.get_connected_directory_peers()) == 0: + return + # This is what triggers the start of taker/maker workflows. + if not self.on_welcome_sent: + self.on_welcome(self) + self.on_welcome_sent = True + self.wait_for_directories_loop.stop() + + """ CONTROL MESSAGES SENT BY US + """ + def send_peers(self, requesting_peer: OnionPeer, + peerid_filter: list=[]) -> None: + """ This message is sent by directory peers on request + by non-directory peers. + If peerid_filter is specified, only peers whose peerid is in + this list will be sent. (TODO this is inefficient). + The peerlist message should have this format: + (1) entries comma separated + (2) each entry is serialized nick then the NICK_PEERLOCATOR_SEPARATOR + then *either* 66 char hex peerid, *or* peerid@host:port + (3) However this message might be long enough to exceed a 1300 byte limit, + if we don't use a filter, so we may need to split it into multiple + messages (TODO). + """ + if not requesting_peer.status() == PEER_STATUS_HANDSHAKED: + raise OnionPeerConnectionError( + "Cannot send peer list to unhandshaked peer") + peerlist = set() + for p in self.get_connected_nondirectory_peers(): + # don't send a peer to itself + if p.peer_location() == requesting_peer.peer_location(): + continue + if len(peerid_filter) > 0 and p.peer_location() not in peerid_filter: + continue + if not p.status() == PEER_STATUS_HANDSHAKED: + # don't advertise what is not online. + continue + # peers that haven't sent their nick yet are not + # privmsg-reachable; don't send them + if p.nick == "": + continue + peerlist.add(p.get_nick_peerlocation_ser()) + # For testing: dns won't usually participate: + peerlist.add(self.self_as_peer.get_nick_peerlocation_ser()) + self._send(requesting_peer, OnionCustomMessage(",".join( + peerlist), CONTROL_MESSAGE_TYPES["peerlist"])) diff --git a/jmdaemon/test/test_daemon_protocol.py b/jmdaemon/test/test_daemon_protocol.py index 71beba734..f9dbf390e 100644 --- a/jmdaemon/test/test_daemon_protocol.py +++ b/jmdaemon/test/test_daemon_protocol.py @@ -7,7 +7,7 @@ from jmdaemon.protocol import NICK_HASH_LENGTH, NICK_MAX_ENCODED, JM_VERSION,\ JOINMARKET_NICK_HEADER from jmbase import get_log -from jmclient import (load_test_config, jm_single, get_irc_mchannels) +from jmclient import (load_test_config, jm_single, get_mchannels) from twisted.python.log import msg as tmsg from twisted.python.log import startLogging from twisted.internet import protocol, reactor, task @@ -59,7 +59,7 @@ def connectionMade(self): def clientStart(self): self.sigs_received = 0 - irc = get_irc_mchannels() + irc = [get_mchannels()[0]] d = self.callRemote(JMInit, bcsource="dummyblockchain", network="dummynetwork", diff --git a/jmdaemon/test/test_irc_messaging.py b/jmdaemon/test/test_irc_messaging.py index 755a20c69..0e9812fd7 100644 --- a/jmdaemon/test/test_irc_messaging.py +++ b/jmdaemon/test/test_irc_messaging.py @@ -6,7 +6,7 @@ from twisted.internet import reactor, task from jmdaemon import IRCMessageChannel, MessageChannelCollection #needed for test framework -from jmclient import (load_test_config, get_irc_mchannels, jm_single) +from jmclient import (load_test_config, get_mchannels, jm_single) si = 1 class DummyDaemon(object): @@ -95,7 +95,7 @@ def junk_fill(mc): def getmc(nick): dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], nick, dm) + mc = DummyMC(get_mchannels()[0], nick, dm) mc.register_orderbookwatch_callbacks(on_order_seen=on_order_seen) mc.register_taker_callbacks(on_pubkey=on_pubkey) mc.on_connect = on_connect @@ -108,7 +108,7 @@ class TrialIRC(unittest.TestCase): def setUp(self): load_test_config() - print(get_irc_mchannels()[0]) + print(get_mchannels()[0]) jm_single().maker_timeout_sec = 1 dm, mc, mcc = getmc("irc_publisher") dm2, mc2, mcc2 = getmc("irc_receiver") diff --git a/jmdaemon/test/test_orderbookwatch.py b/jmdaemon/test/test_orderbookwatch.py index 39d4de791..17797a635 100644 --- a/jmdaemon/test/test_orderbookwatch.py +++ b/jmdaemon/test/test_orderbookwatch.py @@ -2,7 +2,7 @@ from jmdaemon.orderbookwatch import OrderbookWatch from jmdaemon import IRCMessageChannel, fidelity_bond_cmd_list -from jmclient import get_irc_mchannels, load_test_config +from jmclient import get_mchannels, load_test_config from jmdaemon.protocol import JM_VERSION, ORDER_KEYS from jmbase.support import hextobin from jmclient.fidelity_bond import FidelityBondProof @@ -24,7 +24,7 @@ def on_welcome(x): def get_ob(): load_test_config() dm = DummyDaemon() - mc = DummyMC(get_irc_mchannels()[0], "test", dm) + mc = DummyMC(get_mchannels()[0], "test", dm) ob = OrderbookWatch() ob.on_welcome = on_welcome ob.set_msgchan(mc) diff --git a/scripts/obwatch/ob-watcher.py b/scripts/obwatch/ob-watcher.py index 324f92c98..b6a2950ed 100755 --- a/scripts/obwatch/ob-watcher.py +++ b/scripts/obwatch/ob-watcher.py @@ -44,7 +44,7 @@ import matplotlib.pyplot as plt from jmclient import jm_single, load_program_config, calc_cj_fee, \ - get_irc_mchannels, add_base_options + get_mchannels, add_base_options from jmdaemon import OrderbookWatch, MessageChannelCollection, IRCMessageChannel #TODO this is only for base58, find a solution for a client without jmbitcoin import jmbitcoin as btc @@ -804,7 +804,7 @@ def main(): (options, args) = parser.parse_args() load_program_config(config_path=options.datadir) hostport = (options.host, options.port) - mcs = [ObIRCMessageChannel(c) for c in get_irc_mchannels()] + mcs = [ObIRCMessageChannel(c) for c in get_mchannels()] mcc = MessageChannelCollection(mcs) mcc.set_nick(get_dummy_nick()) taker = ObBasic(mcc, hostport) diff --git a/test/e2e-coinjoin-test.py b/test/e2e-coinjoin-test.py new file mode 100644 index 000000000..600d6ecd5 --- /dev/null +++ b/test/e2e-coinjoin-test.py @@ -0,0 +1,364 @@ +#! /usr/bin/env python +'''Creates wallets and yield generators in regtest, + then runs both them and a JMWalletDaemon instance + for the taker, injecting the newly created taker + wallet into it and running sendpayment once. + Number of ygs is configured in the joinmarket.cfg + with `regtest-count` in the `ln-onion` type MESSAGING + section. + See notes below for more detail on config. + Run it like: + pytest \ + --btcroot=/path/to/bitcoin/bin/ \ + --btcpwd=123456abcdef --btcconf=/blah/bitcoin.conf \ + -s test/ln-ygrunner.py + ''' +from twisted.internet import reactor, defer +from twisted.web.client import readBody, Headers +from common import make_wallets +import pytest +import random +import json +from datetime import datetime +from jmbase import (get_nontor_agent, BytesProducer, jmprint, + get_log, stop_reactor, hextobin, bintohex) +from jmclient import (YieldGeneratorBasic, load_test_config, jm_single, + JMClientProtocolFactory, start_reactor, SegwitWallet, get_mchannels, + SegwitLegacyWallet, JMWalletDaemon) +from jmclient.wallet_utils import wallet_gettimelockaddress +from jmclient.wallet_rpc import api_version_string + +log = get_log() + +# For quicker testing, restrict the range of timelock +# addresses to avoid slow load of multiple bots. +# Note: no need to revert this change as ygrunner runs +# in isolation. +from jmclient import FidelityBondMixin +FidelityBondMixin.TIMELOCK_ERA_YEARS = 2 +FidelityBondMixin.TIMELOCK_EPOCH_YEAR = datetime.now().year +FidelityBondMixin.TIMENUMBERS_PER_PUBKEY = 12 + +wallet_name = "test-onion-yg-runner.jmdat" + +mean_amt = 2.0 + +directory_node_indices = [1] + +# +def get_onion_messaging_config_regtest(run_num: int, dns=[1], hsd=""): + """ Sets a onion messaging channel section for a regtest instance + indexed by `run_num`. The indices to be used as directory nodes + should be passed as `dns`, as a list of ints. + """ + def location_string(directory_node_run_num): + return "127.0.0.1:" + str( + 8080 + directory_node_run_num) + if run_num in dns: + # means *we* are a dn, and dns currently + # do not use other dns: + dns_to_use = [location_string(run_num)] + else: + dns_to_use = [location_string(a) for a in dns] + dn_nodes_list = ",".join(dns_to_use) + log.info("For node: {}, set dn list to: {}".format(run_num, dn_nodes_list)) + cf = {"type": "onion", + "socks5_host": "127.0.0.1", + "socks5_port": 9050, + "tor_control_host": "127.0.0.1", + "tor_control_port": 9051, + "onion_serving_host": "127.0.0.1", + "onion_serving_port": 8080 + run_num, + "hidden_service_dir": "", + "directory_nodes": dn_nodes_list, + "regtest_count": "1, 1"} + if run_num in dns: + # only directories need to use fixed hidden service directories: + cf["hidden_service_dir"] = hsd + return cf + + +class RegtestJMClientProtocolFactory(JMClientProtocolFactory): + i = 1 + def set_directory_nodes(self, dns): + # a list of integers representing the directory nodes + # for this test: + self.dns = dns + + def get_mchannels(self): + # swaps out any existing lightning configs + # in the config settings on startup, for one + # that's indexed to the regtest counter var: + default_chans = get_mchannels() + new_chans = [] + onion_found = False + hsd = "" + for c in default_chans: + if "type" in c and c["type"] == "onion": + onion_found = True + if c["hidden_service_dir"] != "": + hsd = c["hidden_service_dir"] + continue + else: + new_chans.append(c) + if onion_found: + new_chans.append(get_onion_messaging_config_regtest( + self.i, self.dns, hsd)) + return new_chans + +class JMWalletDaemonT(JMWalletDaemon): + def check_cookie(self, request): + if self.auth_disabled: + return True + return super().check_cookie(request) + +class TWalletRPCManager(object): + """ Base class for set up of tests of the + Wallet RPC calls using the wallet_rpc.JMWalletDaemon service. + """ + # the port for the jmwallet daemon + dport = 28183 + # the port for the ws + wss_port = 28283 + + def __init__(self): + # a client connnection object which is often but not always + # instantiated: + self.client_connector = None + self.daemon = JMWalletDaemonT(self.dport, self.wss_port, tls=False) + self.daemon.auth_disabled = True + # because we sync and start the wallet service manually here + # (and don't use wallet files yet), we won't have set a wallet name, + # so we set it here: + self.daemon.wallet_name = wallet_name + + def start(self): + r, s = self.daemon.startService() + self.listener_rpc = r + self.listener_ws = s + + def get_route_root(self): + addr = "http://127.0.0.1:" + str(self.dport) + addr += api_version_string + return addr + + def stop(self): + for dc in reactor.getDelayedCalls(): + dc.cancel() + d1 = defer.maybeDeferred(self.listener_ws.stopListening) + d2 = defer.maybeDeferred(self.listener_rpc.stopListening) + if self.client_connector: + self.client_connector.disconnect() + # only fire if everything is finished: + return defer.gatherResults([d1, d2]) + + @defer.inlineCallbacks + def do_request(self, agent, method, addr, body, handler, token=None): + if token: + headers = Headers({"Authorization": ["Bearer " + self.jwt_token]}) + else: + headers = None + response = yield agent.request(method, addr, headers, bodyProducer=body) + yield self.response_handler(response, handler) + + @defer.inlineCallbacks + def response_handler(self, response, handler): + body = yield readBody(response) + # these responses should always be 200 OK. + #assert response.code == 200 + # handlers check the body is as expected; no return. + yield handler(body) + return True + +def test_start_yg_and_taker_setup(setup_onion_ygrunner): + """Set up some wallets, for the ygs and 1 taker. + Then start LN and the ygs in the background, then fire + a startup of a wallet daemon for the taker who then + makes a coinjoin payment. + """ + if jm_single().config.get("POLICY", "native") == "true": + walletclass = SegwitWallet + else: + # TODO add Legacy + walletclass = SegwitLegacyWallet + + start_bot_num, end_bot_num = [int(x) for x in jm_single().config.get( + "MESSAGING:onion1", "regtest_count").split(",")] + num_ygs = end_bot_num - start_bot_num + # specify the number of wallets and bots of each type: + wallet_services = make_wallets(num_ygs + 1, + wallet_structures=[[1, 3, 0, 0, 0]] * (num_ygs + 1), + mean_amt=2.0, + walletclass=walletclass) + #the sendpayment bot uses the last wallet in the list + wallet_service = wallet_services[end_bot_num - 1]['wallet'] + jmprint("\n\nTaker wallet seed : " + wallet_services[end_bot_num - 1]['seed']) + # for manual audit if necessary, show the maker's wallet seeds + # also (note this audit should be automated in future, see + # test_full_coinjoin.py in this directory) + jmprint("\n\nMaker wallet seeds: ") + for i in range(start_bot_num, end_bot_num): + jmprint("Maker seed: " + wallet_services[i - 1]['seed']) + jmprint("\n") + wallet_service.sync_wallet(fast=True) + ygclass = YieldGeneratorBasic + + # As per previous note, override non-default command line settings: + options = {} + for x in ["ordertype", "txfee_contribution", "txfee_contribution_factor", + "cjfee_a", "cjfee_r", "cjfee_factor", "minsize", "size_factor"]: + options[x] = jm_single().config.get("YIELDGENERATOR", x) + ordertype = options["ordertype"] + txfee_contribution = int(options["txfee_contribution"]) + txfee_contribution_factor = float(options["txfee_contribution_factor"]) + cjfee_factor = float(options["cjfee_factor"]) + size_factor = float(options["size_factor"]) + if ordertype == 'reloffer': + cjfee_r = options["cjfee_r"] + # minimum size is such that you always net profit at least 20% + #of the miner fee + minsize = max(int(1.2 * txfee_contribution / float(cjfee_r)), + int(options["minsize"])) + cjfee_a = None + elif ordertype == 'absoffer': + cjfee_a = int(options["cjfee_a"]) + minsize = int(options["minsize"]) + cjfee_r = None + else: + assert False, "incorrect offertype config for yieldgenerator." + + txtype = wallet_service.get_txtype() + if txtype == "p2wpkh": + prefix = "sw0" + elif txtype == "p2sh-p2wpkh": + prefix = "sw" + elif txtype == "p2pkh": + prefix = "" + else: + assert False, "Unsupported wallet type for yieldgenerator: " + txtype + + ordertype = prefix + ordertype + + for i in range(start_bot_num, end_bot_num): + cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, + txfee_contribution_factor, cjfee_factor, size_factor] + wallet_service_yg = wallet_services[i - 1]["wallet"] + + wallet_service_yg.startService() + + yg = ygclass(wallet_service_yg, cfg) + clientfactory = RegtestJMClientProtocolFactory(yg, proto_type="MAKER") + # This ensures that the right rpc/port config is passed into the daemon, + # for this specific bot: + clientfactory.i = i + # This ensures that this bot knows which other bots are directory nodes: + clientfactory.set_directory_nodes(directory_node_indices) + nodaemon = jm_single().config.getint("DAEMON", "no_daemon") + daemon = True if nodaemon == 1 else False + #rs = True if i == num_ygs - 1 else False + start_reactor(jm_single().config.get("DAEMON", "daemon_host"), + jm_single().config.getint("DAEMON", "daemon_port"), + clientfactory, daemon=daemon, rs=False) + reactor.callLater(1.0, start_test_taker, wallet_services[end_bot_num - 1]['wallet'], end_bot_num) + reactor.run() + +@defer.inlineCallbacks +def start_test_taker(wallet_service, i): + # this rpc manager has auth disabled, + # and the wallet_service is set manually, + # so no unlock etc. + mgr = TWalletRPCManager() + mgr.daemon.wallet_service = wallet_service + # because we are manually setting the wallet_service + # of the JMWalletDaemon instance, we do not follow the + # usual flow of `initialize_wallet_service`, we do not set + # the auth token or start the websocket; so we must manually + # sync the wallet, including bypassing any restart callback: + def dummy_restart_callback(msg): + log.warn("Ignoring rescan request from backend wallet service: " + msg) + mgr.daemon.wallet_service.add_restart_callback(dummy_restart_callback) + mgr.daemon.wallet_name = wallet_name + while not mgr.daemon.wallet_service.synced: + mgr.daemon.wallet_service.sync_wallet(fast=True) + mgr.daemon.wallet_service.startService() + def get_client_factory(): + clientfactory = RegtestJMClientProtocolFactory(mgr.daemon.taker, + proto_type="TAKER") + clientfactory.i = i + clientfactory.set_directory_nodes(directory_node_indices) + return clientfactory + + mgr.daemon.get_client_factory = get_client_factory + # before preparing the RPC call to the wallet daemon, + # we decide a coinjoin destination and amount. Choosing + # a destination in the wallet is a bit easier because + # we can query the mixdepth balance at the end. + coinjoin_destination = mgr.daemon.wallet_service.get_internal_addr(4) + cj_amount = 22000000 + # once the taker is finished we sanity check before + # shutting down: + def dummy_taker_finished(res, fromtx=False, + waittime=0.0, txdetails=None): + jmprint("Taker is finished") + # check that the funds have arrived. + mbal = mgr.daemon.wallet_service.get_balance_by_mixdepth()[4] + assert mbal == cj_amount + jmprint("Funds: {} sats successfully arrived into mixdepth 4.".format(cj_amount)) + stop_reactor() + mgr.daemon.taker_finished = dummy_taker_finished + mgr.start() + agent = get_nontor_agent() + addr = mgr.get_route_root() + addr += "/wallet/" + addr += mgr.daemon.wallet_name + addr += "/taker/coinjoin" + addr = addr.encode() + body = BytesProducer(json.dumps({"mixdepth": "1", + "amount_sats": cj_amount, + "counterparties": "2", + "destination": coinjoin_destination}).encode()) + yield mgr.do_request(agent, b"POST", addr, body, + process_coinjoin_response) + +def process_coinjoin_response(response): + json_body = json.loads(response.decode("utf-8")) + print("coinjoin response: {}".format(json_body)) + +def get_addr_and_fund(yg): + """ This function allows us to create + and publish a fidelity bond for a particular + yield generator object after the wallet has reached + a synced state and is therefore ready to serve up + timelock addresses. We create the TL address, fund it, + refresh the wallet and then republish our offers, which + will also publish the new FB. + """ + if not yg.wallet_service.synced: + return + if yg.wallet_service.timelock_funded: + return + addr = wallet_gettimelockaddress(yg.wallet_service.wallet, "2021-11") + print("Got timelockaddress: {}".format(addr)) + + # pay into it; amount is randomized for now. + # Note that grab_coins already mines 1 block. + fb_amt = random.randint(1, 5) + jm_single().bc_interface.grab_coins(addr, fb_amt) + + # we no longer have to run this loop (TODO kill with nonlocal) + yg.wallet_service.timelock_funded = True + + # force wallet to check for the new coins so the new + # yg offers will include them: + yg.wallet_service.transaction_monitor() + + # publish a new offer: + yg.offerlist = yg.create_my_orders() + yg.fidelity_bond = yg.get_fidelity_bond_template() + jmprint('updated offerlist={}'.format(yg.offerlist)) + +@pytest.fixture(scope="module") +def setup_onion_ygrunner(): + load_test_config() + jm_single().bc_interface.tick_forward_chain_interval = 10 + jm_single().bc_interface.simulate_blocks() diff --git a/test/regtest_joinmarket.cfg b/test/regtest_joinmarket.cfg index 4d3c211cf..3345e29ff 100644 --- a/test/regtest_joinmarket.cfg +++ b/test/regtest_joinmarket.cfg @@ -16,6 +16,7 @@ network = testnet rpc_wallet_file = jm-test-wallet [MESSAGING:server1] +type = irc host = localhost hostid = localhost1 channel = joinmarket-pit @@ -26,6 +27,7 @@ socks5_host = localhost socks5_port = 9150 [MESSAGING:server2] +type = irc host = localhost hostid = localhost2 channel = joinmarket-pit @@ -35,8 +37,46 @@ socks5 = false socks5_host = localhost socks5_port = 9150 +[MESSAGING:onion1] +# onion based message channels must have the exact type 'onion' +# (while the section name above can be MESSAGING:whatever), and there must +# be only ONE such message channel configured (note the directory servers +# can be multiple, below): +type = onion +socks5_host = localhost +socks5_port = 9050 +# the tor control configuration: +tor_control_host = localhost +# or, to use a UNIX socket +# control_host = unix:/var/run/tor/control +tor_control_port = 9051 +# the host/port actually serving the hidden service +# (note the *virtual port*, that the client uses, +# is hardcoded to 80): +onion_serving_host = 127.0.0.1 +onion_serving_port = 8080 +# This is mandatory for directory nodes (who must also set their +# own .onion:port as the only directory in directory_nodes, below), +# but NOT TO BE USED by non-directory nodes (which is you, unless +# you know otherwise!), as it will greatly degrade your privacy. +# +# Special handling on regtest, so just ignore and let the code handle it: +hidden_service_dir = "" +# This is a comma separated list (comma can be omitted if only one item). +# Each item has format host:port +# On regtest we are going to increment the port numbers served from, with +# the value used here as the starting value: +directory_nodes = localhost:8081 +# this is not present in default real config +# and is specifically used to flag tests: +# means we use indices 1,2,3,4,5: +regtest_count=1,5 + [TIMEOUT] -maker_timeout_sec = 15 +maker_timeout_sec = 10 + +[LOGGING] +console_log_level = DEBUG [POLICY] # for dust sweeping, try merge_algorithm = gradual diff --git a/test/ygrunner.py b/test/ygrunner.py index 88ef65b97..d657179d5 100644 --- a/test/ygrunner.py +++ b/test/ygrunner.py @@ -96,7 +96,7 @@ def on_tx_received(self, nick, tx, offerinfo): "num_ygs, wallet_structures, fb_indices, mean_amt, malicious, deterministic", [ # 1sp 3yg, honest makers, one maker has FB: - (3, [[1, 3, 0, 0, 0]] * 4, [1, 2], 2, 0, False), + (3, [[1, 3, 0, 0, 0]] * 4, [], 2, 0, False), # 1sp 3yg, malicious makers reject on auth and on tx 30% of time #(3, [[1, 3, 0, 0, 0]] * 4, 2, 30, False), # 1 sp 9 ygs, deterministically malicious 50% of time @@ -173,6 +173,7 @@ def test_start_ygs(setup_ygrunner, num_ygs, wallet_structures, fb_indices, ygclass = DeterministicMaliciousYieldGenerator else: ygclass = MaliciousYieldGenerator + for i in range(num_ygs): cfg = [txfee_contribution, cjfee_a, cjfee_r, ordertype, minsize, txfee_contribution_factor, cjfee_factor, size_factor]