Multiprocessing in Python: a guided tour with examples

This document is a survey of several different ways of implementing
multiprocessing systems in Python. It attempts to provide a small
amount of guidance on when it is appropriate and useful to use these
different approaches, and when not.

1.1 Motivation

We all have multi-core machines. It’s easy to imagine a home with
multiple computers and devices of several different kinds connected
on a LAN (local area network) through Ethernet or wireless
connections. Most (soon all) of those devices have multiple cores.
And yet most of that power is wasted while many of those cores sit
idle.

So, why do we all have machines with so many unused cores? Because
Intel and AMD must compete, and to do so, must give us what appear
to be faster machines. They can’t give us more cycles (per second),
since, if they did, our machines would melt. So, they give us
additional cores. The number of transistors goes up, and Moore’s
law (technically) holds true, but for most of us, that power is
largely unused and unusable.

Enough ranting … The alternatives and options discussed in this
document are all intended to solve that problem. We have tools that
are looking for uses. We need to learn how to put them to fuller
use so that next year we can justify buying yet another machine with
more cores to add to our home networks.

My central goal in writing this document is to enable and encourage
more of us to write the software that puts those machines and cores
to work.

And, note Larry Wall’s three virtues of programming, in particular:

“Impatience: The anger you feel when the computer is being lazy.
This makes you write programs that don’t just react to your
needs, but actually anticipate them. Or at least pretend to.”

See: http://threevirtues.com/

1.3 Our approach in this document

For each of the alternatives covered in this document, I’ll try to
cover: (1) appropriate (and inappropriate) uses; (2) possible use
cases; (3) some how-to instruction; and (4) example code.

Notice that we will be paying special attention to one specific
multiprocessing programming pattern. We want a scheme in which (1)
there are multiple servers; (2) there are multiple clients; (3) any
client can submit a task (function call) to be evaluated by any
available server. You might think of this pattern as using a pool
of servers (processes) to which clients can submit (often compute
intensive) function calls.
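
As a rough illustration of this pattern, here is a minimal sketch that uses multiprocessing.Pool from the standard library. It is written for Python 3, unlike the Python 2 listings in this document. The names slow_add and submit_all are hypothetical, invented for this example, and the sketch covers only a single client on one machine rather than the full multi-client case.

```python
import multiprocessing
import time


def slow_add(delay, x, y):
    """Stand-in for a compute-intensive task (hypothetical example)."""
    time.sleep(delay)
    return x + y


def submit_all(pool, tasks):
    """Submit every (func, args) pair to the pool, then collect results.

    apply_async() returns immediately, so all tasks are queued before
    we block waiting for the first result.
    """
    pending = [pool.apply_async(func, args) for func, args in tasks]
    return [job.get() for job in pending]


if __name__ == "__main__":
    tasks = [(slow_add, (0.1, n, n + 1)) for n in range(4)]
    with multiprocessing.Pool(processes=4) as pool:
        print(submit_all(pool, tasks))
```

The pool plays the role of the servers here: the caller does not choose which process evaluates which call.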

XML-RPC is a simple and easy way to get distributed processing.
With it, you can request that a function be called in a Python
process on a remote machine and that the result be returned to you.

We’ll use the modules in the Python standard library.

On the server side, we implement conventional Python functions, and
then register them with an XML-RPC server. Here is a simple, sample
server:

#!/usr/bin/env python

"""
Synopsis:
    A simple XML-RPC server.
"""

#import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer
import inspect

class Methods(object):
    def multiply(self, x, y):
        return x * y

def is_even(n):
    """Return True if n is even."""
    return n % 2 == 0

def is_odd(n):
    """Return True if n is odd."""
    return n % 2 == 1

def listMethods():
    """Return a list of supported method names."""
    return Supported_methods.keys()

def methodSignature(method_name):
    """Return the signature of a method."""
    if method_name in Supported_methods:
        func = Supported_methods[method_name]
        return inspect.getargspec(func).args
    else:
        return 'Error. Function "{}" not supported.'.format(method_name)

def methodHelp(method_name):
    """Return the doc string for a method."""
    if method_name in Supported_methods:
        func = Supported_methods[method_name]
        return func.__doc__
    else:
        return 'Error. Function "{}" not supported.'.format(method_name)

Supported_methods = {
    'is_even': is_even,
    'is_odd': is_odd,
    'listMethods': listMethods,
    'methodSignature': methodSignature,
    'methodHelp': methodHelp,
}

def start():
    node = '192.168.0.7'
    port = 8000
    server = SimpleXMLRPCServer((node, port))
    print "Listening on {} at port {} ...".format(node, port)
    # Register the plain functions under their public names.
    for name, func in Supported_methods.items():
        server.register_function(func, name)
    # A bound method of a class instance can be registered the same way.
    methods = Methods()
    multiply = methods.multiply
    server.register_function(multiply, 'multiply')
    server.register_function(listMethods, "listMethods")
    server.serve_forever()

def main():
    start()

if __name__ == '__main__':
    main()

And, on the client side, it’s simply a matter of creating a “proxy”
and doing what looks like a standard Python function call through
that proxy. Here is a simple, sample client:

#!/usr/bin/env python

"""
Synopsis:
    A simple XML-RPC client.
"""

import xmlrpclib

def discover_methods(proxy):
    method_names = proxy.listMethods()
    for method_name in method_names:
        sig = proxy.methodSignature(method_name)
        help = proxy.methodHelp(method_name)
        print 'Method -- {}'.format(method_name)
        print '    Signature: {}'.format(sig)
        print '    Help     : {}'.format(help)

def request(proxy, ival):
    ret_ival = str(proxy.is_even(ival))
    print "{} is even: {}".format(ival, ret_ival)

def main():
    node = '192.168.0.7'
    port = 8000
    url = "http://{}:{}".format(node, port)
    proxy = xmlrpclib.ServerProxy(url)
    print "Requests sent to {} at port {} ...".format(node, port)
    discover_methods(proxy)
    for ival in range(10):
        request(proxy, ival)
    answer = proxy.multiply(5, 3)
    print 'multiply answer: {}'.format(answer)

if __name__ == '__main__':
    main()

Notes:

  • If you want to access this XML-RPC server only from the local
    machine, then you might create the server with the following:

    server = SimpleXMLRPCServer(('localhost', 8000))

    And, in the client, create the proxy with the following:

    proxy = xmlrpclib.ServerProxy('http://localhost:8000')

  • Notice that in the server, we can expose a method from within
    a class, also.
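
The scripts above target Python 2. In Python 3 the same standard library modules are named xmlrpc.server and xmlrpc.client. Here is a minimal Python 3 sketch of the same idea, with the server running in a background thread so that both ends fit in one file. The helper names start_server and demo are hypothetical, and the port is chosen by the operating system rather than fixed at 8000.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


def is_even(n):
    """Return True if n is even."""
    return n % 2 == 0


def multiply(x, y):
    """Return the product of x and y."""
    return x * y


def start_server(host="localhost", port=0):
    """Start an XML-RPC server in a daemon thread; port 0 means 'any free port'."""
    server = SimpleXMLRPCServer((host, port), logRequests=False)
    server.register_function(is_even, "is_even")
    server.register_function(multiply, "multiply")
    # Adds system.listMethods, system.methodHelp and system.methodSignature.
    server.register_introspection_functions()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def demo():
    """Call the server through a proxy and return the answers."""
    server = start_server()
    host, port = server.server_address
    proxy = ServerProxy("http://{}:{}".format(host, port))
    try:
        return proxy.is_even(4), proxy.multiply(5, 3), proxy.system.listMethods()
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(demo())
```

Using the built-in introspection functions replaces the hand-written listMethods, methodSignature and methodHelp functions, at the cost of the "system." prefix on their names.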

FYI, I’ve been able to run the above XML-RPC scripts across my LAN.
In fact, I’ve run the server on one of my desktop machines, and I
connect via WiFi from the client on my Android smart phone using
QPython. For more information about QPython see:
http://qpython.com/.

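Documentation for IPython parallel computing is here:
http://ipython.org/ipython-doc/dev/parallel/index.html
(In IPython 4.0 and later, this functionality lives in the separate
ipyparallel package.)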

One easy way to install Python itself and IPython, SciPy, Numpy, etc.
is to install the Anaconda toolkit. You can find out about it here:
http://www.continuum.io/ and here
https://store.continuum.io/cshop/anaconda/.

We’d like to know how to submit tasks for parallel execution. Here
is a bit of instruction on how to do it.

  1. Create the cluster. Use the ipcluster executable that comes
    with IPython parallel computing. Example:

    $ ipcluster start -n 4

  2. Create a client and a load balanced view. Example:

    client = Client()
    view = client.load_balanced_view()

  3. Submit several tasks. Example:

    r1 = view.apply(f1, delay, value1, value2)
    r2 = view.apply(f1, delay, value1 + 1, value2 + 1)

  4. Get the results. Example:

    print r1.result, r2.result

Here is the complete example:

from IPython.parallel import Client

def test(view, delay, value1, value2):
    # Each apply() call returns immediately with an asynchronous result.
    r1 = view.apply(f1, delay, value1, value2)
    r2 = view.apply(f1, delay, value1 + 1, value2 + 1)
    r3 = view.apply(f1, delay, value1 + 2, value2 + 2)
    r4 = view.apply(f1, delay, value1 + 3, value2 + 3)
    print 'waiting ...'
    # Accessing .result blocks until that task has finished.
    return r1.result, r2.result, r3.result, r4.result

def f1(t, x, y):
    import time
    time.sleep(t)
    r = x + y + 3
    return r

def main():
    client = Client()
    view = client.load_balanced_view()
    results = test(view, 5, 3, 4)
    print 'results:', results

if __name__ == '__main__':
    main()

Notes:

  • This example asks IPython parallel to execute four function calls
    in parallel in four separate processes.

  • Because these function calls are executed in separate processes,
    they avoid conflict over Python’s GIL (global interpreter lock).

  • We started the cluster with the default scheduler scheme, which is
    “least load”. For other schemes do the following and look for
    “scheme”:

    $ ipcontroller --help

    Also see:
    http://ipython.org/ipython-doc/dev/parallel/parallel_task.html#schedulers

3.1 Remote machines and engines

Submitting jobs to be run on IPython engines on a remote machine
turns out, in some cases at least, to be very easy. Do the
following:

  • Start the IPython controller and engines on the remote machine.
    For example:

    $ ipcluster start -n 4

  • Copy your client profile
    ~/.ipython/profile_default/security/ipcontroller-client.json
    from the remote machine to the security/ directory under the
    profile you will be using on the local machine.

  • When you create your client, use something like the following:

    client = Client(sshserver='your_user_name@192.168.0.7')

    But change the user name and IP address to that of the remote
    machine.

There is more information on using IPython parallel computing with
remote hosts here:
http://ipython.org/ipython-doc/dev/parallel/parallel_process.html#using-the-ipcontroller-and-ipengine-commands

The Python standard library contains the module multiprocessing.
That module (it’s actually a Python package or a library that acts
like a module) contains some reasonable support for creating and
running multiple processes implemented in Python and for
communicating between those processes using Queues and Pipes
(also in the multiprocessing module). You can learn more about
that module here:
https://docs.python.org/2/library/multiprocessing.html

Be aware that the multiprocessing module creates separate
operating system processes. Each one runs in its own memory
space; each one has its own Python interpreter; each one has its own
GIL (global interpreter lock); each one has its own copies of
imported modules; and each module in each of these multiple
processes has its own copies of global variables.

The documentation has examples. And, here is some sample code that
is a little more complex:

#!/usr/bin/env python
"""
synopsis:
    Example of the use of the Python multiprocessing module.
usage:
    python multiprocessing_module_01.py
"""

import argparse
import operator
from multiprocessing import Process, Queue
import numpy as np
import py_math_01

def run_jobs(args):
    """Create several processes, start each one, and collect the results.
    """
    queue01 = Queue()
    queue02 = Queue()
    queue03 = Queue()
    queue04 = Queue()
    m = 4
    n = 3
    process01 = Process(target=f_multiproc, args=(queue01, 'process01', m, n))
    process02 = Process(target=f_multiproc, args=(queue02, 'process02', m, n))
    process03 = Process(target=f_multiproc, args=(queue03, 'process03', m, n))
    process04 = Process(target=f_multiproc, args=(queue04, 'process04', m, n))
    process01.start()
    process02.start()
    process03.start()
    process04.start()
    raw_input('Check for existence of multiple processes, then press Enter')
    process01.join()
    process02.join()
    process03.join()
    process04.join()
    raw_input('Check to see if they disappeared, then press Enter')
    print queue01.get()
    print queue02.get()
    print queue03.get()
    print queue04.get()

def f_multiproc(queue, processname, m, n):
    # Derive a distinct, repeatable random seed from the process name.
    seed = reduce(operator.add, [ord(x) for x in processname], 0)
    np.random.seed(seed)
    result = py_math_01.test_01(m, n)
    result1 = result.tolist()
    result2 = 'Process name: {}\n{}\n-----'.format(processname, result1)
    queue.put(result2)

def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,)
    args = parser.parse_args()
    run_jobs(args)

if __name__ == '__main__':
    #import ipdb; ipdb.set_trace()
    main()

The above code does the following:

  1. Create a number of processes (instances of class Process) and
    a queue (instance of class Queue) for each one.
  2. Start each process.
  3. Wait for the user to press enter. This gives the user time to
    check to see that separate processes have actually been created.
    On Linux, I use htop or top to view processes. On MS
    Windows, you should be able to use the Task Manager.
  4. Wait for each process to finish by calling process.join().
  5. Get and print the contents that each process put in its queue.
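
The same steps can be condensed with loops. The following is a minimal sketch, written for Python 3, in which the function worker is a hypothetical stand-in for the numerical task above and the pauses for user input are left out. One deliberate difference: each queue is read before its process is joined, because the multiprocessing documentation warns that joining a process which still has items buffered in a queue can block.

```python
import multiprocessing


def worker(queue, name, n):
    """Hypothetical task: sum the integers below n and report the result."""
    queue.put((name, sum(range(n))))


def run_jobs(count=4, n=1000):
    """Start `count` processes, each with its own queue, and gather results."""
    jobs = []
    for idx in range(count):
        queue = multiprocessing.Queue()
        name = "process{:02d}".format(idx + 1)
        process = multiprocessing.Process(target=worker, args=(queue, name, n))
        process.start()
        jobs.append((process, queue))
    results = []
    for process, queue in jobs:
        results.append(queue.get())   # drain the queue first ...
        process.join()                # ... then wait for the process to exit
    return results


if __name__ == "__main__":
    for name, total in run_jobs():
        print(name, total)
```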

Some benefits of using the multiprocessing module:

  • For CPU intensive computations, you will be able to make use of
    multiple cores and multiple CPUs concurrently. This is because
    each process has its own Python interpreter with its own GIL
    (global interpreter lock).
  • For processes that are I/O bound, you should also be able to get
    the speed-up from concurrency.
  • Since each process runs independently in its own memory space,
    your processes will be safer from the possibility of stepping on
    each other’s values. However, the multiprocessing module
    provides synchronization primitives (for example, class
    multiprocessing.Lock) and a facility for shared memory across
    processes (the multiprocessing.Array class), if you really
    want that kind of problem.
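
For readers who do want shared state, here is a minimal sketch of multiprocessing.Array guarded by a multiprocessing.Lock, written for Python 3. The function add_to_slot is a hypothetical name for this example. The explicit lock matters because the read-modify-write in `shared[index] += amount` is not a single atomic operation.

```python
import multiprocessing


def add_to_slot(shared, lock, index, amount):
    """Add amount to shared[index] while holding the lock."""
    with lock:
        shared[index] += amount


def main():
    shared = multiprocessing.Array("i", [0, 0, 0])   # three C ints in shared memory
    lock = multiprocessing.Lock()
    workers = [
        multiprocessing.Process(target=add_to_slot, args=(shared, lock, 1, 5))
        for _ in range(4)
    ]
    for process in workers:
        process.start()
    for process in workers:
        process.join()
    print(shared[:])


if __name__ == "__main__":
    main()
```

Run as a script, this prints [0, 20, 0]: four processes each added 5 to the middle slot of the same array.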

Information about Parallel Python is here:
http://www.parallelpython.com/

Here is a description from the Parallel Python Web site:

PP is a python module which provides mechanism for parallel
execution of python code on SMP (systems with multiple
processors or cores) and clusters (computers connected via
network).

It is light, easy to install and integrate with other python
software.

PP is an open source and cross-platform module written in
pure python

Features:

  • Parallel execution of python code on SMP and clusters
  • Easy to understand and implement job-based parallelization
    technique (easy to convert serial application in parallel)
  • Automatic detection of the optimal configuration (by default
    the number of worker processes is set to the number of
    effective processors)
  • Dynamic processors allocation (number of worker processes can
    be changed at run-time)
  • Low overhead for subsequent jobs with the same function
    (transparent caching is implemented to decrease the overhead)
  • Dynamic load balancing (jobs are distributed between processors at run-time)
  • Fault-tolerance (if one of the nodes fails tasks are
    rescheduled on others)
  • Auto-discovery of computational resources
  • Dynamic allocation of computational resources (consequence of
    auto-discovery and fault-tolerance)
  • SHA based authentication for network connections
  • Cross-platform portability and interoperability (Windows,
    Linux, Unix, Mac OS X)
  • Cross-architecture portability and interoperability (x86,
    x86-64, etc.)
  • Open source

The examples provided with the distribution work well. But, the
project does not seem very active.

Here is a description of ZeroMQ, quoted from the ZeroMQ guide:

ØMQ in a Hundred Words

ØMQ (also known as ZeroMQ, 0MQ, or zmq) looks like an embeddable
networking library but acts like a concurrency framework. It
gives you sockets that carry atomic messages across various
transports like in-process, inter-process, TCP, and multicast.
You can connect sockets N-to-N with patterns like fan-out,
pub-sub, task distribution, and request-reply. It’s fast enough
to be the fabric for clustered products. Its asynchronous I/O
model gives you scalable multi-core applications, built as
asynchronous message-processing tasks. It has a score of
language APIs and runs on most operating systems. ØMQ is from
iMatix and is LGPLv3 open source. [Pieter Hintjens;
http://zguide.zeromq.org/page:all]

pyzmq, which provides the zmq module, is the Python binding for
ZeroMQ.

Note that ZeroMQ is underneath IPython parallel. So, it may be
appropriate to think of IPython parallel computing as a high level
wrapper around ZeroMQ.

There is a good set of examples written in a number of different
languages for ZeroMQ. To get them, download the ZeroMQ guide
(https://github.com/imatix/zguide.git), then (for us Python
programmers) look in zguide/examples/Python.

In order to use pyzmq and to run the examples, you will need to
install:

  • ZeroMQ — http://zeromq.org/
  • Python bindings for ZeroMQ —
    http://zeromq.org/bindings:python
    You can also find it at the Python Package Index:
    https://pypi.python.org/pypi/pyzmq

For my testing with Python, I used the Anaconda Python distribution,
which contains support for zmq.

We should note that with ZeroMQ, our programming is in some sense
using the Actor model, as does Erlang. This is the Actor model in
the sense that (1) we are creating separate processes which do not share
(in memory) resources and (2) we communicate between those processes
by sending messages and waiting on message queues. ZeroMQ differs
from Erlang, with respect to the Actor model in the following ways:

  • In Erlang the processes are internal to Erlang; in ZeroMQ the
    different processes are operating system processes. Therefore, in
    ZeroMQ the processes are more heavy weight and slow to start and
    stop than those in Erlang. However, notice that when we use an
    Erlang port or use Erlport to call Python code from Erlang, we do
    in fact create separate OS processes.
  • One way to start those ZeroMQ processes is at the shell (e.g.,
    bash) command line. I can imagine starting multiple processes
    from within Python code using the subprocess module or within
    Node.js code using the Child Process (child_process) module. But,
    whereas in Erlang, it is reasonable to start up hundreds of
    concurrent processes, with ZeroMQ, we are unlikely to want to do
    that.
  • In Erlang the processes each have an identity, and we send
    messages to a process; in Erlang, the queues are anonymous. In
    ZeroMQ, the queues have an identity, and we send messages to a
    queue and receive messages from a queue; in ZeroMQ, the processes
    are anonymous, and, in fact, our code often does not even know how
    many processes have connected to the far end of one of our queues.

6.1 Hello world server and client

Here is a “Hello, World” server that uses pyzmq:

#!/usr/bin/env python

"""
Hello World server in Python
Binds REP socket to tcp://*:5555
Expects b"Hello" from client, replies with b"World"
"""

import sys
import time
import zmq

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: python hwserver.py <label>')
    label = args[0]
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    count = 0
    while True:
        count += 1
        # Wait for next request from client
        message = socket.recv()
        print("Received request: {} {}".format(message, count))
        # Do some 'work'
        time.sleep(1)
        # Send reply back to client
        socket.send(b"{} {} {}".format(label, message, count))

main()

And, here is the “Hello, World” client using pyzmq:

#!/usr/bin/env python

"""
Hello World client in Python
Connects REQ socket to tcp://localhost:5555
Sends "Hello" to server, expects "World" back
"""

import sys
import zmq

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: python hwclient.py <label>')
    label = args[0]
    context = zmq.Context()
    # Socket to talk to server
    print("Connecting to hello world server...")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    # Do 10 requests, waiting each time for a response
    for request in range(10):
        print("Sending request %s ..." % request)
        socket.send(b"Hello from {}".format(label))
        # Get the reply.
        message = socket.recv()
        print("Received reply %s [ %s ]" % (request, message))

main()

If you start hwserver.py in one (bash) session and
hwclient.py in another session, you should see the server and
the client echoing each other in their respective sessions.

However, if you start one instance of hwserver.py and multiple
instances of hwclient.py, you will notice a longer delay between
each echo. That’s because multiple clients are waiting on a single
server. Notice the delay (time.sleep(1)) in the server. Our
next challenge is to run the server in multiple processes so that
the load from multiple clients will be balanced across multiple
servers. We could use IPython parallel computing to do that. But,
there are ways to accomplish something similar with ZeroMQ itself.
See, for example, the documentation on
“A Load Balancing Message Broker”.

6.2 ZeroMQ for multiprocessing and mixed language programming

One significant benefit of using ZeroMQ is that we can write
different processes in different languages. Thus, we can, for
example, implement a process in Node.js that sends messages to and
requests services from a process written in Python.

In this example, we will use ZeroMQ to accomplish (at least) two
things:

  1. We’ll run code written in one language from code written in a
    different language. In the example code that follows, from a
    client written in JavaScript/Node.js, we’ll request a service
    from code written in Python. That gives us a way to request
    services that are available, for example, in the Lxml XML
    package or in the SciPy and Numpy numerical and scientific
    computing packages.
  2. We’ll be able to start up more than one server or worker,
    whatever, and balance load across them. That should help us
    increase through-put under load, while avoiding the Python GIL
    (global interpreter lock) problem that only allows a single
    thread to execute in Python native code at any one time. (See
    this for more on that issue:
    https://wiki.python.org/moin/GlobalInterpreterLock.)

Debugging — A few clues:

  • You can of course trace execution by using print statements in
    Python code.
  • Depending on the structure of your code, you might consider
    implementing a decorator that traces the entry and exit from a
    decorated function. You can find help with decorators here:
    http://www.davekuhlman.org/python_book_01.html#decorators-and-how-to-implement-them.
  • The Python debugger pdb works fine. Actually, I typically use
    ipdb (https://pypi.python.org/pypi/ipdb). You will want to
    consider running the module which you are debugging in a separate
    window/session of its own so that debug commands and results are
    not mixed in with other output.
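
Here is a minimal sketch of such a tracing decorator, written for Python 3. The names trace and count_words are hypothetical, chosen for this example; the decorator prints one line when the decorated function is entered and one line when it returns or raises.

```python
import functools


def trace(func):
    """Print a line on entry to and exit from the decorated function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("--> {}{!r}".format(func.__name__, args))
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            print("<-- {} raised {!r}".format(func.__name__, exc))
            raise
        print("<-- {} returned {!r}".format(func.__name__, result))
        return result
    return wrapper


@trace
def count_words(text):
    """Hypothetical function used only to show the decorator at work."""
    return len(text.split())


if __name__ == "__main__":
    count_words("hello zeromq world")
```

In a worker or broker, decorating the function that handles one message is often enough to see which process received which request.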

6.2.1 A simple round robin load distribution

In our example, the Node.js module makes multiple requests in the form
of ZeroMQ messages that go to a “broker”, which passes them along to
a Python worker module. If we start up more than one worker
process, these requests will be forwarded, round-robin style, to
one or another worker.
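
Round-robin distribution itself is easy to model without ZeroMQ. The following minimal sketch, written for Python 3, is a pure-Python simulation and not part of the ZeroMQ example; the function round_robin and the worker names are hypothetical. It shows only the order in which requests would be handed out.

```python
import itertools


def round_robin(requests, workers):
    """Pair each request with the next worker in a fixed rotation."""
    return list(zip(requests, itertools.cycle(workers)))


if __name__ == "__main__":
    files = ["data01.xml", "data02.xml", "data03.xml", "data04.xml", "data05.xml"]
    for filename, worker in round_robin(files, ["worker-A", "worker-B"]):
        print(filename, "->", worker)
```

Notice that the rotation ignores how long each task takes, so a worker can be handed a new request while it is still busy with an earlier one.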

Here is our Node.js client:

#!/usr/bin/env node

/*
A ZeroMQ client implemented in Node.js that requests an XML service
from a separate process.
*/

// Hello World client in Node.js
// Connects REQ socket to tcp://localhost:5559
// Sends "Hello" to server, expects "World" back

var fileList = [
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
];

function run() {
    var zmq = require('zmq'),
        requester = zmq.socket('req'),
        maxNbr = fileList.length;

    requester.connect('tcp://localhost:5559');
    var replyNbr = 0;
    requester.on('message', function(msg) {
        var content = msg.toString();
        // console.log('content:', content);
        content = JSON.parse(content);
        console.log('got reply', replyNbr, 'file name:', content[0],
                    'filter: ', content[1],
                    ' count:', content[2]);
        replyNbr += 1;
        if (replyNbr >= maxNbr) {
            console.log('finished and closing socket');
            requester.close();
        }
    });

    for (var idx = 0; idx < maxNbr; ++idx) {
        // The worker reads the keys 'filename' and 'filter' from the payload.
        var filename = fileList[idx][0],
            filter = fileList[idx][1],
            payload = {filename: filename, filter: filter};
        payload = JSON.stringify(payload);
        requester.send(payload);
    }
}

run();

And, this is the Python broker that acts like an intermediary
between clients and one or more workers:

#!/usr/bin/env python

"""
A broker/intermediary implemented in Python that forwards messages
from clients to any one of connected workers.
"""

# Simple request-reply broker
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq

def run():
    # Prepare our context and sockets
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind("tcp://*:5559")
    backend.bind("tcp://*:5560")
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    # Switch messages between sockets
    count = 0
    while True:
        count += 1
        socks = dict(poller.poll())
        if socks.get(frontend) == zmq.POLLIN:
            message = frontend.recv_multipart()
            #import ipdb; ipdb.set_trace()
            print '{}. broker: frontend --> backend  msg: "{}"'.format(
                count, message[2])
            backend.send_multipart(message)
        if socks.get(backend) == zmq.POLLIN:
            message = backend.recv_multipart()
            print '{}. broker: backend --> frontend  msg len: "{}"'.format(
                count, len(message[2]))
            frontend.send_multipart(message)

if __name__ == '__main__':
    run()

Finally, here is the Python worker that actually uses Lxml to
provide XML processing capabilities:

#!/usr/bin/env python

"""
A ZeroMQ worker implemented in Python that:
- receives a message that identifies an XML document;
- uses Lxml to parse the document and count the elements in it;
- sends a reply message that contains the count of the elements in
  the document.
"""

import zmq
from lxml import etree
import json
import re

def count_elements(root, tagfilter_pat):
    # Count the root itself, unless it is a comment or is filtered out.
    if (root.tag is not etree.Comment and
            (tagfilter_pat is None or
             tagfilter_pat.search(root.tag) is not None)):
        count = 1
    else:
        count = 0
    for node in root.iterdescendants():
        if (node.tag is not etree.Comment and
                (tagfilter_pat is None or
                 tagfilter_pat.search(node.tag) is not None)):
            count += 1
    return count

def run():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:5560")
    while True:
        payload = socket.recv()
        #import ipdb; ipdb.set_trace()
        payload = json.loads(payload)
        filename = payload['filename']
        tagfilter = payload.get('filter')
        if tagfilter is not None:
            tagfilter_pat = re.compile(tagfilter)
        else:
            tagfilter_pat = None
        print("Received request -- filename: %s  filter: %s" % (
            filename, tagfilter, ))
        try:
            doc = etree.parse(filename)
        except IOError:
            doc = None
        if doc is None:
            # A count of -1 tells the client the file could not be read.
            count = -1
        else:
            root = doc.getroot()
            count = count_elements(root, tagfilter_pat)
        print 'sending -- name: {}  count: {}'.format(filename, count)
        payload = (filename, tagfilter, count)
        #payload = json.dumps(payload)
        #socket.send(payload)
        socket.send_json(payload)

if __name__ == '__main__':
    run()

Notes:

  • The above code will also handle multiple clients, if we were to
    start up those multiple Node.js processes.
  • I’ve written this example in Node.js and Python. But, we could
    also use ZeroMQ to communicate across other languages as well. A
    few of the languages supported by ZeroMQ are: C, PHP, Python, Lua,
    Haxe, C++, C#, Common Lisp, Delphi, Erlang, F#, Felix, Haskell,
    Java, Objective-C, Ruby, Ada, Basic, Clojure, Go, Node.js,
    ooc, Perl, and Scala. Check here for the binding for your
    language of interest:
    http://zeromq.org/bindings:_start.
  • In this code, we are using a simple round robin mechanism for
    distributing work across our worker processes. That will not
    always be optimal or what we want. You can look at the ZeroMQ
    guide to learn about other options.
  • For my testing, I used the Node.js language binding that can be
    installed using NPM (https://www.npmjs.com/package/zmq), and the
    Python zmq language binding that is part of the Anaconda
    Python distribution (https://store.continuum.io/cshop/anaconda/).
  • During my testing, I started the client, broker, and worker
    processes, each in its own bash session.
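
The worker's request handling can be tried without ZeroMQ or Lxml. The following minimal sketch, written for Python 3, substitutes the standard library's xml.etree.ElementTree for Lxml and parses an in-memory string instead of a file. SAMPLE and handle_request are hypothetical names made up for this example. The default ElementTree parser discards comments, so no comment check is needed here.

```python
import json
import re
import xml.etree.ElementTree as ET

SAMPLE = "<people><person/><person/><address/></people>"  # made-up test document


def count_elements(root, tagfilter=None):
    """Count root and its descendants whose tag matches tagfilter (a regex)."""
    pattern = re.compile(tagfilter) if tagfilter is not None else None
    return sum(
        1 for node in root.iter()
        if pattern is None or pattern.search(node.tag) is not None
    )


def handle_request(payload, xml_text=SAMPLE):
    """Decode a JSON request and build a JSON reply shaped like the worker's."""
    request = json.loads(payload)
    root = ET.fromstring(xml_text)
    count = count_elements(root, request.get("filter"))
    return json.dumps([request["filename"], request.get("filter"), count])


if __name__ == "__main__":
    print(handle_request(json.dumps({"filename": "sample.xml", "filter": "person"})))
```

The reply is a JSON list of file name, filter, and count, which is the shape the Node.js client above indexes as content[0], content[1], and content[2].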

6.2.2 A load balancing distribution of tasks

The previous example sent a task to a worker, even if that worker
was not yet finished with its previous task. In this next example,
the broker will forward a request to a worker only if that worker
has signaled that it is finished with its previous task, if it had
one, and that it is ready for its next task.
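
The bookkeeping behind this scheme can be modeled in a few lines of plain Python. The following minimal sketch, written for Python 3, is a toy simulation and not the ZeroMQ broker itself; the class ReadyQueueBroker and its method names are hypothetical. It keeps an explicit queue of waiting requests, which is an assumption of the model rather than something the real broker has to do.

```python
from collections import deque


class ReadyQueueBroker:
    """Toy model of a broker that only hands work to idle workers."""

    def __init__(self):
        self.ready = deque()      # workers that have signalled they are ready
        self.waiting = deque()    # requests with no idle worker yet
        self.assigned = []        # (request, worker) pairs, in dispatch order

    def worker_ready(self, worker):
        """Called when a worker starts up or finishes a task."""
        self.ready.append(worker)
        self._dispatch()

    def client_request(self, request):
        """Called when a client submits a request."""
        self.waiting.append(request)
        self._dispatch()

    def _dispatch(self):
        # Pair requests with idle workers for as long as both exist.
        while self.ready and self.waiting:
            self.assigned.append((self.waiting.popleft(), self.ready.popleft()))


if __name__ == "__main__":
    broker = ReadyQueueBroker()
    broker.worker_ready("worker-1")
    broker.client_request("data01.xml")
    broker.client_request("data02.xml")   # must wait: worker-1 is busy
    broker.worker_ready("worker-2")
    print(broker.assigned)
```

A request is never assigned to a busy worker; it waits until some worker reports that it is ready.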

Again, so as to show how to request services implemented in Python
from Node.js, our client is written in Node.js and the broker and
workers are written in Python.

Here is the Node.js client:

#!/usr/bin/env node

/*
Load-balancing broker

Node.js client for the load-balancing broker/worker.
Send requests asking worker to count elements in an XML document.
Optionally send a pattern used to filter elements to be counted.
*/

var zmq = require('zmq')
  , frontAddr = 'tcp://127.0.0.1:12346'
  , log = console.log;

var fileList = [
    // [ filename, filter-pattern ]
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
    ['Data/data14.xml', null, ],
];

function clientProcess(ident) {
    var sock = zmq.socket('req')
      , maxNbr = fileList.length
      ;
    sock.identity = "Client-" + ident;
    sock.connect(frontAddr);

    // Start listening for replies.
    var replyNbr = 0;
    sock.on('message', function(payload) {
        replyNbr += 1;
        var args = Array.apply(null, arguments);
        var data = JSON.parse(payload);
        log(replyNbr + '. client received filename: ' + data.filename +
            '  count: ' + data.count);
        if (replyNbr >= maxNbr) {
            log('client finished');
            sock.close();
        }
    });

    // Send requests to the broker.
    for (var idx = 0; idx < fileList.length; ++idx) {
        // The worker reads the keys 'filename' and 'filter' from the payload.
        var filename = fileList[idx][0],
            filter = fileList[idx][1],
            payload = {filename: filename, filter: filter};
        payload = JSON.stringify(payload);
        var lineno = idx + 1;
        log(lineno + '. client sending ' + payload);
        sock.send(payload);
    }
}

function main() {
    var args = process.argv;
    if (args.length != 3) {
        process.stderr.write('usage: test03_client.js <client_number>\n');
        process.exit(1);
    }
    var ident = args[2];
    clientProcess(ident);
}

main();

Notes:

  • Notice how we start up the listener (sock.on('message', ...))
    before sending requests to the broker. Initially, I started the
    listener after sending those requests, and that version
    intermittently failed to receive the first reply.

Here is the broker written in Python:

#!/usr/bin/env python

"""
Load-balancing broker

Clients implemented by test03_client.js and test03_client.py.
Workers implemented in test03_worker.py.
"""

import zmq

FrontAddress = 'tcp://127.0.0.1:12346'
BackAddress = 'tcp://127.0.0.1:12345'

def main():
    """Load balancer main loop."""
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(FrontAddress)
    backend = context.socket(zmq.ROUTER)
    backend.bind(BackAddress)
    # Initialize main loop state
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)
    print 'broker waiting'
    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            print 'broker received request: {}'.format(request)
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
        if frontend in sockets:
            # Get next client request, route to the least recently used worker
            data = frontend.recv_multipart()
            print 'broker received request: {}'.format(data)
            client, empty, request = data
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)
    # Clean up (not reached while the loop above runs forever)
    backend.close()
    frontend.close()
    context.term()

if __name__ == "__main__":
    main()
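
The multipart frames that the broker unpacks on its backend socket can be studied in isolation. The following minimal sketch, written for Python 3, takes a list of byte strings laid out the way the broker code above indexes them and decides what, if anything, should go back to a client. The function name route_backend_frames and the sample identities are hypothetical, and no ZeroMQ socket is involved.

```python
def route_backend_frames(frames):
    """Return (worker, frames_for_frontend) for one backend message.

    frames_for_frontend is None when the worker only announced READY.
    """
    worker, _empty, client = frames[:3]
    if client == b"READY" or len(frames) <= 3:
        return worker, None
    _empty, reply = frames[3:]
    # The client identity comes first so the frontend can route the reply.
    return worker, [client, b"", reply]


if __name__ == "__main__":
    print(route_backend_frames([b"Worker-1", b"", b"READY"]))
    print(route_backend_frames(
        [b"Worker-1", b"", b"Client-7", b"", b'{"count": 3}']))
```

In both cases the worker identity is returned, because a worker that has just sent a message, whether READY or a reply, is free to take another task.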

And, here is the worker, also written in Python:

#!/usr/bin/env python

"""
Worker process for the load-balancing broker.

A worker process implemented in Python for the load-balancing broker.
- Receives request message.
- Counts elements in XML document, possibly filtering elements by tag.
- Sends reply message with results.
"""

import sys
import re
import zmq
import json
from lxml import etree

BackAddress = 'tcp://127.0.0.1:12345'

def count_elements(root, tagfilter_pat):
    # Count the root itself, unless it is a comment or is filtered out.
    if (root.tag is not etree.Comment and
            (tagfilter_pat is None or
             tagfilter_pat.search(root.tag) is not None)):
        count = 1
    else:
        count = 0
    for node in root.iterdescendants():
        if (node.tag is not etree.Comment and
                (tagfilter_pat is None or
                 tagfilter_pat.search(node.tag) is not None)):
            count += 1
    return count

def worker_task(ident):
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"Worker-{}".format(ident).encode("ascii")
    socket.connect(BackAddress)
    # Tell broker we're ready for work
    socket.send(b"READY")
    print 'Worker {} sent READY'.format(ident)
    while True:
        address, empty, request = socket.recv_multipart()
        payload = request.decode('ascii')
        payload = json.loads(payload)
        filename = payload['filename']
        tagfilter = payload.get('filter')
        if tagfilter is not None:
            tagfilter_pat = re.compile(tagfilter)
        else:
            tagfilter_pat = None
        try:
            doc = etree.parse(filename)
        except IOError:
            doc = None
        if doc is None:
            # A count of -1 tells the client the file could not be read.
            count = -1
        else:
            root = doc.getroot()
            count = count_elements(root, tagfilter_pat)
        payload = {
            'filename': filename,
            'filter': tagfilter,
            'count': count,
        }
        payload = json.dumps(payload)
        print 'worker {} sending payload: {}'.format(ident, payload)
        socket.send_multipart([address, b"", payload])

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: test03_worker.py <task_number>')
    ident = args[0]
    worker_task(ident)

if __name__ == "__main__":
    main()

Notes:

  • Our worker uses Lxml to count the elements in a document. In
    fact, the point of this example is to show that, from Node.js, we
    can request XML processing done in Python. Of course, there are
    other kinds of services and processing for which Python is a good
    choice and which we’d like to be able to request from, e.g.,
    Node.js as well as other languages, in particular, scientific and
    numerical processing with SciPy and Numpy (http://scipy.org/).
  • You should note that it would be trivial to replace the processing
    done by this worker with some other functionality entirely. A few
    hints:

    • The payload in the messages passed into this worker is JSON;
      specifically, it’s a dictionary that could be adapted so as to
      carry other parameters.
    • You might want to include a key-value pair in this dictionary
      that specifies which of several functions is to be called.
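
The second hint can be sketched as follows. This is an illustration
under assumptions, not code from the worker above: the key names
"function" and "params", the two sample operations, and the helper
handle_request are all hypothetical. The idea is simply to look up
the requested function in a dictionary and to reply with an error
entry when the name is not known.

```python
import json


def count_words(params):
    """Hypothetical operation: count whitespace-separated words."""
    return len(params["text"].split())


def add_numbers(params):
    """Hypothetical operation: sum a list of numbers."""
    return sum(params["numbers"])


# Maps the value of the (assumed) "function" key to a Python callable.
DISPATCH = {
    "count_words": count_words,
    "add_numbers": add_numbers,
}


def handle_request(raw_request):
    """Decode a JSON request, run the named function, encode the reply."""
    payload = json.loads(raw_request.decode("ascii"))
    name = payload.get("function")
    func = DISPATCH.get(name)
    if func is None:
        reply = {"function": name, "error": "unknown function"}
    else:
        reply = {"function": name, "result": func(payload.get("params", {}))}
    return json.dumps(reply).encode("ascii")


request = json.dumps(
    {"function": "add_numbers", "params": {"numbers": [1, 2, 3]}}
).encode("ascii")
reply = json.loads(handle_request(request).decode("ascii"))
```

In the worker, a function like handle_request would take the place of
the XML-specific code between receiving the request frame and sending
the reply frame.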

This section discusses several strategies for running parallel
Python processes behind a Web server.

7.2 Python parallel processing behind nodejs

When we implement a Web site with Nodejs, we get parallel
processing with very little extra effort. Although a single Nodejs
process handles all of its requests in one thread (on an event
loop), we can use the Nodejs Cluster module to start several worker
processes that share the same server port. Nodejs does not create a
new process for each HTTP request; instead, incoming connections are
distributed across the worker processes in the cluster. Thus, if we
use the Nodejs Cluster module, we get separate, parallel processes
and load balancing.

Web application development is not a goal of this document, but
there is plenty of help and documentation at http://nodejs.org and
the sites that it links to.

What’s left to do is to call Python. Since our Nodejs code is
written in JavaScript, this requires some way to call across
languages and processes. One solution is to use a message-based
system, for example ZeroMQ (http://zeromq.org/). zerorpc, which is
a package built on top of ZeroMQ, looks promising (see:
http://zerorpc.dotcloud.com/).

Here is an example of JavaScript (running under Nodejs, say),
calling a method in a class written in Python:

var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");

client.invoke("hello", "RPC", function(error, res, more) {
    console.log(res);
});

And, here is Python code that could be called by the above:

import zerorpc

class HelloRPC(object):
    def hello(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()

What’s left to do is to make sure (1) that each Nodejs process has
its own Python process, so that compute-intensive, long-running
Python calls (for example, those that result in complex calls to
Numpy/SciPy) do not wait on each other and slow down because of
contention for the same Python GIL (global interpreter lock); and
(2) that the Python processes, once started, stay alive, because
starting a process is slow.

Erlang does multiprocessing; Erlang enables us to communicate
between processes; Erlang with Erlport enables us to create and
communicate with Python processes. So, why not try multiprocessing
in Python with an Erlang controller of some kind?

A few clarifications:

  • Erlport/Python processes are different from the processes that are
    internal to Erlang and that Erlang enables us to create and use.
    Erlport/Python processes are OS processes, not Erlang processes.
    They are much heavier weight than Erlang processes and, therefore,
    are slower to create. An implication of this, if we want to make
    many requests, is that we will want to create a pool of these
    processes and reuse them when requested.
  • Erlang programming is based on the actor model. This means that
    Erlang programs, even local ones, are built by creating separate
    processes that do not share resources (memory, for example) and
    by sending messages between those processes. And so, a
    multiprocessing and remote/distributed processing application in
    which external (OS) processes send messages to each other seems
    to be a good fit with Erlang and its capabilities.

We’ll look at several examples in this document:

  1. The first is a simple one that creates a single Erlport/Python
    process and then sends it requests and receives results back from
    it.
  2. Next we’ll write an Erlang program that creates a pool of
    Erlport/Python processes and sends a series of requests to an
    available process, but waits for a process to become available if
    all processes in the pool are busy.
  3. And, finally, we’ll implement something like the above pool of
    processes, but with the use of Erlang behaviors. One of the
    benefits to be gained from this is that, if one of our
    Erlport/Python processes dies, a new process will be started to
    replace it.

All of our examples will use the same Python code. Here it is:

#!/usr/bin/env python

"""
Synopsis:
    Sample math functions for use with Erlang and Erlport.
Details:
    test_01 -- Solve the continuous algebraic Riccati equation, or
    CARE, defined as (A'X + XA - XBR^-1B'X+Q=0) directly using a
    Schur decomposition method.
"""

import numpy as np
from scipy import linalg
from erlport.erlterms import Atom
#import json

def test_01(m, n):
    a = np.random.random((m, m))
    b = np.random.random((m, n))
    q = np.random.random((m, m))
    r = np.random.random((n, n))
    print('(test_01) m: {} n: {}'.format(m, n))
    result = linalg.solve_continuous_are(a, b, q, r)
    return result

def run(m=4, n=3):
    result = test_01(m, n)
    #print result
    #json_result = json.dumps(result.tolist())
    return (Atom('ok'), result.tolist())

def main():
    # Return the result so that a caller (e.g. Erlang via Erlport) receives it.
    return run()

if __name__ == '__main__':
    main()

Notes:

  • The erlport package must be located where Python can import
    it, and Numpy and SciPy must be installed. Once again,
    I’m using the Anaconda distribution from Continuum Analytics
    (see: http://www.continuum.io/).
  • The function test_01 uses Numpy to create several arrays of
    random numbers, then uses linalg in SciPy to solve the
    continuous algebraic Riccati equation, or CARE, defined as (A’X +
    XA – XBR^-1B’X+Q=0) directly using a Schur decomposition method.
  • The function run calls function test_01, then returns a
    tuple containing the Erlang atom “ok” and the solution array after
    converting it to a list. We convert the Numpy array to a
    Python list before returning it, because Erlport understands
    Python lists but not Numpy arrays.
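
One way to convince ourselves that the solver returns a solution is
to substitute it back into the equation. The following is a sketch
under assumptions rather than part of the sample module: the function
name solve_care and the seed are hypothetical, and Q and R are built
to be symmetric positive definite, because the CARE is normally
stated for symmetric Q and R and newer SciPy versions may reject
matrices that are not. It requires Numpy and SciPy, but not Erlport.

```python
import numpy as np
from scipy import linalg


def solve_care(m=4, n=3, seed=0):
    """Solve a random CARE and return the solution with its residual."""
    rng = np.random.default_rng(seed)
    a = rng.random((m, m))
    b = rng.random((m, n))
    # Build symmetric positive definite weights (an assumption of this
    # sketch; the sample module uses unsymmetrized random matrices).
    q_half = rng.random((m, m))
    r_half = rng.random((n, n))
    q = q_half.dot(q_half.T) + np.eye(m)
    r = r_half.dot(r_half.T) + np.eye(n)
    x = linalg.solve_continuous_are(a, b, q, r)
    # Residual of A'X + XA - X B R^-1 B' X + Q, which should be near zero.
    residual = (a.T.dot(x) + x.dot(a)
                - x.dot(b).dot(np.linalg.solve(r, b.T)).dot(x) + q)
    return x, residual


x, residual = solve_care()
x_as_list = x.tolist()  # plain nested lists, the form handed to Erlport
```

The residual is the left-hand side of the equation evaluated at the
computed X, so entries close to zero indicate that the equation holds
to within rounding error.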

8.1 A simple call from Erlang into Python

And, here is a simple Erlang program that uses that Python sample
with the help of Erlport:

-module(erlport_01).
-export([main/0, show_list/2]).

main() ->
    {ok, Pid} = python:start(),
    {ok, Result} = python:call(Pid, 'py_math_01', main, []),
    show_list(Result, 1),
    ok.

show_list([], _) -> ok;
show_list([Item|Items], Count) ->
    io:format("~p. Item: ~p~n", [Count, Item]),
    show_list(Items, Count + 1).

Notes:

  • We use the Erlport Python support to call function main in
    python module py_math_01.
  • The show_list/2 function prints out each item in the list
    returned from py_math_01.main().

In the Erlang interactive shell erl we can compile and then run
this as follows:

11> c(erlport_01).

12> erlport_01:main().
1. Item: [12.74527763335136,-4.514001033364517,-7.4452420386061835,
5.7441252569184345]
2. Item: [-5.795658295009697,3.897769387542307,4.148522353989249,
-3.1221815191228965]
3. Item: [-7.157830191325373,4.088737828859971,8.493144407323305,
-6.348281687731655]
4. Item: [3.996836318360595,-2.353597255054639,-3.098202007414951,
3.5956798233304914]
ok

8.2 Erlang and a simple pool of Erlang/Python processes

In this example, we implement a pool of Erlang/Python processes so
that we can request a process from the pool (and wait until one is
available, if necessary), use it, and then return it to the pool.
The processes in the pool are actually Erlang processes; however,
each of those Erlang processes holds (remembers) the PID, or
process identifier, of a Python process. We create each Python
process with Erlport.
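
The request, use, and return cycle described above is independent of
Erlang, and a small Python sketch may make it easier to follow. It is
an illustration only: the class ProcessPool and its methods checkout
and checkin are hypothetical names, plain strings stand in for the
process handles, and a blocking queue.Queue replaces the
sleep-and-retry loop used by the pool process.

```python
import queue


class ProcessPool(object):
    """Hypothetical pool that hands out worker handles one at a time."""

    def __init__(self, workers):
        self._idle = queue.Queue()
        for worker in workers:
            self._idle.put(worker)

    def checkout(self, timeout=None):
        """Block until a worker is idle, then remove it from the pool."""
        return self._idle.get(timeout=timeout)

    def checkin(self, worker):
        """Return a worker to the pool so that others can use it."""
        self._idle.put(worker)


pool = ProcessPool(["py-1", "py-2"])
first = pool.checkout()
second = pool.checkout()
try:
    pool.checkout(timeout=0.1)   # pool is empty, so this wait times out
    third = "unexpected"
except queue.Empty:
    third = None
pool.checkin(first)
reused = pool.checkout()         # the returned worker is handed out again
```

The important property is that a handle is never held by two clients
at once: it is either in the pool or checked out by exactly one
caller.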

Here is our Erlang code that implements the pool of processes:

-module(erlport_04).
-export([
init/0,
start/3,
stop/0,
rpc/1
]).

init() ->
    ets:new(pipelinetable01, [named_table]),
    ok.

%
% Args:
%     NumProcesses -- (int) number of processes to put in the pool.
%     PythonModule -- (atom) the name of the Python module.
%     ProcessWaitTime -- (int) number of milliseconds to wait if all processes
%         are busy.
%
start(NumProcesses, PythonModule, ProcessWaitTime) ->
    PyProcPids = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(fun() -> pool_loop(PyProcPids, ProcessWaitTime) end),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    ok.

stop() ->
rpc(stop_python),
ok.

rpc(Request) ->
case Request of
->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
->
PyProcPid ! },
receive
->
PoolPid ! ,
case Result of
->
;
_ ->
unknown_result
end;
Msg ->

end;
_ ->
error
end;
get_pypid ->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
->

end;
->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
ok -> ok
end;
stop_python ->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
ok -> ok
end
end.

pool_loop(PyProcPids, ProcessWaitTime) ->
receive
->
PyProcPids1 = [Proc | PyProcPids],
pool_loop(PyProcPids1, ProcessWaitTime);
->
case PyProcPids of
[] ->
% Give it a chance to return a process to the pool.
timer:sleep(ProcessWaitTime),
self() ! ,
pool_loop(PyProcPids, ProcessWaitTime);
[PyProcPid | PyProcPids1] ->
From ! ,
pool_loop(PyProcPids1, ProcessWaitTime)
end;
->
stop_python_processes(PyProcPids),
From ! ok,
ok
end.

python_loop(PyPid, PythonModule) ->
receive
} ->
Result = python:call(PyPid, PythonModule, Function, Args),
From ! ,
python_loop(PyPid, PythonModule);
->
python:stop(PyPid),
From ! ok
end.

start_python_processes(0, _, PyProcPids) ->
    PyProcPids;
start_python_processes(N, PythonModule, PyProcPids) ->
    {ok, PyPid} = python:start(),
    PyProcPid = spawn(fun() -> python_loop(PyPid, PythonModule) end),
    io:format("Started Erlang/Python process -- PyProcPid: ~p~n", [PyProcPid]),
    start_python_processes(N - 1, PythonModule, [PyProcPid | PyProcPids]).

stop_python_processes([]) -> ok;
stop_python_processes([PyProcPid|PyProcPids]) ->
io:format("Stopping Erlang/Python process -- PyProcPid: ~p~n", [PyProcPid]),
PyProcPid ! ,
stop_python_processes(PyProcPids).

Notes:

  • Before using the above code, we compile it with erlc.
  • init/0 sets up an ETS table that enables us to remember the
    process ID of the pool.
  • The process pool is itself a process. We use it by sending it
    messages to pop (get) and push (return) a Python process.
  • start/3 (1) creates the Python processes, each of which is
    implemented by python_loop/2 and (2) creates a process to hold
    those Python processes. It’s this second process from which we’ll
    request the next available Python process, and so we save its
    process ID in the ETS table.
  • rpc/1 implements our interface or API that enables us to make
    our requests (remote procedure calls) to the Python processes and
    get results back.
  • We might ask: Why is pool_loop/2 implemented as a process
    rather than an ordinary function? The first thing to recognize is
    that a process, in Erlang, is just a function that we spawn.
    Next, consider that implementing the pool as a process may give
    us some flexibility later, when we want to request and use a
    Python process, created in Erlang with Erlport, from a separate
    application (a separate OS process) or even from an application
    running on a different machine.

And, here is an Erlang script that can be run from the command line
and can be used to drive and test the above Erlang code:

#!/usr/bin/env escript
%% vim:ft=erlang:
%%! -sname magpie1 -setcookie dp01

main(["-h"]) -> usage();
main(["--help"]) -> usage();
main(Args) ->
    ArgsSpec = [
        {"p", "processes", yes},
        {"o", "outfile", yes}
    ],
    Args1 = erlopt:getopt(ArgsSpec, Args),
    %io:format("Args1: ~p~n", [Args1]),
    Opts = proplists:get_all_values(opt, Args1),
    Args2 = proplists:get_all_values(arg, Args1),
    %io:format("Opts: ~p~n", [Opts]),
    %io:format("Args2: ~p~n", [Args2]),
    NumProcs1 = proplists:get_value("p", Opts),
    NumProcs2 = proplists:get_value("processes", Opts),
    %io:format("NumProcs1: ~p NumProcs2: ~p~n", [NumProcs1, NumProcs2]),
    NumProcs = case NumProcs1 of
        undefined ->
            case NumProcs2 of
                undefined ->
                    2;
                _ ->
                    list_to_integer(NumProcs2)
            end;
        _ ->
            list_to_integer(NumProcs1)
    end,
    OutFile1 = proplists:get_value("o", Opts),
    OutFile2 = proplists:get_value("outfile", Opts),
    OutFile = case OutFile1 of
        undefined ->
            case OutFile2 of
                undefined ->
                    standard_io;
                _ ->
                    {ok, OutFile3} = file:open(OutFile2, [write]),
                    OutFile3
            end;
        _ ->
            {ok, OutFile3} = file:open(OutFile1, [write]),
            OutFile3
    end,
= case Args2 of
[] ->
;
[NumReps] ->
;
[NumReps, M, N] ->

end,
run(NumProcs, NumReps1, M1, N1, OutFile),
case OutFile of
standard_io -> ok;
_ ->
file:close(OutFile),
ok
end.

run(NumProcs, Count, M, N, IoDevice) ->
    io:format("NumProcs: ~p Count: ~p M: ~p N: ~p~n", [NumProcs, Count, M, N]),
    erlport_04:init(),
    erlport_04:start(NumProcs, py_math_01, 100),
    run_n(1, Count, M, N, IoDevice),
    erlport_04:stop(),
    ok.

run_n(Count, Max, _, _, _) when Count > Max -> ok;
run_n(Count, Max, M, N, IoDevice) ->
%io:format(“M: ~p N: ~p~n”, [M, N]),
Result = erlport_04:rpc(),
io:format(IoDevice, “Result ~p:~n~p~n”, [Count, Result]),
run_n(Count + 1, Max, M, N, IoDevice).

usage() ->
    io:format(standard_error, "usage:~n", []),
    io:format(standard_error, " $ erlport_04.escript [options] iters [m n]~n", []),
    io:format(standard_error, "options:~n", []),
    io:format(standard_error, " -p -- number of processes~n", []),
    io:format(standard_error, " -o filename -- output file name~n", []),
    io:format(standard_error, "arguments:~n", []),
    io:format(standard_error, " iters -- number of iterations to run~n", []),
    io:format(standard_error, " m n -- size of array to create~n", []),
    ok.

Notes:

  • We use erlopt to parse command line options and arguments.
    It’s available here:
    erlopt getopt() for Erlang <https://code.google.com/p/erlopt/>
  • run/5 performs initialization and creates the processes, then
    calls run_n/5 to make the specified number of requests, and
    finally stops (kills) the processes we created.
  • run_n/5 does a call to rpc/1 that uses one of the created
    processes to call into Python to perform a calculation using
    Numpy and return the result.

You can test the above code by running the following:

$ ./erlport_04.escript 3 4 3
NumProcs: 2 Count: 3 M: 4 N: 3
Started Erlang/Python process — PyProcPid: <0.39.0>
Started Erlang/Python process — PyProcPid: <0.41.0>
Result 1:

(test_01) m: 4 n: 3
Result 2:

(test_01) m: 4 n: 3
Result 3:

Stopping Erlang/Python process — PyProcPid: <0.41.0>
Stopping Erlang/Python process — PyProcPid: <0.39.0>

8.3 A pool of processes with failure recovery

What we try to gain in this example, over and above the previous
example, is the ability to recover from the failure of one of the
Erlang/Python processes. In this code, we ask that we be notified
when one of the Erlang/Python processes fails so that we can (1)
remove the old (dead) process from the pool and (2) create a new
process and insert it into the pool.

Here is the code that does this:

-module(erlport_06).
-export([
start/3,
start_link/3,
init/0,
stop/1,
restarter/1,
rpc/1,
pool_loop/3,
python_loop/2
]).

%
% Args:
% NumProcesses — (int) number of processes to put in the pool.
% PythonModule — (atom) the name of the Python module.
% ProcessWaitTime — (int) number of milliseconds to wait if all processes
% are busy.
%
start(NumProcesses, PythonModule, ProcessWaitTime) ->
    init(),
    PyProcs = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(?MODULE, pool_loop, [
        PyProcs, ProcessWaitTime, PythonModule]),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    RestarterPid = spawn(?MODULE, restarter, [PoolPid]),
    RestarterPid.

start_link(NumProcesses, PythonModule, ProcessWaitTime) ->
    init(),
    PyProcs = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(?MODULE, pool_loop, [
        PyProcs, ProcessWaitTime, PythonModule]),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    RestarterPid = spawn(?MODULE, restarter, [PoolPid]),
    RestarterPid.

init() ->
    io:format("creating ETS table~n"),
    ets:new(pipelinetable01, [named_table]),
    ok.

stop(RestarterPid) ->
rpc(stop_python),
RestarterPid ! shutdown,
ok.

rpc(Request) ->
case Request of
->
io:format(“call_python. F: ~p A: ~p~n”, [Function, Args]),
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
->
PyProcPid ! },
receive
->
PoolPid ! ,
case Result of
->
;
_ ->
unknown_result
end;
Msg ->

end;
_ ->
error1
end;
get_pypid ->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
->

end;
->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
ok -> ok
end;
exit ->
io:format(“(rpc) 1. testing exit~n”, []),
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
->
io:format(“(rpc) 2. testing exit. P: ~p~n”, [PyProcPid]),
exit(PyProcPid, test_failure),
ok;
_ ->
error2
end;
stop_python ->
[ | _] = ets:lookup(pipelinetable01, poolpid),
PoolPid ! ,
receive
ok -> ok
end
end.

%~ monitor_loop() ->
%~ receive
%~ {‘DOWN’, Ref, process, Pid, Reason} ->
%~ % remove this python process and start a new one to replace it.
%~ io:format(“Python process ~p because ~p crashed; restarting~n”,
%~ [Pid, Reason]),
%~ [ | _] = ets:lookup(pipelinetable01, poolpid),
%~ PoolPid ! ,
%~ monitor_loop()
%~ end.

restarter(PoolPid) ->
    receive
        {'EXIT', _Pid, normal} -> % not a crash
            ok;
        {'EXIT', _From, shutdown} ->
            exit(shutdown); % manual termination, not a crash
        {'EXIT', PyProcPid, Reason} ->
            io:format("Restarting Py process ~p/~p~n",[PyProcPid, Reason]),
%
% Remove the old process that died from the pool.
% Restart a new erlang/python process to replace the one that died.
% Insert the new one in the pool.
%
PoolPid ! ,
restarter(PoolPid);
shutdown ->
ok
end.

pool_loop(PyProcs, ProcessWaitTime, PythonModule) ->
receive
->
PyProcs1 = [Proc | PyProcs],
pool_loop(PyProcs1, ProcessWaitTime, PythonModule);
->
case PyProcs of
[] ->
% Give it a chance to return a process to the pool.
timer:sleep(ProcessWaitTime),
self() ! ,
pool_loop(PyProcs, ProcessWaitTime, PythonModule);
[PyProc | PyProcs1] ->
From ! ,
pool_loop(PyProcs1, ProcessWaitTime, PythonModule)
end;
->
case lists:member(PyProcPid, PyProcs) of
true ->
% remove the python process from the pool.
PyProcs1 = lists:delete(PyProcPid, PyProcs),
% create a new python process.
{ok, PyPid} = python:start(),
PyProcPid1 = spawn_link(
?MODULE, python_loop, [PyPid, PythonModule]),
% add the new python process to the pool.
PyProcs2 = [PyProcPid1 | PyProcs1],
pool_loop(PyProcs2, ProcessWaitTime, PythonModule);
false ->
pool_loop(PyProcs, ProcessWaitTime, PythonModule)
end;
->
stop_python_processes(PyProcs),
From ! ok,
ok
end.

python_loop(PyPid, PythonModule) ->
receive
} ->
Result = python:call(PyPid, PythonModule, Function, Args),
From ! ,
python_loop(PyPid, PythonModule);
->
python:stop(PyPid),
From ! ok
end.

start_python_processes(0, _, PyProcs) ->
    PyProcs;
start_python_processes(N, PythonModule, PyProcs) ->
    {ok, PyPid} = python:start(),
    PyProcPid = spawn_link(?MODULE, python_loop, [PyPid, PythonModule]),
    %PyProcsPid = spawn(fun() ->
    %    erlport_05_py:python_loop(PyPid, PythonModule) end),
    io:format("Started Erlang/Python process -- PyProcPid: ~p PyPid: ~p~n",
        [PyProcPid, PyPid]),
    start_python_processes(N - 1, PythonModule, [PyProcPid | PyProcs]).

stop_python_processes([]) -> ok;
stop_python_processes([PyProcPid | PyProcs]) ->
io:format(“Stopping Erlang/Python process — PyProcPid: ~p~n”, [PyProcPid]),
PyProcPid ! ,
stop_python_processes(PyProcs).

Notes:

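  • Notice how, in function start_python_processes, we use
    spawn_link rather than spawn to create our processes.
    That tells Erlang to send an exit signal to us (i.e. the process
    that called spawn_link) when any of these processes fails. That
    signal arrives as an ordinary {'EXIT', Pid, Reason} message only
    if the linked process traps exits, that is, if it has called
    process_flag(trap_exit, true).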
  • Then in our “start” function, we create a process to listen for
    and receive those failure messages. This process is implemented
    in function restarter.
  • Function restarter listens for those messages. If it receives
    a message that indicates a failure, it sends a message to the
    pool process telling it to remove the dead one and to create
    a new one and add it to the pool.
  • And, this capability is implemented by a new clause in the
    receive statement in function pool_loop.
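
The "notice a dead process and replace it" step can also be sketched
in Python, using only the standard library. This is an analogy, not a
translation of the Erlang module: the names WORKER_CMD, start_worker
and replace_dead_workers are hypothetical, each worker is a child
Python process that merely sleeps, and failure is detected by polling
rather than by receiving a message.

```python
import subprocess
import sys

# Each stand-in worker just sleeps; a real worker would serve requests.
WORKER_CMD = [sys.executable, "-c", "import time; time.sleep(60)"]


def start_worker():
    """Start one OS process to act as a pool member."""
    return subprocess.Popen(WORKER_CMD)


def replace_dead_workers(workers):
    """Return a new list in which every exited worker has been replaced."""
    refreshed = []
    for proc in workers:
        if proc.poll() is None:      # still running
            refreshed.append(proc)
        else:                        # exited: start a replacement
            refreshed.append(start_worker())
    return refreshed


workers = [start_worker() for _ in range(2)]
victim = workers[0]
victim.kill()                        # simulate a crash
victim.wait()                        # reap it so that poll() reports the exit
workers = replace_dead_workers(workers)
alive = [proc.poll() is None for proc in workers]
replaced = workers[0] is not victim

for proc in workers:                 # clean up the stand-in processes
    proc.kill()
    proc.wait()
```

Polling must be repeated on some schedule, whereas a linked Erlang
process is told about the failure as soon as it happens; that
difference is the main attraction of the Erlang approach.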

And, here is the driver, an Erlang script that can be used to run
the above code:

#!/usr/bin/env escript
%% vim:ft=erlang:
%%! -sname crow1 -setcookie dp01

main(["-h"]) -> usage();
main(["--help"]) -> usage();
main(Args) ->
    ArgsSpec = [
        {"p", "processes", yes},
        {"o", "outfile", yes}
    ],
    Args1 = erlopt:getopt(ArgsSpec, Args),
    Opts = proplists:get_all_values(opt, Args1),
    Args2 = proplists:get_all_values(arg, Args1),
    NumProcs1 = proplists:get_value("p", Opts),
    NumProcs2 = proplists:get_value("processes", Opts),
    NumProcs = case NumProcs1 of
        undefined ->
            case NumProcs2 of
                undefined ->
                    2;
                _ ->
                    list_to_integer(NumProcs2)
            end;
        _ ->
            list_to_integer(NumProcs1)
    end,
    OutFile1 = proplists:get_value("o", Opts),
    OutFile2 = proplists:get_value("outfile", Opts),
    OutFile = case OutFile1 of
        undefined ->
            case OutFile2 of
                undefined ->
                    standard_io;
                _ ->
                    {ok, OutFile3} = file:open(OutFile2, [write]),
                    OutFile3
            end;
        _ ->
            {ok, OutFile3} = file:open(OutFile1, [write]),
            OutFile3
    end,
= case Args2 of
[] ->
;
[NumReps] ->
;
[NumReps, M, N] ->

end,
run(NumProcs, NumReps1, M1, N1, OutFile),
case OutFile of
standard_io -> ok;
_ ->
file:close(OutFile),
ok
end.

run(NumProcs, Count, M, N, IoDevice) ->
    io:format("NumProcs: ~p Count: ~p M: ~p N: ~p~n", [NumProcs, Count, M, N]),
    RestarterPid = erlport_06:start(NumProcs, py_math_01, 100),
    run_n(1, Count, M, N, IoDevice),
    erlport_06:stop(RestarterPid),
    ok.

run_n(Count, Max, _, _, _) when Count > Max -> ok;
run_n(Count, Max, M, N, IoDevice) ->
Result = erlport_06:rpc(),
io:format(IoDevice, “Result ~p:~n~p~n”, [Count, Result]),
run_n(Count + 1, Max, M, N, IoDevice).

usage() ->
    io:format(standard_error, "usage:~n", []),
    io:format(standard_error, " $ erlport_06.escript -h|--help -- show this help~n", []),
    io:format(standard_error, " $ erlport_06.escript [options] iters [m n]~n", []),
    io:format(standard_error, "options:~n", []),
    io:format(standard_error, " -p -- number of processes~n", []),
    io:format(standard_error, " -o filename -- output file name~n", []),
    io:format(standard_error, "arguments:~n", []),
    io:format(standard_error, " iters -- number of iterations to run~n", []),
    io:format(standard_error, " m n -- size of array to create~n", []),
    ok.

Notes:

  • We use erlopt to give some help with parsing command line
    arguments. It’s available here:
    https://code.google.com/p/erlopt/.
  • After collecting command line options and arguments, we call
    function run/5, which (1) initializes our processes and starts
    up restarter/1; (2) calls the Python function the requested
    number of times; and, finally, (3) stops all the Erlang/Python
    processes and the restarter/1 process itself.

8.4 Behaviors and pools

Our purpose for adding the use of Erlang behaviors to the previous
example is to gain resiliency. Erlang behaviors are like templates
or frameworks. The behavior provides the structure, boilerplate,
and common functionality; we provide (only) the code that is
specific to our needs.

Our “pool server” creates and manages the pool of Erlang/Python
processes. When a client needs a process in which to run a Python
function, it can request a process (and wait for one to become
available, if necessary), and return the process to the pool when it
(the client) finishes with it. There are several features that we
are trying to gain over and above those provided by the previous
example: (1) If (when) a Python process from the pool dies, it
will be replaced; (2) if the pool itself (i.e. the process that
provides the pool services) dies, it will be restarted.

In a general sense, we will attempt to implement a server in Erlang
that responds to requests (1) to start a number of Erlang/Python
processes and the pool to hold them; (2) to get an Erlang/Python
process from the pool; (3) to return an Erlang/Python process to the
pool; and (4) to stop all Erlang/Python processes in the pool and
stop the pool server itself.

Here is the code for our process server, implemented using the
following:

  • Erlang supervisor OTP behavior
  • Erlang gen_server OTP behavior
  • The poolboy pool library, which is available here:
    https://github.com/devinus/poolboy.git

This is the supervisor module that sets up the pool of worker
processes:

%%%——————————————————————-
%%% @author Dave Kuhlman
%%% @copyright (C) 2015, Dave Kuhlman
%%% @doc
%%%
%%% @end
%%% Created : 2015-04-15 14:40:41.165827
%%%——————————————————————-
-module(process_server_02).

-behaviour(supervisor).

%% API
-export([start_link/1]).

%% Supervisor callbacks
-export([init/1]).

-define(SERVER, ?MODULE).

%%%===================================================================
%%% API functions
%%%===================================================================

%%——————————————————————–
%% @doc
%% Starts the supervisor
%%
%% @spec start_link(Args) -> | ignore |
%%
%%
%% @end
%%——————————————————————–
start_link() ->
supervisor:start_link(, ?MODULE, ).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%%——————————————————————–
%% @private
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
%%
%% @spec init(Args) -> } |
%% ignore |
%%
%% @end
%%——————————————————————–
init() ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
PoolId = pool1,
PoolArgs = [
},
,
,

],
WorkerArgs = PyModule,
PoolSpec = poolboy:child_spec(PoolId, PoolArgs, WorkerArgs),
PoolSpecs = [PoolSpec],
{ok, {SupFlags, PoolSpecs}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

Notes:

  • The above module is the root of our supervision tree of processes.
    It uses the Erlang supervisor behavior. Our tree is rather
    short; it’s only one level deep: the above supervisor process and
    its children the worker processes (see below), which are the
    members of our pool of Erlang/Python processes.
  • The init/1 function does the work. It gets a specification of
    the processes in the pool from poolboy, then when it returns,
    asks the supervisor module to create those processes and add
    them to our supervision tree. See this for more on that:
    http://www.erlang.org/doc/man/supervisor.html#Module:init-1

And, here is the worker module. It actually does the call into the
Python process:

%%%——————————————————————-
%%% @author Dave Kuhlman
%%% @copyright (C) 2015, Dave Kuhlman
%%% @doc
%%%
%%% @end
%%% Created : 2015-04-13 13:27:42.569292
%%%——————————————————————-
-module(process_server_worker_02).

-behaviour(gen_server).
-behaviour(poolboy_worker).

%% API
-export([
start_link/1,
call_python/2
]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

-define(SERVER, ?MODULE).

-record(state, ).

%%%===================================================================
%%% API
%%%===================================================================

%%——————————————————————–
%% @doc
%% Starts the server
%%
%% @spec start_link() -> | ignore |
%% @end
%%——————————————————————–
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

%%——————————————————————–
%% @doc
%% Call a function in the Python module.
%%
%% @spec call_python(Args) -> | ignore |
%% @end
call_python(Function, Args) ->
gen_server:call().

%%——————————————————————–
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%——————————————————————–
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> |
%% |
%% ignore |
%%
%% @end
%%——————————————————————–
init(PyModule) ->
{ok, PyPid} = python:start(),
State = #state,
io:format(“(worker:init) S: ~p~n”, [State]),
{ok, State}.

%%——————————————————————–
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% |
%% |
%% |
%% |
%% |
%%
%% @end
%%——————————————————————–
handle_call(, _From, State) ->
#state = State,
Result = python:call(PyPid, PyModule, Function, Args),
Reply = ,
;
handle_call(Request, From, State) ->
io:format(“(handle_call) error. R: ~p F: ~p S: ~p~n”,
[Request, From, State]),
.

%%——————————————————————–
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> |
%% |
%%
%% @end
%%——————————————————————–
handle_cast(_Msg, State) ->
    {noreply, State}.

%%——————————————————————–
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> |
%% |
%%
%% @end
%%——————————————————————–
handle_info(_Info, State) ->
    {noreply, State}.

%%——————————————————————–
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%——————————————————————–
terminate(_Reason, State) ->
#state = State,
python:stop(PyPid),
ok.

%%——————————————————————–
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) ->
%% @end
%%——————————————————————–
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

Notes:

  • Most of the above is scaffolding that obeys the gen_server
    Erlang behavior.
  • When our supervisor module and the Erlang supervisor behavior
    create their child processes, they call start_link/1 in
    this module to create each child and link each one to the
    supervisor process.
  • gen_server will then call our init/1 callback function,
    where we create the Erlang/Python process using Erlport. It might
    help to keep in mind that the Erlang/Python process created by
    Erlport is an operating system process, not an Erlang process.
  • We save the Erlang/Python process (its process ID, actually) in
    the State. This is the state variable that is passed to the
    other callback functions; in effect, it gives us some global data.
    By doing this, we effectively attach the Erlang/Python process to
    this Erlang process.
  • The other callback function we are interested in is
    handle_call/3. Its first argument specifies what we want it
    to do. Specifically, it’s a tuple containing three items: (1) the
    operation (an atom requesting a call to a Python function), (2) the
    Python module containing the Python function (an atom), and (3)
    the Python function (an atom).
  • Our handle_call/3 function uses Erlport to
    call the Python function in its attached Erlang/Python process,
    and returns the result in the format specified for the
    gen_server handle_call/3 callback. See the following for
    more on that:
    http://www.erlang.org/doc/man/gen_server.html#Module:handle_call-3
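
It may help to picture, in Python terms, what the
python:call(PyPid, PyModule, Function, Args) line asks the Python
process to do: import a module by name, look up a function by name,
and call it with a list of arguments. The following is only a sketch
of that idea, not Erlport's actual implementation; the helper name
call_by_name is hypothetical.

```python
import importlib


def call_by_name(module_name, function_name, args):
    """Import a module, look up one of its functions, and call it.

    This mirrors the (module, function, args) shape of the request.
    It is an illustration only, not how Erlport is implemented.
    """
    module = importlib.import_module(module_name)
    function = getattr(module, function_name)
    return function(*args)


if __name__ == "__main__":
    # Call math.sqrt(16) by naming the module and the function.
    print(call_by_name("math", "sqrt", [16]))
```

Because the module and function are given by name, the Erlang side
never needs to know anything about the Python code beyond those
names and the arguments to pass.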

And, here is an example that shows the use of the above modules:

1> = process_server_02:start_link().

2> W1 = poolboy:checkout(pool1).
<0.40.0>
3> W2 = poolboy:checkout(pool1).
<0.38.0>
4> gen_server:call(W1, ).
}
5> gen_server:call(W2, ).
}
6> poolboy:checkin(pool1, W1).
ok
7> W3 = poolboy:checkout(pool1).
<0.40.0>
8> gen_server:call(W3, ).
o
o
o

The following shows the commands only, without their output:

1> = process_server_02:start_link().
2> W1 = poolboy:checkout(pool1).
3> W2 = poolboy:checkout(pool1).
4> gen_server:call(W1, ).
5> gen_server:call(W2, ).
6> poolboy:checkin(pool1, W1).
7> W3 = poolboy:checkout(pool1).
8> gen_server:call(W3, ).
o
o
o

You can also do parallel processing in JavaScript using Node.js.
However, there is a caveat: Node.js runs your JavaScript code on a
single thread. It achieves concurrency through callbacks and an
event loop. Therefore, tasks can overlap while they wait on I/O,
but they will not utilize multiple cores or multiple CPUs.

However, there is support for creating multiple Node.js
processes that run on multiple cores. To learn about that support,
go to https://www.npmjs.com/ and search for “multi core”.

You can learn more about Node.js here: https://nodejs.org/

I use the Async module to control flow. One of the many flow
control patterns that Async supports is parallel execution. You
can install Async with npm:

$ npm install async

And, you can find it here: https://www.npmjs.com/package/async

Here is a JavaScript script that can be run with Node.js:

#!/usr/bin/env node

var fs = require('fs');
var async = require('async');
var log = console.log;

// Read a file synchronously and hand its content to the callback.
function reader(name, cb) {
    var content = fs.readFileSync(name);
    cb(null, content);
}

function test1() {
    // map: apply reader to each file name and collect the results.
    async.map(['tmp1.txt', 'tmp2.txt'], reader, function (err, results) {
        log('map -----------------');
        log('map results:');
        for (var idx = 0; idx < results.length; idx++) {
            log('length: ' + results[idx].length);
        }
        log('count: ' + results.length);
    });
    // filter: keep only the names of files that exist. In Async 1.x,
    // filter callbacks take no error argument, which matches fs.exists.
    async.filter(['tmp1.txt', 'tmp3.txt', 'tmp2.txt'], fs.exists, function(results) {
        log('filter -----------------');
        log('filter results: %s', results);
    });
    // parallel: start both reads and collect both results.
    async.parallel([
        function (cb) {
            // An encoding is needed so that content is a string.
            fs.readFile('tmp1.txt', {encoding: 'utf8'}, function (err, content) {
                content = content.toUpperCase();
                content = 'content (tmp1.txt): [[' + content + ']]';
                cb(null, content);
            });
        },
        function (cb) {
            fs.readFile('tmp2.txt', {encoding: 'utf8'}, function (err, content) {
                content = content.toUpperCase();
                content = 'content (tmp2.txt): [[' + content + ']]';
                cb(null, content);
            });
        },
    ],
    function (err, results) {
        log('parallel -----------------');
        log('parallel results: [[%s]]', results);
    });
}

function main() {
    test1();
}

main();

Notes:

  • Async has quite a few functions in addition to
    map, filter, and parallel (used in the above example),
    which assist with flow control and the use of collections.
  • Documentation on these functions is in the README.md file that
    is included with the distribution. It’s also here:
    https://www.npmjs.com/package/async
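
For comparison, here is a rough Python counterpart of the three
patterns used in the script above (map, filter, and parallel). It
is a sketch that uses a thread pool from the standard library's
concurrent.futures module; the function names (read_bytes,
read_upper, map_lengths, filter_existing, run_parallel) are my own
and hypothetical, and the file names are supplied by the caller. As
with Node.js, this kind of code overlaps I/O waits; in standard
CPython it does not spread Python code across multiple cores.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def read_bytes(path):
    """Return the raw content of a file."""
    with open(path, "rb") as f:
        return f.read()


def read_upper(path):
    """Return the upper-cased text of a file, labelled with its name."""
    with open(path, encoding="utf-8") as f:
        return "content (%s): [[%s]]" % (path, f.read().upper())


def map_lengths(paths):
    """Like async.map: read every file; results keep the input order."""
    with ThreadPoolExecutor() as pool:
        return [len(data) for data in pool.map(read_bytes, paths)]


def filter_existing(paths):
    """Like async.filter: keep only the paths that exist."""
    with ThreadPoolExecutor() as pool:
        flags = list(pool.map(os.path.exists, paths))
    return [path for path, exists in zip(paths, flags) if exists]


def run_parallel(tasks):
    """Like async.parallel: start each zero-argument task and
    collect the results in the order the tasks were given."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(task) for task in tasks]
        return [future.result() for future in futures]
```

A call such as run_parallel([lambda: read_upper('tmp1.txt'),
lambda: read_upper('tmp2.txt')]) plays the role of the
async.parallel call in the JavaScript version.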

Here are a few comments that might be of help in making a decision
on which of the above technologies to choose for the work that you
are planning:

  • The Python multiprocessing module (in the Python standard
    library) provides a base on which you can build the parallel
    processing model that you want. You will, however, have to build
    much of that yourself. The module enables you to write Python
    code that starts additional “child” processes, as opposed to
    starting those child processes in some more manual way,
    although you could always write scripts, in whatever
    language suits your needs, that start up additional processes
    in some more automatic way. This module provides the Pipe and
    Queue classes for communication between “child” processes and
    between child processes and the process that starts them, but
    don’t forget that you could, instead, use ZeroMQ to implement that
    inter-process communication. The Python multiprocessing
    module also enables you to share memory between processes, in a
    way that is not inherently safe (see “Sharing state between
    processes” in the module’s documentation).

  • Parallel Python seems to work well. It provides lots of
    power. It’s relatively quick to learn. Seems simple enough to
    use. It gives us multiprocessing/parallelism and distributed
    processing. So, we can make use of multiple cores and CPUs on the
    same machine; and can also run tasks across multiple machines
    (nodes on a network). There does not seem to be much active work
    on the code base itself. However, that might be because it works
    and there is no reason to change it.

  • IPython parallel processing has enough capabilities that you
    can spend lots of time learning how to use it if you choose to do
    so. However, IPython parallel processing makes it easy and even
    simple to do many of the common parallel tasks that many of us
    will want to do. Importantly, you can run the same task in
    parallel across different processes and you can distribute
    different tasks across processes. You can also share values
    between processes in synchronous and asynchronous ways that, I
    believe, but am not sure, are safe (e.g., from race conditions).

    The capabilities of IPython are so many and varied that it is hard
    to wrap your mind around them all. Read the IPython docs for an
    overview, and then look for the capabilities that you need. And,
    of course, if you need to, you could use ZeroMQ to communicate
    between processes. ZeroMQ is, in fact, used in the implementation
    of IPython itself. For certain purposes it perhaps makes sense to
    think of IPython parallel processing as building on ZeroMQ, taking
    away some of ZeroMQ’s flexibility, and enabling us to do some of
    the things that could be done with ZeroMQ (given enough effort and
    smarts) but doing them in ways that are much easier and simpler.

    The work on the IPython code base seems quite active, so active
    that the project has gotten large enough and powerful enough that
    it is being split into separate projects. There will be support
    for scripting in Ruby and Perl and other languages as well. Both
    Python 2 and Python 3 are supported.

  • ZeroMQ provides a good deal of power for communicating
    (sending messages) between processes. Perhaps you can think of it
    as providing the low-level plumbing that enables you to write the
    logic for inter-process communication and data transfer. You will
    need to take care of starting up the processes. And, you will
    need to bring them down, although it’s easy to imagine using
    ZeroMQ to send a message to a process asking it to exit.

    One huge benefit of ZeroMQ is that there are language bindings for
    a variety of programming languages, so you can create your
    mega-application out of processes each of which is implemented in
    a different language, for example, Python, Ruby,
    JavaScript/Node.js, and more. Our example above has a “main” or
    client process written in Node.js requesting services from a
    worker process implemented in Python. That enabled us to,
    effectively but not terribly conveniently, request the
    capabilities of Lxml from Node.js, even though Lxml is available
    for Python but not Node.js. We can imagine something analogous
    for the use of Numpy and SciPy, which are available for Python.

  • Erlang, Erlport process pools, etc. - Although there is
    plenty to learn in order to use IPython, Erlang (for non-Erlang
    programmers) means learning a new and different language
    and, if you intend to take advantage of some of Erlang’s benefits,
    it entails learning a good deal of Erlang infrastructure, too.
    That has several implications: (1) Erlang must be installed on
    each machine where it will be used. (2) You (or someone) will
    need to learn Erlang and will have to write the code that controls
    the Python processes that do the parallel and distributed work.
    Erlport helps with that, but the burden of learning Erlang is
    still there. My sample code (above) may help get you started, but
    for any serious use, you will want an experienced Erlang
    programmer on your team.

    Elixir runs on top of Erlang, and, for some of us at least, will
    seem like a friendlier style of language. It still, for some of
    us, means learning a new language and installing both Erlang and
    Elixir on all your target machines. I have not tried using
    Erlport with Elixir.
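
To make the first item in the list above a bit more concrete, here
is a minimal sketch of the "build it yourself" style that the
multiprocessing module asks for: child processes that receive work
and return results through Queue objects, plus a shared counter
that is only safe because each update is wrapped in a lock. The
names square, worker, add_many, run_workers, and run_counter are
hypothetical, and the task (squaring numbers) is just a stand-in
for real work.

```python
import multiprocessing as mp


def square(n):
    return n * n


def worker(task_q, result_q):
    """Process numbers from task_q until the None sentinel arrives."""
    for n in iter(task_q.get, None):
        result_q.put((n, square(n)))


def add_many(counter, times):
    """Increment a shared counter; the lock guards each update,
    because += on a shared Value is a read followed by a write."""
    for _ in range(times):
        with counter.get_lock():
            counter.value += 1


def run_workers(numbers, n_workers=2):
    """Start child processes, feed them work, and collect results."""
    task_q, result_q = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(task_q, result_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for n in numbers:
        task_q.put(n)
    for _ in procs:
        task_q.put(None)  # one sentinel per worker tells it to exit
    # Drain the result queue before joining the children.
    results = dict(result_q.get() for _ in numbers)
    for p in procs:
        p.join()
    return results


def run_counter(n_procs=2, times=1000):
    """Let several processes update one shared integer."""
    counter = mp.Value("i", 0)
    procs = [mp.Process(target=add_many, args=(counter, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(sorted(run_workers([1, 2, 3, 4]).items()))
    print(run_counter())
```

Starting the processes, sending the sentinels, draining the results,
and joining the children are all left to you, which is the sense in
which you have to build much of the processing model yourself.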

