Our toy data set today consists of account balances on various dates. Here is the existing data in the lake, in the file previous.csv:
2014-01-19,Joe,200
2014-01-22,Bill,300
2014-01-22,Ann,450
Here is a new batch of data in today.csv:
2014-02-10,Joe,100
2014-02-11,Bill,30
2014-02-12,Andy,10
We need to update the data set so that Joe's balance is updated to 100, Bill's to 30, Andy is added, and Ann's record stays unchanged:
2014-02-10,Joe,100
2014-02-11,Bill,30
2014-01-22,Ann,450
2014-02-12,Andy,10
We will combine previous with today and group on the key, which in this case is the person's name. Before combining, though, we need to add a flag that distinguishes old records from new ones. You could use the date or another field value in the data itself, but adding a flag is a much simpler and more generic solution.
today = load 'today.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
-- flag 0 marks new records
todayWithFlag = foreach today generate *, 0 as flag;
previous = load 'previous.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
-- flag 1 marks existing records
previousWithFlag = foreach previous generate *, 1 as flag;
combined = union todayWithFlag, previousWithFlag;
grouped = group combined by name;
The "grouped" relation looks like the following:
(Ann,{(2014-01-22,Ann,450,1)})
(Joe,{(2014-01-19,Joe,200,1),(2014-02-10,Joe,100,0)})
(Andy,{(2014-02-12,Andy,10,0)})
(Bill,{(2014-02-11,Bill,30,0),(2014-01-22,Bill,300,1)})
As you can see, the Pig group operation created an inner bag, marked by {}, in each tuple. From each inner bag we need to extract exactly one tuple using the following logic: if there is a tuple with flag = 0 (new data), take that one; otherwise take the existing tuple (old data). The order of tuples inside a bag is not guaranteed (Joe's old record comes first, while Bill's new record comes first), so we sort the tuples within each inner bag by flag in ascending order and then take the first tuple.
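result = foreach grouped {
    -- ascending sort on flag puts the new record (flag 0), if any, first
    a = order combined by flag;
    b = limit a 1;
    generate flatten(b);
}
-- project away the flag column
resultNoFlag = foreach result generate date..balance;
dump resultNoFlag;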
That is it!
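For comparison, here is a sketch of the alternative mentioned earlier: relying on the date in the data itself instead of a flag. It assumes dates are always written as yyyy-MM-dd (so that a chararray sort matches chronological order) and that a record in the new batch is always dated later than the existing record for the same name. The relation names latest, byDate, and newest are illustrative only.

```pig
today = load 'today.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
previous = load 'previous.csv' using PigStorage(',') as (date:chararray,name:chararray,balance:int);
-- no flag needed: the date itself decides which record wins
combined = union today, previous;
grouped = group combined by name;
latest = foreach grouped {
    -- assumes yyyy-MM-dd strings, which sort lexicographically in date order,
    -- so a descending sort puts the most recent record first
    byDate = order combined by date desc;
    newest = limit byDate 1;
    generate flatten(newest);
}
dump latest;
```

On the sample data this picks the same rows as the flag version, but only because of the assumptions above. If a new batch could carry an older or equal date for the same name, or if dates came in a format that does not sort as text, the result would be wrong or arbitrary. The flag makes no such assumptions, which is why it is the more generic choice.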