Tuesday, December 10, 2013

Joining big files (tables) with Python, mrjob and Hadoop Map-Reduce

Hadoop Map-Reduce can read lines (records) from all files in an HDFS folder at once, presenting lines from one or more files in read order to map tasks running on one or more Hadoop cluster nodes. Say the HDFS folder contains two files: one is a sales transaction file and the other is an employee location file. We want to join the location onto each transaction.

Here is an example sales transactions file:

joe,ink,20
joe,pencil,10
sam,whiteboard,120
joe,pencil,8
joe,pencil,18
joe,pencil,28
joe,ink,56
ron,ink,35
sam,paper,300
joe,tablet,2300
tom,staples,23
tom,pencil,20
tom,pen,50
tom,paper,300
ron,tablet,4500
sam,pen,30
joe,pen,50
ron,whiteboard,140
ron,whiteboard,300

Here is an example employee location file:

joe,Clifton
sam,Plainsboro
tom,Edison
paul,Piscataway
moe,Bridgewater
susan,Princeton
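
To follow along, both files need to sit in the same HDFS folder so the job can read them together. Something like this works (the paths and filenames here are placeholders of mine, not from the original setup):

hadoop fs -mkdir /data/join
hadoop fs -put sales.txt locations.txt /data/join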

Here is the Python map-reduce program that joins the two. It requires the mrjob package, which I think is fantastic for writing map-reduce jobs.

#!/usr/bin/env python
from mrjob.job import MRJob

class TestJob(MRJob):
    def mapper(self, _, line):
        # A mapper gets a record from either the main (sales) table
        # or the join (location) table; tell them apart by parsing
        try: # name,product,sale: a sales record
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try: # name,location: a location record
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass # Matches neither table, so skip the record

    def reducer(self, key, values):
        # The blank-field location record (if any) sorts first;
        # remember it, then attach it to each sales record
        loc = None
        for product, sale, location in values:
            if location:
                loc = location
            else:
                yield key, (product, sale, loc)

if __name__ == '__main__':
    TestJob.run()
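
If the script is saved as, say, join_job.py (a filename I am choosing here for illustration), mrjob can run it locally against the two files for testing, or submit it to the cluster with the hadoop runner:

python join_job.py sales.txt locations.txt
python join_job.py -r hadoop hdfs:///data/join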

Here is the joined output:

"joe"    ["ink", 20, "Clifton"]
"joe"    ["ink", 56, "Clifton"]
"joe"    ["pen", 50, "Clifton"]
"joe"    ["pencil", 10, "Clifton"]
"joe"    ["pencil", 18, "Clifton"]
"joe"    ["pencil", 28, "Clifton"]
"joe"    ["pencil", 8, "Clifton"]
"joe"    ["tablet", 2300, "Clifton"]
"ron"    ["ink", 35, null]
"ron"    ["tablet", 4500, null]
"ron"    ["whiteboard", 140, null]
"ron"    ["whiteboard", 300, null]
"sam"    ["paper", 300, "Plainsboro"]
"sam"    ["pen", 30, "Plainsboro"]
"sam"    ["whiteboard", 120, "Plainsboro"]
"tom"    ["paper", 300, "Edison"]
"tom"    ["pen", 50, "Edison"]
"tom"    ["pencil", 20, "Edison"]
"tom"    ["staples", 23, "Edison"]

This works because Hadoop sorts the output from the mapper before presenting it to the reducer. For lines coming from the employee location file, the product and sale amount are blank, which sorts them ahead of the sales records for that employee. Here is what is presented to the reducer:

"joe"    ["", "", "Clifton"]
"joe"    ["ink", 20, ""]
"joe"    ["ink", 56, ""]
"joe"    ["pen", 50, ""]
"joe"    ["pencil", 10, ""]
"joe"    ["pencil", 18, ""]
"joe"    ["pencil", 28, ""]
"joe"    ["pencil", 8, ""]
"joe"    ["tablet", 2300, ""]
"moe"    ["", "", "Bridgewater"]
"paul"    ["", "", "Piscataway"]
"ron"    ["ink", 35, ""]
"ron"    ["tablet", 4500, ""]
"ron"    ["whiteboard", 140, ""]
"ron"    ["whiteboard", 300, ""]
"sam"    ["", "", "Plainsboro"]
"sam"    ["paper", 300, ""]
"sam"    ["pen", 30, ""]
"sam"    ["whiteboard", 120, ""]
"susan"    ["", "", "Princeton"]
"tom"    ["", "", "Edison"]
"tom"    ["paper", 300, ""]
"tom"    ["pen", 50, ""]
"tom"    ["pencil", 20, ""]
"tom"    ["staples", 23, ""]

This is a left join on the transactions, so employees that appear in the location file but have no transactions (moe, paul, and susan above) are dropped. To change it to an outer join, change the reducer to:

    def reducer(self, key, values):
        loc, sold = None, False
        for product, sale, location in values:
            if location:
                loc = location
            else:
                sold = True
                yield key, (product, sale, loc)
        if not sold: # a location record with no sales records
            yield key, ('', '', loc)

With this change, employees who have a location but no sales (moe, paul, susan) also appear in the output, with blank product and sale. For an inner join:

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location:
                loc = location
            elif loc:
                yield key, (product, sale, loc)

With the inner join, ron drops out as well, since he has no location record. Neat, isn't it? To guarantee that the location record is seen before the sales records for an employee, the join must be done in the reduce step; it cannot be done in the combine step, because a combiner may see only a subset of the values for a key. Joining can therefore become a bottleneck. Downstream operations such as grouping can be done by another reduction step, which I will illustrate in a future post.
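
Strictly speaking, Hadoop only promises the order of keys, not the order of values within a key, so the trick above leans on the sort running over whole lines. mrjob has a SORT_VALUES option that makes the ordering explicit; a minimal sketch, assuming a version of mrjob that supports it:

from mrjob.job import MRJob

class TestJob(MRJob):
    # Have mrjob sort the values within each key as well, so the
    # location record reliably reaches the reducer first
    SORT_VALUES = True

    # ... same mapper and reducer as above ...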
