实现MapReduce

简介

当我们要统计数亿文本的词频,单个机械性能一样平常,况且是数亿级数据,处置是十分缓慢的,对于这样的义务,希望的是多台电脑配合处置,大幅度削减义务时间。团结多台电脑一起事情的系统就是分布式系统。

最近在学MIT6.824分布式系统课程,第一个Lab就是MapReduce,MapReduce是Google公司2004年揭晓的一篇论文,先容许多义务都可以分为两步操作——Map和Reduce(比如要统计词频,Map分别对每个文件天生单词和单一数目,分差别区块保留,Reduce对差别区块举行统计,获得最终效果),可以将这两个操作之外的包装起来,只提供Map和Reduce的接口,差别义务只需要替换差别的Map函数和Reduce函数即可。论文中还讲述了MapReduce分布式系统的实现细节以及应用场景。本文将以MIT6.824课程的Lab1为例,讲述若何完成MapReduce Lab1以及实现过程中遇到一些的难题。

需要的基础:

  • Go语言基础 (推荐官网的tour)
  • MIT6.824前两节的课程(B站链接
  • 读MapReduce(主要看实现那一块)

条记中的所有代码可以在https://github.com/FangYang970206/MIT6.824-2020中查看下载。

环境设置

环境设置可以看Lab1流程,手把手地教怎么设置,主要分两步:

第一步安装Go环境

wget -qO- https://dl.google.com/go/go1.13.6.linux-amd64.tar.gz | sudo tar xz -C /usr/local

第二步克隆Lab1堆栈

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

克隆出来的堆栈的src文件夹中只有几个是与MapReduce相关的,分别是:

  • main目录下的mrmaster.go, mrworker.go, mrsequential.go和test-mr.sh,另有pg*.txt代表的8个文件是要分布式处置的输入,这个文件内容也不需要变,test-mr.sh有五个义务,运行test-mr.sh可以知道自己是否通过所有义务。
  • mr文件夹,这个是MapReduce主要实现代码,事情量就在这了
  • mrapps是差别义务的Map和Reduce函数包,这个不需要管

系统框架一览

MapReduce系统是由一个master历程和多个worker历程组成,master和worker之间是通过RPC(Remote Procedure Call)举行通讯,master历程卖力给多个worker分配义务,纪录义务完成状态,而且需要处置worker奔溃或者超时运行等问题,worker需要处置响应的义务,处置完毕发送讲述给master,再请求下一个义务。我凭据代码函数挪用逻辑画出了一个系统框图,可以更好的明白MapReduce系统的事情原理:

实现MapReduce

代码详解

凭据上面的系统框图,现在来从代码中明白系统。

Master结构

type Flag struct {
	processing bool
	finished   bool
}

type Master struct {
	FileNames      []string
	MapFlags       []Flag
	ReduceFlags    []Flag
	MapTaskCnts    []int
	ReduceTaskCnts []int
	MapAllDone     bool
	ReduceALLDone  bool
	MapNum         int
	ReduceNum      int
	Mut            sync.Mutex
}
  • FileNames:pg*.txt这八个文件名
  • MapFlags:对应八个文件的Map义务状态,processing代表正在处置,finished示意已完成
  • ReduceFlag:同上
  • MapTaskCnts:这是纪录Map的当前义务序列号,若是某个map义务发生timeout,HandleTimeout函数对这个map义务的processing标志清0,重新分配,当前义务的序列号在上一个义务号中加1,若是之前发生timeout的义务来讲述完成,由于小于当前义务号,HandleWorkerReport函数可无需纪录,直接退出
  • ReduceTaskCnts:同上
  • MapAllDone:Map义务所有完成为true
  • ReduceAllDone:Reduce义务所有完成为true
  • MapNum:Map义务数
  • ReduceNum:Reduce义务数
  • Mut:互斥锁,由于有多个worker,制止条件竞争发生不确定行为,master内部数据需要互斥接见

Worker结构

type TaskState int

const (
	MapState    TaskState = 0
	ReduceState TaskState = 1
	StopState   TaskState = 2
	WaitState   TaskState = 3
)

type WorkerTask struct {
	MapID          int
	ReduceID       int
	ReduceNum      int
	MapNum         int
	MapTaskCnt     int
	ReduceTaskCnt  int
	State          TaskState
	FileName       string
	MapFunction    func(string, string) []KeyValue
	ReduceFunction func(string, []string) string
}
  • MapID和ReduceID:Map义务ID和Reduce义务ID
  • MapNum和ReduceNum:Map的义务总数和Reduce义务总数
  • MapTaskCnt和ReduceTaskCnt:Map义务序列号和Reduce序列号
  • State:义务有四种状态,分别是MapState,ReduceState,StopState和WaitState,MapState示意当前需要处置Map义务,ReduceState示意当前需要处置Reduce义务,WaitState示意当前没有需要处置的义务,最先睡眠守候,StopState代表义务已所有完成,可以退出。
  • FileName:示意Map义务需要的文件名
  • MapFunction和ReduceFunction:义务凭据State需要举行的Map函数或者Reduce函数

Master接口

建立Master

func MakeMaster(files []string, nReduce int) *Master {
	m := Master{FileNames: files,
		MapFlags:       make([]Flag, len(files), len(files)),
		ReduceFlags:    make([]Flag, nReduce, nReduce),
		MapNum:         len(files),
		ReduceNum:      nReduce,
		MapAllDone:     false,
		ReduceALLDone:  false,
		MapTaskCnts:    make([]int, len(files)),
		ReduceTaskCnts: make([]int, nReduce),
	}
	m.server()
	args, reply := NoArgs{}, NoReply{}
	go m.HandleTimeOut(&args, &reply)
	return &m
}

这个函数会由mrmaster.go文件的主函数挪用,建立一个master工具,需要传入文件名数组,以及要举行多少个Reduce义务,凭据这两个输入,可以初始化master参数。m.server()是关于RPC的内容,这里不去谈,有兴趣可以看看博客最后关于RPC内容, 只需要知道master函数要使用RPC,函数需要是两个参数(没参数会有忠告),都为指针形式,第一个示意输入参数,第二个示意输出参数,返回错误,无错误返回nil。然后建立一个线程专门处置timeout,然后将master返还给mrmaster的主函数,mrmaster主函数会确认master的MapAllDone和ReduceALLDone是否都为真,都为真则退出,否则睡眠一段时间再确认。

web安全笔记

天生worker task

func (m *Master) CreateWorkerTask(args *NoArgs, workerTask *WorkerTask) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if !m.MapAllDone {
		for idx := 0; idx < m.MapNum; idx++ {
			if !m.MapFlags[idx].processing && !m.MapFlags[idx].finished {
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.State = MapState
				workerTask.MapID = idx
				workerTask.FileName = m.FileNames[idx]
				m.MapTaskCnts[idx]++
				workerTask.MapTaskCnt = m.MapTaskCnts[idx]
				m.MapFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceNum = m.ReduceNum
				workerTask.MapNum = m.MapNum
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}
	workerTask.State = StopState
	return nil
}

函数首先会获得互斥锁,然后判断MapAllDone是否为false,为false进入循环遍历,若是某个义务的processing状态和finished状态都为false,说明这个义务可以需要被处置,可以分配,讲设置参数写入到输出参数中,并标志master中当前义务的状态processing为true以及序列号。若是没有义务需要处置,说明map有些义务正在处置,有些已完成。进入守候阶段。判断ReduceALLDone与前面类似。不加以叙述。

处置worker report

func (m *Master) HandleWorkerReport(wr *WorkerReportArgs, task *NoReply) error {
	m.Mut.Lock()
	defer m.Mut.Unlock()
	if wr.IsSuccess {
		if wr.State == MapState {
			if wr.MapTaskCnt == m.MapTaskCnts[wr.MapID] {
				m.MapFlags[wr.MapID].finished = true
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if wr.ReduceTaskCnt == m.ReduceTaskCnts[wr.ReduceID] {
				m.ReduceFlags[wr.ReduceID].finished = true
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	} else {
		if wr.State == MapState {
			if m.MapFlags[wr.MapID].finished == false {
				m.MapFlags[wr.MapID].processing = false
			}
		} else {
			if m.ReduceFlags[wr.ReduceID].finished == false {
				m.ReduceFlags[wr.ReduceID].processing = false
			}
		}
	}
	for id := 0; id < m.MapNum; id++ {
		if !m.MapFlags[id].finished {
			break
		} else {
			if id == m.MapNum-1 {
				m.MapAllDone = true
			}
		}
	}
	for id := 0; id < m.ReduceNum; id++ {
		if !m.ReduceFlags[id].finished {
			break
		} else {
			if id == m.ReduceNum-1 {
				m.ReduceALLDone = true
			}
		}
	}
	return nil
}

输入参数有一个标识位,示意义务是否乐成,乐成判断义务状态以及序列号,若是序列号与master对应上,可以解释这个义务乐成,若是对不上,说明这是个timeout义务,无需处置。若是义务标志位为false,进入错误处置,判断义务是否完成,由于可能是timeout义务标志位为false,未完成让processing置0,CreateWorkerTask可以重新分配。最后判断Map义务和Reduce义务是否响应所有完成,所有完成可以设置MapALLDone和ReduceALLDone为true。

处置timeout

func (m *Master) HandleTimeOut(args *NoArgs, reply *NoReply) error {
	for {
		m.Mut.Lock()
		if m.MapAllDone && m.ReduceALLDone {
			m.Mut.Unlock()
			break
		}
		time.Sleep(30 * time.Millisecond)
		if !m.MapAllDone {
			for idx := 0; idx < m.MapNum; idx++ {
				if m.MapFlags[idx].finished == false {
					m.MapFlags[idx].processing = false
				}
			}
		} else {
			for idx := 0; idx < m.ReduceNum; idx++ {
				if m.ReduceFlags[idx].finished == false {
					m.ReduceFlags[idx].processing = false
				}
			}
		}
		m.Mut.Unlock()
		time.Sleep(2000 * time.Millisecond)
	}
	return nil
}

处置timeout很简朴,先判断MapALLDone和ReduceALLDone是否都为true,都为true则退出即可。然后判断M义务那些还没有完成,对没有完成的义务的processing清0,就可以让CreateWorkerTask重新分配没有完成的义务了。最后释放锁,睡眠2s,可以看到Handletimeout函数是以2s为距离的,2s内没有完成的义务视为timeout。

Worker接口

天生worker

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	wt := WorkerTask{
		MapFunction:    mapf,
		ReduceFunction: reducef,
	}
	for {
		wt.GetWorkerTask()
		if wt.State == MapState {
			wt.DoMapWork()
		} else if wt.State == ReduceState {
			wt.DoReduceWork()
		} else if wt.State == StopState {
			break
		} else if wt.State == WaitState {
			time.Sleep(300 * time.Millisecond)
		}
	}
	return
}

func (wt *WorkerTask) GetWorkerTask() {
	cwa := NoArgs{}
	newWt := WorkerTask{}
	call("Master.CreateWorkerTask", &cwa, &newWt)
	if newWt.State == MapState {
		wt.ReduceNum = newWt.ReduceNum
		wt.MapNum = newWt.MapNum
		wt.State = newWt.State
		wt.MapID = newWt.MapID
		wt.FileName = newWt.FileName
		wt.MapTaskCnt = newWt.MapTaskCnt
	} else if newWt.State == ReduceState {
		wt.State = newWt.State
		wt.ReduceID = newWt.ReduceID
		wt.ReduceTaskCnt = newWt.ReduceTaskCnt
		wt.MapNum = newWt.MapNum
		wt.ReduceNum = newWt.ReduceNum
	} else if newWt.State == StopState {
		wt.State = newWt.State
	} else {
		wt.State = newWt.State
	}
}

mrworker会挪用worker函数,传入map函数和reduce函数,凭据函数参数建立一个worker,然后进入循环,挪用GetWorkerTask函数,这个函数会挪用Master.CreateWorkerTask函数,并传入两个参数,获得义务分配后,讲响应的参数和状态赋值给worker。worker就可以凭据状态进入处置响应义务或者睡眠,或者退出。

Map work

func (wt *WorkerTask) DoMapWork() {
	file, err := os.Open(wt.FileName)
	content, err := ioutil.ReadAll(file)
	file.Close()
	kvs := wt.MapFunction(wt.FileName, string(content))
	intermediate := make([][]KeyValue, wt.ReduceNum, wt.ReduceNum)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % wt.ReduceNum
		intermediate[idx] = append(intermediate[idx], kv)
	}
	for idx := 0; idx < wt.ReduceNum; idx++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", wt.MapID, idx)
		file, err = os.Create(intermediateFileName)
		data, _ := json.Marshal(intermediate[idx])
		_, err = file.Write(data)
		file.Close()
	}
	wt.ReportWorkerTask(nil)
}

func (wt *WorkerTask) ReportWorkerTask(err error) {
	wra := WorkerReportArgs{
		MapID:     wt.MapID,
		ReduceID:  wt.ReduceID,
		State:     wt.State,
		IsSuccess: true,
	}
	if wt.State == MapState {
		wra.MapTaskCnt = wt.MapTaskCnt
	} else {
		wra.ReduceTaskCnt = wt.ReduceTaskCnt
	}
	wrr := NoReply{}
	if err != nil {
		wra.IsSuccess = false
	}
	call("Master.HandleWorkerReport", &wra, &wrr)
}

为了增添可读性,我将处置错误的代码删除了,更悦目一些,Map work就是读取响应的文件,挪用MapFunction天生KeyValue对,然后凭据哈希函数获得要讲当前key分配到哪一块中,总共有ReduceNum块,最后凭据这么块天生对应map以及reduce块的文件。然后挪用ReportWorkerTask讲述乐成,传入nil示意乐成。ReportWorkerTask内部会挪用Master.HandleWorkerReport函数来汇报这一执行效果。

Reduce work

func (wt *WorkerTask) DoReduceWork() {
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < wt.MapNum; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, wt.ReduceID)
		file, err := os.Open(filename)
		content, err := ioutil.ReadAll(file)
		file.Close()
		kvs := make([]KeyValue, 0)
		err = json.Unmarshal(content, &kvs)
		for _, kv := range kvs {
			_, ok := kvsReduce[kv.Key]
			if !ok {
				kvsReduce[kv.Key] = make([]string, 0)
			}
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	ReduceResult := make([]string, 0)
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, wt.ReduceFunction(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", wt.ReduceID)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	wt.ReportWorkerTask(nil)
}

同样把一些错误处置删除了,首先读取相同块的所有文件,需要对相同key的内容聚合在一起,然后循环挪用ReduceFunction获得reduce的效果,最后天生输出。

遇到过的坑

主要遇到的两个坑,一个是关于GetWorkerTask,一个是CreateWorkerTask

首先说GetWorkerTask,最最先代码是下面这样子,我把wt作为参数传入进去,我发现后期挪用的时刻,wt的参数是不会更新的,一直处于WaitState,导致义务worker无法事情。新建立一个WorkerTask为参数,传入即可解决问题。

func (wt *WorkerTask) GetWorkerTask() {
	cwa := NoArgs{}
	call("Master.CreateWorkerTask", &cwa, wt)
}

第二个是头脑还没有转变过来的问题,分布式系统需要有分布式的头脑,这是CreateWorkerTask的截取代码,可以看到少了两行,没有对MapNum和ReduceNum举行初始化,为什么会做不初始化呢,由于那时我想的是上面的Map义务已经初始化,没有必要再举行初始化,这就是错误的泉源,万一之前初始化的worker crash掉了,map义务所有完成,那新的worker进入reduce,你不初始化MapNum和ReduceNum就会有bug,最显著的你运行CrashTest义务时,发现最后天生的效果有的有,有的没有,有的是之前运行Map义务的,现在运行Reduce义务,没有的就是新的worker直接进入Reduce义务,默认初始化为0,则循环读文件直接退出。

if !m.ReduceALLDone {
		for idx := 0; idx < m.ReduceNum; idx++ {
			if !m.ReduceFlags[idx].processing && !m.ReduceFlags[idx].finished {
				workerTask.State = ReduceState
				workerTask.ReduceID = idx
				m.ReduceTaskCnts[idx]++
				workerTask.ReduceTaskCnt = m.ReduceTaskCnts[idx]
				m.ReduceFlags[idx].processing = true
				return nil
			}
		}
		workerTask.State = WaitState
		return nil
	}

RPC

Go语言举行RPC异常简朴,有现成的RPC的包,异常利便。

func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}

func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	go http.Serve(l, nil)
}

func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	defer c.Close()
	c.Call(rpcname, args, reply)
}

删减了一些错误处置代码,焦点代码就是上面这些,只需要20来行就可以构建好RPC流程,首先master要挪用server函数,举行rpc注册以及rpc处置http,然后获得套接字名,移除系统中套接字名,然后最先监听,建立线程举行http服务。server函数运行好之后。worker就可以凭据套接字名举行拨号,然后挪用master的函数。

结语

MapReduce先容就到这了,推荐自己实验实现一遍,收获照样很大的,包罗mapreduce细节实现,加倍熟悉Go,分布式调试(可以看看这个commit下的代码,没有删减打印,可以清晰看输出,特别是Crashtest,可以将test-mr.sh前四个义务注释掉,看CrashTest输出)。

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/7338.html