"""
Distributed evaluation of genomes.
About compute nodes:
The primary node (=the node which creates and mutates genomes) and the secondary
nodes (=the nodes which evaluate genomes) can execute the same script. The
role of a compute node is determined using the ``mode`` argument of the
DistributedEvaluator. If the mode is MODE_AUTO, the `host_is_local()` function
is used to check if the ``addr`` argument points to the localhost. If it does,
the compute node starts as a primary node, otherwise as a secondary node. If
``mode`` is MODE_PRIMARY, the compute node always starts as a primary node. If
``mode`` is MODE_SECONDARY, the compute node will always start as a secondary node.
There can only be one primary node per NEAT, but any number of secondary nodes.
The primary node will not evaluate any genomes, which means you will always need
at least two compute nodes.
You can run any number of compute nodes on the same physical machine (or VM).
However, if a machine has both a primary node and one or more secondary nodes,
MODE_AUTO cannot be used for those secondary nodes - MODE_SECONDARY will need to be
specified.
NOTE:
This module is in a **beta** state, and still *unstable* even in single-machine
testing. Reliability is likely to vary, including depending on the Python version
and implementation (e.g., cpython vs pypy) in use and the likelihoods of timeouts
(due to machine and/or network slowness). In particular, while the code can try
to reconnect between primary and secondary nodes, as noted in the
`multiprocessing` documentation this may not work due to data loss/corruption.
Note also that this module is *not* responsible for starting the script copies
on the different compute nodes, since this is very site/configuration-dependent.
Usage:
1. Import modules and define the evaluation logic (the eval_genome function).
(After this, check for ``if __name__ == '__main__'``, and put the rest of
the code inside the body of the statement.)
2. Load config and create a population - here, the variable ``p``.
3. If required, create and add reporters.
4. Create a ``DistributedEvaluator(addr_of_primary_node, b'some_password',
eval_function, mode=MODE_AUTO)`` - here, the variable ``de``.
5. Call ``de.start(exit_on_stop=True)``. The `start()` call will block on the
secondary nodes and call `sys.exit(0)` when the NEAT evolution finishes. This
means that the following code will only be executed on the primary node.
6. Start the evaluation using ``p.run(de.evaluate, number_of_generations)``.
7. Stop the secondary nodes using ``de.stop()``.
8. You are done. You may want to save the winning genome or show some statistics.
See ``examples/xor/evolve-feedforward-distributed.py`` for a complete example.
Utility functions:
``host_is_local(hostname, port=22)`` returns True if ``hostname`` points to
the local node/host. This can be used to check if a compute node will run as
a primary node or as a secondary node with MODE_AUTO.
``chunked(data, chunksize)``: splits data into a list of chunks with at most
``chunksize`` elements.
"""
from __future__ import print_function
import logging
import socket
import sys
import time
import warnings
# below still needed for queue.Empty
try:
# pylint: disable=import-error
import Queue as queue
except ImportError:
# pylint: disable=import-error
import queue
import multiprocessing
from multiprocessing import managers
from argparse import Namespace
# Some of this code is based on
# http://eli.thegreenplace.net/2012/01/24/distributed-computing-in-python-with-multiprocessing
# According to the website, the code is in the public domain
# ('public domain' links to unlicense.org).
# This means that we can use the code from this website.
# Thanks to Eli Bendersky for making his code open for use.
# Roles a compute node can take:
#   - the primary handles the evolution of the genomes
#   - the secondary handles the evaluation of the genomes
MODE_AUTO = 0  # auto-determine mode
MODE_PRIMARY = 1  # enforce primary mode
MODE_MASTER = MODE_PRIMARY  # alternate name for MODE_PRIMARY
MODE_SECONDARY = 2  # enforce secondary mode
MODE_SLAVE = MODE_SECONDARY  # alternate name for MODE_SECONDARY

# Possible return values of _check_exception:
_EXCEPTION_TYPE_BAD = -1  # raise it again or immediately return & exit with non-zero status code
_EXCEPTION_TYPE_UNCERTAIN = 0  # disconnected but may be able to reconnect
_EXCEPTION_TYPE_OK = 1  # queue empty and similar; try again
class ModeError(RuntimeError):
    """
    Raised when a method that only makes sense in one mode is called while
    the node is in the other mode: a primary-specific method called by a
    secondary node, or a secondary-specific method called by a primary node.
    """
def host_is_local(hostname, port=22):  # no port specified, just use the ssh port
    """
    Returns True if the hostname points to the localhost, otherwise False.

    ``hostname`` is the host name or address to check. ``port`` is only
    handed to `socket.getaddrinfo` while resolving addresses.

    A host is considered local when any of the following holds:

    * its name, as given or after `socket.getfqdn`, is one of the well-known
      loopback names/addresses;
    * its fully qualified name equals `socket.gethostname()`;
    * one of the addresses it resolves to is a loopback address
      (``127.x.x.x`` or ``::1``) or is also an address of this machine's
      own host name.

    Errors raised by `socket.getaddrinfo` (e.g. `socket.gaierror` for an
    unresolvable name) are not caught here.
    """
    loopback_names = (
        "localhost", "0.0.0.0", "127.0.0.1", "::1", "1.0.0.127.in-addr.arpa",
        "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa",
    )
    # Check the name as given first: getfqdn() may rewrite a loopback name
    # into something (e.g. "localhost.localdomain") that is not listed.
    if hostname in loopback_names:
        return True
    hostname = socket.getfqdn(hostname)
    if hostname in loopback_names:
        return True
    localhost = socket.gethostname()
    if hostname == localhost:
        return True
    localaddrs = socket.getaddrinfo(localhost, port)
    targetaddrs = socket.getaddrinfo(hostname, port)
    # Each getaddrinfo entry is (family, socktype, proto, canonname, sockaddr)
    # and sockaddr[0] is the address string.
    local_ips = {entry[4][0] for entry in localaddrs}
    for entry in targetaddrs:
        target_ip = entry[4][0]
        if target_ip in local_ips:
            return True
        # A loopback address is local even if the machine's own host name
        # does not resolve to it.
        if target_ip == "::1" or target_ip.startswith("127."):
            return True
    return False
def _determine_mode(addr, mode):
    """
    Returns the mode which should be used.

    ``addr`` is either a tuple whose first element is the host, or a
    string/bytestring naming the host directly; any other type raises a
    TypeError.

    If mode is MODE_AUTO, the result is determined by checking if 'addr'
    points to the local host. If it does, return MODE_PRIMARY, else return
    MODE_SECONDARY.
    If mode is either MODE_PRIMARY or MODE_SECONDARY, return the 'mode'
    argument. Otherwise, a ValueError is raised.
    """
    if isinstance(addr, tuple):
        host = addr[0]
    elif isinstance(addr, (bytes, str)):
        # isinstance (rather than an exact type comparison) also accepts
        # subclasses, and plain ``str`` addresses on Python 3.
        host = addr
    else:
        raise TypeError("'addr' needs to be a tuple, string or bytestring!")
    if mode == MODE_AUTO:
        if host_is_local(host):
            return MODE_PRIMARY
        return MODE_SECONDARY
    if mode in (MODE_SECONDARY, MODE_PRIMARY):
        return mode
    raise ValueError("Invalid mode {!r}!".format(mode))
def chunked(data, chunksize):
    """
    Splits ``data`` into consecutive lists holding at most ``chunksize``
    elements each and returns the list of those lists.

    Raises a ValueError if ``chunksize`` is smaller than 1 or is not a whole
    number.
    """
    if chunksize < 1:
        raise ValueError("Chunksize must be at least 1!")
    if int(chunksize) != chunksize:
        raise ValueError("Chunksize needs to be an integer")
    chunks = []
    for item in data:
        # Open a new chunk when there is none yet or the last one is full.
        if not chunks or len(chunks[-1]) >= chunksize:
            chunks.append([])
        chunks[-1].append(item)
    return chunks
def _check_exception(e):
    """
    Classifies the exception ``e`` by its type and by the lowercased text
    of its ``repr``.

    Returns _EXCEPTION_TYPE_OK for timeouts and full/empty queues,
    _EXCEPTION_TYPE_UNCERTAIN for errors that look like a lost or refused
    connection, and _EXCEPTION_TYPE_BAD for everything else.
    """
    text = repr(e).lower()

    timed_out = any(marker in text for marker in ('timed', 'timeout'))
    if timed_out or isinstance(e, (queue.Full, queue.Empty)):
        return _EXCEPTION_TYPE_OK

    if isinstance(e, (EOFError, TypeError, socket.gaierror)):
        return _EXCEPTION_TYPE_UNCERTAIN

    disconnect_markers = (
        'eoferror', 'typeerror', 'gaierror', 'pipeerror',
        'authenticationerror', 'refused', 'file descriptor', 'reset',
    )
    if any(marker in text for marker in disconnect_markers):
        return _EXCEPTION_TYPE_UNCERTAIN

    return _EXCEPTION_TYPE_BAD
class _DecrefLogHandler(logging.Handler):
    """
    Log handler that watches for 'decref failed' multiprocessing messages
    in secondaries and flags them on ``to_notify`` so a shutdown can follow.
    """

    def __init__(self, to_notify, level=logging.DEBUG):
        # ``to_notify`` receives the ``logger_*`` attributes set in emit().
        self.to_notify = to_notify
        # Never listen at a level above DEBUG, whatever the caller asked for.
        logging.Handler.__init__(self, level=min(level, logging.DEBUG))

    @staticmethod
    def _is_decref_failure(lowered_text):
        """True if the lowercased message mentions both 'decref' and 'failed'."""
        return ('decref' in lowered_text) and ('failed' in lowered_text)

    def filter(self, record):
        """Lets through only records whose message is a 'decref failed' one."""
        return self._is_decref_failure(record.getMessage().lower())

    def emit(self, record):
        """Marks ``to_notify`` as notified and records refused/reset hints."""
        text = record.getMessage()
        lowered = text.lower()
        if not self._is_decref_failure(lowered):  # pragma: no cover
            warnings.warn(
                "_DecrefLogHandler called with message {0!r} ({1!r})".format(
                    text, record),
                RuntimeWarning)
            return
        target = self.to_notify
        target.logger_notified = True
        if 'refused' in lowered:
            target.logger_saw_refused = True
        elif 'reset' in lowered:
            target.logger_saw_reset = True
class _ExtendedManager(object):
    """
    Thin wrapper around a `multiprocessing.managers.BaseManager` subclass.

    In primary mode it starts the manager; in any other mode it connects to
    an already running one. The manager exposes the shared inqueue, outqueue
    and namespace.
    """
    __safe_for_unpickling__ = True  # this may not be safe for unpickling,
                                    # but this is required by pickle.

    def __init__(self, addr, authkey, mode, start=False):
        self.addr = addr
        self.authkey = authkey
        self.mode = _determine_mode(addr, mode)
        self.manager = None
        if start:
            self.start()

    def __reduce__(self):  # pragma: no cover
        """
        Used by pickle: rebuilds the instance from its constructor arguments,
        starting it again if it was started when pickled.
        """
        was_started = bool(self.manager is not None)
        ctor_args = (self.addr, self.authkey, self.mode, was_started)
        return (self.__class__, ctor_args)

    def start(self):
        """Starts or connects to the manager; does nothing if already done."""
        if self.manager is not None:
            return
        bring_up = self._start if self.mode == MODE_PRIMARY else self._connect
        self.manager = bring_up()

    def stop(self):
        """Stops the manager (shutting it down only when in primary mode)."""
        if (self.manager is not None) and (self.mode == MODE_PRIMARY):
            self.manager.shutdown()
        self.manager = None

    @staticmethod
    def _get_manager_class(register_callables=False):
        """
        Returns a new 'Manager' subclass with the shared-object getters
        registered. If 'register_callables' is True, each getter is
        registered with a callable returning the shared object; otherwise
        only the names are registered.
        """
        class _EvaluatorSyncManager(managers.BaseManager):
            """
            A custom BaseManager.
            Please see the documentation of `multiprocessing` for more
            information.
            """

        def _always(value):
            """Builds a zero-argument callable returning ``value``."""
            def _get():
                return value
            return _get

        shared_objects = (
            ("get_inqueue", queue.Queue()),
            ("get_outqueue", queue.Queue()),
            # Does this need to be from multiprocessing.managers.SyncManager?
            ("get_namespace", Namespace()),
        )
        for typeid, shared in shared_objects:
            if register_callables:
                _EvaluatorSyncManager.register(typeid, callable=_always(shared))
            else:
                _EvaluatorSyncManager.register(typeid)
        return _EvaluatorSyncManager

    def _connect(self):
        """Connects to the manager."""
        manager_cls = self._get_manager_class(register_callables=False)
        instance = manager_cls(address=self.addr, authkey=self.authkey)
        instance.connect()
        return instance

    def _start(self):
        """Starts the manager."""
        manager_cls = self._get_manager_class(register_callables=True)
        instance = manager_cls(address=self.addr, authkey=self.authkey)
        instance.start()
        return instance

    def _started_manager(self):
        """Returns the manager, raising a RuntimeError if it is not started."""
        if self.manager is None:
            raise RuntimeError("Manager not started")
        return self.manager

    def get_inqueue(self):
        """Returns the inqueue."""
        return self._started_manager().get_inqueue()

    def get_outqueue(self):
        """Returns the outqueue."""
        return self._started_manager().get_outqueue()

    def get_namespace(self):
        """Returns the namespace."""
        return self._started_manager().get_namespace()
class DistributedEvaluator(object):
    """An evaluator working across multiple machines"""

    def __init__(
            self,
            addr,
            authkey,
            eval_function,
            secondary_chunksize=1,
            num_workers=None,
            worker_timeout=60,
            mode=MODE_AUTO,
            ):
        """
        ``addr`` should be a tuple of (hostname, port) pointing to the machine
        running the DistributedEvaluator in primary mode. If mode is MODE_AUTO,
        the mode is determined by checking whether the hostname points to this
        host or not.
        ``authkey`` is the password used to restrict access to the manager; see
        ``Authentication Keys`` in the `multiprocessing` manual for more information.
        All DistributedEvaluators need to use the same authkey. Note that this needs
        to be a `bytes` object for Python 3.X, and should be in 2.7 for compatibility
        (identical in 2.7 to a `str` object).
        ``eval_function`` should take two arguments (a genome object and the
        configuration) and return a single float (the genome's fitness).
        ``secondary_chunksize`` specifies the number of genomes that will be sent to
        a secondary at any one time.
        ``num_workers`` is the number of child processes to use if in secondary
        mode. It defaults to None, which means `multiprocessing.cpu_count()`
        is used to determine this value. If 1 in a secondary node, the process creating
        the DistributedEvaluator instance will also do the evaluations.
        ``worker_timeout`` specifies the timeout (in seconds) for a secondary node
        getting the results from a worker subprocess; if None, there is no timeout.
        ``mode`` specifies the mode to run in; it defaults to MODE_AUTO.
        """
        self.addr = addr
        self.authkey = authkey
        self.eval_function = eval_function
        self.secondary_chunksize = secondary_chunksize
        self.slave_chunksize = secondary_chunksize  # backward compatibility
        if num_workers:
            self.num_workers = num_workers
        else:
            try:
                self.num_workers = max(1, multiprocessing.cpu_count())
            except (RuntimeError, AttributeError):  # pragma: no cover
                print("multiprocessing.cpu_count() gave an error; assuming 1",
                      file=sys.stderr)
                self.num_workers = 1
        self.worker_timeout = worker_timeout
        self.mode = _determine_mode(self.addr, mode)
        self.em = _ExtendedManager(self.addr, self.authkey, mode=self.mode, start=False)
        # Shared objects; filled in by _set_shared_instances() once started.
        self.inqueue = None
        self.outqueue = None
        self.namespace = None
        self.started = False
        self.exit_string = None  # argument for sys.exit() in _do_exit(), if set
        self.exit_on_stop = True
        self.reconnect = False
        self.reconnect_max_time = None
        self.n_tasks = None  # number of chunks queued by the last evaluate()
        # Flags set by _DecrefLogHandler while the secondary loop runs.
        self.logger_notified = False
        self.logger_saw_refused = False
        self.logger_saw_reset = False

    def __getstate__(self):  # pragma: no cover
        """Required by the pickle protocol."""
        # we do not actually save any state, but we need __getstate__ to be
        # called.
        return True  # return some nonzero value

    def __setstate__(self, state):  # pragma: no cover
        """Called when instances of this class are unpickled."""
        self._set_shared_instances()

    def is_primary(self):
        """Returns True if the caller is the primary node"""
        return (self.mode == MODE_PRIMARY)

    def is_master(self):  # pragma: no cover
        """Returns True if the caller is the primary (master) node"""
        warnings.warn("Use is_primary, not is_master", DeprecationWarning)
        return self.is_primary()

    def _do_exit(self):
        """Calls sys.exit() with ``exit_string`` if one was set, else with 0."""
        if self.exit_string is None:
            sys.exit(0)
        else:  # pragma: no cover
            sys.exit(self.exit_string)

    def _numeric_worker_timeout(self):
        """
        Returns ``worker_timeout`` as a number for deadline arithmetic.

        ``worker_timeout`` may be None ("no timeout"); that is mapped to 0 so
        the ``max(...)`` expressions using it fall back to their other terms
        instead of failing on None.
        """
        if self.worker_timeout is None:
            return 0
        return self.worker_timeout

    def start(self, exit_on_stop=True, secondary_wait=0, reconnect=False, reconnect_max_time=None):
        """
        If the DistributedEvaluator is in primary mode, starts the manager
        process and returns. In this case, the ``exit_on_stop`` argument will
        be ignored.
        If the DistributedEvaluator is in secondary mode, it connects to the manager
        and waits for tasks.
        If in secondary mode and ``exit_on_stop`` is True, sys.exit() will be called
        when the connection is lost.
        ``secondary_wait`` specifies the time (in seconds) to sleep before actually
        starting when in secondary mode.
        If 'reconnect' is True, the secondary nodes will try to reconnect when
        the connection is lost. In this case, sys.exit() will only be called
        when 'exit_on_stop' is True and the primary node sends a forced shutdown
        command.
        ``reconnect_max_time`` is the time (in seconds) handed to the secondary
        loop as its give-up threshold; if None, it is derived from
        ``worker_timeout`` (with a floor of 60 seconds), times 6 when
        reconnecting and times 2 otherwise.

        Raises a RuntimeError if already started and a ValueError if the
        mode is neither primary nor secondary.
        """
        if self.started:
            raise RuntimeError("DistributedEvaluator already started!")
        self.started = True
        self.exit_on_stop = exit_on_stop
        self.reconnect = reconnect
        if reconnect_max_time is None:
            base_time = max(60, self._numeric_worker_timeout())
            if reconnect:
                reconnect_max_time = 6*base_time
            else:
                reconnect_max_time = 2*base_time
        self.reconnect_max_time = max(0.5, reconnect_max_time)
        if self.mode == MODE_PRIMARY:
            self._start_primary()
        elif self.mode == MODE_SECONDARY:
            time.sleep(secondary_wait)
            while True:
                self._start_secondary()
                # NOTE(review): the loop receives the unclamped local value,
                # not self.reconnect_max_time - confirm this is intended.
                self._secondary_loop(reconnect_max_time=reconnect_max_time)
                if self.exit_on_stop:
                    self._do_exit()
                else:
                    self.inqueue = self.outqueue = self.namespace = None
                if self.reconnect:
                    self.em.stop()
                else:
                    break
            if exit_on_stop:
                self._do_exit()
        else:
            raise ValueError("Invalid mode {!r}!".format(self.mode))

    def stop(self, wait=1, shutdown=True, force_secondary_shutdown=False):
        """
        Stops all secondaries.
        'wait' specifies the time (in seconds) to wait before shutting down the
        manager or returning.
        If 'shutdown', shutdown the manager.
        If 'force_secondary_shutdown', shutdown the secondary nodes even if
        they are started with 'reconnect=True'.

        Raises a ModeError if not in primary mode and a RuntimeError if not
        yet started.
        """
        if self.mode != MODE_PRIMARY:
            raise ModeError("Not in primary mode!")
        if not self.started:
            raise RuntimeError("Not yet started!")
        if self.n_tasks is None:  # pragma: no cover
            self.n_tasks = max(5, (wait*5), self.num_workers)
            warnings.warn("Self.n_tasks is None; estimating at {:n}".format(self.n_tasks))
        start_time = time.time()
        num_added = 0
        poss_time_wait = max(1,
                             (self.reconnect_max_time+60),
                             wait,
                             (self._numeric_worker_timeout()*self.n_tasks))
        # The integer placed on the inqueue is the stop signal read by the
        # secondary loop: 0 forces a shutdown, 1 is the regular stop.
        stop_signal = 0 if force_secondary_shutdown else 1
        while (num_added < (self.n_tasks+1)) and ((time.time() - start_time) < poss_time_wait):
            try:
                self.inqueue.put(stop_signal, block=True, timeout=0.2)
            except (EOFError, IOError, OSError, socket.gaierror, TypeError, queue.Full,
                    managers.RemoteError, multiprocessing.ProcessError) as e:  # pragma: no cover
                status = _check_exception(e)
                if status == _EXCEPTION_TYPE_OK:
                    num_added += 1
                    continue
                else:
                    break
            else:
                num_added += 1
        time_passed = time.time() - start_time
        if time_passed < wait:  # pragma: no cover
            time.sleep(wait - time_passed)
        self.outqueue = self.inqueue = self.namespace = None
        if shutdown:
            self.em.stop()
        self.started = False

    def _start_primary(self):
        """Start as the primary"""
        self.em.start()
        self._set_shared_instances()

    def _start_secondary(self):
        """
        Start as a secondary.

        When reconnecting is enabled, connection attempts are retried until
        they succeed, an error classified as bad occurs, or the deadline
        (at least 5 seconds) has passed; the last error is then re-raised.
        """
        if self.reconnect:
            timeout = max(5, self._numeric_worker_timeout()) + time.time()
            keep_trying = True
            while keep_trying:
                try:
                    self.em.start()
                except (EOFError, IOError, OSError, socket.gaierror, TypeError,
                        managers.RemoteError, multiprocessing.ProcessError) as e:
                    if ((_check_exception(e) == _EXCEPTION_TYPE_BAD) or
                            (time.time() > timeout)):
                        raise
                    continue
                else:
                    keep_trying = False
        else:
            self.em.start()
        self._set_shared_instances()

    def _set_shared_instances(self):
        """Sets attributes from the shared instances."""
        self.inqueue = self.em.get_inqueue()
        self.outqueue = self.em.get_outqueue()
        self.namespace = self.em.get_namespace()

    def _reset_em(self):
        """Resets self.em and the shared instances."""
        self.em = _ExtendedManager(self.addr, self.authkey, mode=self.mode, start=True)
        self._set_shared_instances()

    def _get_fitness(self, tasks, pool=None):
        """
        Evaluates every (genome_id, genome, config) entry of ``tasks`` and
        returns a list of (genome_id, fitness) pairs in the same order.

        Without a ``pool`` the evaluation happens in this process; with one,
        each genome is submitted via ``apply_async`` and each result is
        awaited for at most ``worker_timeout`` seconds.
        """
        if pool is None:
            return [
                (genome_id, self.eval_function(genome, config))
                for genome_id, genome, config in tasks
            ]
        genome_ids = []
        jobs = []
        for genome_id, genome, config in tasks:
            genome_ids.append(genome_id)
            jobs.append(
                pool.apply_async(
                    self.eval_function, (genome, config)
                )
            )
        results = [
            job.get(timeout=self.worker_timeout) for job in jobs
        ]
        # Return a real list, as the serial path does, rather than a
        # one-shot zip iterator.
        return list(zip(genome_ids, results))

    def _secondary_loop(self, reconnect_max_time):
        """
        The worker loop for the secondary nodes.

        Repeatedly (re)connects to the manager, takes chunks of tasks from
        the inqueue, evaluates them and puts the results on the outqueue.
        An integer on the inqueue is a stop signal from the primary.
        ``reconnect_max_time`` is the time (in seconds) without progress
        after which the loop gives up.
        The worker pool and the log handler installed here are always
        cleaned up, including when an exception propagates.
        """
        if self.num_workers > 1:
            pool = multiprocessing.Pool(self.num_workers)
        else:
            pool = None
        self.logger_notified = False
        self.logger_saw_refused = False
        self.logger_saw_reset = False
        handler = _DecrefLogHandler(to_notify=self)
        logger = multiprocessing.get_logger()
        logger.addHandler(handler)
        try:
            should_reconnect = True
            em_bad = self.reconnect
            last_time_done = time.time()
            while should_reconnect:
                prev_last_time_done = last_time_done
                last_time_done = time.time()  # so that if loops below, have a chance to check _reset_em
                running = True
                try:
                    self._reset_em()
                except (EOFError, IOError, OSError, socket.gaierror, TypeError,
                        managers.RemoteError, multiprocessing.ProcessError) as e:
                    if ((time.time() - last_time_done) >= reconnect_max_time) or self.logger_notified:
                        should_reconnect = False
                        em_bad = True
                        last_time_done = prev_last_time_done
                        if _check_exception(e) == _EXCEPTION_TYPE_BAD:  # pragma: no cover
                            self.exit_on_stop = True
                            self.exit_string = repr(e)
                        # NOTE(review): the nesting of this ``break`` was
                        # reconstructed from a copy of the source that had
                        # lost its indentation; it is placed so that giving
                        # up always leaves the loop. Confirm against upstream.
                        break
                    elif _check_exception(e) == _EXCEPTION_TYPE_BAD:  # pragma: no cover
                        raise
                    else:
                        continue
                last_time_done = time.time()  # being successful at reconnecting - used as a keepalive
                while running:
                    try:
                        tasks = self.inqueue.get(block=True, timeout=0.2)
                    except queue.Empty:
                        continue
                    except (EOFError, TypeError, socket.gaierror,
                            managers.RemoteError, multiprocessing.ProcessError,
                            IOError, OSError) as e:
                        if ('empty' in repr(e).lower()):  # pragma: no cover
                            continue
                        curr_status = _check_exception(e)
                        if curr_status != _EXCEPTION_TYPE_BAD:
                            if ((time.time() - last_time_done)
                                    >= reconnect_max_time) or self.logger_notified:
                                if em_bad:
                                    should_reconnect = False
                                break
                            elif curr_status == _EXCEPTION_TYPE_OK:
                                continue
                            else:
                                break
                        elif ((time.time() - last_time_done)
                              >= reconnect_max_time) or self.logger_notified:  # pragma: no cover
                            self.exit_on_stop = True
                            self.exit_string = repr(e)
                            should_reconnect = False
                            break
                        else:  # pragma: no cover
                            raise
                    if isinstance(tasks, int):  # from primary
                        # Stop signal: a truthy value is the regular stop,
                        # a falsy one (0) the forced shutdown.
                        running = False
                        should_reconnect = False
                        if tasks and self.reconnect:
                            self.exit_on_stop = False
                        elif not tasks:
                            if self.reconnect:
                                reconnect_max_time /= 3
                            em_bad = True
                            self.reconnect = False
                        break
                    last_time_done = time.time()
                    res = self._get_fitness(tasks, pool)
                    prev_last_time_done = last_time_done
                    last_time_done = time.time()
                    try:
                        self.outqueue.put(res)
                    except queue.Full:  # pragma: no cover
                        continue
                    except (EOFError, TypeError, socket.gaierror,
                            managers.RemoteError, multiprocessing.ProcessError,
                            IOError, OSError) as e:
                        if ('full' in repr(e).lower()):  # pragma: no cover
                            continue
                        curr_status = _check_exception(e)
                        if curr_status != _EXCEPTION_TYPE_BAD:
                            if ((time.time() - last_time_done)
                                    >= reconnect_max_time) or self.logger_notified:
                                if em_bad:
                                    should_reconnect = False
                                last_time_done = prev_last_time_done
                                break
                            elif curr_status == _EXCEPTION_TYPE_OK:
                                continue
                            else:
                                last_time_done = prev_last_time_done
                                break
                        elif ((time.time() - last_time_done)
                              >= reconnect_max_time) or self.logger_notified:  # pragma: no cover
                            self.exit_on_stop = True
                            self.exit_string = repr(e)
                            should_reconnect = False
                            break
                        else:  # pragma: no cover
                            raise
                    else:
                        last_time_done = time.time()
                    if ((time.time() - last_time_done)
                            >= reconnect_max_time) or self.logger_notified:
                        if em_bad:
                            should_reconnect = False
                        break
            # Decide whether start() should still try to reconnect.
            if ((time.time() - last_time_done)
                    >= reconnect_max_time) and (self.logger_saw_refused
                                                or self.logger_saw_reset):
                self.reconnect = False
            elif em_bad and self.logger_notified:
                self.reconnect = False
        finally:
            # Runs on normal exit and when an exception propagates (for
            # example a worker timeout raised from _get_fitness).
            logger.addHandler(logging.NullHandler())
            logger.removeHandler(handler)
            if pool is not None:
                pool.terminate()

    def evaluate(self, genomes, config):
        """
        Evaluates the genomes.

        ``genomes`` is an iterable of (genome_id, genome) pairs. The genomes
        are split into chunks of ``secondary_chunksize``, queued for the
        secondaries, and each genome's ``fitness`` attribute is set from the
        results once every chunk has been answered. This call blocks until
        then.
        This method raises a ModeError if the
        DistributedEvaluator is not in primary mode.
        """
        if self.mode != MODE_PRIMARY:
            raise ModeError("Not in primary mode!")
        # ``genomes`` is walked twice below; materialize it so a one-shot
        # iterable is handled as well.
        genomes = list(genomes)
        tasks = [(genome_id, genome, config) for genome_id, genome in genomes]
        id2genome = {genome_id: genome for genome_id, genome in genomes}
        tasks = chunked(tasks, self.secondary_chunksize)
        n_tasks = len(tasks)
        for task in tasks:
            self.inqueue.put(task)  # should this be w/timeouts and checking for exceptions?
        tresults = []
        while len(tresults) < n_tasks:
            try:
                sr = self.outqueue.get(block=True, timeout=0.2)
            except (queue.Empty, managers.RemoteError):  # more detailed check?
                continue
            tresults.append(sr)
        results = []
        for sr in tresults:
            results += sr
        for genome_id, fitness in results:
            genome = id2genome[genome_id]
            genome.fitness = fitness
        # Remembered so stop() knows how many stop signals to queue.
        self.n_tasks = n_tasks