Joining big files (tables) with Python, mrjob and Hadoop Map-Reduce
Hadoop Map-Reduce can simultaneously read lines (records) from all files in an HDFS folder and present lines from one or more files, in read order, to map tasks running on one or more Hadoop cluster nodes. Let us say that the HDFS folder contains two files: one is a sales transaction file and the other is an employee location file. We want to join the location to the transactions. Here is an example sales transaction file:
joe,ink,20
joe,pencil,10
sam,whiteboard,120
joe,pencil,8
joe,pencil,18
joe,pencil,28
joe,ink,56
ron,ink,35
sam,paper,300
joe,tablet,2300
tom,staples,23
tom,pencil,20
tom,pen,50
tom,paper,300
ron,tablet,4500
sam,pen,30
joe,pen,50
ron,whiteboard,140
ron,whiteboard,300
Here is an example employee location file:
joe,Clifton
sam,Plainsboro
tom,Edison
paul,Piscataway
moe,Bridgewater
susan,Princeton
Here is the Python map-reduce program that will join the two. It requires the mrjob package, which I think is fantastic for writing map-reduce jobs.
#!/usr/bin/env python
from mrjob.job import MRJob

class TestJob(MRJob):

    def mapper(self, _, line):
        # Mapper will get a record from either the main or the join table
        try:
            # See if it is a main table record
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try:
                # See if it is a join table record
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass  # Record matched neither format, so skip it

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location:
                loc = location
            else:
                yield key, (product, sale, loc)

if __name__ == '__main__':
    TestJob.run()
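If you want to trace the join logic without a cluster, here is a sketch in plain Python (no Hadoop or mrjob) that simulates the map, shuffle/sort, and reduce phases on a few of the sample lines. The `mapper` and `reducer` functions mirror the ones in `TestJob` above; the sorting and grouping stand in for Hadoop's shuffle.

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # Sales record -> (name, (product, sale, ''));
    # location record -> (name, ('', '', location))
    parts = line.split(',')
    if len(parts) == 3:
        name, product, sale = parts
        yield name, (product, int(sale), '')
    elif len(parts) == 2:
        name, location = parts
        yield name, ('', '', location)

def reducer(key, values):
    # Same logic as TestJob.reducer above
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        else:
            yield key, (product, sale, loc)

lines = ['joe,ink,20', 'ron,ink,35', 'joe,Clifton']
# "Shuffle": sort all (key, value) pairs, then group by key
pairs = sorted(kv for line in lines for kv in mapper(line))
joined = [row
          for key, group in groupby(pairs, key=itemgetter(0))
          for row in reducer(key, (v for _, v in group))]
print(joined)  # [('joe', ('ink', 20, 'Clifton')), ('ron', ('ink', 35, None))]
```

joe's transaction picks up his location, while ron (absent from the location file) comes through with None, just like the real output below.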
Here is the joined output:
"joe" ["ink", 20, "Clifton"]
"joe" ["ink", 56, "Clifton"]
"joe" ["pen", 50, "Clifton"]
"joe" ["pencil", 10, "Clifton"]
"joe" ["pencil", 18, "Clifton"]
"joe" ["pencil", 28, "Clifton"]
"joe" ["pencil", 8, "Clifton"]
"joe" ["tablet", 2300, "Clifton"]
"ron" ["ink", 35, null]
"ron" ["tablet", 4500, null]
"ron" ["whiteboard", 140, null]
"ron" ["whiteboard", 300, null]
"sam" ["paper", 300, "Plainsboro"]
"sam" ["pen", 30, "Plainsboro"]
"sam" ["whiteboard", 120, "Plainsboro"]
"tom" ["paper", 300, "Edison"]
"tom" ["pen", 50, "Edison"]
"tom" ["pencil", 20, "Edison"]
"tom" ["staples", 23, "Edison"]
The reason this works is that Hadoop sorts the output from the mapper before presenting it to the reducer. For lines coming from the employee location file, the product and sale fields are blank, which sorts the location record to the top of that employee's sales records. Here is what is presented to the reducer:
"joe" ["", "", "Clifton"]
"joe" ["ink", 20, ""]
"joe" ["ink", 56, ""]
"joe" ["pen", 50, ""]
"joe" ["pencil", 10, ""]
"joe" ["pencil", 18, ""]
"joe" ["pencil", 28, ""]
"joe" ["pencil", 8, ""]
"joe" ["tablet", 2300, ""]
"moe" ["", "", "Bridgewater"]
"paul" ["", "", "Piscataway"]
"ron" ["ink", 35, ""]
"ron" ["tablet", 4500, ""]
"ron" ["whiteboard", 140, ""]
"ron" ["whiteboard", 300, ""]
"sam" ["", "", "Plainsboro"]
"sam" ["paper", 300, ""]
"sam" ["pen", 30, ""]
"sam" ["whiteboard", 120, ""]
"susan" ["", "", "Princeton"]
"tom" ["", "", "Edison"]
"tom" ["paper", 300, ""]
"tom" ["pen", 50, ""]
"tom" ["pencil", 20, ""]
"tom" ["staples", 23, ""]
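A quick sanity check of that ordering claim in plain Python (tuple sorting here, standing in for the shuffle sort of the serialized mapper output):

```python
# A location record has empty product and sale fields, so within a key
# it sorts ahead of every sales record (the empty string sorts first).
values = [('pencil', 10, ''), ('', '', 'Clifton'), ('ink', 20, '')]
print(sorted(values)[0])  # ('', '', 'Clifton')
```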
This is a left join, so employees in the location file but not in transactions are ignored. To change it to an outer join, change the reducer to:
def reducer(self, key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        yield key, (product, sale, loc)

For an inner join:
def reducer(self, key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        elif loc:
            yield key, (product, sale, loc)

Neat, isn't it? To guarantee that the location record is available at the top of each employee's sales records, the join must be done in the reduce step; it cannot be done in the combine step, because a combiner sees only a subset of a key's values. Joining operations can therefore become a bottleneck. Downstream operations such as grouping can be done by another reduction step, which I will illustrate in a future post.
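To see how the three reducer variants differ, here is a small sketch (plain Python, outside mrjob) that feeds each one the values for ron, who has sales but no location, and susan, who has a location but no sales. The function names are mine; the bodies match the reducers above.

```python
def left_reducer(key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        else:
            yield key, (product, sale, loc)

def outer_reducer(key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        yield key, (product, sale, loc)

def inner_reducer(key, values):
    loc = None
    for product, sale, location in values:
        if location:
            loc = location
        elif loc:
            yield key, (product, sale, loc)

ron = [('ink', 35, '')]           # sales record only
susan = [('', '', 'Princeton')]   # location record only

print(list(left_reducer('ron', ron)))      # [('ron', ('ink', 35, None))]
print(list(inner_reducer('ron', ron)))     # []
print(list(outer_reducer('susan', susan))) # [('susan', ('', '', 'Princeton'))]
print(list(left_reducer('susan', susan)))  # []
```

The left join keeps unmatched sales with a null location, the inner join drops them, and only the outer join also emits employees who have a location but no sales.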