
HADOOP - How to customize combiner?




asked by Experts-976 on November 24, 2014

How to customize combiner?


           

2 Answers



 
answered by Experts-976

Please see the videos below.


 
answered by Experts-976

Combiner in MapReduce

Combiners:

A combiner is a semi-reducer in MapReduce. It is an optional class that can be specified in the MapReduce driver class to process the output of the map tasks before it is submitted to the reduce tasks.

In the MapReduce framework, the output from the map tasks is usually large, and the data transfer between the map and reduce tasks can be heavy. Since data transfer across the network is expensive, the combiner is used to limit the volume of data transferred between the map and reduce tasks.

A combiner function summarizes the map output records that share the same key, and the output of the combiner, rather than the raw map output, is sent over the network to the actual reduce task as its input.

Further details:

The combiner does not have its own interface; it must implement the Reducer interface, and its reduce() method is called on each map output key. The combiner class’s reduce() method must have the same input and output key-value types as the reducer class.
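As an illustration, a standalone custom combiner is simply a Reducer subclass. A minimal sketch, assuming Text keys and IntWritable values as in the word count example below (the class name WordcountCombiner is our own, not from this post):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//A custom combiner extends Reducer; its input and output key-value
//types must both match the map output types (Text and IntWritable here).
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>
{
  private IntWritable total = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
  {
    //Aggregate the values for this key; the aggregation must give the
    //same final result whether the combiner runs zero, one, or many times.
    int sum = 0;
    for (IntWritable val : values)
    {
      sum += val.get();
    }
    total.set(sum);
    context.write(key, total);
  }
}

It is then registered in the driver with job.setCombinerClass(WordcountCombiner.class);.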

Combiner functions are suitable for producing summary information from a large data set, because the combiner replaces the original set of map outputs, ideally with fewer or smaller records.

Hadoop doesn’t guarantee how many times a combiner function will be called for each map output key. At times it may not be executed at all, while at other times it may be used once, twice, or more, depending on the size and number of the output files generated by the mapper for each reducer.

It is common practice to use the same reducer class as the combiner class, but in some cases this leads to undesired results. The combiner function must only aggregate values. It is very important that the combiner class have no side effects, and that the actual reducer be able to correctly process the results of the combiner.

Note:

When using the same reducer class as the combiner class, if the job’s output has problems, try running the job without the combiner and compare the outputs.
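One lightweight way to do that without editing the driver each time is to gate the combiner behind a configuration property. A minimal sketch, assuming a custom property name use.combiner (our own name, not a built-in Hadoop setting; note that -D properties reach the Configuration automatically only when the driver is run through ToolRunner/GenericOptionsParser):

//In the driver, after creating the Job:
//run with -Duse.combiner=false to check the output without the combiner
if (conf.getBoolean("use.combiner", true))
{
  job.setCombinerClass(WordcountReducer.class);
}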

//Example Word Count Program with Combiner Class.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCombiner
{
  public static void main (String [] args) throws Exception
  {
    //Basic Sanity check to verify the arguments
    if (args.length != 2)
    {
        System.err.println("Usage: wordcount  ");
        System.exit(2);
    }
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);
    Configuration conf = new Configuration();

    // Job Object configuration
    Job job = Job.getInstance(conf);
    job.setJobName("WordCount3");
    job.setJarByClass(WordCountWithCombiner.class);

    //Set Mapper class
    job.setMapperClass(WordcountMapper.class);
    //Set Combiner class as WordcountReducer class.
    job.setCombinerClass(WordcountReducer.class);
    job.setReducerClass(WordcountReducer.class);

    //Set Output Key and value data types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //Set Input and Output formats and paths
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0:1);
  }
}
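For completeness: the driver above references a WordcountMapper class that this post does not show. A minimal version consistent with the reducer snippets below might look like this (a sketch based on the standard word count pattern):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
  private final static IntWritable ONE = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  {
    //Split each input line on whitespace and emit (word, 1) for every token
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens())
    {
      word.set(tokenizer.nextToken());
      context.write(word, ONE);
    }
  }
}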

Output with the combiner:

.    1
a    1
and    1
as    1
count    1
counts    1
file    1
for    1
input    1
is    1
job    1
job.    1
map    1
returns    1
sample    1
takes    1
this    1
word    1

Whereas the actual (correct) output, received without the combiner class, is as follows:

.    1
a    1
and    1
as    1
count    1
counts    1
file    2
for    1
input    1
is    1
job    1
job.    1
map    1
returns    1
sample    1
takes    1
this    3
word    2

So, here the value for the keys (words) ‘file’, ‘this’ and ‘word’ is now incorrectly reported as 1 instead of 2, 3 and 2 respectively. This is because of the way the reduce() method used by the combiner is implemented.

The grouped map output from our mapper for these three words looks like:

(file, 1, 1)
(this, 1, 1, 1)
(word, 1, 1)

Combiner Output is:

(file, 2)
(this, 3)
(word, 2)

Reducer Output is:

(file, 1)
(this, 1)
(word, 1)

The final output of the reduce() method is just a count of the integers associated with each key, not the sum of the actual integer values, which follows from our reduce() method implementation. Let’s review the reduce() method in our WordcountReducer class:

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
  {
    //counter for total count of each word
    int sum = 0;
    //Iterates over all the values present in the collection "values".
    for (IntWritable val : values)
    {
      //'sum' is incremented by 1 for every value associated with the key,
      //without ever reading the actual integer inside the value
      sum++;
    }

    //'result' is an IntWritable field declared on the WordcountReducer class
    result.set(sum);
    //finally write (word, count) pairs into Reducer's context.
    context.write(key, result);
  }

Since the counter is incremented by 1 for every value associated with a key instead of summing the actual values, the reducer collapses whatever the combiner has already aggregated back to a count of 1 per key, and the results are wrong.
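To see this arithmetic outside Hadoop, here is a tiny plain-Java simulation of applying the count-the-values logic twice (illustrative only, not Hadoop code):

import java.util.Arrays;
import java.util.List;

public class DoubleCountDemo
{
  //Mimics the buggy reduce(): returns how many values there are,
  //ignoring the values themselves.
  static int countValues(List<Integer> values)
  {
    return values.size();
  }

  public static void main(String[] args)
  {
    //Map output for "this": (this, 1, 1, 1)
    List<Integer> mapOutput = Arrays.asList(1, 1, 1);

    //Combiner pass: counts three values and emits (this, 3)
    int combined = countValues(mapOutput);

    //Reducer pass: receives the single combined value (3),
    //counts one value and emits (this, 1) -- the 3 is lost
    int reduced = countValues(Arrays.asList(combined));

    System.out.println("combiner: " + combined + ", reducer: " + reduced);
  }
}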

Fixing the combiner in the MapReduce word count program:

In order to fix the combiner issue above, let’s modify the reduce() method in our combiner/reducer class WordcountReducer.java. Copy WordcountReducer.java to WordcountReducer2.java and modify the reduce() method as below.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer2 extends Reducer <Text, IntWritable, Text, IntWritable>
{
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
  {
    //counter for total count of each word
    int sum = 0;
    for (IntWritable val : values) 
    {
      sum += val.get(); //Summing the actual values associated with each key instead of counting them
    }

    result.set(sum);
    //finally write (word, total) pairs into Reducer's context.
    context.write(key, result);
  }
}

Then change the driver class to use WordcountReducer2.class as both the combiner and the reducer class. Copy WordCountWithCombiner.java to WordCountWithFixedCombiner.java and make the changes shown below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithFixedCombiner
{
  public static void main (String [] args) throws Exception
  {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);
    Configuration conf = new Configuration();

    // Job Object configuration
    Job job = Job.getInstance(conf);
    job.setJobName("WordCount4");
    job.setJarByClass(WordCountWithFixedCombiner.class);

    job.setMapperClass(WordcountMapper.class);
    //Set Combiner class as WordcountReducer2 class.
    job.setCombinerClass(WordcountReducer2.class);
    job.setReducerClass(WordcountReducer2.class);

    //Set Output Key and value data types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //Set Input and Output formats and paths
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    System.exit(job.waitForCompletion(true) ? 0:1);
  }
}

Running the job now produces the correct output, matching the counts shown earlier for the run without the combiner: ‘file’ 2, ‘this’ 3, and ‘word’ 2.
