Tuesday, February 21, 2012

Set operations in Apache Pig


Simple set operation examples

While writing Apache Pig scripts, I realized that in many cases the result I was after was attainable through a series of set operations performed on various relations. It’s not very clear from the documentation how to perform these operations. I googled a bit and found this PDF from Tufts University on ‘Advanced Pig’. In a nutshell, COGROUP is your friend. Here are some simple examples that show how you can perform set operations in Pig using COGROUP.

Let’s assume we have two relations, TEST1 and TEST2. We load TEST1 from test1.txt, which contains:


1,aaaa
2,bbbb
3,cccc
4,dddd
5,eeee
6,ffff
7,gggg
8,hhhh
9,iiii

TEST1 = LOAD 's3://mybucket/test1.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);

We load TEST2 from test2.txt containing:

7,ggggggg
8,hhhhhhh
9,iiiiiii
10,jjjjjjj
11,kkkkkkk

TEST2 = LOAD 's3://mybucket/test2.txt' USING PigStorage(',') as (
id: chararray,
value: chararray);


We use COGROUP to generate a new relation. COGROUP is similar to JOIN in that it takes one or more key fields from each of the relations it operates on. Here is how we cogroup TEST1 and TEST2 on their id fields:

CGRP = COGROUP TEST1 BY id, TEST2 BY id;
DUMP CGRP;

(1,{(1,aaaa)},{})
(2,{(2,bbbb)},{})
(3,{(3,cccc)},{})
(4,{(4,dddd)},{})
(5,{(5,eeee)},{})
(6,{(6,ffff)},{})
(7,{(7,gggg)},{(7,ggggggg)})
(8,{(8,hhhh)},{(8,hhhhhhh)})
(9,{(9,iiii)},{(9,iiiiiii)})
(10,{},{(10,jjjjjjj)})
(11,{},{(11,kkkkkkk)})

If we DESCRIBE the new relation CGRP we get:

CGRP: {group: chararray,TEST1: {(id: chararray,value: chararray)},TEST2: {(id: chararray,value: chararray)}}

The important thing to notice is that the second field of each CGRP tuple is a bag of the TEST1 tuples whose id equals the group key, and the third field is a bag of the TEST2 tuples with that same id. A bag is empty when the corresponding relation has no tuple with that id. Based on this, we can perform all of the set operations mentioned above.
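To make the structure of CGRP concrete, here is a small plain-Python model of what COGROUP computes for the two sample files. It is only a sketch of the semantics, assuming both inputs fit in memory. Pig itself executes COGROUP as a distributed job, and the `cogroup` helper below is a hypothetical illustration, not part of any Pig API.

```python
from collections import defaultdict

# Contents of test1.txt and test2.txt as (id, value) tuples.
# As in the Pig schema, ids are strings (chararray), not ints.
TEST1 = [("1", "aaaa"), ("2", "bbbb"), ("3", "cccc"), ("4", "dddd"), ("5", "eeee"),
         ("6", "ffff"), ("7", "gggg"), ("8", "hhhh"), ("9", "iiii")]
TEST2 = [("7", "ggggggg"), ("8", "hhhhhhh"), ("9", "iiiiiii"),
         ("10", "jjjjjjj"), ("11", "kkkkkkk")]


def cogroup(left, right):
    """Hypothetical model of COGROUP left BY id, right BY id.

    Returns {group: (left_bag, right_bag)}. A bag is an empty list
    when that relation has no tuple whose id equals the group key.
    """
    groups = defaultdict(lambda: ([], []))  # fresh pair of bags per new key
    for t in left:
        groups[t[0]][0].append(t)
    for t in right:
        groups[t[0]][1].append(t)
    return dict(groups)


CGRP = cogroup(TEST1, TEST2)

# Pig does not guarantee output order; sort numerically for readability.
for group in sorted(CGRP, key=int):
    left_bag, right_bag = CGRP[group]
    print((group, left_bag, right_bag))
```

Each printed line corresponds to one tuple of the DUMP CGRP output: the group key, the TEST1 bag and the TEST2 bag.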

To perform set intersection (based on the id field), we keep only those tuples that have non-empty bags for both TEST1 and TEST2:


INTERSECT = FILTER CGRP BY NOT IsEmpty(TEST1) AND NOT IsEmpty(TEST2);
INTERSECT_ID = FOREACH INTERSECT GENERATE group AS id;   
DUMP INTERSECT_ID;

(7)
(8)
(9)


To perform the set difference TEST1 - TEST2 (again based on the id field), we keep only those tuples that have an empty TEST2 bag, which means those id values are in TEST1 but not in TEST2:

TEST1_MINUS_TEST2 = FILTER CGRP BY IsEmpty(TEST2);
TEST1_MINUS_TEST2_ID = FOREACH TEST1_MINUS_TEST2 GENERATE group AS id;
DUMP TEST1_MINUS_TEST2_ID;

(1)
(2)
(3)
(4)
(5)
(6)


The difference going the other way (TEST2 - TEST1) is similar. We keep only those tuples which have empty bags for TEST1:

TEST2_MINUS_TEST1 = FILTER CGRP BY IsEmpty(TEST1);
TEST2_MINUS_TEST1_ID = FOREACH TEST2_MINUS_TEST1 GENERATE group AS id;
DUMP TEST2_MINUS_TEST1_ID;

(10)
(11)


Note that if we wanted the set union based on the id field, we could simply generate the ‘group’ element of the CGRP relation:

UNION_ID = FOREACH CGRP GENERATE group AS id;
DUMP UNION_ID;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)
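The four COGROUP recipes above differ only in which bags must be empty. The plain-Python sketch below (again a hypothetical model over in-memory lists, not Pig code) applies the same emptiness tests to a model of CGRP and cross-checks each result against Python's built-in set operators on the id columns.

```python
from collections import defaultdict

TEST1 = [("1", "aaaa"), ("2", "bbbb"), ("3", "cccc"), ("4", "dddd"), ("5", "eeee"),
         ("6", "ffff"), ("7", "gggg"), ("8", "hhhh"), ("9", "iiii")]
TEST2 = [("7", "ggggggg"), ("8", "hhhhhhh"), ("9", "iiiiiii"),
         ("10", "jjjjjjj"), ("11", "kkkkkkk")]

# Model of CGRP = COGROUP TEST1 BY id, TEST2 BY id: {id: (TEST1 bag, TEST2 bag)}.
CGRP = defaultdict(lambda: ([], []))
for t in TEST1:
    CGRP[t[0]][0].append(t)
for t in TEST2:
    CGRP[t[0]][1].append(t)

# Each Pig FILTER is a predicate on whether a bag is empty.
# Every group key comes from at least one input, so "b2 empty" implies "b1 non-empty".
INTERSECT_ID = {g for g, (b1, b2) in CGRP.items() if b1 and b2}
TEST1_MINUS_TEST2_ID = {g for g, (b1, b2) in CGRP.items() if not b2}
TEST2_MINUS_TEST1_ID = {g for g, (b1, b2) in CGRP.items() if not b1}
UNION_ID = set(CGRP)  # every group key, no filter needed

# Cross-check against ordinary set algebra on the id columns.
ids1 = {t[0] for t in TEST1}
ids2 = {t[0] for t in TEST2}
assert INTERSECT_ID == ids1 & ids2
assert TEST1_MINUS_TEST2_ID == ids1 - ids2
assert TEST2_MINUS_TEST1_ID == ids2 - ids1
assert UNION_ID == ids1 | ids2

print(sorted(INTERSECT_ID, key=int))  # ['7', '8', '9']
```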


To perform the set intersection operation, we could also do a JOIN of TEST1 and TEST2 on the id field:

J = JOIN TEST1 BY id, TEST2 BY id;
DESCRIBE J;
DUMP J;

J: {TEST1::id: chararray,TEST1::value: chararray,TEST2::id: chararray,TEST2::value: chararray}

(7,gggg,7,ggggggg)
(8,hhhh,8,hhhhhhh)
(9,iiii,9,iiiiiii)


After the JOIN, we keep only the first field of the J relation (the id field):

J_ID = FOREACH J GENERATE $0;
DUMP J_ID;

(7)
(8)
(9)
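One caveat with the JOIN approach: JOIN emits one output tuple per matching pair, so J_ID equals the set intersection here only because each id occurs at most once in each sample file. The hypothetical plain-Python model below shows that adding a second tuple with id 7 to TEST2 (an assumption for illustration; it is not in test2.txt) makes 7 appear twice in J_ID. In Pig, applying DISTINCT to J_ID would remove such repeats.

```python
TEST1 = [("1", "aaaa"), ("2", "bbbb"), ("3", "cccc"), ("4", "dddd"), ("5", "eeee"),
         ("6", "ffff"), ("7", "gggg"), ("8", "hhhh"), ("9", "iiii")]
TEST2 = [("7", "ggggggg"), ("8", "hhhhhhh"), ("9", "iiiiiii"),
         ("10", "jjjjjjj"), ("11", "kkkkkkk")]


def join(left, right):
    """Hypothetical model of an inner JOIN on field 0.

    Emits one concatenated tuple for every (left, right) pair with equal keys.
    """
    index = {}
    for rt in right:
        index.setdefault(rt[0], []).append(rt)
    return [lt + rt for lt in left for rt in index.get(lt[0], [])]


J = join(TEST1, TEST2)
J_ID = [t[0] for t in J]  # FOREACH J GENERATE $0

# Same join after adding a second (hypothetical) TEST2 tuple with id 7.
J_ID_DUP = [t[0] for t in join(TEST1, TEST2 + [("7", "gg")])]
print(J_ID, J_ID_DUP)  # ['7', '8', '9'] ['7', '7', '8', '9']
```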


To perform the set union operation, we could do a UNION of TEST1 and TEST2:

U = UNION TEST1, TEST2;
DESCRIBE U;
DUMP U;

U: {id: chararray,value: chararray}

(1,aaaa)
(2,bbbb)
(3,cccc)
(4,dddd)
(5,eeee)
(6,ffff)
(7,gggg)
(8,hhhh)
(9,iiii)
(7,ggggggg)
(8,hhhhhhh)
(9,iiiiiii)
(10,jjjjjjj)
(11,kkkkkkk)


However, note that the id values common to both relations (7, 8 and 9) now appear twice, because, unlike SQL’s UNION, Pig’s UNION does not eliminate duplicates. So to generate the true set union, we need to keep only the distinct id values:

U_ID = FOREACH U GENERATE $0;
U_ID_DISTINCT = DISTINCT U_ID;
DUMP U_ID_DISTINCT;

(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(11)
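Put differently, Pig’s UNION behaves like bag (multiset) concatenation, and DISTINCT turns the result back into a set. A minimal plain-Python model of the three statements, assuming in-memory lists in place of the two files:

```python
TEST1 = [("1", "aaaa"), ("2", "bbbb"), ("3", "cccc"), ("4", "dddd"), ("5", "eeee"),
         ("6", "ffff"), ("7", "gggg"), ("8", "hhhh"), ("9", "iiii")]
TEST2 = [("7", "ggggggg"), ("8", "hhhhhhh"), ("9", "iiiiiii"),
         ("10", "jjjjjjj"), ("11", "kkkkkkk")]

# UNION TEST1, TEST2: every tuple is kept (Pig guarantees no particular order).
U = TEST1 + TEST2
U_ID = [t[0] for t in U]   # FOREACH U GENERATE $0
U_ID_DISTINCT = set(U_ID)  # DISTINCT U_ID

print(len(U_ID), len(U_ID_DISTINCT))  # 14 11
```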

More ‘real life’ set operation examples

The following examples are slightly more realistic. At least they are based on real data: the GeoWordNet datasets. As stated in this “Background Knowledge Datasets” document:

“A geo-spatial ontology is an ontology consisting of geo-spatial classes (e.g. lake, city), entities (e.g., Lago di Molveno, Trento), their metadata (e.g. latitude and longitude coordinates) and relations between them (e.g., part-of, instance-of). GeoWordNet is a multilingual geo-spatial ontology built from the full integration of WordNet, GeoNames and the Italian part of MultiWordNet.”

The GeoWordNet dataset contains several CSV files, which can be imported into a relational database or, in our case, loaded into Pig as relations:

concept = LOAD 's3://mybucket/geowordnet/concept.csv.gz' USING PigStorage(',') as (
         con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray,
         provenance: chararray);

relation = LOAD 's3://mybucket/geowordnet/relation.csv.gz' USING PigStorage(',') as (
         src_con_id: int,
         trg_con_id: int,
         name: chararray,
         gloss: chararray,
         lang: chararray);

entity = LOAD 's3://mybucket/geowordnet/entity.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray,
         con_id: int,
         lang: chararray,
         latitude: chararray,
         longitude: chararray,
         provenance: chararray);

part_of = LOAD 's3://mybucket/geowordnet/part_of.csv.gz' USING PigStorage(',') as (
         src_entity_id: int,
         trg_entity_id: int);

alternative_name_eng = LOAD 's3://mybucket/geowordnet/alternative_name_eng.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);

alternative_name_ita = LOAD 's3://mybucket/geowordnet/alternative_name_ita.csv.gz' USING PigStorage(',') as (
         entity_id: int,
         name: chararray);


Example 1

-- Find entities with both alternative English AND Italian names
COGRP1 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
INTERSECT = FILTER COGRP1 BY NOT IsEmpty(alternative_name_eng) AND NOT IsEmpty(alternative_name_ita);
R1 = FOREACH INTERSECT GENERATE FLATTEN(alternative_name_eng), FLATTEN(alternative_name_ita);
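When an entity has several alternative names in a language, FLATTEN-ing both bags in the same GENERATE produces their cross product, so R1 contains one tuple per (English name, Italian name) pair. The plain-Python sketch below models this on a few made-up tuples (the entity ids and names are hypothetical, not taken from GeoWordNet):

```python
from itertools import product

# Hypothetical (entity_id, name) tuples, not real GeoWordNet data.
alternative_name_eng = [(42, "eng-a"), (42, "eng-b"), (7, "eng-c")]
alternative_name_ita = [(42, "ita-a"), (99, "ita-b")]

# Model of COGRP1: {entity_id: (eng bag, ita bag)}.
COGRP1 = {}
for t in alternative_name_eng:
    COGRP1.setdefault(t[0], ([], []))[0].append(t)
for t in alternative_name_ita:
    COGRP1.setdefault(t[0], ([], []))[1].append(t)

R1 = []
for eng_bag, ita_bag in COGRP1.values():
    if eng_bag and ita_bag:  # the INTERSECT filter
        # FLATTEN(eng), FLATTEN(ita) in one GENERATE = cross product of the two bags.
        R1.extend(e + i for e, i in product(eng_bag, ita_bag))

print(R1)  # [(42, 'eng-a', 42, 'ita-a'), (42, 'eng-b', 42, 'ita-a')]
```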


Example 2

-- Find entities with alternative English names but no alternative Italian names
COGRP2 = COGROUP alternative_name_eng BY entity_id, alternative_name_ita BY entity_id;
DIFF2 = FILTER COGRP2 BY IsEmpty(alternative_name_ita);
R2 = FOREACH DIFF2 GENERATE FLATTEN(alternative_name_eng);


Example 3

-- Find entities with alternative Italian names but no alternative English names
COGRP3 = COGROUP alternative_name_ita BY entity_id, alternative_name_eng BY entity_id;
DIFF3 = FILTER COGRP3 BY IsEmpty(alternative_name_eng);
R3 = FOREACH DIFF3 GENERATE FLATTEN(alternative_name_ita);

Example 4

-- Find entities with alternative English OR Italian names
U = UNION alternative_name_eng, alternative_name_ita;
J = JOIN entity BY entity_id, U BY entity_id;
R4 = FOREACH J GENERATE entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude, U::name;
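Because U may hold several tuples for the same entity, the inner JOIN returns one row per (entity, alternative name) pair, and entities with no alternative name at all are dropped. If only the set of entities is wanted, projecting the entity columns and applying DISTINCT would collapse the repeats. A plain-Python sketch on hypothetical data (ids and names invented for illustration, and entity reduced to id and name):

```python
# Hypothetical data; entity is reduced to (entity_id, name).
entity = [(1, "EntityA"), (2, "EntityB"), (3, "EntityC")]
alternative_name_eng = [(1, "A-eng"), (2, "B-eng")]
alternative_name_ita = [(1, "A-ita")]

U = alternative_name_eng + alternative_name_ita  # UNION keeps every tuple

# Index U by entity_id so the join becomes a lookup per entity tuple.
names_by_id = {}
for entity_id, name in U:
    names_by_id.setdefault(entity_id, []).append(name)

# Model of JOIN entity BY entity_id, U BY entity_id, keeping entity name and U::name.
R4 = [(ename, alt) for eid, ename in entity for alt in names_by_id.get(eid, [])]
print(R4)  # [('EntityA', 'A-eng'), ('EntityA', 'A-ita'), ('EntityB', 'B-eng')]
```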

Example 5

-- Find entities with NO alternative English and NO alternative Italian names
-- (set difference entity minus U, where U is the union from Example 4)
COGRP5 = COGROUP entity BY entity_id, U BY entity_id;
DIFF5 = FILTER COGRP5 BY IsEmpty(U);
R5 = FOREACH DIFF5 GENERATE FLATTEN(entity);
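Example 5 is the set difference "entity minus U": COGROUP pairs each entity with its (possibly empty) bag of alternative names, and the filter keeps only entities whose U bag is empty. A plain-Python sketch with the same kind of hypothetical data (ids and names invented for illustration):

```python
# Hypothetical data; entity is reduced to (entity_id, name).
entity = [(1, "EntityA"), (2, "EntityB"), (3, "EntityC")]
U = [(1, "A-eng"), (2, "B-eng"), (1, "A-ita")]  # UNION of English and Italian names

# Model of COGRP5: {entity_id: (entity bag, U bag)}.
COGRP5 = {}
for t in entity:
    COGRP5.setdefault(t[0], ([], []))[0].append(t)
for t in U:
    COGRP5.setdefault(t[0], ([], []))[1].append(t)

# FILTER ... BY IsEmpty(U), then FLATTEN(entity).
R5 = [t for entity_bag, u_bag in COGRP5.values() if not u_bag for t in entity_bag]
print(R5)  # [(3, 'EntityC')]
```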


Although not strictly set-operation-related, here are some more things you can find out from the GeoWordNet dataset by means of JOINs between the appropriate relations:

-- Find relations between concepts
J1 = JOIN concept BY con_id, relation BY src_con_id;
J2 = JOIN J1 BY trg_con_id, concept BY con_id;
R6 = FOREACH J2 GENERATE
         J1::concept::con_id, J1::concept::name, J1::concept::gloss, J1::concept::lang, J1::concept::provenance,
         J1::relation::src_con_id, J1::relation::trg_con_id, J1::relation::name, J1::relation::gloss, J1::relation::lang,
         concept::con_id, concept::name, concept::gloss, concept::lang, concept::provenance;

-- Find entities which are part of other entities
J3 = JOIN entity BY entity_id, part_of BY src_entity_id;
J4 = JOIN J3 BY trg_entity_id, entity BY entity_id;
R7 = FOREACH J4 GENERATE
         J3::entity::name, J3::entity::con_id, J3::entity::lang, J3::entity::latitude, J3::entity::longitude,
         'is part of',
         entity::name, entity::con_id, entity::lang, entity::latitude, entity::longitude;
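Both queries follow the same pattern: a linking relation (relation or part_of) is joined to the same base relation twice, once on the source id and once on the target id. Here is a plain-Python sketch of the R7 query on hypothetical data, assuming entity_id is unique so that entity can be modeled as a dict:

```python
# Hypothetical data: entity_id -> name, and (src_entity_id, trg_entity_id) pairs.
entity = {1: "EntityA", 2: "EntityB", 3: "EntityC"}
part_of = [(1, 2), (2, 3), (3, 99)]  # 99 has no entity tuple

# J3 joins on src_entity_id, J4 joins on trg_entity_id; both are inner joins,
# so a part_of pair survives only if both of its ids match an entity.
R7 = [
    (entity[src], "is part of", entity[trg])
    for src, trg in part_of
    if src in entity and trg in entity
]
print(R7)  # [('EntityA', 'is part of', 'EntityB'), ('EntityB', 'is part of', 'EntityC')]
```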


