Monday, December 23, 2013

A Basic Map Reduce Framework in Python

As I mentioned in my previous post, mrjob is a great framework for writing Hadoop Map-Reduce jobs in Python, and there are several others such as Dumbo and Pydoop. However, if you work in a shop where you have to validate all external modules before use, you may want to create something from scratch for the quick wins even as you select and certify a framework.

Here is something I have been using. There is nothing to install or configure. It emphasizes transparency: there are no wrappers, so there is no need to match a framework release to your Hadoop version, and you keep full access to all Hadoop streaming options. You can chain multiple jobs, manipulate HDFS, and test individual parts with small local files before running on Hadoop, all in one place.

The example below uses the data from my previous post. Notice that I am joining two files. The "sortOrder" field forces the records from the file that is being joined to the top, and also identifies those records in the reducer. Note that if multiple records in the joining file match the main record, my code will only pick the last one. This behavior can easily be adjusted to produce one output record per match by storing the joining file records in a list, or to combine the matches by aggregating the joining fields, for example through concatenation.

#!/usr/bin/env python

import sys
from itertools import groupby
from subprocess import call

def hadoop():
 call('hadoop fs -rm -r -skipTrash /user/cloudera/test/output', shell=True)

 cmd = 'hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.4.0.jar \
 -input /user/cloudera/test \
 -output /user/cloudera/test/output \
 -file ' + sys.argv[0] + ' \
 -mapper "' + sys.argv[0] + ' mapper" \
 -reducer "' + sys.argv[0] + ' reducer" '
 call(cmd, shell=True)
  
def mapper():
 for line in sys.stdin:
  try:
   person, product, sale = line.strip().split(',')
   print('%s\t%s\t%s\t%s\t%s' % (person, '1', product, sale, None))
  except ValueError:
   try:
    person, location = line.strip().split(',')
    print('%s\t%s\t%s\t%s\t%s' % (person, '0', None, None, location))
   except ValueError:
    continue

def lines():
 for line in sys.stdin:
  yield line.strip().split('\t')

def reducer():
 for key, values in groupby(lines(), lambda x: x[0]):
  count = total = 0
  location = ''
  for person, sortOrder, product, sale, locn in values:
   if sortOrder == '0': location = locn
   else:
    total += int(sale)
    count += 1
  if count: print('%s\t%s\t%s' % (key, location, total))
   
def printUsage():
 sys.stderr.write('Usage: ' + sys.argv[0] + ' functionNametoRun\n')
 sys.exit(1)
 
if __name__ == '__main__':
 if len(sys.argv) != 2: printUsage()
 try:
  locals()[sys.argv[1]]()
 except KeyError:
  sys.stderr.write('Error: No such function name: ' + sys.argv[1] + '\n')
  printUsage()

To chain jobs, simply create another Hadoop command in hadoop() and call new mapping and reducing functions from its command line. To add combiners, add -combiner to the command line in hadoop() to call your combiner function.
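
The multi-match adjustment mentioned earlier can be sketched roughly as follows. This is only an illustration under assumptions: it expects the same tab-separated records that mapper() prints, already sorted so that the sortOrder '0' records come first for each person. The function name reduce_lines and the choice to emit one row per sale per stored location are hypothetical and not part of the script above. Because it takes any iterable of lines, it can be tried on a small list before being pointed at sys.stdin.

```python
from itertools import groupby

def reduce_lines(lines):
    """Yield (person, location, sale) rows, keeping every join-file match.

    `lines` must be sorted so that each person's sortOrder '0' records
    (from the joining file) come before the sortOrder '1' records.
    """
    rows = (line.rstrip('\n').split('\t') for line in lines)
    for person, group in groupby(rows, key=lambda r: r[0]):
        locations = []  # all join-file matches seen so far for this person
        for _, sort_order, product, sale, locn in group:
            if sort_order == '0':
                locations.append(locn)
            else:
                # One row per stored location; None when nothing matched
                for loc in locations or [None]:
                    yield person, loc, int(sale)

if __name__ == '__main__':
    sample = [
        'joe\t0\tNone\tNone\tClifton',
        'joe\t0\tNone\tNone\tEdison',
        'joe\t1\tink\t20\tNone',
        'ron\t1\tink\t35\tNone',
    ]
    for row in reduce_lines(sample):
        print(row)
```

Aggregating instead of multiplying rows would only need the inner loop replaced by a single yield that joins the stored locations into one string.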

Tuesday, December 10, 2013

Joining big files (tables) with Python, mrjob and Hadoop Map-Reduce

Hadoop Map-Reduce can simultaneously read lines (records) from all files in an HDFS folder and present lines from one or more files in read order to map tasks running on one or more Hadoop cluster nodes. Let us say that the HDFS folder contains two files: one is a sales transaction file and the other is an employee location file. We want to join the location to the transactions.

Here is an example sales transactions file:

joe,ink,20
joe,pencil,10
sam,whiteboard,120
joe,pencil,8
joe,pencil,18
joe,pencil,28
joe,ink,56
ron,ink,35
sam,paper,300
joe,tablet,2300
tom,staples,23
tom,pencil,20
tom,pen,50
tom,paper,300
ron,tablet,4500
sam,pen,30
joe,pen,50
ron,whiteboard,140
ron,whiteboard,300

Here is an example employee location file:

joe,Clifton
sam,Plainsboro
tom,Edison
paul,Piscataway
moe,Bridgewater
susan,Princeton

Here is the Python map-reduce program that will join the two. It requires the mrjob package, which I think is fantastic for writing map-reduce jobs.

#!/usr/bin/env python
from mrjob.job import MRJob

class TestJob(MRJob):
    def __init__(self, *args, **kwargs):
        super(TestJob, self).__init__(*args, **kwargs)

    def mapper(self, _, line):
        # Mapper will either get a record from main or join table
        try: # See if it is main table record
            name, product, sale = line.split(',')
            yield name, (product, int(sale), '')
        except ValueError:
            try: # See if it is a join table record
                name, location = line.split(',')
                yield name, ('', '', location)
            except ValueError:
                pass # Record did not match either so skip the record

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location: loc = location
            else: yield key, (product, sale, loc)

if __name__ == '__main__':
    TestJob.run()

Here is the joined output:

"joe"    ["ink", 20, "Clifton"]
"joe"    ["ink", 56, "Clifton"]
"joe"    ["pen", 50, "Clifton"]
"joe"    ["pencil", 10, "Clifton"]
"joe"    ["pencil", 18, "Clifton"]
"joe"    ["pencil", 28, "Clifton"]
"joe"    ["pencil", 8, "Clifton"]
"joe"    ["tablet", 2300, "Clifton"]
"ron"    ["ink", 35, null]
"ron"    ["tablet", 4500, null]
"ron"    ["whiteboard", 140, null]
"ron"    ["whiteboard", 300, null]
"sam"    ["paper", 300, "Plainsboro"]
"sam"    ["pen", 30, "Plainsboro"]
"sam"    ["whiteboard", 120, "Plainsboro"]
"tom"    ["paper", 300, "Edison"]
"tom"    ["pen", 50, "Edison"]
"tom"    ["pencil", 20, "Edison"]
"tom"    ["staples", 23, "Edison"]

This works because Hadoop sorts the output from the mapper before presenting it to the reducer. For lines coming from the employee location file, the product and sales amount are blank, which puts them at the top of the sales records for that employee. Here is what is presented to the reducer:

"joe"    ["", "", "Clifton"]
"joe"    ["ink", 20, ""]
"joe"    ["ink", 56, ""]
"joe"    ["pen", 50, ""]
"joe"    ["pencil", 10, ""]
"joe"    ["pencil", 18, ""]
"joe"    ["pencil", 28, ""]
"joe"    ["pencil", 8, ""]
"joe"    ["tablet", 2300, ""]
"moe"    ["", "", "Bridgewater"]
"paul"    ["", "", "Piscataway"]
"ron"    ["ink", 35, ""]
"ron"    ["tablet", 4500, ""]
"ron"    ["whiteboard", 140, ""]
"ron"    ["whiteboard", 300, ""]
"sam"    ["", "", "Plainsboro"]
"sam"    ["paper", 300, ""]
"sam"    ["pen", 30, ""]
"sam"    ["whiteboard", 120, ""]
"susan"    ["", "", "Princeton"]
"tom"    ["", "", "Edison"]
"tom"    ["paper", 300, ""]
"tom"    ["pen", 50, ""]
"tom"    ["pencil", 20, ""]
"tom"    ["staples", 23, ""]
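
The ordering can be checked locally without a cluster. The sketch below is only a stand-in for the shuffle step: it sorts (name, value) pairs on the name and then on the JSON text of the value, which reproduces the order of the listing above but is an assumption about the effect, not a description of how Hadoop or mrjob sort internally. The helper names map_line and shuffle are hypothetical.

```python
import json

def map_line(line):
    """Return (name, value) for a sales or location record, else None."""
    fields = line.strip().split(',')
    try:
        if len(fields) == 3:
            name, product, sale = fields
            return name, (product, int(sale), '')
    except ValueError:
        return None  # third field was not a number
    if len(fields) == 2:
        name, location = fields
        return name, ('', '', location)
    return None

def shuffle(lines):
    """Map every line, drop the ones that did not parse, and sort the rest."""
    pairs = [p for p in map(map_line, lines) if p is not None]
    # The blank product serializes as "" and so sorts ahead of any real product
    return sorted(pairs, key=lambda kv: (kv[0], json.dumps(kv[1])))

if __name__ == '__main__':
    sample = ['joe,pencil,8', 'joe,ink,20', 'joe,Clifton', 'joe,pencil,10']
    for name, value in shuffle(sample):
        print(json.dumps(name), json.dumps(value))
```

Sorting on the serialized text is also why the sale of 10 lands ahead of the sale of 8 in the listing: the comparison is character by character, not numeric.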

This is a left join, so employees in the location file but not in transactions are ignored. To change it to an outer join, change the reducer to:

    def reducer(self, key, values):
        loc = None
        matched = False
        for product, sale, location in values:
            if location: loc = location
            else:
                matched = True
                yield key, (product, sale, loc)
        # Location with no sales: emit it once with blank sale fields
        if not matched and loc: yield key, ('', '', loc)

For inner join:

    def reducer(self, key, values):
        loc = None
        for product, sale, location in values:
            if location: loc = location
            elif loc: yield key, (product, sale, loc)

Neat, isn't it? To guarantee that the location record is available at the top of the sales records for each employee, the join must be done in the reduce step; it cannot be done in the combine step. Joins can therefore become a bottleneck. Downstream operations such as grouping can be done by another reduce step, which I will illustrate in a future post.
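
To compare the three join flavours side by side without mrjob, the reducer logic can be sketched as one plain generator. This is an illustration under assumptions, not the mrjob API: the function name join and its how argument are hypothetical, and the values for a key are assumed to arrive with the location record (if any) first, as in the reducer input shown earlier.

```python
def join(key, values, how='left'):
    """Join one key's sorted values; the location record, if any, comes first.

    how='left'  keeps every sale, with None when no location matched
    how='inner' keeps only sales that have a location
    how='outer' is 'left' plus location-only keys with blank sale fields
    """
    loc = None
    saw_sale = False
    for product, sale, location in values:
        if location:
            loc = location
        elif how != 'inner' or loc:
            saw_sale = True
            yield key, (product, sale, loc)
    # Only the outer join reports a location that had no sales at all
    if how == 'outer' and not saw_sale and loc:
        yield key, ('', '', loc)

if __name__ == '__main__':
    ron = [('ink', 35, '')]              # sales but no location
    moe = [('', '', 'Bridgewater')]      # location but no sales
    for how in ('left', 'inner', 'outer'):
        print(how, list(join('ron', ron, how)), list(join('moe', moe, how)))
```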

Monday, December 9, 2013

Analyzing unstructured data with Apache Hive, Hadoop and RegexSerDe


Download the IMDB movies list with countries to your favorite folder within HDFS on your Hadoop cluster. If you look at the records in the file, the movie name is enclosed in quotes and is the first item on the record. The year follows, enclosed in parentheses. After that comes optional information such as TV episode title, season, episode etc. within curly braces. The last item on the record is the country name.

"'Da Kink in My Hair" (2007) {Empty Bag Can't Stand Up (#1.4)}    Canada

These items must be parsed into fields to enable analysis, such as count of movies made in the year 2000 in the USA. One way to do it is by first creating a table within Hive with the entire record as one field, and then using Create Table as Select (CTAS) where the select uses regexp_extract function to parse fields. The shortcomings of this approach are (1) the CTAS table cannot be external and (2) you have just doubled the storage space required if you intend to keep the original external table.

A better approach is to create the table with a Regular Expressions Serializer/Deserializer class (RegexSerDe) that parses fields from each record using a regular expression supplied when the table is created.

The RegexSerDe class is contained within the hive-contrib-*.jar file. On my Cloudera Hadoop (CDH) cluster this file is located in the /usr/lib/hive/lib/ folder. Before using the RegexSerDe class, you will need to tell Hive about this library by running the following command at the Hive prompt:

add jar /usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.4.0.jar;

This will add the jar file as a resource for that session. To make this jar available automatically each time you start a hive session, you can edit the /etc/hive/conf/hive-site.xml file to add the following:

<property>
        <name>hive.aux.jars.path</name>
        <value>file:///usr/lib/hive/lib/hive-contrib-0.10.0-cdh4.4.0.jar</value>
</property>

The <value> can take multiple files separated by commas in case you are using other jars.

Now that the RegexSerDe class is available for use within Hive, we can create the table using RegexSerDe like so (the table name movies is only an example; pick your own):

CREATE EXTERNAL TABLE movies (title string, year string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = '"([^"]+)"\\s+\\(([^)]+).+'
)
LOCATION '/user/smith/imdb';

The above assumes that the countries.list file is located within the /user/smith/imdb folder on HDFS. We are only parsing out the movie title and year. I will update the post in the future with code that parses all fields. Notice that RegexSerDe requires the "input.regex" pattern to match the entire record, thus the .+ at the end. Also note that a regex shorthand or escape such as \s is written as \\s because \ is also the escape character in Hive string literals. Each field to be parsed is a capture group enclosed in parentheses.
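
Before creating the table, the pattern can be tried against a sample record outside Hive. The sketch below uses Python's re module, which is an assumption of convenience: Hive uses Java regular expressions, but this particular pattern uses only constructs the two share. The function name parse is hypothetical, and the sample line assumes a tab before the country. re.fullmatch stands in for the requirement that the pattern match the entire record.

```python
import re

# Same pattern as input.regex, written with single backslashes for Python
PATTERN = re.compile(r'"([^"]+)"\s+\(([^)]+).+')

def parse(record):
    """Return (title, year) if the whole record matches, else None."""
    m = PATTERN.fullmatch(record)
    return m.groups() if m else None

if __name__ == '__main__':
    line = '"\'Da Kink in My Hair" (2007) {Empty Bag Can\'t Stand Up (#1.4)}\tCanada'
    print(parse(line))
    # A record whose title is not in quotes does not match this pattern
    print(parse('no quotes (2000)\tUSA'))
```

Records that do not match deserve a look before analysis, since this pattern only covers titles enclosed in quotes.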

Thursday, November 28, 2013

Loading and Extracting Big Data from your Hadoop Cluster from Windows

I have data files on my Windows machine that I want to send to a Hadoop cluster for map-reduce. I also want to get the reduced data back to my Windows system, perhaps to analyze with Excel or QlikView. And I want to do this without installing Hadoop clients on my Windows machine. One way is to SFTP or SCP files to or from one of the Hadoop cluster nodes, and then use hadoop fs commands to put them into or get them from HDFS. However, this needs twice the storage space on the cluster. A better way is needed.

Hadoop natively offers a REST API to HDFS called WebHDFS. It is a pretty straightforward API that can be used with cURL. However, Windows does not come with cURL, and it is not practical to transfer large multi-GB files with cURL. So I decided to whip up a PowerShell script that can put, get, rename, and delete files, and also make and delete directories. Right now the script will only work on a cluster where security is not enabled. A future version will include security.

To use the REST API you will need to enable WebHDFS on all nodes. To do so, edit the hdfs-site.xml on each node to add the following:

        <property>
           <name>dfs.webhdfs.enabled</name>
           <value>true</value>
        </property>

Also, because the REST API will redirect the request to the node that has the data, or to the node where Hadoop decides the data should be stored in the case of a PUT request, your Windows machine must be able to resolve the names of all nodes in the cluster to IP addresses, either through DNS or a local hosts file.
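
For readers who want to see the shape of the requests before reading the PowerShell, here is a small Python sketch that only builds WebHDFS URLs and sends nothing over the network. It follows the URL layout used by the functions in this post (/webhdfs/v1, then the path, then op and user.name); the helper name webhdfs_url and its keyword arguments are hypothetical.

```python
from urllib.parse import quote, urlencode

def webhdfs_url(hostname, hdfs_path, op, user=None, port=50070, **params):
    """Build a WebHDFS v1 URL; extra keyword arguments become query parameters."""
    if not hdfs_path.startswith('/'):
        raise ValueError('hdfs_path must start with a /')
    if user:
        params['user.name'] = user
    # op goes first, the remaining parameters follow in sorted order
    query = [('op', op)] + sorted(params.items())
    return 'http://%s:%d/webhdfs/v1%s?%s' % (
        hostname, port, quote(hdfs_path), urlencode(query))

if __name__ == '__main__':
    print(webhdfs_url('nameNode', '/user/jack/folder1', 'MKDIRS', user='jack'))
    print(webhdfs_url('nameNode', '/user/jack/a b.txt', 'CREATE',
                      user='jack', overwrite='true'))
```

A request to a URL like the second one is the one that gets answered with the redirect to a data node, so a client has to read the Location header and send the data there in a second request.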

The script requires PowerShell v3 or higher, which ships with Windows Server 2012 and Windows 8. For other versions of Windows you may need to upgrade PowerShell. You will also have to configure your system to permit running PowerShell scripts by setting the execution policy to Unrestricted (for example with Set-ExecutionPolicy).

Run the following script from the PowerShell prompt. No output will appear. The script is a set of functions that will load into memory.

function Hdfs-Put {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][byte[]]$data,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$True)][string]$user,
        [Parameter(Mandatory=$False)][ValidateSet('open', 'append', 'overwrite')][string]$mode = 'open'
    )
        
    if ($hdfsPath -notmatch '^/') { throw "hdfsPath must start with a /" }
    $method = 'PUT'
    $uri = "http://${hostname}:$port/webhdfs/v1${hdfspath}?op=CREATE&overwrite=false&user.name=$user"
    if ($mode -match 'append') { $uri = "http://${hostname}:$port/webhdfs/v1${hdfspath}?op=APPEND&user.name=$user"; $method = 'POST' }
    if ($mode -match 'overwrite') { $uri = "http://${hostname}:$port/webhdfs/v1${hdfspath}?op=CREATE&overwrite=true&user.name=$user" }
    # webHDFS Create operation requires two requests. The first is sent without data and redirects
    # to node name and port where data should be sent
    $wr = [System.Net.WebRequest]::Create($uri)
    $wr.Method = $method
    $wr.AllowAutoRedirect = $false
    $response = $wr.GetResponse()
    if ($response.StatusCode -ne 'TemporaryRedirect') {
        throw 'Error: Expected temporary redirect, got ' + $response.StatusCode
    }
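    # Read the redirect target, then release the first connection
    $location = $response.Headers['Location']
    $response.Close()
    $wr = [System.Net.WebRequest]::Create($location)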
    $wr.Method = $method
    $wr.ContentLength = $data.Length
    $requestBody = $wr.GetRequestStream()
    $requestBody.Write($data, 0, $data.Length)
    $requestBody.Close()

    # Return the response from webHDFS to the caller
    $wr.GetResponse()
}

function Hdfs-Get {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$False)][string]$user,
        [Parameter(Mandatory=$False)][long]$offset = 0,
        [Parameter(Mandatory=$False)][long]$length = 67108864
    )
        
    $uri = "http://${hostname}:$port/webhdfs/v1${hdfspath}?op=OPEN&offset=$offset&length=$length"
  
    if ($user) { $uri += '&user.name=' + $user }
    $wr = [System.Net.WebRequest]::Create($uri)
    $response = $wr.GetResponse()
    $responseStream = $response.GetResponseStream()
    $br = New-Object System.IO.BinaryReader($responseStream)
    $br.ReadBytes($response.ContentLength)
    $br.Close()
    $responseStream.Close()
}

function Hdfs-List {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath
    )
    if ($hdfsPath -notmatch '^/') { throw "hdfsPath must start with a /" }
    $fileStatus= Invoke-RestMethod -Method Get -Uri "http://${hostname}:$port/webhdfs/v1${hdfsPath}?op=LISTSTATUS"
    foreach ($item in $fileStatus.FileStatuses.FileStatus) {
        $item.accessTime = Convert-FromEpochTime $item.accessTime
        $item.modificationTime = Convert-FromEpochTime $item.modificationTime
        $item
    }
}

function Hdfs-Remove {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$True)][string]$user,
        [switch]$recurse
    )
    if ($hdfsPath -notmatch '^/') { throw "hdfsPath must start with a /" }
    if ($recurse) { $rec = 'true' } else { $rec = 'false' }
    $result = Invoke-RestMethod -Method Delete -Uri "http://${hostname}:$port/webhdfs/v1${hdfsPath}?op=DELETE&recursive=$rec&user.name=$user"
    $result.boolean
}

function Hdfs-Mkdir {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$True)][string]$user,
        [Parameter(Mandatory=$False)][string]$permission
    )
    if ($hdfsPath -notmatch '^/') { throw "hdfsPath must start with a /" }
    if ($permission) {
        $result = Invoke-RestMethod -Method Put -Uri "http://${hostname}:$port/webhdfs/v1${hdfsPath}?op=MKDIRS&permission=$permission&user.name=$user" }
    else { $result = Invoke-RestMethod -Method Put -Uri "http://${hostname}:$port/webhdfs/v1${hdfsPath}?op=MKDIRS&user.name=$user" }
    $result.boolean
}

function Hdfs-Rename {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$True)][string]$hdfsNewPath,
        [Parameter(Mandatory=$True)][string]$user
    )
    if ($hdfsPath -notmatch '^/') { throw "hdfsPath must start with a /" }
    if ($hdfsNewPath -notmatch '^/') { throw "hdfsNewPath must start with a /" }
    $result = Invoke-RestMethod -Method Put -Uri "http://${hostname}:$port/webhdfs/v1${hdfsPath}?op=RENAME&user.name=$user&destination=$hdfsNewPath"
    $result.boolean
}

function Convert-FromEpochTime ([long]$epochTime) {
    [TimeZone]::CurrentTimeZone.ToLocalTime(([datetime]'1/1/1970').AddSeconds($epochTime/1000))
}

function Hdfs-PutFile {
   param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$localPath,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$True)][string]$user,
        [Parameter(Mandatory=$False)][int]$sliceSize = 67108864,
        [Parameter(Mandatory=$False)][ValidateSet('open', 'append', 'overwrite')][string]$mode = 'open'
    )
      
    try {
        $br = New-Object System.IO.BinaryReader([System.IO.File]::Open($localPath, [System.IO.FileMode]::Open))
    } catch { throw $error[0].Exception.Message }
    $total = $br.BaseStream.Length
    $sent = 0
    $firstRun = $true
  
    do {
        Write-Progress -Activity "Copying $localPath to HDFS on $hostname" -PercentComplete ($sent/$total * 100)
        $data = $br.ReadBytes($sliceSize)
        try {
            Hdfs-Put -hostname $hostname -port $port -user $user -hdfsPath $hdfsPath -data $data -mode $mode | out-null
        } catch { $br.Close(); throw $error[0].Exception.Message }
        $sent += $data.LongLength
        if ($firstRun) { $firstRun = $false; $mode = 'append' }
    } while ($sent -lt $total)
    $br.Close()
}

function Hdfs-GetFile {
    param (
        [Parameter(Mandatory=$True)][string]$hostname,
        [Parameter(Mandatory=$False)][int]$port = 50070,
        [Parameter(Mandatory=$True)][string]$hdfsPath,
        [Parameter(Mandatory=$False)][string]$user,
        [Parameter(Mandatory=$True)][string]$localPath,
        [Parameter(Mandatory=$False)][long]$length,
        [switch]$append,
        [switch]$overwrite
    )
    if ($append -and $overwrite) { throw 'Cannot use -append and -overwrite together' }
    $mode = [System.IO.FileMode]::CreateNew
    if ($append) {$mode = [System.IO.FileMode]::Append}
    if ($overwrite) {$mode = [System.IO.FileMode]::Create}
  
    try {
        $bw = New-Object System.IO.BinaryWriter([System.IO.File]::Open($localPath, $mode))
    } catch { throw $error[0].Exception.Message }
  
    $fileAttribs = Hdfs-List -hostname $hostname -hdfsPath $hdfsPath -port $port
    if (!$length) { $length = $fileAttribs.length }
    $blockSize = $fileAttribs.blockSize
    if ($length -lt $blockSize) { $blockSize = $length }
    $got = 0
  
    do {
        Write-Progress -Activity "Copying $hdfsPath to $localPath" -PercentComplete ($got/$length * 100)
      
        try {
            [byte[]]$data = Hdfs-Get -hostname $hostname -port $port -user $user -hdfsPath $hdfsPath -offset $got -length $blockSize
        } catch { $bw.Close(); throw $error[0].Exception.Message }
        try {
            $bw.Write($data)
        } catch { $bw.Close(); throw $error[0].Exception.Message }
        $got += $data.LongLength
    } while ($got -lt $length)
    $bw.Close()
}

You can now interact with HDFS from the PowerShell command prompt like so:

Hdfs-List -hostname nameNode -hdfsPath /user/jack
Hdfs-Mkdir -hostname nameNode -hdfsPath /user/jack/folder1 -user jack
Hdfs-PutFile -hostname nameNode -hdfsPath /user/jack/folder1/myfile.txt -localPath c:\myfile.txt -user jack
Hdfs-PutFile -hostname nameNode -hdfsPath /user/jack/folder1/myfile.txt -localPath c:\myfile2.txt -user jack -mode append
Hdfs-PutFile -hostname nameNode -hdfsPath /user/jack/folder1/myfile.txt -localPath c:\myfile3.txt -user jack -mode overwrite
Hdfs-Rename -hostname nameNode -hdfsPath /user/jack/folder1/myfile.txt -hdfsNewPath /user/jack/folder1/newfile.txt -user jack
Hdfs-GetFile -hostname nameNode -hdfsPath /user/jack/folder1/newfile.txt -localPath c:\myfile4.txt -user jack
Hdfs-GetFile -hostname nameNode -hdfsPath /user/jack/folder1/newfile.txt -localPath c:\myfile4.txt -user jack -length 2000
Hdfs-GetFile -hostname nameNode -hdfsPath /user/jack/folder1/newfile.txt -localPath c:\myfile4.txt -user jack -overwrite
Hdfs-GetFile -hostname nameNode -hdfsPath /user/jack/folder1/newfile.txt -localPath c:\myfile4.txt -user jack -append
Hdfs-Remove -hostname nameNode -hdfsPath /user/jack -user jack -recurse

Hdfs-PutFile and Hdfs-GetFile work with large files because they transfer one slice at a time.
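
The slicing idea is independent of PowerShell and can be sketched in a few lines of Python. This is only an illustration: the generator name slices is hypothetical, the mode names mirror the -mode values used above, and the sketch reads from any binary stream rather than talking to WebHDFS. The first slice keeps the requested mode, and every later slice switches to append.

```python
import io

def slices(stream, slice_size=67108864, mode='open'):
    """Yield (mode, data) pairs: the first keeps `mode`, the rest use 'append'."""
    first = True
    while True:
        data = stream.read(slice_size)
        if not data and not first:
            break  # nothing left after at least one slice was produced
        yield (mode if first else 'append'), data
        first = False
        if len(data) < slice_size:
            break  # a short read means the end of the stream

if __name__ == '__main__':
    for slice_mode, data in slices(io.BytesIO(b'abcdefghij'), 4, 'overwrite'):
        print(slice_mode, data)
```

Only one slice is held in memory at a time, which is the reason the approach scales to files far larger than RAM.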