Monday, February 4, 2013

Fun with Python's multiprocessing module

At my current client, they asked me to write a Python script to aggregate data from a RESTful web service.  Essentially, they have a web service end-point that takes a single customer ID as an argument and it returns some customer profile records as JSON.

They wanted my script to call the web service for every customer listed in a text file.  The script will be run from a scheduled job (cron, Windows scheduled task, etc.).  Once in production, the text file will contain a large number of customer IDs (a million or so).  All the results need to be stored in a single text file for the run.

My first thought was to use Python's threading features, but I ran into two problems:

  • Python's threading isn't "real" threading.  CPython's global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so a single Python process cannot spread its Python code across multiple cores no matter how many cores the server has.
  • Python's urllib and urllib2 packages aren't even thread safe.
Now I know there's a third-party thread-safe urllib3 package, but that doesn't solve the multi-core issue and I wasn't looking to install anything beyond Python 2.7's standard library on the server.  After doing a little research, I came up with the idea of using Python's multiprocessing module.

The rest of this blog posting is a walkthrough of what I ended up building.

First things first - configuration

Since this is a script that will be run often, and certain configuration settings will need to be updated from time to time by support staff, I decided to move the configuration out to a separate file.  Initially, I had the settings stored in a .ini file, but I later thought, why not use Python for the settings?  To that end, I have a settings.py file that holds a class that contains the settings:

#!/usr/bin/env python
"""
Settings file.  See job.py for more info.
"""

# -----------------------------------------------------------
# Edit the values in this Settings class to control settings.
# -----------------------------------------------------------

class Settings:
    """Settings as a class.  Why?  Because I'm lazy."""

    source_file = 'TestData.csv'

    profile_data_file = 'results.txt'

    log_file = 'log.txt'

    failed_customer_file = 'failed.txt'

    process_count = 10

    url = 'http://division-1.internal-server.com/rest-service/endpoint/{0}'

There is one input file (source_file).  This is the file that contains the list of customer IDs.  Then there are 3 output files:  profile_data_file, log_file and failed_customer_file.  The profile_data_file is where the script will put the customer detail records that it receives from the web service.  The log_file is where the script will write logging messages for debugging problems after the fact.  Finally, failed_customer_file is where the script will write the IDs of customers that it fails to retrieve from the web service.

process_count is a setting that will allow you to specify how many parallel processes the script should spawn.

url is the URL of the web service end-point.  The {0} placeholder is replaced with the customer ID for each request.
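
Because support staff edit these values by hand, a quick sanity check at startup can catch a typo before a long run begins.  The sketch below is not part of the original script: validate_settings is a hypothetical helper, the trimmed-down Settings class is a stand-in for the real one, and the rules it checks (a positive process_count, a {0} placeholder in url, a non-empty source_file) are my assumptions about what counts as valid.

```python
class Settings:
    """Stand-in for the Settings class in settings.py (trimmed down)."""
    source_file = 'TestData.csv'
    process_count = 10
    url = 'http://division-1.internal-server.com/rest-service/endpoint/{0}'


def validate_settings(cfg):
    """Return a list of problems found in cfg (hypothetical helper)."""
    problems = []
    if not isinstance(cfg.process_count, int) or cfg.process_count < 1:
        problems.append('process_count must be a positive integer')
    if '{0}' not in cfg.url:
        problems.append('url must contain a {0} placeholder for the customer ID')
    if not cfg.source_file:
        problems.append('source_file must not be empty')
    return problems


if __name__ == '__main__':
    # An empty list means the settings passed every check.
    for problem in validate_settings(Settings):
        print('Bad setting: %s' % problem)
```

Returning a list instead of raising on the first problem lets the job report every bad setting in one go.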

The main job script is called job.py.  It imports the settings.py module and then creates a new class that inherits from the Settings class.

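# Imports used by the job.py snippets in this post (Python 2.7).
import datetime
import json
import multiprocessing
import os
import Queue
import urllib2

import settings

class Config(settings.Settings):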
    """Configuration class"""

    _runtime = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
   
    @staticmethod
    def timestamp(filename):
        """Stamps a filename with the timestamp for this run"""
        parts = os.path.splitext(filename)
        return "%s-%s%s" % (parts[0], Config._runtime, parts[1])

This class has a static method called timestamp.  In the main job, I will call the timestamp method to add the date/time of the run to the output filenames.  So you might see something like:

outfile = open(Config.timestamp(Config.log_file), 'w')

This will open a file named 'log-20120204-112530.txt' for writing.
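
To see what timestamp produces without waiting for a real run, the same splitext logic can be tried as a stand-alone function.  This is only an illustration: stamp and the fixed run time are names I made up here, and Config.timestamp reads the class-level _runtime instead of taking a parameter.

```python
import datetime
import os


def stamp(filename, runtime):
    """Insert runtime (a datetime) between the base name and the extension."""
    base, ext = os.path.splitext(filename)
    return "%s-%s%s" % (base, runtime.strftime('%Y%m%d-%H%M%S'), ext)


# A fixed, made-up run time so the result is reproducible.
run_started = datetime.datetime(2013, 2, 4, 11, 25, 30)

if __name__ == '__main__':
    print(stamp('log.txt', run_started))      # log-20130204-112530.txt
    print(stamp('results.txt', run_started))  # results-20130204-112530.txt
```

Because _runtime is computed once, when the Config class is defined, every output file from the same run carries the same stamp.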

The Job class

The Job class is the main class that executes the job and spawns the worker processes.  It has the following methods:
  • __init__ - Constructor, sets up a multiprocessing Pool and three Queues
  • run - Kicks off the job
  • get_customers - Reads the customer file
  • get_requests - Transforms the customers list into a list of process requests
  • process_customer_queue - Deals with responses from the web service
  • process_log_queue - Deals with log messages from the worker processes
  • process_exceptions_queue - Deals with exceptions that occur in the worker processes
  • log - Writes log messages to the log file
Here's a copy of the Job class:

class Job:
    """Job that performs the run."""

    def __init__(self):
        self._manager = multiprocessing.Manager()
        self._pool = multiprocessing.Pool(Config.process_count)
        self._customer_queue = self._manager.Queue()
        self._log_queue = self._manager.Queue()
        self._exception_queue = self._manager.Queue()

    def run(self):
        """Do it."""
        start_time = datetime.datetime.now()
        self.log('[Run started]')
        customers = self.get_customers()
        requests = self.get_requests(customers)
        result = self._pool.map_async(get_customer, requests)
        while not result.ready():
            self.process_customer_queue()
            self.process_log_queue()
            self.process_exceptions_queue()
        self._pool.close()
        self._pool.join()
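        # The workers are done.  Each process_* call handles a single
        # message, so keep calling until every queue is empty.
        while not self._customer_queue.empty():
            self.process_customer_queue()
        while not self._log_queue.empty():
            self.process_log_queue()
        while not self._exception_queue.empty():
            self.process_exceptions_queue()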
        self.log('[Run finished]')
        self.log('[Total runtime: %s]' % (datetime.datetime.now() - start_time))

    def get_customers(self):
        """Read the source file."""
        # Skip the header row, then strip whitespace from each customer ID.
        with open(Config.source_file) as source:
            buf = [line.strip() for line in source.readlines()[1:]]
        return buf

    def get_requests(self, customers):
        """Generate requests from customers."""
        requests = [{
            'customer': customer,
            'data': self._customer_queue,
            'log': self._log_queue,
            'exceptions': self._exception_queue,
            'url': Config.url,
        } for customer in customers]
        return requests

    def process_customer_queue(self):
        """Pull messages off the data queue."""
        try:
            message = self._customer_queue.get_nowait()
        except Queue.Empty:
            return
        customer = message['customer']
        details = message['server_response']['details']
        outfile = open(Config.timestamp(Config.profile_data_file), 'a')
        for record in details:
            buf = "%s, %s, %s, %s\n" % (customer, record['id'], \
                record['relevanceScore'], record['relevanceRank'])
            outfile.write(buf)
        outfile.close()
            
    def process_log_queue(self):
        """Pull messages off the log queue."""
        try:
            message = self._log_queue.get_nowait()
        except Queue.Empty:
            return
        self.log(message)

    def process_exceptions_queue(self):
        """Pull messages off the exceptions queue."""
        try:
            message = self._exception_queue.get_nowait()
        except Queue.Empty:
            return
        customer = message['customer']
        exception = message['exception']
        self.log("EXCEPTION GETTING %s! - %s" % \
            (customer, str(exception)))
        failed_file = open(Config.timestamp(Config.failed_customer_file), 'a')
        buf = "%s\n" % customer
        failed_file.write(buf)
        failed_file.close()

    def log(self, message):
        """Write message to the log file."""
        logfile = open(Config.timestamp(Config.log_file), 'a')
        timestamp = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
        logfile.write("%s - %s\n" % (timestamp, message))
        logfile.close()

Some important things to highlight.  The constructor creates three queues:  _customer_queue, _log_queue, and _exception_queue.  They come from a multiprocessing Manager, so they can be passed to the pool's worker processes as part of each request.  These queues are used for interprocess communication.  The constructor also creates a Pool of worker processes.  Each process will communicate back to the parent process via these queues.  The _customer_queue holds the results of the web service calls.  The _log_queue holds log messages.  The _exception_queue holds exceptions that occur when worker processes have problems talking to the web service.

In the run method, the Job class calls the Pool class's map_async method.  The map_async method splits the requests list into chunks and hands those chunks to the worker processes, then returns immediately.  The run method then goes into a loop, waiting for the worker processes to finish.  While it's waiting, it continually checks the queues to see if any messages have been received.  Each check pulls at most one message off each queue.
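
The polling pattern is easier to see with the web service stripped away.  Here's a minimal sketch of it, written to run on Python 2.7 or 3.  The names work, drain and run_demo are made up for this example, and squaring numbers stands in for the web service call.  One deliberate difference from the Job class: the loop pauses briefly on the result object instead of spinning, which is my own substitution.

```python
import multiprocessing

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7


def work(request):
    """Worker: report a result back through the shared queue."""
    number, results = request
    results.put(number * number)


def drain(results, collected):
    """Move everything currently on the queue into collected."""
    while True:
        try:
            collected.append(results.get_nowait())
        except queue.Empty:
            return


def run_demo():
    manager = multiprocessing.Manager()
    results = manager.Queue()
    pool = multiprocessing.Pool(2)
    pending = pool.map_async(work, [(n, results) for n in range(5)])
    collected = []
    while not pending.ready():
        drain(results, collected)
        pending.wait(0.1)       # pause briefly instead of spinning
    pool.close()
    pool.join()
    drain(results, collected)   # pick up anything that arrived late
    return sorted(collected)


if __name__ == '__main__':
    print(run_demo())  # [0, 1, 4, 9, 16]
```

The final drain after join matters: messages can still be sitting on the queue when ready() turns true.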

The stand-alone function

The map_async method's first argument is the function that each worker process should call with the items of the request list.  In this case, that function is get_customer.  It has to be a module-level function so that it can be pickled and sent to the workers.  get_customer looks like this:

def get_customer(request):
    """Retrieve the customer details from the given request."""
    customer = request['customer']
    url = request['url']
    data = request['data']
    log = request['log']
    exceptions = request['exceptions']
    try:
        log.put("Requesting details for customer %s" % customer)
        req_url = url.replace('{0}', customer)
        request = urllib2.Request(req_url)
        response = urllib2.urlopen(request)
        buf = response.read()
        data_records = json.loads(buf)
        data.put({'customer': customer, 'server_response': data_records})
        # data_records is a dict; the records live under its 'details' key.
        log.put("Successfully retrieved %d detail(s) for %s" % \
            (len(data_records['details']), customer))
    except Exception as exc:
        exceptions.put({'customer': customer, 'exception': exc})

This function is called for each request in the worker processes.  It extracts queues from the request.  It uses the put method on each queue to pass information back to the parent process.
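
Since get_customer only touches the outside world through the queues and the HTTP call, its contract can be exercised without a live web service.  The sketch below is a simplified variant, not the function from my script: the fetch parameter, fake_fetch, make_request and the sample payload are all hypothetical, logging is left out, and plain in-process queues stand in for the Manager queues.  It runs on Python 2.7 or 3.

```python
import json

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7


def get_customer(request, fetch):
    """Same flow as the worker, with the HTTP call injected as fetch."""
    customer = request['customer']
    try:
        body = fetch(request['url'].replace('{0}', customer))
        request['data'].put({'customer': customer,
                             'server_response': json.loads(body)})
    except Exception as exc:
        request['exceptions'].put({'customer': customer, 'exception': exc})


def fake_fetch(url):
    """Hypothetical stand-in for urllib2.urlopen(...).read()."""
    if url.endswith('/bad'):
        raise IOError('simulated network failure')
    return '{"details": [{"id": 1}]}'


def make_request(customer, data, exceptions):
    """Build a request dict shaped like the ones get_requests produces."""
    return {'customer': customer, 'data': data, 'exceptions': exceptions,
            'url': 'http://example.invalid/endpoint/{0}'}


if __name__ == '__main__':
    data, exceptions = queue.Queue(), queue.Queue()
    get_customer(make_request('42', data, exceptions), fake_fetch)
    get_customer(make_request('bad', data, exceptions), fake_fetch)
    print(data.get_nowait()['customer'])        # 42
    print(exceptions.get_nowait()['customer'])  # bad
```

A success lands on the data queue and a failure lands on the exceptions queue, which is exactly what the parent process relies on.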

Pulling it all together

The last thing in my script file is this:

if __name__ == '__main__':
    # Let's do this!
    Job().run()

It's important to put the Job().run() call inside the if __name__ == '__main__' check.  This is because of how the multiprocessing internals work.  When the Pool is created, Python starts a number of sub processes.  On Windows, each of those processes is a fresh copy of the Python executable that imports the job module before it starts calling the get_customer function over and over with the requests.  (On Unix the workers are forked from the parent, so the module is not re-imported, but the guard is still good practice.)  If the Job().run() call were not inside that check, every worker process that imports the module would try to start a Job of its own.  Nothing good would come of that!
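
Here's the guard in its smallest form, as a sketch that runs on Python 2.7 or 3.  The square and main functions are placeholders I made up for this example; they stand in for get_customer and Job().run().

```python
import multiprocessing


def square(number):
    """Work done in a worker process."""
    return number * number


def main():
    """Create the pool, hand out the work, and clean up."""
    pool = multiprocessing.Pool(2)
    try:
        return pool.map(square, range(5))
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    # Only the process started from the command line gets here.  A worker
    # that imports this module sees a different __name__ and skips it.
    print(main())  # [0, 1, 4, 9, 16]
```

Everything that creates processes lives in main, so importing the module has no side effects.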

Hope you find this helpful.