
HADOOP - How to Write a Custom Partitioner?

asked by Experts-976 on November 25, 2014 10:34 AM

How do I write a custom Partitioner?


2 Answers

answered by marvit

Also refer to the videos below.


answered by Experts-976

How to use a custom partitioner in a MapReduce program:

The partitioning phase takes place after the map phase and before the reduce phase. The number of partitions is equal to the number of reducers, and the data is distributed across the reducers according to the partitioning function [1]. The difference between a partitioner and a combiner is that the partitioner divides the data according to the number of reducers, so that all the data in a single partition is processed by a single reducer. The combiner, on the other hand, works like a reducer: it aggregates the map output locally, on the map side, before the data is sent to the reducers. The combiner is therefore an optimization that reduces the amount of data transferred to the reducers. The default partitioning function is hash partitioning, where the hash is computed on the key. However, it can be useful to partition the data according to some other function of the key or of the value.
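
As a rough sketch of the default rule, the Java below applies the same formula as Hadoop's HashPartitioner: clear the sign bit of the key's hashCode() and take the result modulo the number of reducers. It is written without Hadoop and assumes plain String keys, so the class HashPartitionSketch is hypothetical, and the partition numbers will differ from a real job because Hadoop's Text hashes its bytes differently.

```java
// Hypothetical, Hadoop-free sketch of hash partitioning.
// Assumption: keys are java.lang.String; Hadoop's Text hashes its bytes
// differently, so the numbers printed here differ from a real job.
public class HashPartitionSketch {

    // Same formula as Hadoop's HashPartitioner: mask off the sign bit so a
    // negative hashCode() still maps to a partition in [0, numReduceTasks).
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        // every record with the same key lands in the same partition,
        // no matter what its value contains
        for (String key : new String[] {"female", "male"}) {
            System.out.println(key + " -> partition " + getPartition(key, numReduceTasks));
        }
    }
}
```

Because only the key is hashed, a job keyed by gender would send every male record to one reducer and every female record to one reducer (possibly the same one), regardless of age. Partitioning on a field of the value is what lets the example below split the records by age instead.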

This example is not a graph problem. Consider input data in the following format:

Input Format: name<tab>age<tab>gender<tab>score

We will use custom partitioning in a MapReduce program to find the maximum scorer for each gender in three age categories: less than 20, 20 to 50, and greater than 50.

The output of the program, grouped by partition, is shown below:

Partition - 0: (this partition contains the maximum scorers for each gender whose age is less than 20)

Nancy<tab>age- 7<tab>female<tab>score-98
Adam<tab>age- 9<tab>male<tab>score-37

Partition - 1: (this partition contains the maximum scorers for each gender whose age is between 20 and 50)

Kristine<tab>age- 38<tab>female<tab>score-53
Bob<tab>age- 34<tab>male<tab>score-89

Partition - 2: (this partition contains the maximum scorers for each gender whose age is greater than 50)

Monica<tab>age- 56<tab>female<tab>score-92
Chris<tab>age- 67<tab>male<tab>score-97


PartitionMapper prepares the data for the partitioner and the reducer. It parses the input records and emits key-value pairs where the key is the gender and the value is the other information associated with a person.

//mapper output format: gender is the key, the value is formed by concatenating the name, age and score

// imports needed at the top of the driver's source file
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// the type parameters are the input keys type, the input values type, the
// output keys type, the output values type
public static class PartitionMapper extends
        Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split("\t");

        String gender = tokens[2];
        String nameAgeScore = tokens[0] + "\t" + tokens[1] + "\t" + tokens[3];

        //the mapper emits a key-value pair where the key is the gender and the value is the other information: name, age and score
        context.write(new Text(gender), new Text(nameAgeScore));
    }
}
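
To see what PartitionMapper emits without running a cluster, the same parsing can be written as a plain Java method. This is a sketch under the assumption that the raw input stores age and score as bare numbers (for example Nancy<tab>7<tab>female<tab>98), which is what the Integer.parseInt calls in the partitioner and reducer require; the class MapperSketch and method mapRecord are hypothetical.

```java
// Hypothetical, Hadoop-free version of PartitionMapper's parsing step.
// Assumption: each input line is name<TAB>age<TAB>gender<TAB>score with plain numbers.
public class MapperSketch {

    // Returns {key, value}: the key is the gender and the value is
    // name<TAB>age<TAB>score, mirroring what PartitionMapper writes.
    static String[] mapRecord(String line) {
        String[] tokens = line.split("\t");
        String gender = tokens[2];
        String nameAgeScore = tokens[0] + "\t" + tokens[1] + "\t" + tokens[3];
        return new String[] {gender, nameAgeScore};
    }

    public static void main(String[] args) {
        String[] kv = mapRecord("Nancy\t7\tfemale\t98");
        System.out.println("key=" + kv[0] + " value=" + kv[1]);
    }
}
```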


The custom partitioner, AgePartitioner, extends the Partitioner class. It overrides the getPartition method, which has three parameters. The key and value are the intermediate key and value produced by the map function. numReduceTasks is the number of reducers used in the MapReduce program, and it is specified in the driver program. The method parses the value to get the age and then assigns the record to partition 0, 1 or 2, depending on the age category. If the number of reducers is 0, it returns partition 0 to avoid a division-by-zero exception in the modulo operation. It is possible to have empty partitions with no data. The assigned partition number is taken modulo numReduceTasks so that the result is still a valid partition when the job has fewer reducers than partitions.

// imports needed at the top of the driver's source file
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

//AgePartitioner is a custom Partitioner to partition the data according to age.
//The age is a part of the value from the input file.
//The data is partitioned based on the range of the age.
//In this example there are 3 partitions: the first contains the records where the age is less than 20,
//the second contains the records where the age is from 20 to 50, and the third contains the records where the age is > 50.
public static class AgePartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {

        String[] nameAgeScore = value.toString().split("\t");
        String age = nameAgeScore[1];
        int ageInt = Integer.parseInt(age);

        //this is done to avoid performing mod with 0
        if (numReduceTasks == 0)
            return 0;

        //if the age is < 20, assign partition 0
        if (ageInt < 20) {
            return 0;
        }
        //else if the age is between 20 and 50, assign partition 1
        else if (ageInt <= 50) {
            return 1 % numReduceTasks;
        }
        //otherwise assign partition 2
        else {
            return 2 % numReduceTasks;
        }
    }
}
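
The partitioning rule can be checked in isolation with a Hadoop-free copy of the getPartition logic. In this sketch the class AgePartitionSketch is hypothetical; it takes the mapper's value as a String and uses the age categories described earlier (less than 20, 20 to 50, greater than 50).

```java
// Hypothetical, Hadoop-free copy of AgePartitioner.getPartition.
// The value is the mapper output name<TAB>age<TAB>score.
public class AgePartitionSketch {

    static int getPartition(String value, int numReduceTasks) {
        int age = Integer.parseInt(value.split("\t")[1]);

        // avoid taking a modulo by zero
        if (numReduceTasks == 0) {
            return 0;
        }
        if (age < 20) {
            return 0;                     // less than 20
        } else if (age <= 50) {
            return 1 % numReduceTasks;    // 20 to 50
        } else {
            return 2 % numReduceTasks;    // greater than 50
        }
    }

    public static void main(String[] args) {
        String[] values = {"Nancy\t7\t98", "Kristine\t38\t53", "Monica\t56\t92"};
        for (String v : values) {
            System.out.println(v.split("\t")[0] + " -> partition " + getPartition(v, 3));
        }
    }
}
```

With three reducers, Nancy (7), Kristine (38) and Monica (56) land in partitions 0, 1 and 2, matching the sample output. With only two reducers, the modulo folds partition 2 into partition 0, so the under-20 and over-50 records would share a reducer, and that reducer would report a single maximum per gender for both groups together.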



PartitionReducer finds the maximum scorer in each age category. That is, it finds the maximum scorer in each partition for both male and female.

PartitionReducer overrides the reduce method of the Reducer class. It iterates over all the values for a key (the gender) and finds the maximum scorer for that gender in the reducer's age category. The result is emitted as a key-value pair in which the key is the name and the value is the rest of the person's information.

//The data belonging to the same partition goes to the same reducer. Within a partition, all the values with the same key (gender)
//are iterated and the person with the maximum score is found.
//Therefore the output of the reducers contains the male and female maximum scorers in each of the 3 age categories.

// imports needed at the top of the driver's source file
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// the type parameters are the input keys type, the input values type, the
// output keys type, the output values type
public static class PartitionReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        int maxScore = Integer.MIN_VALUE;

        String name = " ";
        String age = " ";
        String gender = " ";
        int score = 0;
        //iterating through the values corresponding to a particular key
        for (Text val : values) {

            String[] valTokens = val.toString().split("\\t");
            score = Integer.parseInt(valTokens[2]);

            //if the new score is greater than the current maximum score, update the fields,
            //as they will be the output of the reducer after all the values for this key are processed
            if (score > maxScore) {
                name = valTokens[0];
                age = valTokens[1];
                gender = key.toString();
                maxScore = score;
            }
        }
        //emit the maximum scorer for this gender in this partition
        context.write(new Text(name), new Text("age- " + age + "\t" + gender + "\tscore-" + maxScore));
    }
}
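
The reducer's max-finding loop can also be tried outside Hadoop. In this sketch the class ReducerSketch and its reduce method are hypothetical, and the input values are the three male records from the sample output rewritten in the mapper's value format. Passing all three to one call mimics a job with a single reducer, where every male record shares one reduce call and only the overall top scorer survives; the custom partitioner is what keeps one winner per age category.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hadoop-free copy of PartitionReducer's reduce logic.
public class ReducerSketch {

    // Returns the output line name<TAB>age- A<TAB>gender<TAB>score-S for the
    // highest-scoring value grouped under one key (the gender).
    static String reduce(String gender, List<String> values) {
        int maxScore = Integer.MIN_VALUE;
        String name = " ";
        String age = " ";
        for (String val : values) {
            String[] valTokens = val.split("\t");
            int score = Integer.parseInt(valTokens[2]);
            // strictly greater: on a tie, the first value seen is kept
            if (score > maxScore) {
                name = valTokens[0];
                age = valTokens[1];
                maxScore = score;
            }
        }
        // TextOutputFormat separates key and value with a tab, as done here
        return name + "\tage- " + age + "\t" + gender + "\tscore-" + maxScore;
    }

    public static void main(String[] args) {
        List<String> maleValues = Arrays.asList("Adam\t9\t37", "Bob\t34\t89", "Chris\t67\t97");
        System.out.println(reduce("male", maleValues));
    }
}
```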


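The driver code is similar to the driver code of the single-source shortest-path problem, except that the classes are set to the classes of this program. In addition, the driver has to register the custom partitioner and request one reducer per age category, for example:

job.setPartitionerClass(AgePartitioner.class);
job.setNumReduceTasks(3);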
