A Basic Map Reduce Framework in Python
As I mentioned in my previous post, mrjob is a great framework for writing Hadoop MapReduce jobs in Python, and there are several others, such as Dumbo and Pydoop. However, if you work in a shop where every external module has to be validated before use, you may want to create something from scratch for the quick wins while you select and certify a framework.
Here is something I have been using. There is nothing to install or configure, and it emphasizes transparency: there are no wrappers, so there is no worry about matching Hadoop versions, and you still get full access to all Hadoop streaming options. You can chain multiple jobs, manipulate HDFS, and test individual parts on small local files before running on Hadoop, all in one place (a local test helper is sketched after the listing below).
The example below uses the data from my previous post. Notice that I am joining two files. The "sortOrder" field forces the records from the file being joined to the top of each key group and also identifies those records in the reducer. Note that if multiple records match a main record, my code picks only the last record from the joining file. This behavior can easily be adjusted to create as many records in the main table as there are matches, either by storing the joining-file records in a list or by aggregating the joining fields through concatenation; a sketch of the list-based variation follows the listing below.
#!/usr/bin/env python
import sys
from itertools import groupby
from subprocess import call


def hadoop():
    # Remove any previous output directory, then launch the streaming job,
    # shipping this script and using it as both mapper and reducer.
    call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output', shell=True)
    cmd = 'hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar \
           -input /user/cloudera/test \
           -output /user/cloudera/test/output \
           -file ' + sys.argv[0] + ' \
           -mapper "' + sys.argv[0] + ' mapper" \
           -reducer "' + sys.argv[0] + ' reducer" '
    call(cmd, shell=True)


def mapper():
    # Emit tab-separated records keyed by person. sortOrder '1' marks a sales
    # record (person, product, sale); '0' marks a record from the joining
    # (location) file. None is just a placeholder to keep a fixed five-field layout.
    for line in sys.stdin:
        try:
            person, product, sale = line.strip().split(',')
            print('%s\t%s\t%s\t%s\t%s' % (person, '1', product, sale, None))
        except ValueError:
            try:
                person, location = line.strip().split(',')
                print('%s\t%s\t%s\t%s\t%s' % (person, '0', None, None, location))
            except ValueError:
                continue  # skip malformed lines


def lines():
    # Generator over the reducer's sorted input, split into fields.
    for line in sys.stdin:
        yield line.strip().split('\t')


def reducer():
    # Group the sorted input by person; the sortOrder field identifies which
    # file each record came from.
    for key, values in groupby(lines(), lambda x: x[0]):
        count = total = 0
        location = ''
        for person, sortOrder, product, sale, locn in values:
            if sortOrder == '0':
                location = locn       # record from the joining (location) file
            else:
                total += int(sale)    # aggregate sales for this person
                count += 1
        if count:
            print('%s\t%s\t%s' % (person, location, total))


def printUsage():
    sys.stderr.write('Usage: ' + sys.argv[0] + ' functionNameToRun\n')
    sys.exit(1)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        printUsage()
    try:
        locals()[sys.argv[1]]()   # dispatch to hadoop(), mapper(), or reducer()
    except KeyError:
        sys.stderr.write('Error: No such function name: ' + sys.argv[1] + '\n')
        printUsage()
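For local testing, the same dispatch trick can be used to pipe small sample files through the mapper, a plain sort (standing in for Hadoop's shuffle and sort), and the reducer. Here is a minimal sketch of such a helper, not part of the script above: sales.csv and locations.csv are just placeholder names for whatever small extracts of the two input files you use.

# Hypothetical localtest() helper: drop it into the script above and run
# "./yourscript.py localtest". sales.csv and locations.csv are assumed names
# for small local extracts of the two input files; sort stands in for
# Hadoop's shuffle/sort phase.
import sys
from subprocess import call

def localtest():
    call('cat sales.csv locations.csv'
         ' | ' + sys.argv[0] + ' mapper'
         ' | sort'
         ' | ' + sys.argv[0] + ' reducer', shell=True)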
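And here is the list-based variation mentioned earlier, as a rough sketch rather than the code I run: a drop-in replacement for the reducer that keeps every matching record from the joining file in a list and emits one joined line per location (concatenating the list, e.g. with ','.join(locations), would be the other option). It assumes the same tab-separated mapper output as the script above.

# Sketch of a one-to-many variant of the reducer: keep all joining-file
# (location) records in a list and emit one joined line per location.
# Assumes the same tab-separated mapper output as the script above.
import sys
from itertools import groupby

def lines():
    for line in sys.stdin:
        yield line.strip().split('\t')

def reducer():
    for key, values in groupby(lines(), lambda x: x[0]):
        count = total = 0
        locations = []                       # every matching joining-file record
        for person, sortOrder, product, sale, locn in values:
            if sortOrder == '0':
                locations.append(locn)
            else:
                total += int(sale)
                count += 1
        if count:
            for location in locations:       # one output record per match
                print('%s\t%s\t%s' % (person, location, total))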
To chain jobs, simply create another Hadoop streaming command in hadoop() and point its -mapper and -reducer options at new mapping and reducing functions in the same script. To add a combiner, add the -combiner option to the command line in hadoop(), pointing at your combiner function.
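As a rough illustration, a chained second stage with a combiner might look like the sketch below. The function names mapper2, combiner2, and reducer2 and the output2 path are made-up placeholders, not part of the script above; only the -combiner option itself is real Hadoop streaming syntax.

# Hypothetical second stage chained after the first job: it reads the first
# job's output directory and adds a combiner. mapper2, combiner2, reducer2
# and the output2 path are placeholders for functions and paths you would define.
import sys
from subprocess import call

def hadoop2():
    call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output2', shell=True)
    script = sys.argv[0]
    cmd = ('hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar'
           ' -input /user/cloudera/test/output'
           ' -output /user/cloudera/test/output2'
           ' -file ' + script +
           ' -mapper "' + script + ' mapper2"'
           ' -combiner "' + script + ' combiner2"'
           ' -reducer "' + script + ' reducer2"')
    call(cmd, shell=True)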