Joining big files (tables) with Python, mrjob and Hadoop Map-Reduce
Hadoop Map-Reduce can read lines (records) from all files in an HDFS folder simultaneously and present them, in read order, to map tasks running on one or more Hadoop cluster nodes. Let us say that the HDFS folder contains two files: a sales transaction file and an employee location file. We want to join each employee's location to his or her transactions.
Here is an example sales transactions file:
joe,ink,20
joe,pencil,10
sam,whiteboard,120
joe,pencil,8
joe,pencil,18
joe,pencil,28
joe,ink,56
ron,ink,35
sam,paper,300
joe,tablet,2300
tom,staples,23
tom,pencil,20
tom,pen,50
tom,paper,300
ron,tablet,4500
sam,pen,30
joe,pen,50
ron,whiteboard,140
ron,whiteboard,300
Here is an example employee location file:
joe,Clifton
sam,Plainsboro
tom,Edison
paul,Piscataway
moe,Bridgewater
susan,Princeton
Here is the Python map-reduce program that joins the two. It requires the mrjob package, which I think is fantastic for writing map-reduce jobs.
#!/usr/bin/env python
from mrjob.job import MRJob


class TestJob(MRJob):

    def mapper(self, _, line):
        # The mapper sees records from both files and tells them
        # apart by their field count
        try:  # Three fields: a sales (main table) record
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try:  # Two fields: a location (join table) record
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass  # Record matches neither layout, so skip it

    def reducer(self, key, values):
        # The location record sorts first, so loc is already set
        # when the sales records for the same employee arrive
        loc = None
        for product, sale, location in values:
            if location:
                loc = location
            else:
                yield key, (product, sale, loc)


if __name__ == '__main__':
    TestJob.run()
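Assuming the script is saved as join_job.py (again, my name for it), mrjob can run it locally against the raw files for a quick test, or submit it to the cluster with the hadoop runner pointed at the HDFS folder from above:

python join_job.py sales.csv locations.csv
python join_job.py -r hadoop hdfs:///data/join/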
Here is the joined output:
"joe" ["ink", 20, "Clifton"]
"joe" ["ink", 56, "Clifton"]
"joe" ["pen", 50, "Clifton"]
"joe" ["pencil", 10, "Clifton"]
"joe" ["pencil", 18, "Clifton"]
"joe" ["pencil", 28, "Clifton"]
"joe" ["pencil", 8, "Clifton"]
"joe" ["tablet", 2300, "Clifton"]
"ron" ["ink", 35, null]
"ron" ["tablet", 4500, null]
"ron" ["whiteboard", 140, null]
"ron" ["whiteboard", 300, null]
"sam" ["paper", 300, "Plainsboro"]
"sam" ["pen", 30, "Plainsboro"]
"sam" ["whiteboard", 120, "Plainsboro"]
"tom" ["paper", 300, "Edison"]
"tom" ["pen", 50, "Edison"]
"tom" ["pencil", 20, "Edison"]
"tom" ["staples", 23, "Edison"]
This works because Hadoop sorts the mapper's output before presenting it to the reducer. For lines coming from the employee location file, the product and sale amount are blank, so the location record sorts ahead of that employee's sales records (a quick check in plain Python after the listing shows why). Here is what is presented to the reducer:
"joe" ["", "", "Clifton"]
"joe" ["ink", 20, ""]
"joe" ["ink", 56, ""]
"joe" ["pen", 50, ""]
"joe" ["pencil", 10, ""]
"joe" ["pencil", 18, ""]
"joe" ["pencil", 28, ""]
"joe" ["pencil", 8, ""]
"joe" ["tablet", 2300, ""]
"moe" ["", "", "Bridgewater"]
"paul" ["", "", "Piscataway"]
"ron" ["ink", 35, ""]
"ron" ["tablet", 4500, ""]
"ron" ["whiteboard", 140, ""]
"ron" ["whiteboard", 300, ""]
"sam" ["", "", "Plainsboro"]
"sam" ["paper", 300, ""]
"sam" ["pen", 30, ""]
"sam" ["whiteboard", 120, ""]
"susan" ["", "", "Princeton"]
"tom" ["", "", "Edison"]
"tom" ["paper", 300, ""]
"tom" ["pen", 50, ""]
"tom" ["pencil", 20, ""]
"tom" ["staples", 23, ""]
This is a left join: every transaction appears in the output, with a null location where there is no match (ron), while employees who appear only in the location file (moe, paul, susan) are dropped. To change it to a full outer join, the reducer also has to emit a location that never found a matching sale:

def reducer(self, key, values):
    loc, matched = None, False
    for product, sale, location in values:
        if location:
            loc = location
        else:
            matched = True
            yield key, (product, sale, loc)
    if loc and not matched:  # employee only in the location file
        yield key, ('', '', loc)
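With this change, the employees who appear only in the location file now show up in the output; from the reducer input above, that adds these rows:

"moe" ["", "", "Bridgewater"]
"paul" ["", "", "Piscataway"]
"susan" ["", "", "Princeton"]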
For an inner join, yield only the sales for which a location was found:

def reducer(self, key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        elif loc:
            yield key, (product, sale, loc)
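With this version, ron's sales drop out of the output along with the location-only employees, leaving just the joe, sam and tom rows from the left-join output above.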
Neat, isn't it? The join has to happen in the reduce step, where all of a key's values are guaranteed to arrive together with the location record sorted to the front; a combiner only sees the values produced by a single mapper, so the location record may not be among them. Joining can therefore become a bottleneck. Downstream operations such as grouping can be done by another reduce step, which I will illustrate in a future post (a rough sketch follows).
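In the meantime, here is a minimal sketch of chaining a second reduce step with mrjob's MRStep. The class and method names are my own, and the grouping (total sales per employee) is just an illustration, not the future post:

#!/usr/bin/env python
from mrjob.job import MRJob
from mrjob.step import MRStep


class JoinAndTotal(MRJob):

    def steps(self):
        # Step 1 joins, step 2 groups the joined records
        return [
            MRStep(mapper=self.mapper_tag, reducer=self.reducer_join),
            MRStep(reducer=self.reducer_total),
        ]

    def mapper_tag(self, _, line):
        # Same tagging mapper as the join above
        try:
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try:
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass

    def reducer_join(self, key, values):
        # Same left-joining reducer as above
        loc = None
        for product, sale, location in values:
            if location:
                loc = location
            else:
                yield key, (product, sale, loc)

    def reducer_total(self, key, values):
        # Group the joined records: total sales per employee
        total, loc = 0, None
        for product, sale, location in values:
            total += sale
            loc = location
        yield key, (total, loc)


if __name__ == '__main__':
    JoinAndTotal.run()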