JOIN using MapReduce:
Joins can be performed on either the map side or the reduce side, depending on the nature of the data sets to be joined.
Reduce Side Join
Let’s take the following tables containing employee and department data.
Let’s see how the join query below can be achieved using a reduce side join.
SELECT Employees.Name, Employees.Age, Department.Name FROM Employees INNER JOIN Department ON Employees.Dept_Id=Department.Dept_Id
The map side emits the join key (here, Dept_Id) together with the corresponding record from each table, so that records with the same department id from both tables end up at the same reducer, which then joins them. Each record must also be tagged with the table it came from, so that the reducer joins only records that originate from different tables. The following diagram illustrates the reduce side join process.
Here is the pseudo code for the map function in this scenario.
map (K table, V rec) {
    dept_id = rec.Dept_Id        // join key
    tagged_rec.tag = table       // remember which table the record came from
    tagged_rec.rec = rec
    emit(dept_id, tagged_rec)
}
At the reduce side, the join happens only between records that carry different tags.
reduce (K dept_id, list<tagged_rec> tagged_recs) {
    for (tagged_rec : tagged_recs) {
        for (tagged_rec1 : tagged_recs) {
            // Pair each Employees record with each Department record exactly once
            if (tagged_rec.tag == Employees && tagged_rec1.tag == Department) {
                joined_rec = join(tagged_rec, tagged_rec1)
                emit (dept_id, joined_rec)
            }
        }
    }
}
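The map, shuffle and reduce steps above can be tried out without a cluster. The following is a minimal sketch in plain Python that simulates them in memory; it is not Hadoop code. The sample rows and the names `map_fn`, `reduce_fn` and `run_reduce_side_join` are assumptions made for illustration, and the grouping dictionary only stands in for the shuffle that the framework would perform.

```python
from collections import defaultdict

# Hypothetical sample rows, for illustration only.
employees = [
    {"Name": "Alice", "Age": 30, "Dept_Id": 1},
    {"Name": "Bob", "Age": 25, "Dept_Id": 2},
    {"Name": "Carol", "Age": 41, "Dept_Id": 1},
    {"Name": "Dan", "Age": 35, "Dept_Id": 9},  # no matching department
]
departments = [
    {"Dept_Id": 1, "Name": "Engineering"},
    {"Dept_Id": 2, "Name": "Sales"},
    {"Dept_Id": 3, "Name": "Legal"},  # no employees
]


def map_fn(table, rec):
    """Emit (join key, tagged record) for one input record."""
    yield rec["Dept_Id"], (table, rec)


def reduce_fn(dept_id, tagged_recs):
    """Join records sharing a key that come from different tables."""
    emps = [rec for tag, rec in tagged_recs if tag == "Employees"]
    depts = [rec for tag, rec in tagged_recs if tag == "Department"]
    for emp in emps:
        for dept in depts:
            yield dept_id, (emp["Name"], emp["Age"], dept["Name"])


def run_reduce_side_join(emp_rows, dept_rows):
    # Map phase: every record of both tables goes through the mapper.
    mapped = []
    for rec in emp_rows:
        mapped.extend(map_fn("Employees", rec))
    for rec in dept_rows:
        mapped.extend(map_fn("Department", rec))

    # Shuffle phase (simulated): group tagged records by join key.
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)

    # Reduce phase: one reduce call per key.
    results = []
    for key in sorted(groups):
        results.extend(reduce_fn(key, groups[key]))
    return results


joined = run_reduce_side_join(employees, departments)
for row in joined:
    print(row)
```

Splitting the records by tag before pairing them keeps each employee/department pair from being emitted twice, and keys that appear in only one table produce no output, which matches INNER JOIN semantics.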
Map Side Join (Replicated Join)
Using Distributed Cache on Smaller Table
For this implementation to work, one relation has to fit into memory. The smaller table is replicated to each node and loaded into memory. The join then happens on the map side without any reducer involvement. This speeds up the process significantly, because a reduce side join shuffles all the data across the network even though most of the non-matching records are later dropped, whereas a map side join avoids that shuffle. The smaller table can be loaded into a hash table so that records can be looked up by Dept_Id. The pseudo code is outlined below.
map (K table, V rec) {
    list recs = lookup(rec.Dept_Id) // Get smaller table records having this Dept_Id
    for (small_table_rec : recs) {
        joined_rec = join (small_table_rec, rec)
        emit (rec.Dept_Id, joined_rec) // Emit one joined record per match
    }
}
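The replicated join can be sketched the same way in plain Python. This is an in-memory illustration under assumptions, not Hadoop code: the sample rows and the names `build_lookup` and `map_fn` are hypothetical, and the dictionary built by `build_lookup` stands in for the hash table that each mapper would populate from the distributed cache before processing its input.

```python
from collections import defaultdict

# Hypothetical sample rows, for illustration only.
departments = [  # smaller table, assumed to fit in memory
    {"Dept_Id": 1, "Name": "Engineering"},
    {"Dept_Id": 2, "Name": "Sales"},
]
employees = [  # larger table, streamed through the mappers
    {"Name": "Alice", "Age": 30, "Dept_Id": 1},
    {"Name": "Bob", "Age": 25, "Dept_Id": 2},
    {"Name": "Dan", "Age": 35, "Dept_Id": 9},  # no matching department
]


def build_lookup(small_table):
    """Index the smaller table by Dept_Id (done once per mapper)."""
    lookup = defaultdict(list)
    for rec in small_table:
        lookup[rec["Dept_Id"]].append(rec)
    return lookup


def map_fn(rec, lookup):
    """Join one record of the larger table against the in-memory table."""
    for small_rec in lookup.get(rec["Dept_Id"], []):
        yield rec["Dept_Id"], (rec["Name"], rec["Age"], small_rec["Name"])


lookup = build_lookup(departments)
map_side_joined = [out for rec in employees for out in map_fn(rec, lookup)]
for row in map_side_joined:
    print(row)
```

No grouping step appears here: each mapper produces joined records directly, and an employee whose Dept_Id is missing from the lookup simply emits nothing.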