MapReduce Framework: Users define map function that processes a key/value pair to generate a set of intermediate k/v pairs; A reduce function that merge all intermediate k/v pairs associated with the same intermediate key.

MapReduce Framework hides the messy details of parallelization, fault-tolerance, data distribution and load balancing in a library.

MapReduce overview

Execution Overview

  1. Split input data into a set of M splits

  2. Master assigned task to work

  3. Worker call map function and produces intermediate files

  4. Partitioning the intermediate key into R pieces using partitioning function(like hash*(key) mode R). Map阶段将key分成R份,由R个reduce task来处理,一共有M个map task。map阶段完成后,一共有M*R个文件。

  5. Reduce worker collects the same intermediate keys(one of the R pieces partition)

  6. Worker call reduce, produces the final outputs.

Master Data Structure

  • For each map task and reduce task, it stores the state (idle, in-progress,or completed)
  • for each completed map task, the master stores the locations and sizes of the R intermediate file regions .

Fault Tolerance

Worker Failure

The master ping every worker periodically.

  1. If no response is received from a worker in a certain amount of time, the master marks the worker as failed.

  2. The task is reset to idle state and rescheduling to other workers.

Master Failure

Aborts the MapReduce computation if the master fails.

问题

看过论文有个疑惑,Map阶段和Reduce阶段可以并行吗?应该是不行?因为没有运行完Map阶段的所有任务,所产生的intermediate file数据是不完整,这时候去运行Reduce也没用。Lab1里也印证了这一点,只有全部Map task完成后,才开始Reduce task。

Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished.

Lab1

和论文中的描述不同,Lab1由worker向master发起RPC请求任务(单向RPC),而不是worker向master注册,master再主动分配任务(双向RPC)。master有两个RPC调用:``AssignTaskDoneTask`

  • Master存储任务和其状态

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    type WorkerTask struct {
    	filename string // for map tasks
    	state    TaskState
    	id       int // for non-idle tasks
    }
    
    type Master struct {
    	allMapTaksDone    bool
    	allReduceTaksDone bool
    	mapTask    []WorkerTask
    	reduceTask []WorkerTask
    	nReduce    int
    	nFiles     int
    
    	sync.Mutex
    }
    
  • worker函数死循环获取任务

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    func Worker(mapf func(string, string) []KeyValue,
    	reducef func(string, []string) string) {
    	for {
    		time.Sleep(time.Second)
    		reply := askTask()
    
    		if reply.Phase == MapPhase {
    			go doMapTask(mapf, reply.File, reply.Id, reply.NumForOtherPhase)
    		} else if reply.Phase == ReducePhase {
    			go doReduceTask(reducef, reply.Id, reply.NumForOtherPhase)
    		}
    	}
    }
    
  • AssignTask:给worker分配任务,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    type TaskReply struct {
    	File  string // only for map phase
    	Phase JobPhase
    	Id    int //task index
    	// for map, it's nReduce to naming the intermediate file,
    	// for reduce, it's nFile to collect intermediate files
    	NumForOtherPhase int
    	Kill             bool // all task done, kill the worker
    }
    
  • DoneTask:任务完成,包含完成任务的id和阶段

    1
    2
    3
    4
    5
    
    type TaskDoneArgs struct {
    	Id    int // done task id
    	Phase JobPhase
    	// TempFiles *[]os.File
    }
    

Fault Tolerance

在Lab1的提示中已经给出容错处理的方法:Master分配任务后,开新线程等待10秒,如果该任务还没完成就判断为失败,重新分配任务。

不过依旧有一个reduce parallelism test没过,不知道哪有问题了。一边看论文,一边写,Lab1用了16个小时,我好菜啊,下一课。。。