Monday, December 23, 2013

A Basic Map Reduce Framework in Python

As I mentioned in my previous post, mrjob is a great framework for writing Hadoop Map-Reduce jobs in Python, and there are several others, such as Dumbo and Pydoop. However, if you work in a shop where you have to validate all external modules before use, you may want to create something from scratch for quick wins while you select and certify a framework.

Here is something I have been using. There is nothing to install or configure. It emphasizes transparency: there are no wrappers, so there is no need to match a wrapper to your Hadoop version, and you get full access to all Hadoop streaming options. You can chain multiple jobs, manipulate HDFS, and test individual parts with small local files before running on Hadoop, all in one place.

The example below uses the data from my previous post. Notice that I am joining two files. The "sortOrder" field forces the records from the file that is being joined to the top, and also identifies those records in the reducer. Note that if multiple records in the joining file match a main record, my code picks only the last one. This behavior can easily be adjusted: store the joining-file records in a list to emit one output record per match, or aggregate the joining fields, for example through concatenation.

#!/usr/bin/env python

import sys
from itertools import groupby
from subprocess import call

def hadoop():
 call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output', shell=True)

 cmd = 'hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar \
 -input /user/cloudera/test \
 -output /user/cloudera/test/output \
 -file ' + sys.argv[0] + ' \
 -mapper "' + sys.argv[0] + ' mapper" \
 -reducer "' + sys.argv[0] + ' reducer" '
 call(cmd, shell=True)
  
def mapper():
 for line in sys.stdin:
  try:
   person, product, sale = line.strip().split(',')
   print('%s\t%s\t%s\t%s\t%s' % (person, '1', product, sale, None))
  except ValueError:
   try:
    person, location = line.strip().split(',')
    print('%s\t%s\t%s\t%s\t%s' % (person, '0', None, None, location))
   except ValueError:
    continue

def lines():
 for line in sys.stdin:
  yield line.strip().split('\t')

def reducer():
 for key, values in groupby(lines(), lambda x: x[0]):
  count = total = 0
  location = ''
  for person, sortOrder, product, sale, locn in values:
   if sortOrder == '0': location = locn
   else:
    total += int(sale)
    count += 1
  if count: print('%s\t%s\t%s' % (key, location, total))
   
def printUsage():
 sys.stderr.write('Usage: ' + sys.argv[0] + ' functionNametoRun\n')
 sys.exit(1)

if __name__ == '__main__':
 if len(sys.argv) != 2: printUsage()
 # Look up the requested function by name among the module-level names
 func = globals().get(sys.argv[1])
 if not callable(func):
  sys.stderr.write('Error: No such function name: ' + sys.argv[1] + '\n')
  printUsage()
 func()
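
The reducer above keeps only the last location record for each person. Here is a minimal sketch of the list-based variant, assuming the same five tab-separated fields and that the '0' records arrive before the '1' records for each key. The helper name reduce_records is hypothetical, and the second location for joe is made up for illustration. It takes already-split records so it can be tried without Hadoop:

```python
from itertools import groupby

def reduce_records(records):
    """Yield (person, location, total) once per matching location record.

    records: iterable of [person, sortOrder, product, sale, locn] lists,
    assumed to be sorted by person and then by sortOrder.
    """
    for key, values in groupby(records, lambda x: x[0]):
        locations = []  # every joining-file record seen for this key
        count = total = 0
        for person, sortOrder, product, sale, locn in values:
            if sortOrder == '0':
                locations.append(locn)
            else:
                total += int(sale)
                count += 1
        if count:
            # One output row per location; '' when no location matched.
            for location in locations or ['']:
                yield key, location, total

# Illustrative input only: joe is given two locations on purpose.
sample = [
    ['joe', '0', 'None', 'None', 'Clifton'],
    ['joe', '0', 'None', 'None', 'Edison'],
    ['joe', '1', 'ink', '20', 'None'],
    ['joe', '1', 'pencil', '10', 'None'],
    ['ron', '1', 'ink', '35', 'None'],
]

for row in reduce_records(sample):
    print('%s\t%s\t%s' % row)
```

To aggregate instead, the same list could be joined into one field with ','.join(locations) and emitted as a single row.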

To chain jobs, simply create another Hadoop command in hadoop() and call new mapping and reducing functions from its command line. To add combiners, add -combiner to the command line in hadoop() to call your combiner function.
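
As a rough sketch of chaining, the command string can be built by a small helper so that the second job reads the first job's output folder. The helper streaming_cmd, the script name ./job.py, the stage1 and final folders, and the mapper2, reducer2 and combiner2 function names are all hypothetical; only the streaming options already used above plus -combiner appear in the command:

```python
def streaming_cmd(jar, script, input_path, output_path,
                  mapper, reducer, combiner=None):
    """Build one 'hadoop jar ...' streaming command line as a string."""
    parts = [
        'hadoop jar ' + jar,
        '-input ' + input_path,
        '-output ' + output_path,
        '-file ' + script,
        '-mapper "%s %s"' % (script, mapper),
    ]
    if combiner:
        parts.append('-combiner "%s %s"' % (script, combiner))
    parts.append('-reducer "%s %s"' % (script, reducer))
    return ' '.join(parts)

JAR = ('/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/'
       'hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar')
SCRIPT = './job.py'  # hypothetical script name

# Job 1 writes to an intermediate folder; job 2 reads from it.
job1 = streaming_cmd(JAR, SCRIPT, '/user/cloudera/test',
                     '/user/cloudera/stage1', 'mapper', 'reducer')
job2 = streaming_cmd(JAR, SCRIPT, '/user/cloudera/stage1',
                     '/user/cloudera/final', 'mapper2', 'reducer2',
                     combiner='combiner2')
print(job1)
print(job2)
```

Inside hadoop(), each string would be passed to call(cmd, shell=True) in turn. call returns the exit status of the command, so the second job can be skipped when the first returns a nonzero value.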

Tuesday, December 10, 2013

Joining big files (tables) with Python, mrjob and Hadoop Map-Reduce

Hadoop Map-Reduce can read lines (records) from all files in an HDFS folder at once and present them, in read order, to map tasks running on one or more Hadoop cluster nodes. Let us say that the HDFS folder contains two files: one is a sales transaction file and the other is an employee location file. We want to join the location to the transactions.

Here is an example sales transactions file:

joe,ink,20
joe,pencil,10
sam,whiteboard,120
joe,pencil,8
joe,pencil,18
joe,pencil,28
joe,ink,56
ron,ink,35
sam,paper,300
joe,tablet,2300
tom,staples,23
tom,pencil,20
tom,pen,50
tom,paper,300
ron,tablet,4500
sam,pen,30
joe,pen,50
ron,whiteboard,140
ron,whiteboard,300

Here is an example employee location file:

joe,Clifton
sam,Plainsboro
tom,Edison
paul,Piscataway
moe,Bridgewater
susan,Princeton

Here is the Python map-reduce program that will join the two. It requires the mrjob package, which I think is fantastic for writing map-reduce jobs.

#!/usr/bin/env python
from mrjob.job import MRJob

class TestJob(MRJob):
    def __init__(self, *args, **kwargs):
        super(TestJob, self).__init__(*args, **kwargs)

    def mapper(self, _, line):
        # Mapper will either get a record from main or join table
        try: # See if it is main table record
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try: # See if it is a join table record
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass # Record did not match either so skip the record

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location: loc = location
            else: yield key, (product, sale, loc)

if __name__ == '__main__':
    TestJob.run()

Here is the joined output:

"joe"    ["ink", 20, "Clifton"]
"joe"    ["ink", 56, "Clifton"]
"joe"    ["pen", 50, "Clifton"]
"joe"    ["pencil", 10, "Clifton"]
"joe"    ["pencil", 18, "Clifton"]
"joe"    ["pencil", 28, "Clifton"]
"joe"    ["pencil", 8, "Clifton"]
"joe"    ["tablet", 2300, "Clifton"]
"ron"    ["ink", 35, null]
"ron"    ["tablet", 4500, null]
"ron"    ["whiteboard", 140, null]
"ron"    ["whiteboard", 300, null]
"sam"    ["paper", 300, "Plainsboro"]
"sam"    ["pen", 30, "Plainsboro"]
"sam"    ["whiteboard", 120, "Plainsboro"]
"tom"    ["paper", 300, "Edison"]
"tom"    ["pen", 50, "Edison"]
"tom"    ["pencil", 20, "Edison"]
"tom"    ["staples", 23, "Edison"]

This works because Hadoop sorts the output from the mapper before presenting it to the reducer. For lines coming from the employee location file, the product and sales amount are blank, which puts them at the top of the sales records for that employee. Here is what is presented to the reducer:

"joe"    ["", "", "Clifton"]
"joe"    ["ink", 20, ""]
"joe"    ["ink", 56, ""]
"joe"    ["pen", 50, ""]
"joe"    ["pencil", 10, ""]
"joe"    ["pencil", 18, ""]
"joe"    ["pencil", 28, ""]
"joe"    ["pencil", 8, ""]
"joe"    ["tablet", 2300, ""]
"moe"    ["", "", "Bridgewater"]
"paul"    ["", "", "Piscataway"]
"ron"    ["ink", 35, ""]
"ron"    ["tablet", 4500, ""]
"ron"    ["whiteboard", 140, ""]
"ron"    ["whiteboard", 300, ""]
"sam"    ["", "", "Plainsboro"]
"sam"    ["paper", 300, ""]
"sam"    ["pen", 30, ""]
"sam"    ["whiteboard", 120, ""]
"susan"    ["", "", "Princeton"]
"tom"    ["", "", "Edison"]
"tom"    ["paper", 300, ""]
"tom"    ["pen", 50, ""]
"tom"    ["pencil", 20, ""]
"tom"    ["staples", 23, ""]
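
The ordering in the listing above can be reproduced locally. The sketch below assumes that the lines are compared as plain text in their JSON form, which is consistent with "pencil", 8 appearing after "pencil", 28 in the listing. The handful of pairs is a subset of the sample data:

```python
import json

# (key, value) pairs as the mapper would yield them, in arbitrary order.
pairs = [
    ('joe', ('ink', 20, '')),
    ('joe', ('pencil', 8, '')),
    ('joe', ('pencil', 28, '')),
    ('joe', ('', '', 'Clifton')),
    ('ron', ('ink', 35, '')),
]

# Encode each pair as key<TAB>value in JSON and sort the lines as text.
lines = sorted('%s\t%s' % (json.dumps(k), json.dumps(v)) for k, v in pairs)
for line in lines:
    print(line)
```

The location record comes first because its value starts with [""; the double quote character sorts before any letter. Whether values reach the reducer sorted on a real cluster depends on how the job is configured. mrjob documents a SORT_VALUES class attribute for requesting this; the post does not say whether it was set.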

This is a left join, so employees in the location file but not in transactions are ignored. To change it to an outer join, change the reducer to:

    def reducer(self, key, values):
        loc = None
        matched = False
        for product, sale, location in values:
            if location: loc = location
            else:
                matched = True
                yield key, (product, sale, loc)
        # No sales for this employee: still emit the location record
        if not matched: yield key, ('', '', loc)

For inner join:

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location: loc = location
            elif loc: yield key, (product, sale, loc)

Neat, isn't it? The location record is only guaranteed to precede an employee's sales records once all mapper output for that employee has been sorted together, so the join must be done in the reduce step; it cannot be done in a combine step. Joins can therefore become a bottleneck. Downstream operations such as grouping can be done in another reduce step, which I will illustrate in a future post.
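
To compare the three join types without a cluster, here is a minimal sketch in plain Python. It is not mrjob code: the function join and its how argument are hypothetical, and it assumes the pairs arrive sorted so that each employee's location record comes first.

```python
from itertools import groupby

def join(pairs, how='left'):
    """Join sorted (name, (product, sale, location)) pairs.

    how: 'left' keeps every sale, 'inner' keeps only sales that have a
    location, 'outer' also keeps locations that have no sales.
    """
    out = []
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        loc = None
        matched = False
        for _, (product, sale, location) in group:
            if location:
                loc = location
            elif loc or how != 'inner':
                matched = True
                out.append((key, (product, sale, loc)))
        if how == 'outer' and not matched:
            out.append((key, ('', '', loc)))
    return out

pairs = [
    ('joe', ('', '', 'Clifton')),
    ('joe', ('ink', 20, '')),
    ('moe', ('', '', 'Bridgewater')),
    ('ron', ('ink', 35, '')),
]

for how in ('left', 'inner', 'outer'):
    print(how, join(pairs, how))
```

With this input, ron (sales but no location) appears in the left and outer results only, and moe (location but no sales) appears in the outer result only.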

Monday, December 9, 2013

Analyzing unstructured data with Apache Hive, Hadoop and RegexSerDe


Download the IMDB movies list with countries to your favorite folder within HDFS on your Hadoop cluster. If you look at the records in the file, the movie name is enclosed in quotes and is the first item on the record. The year follows, enclosed in parentheses. After that comes optional information such as TV episode title, season, and episode within curly braces. The last item on the record is the country name.

"'Da Kink in My Hair" (2007) {Empty Bag Can't Stand Up (#1.4)}    Canada

These items must be parsed into fields to enable analysis, such as counting the movies made in the USA in the year 2000. One way to do it is to first create a table within Hive with the entire record as one field, and then use Create Table as Select (CTAS), where the select uses the regexp_extract function to parse the fields. The shortcomings of this approach are (1) the CTAS table cannot be external and (2) you have just doubled the storage space required if you intend to keep the original external table.

A better approach is to create the table with a regular-expression Serializer/Deserializer class (RegexSerDe), which parses the fields from each record using a regular expression specified when the table is created.

The RegexSerDe class is contained within the hive-contrib-*.jar file. On my Cloudera Hadoop (CDH) cluster this file is located in the /usr/lib/hive/lib/ folder. Before using the RegexSerDe class, you need to tell Hive about this library by running the following command at the Hive prompt:

add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.4.0.jar;

This will add the jar file as a resource for that session. To make this jar available automatically each time you start a hive session, you can edit the /etc/hive/conf/hive-site.xml file to add the following:
<property>
        <name>hive.aux.jars.path</name>
        <value>file:///usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.4.0.jar</value>
</property>
The <value> can take multiple files separated by commas in case you are using other jars.

Now that the RegexSerDe class is available for use within Hive, we can create the table using RegexSerDe like so:
CREATE EXTERNAL TABLE movies (title string, year string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = '"([^"]+)"\\s+\\(([^)]+).+'
)
LOCATION '/user/smith/imdb';

The table name movies is a placeholder; substitute your own. The above assumes that the countries.list file is located within the /user/smith/imdb folder on HDFS. We are only parsing out the movie title and year. I will update the post in the future with code that parses all fields. Notice that RegexSerDe requires the "input.regex" pattern to match the entire record, thus the .+ at the end. Also note that regex shorthands and escapes such as \s are written as \\s because Hive also uses \ as its escape character. Each field to be parsed is a capture group enclosed in parentheses.
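
The pattern can be checked against the sample record before creating the table. This is a quick sketch using Python's re module rather than Hive. It assumes the two regex engines agree on these simple constructs and that the gap before the country is ordinary whitespace:

```python
import re

# Same pattern as "input.regex", with single backslashes because
# Python raw strings need no extra escaping.
pattern = re.compile(r'"([^"]+)"\s+\(([^)]+).+')

record = ('"\'Da Kink in My Hair" (2007) '
          '{Empty Bag Can\'t Stand Up (#1.4)}    Canada')

# fullmatch mimics the requirement that the pattern cover the entire record.
m = pattern.fullmatch(record)
print(m.groups())

# Without the trailing .+ the pattern stops after the year and
# no longer covers the whole record.
short = re.compile(r'"([^"]+)"\s+\(([^)]+)')
print(short.fullmatch(record))
```

The first print shows the two captured fields, the title and the year; the second prints None because the shortened pattern does not match the full record.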