From 25480355af54963aa388a0b41afbdad91bee2d27 Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Fri, 17 Jul 2015 11:20:57 +0100 Subject: [PATCH 1/8] Make SCP errors more descriptive. * All SCPError derrived exceptions now include a summary of the offending packet (most importantly x, y, p and command!). The SCPPacket object is also made accessible from the exc.packet attribute. * RC errors also now contain a human readable explanation of what the error codes mean. * Instead of having many RC error exception types, there is now one FatalReturnCodeError exception type. If differentiating between the types is important, the RC is included in the exc.return_code attribute. If required in the future, subclasses can (of course) be created for individual return codes without breaking backward compatibility. This change (strictly speaking) breaks backward compatibility as it renames exceptions of the RC-specific types. @mundya are you using these exceptions anywhere and is breaking compatibility here a problem? --- rig/machine_control/consts.py | 60 +++++++++++-- rig/machine_control/packets.py | 11 +++ rig/machine_control/scp_connection.py | 91 +++++++++++--------- tests/machine_control/test_packets.py | 14 +++ tests/machine_control/test_scp_connection.py | 27 +++--- 5 files changed, 139 insertions(+), 64 deletions(-) diff --git a/rig/machine_control/consts.py b/rig/machine_control/consts.py index 69da837..b7c61fd 100644 --- a/rig/machine_control/consts.py +++ b/rig/machine_control/consts.py @@ -18,14 +18,6 @@ produce (256 + 8 bytes). """ -SCP_RC_OK = 0x80 -"""SCP response code which indicates that everything was fine.""" - -SCP_RC_TIMEOUT = {0x8b, 0x8c, 0x8d, 0x8e} -"""SCP response codes which should be treated as if they were a packet timing -out. 
-""" - SPINNAKER_RTR_BASE = 0xE1000000 # Unbuffered """Base address of router hardware registers.""" @@ -93,6 +85,58 @@ class SCPCommands(enum.IntEnum): power = 57 # BMP main board power control +@add_int_enums_to_docstring +class SCPReturnCodes(enum.IntEnum): + """SCP return codes""" + ok = 0x80 # Command completed OK + len = 0x81 # Bad packet length (Fatal) + sum = 0x82 # Bad checksum (Retryable) + cmd = 0x83 # Bad/invalid command (Fatal) + arg = 0x84 # Invalid arguments (Fatal) + port = 0x85 # Bad port number (Fatal) + timeout = 0x86 # Monitor <-> app-core comms timeout (Fatal) + route = 0x87 # No P2P route (Fatal) + cpu = 0x88 # Bad CPU number (Fatal) + dead = 0x89 # SHM dest dead (Fatal) + buf = 0x8a # No free SHM buffers (Fatal) + p2p_noreply = 0x8b # No reply to open (Fatal) + p2p_reject = 0x8c # Open rejected (Fatal) + p2p_busy = 0x8d # Dest busy (Retryable) + p2p_timeout = 0x8e # Eth chip <--> destination comms timeout (Fatal) + pkt_tx = 0x8f # Pkt Tx failed (Fatal) + +RETRYABLE_SCP_RETURN_CODES = set([ + SCPReturnCodes.sum, + SCPReturnCodes.p2p_busy, +]) +"""The set of :py:class:`.SCPReturnCodes` values which indicate a non-fatal +retryable fault.""" + + +FATAL_SCP_RETURN_CODES = { + SCPReturnCodes.len: "Bad command length.", + SCPReturnCodes.cmd: "Bad/invalid command.", + SCPReturnCodes.arg: "Invalid command arguments.", + SCPReturnCodes.port: "Bad port number.", + SCPReturnCodes.timeout: + "Timeout waiting for the application core to respond to " + "the monitor core's request.", + SCPReturnCodes.route: "No P2P route to the target chip is available.", + SCPReturnCodes.cpu: "Bad CPU number.", + SCPReturnCodes.dead: "SHM dest dead.", + SCPReturnCodes.buf: "No free SHM buffers.", + SCPReturnCodes.p2p_noreply: + "No response packets from the target reached the " + "ethernet connected chip.", + SCPReturnCodes.p2p_reject: "The target chip rejected the packet.", + SCPReturnCodes.p2p_timeout: + "Communications between the ethernet connected chip and target chip 
" + "timedout.", + SCPReturnCodes.pkt_tx: "Packet transmission failed.", +} +"""The set of fatal SCP errors and a human-readable error.""" + + @add_int_enums_to_docstring class DataType(enum.IntEnum): """Used to specify the size of data being read to/from a SpiNNaker machine diff --git a/rig/machine_control/packets.py b/rig/machine_control/packets.py index a3a3a01..21c68e8 100644 --- a/rig/machine_control/packets.py +++ b/rig/machine_control/packets.py @@ -143,6 +143,17 @@ def packed_data(self): # Return the SCP header and the rest of the data return scp_header + self.data + def __repr__(self): + """Produce a human-redaable summary of (the most important parts of) + the packet.""" + return ("<{} x: {}, y: {}, cpu: {}, " + "cmd_rc: {}, arg1: {}, arg2: {}, arg3: {}, " + "data: {}>".format(self.__class__.__name__, + self.dest_x, self.dest_y, self.dest_cpu, + self.cmd_rc, + self.arg1, self.arg2, self.arg3, + repr(self.data))) + def _unpack_sdp_into_packet(packet, bytestring): """Unpack the SDP header from a bytestring into a packet. diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index f3f1106..4a85822 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -50,7 +50,6 @@ def __new__(cls, x, y, p, cmd, arg1=0, arg2=0, arg3=0, data=b'', class SCPConnection(object): """Implements the SCP protocol for communicating with a SpiNNaker chip. """ - error_codes = {} def __init__(self, spinnaker_host, port=consts.SCP_PORT, n_tries=5, timeout=0.5): @@ -82,15 +81,6 @@ def __init__(self, spinnaker_host, port=consts.SCP_PORT, # Sequence values self.seq = seqs() - @classmethod - def _register_error(cls, cmd_rc): - """Register an Exception class as belonging to a certain CMD_RC value. 
- """ - def err_(err): - cls.error_codes[cmd_rc] = err - return err - return err_ - def send_scp(self, buffer_size, x, y, p, cmd, arg1=0, arg2=0, arg3=0, data=b'', expected_args=3, timeout=0.0): """Transmit a packet to the SpiNNaker machine and block until an @@ -176,12 +166,13 @@ def send_scp_burst(self, buffer_size, window_size, class TransmittedPacket(object): """A packet which has been transmitted and still awaits a response. """ - __slots__ = ["callback", "packet", "n_tries", + __slots__ = ["callback", "packet", "bytestring", "n_tries", "timeout", "timeout_time"] def __init__(self, callback, packet, timeout): self.callback = callback self.packet = packet + self.bytestring = packet.bytestring self.n_tries = 1 self.timeout = timeout self.timeout_time = time.time() + self.timeout @@ -226,12 +217,12 @@ def __init__(self, callback, packet, timeout): # expecting a response for it and can retransmit it if # necessary. outstanding_packets[seq] = TransmittedPacket( - args.callback, packet.bytestring, + args.callback, packet, self.default_timeout + args.timeout ) # Actually send the packet - self.sock.send(outstanding_packets[seq].packet) + self.sock.send(outstanding_packets[seq].bytestring) # Listen on the socket for an acknowledgement packet, there may not # be one. @@ -260,18 +251,19 @@ def __init__(self, callback, packet, timeout): consts.SDP_HEADER_LENGTH + 2) # If the code is an error then we respond immediately - if rc != consts.SCP_RC_OK: - if rc in consts.SCP_RC_TIMEOUT: + if rc != consts.SCPReturnCodes.ok: + if rc in consts.RETRYABLE_SCP_RETURN_CODES: # If the error is timeout related then treat the packet # as though it timed out, just discard. This avoids us # hammering the board when it's most vulnerable. pass - elif rc in self.error_codes: - raise self.error_codes[rc] else: - raise SCPError( - "Unhandled exception code {:#2x}".format(rc) - ) + # For all other errors, we'll just fall over + # immediately. 
+ packet = outstanding_packets.get(seq) + if packet is not None: + packet = packet.packet + raise FatalReturnCodeError(rc, packet) else: # Look up the sequence index of packet in the list of # outstanding packets. We may have already processed a @@ -294,10 +286,13 @@ def __init__(self, callback, packet, timeout): # the given number of times then raise a timeout error for # it. if outstanding.n_tries >= self.n_tries: - raise TimeoutError(self.n_tries) + raise TimeoutError( + "No response after {} attempts.".format( + self.n_tries), + outstanding.packet) # Otherwise we retransmit it - self.sock.send(outstanding.packet) + self.sock.send(outstanding.bytestring) outstanding.n_tries += 1 outstanding.timeout_time = (current_time + outstanding.timeout) @@ -425,34 +420,46 @@ def seqs(mask=0xffff): class SCPError(IOError): - """Base Error for SCP return codes.""" - pass + """Base Error for SCP return codes. - -class TimeoutError(SCPError): - """Raised when an SCP is not acknowledged within the given period of time. + Attributes + ---------- + packet : :py:class:`rig.machine_control.packets.SCPPacket` + The packet being processed when the error occurred. May be None if no + specific packet was involved. """ - pass + def __init__(self, message, packet=None): + self.packet = packet + if self.packet is not None: + message = "{} (Packet: {})".format(message, packet) -@SCPConnection._register_error(0x81) -class BadPacketLengthError(SCPError): - """Raised when an SCP packet is an incorrect length.""" - pass + super(SCPError, self).__init__(message) -@SCPConnection._register_error(0x83) -class InvalidCommandError(SCPError): - """Raised when an SCP packet contains an invalid command code.""" +class TimeoutError(SCPError): + """Raised when an SCP is not acknowledged within the given period of time. 
+ """ pass -@SCPConnection._register_error(0x84) -class InvalidArgsError(SCPError): - """Raised when an SCP packet has an invalid argument.""" - pass +class FatalReturnCodeError(SCPError): + """Raised when an SCP command returns with an error which is connsidered + fatal. + Attributes + ---------- + return_code : :py:class:`rig.machine_control.consts.SCPReturnCodes` or int + The return code (will be a raw integer if the code is unrecognised). + """ -@SCPConnection._register_error(0x87) -class NoRouteError(SCPError): - """Raised when there is no route to the requested core.""" + def __init__(self, return_code=None, packet=None): + try: + self.return_code = consts.SCPReturnCodes(return_code) + message = "RC_{}: {}".format( + self.return_code.name.upper(), + consts.FATAL_SCP_RETURN_CODES[self.return_code]) + except ValueError: + self.return_code = return_code + message = "Unrecognised return code {:#2X}." + super(FatalReturnCodeError, self).__init__(message, packet) diff --git a/tests/machine_control/test_packets.py b/tests/machine_control/test_packets.py index 279a020..157754a 100644 --- a/tests/machine_control/test_packets.py +++ b/tests/machine_control/test_packets.py @@ -1,5 +1,7 @@ from rig.machine_control.packets import SDPPacket, SCPPacket +import sys + class TestSDPPacket(object): """Test SDPPacket representations.""" @@ -263,3 +265,15 @@ def test_from_bytestring_2_args_short(self): # Check that the bytestring this packet creates is the same as the one # we specified before. 
assert scp_packet.bytestring == packet + + def test_repr(self): + """Test the string representation of an SCP packet makes sense.""" + scp_packet = SCPPacket(dest_x=10, dest_y=20, dest_cpu=3, + cmd_rc=2, arg1=123, arg2=456, + data=b"foobar") + # Note: Python 2 does not have the "b" prefix on byte strings + assert repr(scp_packet) == ( + "".format( + "b" if sys.version_info >= (3, 0) else "")) diff --git a/tests/machine_control/test_scp_connection.py b/tests/machine_control/test_scp_connection.py index aab2b79..9c57452 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -4,9 +4,12 @@ import struct import time -from rig.machine_control.consts import SCPCommands, DataType, SDP_HEADER_LENGTH +from rig.machine_control.consts import \ + SCPCommands, DataType, SDP_HEADER_LENGTH, RETRYABLE_SCP_RETURN_CODES, \ + FATAL_SCP_RETURN_CODES from rig.machine_control.packets import SCPPacket -from rig.machine_control.scp_connection import SCPConnection, scpcall +from rig.machine_control.scp_connection import \ + SCPConnection, scpcall, FatalReturnCodeError from rig.machine_control import scp_connection @@ -193,10 +196,11 @@ def mock_select(rlist, wlist, xlist, timeout): # times we specified. assert mock_conn.sock.send.call_count == mock_conn.n_tries - @pytest.mark.parametrize("err_code", [0x8b, 0x8c, 0x8d, 0x8e]) - def test_single_packet_fails_with_RC_P2P_ERROR(self, mock_conn, err_code): + @pytest.mark.parametrize("err_code", RETRYABLE_SCP_RETURN_CODES) + def test_single_packet_fails_with_retryable_error(self, mock_conn, + err_code): """Test correct operation for transmitting a single packet which is - always acknowledged with one of the RC_P2P error codes. + always acknowledged with one of the retryable error codes. 
""" # Create a packet to send packets = [scpcall(3, 5, 0, 12)] @@ -344,14 +348,9 @@ def recv(self, *args): mock.patch("select.select", new=mock_select): mock_conn.send_scp_burst(512, 8, packets()) - @pytest.mark.parametrize( - "rc, error", - [(0x81, scp_connection.BadPacketLengthError), - (0x83, scp_connection.InvalidCommandError), - (0x84, scp_connection.InvalidArgsError), - (0x87, scp_connection.NoRouteError), - (0x00, Exception)]) - def test_errors(self, mock_conn, rc, error): + @pytest.mark.parametrize("rc", + list(map(int, FATAL_SCP_RETURN_CODES)) + [0x00]) + def test_errors(self, mock_conn, rc): """Test that errors are raised when error RCs are returned.""" # Create an object which returns a packet with an error code class ReturnPacket(object): @@ -374,7 +373,7 @@ def __call__(self, packet): # Send an SCP command and check that the correct error is raised packets = [scpcall(3, 5, 0, 12)] - with pytest.raises(error), \ + with pytest.raises(FatalReturnCodeError), \ mock.patch("select.select", new=mock_select): mock_conn.send_scp_burst(256, 1, iter(packets)) From 868387accd1f52a8e19f878dbfc38e45090d92df Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Fri, 17 Jul 2015 11:36:27 +0100 Subject: [PATCH 2/8] Make get_machine() more robust. Instead of raising an exception, if get_machine() encounters an SCP error while probing the cores/links of a chip it now simply reports that chip as dead. This scenario most commonly occurs when a chip dies (or becomes inaccessible) some time after the P2P routing tables have been intialised. This change means that get_machine() now returns a valid subset of the machine which is still accessible and is especially useful for post-morten diagnostics, e.g. using rig-ps. Finally, get_machine() now also has an x and y argument allowing the initial P2P table reading commands to be sent to non-(0, 0) chips. 
Again, this is potentially useful if (0, 0) has become isolated from many other chips and an alternative ethernet connected chip is used. --- rig/machine_control/machine_controller.py | 53 ++++++++++++------- .../test_machine_controller.py | 28 +++++----- 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 72b5a6f..1196d82 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -14,6 +14,8 @@ from . import boot, consts, regions, struct_file from .scp_connection import SCPConnection +from rig.machine_control.scp_connection import SCPError + from rig import routing_table from rig.machine import Cores, SDRAM, SRAM, Links, Machine @@ -1328,7 +1330,7 @@ def get_num_working_cores(self, x, y): """Return the number of working cores, including the monitor.""" return self.read_struct_field("sv", "num_cpus", x, y) - def get_machine(self, default_num_cores=18): + def get_machine(self, x=0, y=0, default_num_cores=18): """Probe the machine to discover which cores and links are working. .. note:: @@ -1342,7 +1344,12 @@ def get_machine(self, default_num_cores=18): .. note:: The size of the SDRAM and SysRAM heaps is assumed to be the same - for all chips and is only checked on chip (0, 0). + for all chips and is only checked on chip (x, y). + + .. note:: + The chip (x, y) supplied is the one which will be where the search + for working chips begins. Selecting anything other than (0, 0), the + default, may be useful when debugging very broken machines. Parameters ---------- @@ -1364,19 +1371,19 @@ def get_machine(self, default_num_cores=18): :py:data:`~rig.machine.SRAM` The size of the SysRAM heap. 
""" - p2p_tables = self.get_p2p_routing_table(0, 0) + p2p_tables = self.get_p2p_routing_table(x, y) # Calculate the extent of the system - max_x = max(x for (x, y), r in iteritems(p2p_tables) + max_x = max(x_ for (x_, y_), r in iteritems(p2p_tables) if r != consts.P2PTableEntry.none) - max_y = max(y for (x, y), r in iteritems(p2p_tables) + max_y = max(y_ for (x_, y_), r in iteritems(p2p_tables) if r != consts.P2PTableEntry.none) # Discover the heap sizes available for memory allocation - sdram_start = self.read_struct_field("sv", "sdram_heap", 0, 0) - sdram_end = self.read_struct_field("sv", "sdram_sys", 0, 0) - sysram_start = self.read_struct_field("sv", "sysram_heap", 0, 0) - sysram_end = self.read_struct_field("sv", "vcpu_base", 0, 0) + sdram_start = self.read_struct_field("sv", "sdram_heap", x, y) + sdram_end = self.read_struct_field("sv", "sdram_sys", x, y) + sysram_start = self.read_struct_field("sv", "sysram_heap", x, y) + sysram_end = self.read_struct_field("sv", "vcpu_base", x, y) chip_resources = {Cores: default_num_cores, SDRAM: sdram_end - sdram_start, @@ -1391,17 +1398,23 @@ def get_machine(self, default_num_cores=18): if p2p_route == consts.P2PTableEntry.none: dead_chips.add((x, y)) else: - num_working_cores = self.get_num_working_cores(x, y) - working_links = self.get_working_links(x, y) - - if num_working_cores < default_num_cores: - resource_exception = chip_resources.copy() - resource_exception[Cores] = min(default_num_cores, - num_working_cores) - chip_resource_exceptions[(x, y)] = resource_exception - - for link in set(Links) - working_links: - dead_links.add((x, y, link)) + try: + num_working_cores = self.get_num_working_cores(x, y) + working_links = self.get_working_links(x, y) + + if num_working_cores < default_num_cores: + resource_exception = chip_resources.copy() + resource_exception[Cores] = min(default_num_cores, + num_working_cores) + chip_resource_exceptions[(x, y)] = \ + resource_exception + + for link in set(Links) - working_links: + 
dead_links.add((x, y, link)) + except SCPError: + # The chip was listed in the P2P table but is not + # responding. Assume it is dead anyway. + dead_chips.add((x, y)) return Machine(max_x + 1, max_y + 1, chip_resources, diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index 270fbe4..d91f74f 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -17,7 +17,8 @@ unpack_routing_table_entry ) from rig.machine_control.packets import SCPPacket -from rig.machine_control.scp_connection import SCPConnection +from rig.machine_control.scp_connection import \ + SCPConnection, FatalReturnCodeError from rig.machine_control import regions, consts, struct_file from rig.machine import Cores, SDRAM, SRAM, Links, Machine @@ -1762,7 +1763,7 @@ def read_struct_field(struct_name, field_name, x, y, p=0): cn.read_struct_field.side_effect = read_struct_field # Return a set of p2p tables where an 8x8 set of chips is alive with - # all chips with (3,3) being dead. + # all except (3,3) being dead. cn.get_p2p_routing_table = mock.Mock() cn.get_p2p_routing_table.return_value = { (x, y): (consts.P2PTableEntry.north @@ -1773,10 +1774,13 @@ def read_struct_field(struct_name, field_name, x, y, p=0): } # Return 18 working cores except for (2, 2) which will have only 3 - # cores. + # cores and (5, 5) which will fail to respond. 
def get_num_working_cores(x, y): - return 18 if (x, y) != (2, 2) else 3 + if (x, y) == (5, 5): + raise FatalReturnCodeError(0) + else: + return 18 if (x, y) != (2, 2) else 3 cn.get_num_working_cores = mock.Mock() cn.get_num_working_cores.side_effect = get_num_working_cores @@ -1791,7 +1795,7 @@ def get_working_links(x, y): cn.get_working_links = mock.Mock() cn.get_working_links.side_effect = get_working_links - m = cn.get_machine() + m = cn.get_machine(x=3, y=2) # Check that the machine is correct assert isinstance(m, Machine) @@ -1809,24 +1813,24 @@ def get_working_links(x, y): SRAM: vcpu_base - sysram_heap, }, } - assert m.dead_chips == set([(3, 3)]) + assert m.dead_chips == set([(3, 3), (5, 5)]) assert m.dead_links == set([(4, 4, Links.north)]) # Check that only the expected calls were made to mocks cn.read_struct_field.assert_has_calls([ - mock.call("sv", "sdram_heap", 0, 0), - mock.call("sv", "sdram_sys", 0, 0), - mock.call("sv", "sysram_heap", 0, 0), - mock.call("sv", "vcpu_base", 0, 0), + mock.call("sv", "sdram_heap", 3, 2), + mock.call("sv", "sdram_sys", 3, 2), + mock.call("sv", "sysram_heap", 3, 2), + mock.call("sv", "vcpu_base", 3, 2), ], any_order=True) - cn.get_p2p_routing_table.assert_called_once_with(0, 0) + cn.get_p2p_routing_table.assert_called_once_with(3, 2) cn.get_num_working_cores.assert_has_calls([ mock.call(x, y) for x in range(8) for y in range(8) if (x, y) != (3, 3) ], any_order=True) cn.get_working_links.assert_has_calls([ mock.call(x, y) for x in range(8) for y in range(8) - if (x, y) != (3, 3) + if (x, y) != (3, 3) and (x, y) != (5, 5) ], any_order=True) @pytest.mark.parametrize("app_id", [66, 12]) From 4cb3078c587ea18d1e6fba793e4e692329638fff Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Fri, 17 Jul 2015 11:51:54 +0100 Subject: [PATCH 3/8] Make SCPError message default to empty. 
--- rig/machine_control/scp_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index 4a85822..13054fc 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -429,12 +429,12 @@ class SCPError(IOError): specific packet was involved. """ - def __init__(self, message, packet=None): + def __init__(self, message="", packet=None): self.packet = packet if self.packet is not None: message = "{} (Packet: {})".format(message, packet) - super(SCPError, self).__init__(message) + super(SCPError, self).__init__(message.lstrip()) class TimeoutError(SCPError): From c466cd6556a1b5c2cc555e1a4113f85d989b8403 Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Fri, 17 Jul 2015 11:52:49 +0100 Subject: [PATCH 4/8] Make rig-ps more robust. If rig-ps encounters a core which returns SCP errors it now prints the error and continues rather than falling over immediately. 
--- rig/scripts/rig_ps.py | 32 ++++++++++++++++++++------------ tests/scripts/test_rig_ps.py | 27 +++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/rig/scripts/rig_ps.py b/rig/scripts/rig_ps.py index f2b89ee..897df8d 100644 --- a/rig/scripts/rig_ps.py +++ b/rig/scripts/rig_ps.py @@ -13,7 +13,7 @@ from rig.machine_control import MachineController -from rig.machine_control.scp_connection import TimeoutError +from rig.machine_control.scp_connection import SCPError, TimeoutError from rig.machine import Cores @@ -56,17 +56,25 @@ def get_process_list(mc, x_=None, y_=None, p_=None, if p_ is not None and p_ != p: continue - status = mc.get_processor_status(x=x, y=y, p=p) - keep = (match(str(status.app_id), app_ids) and - match(status.app_name, applications) and - match(status.cpu_state.name, states)) - - if keep: - yield (x, y, p, - status.cpu_state, - status.rt_code, - status.app_name, - status.app_id) + try: + status = mc.get_processor_status(x=x, y=y, p=p) + keep = (match(str(status.app_id), app_ids) and + match(status.app_name, applications) and + match(status.cpu_state.name, states)) + + if keep: + yield (x, y, p, + status.cpu_state, + status.rt_code, + status.app_name, + status.app_id) + except SCPError as e: + # If an error occurs while communicating with a chip, we bodge + # it into the "cpu_status" field and continue (note that it + # will never get filtered out). 
+ class DeadStatus(object): + name = "{}: {}".format(e.__class__.__name__, str(e)) + yield (x, y, p, DeadStatus(), None, "", -1) def main(args=None): diff --git a/tests/scripts/test_rig_ps.py b/tests/scripts/test_rig_ps.py index 7315879..d772268 100644 --- a/tests/scripts/test_rig_ps.py +++ b/tests/scripts/test_rig_ps.py @@ -6,7 +6,8 @@ import rig.scripts.rig_ps as rig_ps -from rig.machine_control.scp_connection import TimeoutError +from rig.machine_control.scp_connection import \ + TimeoutError, FatalReturnCodeError from rig.machine import Machine, Cores @@ -82,6 +83,28 @@ def get_processor_status(x, y, p): ] +def test_get_process_list_dead_chip(): + mc = mock.Mock() + + machine = Machine(1, 1, chip_resources={Cores: 2}) + mc.get_machine.return_value = machine + + mc.get_processor_status.side_effect = FatalReturnCodeError(0x88) + + # Should list the failure + ps = list(rig_ps.get_process_list(mc)) + assert len(ps) == 2 + for x, y, p, app_state, rte, name, app_id in ps: + assert x == 0 + assert y == 0 + assert 0 <= p < 2 + assert app_state.name == \ + "FatalReturnCodeError: RC_CPU: Bad CPU number." + assert bool(rte) is False + assert name == "" + assert app_id == -1 + + def test_bad_args(): # No hostname with pytest.raises(SystemExit): @@ -101,7 +124,7 @@ def test_bad_args(): def test_no_machine(monkeypatch): # Should fail if nothing responds mc = mock.Mock() - mc.get_software_version = mock.Mock(side_effect=TimeoutError) + mc.get_software_version = mock.Mock(side_effect=TimeoutError()) MC = mock.Mock() MC.return_value = mc From d276ea3acc153f1cedae7057f5bc3ff365d0ba9b Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Sat, 18 Jul 2015 14:54:04 +0100 Subject: [PATCH 5/8] Add read_into methods throughout. 
--- rig/machine_control/machine_controller.py | 81 ++++++++++++++++--- rig/machine_control/scp_connection.py | 34 +++++++- .../test_machine_controller.py | 46 ++++++++--- tests/machine_control/test_scp_connection.py | 12 ++- 4 files changed, 147 insertions(+), 26 deletions(-) diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 1196d82..1b4e817 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -341,6 +341,27 @@ def read(self, address, length_bytes, x, y, p=0): return connection.read(self.scp_data_length, self.scp_window_size, x, y, p, address, length_bytes) + @ContextMixin.use_contextual_arguments() + def read_into(self, address, buffer, length_bytes, x, y, p=0): + """Read a from an address in memory into the supplied buffer. + + Parameters + ---------- + address : int + The address at which to start reading the data. + buffer : bytearray + A bufferable object (e.g. bytearray) into which the data will be + read. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. + """ + # Call the SCPConnection to perform the read on our behalf + connection = self._get_connection(x, y) + connection.read_into(buffer, self.scp_data_length, + self.scp_window_size, + x, y, p, address, length_bytes) + def _get_struct_field_and_address(self, struct_name, field_name): field = self.structs[six.b(struct_name)][six.b(field_name)] address = self.structs[six.b(struct_name)].base + field.offset @@ -1734,6 +1755,20 @@ def __exit__(self, exception_type, exception_value, traceback): """Exit a block and call :py:meth:`~.close`.""" self.close() + def _read_n_bytes(self, n_bytes): + """Return the number of bytes to actually read accounting for the + cursor position. 
+ """ + # If n_bytes is negative then calculate it as the number of bytes left + if n_bytes < 0: + n_bytes = self._end_address - self.address + + # Determine how far to read, then read nothing beyond that point. + if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) + + return n_bytes + @_if_not_closed def read(self, n_bytes=-1): """Read a number of bytes from the memory. @@ -1755,22 +1790,42 @@ def read(self, n_bytes=-1): # Flush this write buffer self.flush() - # If n_bytes is negative then calculate it as the number of bytes left - if n_bytes < 0: - n_bytes = self._end_address - self.address - - # Determine how far to read, then read nothing beyond that point. - if self.address + n_bytes > self._end_address: - n_bytes = min(n_bytes, self._end_address - self.address) - + n_bytes = self._read_n_bytes(n_bytes) if n_bytes <= 0: return b'' + else: + data = bytearray(n_bytes) + self.read_into(data, n_bytes) + return data - # Perform the read and increment the offset - data = self._machine_controller.read( - self.address, n_bytes, self._x, self._y, 0) - self._offset += n_bytes - return data + @_if_not_closed + def read_into(self, buffer, n_bytes=-1): + """Read a number of bytes from the memory into a supplied buffer. + + .. note:: + Reads beyond the specified memory range will be truncated. + + Parameters + ---------- + buffer : bytearray + A bufferable object (e.g. bytearray) into which the data will be + read. + n_bytes : int + A number of bytes to read. If the number of bytes is negative or + omitted then read all data until the end of memory region. 
+ """ + # Flush this write buffer + self.flush() + + n_bytes = self._read_n_bytes(n_bytes) + + if n_bytes <= 0: + return + else: + # Perform the read and increment the offset + self._machine_controller.read_into( + self.address, buffer, n_bytes, self._x, self._y, 0) + self._offset += n_bytes @_if_not_closed def write(self, bytes): diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index 13054fc..8c3f3dd 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -328,6 +328,39 @@ def read(self, buffer_size, window_size, x, y, p, address, length_bytes): """ # Prepare the buffer to receive the incoming data data = bytearray(length_bytes) + self.read_into(data, buffer_size, window_size, x, y, p, address, + length_bytes) + return bytes(data) + + def read_into(self, data, buffer_size, window_size, x, y, p, address, + length_bytes): + """Read a bytestring from an address in memory into a supplied buffer. + + ..note:: + This method is included here to maintain API compatibility with an + `alternative implementation of SCP + `_. + + Parameters + ---------- + data : bytearray + An object into which supports the buffer protocol (e.g. bytearray) + into which the data will be read. + buffer_size : int + Number of bytes held in an SCP buffer by SARK, determines how many + bytes will be expected in a socket and how many bytes of data will + be read back in each packet. + window_size : int + x : int + y : int + p : int + address : int + The address at which to start reading the data. + length_bytes : int + The number of bytes to read from memory. Large reads are + transparently broken into multiple SCP read commands. 
+ """ + # Prepare the buffer to receive the incoming data mem = memoryview(data) # Create a callback which will write the data from a packet into a @@ -361,7 +394,6 @@ def packets(length_bytes, data): # Run the event loop and then return the retrieved data self.send_scp_burst(buffer_size, window_size, packets(length_bytes, data)) - return bytes(data) def write(self, buffer_size, window_size, x, y, p, address, data): """Write a bytestring to an address in memory. diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index d91f74f..33a4c8a 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -536,6 +536,34 @@ def test_read(self, buffer_size, window_size, x, y, p, buffer_size, window_size, x, y, p, start_address, length ) + @pytest.mark.parametrize( + "buffer_size, window_size, x, y, p, start_address, length, data", + [(128, 1, 0, 1, 2, 0x67800000, 100, b"\x00" * 100), + (256, 5, 1, 4, 5, 0x67801000, 2, b"\x10\x23"), + ] + ) + def test_read_into(self, buffer_size, window_size, x, y, p, + start_address, length, data): + # Create the mock controller + cn = MachineController("localhost") + cn._scp_data_length = buffer_size + cn._window_size = window_size + cn.connections[0] = mock.Mock(spec_set=SCPConnection) + + def mock_read_into(buffer, *args, **kwargs): + buffer[:] = data + cn.connections[0].read_into.side_effect = mock_read_into + + # Perform the read and ensure that values are passed on as appropriate + with cn(x=x, y=y, p=p): + read_data = bytearray(length) + cn.read_into(start_address, read_data, length) + assert data == read_data + + assert len(cn.connections[0].read_into.mock_calls) == 1 + assert cn.connections[0].read_into.mock_calls[0][1][1:] == \ + (buffer_size, window_size, x, y, p, start_address, length) + @pytest.mark.parametrize( "iptag, addr, port", [(1, "localhost", 54321), @@ -1865,15 +1893,17 @@ def test_read(self, 
mock_controller, x, y, start_address, lengths): calls = [] offset = 0 for n_bytes in lengths: - sdram_file.read(n_bytes) + buf = bytearray(n_bytes) + sdram_file.read_into(buf, n_bytes) assert sdram_file.tell() == offset + n_bytes assert sdram_file.address == start_address + offset + n_bytes - calls.append(mock.call(start_address + offset, n_bytes, x, y, 0)) + calls.append(mock.call(start_address + offset, + buf, n_bytes, x, y, 0)) offset = offset + n_bytes # Check the reads caused the appropriate calls to the machine # controller. - mock_controller.read.assert_has_calls(calls) + mock_controller.read_into.assert_has_calls(calls) @pytest.mark.parametrize("x, y", [(1, 3), (3, 0)]) @pytest.mark.parametrize("start_address, length, offset", @@ -1885,18 +1915,15 @@ def test_read_no_parameter(self, mock_controller, x, y, start_address, # Assert that reading with no parameter reads the full number of bytes sdram_file.seek(offset) - sdram_file.read() - mock_controller.read.assert_called_one_with( - start_address + offset, length - offset, x, y, 0) + assert sdram_file._read_n_bytes(-1) == length - offset def test_read_beyond(self, mock_controller): sdram_file = MemoryIO(mock_controller, 0, 0, start_address=0, end_address=10) - sdram_file.read(100) - mock_controller.read.assert_called_with(0, 10, 0, 0, 0) + assert len(sdram_file.read(100)) == 10 assert sdram_file.read(1) == b'' - assert mock_controller.read.call_count == 1 + assert mock_controller.read_into.call_count == 1 @pytest.mark.parametrize("x, y", [(4, 2), (255, 1)]) @pytest.mark.parametrize("start_address", [0x60000004, 0x61000003]) @@ -2114,6 +2141,7 @@ def test_zero_length_filelike(self, mock_controller): "flush_event", [lambda filelike: filelike.flush(), lambda filelike: filelike.read(1), + lambda filelike: filelike.read_into(bytearray(1), 1), lambda filelike: filelike.close()] ) def test_coalescing_writes(self, get_node, flush_event): diff --git a/tests/machine_control/test_scp_connection.py 
b/tests/machine_control/test_scp_connection.py index 9c57452..8e004dc 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -438,6 +438,7 @@ def recv(self, *args, **kwargs): mock_conn.send_scp_burst(512, 8, packets) +@pytest.mark.parametrize("read_into", [True, False]) @pytest.mark.parametrize( "buffer_size, window_size, x, y, p", [(128, 1, 0, 0, 1), (256, 5, 1, 2, 3)] ) @@ -461,7 +462,7 @@ def recv(self, *args, **kwargs): (256, DataType.word, 0x60000004) ]) def test_read(buffer_size, window_size, x, y, p, n_bytes, - data_type, start_address): + data_type, start_address, read_into): mock_conn = SCPConnection("localhost") # Construct the expected calls, and hence the expected return packets @@ -502,8 +503,13 @@ def __call__(self, buffer_size, window_size, args): send_scp_burst.side_effect = ccs # Read an amount of memory specified by the size. - data = mock_conn.read(buffer_size, window_size, x, y, p, - start_address, n_bytes) + if read_into: + data = bytearray(n_bytes) + mock_conn.read_into(data, buffer_size, window_size, x, y, p, + start_address, n_bytes) + else: + data = mock_conn.read(buffer_size, window_size, x, y, p, + start_address, n_bytes) assert data == ccs.read_data # send_burst_scp should have been called once, each element in the iterator From 08ff6e1166c0bbe00e72ead6e1ab13dc445502fa Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Sat, 18 Jul 2015 19:07:33 +0100 Subject: [PATCH 6/8] Discover and use multiple Ethernet connections This commit adds support for discovering and using multiple Ethernet connections (when available) but does not (yet) feature any way to use these connections in parallel to improve performance. A limited performance improvement due to reduced average latency within the machine may be plausible, however. 
This commit lifts board geometry functions from commit b666e80 (part of the now defunct non-blocking-io branch) along with the basic principles for probing the machine for Ethernet connections. --- rig/geometry.py | 83 ++++++++++++ rig/machine_control/machine_controller.py | 116 +++++++++++++++-- rig/machine_control/scp_connection.py | 5 +- .../test_machine_controller.py | 122 ++++++++++++++++-- tests/machine_control/test_scp_connection.py | 5 + tests/test_geometry.py | 70 +++++++++- 6 files changed, 380 insertions(+), 21 deletions(-) diff --git a/rig/geometry.py b/rig/geometry.py index 048029e..7b082a2 100644 --- a/rig/geometry.py +++ b/rig/geometry.py @@ -5,6 +5,8 @@ from math import sqrt +import numpy as np + def to_xyz(xy): """Convert a two-tuple (x, y) coordinate into an (x, y, 0) coordinate.""" @@ -214,3 +216,84 @@ def standard_system_dimensions(num_boards): # Convert the number of triads into numbers of chips (each triad of boards # contributes as 12x12 block of chips). return (w * 12, h * 12) + + +def spinn5_eth_coords(width, height): + """Generate a list of board coordinates with Ethernet connectivity in a + SpiNNaker machine. + + Specifically, generates the coordinates for the Ethernet connected chips of + SpiNN-5 boards arranged in a standard torus topology. + + Parameters + ---------- + width : int + Width of the system in chips. + height : int + Height of the system in chips. + """ + # Internally, work with the width and height rounded up to the next + # multiple of 12 + w = ((width + 11) // 12) * 12 + h = ((height + 11) // 12) * 12 + + for x in range(0, w, 12): + for y in range(0, h, 12): + for dx, dy in ((0, 0), (4, 8), (8, 4)): + nx = (x + dx) % w + ny = (y + dy) % h + # Skip points which are outside the range available + if nx < width and ny < height: + yield (nx, ny) + + +def spinn5_local_eth_coord(x, y, w, h): + """Get the coordinates of a chip's local ethernet connected chip. + + .. 
note:: + This function assumes the system is constructed from SpiNN-5 boards + returns the coordinates of the ethernet connected chip on the current + board. + + Parameters + ---------- + x : int + y : int + w : int + Width of the system in chips. + h : int + Height of the system in chips. + """ + dx, dy = SPINN5_ETH_OFFSET[y % 12][x % 12] + return ((x + dx) % w), ((y + dy) % h) + + +SPINN5_ETH_OFFSET = np.array([ + [(vx - x, vy - y) for x, (vx, vy) in enumerate(row)] + for y, row in enumerate([ + # Below is an enumeration of the absolute coordinates of the nearest + # ethernet connected chip. Note that the above list comprehension + # changes these into offsets to the nearest chip. + # X: 0 1 2 3 4 5 6 7 8 9 10 11 # noqa Y: + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 0 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 1 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 2 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, -4), (+4, -4), (+4, -4), (+4, -4)], # noqa 3 + [(-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 4 + [(-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 5 + [(-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 6 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 7 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4), (+8, +4)], # noqa 8 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, 
+8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4)], # noqa 9 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4)], # noqa 10 + [(-4, +4), (-4, +4), (-4, +4), (-4, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)] # noqa 11 + ]) +], dtype=int) +"""SpiNN-5 ethernet connected chip lookup. + +Used by :py:func:`.spinn5_local_eth_coord`. Given an x and y chip position +modulo 12, return the offset of the board's bottom-left chip from the chip's +position. + +Note: the order of indexes: ``SPINN5_ETH_OFFSET[y][x]``! +""" diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 1b4e817..716e48a 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -19,6 +19,8 @@ from rig import routing_table from rig.machine import Cores, SDRAM, SRAM, Links, Machine +from rig.geometry import spinn5_eth_coords, spinn5_local_eth_coord + from rig.utils.contexts import ContextMixin, Required from rig.utils.docstrings import add_signature_to_docstring @@ -109,10 +111,20 @@ def __init__(self, initial_host, scp_port=consts.SCP_PORT, "boot/sark.struct") self.structs = struct_file.read_struct_file(struct_data) - # Create the initial connection - self.connections = [ - SCPConnection(initial_host, scp_port, n_tries, timeout) - ] + # This dictionary contains a lookup from chip (x, y) to the + # SCPConnection associated with that chip. The special entry with the + # key None is reserved for the connection initially made to the + # machine and is special since it is always known to exist but its + # actual position in the network is unknown. + self.connections = { + None: SCPConnection(initial_host, scp_port, n_tries, timeout) + } + + # The dimensions of the system. This is set by discover_connections() + # and is used by _get_connection to determine which of the above + # connections to use. 
+ self._width = None + self._height = None def __call__(self, **context_args): """For use with `with`: set default argument values. @@ -154,9 +166,9 @@ def send_scp(self, *args, **kwargs): This function is a thin wrapper around :py:meth:`rig.machine_control.scp_connection.SCPConnection.send_scp`. - Future versions of this command will automatically choose the most - appropriate connection to use for machines with more than one Ethernet - connection. + This function will attempt to use the SCP connection nearest the + destination of the SCP command if multiple connections have been + discovered using :py:meth:`.discover_connections`. Parameters ---------- @@ -175,7 +187,20 @@ def send_scp(self, *args, **kwargs): def _get_connection(self, x, y): """Get the appropriate connection for a chip.""" - return self.connections[0] + if self._width is None or self._height is None: + return self.connections[None] + else: + # If possible, use the local Ethernet connected chip + eth_chip = spinn5_local_eth_coord(x, y, self._width, self._height) + conn = self.connections.get(eth_chip) + if conn is not None: + return conn + else: + # If no connection was available to the local board, choose + # another arbitrarily. + # XXX: This choice will cause lots of contention in systems + # with many missing Ethernet connections. + return self.connections[None] + def _send_scp(self, x, y, p, *args, **kwargs): """Determine the best connection to use to send an SCP packet and use @@ -251,6 +276,63 @@ def boot(self, width, height, **boot_kwargs): **boot_kwargs) assert len(self.structs) > 0 + @ContextMixin.use_contextual_arguments() + def discover_connections(self, x=0, y=0): + """Attempt to discover all available Ethernet connections to a machine. + + After calling this method, :py:class:`.MachineController` will attempt + to communicate via the Ethernet connection on the same board as the + destination chip for all commands. 
+ + If called multiple times, existing connections will be retained in + preference to new ones. + + .. note:: + The system must be booted for this command to succeed. + + .. note:: + Currently, only systems comprised of multiple Ethernet-connected + SpiNN-5 boards are supported. + + Parameters + ---------- + x : int + y : int + (Optional) The coordinates of the chip to initially use to query + the system for the set of live chips. + + Returns + ------- + int + The number of new connections established. + """ + working_chips = set( + (x, y) + for (x, y), route in iteritems(self.get_p2p_routing_table(x, y)) + if route != consts.P2PTableEntry.none) + self._width = max(x for x, y in working_chips) + 1 + self._height = max(y for x, y in working_chips) + 1 + + num_new_connections = 0 + + for x, y in spinn5_eth_coords(self._width, self._height): + if (x, y) in working_chips and (x, y) not in self.connections: + ip = self.get_ip_address(x, y) + if ip is not None: + # Create a connection to the IP + self.connections[(x, y)] = \ + SCPConnection(ip, self.scp_port, + self.n_tries, self.timeout) + # Attempt to use the connection (and remove it if it + # doesn't work) + try: + self.get_software_version(x, y) + num_new_connections += 1 + except SCPError: + self.connections.pop((x, y)).close() + + return num_new_connections + @ContextMixin.use_contextual_arguments() def application(self, app_id): """Update the context to use the given application ID and stop the @@ -294,6 +376,24 @@ def get_software_version(self, x, y, processor=0): return CoreInfo(p2p_address, pcpu, vcpu, version, buffer_size, sver.arg3, sver.data.decode("utf-8")) + @ContextMixin.use_contextual_arguments() + def get_ip_address(self, x, y): + """Get the IP address of a particular SpiNNaker chip's Ethernet link. + + Returns + ------- + str or None + The IPv4 address (as a string) of the chip's Ethernet link or None + if the chip does not have an Ethernet connection or the link is + currently down. 
+ """ + if self.read_struct_field("sv", "eth_up", x=x, y=y): + ip = self.read_struct_field("sv", "ip_addr", x=x, y=y) + # Convert the IP address to the standard decimal string format + return ".".join(str((ip >> i) & 0xFF) for i in range(0, 32, 8)) + else: + return None + @ContextMixin.use_contextual_arguments() def write(self, address, data, x, y, p=0): """Write a bytestring to an address in memory. diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index 8c3f3dd..acc5204 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -72,7 +72,6 @@ def __init__(self, spinnaker_host, port=consts.SCP_PORT, # Create a socket to communicate with the SpiNNaker machine self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.settimeout(self.default_timeout) self.sock.connect((spinnaker_host, port)) # Store the number of tries that will be allowed @@ -443,6 +442,10 @@ def packets(address, data): # Run the event loop and then return the retrieved data self.send_scp_burst(buffer_size, window_size, packets(address, data)) + def close(self): + """Close the SCP connection.""" + self.sock.close() + def seqs(mask=0xffff): i = 0 diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index 33a4c8a..362daf2 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -18,7 +18,7 @@ ) from rig.machine_control.packets import SCPPacket from rig.machine_control.scp_connection import \ - SCPConnection, FatalReturnCodeError + SCPConnection, SCPError, FatalReturnCodeError from rig.machine_control import regions, consts, struct_file from rig.machine import Cores, SDRAM, SRAM, Links, Machine @@ -99,6 +99,17 @@ def test_get_software_version(self, controller, spinnaker_width, assert sver.version >= 1.3 assert sver.position == (x, y) + def test_get_ip_address(self, controller): + """Test 
getting the IP address.""" + # Chip 0, 0 should report an IP address (since it is what we're + # connected via, though note that we can't check the IP since we may be + # connected via a proxy). + assert isinstance(controller.get_ip_address(0, 0), str) + + # Chip 1, 1 should not report an IP address (since in no existing + # hardware does it have an Ethernet connection).. + assert controller.get_ip_address(1, 1) is None + def test_write_and_read(self, controller): """Test write and read capabilities by writing a string to SDRAM and then reading back in a different order. @@ -138,7 +149,7 @@ def test_write_and_read_struct_values(self, controller): def test_set_get_clear_iptag(self, controller): # Get our address, then add a new IPTag pointing # **YUCK** - ip_addr = controller.connections[0].sock.getsockname()[0] + ip_addr = controller.connections[None].sock.getsockname()[0] port = 1234 iptag = 7 @@ -480,6 +491,95 @@ def test_get_software_version(self, mock_conn): # noqa assert sver.build_date == 888999 assert sver.version_string == "Hello, World!" + @pytest.mark.parametrize("has_ip", [True, False]) + def test_get_ip_address(self, has_ip): + cn = MachineController("localhost") + cn.read_struct_field = mock.Mock(side_effect=[has_ip, 0x11223344]) + + ip = cn.get_ip_address(1, 2) + + if has_ip: + assert ip == "68.51.34.17" + cn.read_struct_field.assert_has_calls([ + mock.call("sv", "eth_up", x=1, y=2), + mock.call("sv", "ip_addr", x=1, y=2), + ]) + else: + assert ip is None + cn.read_struct_field.assert_called_once_with("sv", "eth_up", + x=1, y=2) + + def test__get_connection(self): + cn = MachineController("localhost") + cn.connections = { + None: "default", + (0, 0): "0,0", + (4, 8): "4,8", + # 8, 4 is missing! + } + + # Until _width and _height are set, the default should be used at all + # times. 
+ assert cn._get_connection(0, 0) == "default" + assert cn._get_connection(1, 0) == "default" + assert cn._get_connection(0, 1) == "default" + assert cn._get_connection(11, 0) == "default" + assert cn._get_connection(0, 11) == "default" + + # With width and height specified, the local connector should be used + # in all cases when possible + cn._width = 12 + cn._height = 12 + + assert cn._get_connection(0, 0) == "0,0" + assert cn._get_connection(1, 0) == "0,0" + assert cn._get_connection(0, 1) == "0,0" + + assert cn._get_connection(4, 8) == "4,8" + assert cn._get_connection(5, 8) == "4,8" + assert cn._get_connection(4, 9) == "4,8" + + # When a missing a connection, another connection should be used + assert cn._get_connection(8, 4) in ("default", "0,0", "4,8") + assert cn._get_connection(9, 4) in ("default", "0,0", "4,8") + assert cn._get_connection(8, 5) in ("default", "0,0", "4,8") + + def test_discover_connections(self): + # In this test, the discovered system is a 6-board system with the + # board with a dead chip on (16, 8), the Ethernet link at (4, 8) being + # down, the connection to (8, 4) resulting in timeouts and the + # connection to (20, 4) already present. 
+ cn = MachineController("localhost") + w, h = 24, 12 + cn.get_p2p_routing_table = mock.Mock(return_value={ + (x, y): (consts.P2PTableEntry.north + if (x, y) != (16, 8) else + consts.P2PTableEntry.none) + for x in range(w) + for y in range(h) + }) + + def get_ip_address(x, y): + if (x, y) == (4, 8): + return None + else: + return "127.0.0.1" + cn.get_ip_address = mock.Mock(side_effect=get_ip_address) + + def get_software_version(x, y): + if (x, y) == (8, 4): + raise SCPError("Fail.") + cn.get_software_version = mock.Mock(side_effect=get_software_version) + + cn.connections[(20, 4)] = mock.Mock() + + assert cn.discover_connections() == 2 + assert cn._width == w + assert cn._height == h + assert set(cn.connections) == set([None, (0, 0), (12, 0), (20, 4)]) + assert isinstance(cn.connections[(0, 0)], SCPConnection) + assert isinstance(cn.connections[(12, 0)], SCPConnection) + @pytest.mark.parametrize("size", [128, 256]) def test_scp_data_length(self, size): cn = MachineController("localhost") @@ -503,13 +603,13 @@ def test_write(self, buffer_size, window_size, x, y, p, cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size - cn.connections[0] = mock.Mock(spec_set=SCPConnection) + cn.connections[None] = mock.Mock(spec_set=SCPConnection) # Perform the read and ensure that values are passed on as appropriate with cn(x=x, y=y, p=p): cn.write(start_address, data) - cn.connections[0].write.assert_called_once_with( + cn.connections[None].write.assert_called_once_with( buffer_size, window_size, x, y, p, start_address, data ) @@ -525,14 +625,14 @@ def test_read(self, buffer_size, window_size, x, y, p, cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size - cn.connections[0] = mock.Mock(spec_set=SCPConnection) - cn.connections[0].read.return_value = data + cn.connections[None] = mock.Mock(spec_set=SCPConnection) + cn.connections[None].read.return_value = data # Perform the read and ensure 
that values are passed on as appropriate with cn(x=x, y=y, p=p): assert data == cn.read(start_address, length) - cn.connections[0].read.assert_called_once_with( + cn.connections[None].read.assert_called_once_with( buffer_size, window_size, x, y, p, start_address, length ) @@ -548,11 +648,11 @@ def test_read_into(self, buffer_size, window_size, x, y, p, cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size - cn.connections[0] = mock.Mock(spec_set=SCPConnection) + cn.connections[None] = mock.Mock(spec_set=SCPConnection) def mock_read_into(buffer, *args, **kwargs): buffer[:] = data - cn.connections[0].read_into.side_effect = mock_read_into + cn.connections[None].read_into.side_effect = mock_read_into # Perform the read and ensure that values are passed on as appropriate with cn(x=x, y=y, p=p): @@ -560,8 +660,8 @@ def mock_read_into(buffer, *args, **kwargs): cn.read_into(start_address, read_data, length) assert data == read_data - assert len(cn.connections[0].read_into.mock_calls) == 1 - assert cn.connections[0].read_into.mock_calls[0][1][1:] == \ + assert len(cn.connections[None].read_into.mock_calls) == 1 + assert cn.connections[None].read_into.mock_calls[0][1][1:] == \ (buffer_size, window_size, x, y, p, start_address, length) @pytest.mark.parametrize( diff --git a/tests/machine_control/test_scp_connection.py b/tests/machine_control/test_scp_connection.py index 8e004dc..d3f36a8 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -43,6 +43,11 @@ def mock_conn(): return conn +def test_close(mock_conn): + mock_conn.close() + assert mock_conn.sock.close.called + + def test_scpcall(): """scpcall is a utility for specifying SCP packets and callbacks""" call = scpcall(0, 1, 2, 3) diff --git a/tests/test_geometry.py b/tests/test_geometry.py index 37622eb..46f7293 100644 --- a/tests/test_geometry.py +++ b/tests/test_geometry.py @@ -3,7 +3,7 @@ from rig.geometry import 
concentric_hexagons, to_xyz, minimise_xyz, \ shortest_mesh_path_length, shortest_mesh_path, \ shortest_torus_path_length, shortest_torus_path, \ - standard_system_dimensions + standard_system_dimensions, spinn5_eth_coords, spinn5_local_eth_coord def test_concentric_hexagons(): @@ -337,3 +337,71 @@ def test_standard_system_dimensions(): assert standard_system_dimensions(3 * 1 * 3) == (36, 12) assert standard_system_dimensions(3 * 2 * 4) == (48, 24) assert standard_system_dimensions(3 * 1 * 17) == (204, 12) + + +def test_spinn5_eth_coords(): + # Minimal system + assert set(spinn5_eth_coords(12, 12)) == set([(0, 0), (4, 8), (8, 4)]) + + # Larger, non-square systems + assert set(spinn5_eth_coords(24, 12)) == set([ + (0, 0), (4, 8), (8, 4), (12, 0), (16, 8), (20, 4)]) + assert set(spinn5_eth_coords(12, 24)) == set([ + (0, 0), (4, 8), (8, 4), (0, 12), (4, 20), (8, 16)]) + + # Larger square system + assert set(spinn5_eth_coords(24, 24)) == set([ + (0, 0), (4, 8), (8, 4), + (12, 0), (16, 8), (20, 4), + (0, 12), (4, 20), (8, 16), + (12, 12), (16, 20), (20, 16) + ]) + + # Subsets for non multiples of 12 (i.e. 
non-spinn-5 based things) + assert set(spinn5_eth_coords(2, 2)) == set([(0, 0)]) + assert set(spinn5_eth_coords(8, 8)) == set([(0, 0)]) + + +def test_spinn5_local_eth_coord(): + # Points lie on actual eth chips + assert spinn5_local_eth_coord(0, 0, 12, 12) == (0, 0) + assert spinn5_local_eth_coord(4, 8, 12, 12) == (4, 8) + assert spinn5_local_eth_coord(8, 4, 12, 12) == (8, 4) + + assert spinn5_local_eth_coord(12, 0, 24, 12) == (12, 0) + assert spinn5_local_eth_coord(16, 8, 24, 12) == (16, 8) + assert spinn5_local_eth_coord(20, 4, 24, 12) == (20, 4) + + assert spinn5_local_eth_coord(0, 12, 12, 24) == (0, 12) + assert spinn5_local_eth_coord(8, 16, 12, 24) == (8, 16) + assert spinn5_local_eth_coord(4, 20, 12, 24) == (4, 20) + + assert spinn5_local_eth_coord(12, 12, 24, 24) == (12, 12) + assert spinn5_local_eth_coord(16, 20, 24, 24) == (16, 20) + assert spinn5_local_eth_coord(20, 16, 24, 24) == (20, 16) + + # Exhaustive check for a 12x12 system + cases = [ + # X: 0 1 2 3 4 5 6 7 8 9 10 11 # noqa Y: + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 0 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 1 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 2 + [(+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+4, +8), (+4, +8), (+4, +8), (+4, +8)], # noqa 3 + [(+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 4 + [(+8, +4), (+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 5 + [(+8, +4), (+8, +4), (+8, +4), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 6 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+0, +0), (+0, 
+0), (+0, +0), (+0, +0), (+8, +4), (+8, +4), (+8, +4), (+8, +4)], # noqa 7 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4), (+8, +4)], # noqa 8 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4), (+8, +4)], # noqa 9 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+8, +4)], # noqa 10 + [(+8, +4), (+8, +4), (+8, +4), (+8, +4), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8), (+4, +8)] # noqa 11 + ] + for y, row in enumerate(cases): + for x, eth_coord in enumerate(row): + assert spinn5_local_eth_coord(x, y, 12, 12) == eth_coord + + # Still works for non multiples of 12 + assert spinn5_local_eth_coord(0, 0, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(0, 1, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(1, 0, 2, 2) == (0, 0) + assert spinn5_local_eth_coord(1, 1, 2, 2) == (0, 0) From 40e415eaba6313478f4ec03db9f0329fe7b03188 Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Sat, 18 Jul 2015 19:33:54 +0100 Subject: [PATCH 7/8] Change read_into to readinto This now matches [Python's file API](https://docs.python.org/3/library/io.html#io.BufferedIOBase.readinto), hurah! 
--- rig/machine_control/machine_controller.py | 14 ++++++------ rig/machine_control/scp_connection.py | 8 +++---- .../test_machine_controller.py | 22 +++++++++---------- tests/machine_control/test_scp_connection.py | 10 ++++----- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index 716e48a..da47e53 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -442,7 +442,7 @@ def read(self, address, length_bytes, x, y, p=0): x, y, p, address, length_bytes) @ContextMixin.use_contextual_arguments() - def read_into(self, address, buffer, length_bytes, x, y, p=0): + def readinto(self, address, buffer, length_bytes, x, y, p=0): """Read a from an address in memory into the supplied buffer. Parameters @@ -458,9 +458,9 @@ def read_into(self, address, buffer, length_bytes, x, y, p=0): """ # Call the SCPConnection to perform the read on our behalf connection = self._get_connection(x, y) - connection.read_into(buffer, self.scp_data_length, - self.scp_window_size, - x, y, p, address, length_bytes) + connection.readinto(buffer, self.scp_data_length, + self.scp_window_size, + x, y, p, address, length_bytes) def _get_struct_field_and_address(self, struct_name, field_name): field = self.structs[six.b(struct_name)][six.b(field_name)] @@ -1895,11 +1895,11 @@ def read(self, n_bytes=-1): return b'' else: data = bytearray(n_bytes) - self.read_into(data, n_bytes) + self.readinto(data, n_bytes) return data @_if_not_closed - def read_into(self, buffer, n_bytes=-1): + def readinto(self, buffer, n_bytes=-1): """Read a number of bytes from the memory into a supplied buffer. .. 
note:: @@ -1923,7 +1923,7 @@ def read_into(self, buffer, n_bytes=-1): return else: # Perform the read and increment the offset - self._machine_controller.read_into( + self._machine_controller.readinto( self.address, buffer, n_bytes, self._x, self._y, 0) self._offset += n_bytes diff --git a/rig/machine_control/scp_connection.py b/rig/machine_control/scp_connection.py index acc5204..a31a372 100644 --- a/rig/machine_control/scp_connection.py +++ b/rig/machine_control/scp_connection.py @@ -327,12 +327,12 @@ def read(self, buffer_size, window_size, x, y, p, address, length_bytes): """ # Prepare the buffer to receive the incoming data data = bytearray(length_bytes) - self.read_into(data, buffer_size, window_size, x, y, p, address, - length_bytes) + self.readinto(data, buffer_size, window_size, x, y, p, address, + length_bytes) return bytes(data) - def read_into(self, data, buffer_size, window_size, x, y, p, address, - length_bytes): + def readinto(self, data, buffer_size, window_size, x, y, p, address, + length_bytes): """Read a bytestring from an address in memory into a supplied buffer. 
..note:: diff --git a/tests/machine_control/test_machine_controller.py b/tests/machine_control/test_machine_controller.py index 362daf2..431da48 100644 --- a/tests/machine_control/test_machine_controller.py +++ b/tests/machine_control/test_machine_controller.py @@ -642,26 +642,26 @@ def test_read(self, buffer_size, window_size, x, y, p, (256, 5, 1, 4, 5, 0x67801000, 2, b"\x10\x23"), ] ) - def test_read_into(self, buffer_size, window_size, x, y, p, - start_address, length, data): + def test_readinto(self, buffer_size, window_size, x, y, p, + start_address, length, data): # Create the mock controller cn = MachineController("localhost") cn._scp_data_length = buffer_size cn._window_size = window_size cn.connections[None] = mock.Mock(spec_set=SCPConnection) - def mock_read_into(buffer, *args, **kwargs): + def mock_readinto(buffer, *args, **kwargs): buffer[:] = data - cn.connections[None].read_into.side_effect = mock_read_into + cn.connections[None].readinto.side_effect = mock_readinto # Perform the read and ensure that values are passed on as appropriate with cn(x=x, y=y, p=p): read_data = bytearray(length) - cn.read_into(start_address, read_data, length) + cn.readinto(start_address, read_data, length) assert data == read_data - assert len(cn.connections[None].read_into.mock_calls) == 1 - assert cn.connections[None].read_into.mock_calls[0][1][1:] == \ + assert len(cn.connections[None].readinto.mock_calls) == 1 + assert cn.connections[None].readinto.mock_calls[0][1][1:] == \ (buffer_size, window_size, x, y, p, start_address, length) @pytest.mark.parametrize( @@ -1994,7 +1994,7 @@ def test_read(self, mock_controller, x, y, start_address, lengths): offset = 0 for n_bytes in lengths: buf = bytearray(n_bytes) - sdram_file.read_into(buf, n_bytes) + sdram_file.readinto(buf, n_bytes) assert sdram_file.tell() == offset + n_bytes assert sdram_file.address == start_address + offset + n_bytes calls.append(mock.call(start_address + offset, @@ -2003,7 +2003,7 @@ def test_read(self, 
mock_controller, x, y, start_address, lengths): # Check the reads caused the appropriate calls to the machine # controller. - mock_controller.read_into.assert_has_calls(calls) + mock_controller.readinto.assert_has_calls(calls) @pytest.mark.parametrize("x, y", [(1, 3), (3, 0)]) @pytest.mark.parametrize("start_address, length, offset", @@ -2023,7 +2023,7 @@ def test_read_beyond(self, mock_controller): assert len(sdram_file.read(100)) == 10 assert sdram_file.read(1) == b'' - assert mock_controller.read_into.call_count == 1 + assert mock_controller.readinto.call_count == 1 @pytest.mark.parametrize("x, y", [(4, 2), (255, 1)]) @pytest.mark.parametrize("start_address", [0x60000004, 0x61000003]) @@ -2241,7 +2241,7 @@ def test_zero_length_filelike(self, mock_controller): "flush_event", [lambda filelike: filelike.flush(), lambda filelike: filelike.read(1), - lambda filelike: filelike.read_into(bytearray(1), 1), + lambda filelike: filelike.readinto(bytearray(1), 1), lambda filelike: filelike.close()] ) def test_coalescing_writes(self, get_node, flush_event): diff --git a/tests/machine_control/test_scp_connection.py b/tests/machine_control/test_scp_connection.py index d3f36a8..b92a2e9 100644 --- a/tests/machine_control/test_scp_connection.py +++ b/tests/machine_control/test_scp_connection.py @@ -443,7 +443,7 @@ def recv(self, *args, **kwargs): mock_conn.send_scp_burst(512, 8, packets) -@pytest.mark.parametrize("read_into", [True, False]) +@pytest.mark.parametrize("readinto", [True, False]) @pytest.mark.parametrize( "buffer_size, window_size, x, y, p", [(128, 1, 0, 0, 1), (256, 5, 1, 2, 3)] ) @@ -467,7 +467,7 @@ def recv(self, *args, **kwargs): (256, DataType.word, 0x60000004) ]) def test_read(buffer_size, window_size, x, y, p, n_bytes, - data_type, start_address, read_into): + data_type, start_address, readinto): mock_conn = SCPConnection("localhost") # Construct the expected calls, and hence the expected return packets @@ -508,10 +508,10 @@ def __call__(self, buffer_size, 
window_size, args): send_scp_burst.side_effect = ccs # Read an amount of memory specified by the size. - if read_into: + if readinto: data = bytearray(n_bytes) - mock_conn.read_into(data, buffer_size, window_size, x, y, p, - start_address, n_bytes) + mock_conn.readinto(data, buffer_size, window_size, x, y, p, + start_address, n_bytes) else: data = mock_conn.read(buffer_size, window_size, x, y, p, start_address, n_bytes) From fd4f61d21ab77b638e926a7f85456b9fa6cb888a Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Thu, 13 Aug 2015 13:35:01 +0100 Subject: [PATCH 8/8] Add untested parallel I/O implementation. --- rig/machine_control/machine_controller.py | 414 ++++++++++++++++++---- 1 file changed, 352 insertions(+), 62 deletions(-) diff --git a/rig/machine_control/machine_controller.py b/rig/machine_control/machine_controller.py index da47e53..c554751 100644 --- a/rig/machine_control/machine_controller.py +++ b/rig/machine_control/machine_controller.py @@ -1,5 +1,6 @@ """A high level interface for controlling a SpiNNaker system.""" +import sys import collections import functools import os @@ -9,6 +10,9 @@ import struct import time import pkg_resources +import traceback + +from threading import Thread, Lock, Event from .consts import SCPCommands, NNCommands, NNConstants, AppFlags, LEDAction from . import boot, consts, regions, struct_file @@ -125,6 +129,15 @@ def __init__(self, initial_host, scp_port=consts.SCP_PORT, # connections to use. self._width = None self._height = None + + # Postponed operation queues for reads/writes issued with the postponed + # argument set. These are listed per-connection and are flushed by the + # flush_postponed_io method. Each queue contains a list of + # zero-argument function which carries out the required I/O operation. 
+ # + # {connection: deque([f, ...]), ...} + self._postponed = collections.defaultdict(collections.deque) + self._postponed_lock = Lock() def __call__(self, **context_args): """For use with `with`: set default argument values. @@ -395,7 +408,7 @@ def get_ip_address(self, x, y): return None @ContextMixin.use_contextual_arguments() - def write(self, address, data, x, y, p=0): + def write(self, address, data, x, y, p=0, postpone=False): """Write a bytestring to an address in memory. It is strongly encouraged to only read and write to blocks of memory @@ -410,17 +423,37 @@ def write(self, address, data, x, y, p=0): The address at which to start writing the data. Addresses are given within the address space of a SpiNNaker core. See the SpiNNaker datasheet for more information. - data : :py:class:`bytes` + data : :py:class:`bytes` or callable Data to write into memory. Writes are automatically broken into a sequence of SCP write commands. + + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. The data object must + remain valid until the postponed write has been flushed. If a callable is passed + as the data argument, the callable may be called from another + thread and in no specific order with respect to other calls to + this function.
""" # Call the SCPConnection to perform the write on our behalf connection = self._get_connection(x, y) - return connection.write(self.scp_data_length, self.scp_window_size, - x, y, p, address, data) + f = (lambda: connection.write(self.scp_data_length, self.scp_window_size, + x, y, p, address, + data() if callable(data) else data)) + + if postpone: + self._postponed[connection].append(f) + else: + f() @ContextMixin.use_contextual_arguments() - def read(self, address, length_bytes, x, y, p=0): + def read(self, address, length_bytes, x, y, p=0, on_read=None, postpone=False): """Read a bytestring from an address in memory. Parameters @@ -430,37 +463,100 @@ def read(self, address, length_bytes, x, y, p=0): length_bytes : int The number of bytes to read from memory. Large reads are transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes. Otherwise, the read data is returned directly. + postpone : bool + If False (the default), the read will occur immediately and the + read value returned or used as an argument to on_read. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called and an uninitialised + bytearray will be returned. When the read actually occurs, this + bytearray will be populated and the on_read method called with + a reference to the bytearray. Note that on_read may be called from + another thread and the order of calls to on_read is not guaranteed. Returns ------- - :py:class:`bytes` - The data is read back from memory as a bytestring. + :py:class:`bytearray` or None + If on_read is not given, the data read back from memory is returned + if postpone is False. If postpone is True, an uninitialised + bytearray will be returned. The bytearray will be populated when + :py:meth:`.flush_postponed_io` is called. + + If on_read is given, this method returns None. 
""" # Call the SCPConnection to perform the read on our behalf connection = self._get_connection(x, y) - return connection.read(self.scp_data_length, self.scp_window_size, - x, y, p, address, length_bytes) + if on_read is not None: + data = None + f = (lambda: on_read(connection.read(self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes))) + else: + data = bytearray(length_bytes) + f = (lambda: connection.readinto(data, self.scp_data_length, + self.scp_window_size, + x, y, p, + address, length_bytes)) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() + + return data @ContextMixin.use_contextual_arguments() - def readinto(self, address, buffer, length_bytes, x, y, p=0): + def readinto(self, address, buffer, length_bytes, x, y, p=0, on_read=None, postpone=False): """Read a from an address in memory into the supplied buffer. Parameters ---------- address : int The address at which to start reading the data. - buffer : bytearray - A bufferable object (e.g. bytearray) into which the data will be - read. + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least length_bytes in + size into which the data will be read. + + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. length_bytes : int The number of bytes to read from memory. Large reads are transparently broken into multiple SCP read commands. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. + postpone : bool + If False (the default), the read will occur immediately and the + read value will be placed into the supplied buffer. + + If True, the read will be performed when + :py:meth:`.flush_postponed_io` is called. 
Note that on_read and + buffer may be called from another thread and the order of other + calls to buffer and on_read is not guaranteed. """ # Call the SCPConnection to perform the read on our behalf connection = self._get_connection(x, y) - connection.readinto(buffer, self.scp_data_length, - self.scp_window_size, - x, y, p, address, length_bytes) + + def f(): + buf = buffer() if callable(buffer) else buffer + connection.readinto(buf, + self.scp_data_length, self.scp_window_size, + x, y, p, + address, length_bytes) + if on_read is not None: + on_read(buf) + + if postpone: + with self._postponed_lock: + self._postponed[connection].append(f) + else: + f() def _get_struct_field_and_address(self, struct_name, field_name): field = self.structs[six.b(struct_name)][six.b(field_name)] @@ -503,7 +599,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): length = struct.calcsize(pack_chars) # Perform the read - data = self.read(address, length, x, y, p) + data = self.read(address, length, x, y, p, + on_read=None, postpone=False) # Unpack the data unpacked = struct.unpack(pack_chars, data) @@ -514,7 +611,8 @@ def read_struct_field(self, struct_name, field_name, x, y, p=0): return unpacked @ContextMixin.use_contextual_arguments() - def write_struct_field(self, struct_name, field_name, values, x, y, p=0): + def write_struct_field(self, struct_name, field_name, values, x, y, p=0, + postpone=False): """Write a value into a struct. This method is particularly useful for writing values into the ``sv`` @@ -529,6 +627,12 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): Name of the field to write, e.g., `"random"` values : Value(s) to be written into the field. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called. .. 
warning:: Fields which are arrays must currently be written in their @@ -545,7 +649,7 @@ def write_struct_field(self, struct_name, field_name, values, x, y, p=0): data = struct.pack(pack_chars, values) # Perform the write - self.write(address, data, x, y, p) + self.write(address, data, x, y, p, postpone=postpone) def _get_vcpu_field_and_address(self, field_name, x, y, p): """Get the field and address for a VCPU struct field.""" @@ -581,7 +685,7 @@ def read_vcpu_struct_field(self, field_name, x, y, p): # Perform the read length = struct.calcsize(pack_chars) - data = self.read(address, length, x, y) + data = self.read(address, length, x, y, on_read=None, postpone=False) # Unpack and return unpacked = struct.unpack(pack_chars, data) @@ -598,7 +702,8 @@ def read_vcpu_struct_field(self, field_name, x, y, p): return unpacked # pragma: no cover @ContextMixin.use_contextual_arguments() - def write_vcpu_struct_field(self, field_name, value, x, y, p): + def write_vcpu_struct_field(self, field_name, value, x, y, p, + postpone=False): """Write a value to the VCPU struct for a specific core. Parameters @@ -607,6 +712,12 @@ def write_vcpu_struct_field(self, field_name, value, x, y, p): ---------- field_name : str Name of the field to write (e.g. `"user0"`) value : Value to write to this field. + postpone : bool + If False (the default), writes are performed straight away and this + function returns once the write completes. + + If True, the write will be queued and carried out in parallel when + :py:meth:`.flush_postponed_io` is called.
""" field, address, pack_chars = \ self._get_vcpu_field_and_address(field_name, x, y, p) @@ -622,7 +733,7 @@ def write_vcpu_struct_field(self, field_name, value, x, y, p): data = struct.pack(pack_chars, *value) # pragma: no cover # Perform the write - self.write(address, data, x, y) + self.write(address, data, x, y, postpone=postpone) @ContextMixin.use_contextual_arguments() def get_processor_status(self, p, x, y): @@ -638,7 +749,8 @@ def get_processor_status(self, p, x, y): self.structs[b"vcpu"].size * p) # Get the VCPU data - data = self.read(address, self.structs[b"vcpu"].size, x, y) + data = self.read(address, self.structs[b"vcpu"].size, x, y, + on_read=None, postpone=False) # Build the kwargs that describe the current state state = { @@ -677,7 +789,8 @@ def get_iobuf(self, p, x, y): while address: # The IOBUF data is proceeded by a header which gives the next # address and also the length of the string in the current buffer. - iobuf_data = self.read(address, iobuf_size + 16, x, y) + iobuf_data = self.read(address, iobuf_size + 16, x, y, + on_read=None, postpone=False) address, time, ms, length = struct.unpack("<4I", iobuf_data[:16]) iobuf += iobuf_data[16:16 + length].decode("utf-8") @@ -693,7 +806,8 @@ def get_router_diagnostics(self, x, y): Description of the state of the counters.
""" # Read the block of memory - data = self.read(0xe1000300, 64, x=x, y=y) + data = self.read(0xe1000300, 64, x=x, y=y, + on_read=None, postpone=False) # Convert to 16 ints, then process that as the appropriate tuple type return RouterDiagnostics(*struct.unpack("<16I", data)) @@ -817,7 +931,8 @@ def sdram_alloc(self, size, tag=0, x=Required, y=Required, @ContextMixin.use_contextual_arguments() def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, - app_id=Required, buffer_size=0): + app_id=Required, buffer_size=0, + postpone=False): """Like :py:meth:`.sdram_alloc` but returns a file-like object which allows safe reading and writing to the block that is allocated. @@ -828,6 +943,12 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, If this is set to anything but `0` (the default) then :py:meth:`~.MemoryIO.flush` should be called to ensure that all writes are completed. + postpone : bool + If False (the default), reads and (flushed) writes to the returned + object are carried out immediately. + + If True, any reads/(flushed) writes will be queued and carried out + in parallel when :py:meth:`.flush_postponed_io` is called. 
Returns ------- @@ -845,7 +966,7 @@ def sdram_alloc_as_filelike(self, size, tag=0, x=Required, y=Required, start_address = self.sdram_alloc(size, tag, x, y, app_id) return MemoryIO(self, x, y, start_address, start_address + size, - buffer_size=buffer_size) + buffer_size=buffer_size, postpone=postpone) def _get_next_nn_id(self): """Get the next nearest neighbour ID.""" @@ -1347,7 +1468,7 @@ def load_routing_table_entries(self, entries, x, y, app_id): struct.pack_into(consts.RTE_PACK_STRING, data, i*16, i, 0, route, entry.key, entry.mask) - self.write(buf, data, x, y) + self.write(buf, data, x, y, postpone=False) # Perform the load of the data into the router self._send_scp( @@ -1370,7 +1491,8 @@ def get_routing_table_entries(self, x, y): # Determine where to read from, perform the read rtr_addr = self.read_struct_field("sv", "rtr_copy", x, y) read_size = struct.calcsize(consts.RTE_PACK_STRING) - rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y) + rtr_data = self.read(rtr_addr, consts.RTR_ENTRIES * read_size, x, y, + on_read=None, postpone=False) # Read each routing table entry in turn table = list() @@ -1417,7 +1539,8 @@ def get_p2p_routing_table(self, x, y): raw_table_col = self.read( consts.SPINNAKER_RTR_P2P + (((256 * col) // 8) * 4), col_words, - x, y + x, y, + on_read=None, postpone=False, ) row = 0 @@ -1451,6 +1574,7 @@ def get_num_working_cores(self, x, y): """Return the number of working cores, including the monitor.""" return self.read_struct_field("sv", "num_cpus", x, y) + @ContextMixin.use_contextual_arguments() def get_machine(self, x=0, y=0, default_num_cores=18): """Probe the machine to discover which cores and links are working. @@ -1541,6 +1665,83 @@ def get_machine(self, x=0, y=0, default_num_cores=18): chip_resources, chip_resource_exceptions, dead_chips, dead_links) + + @ContextMixin.use_contextual_arguments() + def flush_postponed_io(self, max_num_connections=24): + """Carry out all postponed I/O operations in parallel. 
+ + Parameters + ---------- + max_num_connections : int + Gives the maximum number of simultaneous connections to use. + Setting this too high may result in this process becoming CPU bound + and thus not achieving high throughput. + """ + with self._postponed_lock: + num_threads = min(max_num_connections, len(self._postponed)) + + # A flag which is set if one of the threads encounter an error. + terminate_now = Event() + + # This list is populated with all exception objects raised in any of + # the worker threads + exceptions = [] + exceptions_lock = Lock() + + def queue_processor(): + """Attempts to process all postponed events for a particular + connection queue, deleting the queue when it empties.""" + while not terminate_now.is_set(): + # Get the next queue to be processed + try: + with self._postponed_lock: + connection, queue = self._postponed.popitem() + except KeyError: + # There are no more queues which need processing, terminate + # this thread. + return + + # Process that queue + while not terminate_now.is_set(): + try: + with self._postponed_lock: + f = queue.popleft() + except IndexError: + # The queue is empty, move on to the next one + break + + # Run the current queue entry handling failures sensibly + try: + f() + except Exception as e: + sys.stderr.write( + "Exception while processing a queued I/O " + "operation:\n") + traceback.print_exc() + terminate_now.set() + + with exceptions_lock: + exceptions.append(e) + + threads = [] + for _ in range(num_threads): + threads.append(Thread(target=queue_processor)) + + try: + for thread in threads: + thread.start() + + # Wait for the threads to complete + for thread in threads: + thread.join() + finally: + # If something goes wrong in the above, trigger the termination of + # all threads and attempt to wait for this + terminate_now.set() + for thread in threads: + thread.join() + + return exceptions class CoreInfo(collections.namedtuple( @@ -1740,7 +1941,7 @@ class MemoryIO(object): """ def __init__(self, 
machine_controller, x, y, start_address, end_address, - buffer_size=0, _write_buffer=None): + buffer_size=0, postpone=False, _write_buffer=None): """Create a file-like view onto a subset of the memory-space of a chip. Parameters @@ -1758,6 +1959,13 @@ def __init__(self, machine_controller, x, y, start_address, end_address, End address in memory. buffer_size : int Number of bytes to store in the write buffer. + postpone : bool + If False (the default), reads and (flushed) writes are performed + immediately and their result returned to the caller. + + If True, reads and (flushed) writes will be placed in a queue and + executed in parallel when + :py:meth:`.MachineController.flush_postponed_io` is called. _write_buffer : :py:class:`._WriteBufferChild` Internal use only, the write buffer to use to combine writes. @@ -1769,11 +1977,12 @@ def __init__(self, machine_controller, x, y, start_address, end_address, self._x = x self._y = y self._machine_controller = machine_controller + self._postpone = postpone # Get, or create, a write buffer if _write_buffer is None: _write_buffer = _WriteBuffer(x, y, 0, machine_controller, - buffer_size) + buffer_size, self._postpone) self._write_buffer = _write_buffer # Store and clip the addresses @@ -1838,6 +2047,7 @@ def __getitem__(self, sl): return type(self)( self._machine_controller, self._x, self._y, start_address, end_address, + postpone=self._postpone, _write_buffer=self._write_buffer ) else: @@ -1870,7 +2080,7 @@ def _read_n_bytes(self, n_bytes): return n_bytes @_if_not_closed - def read(self, n_bytes=-1): + def read(self, n_bytes=-1, on_read=None): """Read a number of bytes from the memory. .. note:: @@ -1881,25 +2091,45 @@ def read(self, n_bytes=-1): n_bytes : int A number of bytes to read. If the number of bytes is negative or omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with the read data as an argument when + the read completes.
Otherwise, the read data is returned directly. Returns ------- - :py:class:`bytes` - Data read from SpiNNaker as a bytestring. + :py:class:`bytes` or None + If on_read is not given and postpone is set to False for this + MemoryIO, the data read from SpiNNaker is returned. + + If on_read is not given and postpone is set to True for this + MemoryIO, an uninitialised bytearray is returned which will be + populated with the read data when + :py:meth:`.MachineController.flush_postponed_io` is called. + + If on_read is supplied, this function will return None and when the + read completes, the read data will be given as an argument to + on_read. Note that on_read may be called from another thread and + the order of calls to on_read is not guaranteed. """ # Flush this write buffer self.flush() n_bytes = self._read_n_bytes(n_bytes) if n_bytes <= 0: + if callable(on_read): + on_read(b'') return b'' else: - data = bytearray(n_bytes) - self.readinto(data, n_bytes) - return data + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. + self._offset += n_bytes + return self._machine_controller.read( + self.address - n_bytes, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) @_if_not_closed - def readinto(self, buffer, n_bytes=-1): + def readinto(self, buffer, n_bytes=-1, on_read=None): """Read a number of bytes from the memory into a supplied buffer. .. note:: @@ -1907,12 +2137,24 @@ def readinto(self, buffer, n_bytes=-1): Parameters ---------- - buffer : bytearray - A bufferable object (e.g. bytearray) into which the data will be - read. + buffer : bytearray or callable + A bufferable object (e.g. bytearray) of at least n_bytes in size + into which the data will be read. 
+ + If a callable is supplied, this will be called with no arguments + just before the read is carried out and the function must return a + bufferable object into which the result will be written. + + If postpone is set to False for this MemoryIO, the read will be + completed before this method returns. If set to True, the read will + actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. n_bytes : int A number of bytes to read. If the number of bytes is negative or omitted then read all data until the end of memory region. + on_read : callable + If supplied, this is called with supplied buffer as an argument + when the read completes. """ # Flush this write buffer self.flush() @@ -1920,15 +2162,22 @@ def readinto(self, buffer, n_bytes=-1): n_bytes = self._read_n_bytes(n_bytes) if n_bytes <= 0: + if callable(on_read): + on_read(buffer() if callable(buffer) else buffer) + elif callable(buffer): + buffer() return else: - # Perform the read and increment the offset - self._machine_controller.readinto( - self.address, buffer, n_bytes, self._x, self._y, 0) + # Note: the offset is updated before the read (and the read address + # compensated) such that if the on_read callback interacts with + # this MemoryIO, the addresses are kept consistent. self._offset += n_bytes + self._machine_controller.readinto( + self.address - n_bytes, buffer, n_bytes, self._x, self._y, 0, + on_read=on_read, postpone=self._postpone) @_if_not_closed - def write(self, bytes): + def write(self, bytes, n_bytes=None): """Write data to the memory. .. warning:: @@ -1942,26 +2191,53 @@ def write(self, bytes): Parameters ---------- - bytes : :py:class:`bytes` - Data to write to the memory as a bytestring. + bytes : :py:class:`bytes` or callable + Data to write into memory. + + If a callable is given, it will be called when the write is about + to be carried out and must return a bytearray (or similar) to be + written. 
Note that at present write data supplied as a callable is + never buffered and writing a callable causes the write buffer to be + flushed. + + For non-callables, if the buffer size is non-zero, the data will be + buffered immediately. If the buffer size is zero, or the data + written is too large to fit in the buffer, the write will be passed + directly to :py:meth:`.MachineController.write`. This means that if + postpone is set to False for this MemoryIO, the write will be + complete before this method returns but if it is set to True, the + write will actually occur when + :py:meth:`.MachineController.flush_postponed_io` is called. + n_bytes : int + The number of bytes to write. This field is optional when bytes + supports the `len` operator but is mandatory when it does not. Returns ------- - int - Number of bytes written. + int or None + Number of bytes written or buffered. """ - if self.address + len(bytes) > self._end_address: - n_bytes = min(len(bytes), self._end_address - self.address) + n_bytes = n_bytes if n_bytes is not None else len(bytes) + + if self.address + n_bytes > self._end_address: + n_bytes = min(n_bytes, self._end_address - self.address) if n_bytes <= 0: + if callable(bytes): + bytes() return 0 - - bytes = bytes[:n_bytes] + + if callable(bytes): + bytes_ = (lambda: bytes()[:n_bytes]) + else: + bytes_ = bytes[:n_bytes] + else: + bytes_ = bytes # Perform the write and increment the offset - self._write_buffer.add_new_write(self.address, bytes) - self._offset += len(bytes) - return len(bytes) + self._offset += n_bytes + self._write_buffer.add_new_write(self.address - n_bytes, bytes_) + return n_bytes @_if_not_closed def flush(self): @@ -1969,6 +2245,11 @@ def flush(self): This must be called to ensure that all writes to SpiNNaker made using this file-like object (and its siblings, if any) are completed. + + If postpone is set to False for this MemoryIO, the writes will be + completed before this method returns. 
If set to True, the writes will + actually occur when :py:meth:`.MachineController.flush_postponed_io` is + called. """ self._write_buffer.flush() @@ -2028,11 +2309,12 @@ class _WriteBuffer(object): together. """ - def __init__(self, x, y, p, controller, buffer_size=0): + def __init__(self, x, y, p, controller, buffer_size=0, postpone=False): self.x = x self.y = y self.p = p self.controller = controller + self.postpone = postpone # A buffer of writes self.buffer = bytearray(buffer_size) @@ -2043,11 +2325,14 @@ def __init__(self, x, y, p, controller, buffer_size=0): def add_new_write(self, start_address, data): """Add a new write to the buffer.""" - if len(data) > self.buffer_size: - # Perform the write if we couldn't buffer it at all + if callable(data) or len(data) > self.buffer_size: + # Perform the write if we couldn't buffer it at all. Unbufferable + # writes are those too large to fit in the buffer and those + # provided via callback. self.flush() # Flush to ensure ordering is preserved self.controller.write(start_address, data, - self.x, self.y, self.p) + self.x, self.y, self.p, + postpone=self.postpone) return if self.start_address is None: @@ -2089,12 +2374,17 @@ def flush(self): # Write out all the values from the buffer self.controller.write( self.start_address, self.buffer[:self.current_end], - self.x, self.y, self.p + self.x, self.y, self.p, postpone=self.postpone ) # Reset the buffer self.start_address = None self.current_end = 0 + + # If postponed writes are in use, create a new buffer since the old + # one will be used at an undetermined point in the future. + if self.postpone: + self.buffer = bytearray(self.buffer_size) def unpack_routing_table_entry(packed):