Multiprocessing

Parallel map

import urllib2
import multiprocessing

def fetch(url):
  print "GET " + url
  body = urllib2.urlopen(url).read()
  print "FIN " + url
  return (url, body)

urls = ['http://www.google.de', 'http://www.ccc.de', 'http://www.heise.de', 'http://www.codekid.net']
pool = multiprocessing.Pool()
result = pool.map_async(fetch, urls)
timeout = 3

try:
  for url, body in result.get(timeout):
    print "GOT body for " + url
except multiprocessing.TimeoutError:
  print "Got timeout :("
  pool.terminate()
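
If no per-call timeout is needed, the blocking pool.map does the same job in one call. A minimal sketch, reusing fetch and urls from above:

pool = multiprocessing.Pool()
try:
  # map blocks until every URL has been fetched
  for url, body in pool.map(fetch, urls):
    print "GOT body for " + url
finally:
  pool.close()
  pool.join()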

Gevent

import gevent
import urllib2

def fetch(url):
  print "GET " + url
  body = urllib2.urlopen(url).read()
  print "FIN " + url
  return (url, body)

urls = ['http://www.google.de', 'http://www.ccc.de', 'http://www.heise.de', 'http://www.codekid.net']
jobs = [gevent.spawn(fetch, url) for url in urls]

gevent.joinall(jobs, timeout=2)
# job.value is None for jobs that missed the timeout;
# job.get() would block forever on an unfinished greenlet
print [job.value for job in jobs]
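
Note that plain urllib2 is blocking, so without patching the greenlets above effectively run one after another. gevent's monkey patching makes the stdlib sockets cooperative; run it early, ideally before urllib2 is imported:

from gevent import monkey
monkey.patch_all()  # patch stdlib sockets before importing urllib2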

Multiprocessing with Queues

from multiprocessing import Process, Queue
from Queue import Empty

nr_of_threads = 4  # number of worker processes, despite the name

def do_work(work_queue, result_queue):
    # qsize() is racy with several consumers, so use a
    # non-blocking get() and stop once the queue is drained
    while True:
        try:
            job = work_queue.get(block=False)
        except Empty:
            break
        result_queue.put([job, "result for " + str(job)])

def parallel_work(jobs, nr_of_threads):
    work_queue = Queue()
    result_queue = Queue()
    result = {}

    for job in jobs:
        work_queue.put(job)

    # no point in starting more workers than there are jobs
    if nr_of_threads > len(jobs):
        nr_of_threads = len(jobs)

    for i in range(nr_of_threads):
        worker = Process(target=do_work, args=(work_queue, result_queue))
        worker.start()

    # collect one result per job, keyed by the job itself
    while len(result) < len(jobs):
        data = result_queue.get()
        print data
        result[data[0]] = data[1]

    return result
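
A hypothetical call, with plain strings as jobs (any picklable value works):

if __name__ == '__main__':
    print parallel_work(["job-%d" % i for i in range(10)], nr_of_threads)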

Fork Decorator

def forked(func):
  def wrapped(*args, **kwargs):
    import os

    pid = os.fork()
    if pid == 0:
      # child process: do the work, then exit so the child
      # never falls through into the caller's code
      func(*args, **kwargs)
      os._exit(0)
    # parent (pid > 0) returns immediately

  return wrapped
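
Usage sketch (fetch_in_background and the URL are made up for illustration); the parent returns right away while the forked child does the work:

@forked
def fetch_in_background(url):
  import urllib2
  print "fetched %d bytes from %s" % (len(urllib2.urlopen(url).read()), url)

fetch_in_background('http://www.heise.de')
print "parent continues right away"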

Thread Decorator

def threaded(func):
  def wrapped(*args, **kwargs):
    import thread

    # fire and forget: run func in a new thread
    thread.start_new_thread(func, args, kwargs)
  return wrapped
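
Usage sketch (slow_print is a made-up example). Threads started via the thread module die with the main thread, so the sleep at the end keeps the process alive long enough:

@threaded
def slow_print(msg):
  import time
  time.sleep(1)
  print msg

slow_print("printed from a background thread")
print "main thread continues"

import time
time.sleep(2)  # give the background thread time to finish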

MapReduce

  • Disco MapReduce Framework with Python API

  • Local example for a multi-core CPU, shown below

import sys
from multiprocessing import Pool

def split_words(line):
    # split() without an argument splits on any whitespace
    # and drops the trailing newline as well
    return line.split()


def myreduce(mylist):
    """
    gets [['word1'], ['word1', 'word2', 'word1']]
    returns {'word1': 3, 'word2': 1}
    """
    result = {}

    for sublist in mylist:
        for word in sublist:
            try:
                result[word] += 1
            except KeyError:
                result[word] = 1

    return result


if len(sys.argv) < 2:
    print "usage: " + sys.argv[0] + " <file>"
    sys.exit(1)

pool = Pool(processes=10)
lines = open(sys.argv[1])  # iterating a file yields its lines lazily

words = pool.map(split_words, lines)
word_count = myreduce(words)

for (word, count) in word_count.items():
    print word + ": " + str(count)
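
For big files it can pay off to hand each worker a batch of lines at once; Pool.map takes an optional chunksize argument for that (the value below is only a guess):

words = pool.map(split_words, lines, chunksize=1000)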