Source code for distributed

"""
Distributed evaluation of genomes.

About compute nodes:
The primary node (=the node which creates and mutates genomes) and the secondary
nodes (=the nodes which evaluate genomes) can execute the same script. The
role of a compute node is determined using the ``mode`` argument of the
DistributedEvaluator. If the mode is MODE_AUTO, the `host_is_local()` function
is used to check if the ``addr`` argument points to the localhost. If it does,
the compute node starts as a primary node, otherwise as a secondary node. If
``mode`` is MODE_PRIMARY, the compute node always starts as a primary node. If
``mode`` is MODE_SECONDARY, the compute node will always start as a secondary node.

There can only be one primary node per NEAT run, but any number of secondary nodes.
The primary node will not evaluate any genomes, which means you will always need
at least two compute nodes.

You can run any number of compute nodes on the same physical machine (or VM).
However, if a machine has both a primary node and one or more secondary nodes,
MODE_AUTO cannot be used for those secondary nodes - MODE_SECONDARY will need to be
specified.

NOTE:
    This module is in a **beta** state, and still *unstable* even in single-machine
    testing. Reliability is likely to vary, depending (among other factors) on the
    Python version and implementation (e.g., CPython vs PyPy) in use and the
    likelihood of timeouts (due to machine and/or network slowness). In particular,
    while the code can try to reconnect between primary and secondary nodes, as
    noted in the `multiprocessing` documentation this may not work due to data
    loss/corruption.
    Note also that this module is *not* responsible for starting the script copies
    on the different compute nodes, since this is very site/configuration-dependent.


Usage:
1. Import modules and define the evaluation logic (the eval_genome function).
  (After this, add an ``if __name__ == '__main__'`` guard, and put the rest of
  the code inside its body.)
2. Load config and create a population - here, the variable ``p``.
3. If required, create and add reporters.
4. Create a ``DistributedEvaluator(addr_of_primary_node, b'some_password',
  eval_function, mode=MODE_AUTO)`` - here, the variable ``de``.
5. Call ``de.start(exit_on_stop=True)``. The `start()` call will block on the
  secondary nodes and call `sys.exit(0)` when the NEAT evolution finishes. This
  means that the following code will only be executed on the primary node.
6. Start the evaluation using ``p.run(de.evaluate, number_of_generations)``.
7. Stop the secondary nodes using ``de.stop()``.
8. You are done. You may want to save the winning genome or show some statistics.

See ``examples/xor/evolve-feedforward-distributed.py`` for a complete example.
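
A minimal sketch following the above steps (the address, port, and fitness
logic below are placeholders, and this assumes the module is importable as
``neat.distributed``)::

    import neat
    from neat.distributed import DistributedEvaluator, MODE_AUTO

    def eval_genome(genome, config):
        return 0.0  # placeholder; compute the genome's real fitness here

    if __name__ == '__main__':
        config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                             neat.DefaultSpeciesSet, neat.DefaultStagnation,
                             'config-feedforward')
        p = neat.Population(config)
        de = DistributedEvaluator(
            ('primary.example.com', 8022),  # addr of the primary node
            b'some_password',               # authkey shared by all nodes
            eval_genome,
            mode=MODE_AUTO,
        )
        de.start(exit_on_stop=True)  # secondaries block here and exit when done
        winner = p.run(de.evaluate, 10)
        de.stop()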

Utility functions:

``host_is_local(hostname, port=22)`` returns True if ``hostname`` points to
the local node/host. This can be used to check if a compute node will run as
a primary node or as a secondary node with MODE_AUTO.

``chunked(data, chunksize)``: splits data into a list of chunks with at most
``chunksize`` elements.
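
For example (illustrative results; the ``host_is_local`` output depends on the
machine's network configuration)::

    host_is_local("localhost")      # True
    chunked([1, 2, 3, 4, 5], 2)     # [[1, 2], [3, 4], [5]]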
"""
from __future__ import print_function

import logging
import socket
import sys
import time
import warnings

# below still needed for queue.Empty
try:
    # pylint: disable=import-error
    import Queue as queue
except ImportError:
    # pylint: disable=import-error
    import queue

import multiprocessing
from multiprocessing import managers
from argparse import Namespace

# Some of this code is based on
# http://eli.thegreenplace.net/2012/01/24/distributed-computing-in-python-with-multiprocessing
# According to the website, the code is in the public domain
# ('public domain' links to unlicense.org).
# This means that we can use the code from this website.
# Thanks to Eli Bendersky for making his code open for use.


# modes to determine the role of a compute node
# the primary handles the evolution of the genomes
# the secondary handles the evaluation of the genomes
MODE_AUTO = 0  # auto-determine mode
MODE_PRIMARY = MODE_MASTER = 1  # enforce primary mode
MODE_SECONDARY = MODE_SLAVE = 2  # enforce secondary mode

# what a return from _check_exception means
_EXCEPTION_TYPE_OK = 1 # queue empty and similar; try again
_EXCEPTION_TYPE_UNCERTAIN = 0 # disconnected but may be able to reconnect
_EXCEPTION_TYPE_BAD = -1 # raise it again or immediately return & exit with non-zero status code

class ModeError(RuntimeError):
    """
    An exception raised when a mode-specific method is being called without
    being in the mode - either a primary-specific method called by a secondary
    node or a secondary-specific method called by a primary node.
    """
    pass

def host_is_local(hostname, port=22):  # no port specified, just use the ssh port
    """
    Returns True if the hostname points to the localhost, otherwise False.
    """
    hostname = socket.getfqdn(hostname)
    if hostname in ("localhost", "0.0.0.0", "127.0.0.1",
                    "1.0.0.127.in-addr.arpa",
                    "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa"):
        return True
    localhost = socket.gethostname()
    if hostname == localhost:
        return True
    localaddrs = socket.getaddrinfo(localhost, port)
    targetaddrs = socket.getaddrinfo(hostname, port)
    for (ignored_family, ignored_socktype, ignored_proto, ignored_canonname,
         sockaddr) in localaddrs:
        for (ignored_rfamily, ignored_rsocktype, ignored_rproto,
             ignored_rcanonname, rsockaddr) in targetaddrs:
            if rsockaddr[0] == sockaddr[0]:
                return True
    return False

def _determine_mode(addr, mode):
    """
    Returns the mode which should be used.
    If mode is MODE_AUTO, this is determined by checking if 'addr' points to
    the local host. If it does, return MODE_PRIMARY, else return
    MODE_SECONDARY.
    If mode is either MODE_PRIMARY or MODE_SECONDARY, return the 'mode'
    argument. Otherwise, a ValueError is raised.
    """
    if isinstance(addr, tuple):
        host = addr[0]
    elif isinstance(addr, bytes):
        host = addr
    else:
        raise TypeError("'addr' needs to be a tuple or bytestring!")
    if mode == MODE_AUTO:
        if host_is_local(host):
            return MODE_PRIMARY
        return MODE_SECONDARY
    elif mode in (MODE_SECONDARY, MODE_PRIMARY):
        return mode
    else:
        raise ValueError("Invalid mode {!r}!".format(mode))

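# Illustrative behavior (hostnames are placeholders):
# _determine_mode(("localhost", 8022), MODE_AUTO) returns MODE_PRIMARY, since
# "localhost" is always local; _determine_mode(b"primary.example.com",
# MODE_AUTO) returns MODE_SECONDARY on any machine that name does not point
# to. An explicit MODE_PRIMARY or MODE_SECONDARY is returned unchanged.
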
def chunked(data, chunksize):
    """
    Returns a list of chunks containing at most ``chunksize`` elements of data.
    """
    if chunksize < 1:
        raise ValueError("Chunksize must be at least 1!")
    if int(chunksize) != chunksize:
        raise ValueError("Chunksize needs to be an integer")
    res = []
    cur = []
    for e in data:
        cur.append(e)
        if len(cur) >= chunksize:
            res.append(cur)
            cur = []
    if cur:
        res.append(cur)
    return res

def _check_exception(e):
    string = repr(e).lower()
    if ('timed' in string) or ('timeout' in string):
        return _EXCEPTION_TYPE_OK
    elif isinstance(e, (queue.Full, queue.Empty)):
        return _EXCEPTION_TYPE_OK
    elif isinstance(e, (EOFError, TypeError, socket.gaierror)):
        return _EXCEPTION_TYPE_UNCERTAIN
    elif (('eoferror' in string) or ('typeerror' in string)
          or ('gaierror' in string) or ('pipeerror' in string)
          or ('authenticationerror' in string) or ('refused' in string)
          or ('file descriptor' in string) or ('reset' in string)):
        return _EXCEPTION_TYPE_UNCERTAIN
    return _EXCEPTION_TYPE_BAD

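# Illustrative classification: _check_exception(queue.Empty()) returns
# _EXCEPTION_TYPE_OK (just retry), _check_exception(EOFError()) returns
# _EXCEPTION_TYPE_UNCERTAIN (a reconnect may help), and an unrecognized error
# such as _check_exception(ValueError("x")) returns _EXCEPTION_TYPE_BAD.
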
class _DecrefLogHandler(logging.Handler):
    """
    Class to catch 'decref failed' multiprocessing messages in secondaries
    and cause a shutdown.
    """
    def __init__(self, to_notify, level=logging.DEBUG):
        self.to_notify = to_notify
        logging.Handler.__init__(self, level=min(level, logging.DEBUG))

    def filter(self, record):
        text = record.getMessage()
        if ('decref' in text.lower()) and ('failed' in text.lower()):
            return True
        return False

    def emit(self, record):
        text = record.getMessage()
        if ('decref' not in text.lower()) or ('failed' not in text.lower()):  # pragma: no cover
            warnings.warn(
                "_DecrefLogHandler called with message {0!r} ({1!r})".format(
                    text, record),
                RuntimeWarning)
            return
        self.to_notify.logger_notified = True
        if 'refused' in text.lower():
            self.to_notify.logger_saw_refused = True
        elif 'reset' in text.lower():
            self.to_notify.logger_saw_reset = True

class _ExtendedManager(object):
    """A class for managing the multiprocessing.managers.SyncManager"""
    __safe_for_unpickling__ = True  # this may not be safe for unpickling,
                                    # but this is required by pickle.

    def __init__(self, addr, authkey, mode, start=False):
        self.addr = addr
        self.authkey = authkey
        self.mode = _determine_mode(addr, mode)
        self.manager = None
        if start:
            self.start()
    def __reduce__(self):  # pragma: no cover
        """
        This method is used by pickle to serialize instances of this class.
        """
        return (
            self.__class__,
            (self.addr, self.authkey, self.mode, bool(self.manager is not None)),
        )
    def start(self):
        """Starts or connects to the manager."""
        if self.manager is None:
            if self.mode == MODE_PRIMARY:
                i = self._start()
            else:
                i = self._connect()
            self.manager = i
    def stop(self):
        """Stops the manager."""
        if (self.manager is not None) and (self.mode == MODE_PRIMARY):
            self.manager.shutdown()
        self.manager = None
    @staticmethod
    def _get_manager_class(register_callables=False):
        """
        Returns a new 'Manager' subclass with registered methods.
        If 'register_callables' is True, defines the 'callable' arguments.
        """

        class _EvaluatorSyncManager(managers.BaseManager):
            """
            A custom BaseManager.
            Please see the documentation of `multiprocessing` for more
            information.
            """
            pass

        inqueue = queue.Queue()
        outqueue = queue.Queue()
        namespace = Namespace()  # Does this need to be from multiprocessing.managers.SyncManager?

        if register_callables:
            _EvaluatorSyncManager.register(
                "get_inqueue",
                callable=lambda: inqueue,
            )
            _EvaluatorSyncManager.register(
                "get_outqueue",
                callable=lambda: outqueue,
            )
            _EvaluatorSyncManager.register(
                "get_namespace",
                callable=lambda: namespace,
            )
        else:
            _EvaluatorSyncManager.register(
                "get_inqueue",
            )
            _EvaluatorSyncManager.register(
                "get_outqueue",
            )
            _EvaluatorSyncManager.register(
                "get_namespace",
            )
        return _EvaluatorSyncManager

    def _connect(self):
        """Connects to the manager."""
        cls = self._get_manager_class(register_callables=False)
        ins = cls(address=self.addr, authkey=self.authkey)
        ins.connect()
        return ins

    def _start(self):
        """Starts the manager."""
        cls = self._get_manager_class(register_callables=True)
        ins = cls(address=self.addr, authkey=self.authkey)
        ins.start()
        return ins
    def get_inqueue(self):
        """Returns the inqueue."""
        if self.manager is None:
            raise RuntimeError("Manager not started")
        return self.manager.get_inqueue()

    def get_outqueue(self):
        """Returns the outqueue."""
        if self.manager is None:
            raise RuntimeError("Manager not started")
        return self.manager.get_outqueue()

    def get_namespace(self):
        """Returns the namespace."""
        if self.manager is None:
            raise RuntimeError("Manager not started")
        return self.manager.get_namespace()

class DistributedEvaluator(object):
    """An evaluator working across multiple machines"""

    def __init__(
            self,
            addr,
            authkey,
            eval_function,
            secondary_chunksize=1,
            num_workers=None,
            worker_timeout=60,
            mode=MODE_AUTO,
    ):
        """
        ``addr`` should be a tuple of (hostname, port) pointing to the machine
        running the DistributedEvaluator in primary mode. If mode is MODE_AUTO,
        the mode is determined by checking whether the hostname points to this
        host or not.
        ``authkey`` is the password used to restrict access to the manager; see
        ``Authentication Keys`` in the `multiprocessing` manual for more information.
        All DistributedEvaluators need to use the same authkey. Note that this
        needs to be a `bytes` object for Python 3.X, and should be one in 2.7
        for compatibility (in 2.7, `bytes` is identical to `str`).
        ``eval_function`` should take two arguments (a genome object and the
        configuration) and return a single float (the genome's fitness).
        ``secondary_chunksize`` specifies the number of genomes that will be sent
        to a secondary at any one time.
        ``num_workers`` is the number of child processes to use if in secondary
        mode. It defaults to None, which means `multiprocessing.cpu_count()`
        is used to determine this value. If 1 in a secondary node, the process
        creating the DistributedEvaluator instance will also do the evaluations.
        ``worker_timeout`` specifies the timeout (in seconds) for a secondary node
        getting the results from a worker subprocess; if None, there is no timeout.
        ``mode`` specifies the mode to run in; it defaults to MODE_AUTO.
        """
        self.addr = addr
        self.authkey = authkey
        self.eval_function = eval_function
        self.secondary_chunksize = secondary_chunksize
        self.slave_chunksize = secondary_chunksize  # backward compatibility
        if num_workers:
            self.num_workers = num_workers
        else:
            try:
                self.num_workers = max(1, multiprocessing.cpu_count())
            except (RuntimeError, AttributeError):  # pragma: no cover
                print("multiprocessing.cpu_count() gave an error; assuming 1",
                      file=sys.stderr)
                self.num_workers = 1
        self.worker_timeout = worker_timeout
        self.mode = _determine_mode(self.addr, mode)
        self.em = _ExtendedManager(self.addr, self.authkey, mode=self.mode,
                                   start=False)
        self.inqueue = None
        self.outqueue = None
        self.namespace = None
        self.started = False
        self.exit_string = None
        self.exit_on_stop = True
        self.reconnect = False
        self.reconnect_max_time = None
        self.n_tasks = None
        self.logger_notified = False
        self.logger_saw_refused = False
        self.logger_saw_reset = False

    def __getstate__(self):  # pragma: no cover
        """Required by the pickle protocol."""
        # we do not actually save any state, but we need __getstate__ to be
        # called.
        return True  # return some nonzero value

    def __setstate__(self, state):  # pragma: no cover
        """Called when instances of this class are unpickled."""
        self._set_shared_instances()
    def is_primary(self):
        """Returns True if the caller is the primary node"""
        return (self.mode == MODE_PRIMARY)

    def is_master(self):  # pragma: no cover
        """Returns True if the caller is the primary (master) node"""
        warnings.warn("Use is_primary, not is_master", DeprecationWarning)
        return self.is_primary()

    def _do_exit(self):
        if self.exit_string is None:
            sys.exit(0)
        else:  # pragma: no cover
            sys.exit(self.exit_string)
    def start(self, exit_on_stop=True, secondary_wait=0, reconnect=False,
              reconnect_max_time=None):
        """
        If the DistributedEvaluator is in primary mode, starts the manager
        process and returns. In this case, the ``exit_on_stop`` argument will
        be ignored.
        If the DistributedEvaluator is in secondary mode, it connects to the
        manager and waits for tasks.
        If in secondary mode and ``exit_on_stop`` is True, sys.exit() will be
        called when the connection is lost.
        ``secondary_wait`` specifies the time (in seconds) to sleep before
        actually starting when in secondary mode.
        If 'reconnect' is True, the secondary nodes will try to reconnect when
        the connection is lost. In this case, sys.exit() will only be called
        when 'exit_on_stop' is True and the primary node sends a forced
        shutdown command.
        """
        if self.started:
            raise RuntimeError("DistributedEvaluator already started!")
        self.started = True
        self.exit_on_stop = exit_on_stop
        self.reconnect = reconnect
        if reconnect_max_time is None:
            if reconnect:
                reconnect_max_time = 6 * max(60, self.worker_timeout)
            else:
                reconnect_max_time = 2 * max(60, self.worker_timeout)
        self.reconnect_max_time = max(0.5, reconnect_max_time)
        if self.mode == MODE_PRIMARY:
            self._start_primary()
        elif self.mode == MODE_SECONDARY:
            time.sleep(secondary_wait)
            while True:
                self._start_secondary()
                self._secondary_loop(reconnect_max_time=reconnect_max_time)
                if self.exit_on_stop:
                    self._do_exit()
                else:
                    self.inqueue = self.outqueue = self.namespace = None
                    if self.reconnect:
                        self.em.stop()
                    else:
                        break
            if exit_on_stop:
                self._do_exit()
        else:
            raise ValueError("Invalid mode {!r}!".format(self.mode))
    def stop(self, wait=1, shutdown=True, force_secondary_shutdown=False):
        """
        Stops all secondaries.
        'wait' specifies the time (in seconds) to wait before shutting down the
        manager or returning.
        If 'shutdown', shutdown the manager.
        If 'force_secondary_shutdown', shutdown the secondary nodes even if
        they are started with 'reconnect=True'.
        """
        if self.mode != MODE_PRIMARY:
            raise ModeError("Not in primary mode!")
        if not self.started:
            raise RuntimeError("Not yet started!")
        if self.n_tasks is None:  # pragma: no cover
            self.n_tasks = max(5, (wait * 5), self.num_workers)
            warnings.warn(
                "Self.n_tasks is None; estimating at {:n}".format(self.n_tasks))
        start_time = time.time()
        num_added = 0
        poss_time_wait = max(1, (self.reconnect_max_time + 60), wait,
                             (self.worker_timeout * self.n_tasks))
        while ((num_added < (self.n_tasks + 1))
               and ((time.time() - start_time) < poss_time_wait)):
            try:
                if force_secondary_shutdown:
                    self.inqueue.put(0, block=True, timeout=0.2)
                else:
                    self.inqueue.put(1, block=True, timeout=0.2)
            except (EOFError, IOError, OSError, socket.gaierror, TypeError,
                    queue.Full, managers.RemoteError,
                    multiprocessing.ProcessError) as e:  # pragma: no cover
                status = _check_exception(e)
                if status == _EXCEPTION_TYPE_OK:
                    num_added += 1
                    continue
                else:
                    break
            else:
                num_added += 1
        time_passed = time.time() - start_time
        if time_passed < wait:  # pragma: no cover
            time.sleep(wait - time_passed)
        self.outqueue = self.inqueue = self.namespace = None
        if shutdown:
            self.em.stop()
        self.started = False
    def _start_primary(self):
        """Start as the primary"""
        self.em.start()
        self._set_shared_instances()

    def _start_secondary(self):
        """Start as a secondary."""
        if self.reconnect:
            timeout = max(5, self.worker_timeout) + time.time()
            keep_trying = True
            while keep_trying:
                try:
                    self.em.start()
                except (EOFError, IOError, OSError, socket.gaierror, TypeError,
                        managers.RemoteError,
                        multiprocessing.ProcessError) as e:
                    if ((_check_exception(e) == _EXCEPTION_TYPE_BAD)
                            or (time.time() > timeout)):
                        raise
                    continue
                else:
                    keep_trying = False
        else:
            self.em.start()
        self._set_shared_instances()

    def _set_shared_instances(self):
        """Sets attributes from the shared instances."""
        self.inqueue = self.em.get_inqueue()
        self.outqueue = self.em.get_outqueue()
        self.namespace = self.em.get_namespace()

    def _reset_em(self):
        """Resets self.em and the shared instances."""
        self.em = _ExtendedManager(self.addr, self.authkey, mode=self.mode,
                                   start=True)
        self._set_shared_instances()

    def _get_fitness(self, tasks, pool=None):
        if pool is None:
            res = []
            for genome_id, genome, config in tasks:
                fitness = self.eval_function(genome, config)
                res.append((genome_id, fitness))
            return res
        genome_ids = []
        jobs = []
        for genome_id, genome, config in tasks:
            genome_ids.append(genome_id)
            jobs.append(
                pool.apply_async(
                    self.eval_function, (genome, config)
                )
            )
        results = [
            job.get(timeout=self.worker_timeout) for job in jobs
        ]
        return zip(genome_ids, results)

    def _secondary_loop(self, reconnect_max_time):
        """The worker loop for the secondary nodes."""
        if self.num_workers > 1:
            pool = multiprocessing.Pool(self.num_workers)
        else:
            pool = None
        self.logger_notified = False
        self.logger_saw_refused = False
        self.logger_saw_reset = False
        handler = _DecrefLogHandler(to_notify=self)
        logger = multiprocessing.get_logger()
        logger.addHandler(handler)
        should_reconnect = True
        em_bad = self.reconnect
        last_time_done = time.time()
        while should_reconnect:
            prev_last_time_done = last_time_done
            # so that if loops below, have a chance to check _reset_em
            last_time_done = time.time()
            running = True
            try:
                self._reset_em()
            except (EOFError, IOError, OSError, socket.gaierror, TypeError,
                    managers.RemoteError, multiprocessing.ProcessError) as e:
                if (((time.time() - last_time_done) >= reconnect_max_time)
                        or self.logger_notified):
                    should_reconnect = False
                    em_bad = True
                    last_time_done = prev_last_time_done
                    if _check_exception(e) == _EXCEPTION_TYPE_BAD:  # pragma: no cover
                        self.exit_on_stop = True
                        self.exit_string = repr(e)
                    break
                elif _check_exception(e) == _EXCEPTION_TYPE_BAD:  # pragma: no cover
                    raise
                else:
                    continue
            # being successful at reconnecting - used as a keepalive
            last_time_done = time.time()
            while running:
                try:
                    tasks = self.inqueue.get(block=True, timeout=0.2)
                except queue.Empty:
                    continue
                except (EOFError, TypeError, socket.gaierror,
                        managers.RemoteError, multiprocessing.ProcessError,
                        IOError, OSError) as e:
                    if 'empty' in repr(e).lower():  # pragma: no cover
                        continue
                    curr_status = _check_exception(e)
                    if curr_status != _EXCEPTION_TYPE_BAD:
                        if (((time.time() - last_time_done) >= reconnect_max_time)
                                or self.logger_notified):
                            if em_bad:
                                should_reconnect = False
                            break
                        elif curr_status == _EXCEPTION_TYPE_OK:
                            continue
                        else:
                            break
                    elif (((time.time() - last_time_done) >= reconnect_max_time)
                          or self.logger_notified):  # pragma: no cover
                        self.exit_on_stop = True
                        self.exit_string = repr(e)
                        should_reconnect = False
                        break
                    else:  # pragma: no cover
                        raise
                if isinstance(tasks, int):  # from primary
                    running = False
                    should_reconnect = False
                    if tasks and self.reconnect:
                        self.exit_on_stop = False
                    elif not tasks:
                        if self.reconnect:
                            reconnect_max_time /= 3
                            em_bad = True
                        self.reconnect = False
                    break
                last_time_done = time.time()
                res = self._get_fitness(tasks, pool)
                prev_last_time_done = last_time_done
                last_time_done = time.time()
                try:
                    self.outqueue.put(res)
                except queue.Full:  # pragma: no cover
                    continue
                except (EOFError, TypeError, socket.gaierror,
                        managers.RemoteError, multiprocessing.ProcessError,
                        IOError, OSError) as e:
                    if 'full' in repr(e).lower():  # pragma: no cover
                        continue
                    curr_status = _check_exception(e)
                    if curr_status != _EXCEPTION_TYPE_BAD:
                        if (((time.time() - last_time_done) >= reconnect_max_time)
                                or self.logger_notified):
                            if em_bad:
                                should_reconnect = False
                            last_time_done = prev_last_time_done
                            break
                        elif curr_status == _EXCEPTION_TYPE_OK:
                            continue
                        else:
                            last_time_done = prev_last_time_done
                            break
                    elif (((time.time() - last_time_done) >= reconnect_max_time)
                          or self.logger_notified):  # pragma: no cover
                        self.exit_on_stop = True
                        self.exit_string = repr(e)
                        should_reconnect = False
                        break
                    else:  # pragma: no cover
                        raise
                else:
                    last_time_done = time.time()
                if (((time.time() - last_time_done) >= reconnect_max_time)
                        or self.logger_notified):
                    if em_bad:
                        should_reconnect = False
                    break
            if (((time.time() - last_time_done) >= reconnect_max_time)
                    and (self.logger_saw_refused or self.logger_saw_reset)):
                self.reconnect = False
            elif em_bad and self.logger_notified:
                self.reconnect = False
        logger.addHandler(logging.NullHandler())
        logger.removeHandler(handler)
        if pool is not None:
            pool.terminate()
    def evaluate(self, genomes, config):
        """
        Evaluates the genomes.
        This method raises a ModeError if the DistributedEvaluator is not in
        primary mode.
        """
        if self.mode != MODE_PRIMARY:
            raise ModeError("Not in primary mode!")
        tasks = [(genome_id, genome, config) for genome_id, genome in genomes]
        id2genome = {genome_id: genome for genome_id, genome in genomes}
        tasks = chunked(tasks, self.secondary_chunksize)
        n_tasks = len(tasks)
        for task in tasks:
            self.inqueue.put(task)  # should this be w/timeouts and checking for exceptions?
        tresults = []
        while len(tresults) < n_tasks:
            try:
                sr = self.outqueue.get(block=True, timeout=0.2)
            except (queue.Empty, managers.RemoteError):  # more detailed check?
                continue
            tresults.append(sr)
        results = []
        for sr in tresults:
            results += sr
        for genome_id, fitness in results:
            genome = id2genome[genome_id]
            genome.fitness = fitness
        self.n_tasks = n_tasks