Monday, December 23, 2013

A Basic Map Reduce Framework in Python

As I mentioned in my previous post, mrjob is a great framework for writing Hadoop Map-Reduce jobs in Python, and there are several others, such as Dumbo and Pydoop. However, if you work in a shop where you have to validate all external modules before use, you may want to create something from scratch for the quick wins, even as you select and certify a framework.

Here is something I have been using. There is nothing to install or configure, and it emphasizes transparency: there are no wrappers, so there is nothing to keep in step with your Hadoop version, and you get full access to all Hadoop streaming options. You can chain multiple jobs, manipulate HDFS, and test the individual parts on small local files before running on Hadoop, all in one place.
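
For example, assuming the script below is saved as join.py and the two small local sample files are named sales.csv and locations.csv (all three names are hypothetical), the map and reduce steps can be exercised with an ordinary pipeline, with sort standing in for the Hadoop shuffle:

cat sales.csv locations.csv | ./join.py mapper | sort | ./join.py reducer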

The example below uses the data from my previous post. Notice that I am joining two files. The "sortOrder" field forces the records from the file being joined to the top of each group, and also identifies those records in the reducer. Note that if multiple records match the main record, my code picks up only the last record from the joining file. This behavior can easily be adjusted to create as many records in the main table as there are matches, either by storing the joining file's records in a list or by aggregating the joined fields through concatenation, etc.; a sketch of the list-based adjustment appears after the listing.

#!/usr/bin/env python

import sys
from itertools import groupby
from subprocess import call

def hadoop():
 # remove any previous output so the streaming job does not fail on an existing directory
 call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output', shell=True)

 # launch the streaming job, shipping this script and running it as both mapper and reducer
 cmd = 'hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar \
 -input /user/cloudera/test \
 -output /user/cloudera/test/output \
 -file ' + sys.argv[0] + ' \
 -mapper "' + sys.argv[0] + ' mapper" \
 -reducer "' + sys.argv[0] + ' reducer" '
 call(cmd, shell=True)
  
def mapper():
 for line in sys.stdin:
  try:
   # three comma-separated fields: a sales record (person, product, sale)
   person, product, sale = line.strip().split(',')
   print('%s\t%s\t%s\t%s\t%s' % (person, '1', product, sale, None))
  except ValueError:
   try:
    # two comma-separated fields: a location record (person, location) from the file being joined
    person, location = line.strip().split(',')
    print('%s\t%s\t%s\t%s\t%s' % (person, '0', None, None, location))
   except ValueError:
    # placeholder fields print as the literal string 'None'; the reducer never reads them
    continue

def lines():
 # yield each reducer input line as a list of its tab-separated fields
 for line in sys.stdin:
  yield line.strip().split('\t')

def reducer():
 # records arrive sorted by person, so groupby collects each person's records together
 for key, values in groupby(lines(), lambda x: x[0]):
  count = total = 0
  location = ''
  for person, sortOrder, product, sale, locn in values:
   if sortOrder == '0': location = locn  # record from the file being joined
   else:
    total += int(sale)
    count += 1
  if count: print('%s\t%s\t%s' % (person, location, total))
   
def printUsage():
 sys.stderr.write('Usage: ' + sys.argv[0] + ' functionNameToRun\n')
 sys.exit(1)
 
if __name__ == '__main__':
 if len(sys.argv) != 2: printUsage()
 try:
  # dispatch to the function named on the command line: hadoop, mapper, or reducer
  locals()[sys.argv[1]]()
 except KeyError:
  sys.stderr.write('Error: No such function name: ' + sys.argv[1] + '\n')
  printUsage()
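
As noted above, the reducer keeps only the last matching record from the joining file. Below is a minimal sketch of the list-based adjustment mentioned earlier, emitting one output record per matching location; the function name reducerAllMatches is hypothetical:

def reducerAllMatches():
 for key, values in groupby(lines(), lambda x: x[0]):
  locations, sales = [], []
  for person, sortOrder, product, sale, locn in values:
   if sortOrder == '0': locations.append(locn)  # keep every record from the joining file
   else: sales.append(int(sale))
  # one output record per matching location, each carrying the sales total
  for location in locations:
   print('%s\t%s\t%s' % (key, location, sum(sales)))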

To chain jobs, simply add another Hadoop command in hadoop() and name new mapper and reducer functions on its command line. To add a combiner, add the -combiner option to the command line in hadoop(), pointing it at your combiner function.
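
For example, a chained second job inside hadoop() might look like the sketch below; the function names mapper2, combiner2, and reducer2 and the output2 directory are hypothetical, and the -combiner line simply shows where a combiner function would be named:

 call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output2', shell=True)

 cmd = 'hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar \
 -input /user/cloudera/test/output \
 -output /user/cloudera/test/output2 \
 -file ' + sys.argv[0] + ' \
 -mapper "' + sys.argv[0] + ' mapper2" \
 -combiner "' + sys.argv[0] + ' combiner2" \
 -reducer "' + sys.argv[0] + ' reducer2" '
 call(cmd, shell=True)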
