Sunday, June 29, 2014

Updates with Pig

Here is a fairly common problem: a nightly ETL job creates a data file that needs to be added to your Hadoop data lake. In my previous post we used Pig to cleanse the data. But what about the relationship between the existing data in the lake and the new batch? Instead of simply appending the new file, you may want to update any existing records and append only the genuinely new ones. This reduces storage and saves CPU in subsequent map-reduce jobs.

Our play data set today is account balances on various dates. Here is existing data in the lake in previous.csv file:

2014-01-19,Joe,200
2014-01-22,Bill,300
2014-01-22,Ann,450

Here is a new batch of data in today.csv:

2014-02-10,Joe,100
2014-02-11,Bill,30
2014-02-12,Andy,10

We need to update the data set so that Joe's balance becomes 100, Bill's becomes 30, Andy gets added, and Ann's record is carried over unchanged:

2014-02-10,Joe,100
2014-02-11,Bill,30
2014-01-22,Ann,450
2014-02-12,Andy,10

We will combine previous with today and group on the key, which in this case is the person's name. But before we combine, we need to add a flag that differentiates old records from new ones. You could use the date or some other field in the data itself, but adding an explicit flag is a simpler and more generic solution.

today = load 'today.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
todayWithFlag = foreach today generate *, 0 as flag;
previous = load 'previous.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
previousWithFlag = foreach previous generate *, 1 as flag;
combined = union todayWithFlag, previousWithFlag;
grouped = group combined by name; 
 
The "grouped" relation looks like the following:


(Ann,{(2014-01-22,Ann,450,1)})
(Joe,{(2014-01-19,Joe,200,1),(2014-02-10,Joe,100,0)})
(Andy,{(2014-02-12,Andy,10,0)})
(Bill,{(2014-02-11,Bill,30,0),(2014-01-22,Bill,300,1)})

As you can see, the Pig group operation created an inner bag, identified by the {}, in each tuple. From each inner bag we need to extract exactly one tuple using the following logic: if there is a tuple with flag = 0 (new data), take it; otherwise take the remaining tuple (old data). To do that, we can sort the tuples within the inner bag by flag ascending and take the first one.

result = foreach grouped {
 a = order combined by flag;
 b = limit a 1;
 generate flatten(b);
}

resultNoFlag = foreach result generate date..balance;
dump resultNoFlag;
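
The dump is handy for verifying the result, but in a real pipeline you would store the consolidated relation back to the lake. A minimal sketch, assuming a hypothetical output directory named 'consolidated' (the directory must not already exist, so the usual pattern is to write each run to a fresh dated directory and point downstream jobs at the latest one):

store resultNoFlag into 'consolidated' using PigStorage(',');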

That is it!

Tuesday, June 24, 2014

Cleansing data with Pig

Hadoop is a great solution for cleansing and matching data as part of an ETL pipeline. The Map in Map-Reduce is, after all, about data transformation.

Consider the following two rows of CSV data:

01-15-2014 12:04:05, row1
2014/02/20, row2

Row 1 has the desired timestamp format MM-DD-YYYY HH:MM:SS. Row 2's date, in YYYY/MM/DD format, needs to be transformed to match. Here are three ways to accomplish this with Pig.

Method 1 (Preferred)

a = load 'testdata.csv' USING PigStorage(',') as (date:chararray, row:chararray);
b = foreach a generate (date matches '\\d+/.+'
        ?
            CONCAT(CONCAT(CONCAT(CONCAT(CONCAT(SUBSTRING(date,5,7), '-'), SUBSTRING(date,8,10)), '-'), SUBSTRING(date,0,4)), ' 00:00:00')
        :
            date) as date, row;
dump b;
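
If you are on Pig 0.11 or later, the same transformation can be written with the built-in ToDate and ToString functions instead of SUBSTRING arithmetic. A sketch under that assumption (the format strings follow Joda-Time patterns):

a = load 'testdata.csv' USING PigStorage(',') as (date:chararray, row:chararray);
b = foreach a generate (date matches '\\d+/.+'
        ?
            ToString(ToDate(date, 'yyyy/MM/dd'), 'MM-dd-yyyy HH:mm:ss')
        :
            date) as date, row;
dump b;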

Method 2

a = load 'testdata.csv' USING PigStorage(',') as (date:chararray, row:chararray);
goodDate = filter a by date matches '\\d+-.+';
parseBadDate = foreach (filter a by date matches '\\d+/.+') generate REGEX_EXTRACT_ALL(date,'(\\d+)/(\\d+)/(\\d+).*') as datenew:(year, month, day), row;

fixedBadDate = foreach parseBadDate generate CONCAT(CONCAT(CONCAT(CONCAT(CONCAT((chararray)datenew.month, '-'), (chararray)datenew.day), '-'), (chararray)datenew.year), ' 00:00:00') as date, row;

together = union goodDate, fixedBadDate;
dump together;

Method 3

a = load 'testdata.csv' USING PigStorage(',') as (date:chararray, row:chararray);
b = foreach a generate REGEX_EXTRACT_ALL(date,'(\\d+)-(\\d+)-(\\d+).*') as date:(month, day, year), row;
c = foreach b generate date.day as day, date.month as month, date.year as year, row;
d = foreach a generate REGEX_EXTRACT_ALL(date,'(\\d+)/(\\d+)/(\\d+).*') as date:(year, month, day), row;
e = foreach d generate date.day as day, date.month as month, date.year as year, row;
f = union c,e;
g = filter f by day is not null;
dump g;
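
Note that Method 3 leaves you with separate day, month, and year columns rather than a single timestamp, and the .* in both regexes discards row 1's time of day. If you want the normalized MM-DD-YYYY HH:MM:SS string instead, one possible final step (with the time defaulted to midnight) is:

h = foreach g generate CONCAT(CONCAT(CONCAT(CONCAT(CONCAT((chararray)month, '-'), (chararray)day), '-'), (chararray)year), ' 00:00:00') as date, row;
dump h;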