Multiprocessing¶
For real concurrency on multi-core CPUs, use the multiprocessing package: the global interpreter lock (GIL) in the CPython interpreter prevents threads from executing Python bytecode in parallel, whereas separate processes run truly concurrently.
Parallel map¶
import urllib2
import multiprocessing
def fetch(url):
print "GET " + url
body = urllib2.urlopen(url).read()
print "FIN " + url; return (url, body)
urls = ['http://www.google.de', 'http://www.ccc.de', 'http://www.heise.de', 'http://www.codekid.net']
pool = multiprocessing.Pool()
result = pool.map_async(fetch, urls):
timeout = 3
try:
for url, body in result.get(timeout):
print "GOT body for " + url
except multiprocessing.TimeoutError:
print "Got timeout :("
pool.terminate()
Gevent¶
import gevent
import urllib2
def fetch(url):
print "GET " + url
body = urllib2.urlopen(url).read()
print "FIN " + url
return (url, body)
urls = ['http://www.google.de', 'http://www.ccc.de', 'http://www.heise.de', 'http://www.codekid.net']
# Spawn one greenlet per URL, wait up to 2 seconds for all of them,
# then collect each greenlet's (url, body) result.
greenlets = [gevent.spawn(fetch, u) for u in urls]
gevent.joinall(greenlets, timeout=2)
[g.get() for g in greenlets]
Multiprocessing with Queues¶
from multiprocessing import Process, Queue
import commands
# Default worker count used by callers of parallel_work().
nr_of_threads = 4

def do_work(work_queue, result_queue):
    """Worker body: drain jobs from *work_queue*, emitting one result
    per job onto *result_queue*.

    FIX: the original tested ``work_queue.qsize()`` and then called
    ``get()`` -- between the two calls another worker process can drain
    the queue, leaving this worker blocked forever.  ``get_nowait()``
    makes the check-and-take atomic.
    """
    try:
        from Queue import Empty      # Python 2 (this file's dialect)
    except ImportError:
        from queue import Empty      # Python 3
    while True:
        try:
            job = work_queue.get_nowait()
        except Empty:
            break
        # `job` is a placeholder work item in this example; real code
        # would compute something from it before reporting a result.
        result_queue.put(["what", "ever"])
def parallel_work(jobs, nr_of_threads):
work_queue = Queue()
result_queue = Queue()
result = {}
for job in jobs:
work_queue.put(job)
if nr_of_threads > len(jobs):
nr_of_threads = len(jobs)
for i in range(nr_of_threads):
worker = Process(target=do_work, args=(work_queue,result_queue))
worker.start()
while len(result.keys()) < len(jobs):
data = result_queue.get()
print data
result[data[0]] = data[1]
return result
Fork Decorator¶
def forked(func):
    """Decorator: execute *func* in a forked child process.

    In the parent, the wrapped call returns the child's pid immediately
    (callers may ``os.waitpid()`` on it; the original returned None, so
    this is backward compatible).  The child runs ``func(...)`` and then
    hard-exits so it never falls back into the caller's code path.

    FIX: the original used ``if pid > 0: func(...)``, which ran *func*
    in the PARENT (gaining no concurrency) while the CHILD returned
    from the wrapper and kept executing the rest of the program --
    duplicating all subsequent work.
    """
    def wrapped(*args, **kwargs):
        import os
        pid = os.fork()
        if pid == 0:
            # Child: do the work, then exit hard so we never return
            # into the caller's control flow (skips atexit/finally of
            # the parent's stack by design).
            try:
                func(*args, **kwargs)
            finally:
                os._exit(0)
        return pid  # parent
    return wrapped
Thread Decorator¶
def threaded(name):
    """Decorator factory: run the decorated function on a background
    thread named *name*.

    Fixes vs. the original:
    * The *name* argument was accepted but never used; it is now set as
      the thread's name (the low-level ``thread`` module cannot name
      threads, so we use ``threading``, available in Python 2 and 3).
    * The wrapper returns the started Thread so callers can join() it
      (the original returned None -- backward compatible).

    Threads are daemonic to match the old ``thread`` module's behaviour
    of not blocking interpreter exit.
    """
    def callf(func):
        def wrapped(*args, **kwargs):
            import threading
            worker = threading.Thread(target=func, args=args,
                                      kwargs=kwargs, name=name)
            worker.daemon = True
            worker.start()
            return worker
        return wrapped
    return callf
MapReduce¶
Disco MapReduce Framework with Python API
Below, a local word-count example using multiprocessing on a multi-core CPU:
import sys
from multiprocessing import Pool
def split_words(line):
    """Split *line* on single spaces and strip trailing newlines from
    each token.

    Note: consecutive spaces yield empty-string tokens, exactly like
    ``str.split(" ")``.
    """
    tokens = line.split(" ")
    return [token.rstrip("\n") for token in tokens]
def myreduce(mylist):
    """Flatten a list of word lists into a word -> count mapping.

    Example: ``[['word1'], ['word1', 'word2', 'word1']]`` returns
    ``{'word1': 3, 'word2': 1}``.
    """
    counts = {}
    for words in mylist:
        for word in words:
            counts[word] = counts.get(word, 0) + 1
    return counts
if len(sys.argv) < 2:
print sys.argv[0] + ": <file>"
sys.exit(1)
pool = Pool(processes=10)
lines = file(sys.argv[1]).xreadlines()
words = pool.map(split_words, lines)
word_count = myreduce(words)
for (word, count) in word_count.items():
print word + ": " + str(count)