diff --git a/.nengobones.yml b/.nengobones.yml index f03803903..444070cfc 100644 --- a/.nengobones.yml +++ b/.nengobones.yml @@ -437,7 +437,7 @@ docs_conf_py: analytics_id: UA-41658423-2 travis_yml: - python: 3.5.2 + python: 3.6 global_vars: NENGO_VERSION: nengo[tests] NENGO_DL_VERSION: nengo-dl @@ -452,10 +452,11 @@ travis_yml: NENGO_VERSION: git+https://github.com/nengo/nengo.git#egg=nengo[tests] NENGO_DL_VERSION: git+https://github.com/nengo/nengo-dl.git#egg=nengo-dl - script: hardware + python: 3.5.2 # nxsdk requires 3.5.2 env: NENGO_VERSION: git+https://github.com/nengo/nengo.git#egg=nengo[tests] NENGO_DL_VERSION: git+https://github.com/nengo/nengo-dl.git#egg=nengo-dl - NXSDK_VERSION: 0.8.5 + NXSDK_VERSION: 0.9 - script: docs env: NENGO_VERSION: git+https://github.com/nengo/nengo.git#egg=nengo[tests] @@ -472,7 +473,7 @@ ci_scripts: - $NENGO_VERSION - $NENGO_DL_VERSION - jupyter - - numpy + - numpy>=1.14 # avoid the default-installed 1.13 on TravisCI coverage: true nengo_tests: true - template: static diff --git a/.travis.yml b/.travis.yml index d46e3f8ab..6c9810c99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ # Automatically generated by nengo-bones, do not edit this file directly language: python -python: 3.5.2 +python: 3.6 notifications: email: on_success: change @@ -41,8 +41,9 @@ jobs: env: NENGO_VERSION="git+https://github.com/nengo/nengo.git#egg=nengo[tests]" NENGO_DL_VERSION="git+https://github.com/nengo/nengo-dl.git#egg=nengo-dl" - NXSDK_VERSION="0.8.5" + NXSDK_VERSION="0.9" SCRIPT="hardware" + python: 3.5.2 - env: NENGO_VERSION="git+https://github.com/nengo/nengo.git#egg=nengo[tests]" diff --git a/CHANGES.rst b/CHANGES.rst index 2a2883d9a..24cdf9f3a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -31,6 +31,8 @@ Release history (`#240 `__) - A more informative error message is raised if any encoders contain NaNs. (`#251 `__) +- Nengo Loihi now supports NxSDK version 0.9.0. + (`#255 `__) **Changed** @@ -39,6 +41,8 @@ Release history (`#226 `__) - The ``scipy`` package is now required to run Nengo Loihi. (`#240 `__) +- Nengo Loihi now requires NxSDK version 0.8.7. 
+ (`#234 `__) 0.8.0 (June 23, 2019) ===================== diff --git a/docs/benchmarks/parallel_ensembles.py b/docs/benchmarks/parallel_ensembles.py new file mode 100644 index 000000000..78a49e8df --- /dev/null +++ b/docs/benchmarks/parallel_ensembles.py @@ -0,0 +1,172 @@ +import cProfile +import timeit + +import matplotlib.pyplot as plt +import nengo +import nengo_loihi +from nengo_loihi import decode_neurons +import numpy as np + +rng = np.random.RandomState(1) +seed = rng.randint(0, 2**31) + +LoihiEmulator = lambda net: nengo_loihi.Simulator(net, target="sim") +LoihiSimulator = lambda net: nengo_loihi.Simulator( + net, target="loihi", hardware_options=dict(snip_max_spikes_per_step=300), +) +NengoSimulator = lambda net: nengo.Simulator(net) + +Simulator = LoihiSimulator + +funnel_input = False +funnel_output = True +learning = False +profile = False +#profile = True + +simtime = 10 + +#n_input = 6 +#n_output = 3 +n_input = 10 +n_output = 5 + +#n_neurons = 10 +#n_neurons = 100 +n_neurons = 400 +#n_neurons = 1000 + +#n_ensembles = 1 +#n_ensembles = 3 +n_ensembles = 20 + +#pes_learning_rate = 5e-9 +pes_learning_rate = 5e-6 +#pes_learning_rate = 5e-4 + +# synapse time constants +tau_input = 0.01 +tau_error = 0.01 +tau_output = 0.01 + +encoders_dist = nengo.dists.UniformHypersphere(surface=True) +encoders = encoders_dist.sample(n_neurons * n_ensembles, n_input, rng=rng) +encoders = encoders.reshape(n_ensembles, n_neurons, n_input) + +input_freq = np.pi +input_phases = (1 / n_input) * np.arange(n_input) + +# desired transform between input and output +assert n_input % n_output == 0 +inout_transform = np.repeat(np.eye(n_output), n_input / n_output, axis=1) +inout_transform /= inout_transform.sum(axis=1, keepdims=True) + +net = nengo.Network(seed=seed) +net.config[nengo.Ensemble].neuron_type = nengo.LIF() +net.config[nengo.Ensemble].intercepts = nengo.dists.Uniform(-1, 0.5) +net.config[nengo.Ensemble].max_rates = nengo.dists.Uniform(100, 200) +# net.config[nengo.Connection].synapse = None + +with net: + error_rng = np.random.RandomState(5) + + def ctrl_func(t, u_adapt): + inputs = np.sin(input_freq*t + 2*np.pi*input_phases) + + target_output = np.dot(inout_transform, inputs) + errors = u_adapt - target_output + + return inputs.tolist() + errors.tolist() + + ctrl = nengo.Node(ctrl_func, size_in=n_output, size_out=n_input+n_output, label="ctrl") + ctrl_probe = nengo.Probe(ctrl) + + input = ctrl[:n_input] + error = ctrl[n_input:] + output = ctrl + + inp2ens_transform = None + if funnel_input: + input_decodeneurons = decode_neurons.Preset10DecodeNeurons() + onchip_input = input_decodeneurons.get_ensemble(dim=n_input) + nengo.Connection(input, onchip_input, synapse=None) + inp2ens_transform = np.hstack( + [np.eye(n_input), -np.eye(n_input)] * input_decodeneurons.pairs_per_dim + ) + input = onchip_input + + if funnel_output: + output_decodeneurons = decode_neurons.Preset10DecodeNeurons() + onchip_output = output_decodeneurons.get_ensemble(dim=n_output) + out2ctrl_transform = np.hstack( + [np.eye(n_output), -np.eye(n_output)] * output_decodeneurons.pairs_per_dim + ) / 2000. 
+ nengo.Connection( + onchip_output.neurons, + output, + transform=out2ctrl_transform, + synapse=tau_output, + ) + output = onchip_output + + for ii in range(n_ensembles): + ens = nengo.Ensemble( + n_neurons=n_neurons, + dimensions=n_input, + radius=np.sqrt(n_input), + encoders=encoders[ii], + label="ens%02d" % ii, + ) + + if inp2ens_transform is not None: + inp2ens_transform_ii = np.dot(encoders[ii], inp2ens_transform) + nengo.Connection( + input.neurons, + ens.neurons, + transform=inp2ens_transform_ii, + synapse=tau_input + ) + else: + nengo.Connection(input, ens, synapse=tau_input) + + conn_kwargs = dict() + if learning: + conn_kwargs["transform"] = rng.uniform(-0.01, 0.01, size=(n_output, n_input)) + conn_kwargs["learning_rule_type"] = nengo.PES( + pes_learning_rate, pre_synapse=tau_error, + ) + else: + conn_kwargs["transform"] = inout_transform / n_ensembles + + conn = nengo.Connection(ens, output, **conn_kwargs) + + if learning: + nengo.Connection(error, conn.learning_rule, synapse=None) + + +with Simulator(net) as sim: + sim.run(0.001) # eliminate any extra startup from timing + + steps = sim.n_steps + timer = timeit.default_timer() + if profile: + cProfile.runctx("sim.run(%s)" % simtime, globals(), locals(), sort="cumtime") + else: + sim.run(simtime) + timer = timeit.default_timer() - timer + steps = sim.n_steps - steps + print('Run time/step: %0.2f ms' % (1000 * timer / steps)) + + inputs = sim.data[ctrl_probe][:, :n_input] + targets = np.dot(inputs, inout_transform.T) + errors = sim.data[ctrl_probe][:, n_input:] + outputs = errors + targets + + error_steps = min(sim.n_steps, 2000) + start_error = np.abs(errors[:error_steps]).mean() + end_error = np.abs(errors[-error_steps:]).mean() + print("Error: start %0.3f, end %0.3f" % (start_error, end_error)) + + plt.plot(sim.trange(), targets, ":") + plt.plot(sim.trange(), outputs) + plt.savefig("parallel_ensembles.pdf") diff --git a/docs/conf.py b/docs/conf.py index 4c16775e5..8528336fd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -43,7 +43,10 @@ # -- sphinx nitpicky = True -exclude_patterns = ["_build", "**/.ipynb_checkpoints"] +exclude_patterns = [ + "_build", + "**/.ipynb_checkpoints", +] linkcheck_timeout = 30 source_suffix = ".rst" source_encoding = "utf-8" diff --git a/nengo_loihi/block.py b/nengo_loihi/block.py index 96509b0c2..5201b5c56 100644 --- a/nengo_loihi/block.py +++ b/nengo_loihi/block.py @@ -3,7 +3,7 @@ from nengo.exceptions import BuildError import numpy as np -import scipy +import scipy.sparse from nengo_loihi.nxsdk_obfuscation import d diff --git a/nengo_loihi/builder/connection.py b/nengo_loihi/builder/connection.py index cf7a58167..2719402dd 100644 --- a/nengo_loihi/builder/connection.py +++ b/nengo_loihi/builder/connection.py @@ -15,7 +15,7 @@ from nengo.exceptions import BuildError, ValidationError from nengo.solvers import NoSolver, Solver import numpy as np -import scipy +import scipy.sparse from nengo_loihi.block import Axon, LoihiBlock, Probe, Synapse from nengo_loihi.builder.builder import Builder @@ -185,7 +185,7 @@ def build_host_to_chip(model, conn): build_chip_connection(model, receive2post) logger.debug("Creating DecodeNeuron ensemble for %s", conn) - ens = model.node_neurons.get_ensemble(dim) + ens = model.node_neurons.get_ensemble(dim, is_input=True, add_to_container=False) ens.label = None if conn.label is None else "%s_ens" % conn.label _inherit_seed(host, ens, model, conn) host.build(ens) diff --git a/nengo_loihi/builder/inputs.py b/nengo_loihi/builder/inputs.py index 54a7997fa..95c2fb714 100644 
--- a/nengo_loihi/builder/inputs.py +++ b/nengo_loihi/builder/inputs.py @@ -9,11 +9,13 @@ class HostSendNode(Node): """For sending host->chip messages""" - def __init__(self, dimensions, label=Default): + def __init__(self, dimensions, label=Default, check_output=False): self.queue = [] super(HostSendNode, self).__init__( self.update, size_in=dimensions, size_out=0, label=label ) + if hasattr(self, "check_output"): + self.check_output = check_output def update(self, t, x): assert len(self.queue) == 0 or t > self.queue[-1][0] @@ -23,20 +25,18 @@ def update(self, t, x): class HostReceiveNode(Node): """For receiving chip->host messages""" - def __init__(self, dimensions, label=Default): + def __init__(self, dimensions, label=Default, check_output=False): self.queue = [(0, np.zeros(dimensions))] - self.queue_index = 0 super(HostReceiveNode, self).__init__( self.update, size_in=0, size_out=dimensions, label=label ) + if hasattr(self, "check_output"): + self.check_output = check_output def update(self, t): - while ( - len(self.queue) > self.queue_index + 1 - and self.queue[self.queue_index][0] < t - ): - self.queue_index += 1 - return self.queue[self.queue_index][1] + t1, x = self.queue[-1] + assert t >= t1 + return x def receive(self, t, x): self.queue.append((t, x)) @@ -48,6 +48,7 @@ class ChipReceiveNode(Node): def __init__(self, dimensions, size_out, label=Default): self.raw_dimensions = dimensions self.spikes = [] + self.error_target = None self.spike_input = None # set by builder super(ChipReceiveNode, self).__init__( self.update, size_in=0, size_out=size_out, label=label @@ -64,10 +65,11 @@ def receive(self, t, x): def update(self, t): raise SimulationError("ChipReceiveNodes should not be run") + def collect_errors(self): + return () + def collect_spikes(self): - assert self.spike_input is not None - for t, x in self.spikes: - yield (self.spike_input, t, x) + return self.spikes class ChipReceiveNeurons(ChipReceiveNode): @@ -80,8 +82,9 @@ def __init__(self, dimensions, neuron_type=None, label=Default): class PESModulatoryTarget: def __init__(self, target): - self.target = target self.errors = OrderedDict() + self.error_target = target + self.spike_input = None def clear(self): self.errors.clear() @@ -94,5 +97,7 @@ def receive(self, t, x): self.errors[t] = np.array(x) def collect_errors(self): - for t, x in self.errors.items(): - yield (self.target, t, x) + return self.errors.items() + + def collect_spikes(self): + return () diff --git a/nengo_loihi/builder/sparse_matrix.py b/nengo_loihi/builder/sparse_matrix.py index 8e0610d56..741d6ae3c 100644 --- a/nengo_loihi/builder/sparse_matrix.py +++ b/nengo_loihi/builder/sparse_matrix.py @@ -1,5 +1,5 @@ import numpy as np -import scipy +import scipy.sparse def expand_matrix(matrix, shape): diff --git a/nengo_loihi/builder/tests/test_sparse_matrix.py b/nengo_loihi/builder/tests/test_sparse_matrix.py index a13c5f11e..d3f36135e 100644 --- a/nengo_loihi/builder/tests/test_sparse_matrix.py +++ b/nengo_loihi/builder/tests/test_sparse_matrix.py @@ -1,6 +1,6 @@ import numpy as np import pytest -import scipy +import scipy.sparse from nengo_loihi.builder.sparse_matrix import ( expand_matrix, diff --git a/nengo_loihi/compat.py b/nengo_loihi/compat.py index 211d5cad4..bcc7f61ba 100644 --- a/nengo_loihi/compat.py +++ b/nengo_loihi/compat.py @@ -3,7 +3,7 @@ import nengo import numpy as np -import scipy +import scipy.sparse logger = logging.getLogger(__name__) @@ -12,6 +12,7 @@ from nengo.builder.network import seed_network from nengo.builder.transforms import 
multiply import nengo.transforms as nengo_transforms + from nengo.utils.testing import signals_allclose def conn_solver(solver, activities, targets, rng): return solver(activities, targets, rng=rng) @@ -36,6 +37,7 @@ def make_process_step(process, shape_in, shape_out, dt, rng, dtype=None): else: from nengo.builder.connection import multiply + from nengo.utils.testing import allclose as signals_allclose nengo_transforms = None from nengo.dists import get_samples as _get_samples diff --git a/nengo_loihi/decode_neurons.py b/nengo_loihi/decode_neurons.py index f57c02169..17f2e1a5c 100644 --- a/nengo_loihi/decode_neurons.py +++ b/nengo_loihi/decode_neurons.py @@ -167,8 +167,8 @@ def get_block(self, weights, block_label=None, syn_label=None): return block, syn - def get_ensemble(self, dim): - if self.pairs_per_dim != 1: + def get_ensemble(self, dim, is_input=False, **ens_kwargs): + if is_input and self.pairs_per_dim != 1: # To support this, we need to figure out how to deal with the # `post_inds` that map neurons to axons. Either we can do this # on the host, in which case we'd have inputs going to the chip @@ -187,7 +187,7 @@ def get_ensemble(self, dim): encoders=encoders, gain=self.gain.repeat(dim), bias=self.bias.repeat(dim), - add_to_container=False, + **ens_kwargs ) return ens diff --git a/nengo_loihi/hardware/builder.py b/nengo_loihi/hardware/builder.py index 97e4902b5..fb0f1c80f 100644 --- a/nengo_loihi/hardware/builder.py +++ b/nengo_loihi/hardware/builder.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -def build_board(board, seed=None): +def build_board(board, use_snips=False, seed=None): n_chips = board.n_chips n_cores_per_chip = board.n_cores_per_chip n_synapses_per_core = board.n_synapses_per_core @@ -32,7 +32,7 @@ def build_board(board, seed=None): # add our own attribute for storing our spike generator assert not hasattr(nxsdk_board, "global_spike_generator") - nxsdk_board.global_spike_generator = SpikeGen(nxsdk_board) + nxsdk_board.global_spike_generator = None if use_snips else SpikeGen(nxsdk_board) # custom attr for storing SpikeInputs (filled in build_input) assert not hasattr(nxsdk_board, "spike_inputs") @@ -50,9 +50,9 @@ def build_board(board, seed=None): def build_chip(nxsdk_chip, chip, seed=None): - assert len(chip.cores) == len(d_get(nxsdk_chip, b"bjJDb3Jlcw==")) + assert len(chip.cores) == len(d_get(nxsdk_chip, b"bjJDb3Jlc0FzTGlzdA==")) rng = np.random.RandomState(seed) - for core, nxsdk_core in zip(chip.cores, d_get(nxsdk_chip, b"bjJDb3Jlcw==")): + for core, nxsdk_core in zip(chip.cores, d_get(nxsdk_chip, b"bjJDb3Jlc0FzTGlzdA==")): logger.debug("Building core %s", core) seed = rng.randint(npext.maxint) build_core(nxsdk_core, core, seed=seed) @@ -380,6 +380,10 @@ def build_input(nxsdk_core, core, spike_input, compartment_idxs): # add any pre-existing spikes to spikegen for t in spike_input.spike_times(): + assert ( + nxsdk_board.global_spike_generator is not None + ), "Cannot add pre-existing spikes when using SNIPs (no spike generator)" + spikes = spike_input.spike_idxs(t) for spike in loihi_input.spikes_to_loihi(t, spikes): assert ( @@ -539,7 +543,7 @@ def build_axons(nxsdk_core, core, block, axon, compartment_ids, pop_id_map): nxsdk_board = d_get(nxsdk_core, b"cGFyZW50", b"cGFyZW50") tchip_id = d_get(d_get(nxsdk_board, b"bjJDaGlwcw==")[tchip_idx], b"aWQ=") tcore_id = d_get( - d_get(d_get(nxsdk_board, b"bjJDaGlwcw==")[tchip_idx], b"bjJDb3Jlcw==")[ + d_get(d_get(nxsdk_board, b"bjJDaGlwcw==")[tchip_idx], b"bjJDb3Jlc0FzTGlzdA==")[ tcore_idx ], b"aWQ=", diff 
--git a/nengo_loihi/hardware/interface.py b/nengo_loihi/hardware/interface.py index 4822c9d7a..b1ef3992b 100644 --- a/nengo_loihi/hardware/interface.py +++ b/nengo_loihi/hardware/interface.py @@ -3,6 +3,8 @@ import logging import os import shutil +import socket +import struct import tempfile import time import warnings @@ -17,13 +19,21 @@ from nengo_loihi.hardware.allocators import OneToOne, RoundRobin from nengo_loihi.hardware.builder import build_board from nengo_loihi.nxsdk_obfuscation import d, d_func, d_get -from nengo_loihi.hardware.nxsdk_shim import assert_nxsdk, nxsdk, SpikeProbe +from nengo_loihi.hardware.nxsdk_shim import assert_nxsdk, nxsdk, SnipPhase, SpikeProbe from nengo_loihi.hardware.validate import validate_board from nengo_loihi.validate import validate_model logger = logging.getLogger(__name__) +def ceil_div(a, b): + return -((-a) // b) + + +def roundup(a, b): + return b * ceil_div(a, b) + + class HardwareInterface: """Place a Model onto a Loihi board and run it. @@ -43,6 +53,10 @@ class HardwareInterface: Defaults to one block and one input per core on a single chip. """ + min_nxsdk_version = LooseVersion("0.8.7") + max_nxsdk_version = LooseVersion("0.9.0") + channel_packet_size = 64 # size of channel packets in int32s + def __init__( self, model, @@ -57,25 +71,32 @@ def __init__( ) self.closed = False - self.use_snips = use_snips - self.check_nxsdk_version() - self.nxsdk_board = None self.nengo_io_h2c = None # IO snip host-to-chip channel self.nengo_io_c2h = None # IO snip chip-to-host channel + self.host_socket = None # IO snip superhost (this) <-> host socket + self.host_socket_connected = False + self.host_socket_port = None + self.error_chip_map = {} # maps synapses to chip locations for errors self._probe_filters = {} self._probe_filter_pos = {} self._snip_probe_data = collections.OrderedDict() self._chip2host_sent_steps = 0 + self.model = model + self.use_snips = use_snips + self.seed = seed # Maximum number of spikes that can be sent through # the nengo_io_h2c channel on one timestep. 
self.snip_max_spikes_per_step = snip_max_spikes_per_step + self.allocator = allocator + + self.check_nxsdk_version() # clear cached content from SpikeProbe class attribute d_func(SpikeProbe, b"cHJvYmVEaWN0", b"Y2xlYXI=") - self.build(model, allocator=allocator, seed=seed) + self.build() def __enter__(self): return self @@ -83,24 +104,23 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() - @staticmethod - def check_nxsdk_version(): + @classmethod + def check_nxsdk_version(cls): # raise exception if nxsdk not installed assert_nxsdk() # if installed, check version version = LooseVersion(getattr(nxsdk, "__version__", "0.0.0")) - minimum = LooseVersion("0.8.5") - max_tested = LooseVersion("0.8.5") - if version < minimum: + if version < cls.min_nxsdk_version: raise ImportError( - "nengo-loihi requires nxsdk>=%s, found %s" % (minimum, version) + "nengo-loihi requires nxsdk>=%s, found %s" + % (cls.min_nxsdk_version, version) ) - elif version > max_tested: + elif version > cls.max_nxsdk_version: warnings.warn( "nengo-loihi has not been tested with your nxsdk " "version (%s); latest fully supported version is " - "%s" % (version, max_tested) + "%s" % (version, cls.max_nxsdk_version) ) def _iter_blocks(self): @@ -111,10 +131,11 @@ def _iter_probes(self): for probe in block.probes: yield probe - def build(self, model, allocator, seed=None): - validate_model(model) - self.model = model - self.pes_error_scale = getattr(model, "pes_error_scale", 1.0) + def build(self): + assert self.nxsdk_board is None, "Cannot rebuild model" + + validate_model(self.model) + self.pes_error_scale = getattr(self.model, "pes_error_scale", 1.0) if self.use_snips: # tag all probes as being snip-based, @@ -124,22 +145,40 @@ def build(self, model, allocator, seed=None): self._snip_probe_data[probe] = [] # --- allocate - self.board = allocator(self.model) + self.board = self.allocator(self.model) # --- validate validate_board(self.board) # --- build - self.nxsdk_board = build_board(self.board, seed=seed) + self.nxsdk_board = build_board( + self.board, use_snips=self.use_snips, seed=self.seed + ) - def run_steps(self, steps, blocking=True): - if self.use_snips and self.nengo_io_h2c is None: + # --- create snips + if self.use_snips: self.create_io_snip() - # NOTE: we need to call connect() after snips are created - self.connect() + def run_steps(self, steps, blocking=True): + self.connect() # returns immediately if already connected d_get(self.nxsdk_board, b"cnVu")(steps, **{d(b"YVN5bmM="): not blocking}) + # connect to host socket + if self.host_socket is not None and not self.host_socket_connected: + # pause to allow host snip to start and listen for connection + time.sleep(0.1) + + host_address = self.nxsdk_board.executor._host_coordinator.hostAddr + print( + "Connecting to host socket at (%s, %s)" + % (host_address, self.host_socket_port) + ) + self.host_socket.connect((host_address, self.host_socket_port)) + self.host_socket_connected = True + + # send number of steps to host process + self.host_socket.send(struct.pack("i", steps)) + def _chip2host_monitor(self, probes_receivers): increment = None for probe, receiver in probes_receivers.items(): @@ -173,9 +212,9 @@ def _chip2host_monitor(self, probes_receivers): self._chip2host_sent_steps += increment def _chip2host_snips(self, probes_receivers): - count = self.nengo_io_c2h_count - data = self.nengo_io_c2h.read(count) - time_step, data = data[0], np.array(data[1:], dtype=np.int32) + data = self.host_socket.recv(4 * 
self.nengo_io_c2h_count) + data = np.frombuffer(data, dtype=np.int32) + time_step, data = data[0], data[1:] snip_range = self.nengo_io_snip_range for probe in self._snip_probe_data: @@ -229,6 +268,8 @@ def _host2chip_spikegen(self, loihi_spikes): ) def _host2chip_snips(self, loihi_spikes, loihi_errors): + assert self.host_socket_connected + max_spikes = self.snip_max_spikes_per_step if len(loihi_spikes) > max_spikes: warnings.warn( @@ -247,8 +288,15 @@ def _host2chip_snips(self, loihi_spikes, loihi_errors): assert len(loihi_errors) == self.nengo_io_h2c_errors for error in loihi_errors: msg.extend(error) - assert len(msg) <= self.nengo_io_h2c.numElements - self.nengo_io_h2c.write(len(msg), msg) + + i = 0 + send_size = 1024 + while i < len(msg): + msg_i = msg[i : i + send_size] + msg_bytes = struct.pack("%di" % len(msg_i), *msg_i) + bytes_sent = self.host_socket.send(msg_bytes) + assert bytes_sent == len(msg_bytes), "Host socket send failed" + i += len(msg_i) def host2chip(self, spikes, errors): loihi_spikes = [] @@ -256,22 +304,38 @@ def host2chip(self, spikes, errors): spike_input = self.nxsdk_board.spike_inputs[spike_input] loihi_spikes.extend(spike_input.spikes_to_loihi(t, s)) - loihi_errors = [] + error_info = [] + error_vecs = [] for synapse, t, e in errors: - coreid = None - for core in self.board.chips[0].cores: - for block in core.blocks: - if synapse in block.synapses: - # TODO: assumes one block per core - coreid = core.learning_coreid + core_id = self.error_chip_map.get(synapse, None) + if core_id is None: + for core in self.board.chips[0].cores: + for block in core.blocks: + if synapse in block.synapses: + assert ( + len(core.blocks) == 1 + ), "Learning not implemented with multiple blocks per core" + core_id = core.learning_coreid + break + + if core_id is not None: break - if coreid is not None: - break + assert core_id is not None + self.error_chip_map[synapse] = core_id + + error_info.append([core_id, len(e)]) + error_vecs.append(e) + + loihi_errors = [] + if len(error_vecs) > 0: + error_vecs = np.concatenate(error_vecs) + error_vecs = scale_pes_errors(error_vecs, scale=self.pes_error_scale) - assert coreid is not None - e = scale_pes_errors(e, scale=self.pes_error_scale) - loihi_errors.append([coreid, len(e)] + e.tolist()) + i = 0 + for core_id, e_len in error_info: + loihi_errors.append([core_id, e_len] + error_vecs[i:i + e_len].tolist()) + i += e_len if self.use_snips: return self._host2chip_snips(loihi_spikes, loihi_errors) @@ -308,6 +372,16 @@ def connect(self, attempts=10): raise SimulationError("Could not connect to the board") def close(self): + if self.host_socket is not None and self.host_socket_connected: + # send -1 to signal host/chip that we're done + self.host_socket.send(struct.pack("i", -1)) + + # pause to allow chip to receive -1 signal via host + time.sleep(0.1) + + self.host_socket.close() + self.host_socket_connected = False + if self.nxsdk_board is not None: d_func(self.nxsdk_board, b"ZGlzY29ubmVjdA==") @@ -368,6 +442,7 @@ def create_io_snip(self): n_errors = 0 total_error_len = 0 max_error_len = 0 + assert len(self.board.chips) == 1, "Learning not implemented for multiple chips" for core in self.board.chips[0].cores: # TODO: don't assume 1 chip if core.learning_coreid: error_len = core.blocks[0].n_neurons // 2 @@ -375,6 +450,8 @@ def create_io_snip(self): n_errors += 1 total_error_len += 2 + error_len + print("n_errors: %d" % n_errors) + n_outputs = 1 probes = [] cores = set() @@ -404,7 +481,7 @@ def create_io_snip(self): 
get_channel=d(b"Z2V0Q2hhbm5lbElE"), int_type=d(b"aW50MzJfdA=="), spike_size=d(b"Mg=="), - error_info_size=d(b"Mg=="), + error_info_size=d(b"Mg==", int), step=d(b"dGltZV9zdGVw"), read=d(b"cmVhZENoYW5uZWw="), write=d(b"d3JpdGVDaGFubmVs"), @@ -427,6 +504,8 @@ def create_io_snip(self): ), ) + n_output_packets = ceil_div(n_outputs, self.channel_packet_size) + # --- write c file using template template = env.get_template("nengo_io.c.template") self.tmp_snip_dir = tempfile.TemporaryDirectory() @@ -439,10 +518,21 @@ def create_io_snip(self): len(cores), len(probes), ) + chip_buffer_size = roundup( + max( + n_outputs, # currently, buffer needs to hold all outputs at once + self.channel_packet_size + + max(SpikePacker.size(), obfs["error_info_size"]), + ), + self.channel_packet_size, + ) code = template.render( n_outputs=n_outputs, + n_output_packets=n_output_packets, n_errors=n_errors, max_error_len=max_error_len, + buffer_size=chip_buffer_size, + packet_size=self.channel_packet_size, cores=cores, probes=probes, obfs=obfs, @@ -455,8 +545,8 @@ def create_io_snip(self): with open(os.path.join(snips_dir, "nengo_learn.c"), "w") as f: f.write(code) - # --- create SNIP process and channels - logger.debug("Creating nengo_io snip process") + # --- create chip processes + logger.debug("Creating nengo_io chip process") nengo_io = d_func( self.nxsdk_board, b"Y3JlYXRlUHJvY2Vzcw==", @@ -469,7 +559,7 @@ def create_io_snip(self): b"cGhhc2U=": d(b"bWdtdA=="), }, ) - logger.debug("Creating nengo_learn snip process") + logger.debug("Creating nengo_learn chip process") c_path = os.path.join(self.tmp_snip_dir.name, "nengo_learn.c") shutil.copyfile(os.path.join(snips_dir, "nengo_learn.c"), c_path) d_func( @@ -485,21 +575,68 @@ def create_io_snip(self): }, ) - size = ( + # --- create host process (for faster communication via sockets) + host_socket_port = np.random.randint(50000, 60000) + + max_inputs = n_errors + self.snip_max_spikes_per_step * SpikePacker.size() + host_buffer_size = roundup(max(max_inputs, n_outputs), self.channel_packet_size) + + host_template = env.get_template("nengo_host.cc.template") + host_path = os.path.join(self.tmp_snip_dir.name, "nengo_host.cc") + host_code = host_template.render( + host_buffer_size=host_buffer_size, + n_outputs=n_outputs, + n_output_packets=n_output_packets, + server_port=host_socket_port, + input_channel="nengo_io_h2c", + output_channel="nengo_io_c2h", + packet_size=self.channel_packet_size, + obfs=obfs, + ) + with open(host_path, "w") as f: + f.write(host_code) + + # make process + host_process = d_func( + self.nxsdk_board, + b"Y3JlYXRlU25pcA==", + kwargs={ + b"cGhhc2U=": SnipPhase.HOST_CONCURRENT_EXECUTION, + b"Y3BwRmlsZQ==": host_path, + }, + ) + + # connect to host socket + print("Making superhost socket") + self.host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.host_socket_port = host_socket_port + + # --- create channels + input_channel_size = ( 1 # first int stores number of spikes + self.snip_max_spikes_per_step * SpikePacker.size() + total_error_len ) - logger.debug("Creating nengo_io_h2c channel (%d)" % size) + logger.debug("Creating nengo_io_h2c channel (%d)" % input_channel_size) self.nengo_io_h2c = d_get(self.nxsdk_board, b"Y3JlYXRlQ2hhbm5lbA==")( - b"nengo_io_h2c", "int", size + b"nengo_io_h2c", + **{ + d(b"bnVtRWxlbWVudHM="): input_channel_size, + d(b"bWVzc2FnZVNpemU="): 4 * self.channel_packet_size, + d(b"c2xhY2s="): 16, # size of buffer on chip (in packets) + }, ) logger.debug("Creating nengo_io_c2h channel (%d)" % n_outputs) 
self.nengo_io_c2h = d_get(self.nxsdk_board, b"Y3JlYXRlQ2hhbm5lbA==")( - b"nengo_io_c2h", "int", n_outputs + b"nengo_io_c2h", + **{ + d(b"bnVtRWxlbWVudHM="): n_outputs, + d(b"bWVzc2FnZVNpemU="): 4 * self.channel_packet_size, + d(b"c2xhY2s="): 16, # size of buffer on chip (in packets) + }, ) - d_get(self.nengo_io_h2c, b"Y29ubmVjdA==")(None, nengo_io) - d_get(self.nengo_io_c2h, b"Y29ubmVjdA==")(nengo_io, None) + d_get(self.nengo_io_h2c, b"Y29ubmVjdA==")(host_process, nengo_io) + d_get(self.nengo_io_c2h, b"Y29ubmVjdA==")(nengo_io, host_process) self.nengo_io_h2c_errors = n_errors self.nengo_io_c2h_count = n_outputs self.nengo_io_snip_range = snip_range diff --git a/nengo_loihi/hardware/nxsdk_shim.py b/nengo_loihi/hardware/nxsdk_shim.py index 27b8a858c..5eb172481 100644 --- a/nengo_loihi/hardware/nxsdk_shim.py +++ b/nengo_loihi/hardware/nxsdk_shim.py @@ -80,9 +80,11 @@ def assert_nxsdk(exception=exception): b"bnhzZGsuZ3JhcGgubnhpbnB1dGdlbi5ueGlucHV0Z2Vu", b"QmFzaWNTcGlrZUdlbmVyYXRvcg==" ) SpikeProbe = d_import(b"bnhzZGsuZ3JhcGgubnhwcm9iZXM=", b"TjJTcGlrZVByb2Jl") + SnipPhase = d_import(b"bnhzZGsuZ3JhcGgucHJvY2Vzc2VzLnBoYXNlX2VudW1z", b"UGhhc2U=") else: micro_gen = None TraceConfigGenerator = None NxsdkBoard = None SpikeGen = None SpikeProbe = None + SnipPhase = None diff --git a/nengo_loihi/hardware/snips/nengo_host.cc.template b/nengo_loihi/hardware/snips/nengo_host.cc.template new file mode 100644 index 000000000..1c0c8ab49 --- /dev/null +++ b/nengo_loihi/hardware/snips/nengo_host.cc.template @@ -0,0 +1,156 @@ +#include <arpa/inet.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> // usleep + +#include <atomic> +#include <cstdint> +#include <cstdlib> +#include <cstring> +#include <iostream> + +#include "nxsdkhost.h" + +#define N_OUTPUTS {{ n_outputs }} +#define N_OUTPUT_PACKETS {{ n_output_packets }} +#define BUFFER_SIZE {{ host_buffer_size }} +#define PACKET_SIZE {{ packet_size }} + +#define SERVER_PORT htons({{ server_port }}) + + +namespace nengo_host { + +template <typename T> +inline T ceil_div(T a, T b) { + return (a / b) + (a % b != 0); +} + +const char input_channel[] = "{{ input_channel }}"; +const char output_channel[] = "{{ output_channel }}"; + +class NengoHostProcess : public ConcurrentHostSnip { + int32_t buffer[BUFFER_SIZE]; + int server_socket; + int client_socket; + + public: + NengoHostProcess() { + // --- set up socket to communicate with superhost + // This machine will act as the server, and the superhost will + // connect in as the client + server_socket = socket(AF_INET, SOCK_STREAM, 0); + + sockaddr_in server_address; + server_address.sin_family = AF_INET; + server_address.sin_port = SERVER_PORT; + server_address.sin_addr.s_addr = INADDR_ANY; + + bind(server_socket, (struct sockaddr*)&server_address, sizeof(struct sockaddr)); + + // wait for a client + std::cout << "Host snip listening for client" << std::endl; + listen(server_socket, 1); + + // get incoming client connection + sockaddr_in client_address; + socklen_t sin_size = sizeof(struct sockaddr_in); + client_socket = accept( + server_socket, (struct sockaddr*)&client_address, &sin_size + ); + + std::cout << "Host snip connected to client" << std::endl; + } + + ~NengoHostProcess() { + std::cout << "Closing host (server) socket" << std::endl; + close(server_socket); + } + + void run(std::atomic_bool& end_of_execution) override { + // --- loop to transmit data from superhost to chip and vice versa + int step_counter = -1; + + while (!end_of_execution) { + // Wait until host sends number of run steps + while (!end_of_execution && step_counter <= 0) { + ssize_t n_read0 = recv(client_socket, buffer, 
sizeof(int32_t), MSG_DONTWAIT); + if (n_read0 == 4) { + step_counter = buffer[0]; + if (step_counter > 0) { + std::cout << "Host running " << step_counter << " steps" << std::endl; + } + break; + } else if (n_read0 > 0) { + std::cout << "Host received unexpected number of bytes (" + << n_read0 << ")" << std::endl; + return; + } else { + usleep(100); + } + } + if (end_of_execution) { + break; + } else if (step_counter <= 0) { + break; + } + + // read messages from superhost socket + // we make sure BUFFER_SIZE is large enough for any message + // TODO: read in blocks of e.g 4096 bytes (small power of 2) for speed + size_t n_read = read(client_socket, buffer, BUFFER_SIZE * sizeof(int32_t)); + if (n_read % sizeof(int32_t) != 0) { + std::cout << "Did not read full int32s from socket" << std::endl; + break; + } + if (buffer[0] < 0) { + std::cout << "Host received shutdown signal: " << buffer[0] << std::endl; + break; + } + + // write packets to chip (PACKET_SIZE == sizeof(int32_t)) + size_t n_packets = ceil_div(n_read, 4*PACKET_SIZE); + writeChannel(input_channel, buffer, n_packets); + + // wait until chip has written output + while (!end_of_execution && !probeChannel(output_channel)) { + usleep(100); + } + if (end_of_execution) { + break; + } + + // read chip output + readChannel(output_channel, buffer, N_OUTPUT_PACKETS); + + // write output to superhost socket + const size_t write_len = N_OUTPUTS; + const size_t write_bytes = write_len * sizeof(int32_t); + size_t n_write = write(client_socket, buffer, write_bytes); + if (n_write != write_bytes) { + std::cout << "Failed write to socket (tried " << write_bytes + << ", wrote " << n_write << " bytes)" << std::endl; + break;; + } + + // next timestep + step_counter--; + } + + if (step_counter != 0) { + // make sure chip gets shutdown signal + buffer[0] = (step_counter > 0) ? -1 : step_counter; + writeChannel(input_channel, buffer, 1); + } + } +}; + +} // namespace nengo_host + +using nengo_host::NengoHostProcess; + +// Each ConcurrentHostSnip is run within a thread +// If you have more threads on the host cpu, you can choose to create individual +// snips for input and output +REGISTER_SNIP(NengoHostProcess, ConcurrentHostSnip); diff --git a/nengo_loihi/hardware/snips/nengo_io.c.template b/nengo_loihi/hardware/snips/nengo_io.c.template index a0b31e6e7..99a3ee556 100644 --- a/nengo_loihi/hardware/snips/nengo_io.c.template +++ b/nengo_loihi/hardware/snips/nengo_io.c.template @@ -4,16 +4,31 @@ #define DEBUG 0 #define N_OUTPUTS {{ n_outputs }} +#define N_OUTPUT_PACKETS {{ n_output_packets }} #define N_ERRORS {{ n_errors }} -#define MAX_ERROR_LEN {{ max_error_len }} +#define BUFFER_SIZE {{ buffer_size }} +#define PACKET_SIZE {{ packet_size }} #define SPIKE_SIZE {{ obfs.spike_size }} #define ERROR_INFO_SIZE {{ obfs.error_info_size }} +int is_shutdown = 0; // if true, we've been asked to shut down + +inline int min(int a, int b) { + return (a < b) ? a : b; +} + int guard_io(runState *s) { - return 1; + return !is_shutdown; } void nengo_io(runState *s) { +#if SPIKE_SIZE != 2 + // SPIKE_SIZE != 2 will require a number of changes to this function. + // Search for SPIKE_SIZE == 2 precompiler directives. 
+ printf("SPIKE_SIZE == 2 assertion failed\n"); + return; +#endif + {% for core in cores %} {{ obfs.core_class }} *core{{ core }} = NEURON_PTR((CoreId){ .id={{ core }} }); {% endfor %} @@ -24,12 +39,18 @@ void nengo_io(runState *s) { {{ obfs.int_type }} axon_type; {{ obfs.int_type }} axon_id; {{ obfs.int_type }} atom; - {{ obfs.int_type }} count[1]; - {{ obfs.int_type }} spike[SPIKE_SIZE]; - {{ obfs.int_type }} error_info[ERROR_INFO_SIZE]; - {{ obfs.int_type }} error_data[MAX_ERROR_LEN]; - {{ obfs.int_type }} error_index; - {{ obfs.int_type }} output[N_OUTPUTS]; + {{ obfs.int_type }} n_spikes; // input spike count + {{ obfs.int_type }} i_spike; // input spike position + {{ obfs.int_type }} *spike; + + {{ obfs.int_type }} error_index; // index into error stored in shared data + {{ obfs.int_type }} i_error = 0; // index of error block + {{ obfs.int_type }} j_error = 0; // index of error in error block + {{ obfs.int_type }} n_errors = -1; // number of errors in error block + + {{ obfs.int_type }} buffer[BUFFER_SIZE]; + {{ obfs.int_type }} buffer_pos; // current read position in buffer + {{ obfs.int_type }} buffer_len; // current length of info in buffer if (in_channel == -1 || out_channel == -1) { printf("Got an invalid channel ID\n"); @@ -40,24 +61,55 @@ void nengo_io(runState *s) { printf("time %d\n", s->{{ obfs.step }}); } - {{ obfs.read }}(in_channel, count, 1); - if (DEBUG) { - printf("count %d\n", count[0]); + // read first packet + {{ obfs.read }}(in_channel, buffer, 1); + buffer_len = PACKET_SIZE; + n_spikes = buffer[0]; + buffer_pos = 1; + if (n_spikes < 0) { + printf("Chip received shutdown signal: %d\n", n_spikes); + is_shutdown = 1; + return; } +#if DEBUG + printf("num input spikes: %d\n", n_spikes); +#endif + + for (i_spike = 0; i_spike < n_spikes; i_spike++) { + // read a new packet if necessary + if (buffer_pos + SPIKE_SIZE > buffer_len) { + if (buffer_len - buffer_pos > 0) { + // part of a spike remains at end of buffer +#if SPIKE_SIZE == 2 + buffer[0] = buffer[buffer_pos]; + buffer_len = 1; +#endif + } else { + buffer_len = 0; + } - for (int i=0; i < count[0]; i++) { - {{ obfs.read }}(in_channel, spike, SPIKE_SIZE); - if (DEBUG) { - printf("send spike %d.%d\n", spike[0], spike[1]); + // read next packet + {{ obfs.read }}(in_channel, &buffer[buffer_len], 1); + buffer_len += PACKET_SIZE; + buffer_pos = 0; } + + spike = &buffer[buffer_pos]; + buffer_pos += SPIKE_SIZE; +#if DEBUG + printf("send spike %d.%d\n", spike[0], spike[1]); +#endif + +#if SPIKE_SIZE == 2 core_id = ({{ obfs.id_class }}) { .id=(spike[0] >> {{ obfs.spike_shift }}) }; axon_id = spike[0] & {{ obfs.spike_mask }}; axon_type = spike[1] >> {{ obfs.spike_shift }}; atom = spike[1] & {{ obfs.spike_mask }}; - if (DEBUG) { - printf("send spike core=%d, axon=%d, type=%d atom=%d\n", - core_id.id, axon_id, axon_type, atom); - } +#endif +#if DEBUG + printf("send spike core=%d, axon=%d, type=%d atom=%d\n", + core_id.id, axon_id, axon_type, atom); +#endif if (axon_type == {{ obfs.axon_type_0 }}) { {{ obfs.do_axon_type_0 }}(s->{{ obfs.step }}, core_id, axon_id); } else if (axon_type == {{ obfs.axon_type_1 }}) { @@ -71,25 +123,60 @@ void nengo_io(runState *s) { // Communicate with learning snip s->{{ obfs.data }}[0] = N_ERRORS; error_index = 1; - for (int i=0; i < N_ERRORS; i++) { - {{ obfs.read }}(in_channel, error_info, ERROR_INFO_SIZE); - {{ obfs.read }}(in_channel, error_data, error_info[1]); - s->{{ obfs.data }}[error_index] = error_info[0]; - s->{{ obfs.data }}[error_index + 1] = error_info[1]; - for (int j=0; j < 
error_info[1]; j++) { - s->{{ obfs.data }}[error_index + ERROR_INFO_SIZE + j] = error_data[j]; + i_error = 0; + j_error = 0; + n_errors = -1; + + while (i_error < N_ERRORS || j_error < n_errors) { + // read from channel + if (buffer_pos + ((n_errors < 0) ? ERROR_INFO_SIZE : 1) > buffer_len) { + if (buffer_pos < buffer_len) { + // part of the error info remains at end of buffer +#if ERROR_INFO_SIZE == 2 + buffer[0] = buffer[buffer_pos]; + buffer_len = 1; +#endif + } else { + buffer_len = 0; + } + + // read next packet + {{ obfs.read }}(in_channel, &buffer[buffer_len], 1); + buffer_len += PACKET_SIZE; + buffer_pos = 0; + } + + if (n_errors < 0) { + // move to next error block +{% for j in range(obfs.error_info_size) %} + s->{{ obfs.data }}[error_index + {{ j }}] = buffer[buffer_pos + {{ j }}]; +{% endfor %} + n_errors = buffer[buffer_pos + 1]; + j_error = 0; + error_index += ERROR_INFO_SIZE; + buffer_pos += ERROR_INFO_SIZE; + } else { + // read next error + s->{{ obfs.data }}[error_index] = buffer[buffer_pos]; + j_error++; + error_index++; + buffer_pos++; + } + + if (j_error == n_errors) { + i_error++; + n_errors = -1; } - error_index += ERROR_INFO_SIZE + error_info[1]; } - output[0] = s->{{ obfs.step }}; + buffer[0] = s->{{ obfs.step }}; {% for n_out, core, compartment, key in probes %} {% if key == 'u' %} - output[{{ n_out }}] = core{{ core }}->{{ obfs.state }}[{{ compartment }}].U; + buffer[{{ n_out }}] = core{{ core }}->{{ obfs.state }}[{{ compartment }}].U; {% elif key in ('v', 'spike') %} - output[{{ n_out }}] = core{{ core }}->{{ obfs.state }}[{{ compartment }}].V; + buffer[{{ n_out }}] = core{{ core }}->{{ obfs.state }}[{{ compartment }}].V; {% endif %} {% endfor %} - {{ obfs.write }}(out_channel, output, N_OUTPUTS); + {{ obfs.write }}(out_channel, buffer, N_OUTPUT_PACKETS); } diff --git a/nengo_loihi/hardware/snips/nengo_learn.c.template b/nengo_loihi/hardware/snips/nengo_learn.c.template index d2038d665..c5db54ce9 100644 --- a/nengo_loihi/hardware/snips/nengo_learn.c.template +++ b/nengo_loihi/hardware/snips/nengo_learn.c.template @@ -38,9 +38,11 @@ void nengo_learn(runState *s) { for (int error_index=0; error_index < n_errors; error_index++) { core = s->{{ obfs.data }} [offset]; n_vals = s->{{ obfs.data }} [offset+1]; + + neuron = {{ obfs.neuron }}(({{ obfs.id_class }}) {.id=core}); + for (int i=0; i < n_vals; i++) { error = (signed char) s->{{ obfs.data }} [offset+2+i]; - neuron = {{ obfs.neuron }}(({{ obfs.id_class }}) {.id=core}); compartment_idx = i; if (error > 0) { diff --git a/nengo_loihi/hardware/tests/test_interface.py b/nengo_loihi/hardware/tests/test_interface.py index 3989588de..2228e2ce0 100644 --- a/nengo_loihi/hardware/tests/test_interface.py +++ b/nengo_loihi/hardware/tests/test_interface.py @@ -1,8 +1,11 @@ import nengo from nengo.exceptions import SimulationError +import numpy as np import pytest +import nengo_loihi from nengo_loihi.block import Axon, LoihiBlock, Synapse +from nengo_loihi.compat import signals_allclose from nengo_loihi.builder import Model from nengo_loihi.discretize import discretize_model from nengo_loihi.hardware import interface as hardware_interface @@ -27,7 +30,7 @@ def test_error_on_old_version(monkeypatch): def test_no_warn_on_current_version(monkeypatch): mock = MockNxsdk() - mock.__version__ = "0.8.5" + mock.__version__ = str(hardware_interface.HardwareInterface.max_nxsdk_version) monkeypatch.setattr(hardware_interface, "nxsdk", mock) monkeypatch.setattr(hardware_interface, "assert_nxsdk", lambda: True) diff --git 
a/nengo_loihi/simulator.py b/nengo_loihi/simulator.py index 78999877c..cfc9150fd 100644 --- a/nengo_loihi/simulator.py +++ b/nengo_loihi/simulator.py @@ -1,6 +1,6 @@ from collections import OrderedDict import logging -import traceback +import timeit import warnings import nengo @@ -16,7 +16,6 @@ from nengo_loihi.discretize import discretize_model from nengo_loihi.emulator import EmulatorInterface from nengo_loihi.hardware import HardwareInterface, HAS_NXSDK -from nengo_loihi.nxsdk_obfuscation import d_func from nengo_loihi.splitter import Split logger = logging.getLogger(__name__) @@ -97,7 +96,7 @@ def __init__( # noqa: C901 dt=0.001, seed=None, model=None, - precompute=False, + precompute=None, target=None, progress_bar=None, remove_passthrough=True, @@ -105,7 +104,6 @@ def __init__( # noqa: C901 ): # initialize values used in __del__ and close() first self.closed = True - self.precompute = precompute self.network = network self.sims = OrderedDict() self._run_steps = None @@ -139,7 +137,9 @@ def __init__( # noqa: C901 # determine how to split the host into one, two or three models self.model.split = Split( - network, precompute=precompute, remove_passthrough=remove_passthrough + network, + precompute=False if precompute is None else precompute, + remove_passthrough=remove_passthrough, ) # Build the network into the model @@ -154,8 +154,24 @@ def __init__( # noqa: C901 self.model.seeded[conn] = False self.model.build(conn) - if len(self.model.host_pre.params): - assert precompute + # Create host_pre and host simulators if necessary + self.precompute = precompute + if precompute is None: + # If there is no host then all objects must be on the chip, + # which is precomputable in the sense that + # no communication has to happen with the host. + self.precompute = len(self.model.host.params) == 0 + elif len(self.model.host_pre.params) == 0 and precompute: + warnings.warn( + "No precomputable objects. Setting precompute=True has no effect." + ) + elif len(self.model.host.params) == 0 and not precompute: + warnings.warn( + "Model is pre-computable. Setting precompute=False may slow execution." + ) + + if len(self.model.host_pre.params) > 0: + assert self.precompute self.sims["host_pre"] = nengo.Simulator( network=None, dt=self.dt, @@ -163,12 +179,8 @@ def __init__( # noqa: C901 progress_bar=False, optimize=False, ) - elif precompute: - warnings.warn( - "No precomputable objects. Setting " "precompute=True has no effect." - ) - if len(self.model.host.params): + if len(self.model.host.params) > 0: self.sims["host"] = nengo.Simulator( network=None, dt=self.dt, @@ -176,13 +188,6 @@ def __init__( # noqa: C901 progress_bar=False, optimize=False, ) - elif not precompute: - # If there is no host and precompute=False, then all objects - # must be on the chip, which is precomputable in the sense that - # no communication has to happen with the host. - # We could warn about this, but we want to avoid people having - # to specify `precompute` unless they absolutely have to. 
- self.precompute = True self._probe_outputs = self.model.params self.data = ProbeDict(self._probe_outputs) @@ -316,6 +321,7 @@ def reset(self, seed=None): self._n_steps = 0 self._time = 0 + self.timers = dict(steps=0.0) # clear probe data for probe in self.model.probes: @@ -365,37 +371,103 @@ def step(self): self.run_steps(1) def _collect_receiver_info(self): + if not hasattr(self, "_error_synapse_map"): + self._error_synapse_map = {} + for _, receiver in self.model.host2chip_senders.items(): + probe = receiver.error_target + if probe is None: + continue + + conn = self.model.probe_conns[probe] + synapse = self.model.objs[conn]["decoders"] + assert synapse.learning + if probe in self._error_synapse_map: + assert self._error_synapse_map[probe] is synapse + else: + self._error_synapse_map[probe] = synapse + + self.timers["collect_recv"] = 0 + self.timers["collect_spike"] = 0 + self.timers["collect_error"] = 0 + self.timers["collect_end"] = 0 + spikes = [] errors = OrderedDict() for sender, receiver in self.model.host2chip_senders.items(): + timer = timeit.default_timer() receiver.clear() for t, x in sender.queue: receiver.receive(t, x) - del sender.queue[:] - - if hasattr(receiver, "collect_spikes"): - for spike_input, t, spike_idxs in receiver.collect_spikes(): - ti = round(t / self.model.dt) - spikes.append((spike_input, ti, spike_idxs)) - if hasattr(receiver, "collect_errors"): - for probe, t, e in receiver.collect_errors(): - conn = self.model.probe_conns[probe] - synapse = self.model.objs[conn]["decoders"] - assert synapse.learning - ti = round(t / self.model.dt) - errors_ti = errors.setdefault(ti, OrderedDict()) - if synapse in errors_ti: - errors_ti[synapse] += e - else: - errors_ti[synapse] = e.copy() - + sender.queue.clear() + self.timers["collect_recv"] += timeit.default_timer() - timer + + timer = timeit.default_timer() + spike_input = receiver.spike_input + for t, spike_idxs in receiver.collect_spikes(): + ti = round(t / self.model.dt) + spikes.append((spike_input, ti, spike_idxs)) + self.timers["collect_spike"] += timeit.default_timer() - timer + + timer = timeit.default_timer() + probe = receiver.error_target + synapse = self._error_synapse_map.get(probe, None) + for t, e in receiver.collect_errors(): + ti = round(t / self.model.dt) + errors_ti = errors.setdefault(ti, OrderedDict()) + if synapse in errors_ti: + errors_ti[synapse] += e + else: + errors_ti[synapse] = e.copy() + self.timers["collect_error"] += timeit.default_timer() - timer + + timer = timeit.default_timer() errors = [ (synapse, ti, e) for ti, ee in errors.items() for synapse, e in ee.items() ] + self.timers["collect_end"] += timeit.default_timer() - timer + return spikes, errors + + def _collect_receiver_info1(self): + spikes = [] + errors = OrderedDict() + tf = None + ti = None + for sender, receiver in self.model.host2chip_senders.items(): + assert receiver.spike_input is None or receiver.error_target is None + assert len(sender.queue) == 1 + t, x = sender.queue.pop() + if tf is None: + tf = t + ti = round(t / self.model.dt) + else: + assert t == tf + + spike_target = receiver.spike_input + if spike_target is not None: + spike_idxs = x.nonzero()[0] + spikes.append((spike_target, ti, spike_idxs)) + + error_target = receiver.error_target + if error_target is not None: + conn = self.model.probe_conns[error_target] + synapse = self.model.objs[conn]["decoders"] + assert synapse.learning + + if synapse in errors: + errors[synapse] += x + else: + errors[synapse] = x.copy() + + if len(errors) > 0: + assert ti is not 
None + errors = [(synapse, ti, e) for synapse, e in errors.items()] + else: + errors = [] + return spikes, errors def _host2chip(self, sim): - spikes, errors = self._collect_receiver_info() + spikes, errors = self._collect_receiver_info1() sim.host2chip(spikes, errors) def _chip2host(self, sim): @@ -505,13 +577,24 @@ def loihi_precomputed_host_only(steps): else: assert host is not None, "Model is precomputable" + self.timers["snips"] = 0 def loihi_bidirectional_with_host(steps): loihi.run_steps(steps, blocking=False) - for _ in range(steps): - host.step() + time0 = timeit.default_timer() + + # Run the first host step so there is info to send to the chip, + # then run subsequent host steps simultaneously with the chip + host.step() + for i in range(steps): self._host2chip(loihi) + if i + 1 < steps: + host.step() self._chip2host(loihi) + + time1 = timeit.default_timer() + self.timers["snips"] += time1 - time0 + logger.info("Waiting for run_steps to complete...") loihi.wait_for_completion() logger.info("run_steps completed") @@ -530,31 +613,14 @@ def run_steps(self, steps): raise SimulatorClosed("Simulator cannot run because it is closed.") self._make_run_steps() - try: - self._run_steps(steps) - except Exception: - if "loihi" in self.sims and self.sims["loihi"].use_snips: - # Need to write to board, otherwise it will wait indefinitely - h2c = self.sims["loihi"].nengo_io_h2c - c2h = self.sims["loihi"].nengo_io_c2h - print(traceback.format_exc()) - print("\nAttempting to end simulation...") + if "loihi" in self.sims: + self.sims["loihi"].connect() # connect outside timing loop - for _ in range(steps): - h2c.write(h2c.numElements, [0] * h2c.numElements) - c2h.read(c2h.numElements) - self.sims["loihi"].wait_for_completion() - d_func( - self.sims["loihi"].nxsdk_board, - b"bnhEcml2ZXI=", - b"c3RvcEV4ZWN1dGlvbg==", - ) - d_func( - self.sims["loihi"].nxsdk_board, b"bnhEcml2ZXI=", b"c3RvcERyaXZlcg==" - ) - raise + time0 = timeit.default_timer() + self._run_steps(steps) + self.timers["steps"] += timeit.default_timer() - time0 self._n_steps += steps logger.info("Finished running for %d steps", steps) self._probe() diff --git a/nengo_loihi/tests/test_connection.py b/nengo_loihi/tests/test_connection.py index 8b7b8967b..3316ecf04 100644 --- a/nengo_loihi/tests/test_connection.py +++ b/nengo_loihi/tests/test_connection.py @@ -3,7 +3,7 @@ from nengo.utils.matplotlib import rasterplot import numpy as np import pytest -import scipy +import scipy.sparse from nengo_loihi.builder import connection from nengo_loihi.compat import nengo_transforms diff --git a/nengo_loihi/tests/test_simulator.py b/nengo_loihi/tests/test_simulator.py index 45a0b43b2..6a90dbb04 100644 --- a/nengo_loihi/tests/test_simulator.py +++ b/nengo_loihi/tests/test_simulator.py @@ -452,7 +452,6 @@ def test_interface(Simulator, allclose): sim.run(1e-8) -@pytest.mark.hang @pytest.mark.target_loihi def test_loihi_simulation_exception(Simulator): """Test that Loihi shuts down properly after exception during simulation""" diff --git a/setup.py b/setup.py index 98ce1648e..b3a501fd2 100755 --- a/setup.py +++ b/setup.py @@ -29,7 +29,11 @@ def read(*filenames, **kwargs): root = os.path.dirname(os.path.realpath(__file__)) version = runpy.run_path(os.path.join(root, "nengo_loihi", "version.py"))["version"] -install_req = ["jinja2", "nengo>=2.8.0", "scipy>=1.2.1"] +install_req = [ + "jinja2", + "nengo>=2.8.0", + "scipy>=1.2.1", +] docs_req = [ "abr_control", "jupyter", @@ -76,8 +80,8 @@ def read(*filenames, **kwargs): "tests": tests_req, }, 
python_requires=">=3.4", - entry_points={"nengo.backends": ["loihi = nengo_loihi:Simulator"]}, - package_data={"nengo_loihi": ["nengo_loihi/snips/*"]}, + entry_points={"nengo.backends": ["loihi = nengo_loihi:Simulator",],}, + package_data={"nengo_loihi": ["nengo_loihi/snips/*",],}, classifiers=[ "Development Status :: 3 - Alpha", "Framework :: Nengo", diff --git a/test_many_ensembles.py b/test_many_ensembles.py new file mode 100644 index 000000000..d441a8def --- /dev/null +++ b/test_many_ensembles.py @@ -0,0 +1,53 @@ +import matplotlib.pyplot as plt +import nengo +import nengo_loihi +import numpy as np + + +def input_f(t, phase_offset): + return np.sin(6 * t + phase_offset + np.array([0, np.pi / 2])) + + +n_ensembles = 15 +n_neurons = 600 +phase_step = 1 # in radians +dimensions = len(input_f(0, 0)) + + +with nengo.Network(seed=34) as net: + nengo_loihi.set_defaults() + + inputs = [] + ensembles = [] + probes = [] + for i in range(n_ensembles): + + def input_f_i(t, i=i): + return input_f(t, phase_offset=i * phase_step) + + input = nengo.Node(input_f_i) + ensemble = nengo.Ensemble(n_neurons, dimensions=dimensions) + probe = nengo.Probe(ensemble, synapse=nengo.synapses.Alpha(0.01)) + nengo.Connection(input, ensemble, synapse=None) + + inputs.append(input) + ensembles.append(ensemble) + probes.append(probe) + + +with nengo_loihi.Simulator(net, precompute=False) as sim: + sim.run(1.0) + #sim.run(10.0) + print("Time/step: %0.2f ms" % (sim.timers["snips"] / sim.n_steps * 1000)) + +plt.figure() + +rows = 2 +cols = 2 + +for i in range(rows * cols): + plt.subplot(rows, cols, i + 1) + plt.plot(sim.trange(), sim.data[probes[i]]) + +plt.savefig("test_many_ensembles.png") +plt.show()
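The channel changes in nengo_loihi/hardware/interface.py move from per-element channel writes to fixed-size packets of channel_packet_size (64) int32s, so host and chip buffer sizes are rounded up to a whole number of packets with the new ceil_div and roundup helpers. A minimal sketch of that arithmetic (the two helpers are copied from the diff; the example numbers are illustrative only):

    CHANNEL_PACKET_SIZE = 64  # int32s per channel packet; messageSize is 4x this in bytes


    def ceil_div(a, b):
        # integer ceiling division, as added in nengo_loihi/hardware/interface.py
        return -((-a) // b)


    def roundup(a, b):
        # round `a` up to the nearest multiple of `b`
        return b * ceil_div(a, b)


    # e.g. 70 probe outputs span ceil_div(70, 64) == 2 packets, and a buffer
    # holding them is rounded up to roundup(70, 64) == 128 int32s (512 bytes)
    assert ceil_div(70, CHANNEL_PACKET_SIZE) == 2
    assert roundup(70, CHANNEL_PACKET_SIZE) == 128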
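The superhost now talks to the nengo_host.cc snip over a plain TCP socket: run_steps() first sends the step count as a single int32 (close() sends -1 as a shutdown signal), each timestep the spike and PES-error data go out as one flat int32 message, and the reply is read back as int32s whose first value is the timestep. A rough sketch of that framing, using hypothetical helper names not present in the codebase (the real spike width comes from SpikePacker.size() and the output length from the probe snip ranges):

    import struct

    import numpy as np


    def pack_steps(steps):
        # run_steps() sends the number of steps; -1 tells the host/chip to shut down
        return struct.pack("i", steps)


    def pack_host2chip(spikes, errors):
        # layout: n_spikes, then 2 int32s per spike, then per error block:
        # core_id, n_vals, followed by n_vals error values
        msg = [len(spikes)]
        for spike in spikes:
            msg.extend(spike)
        for core_id, values in errors:
            msg.extend([core_id, len(values)])
            msg.extend(values)
        return struct.pack("%di" % len(msg), *msg)


    def unpack_chip2host(data):
        # reply: first int32 is the timestep, the rest is raw probe data
        values = np.frombuffer(data, dtype=np.int32)
        return values[0], values[1:]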
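On the simulator side, precompute now defaults to None, meaning "decide automatically": when every object is on the chip the model is treated as precomputable, while an explicit precompute=True with nothing to precompute, or precompute=False on a fully precomputable model, produces a warning. A small usage sketch under those assumptions (the network contents are illustrative):

    import nengo
    import nengo_loihi

    with nengo.Network() as net:
        stim = nengo.Node(0.5)
        ens = nengo.Ensemble(100, dimensions=1)
        nengo.Connection(stim, ens)
        probe = nengo.Probe(ens, synapse=0.01)

    # precompute is left at its new default (None); the simulator chooses based on
    # whether anything has to run on the host alongside the chip
    with nengo_loihi.Simulator(net) as sim:
        sim.run(0.1)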