科普文:重读并翻译分布式计算经典文论-MapReduce

评论: 0 浏览: 1427 ***新更新时间: 2个月前

MapReduce计算模型于2003年提出于google,并迅速在google内部被广泛运用在各个领域,因为google存在许多基于大型数据的计算问题,而刚好MapReduce能够同时利用上千台机器进行分布式并行计算。MapRedue易于使用,由于它把并行化、分布式、容错、负载均衡等细节全部隐藏在内部实现中,因此,哪怕不具备分布式并行计算经验的程序员也能轻松利用MapReduce和大量机器来进行大规模计算,文中还谈到许多关于分布式计算的优化,如果要系统学习分布式计算,可以好好重温一下经典。

Abstract

Mapreduce是一种处理和生产大型数据集的大型数据集的编程模型和相关实现。用户指定一个map函数,该函数处理键/值对以生成一组中间健/值对,以及一个reduce函数,该函数合并与同一中间键关联的所有中间值。如本文所示,许多现实世界中的任务都可以用该模型表示。

用这种函数式风格编写的程序可以自动并行化,并在大型商用机器集群上执行。运行时系统需要关注对输入数据进行分区、在一组机器上调度程序的执行、处理机器故障以及管理所需的机器间通信等细节。这使得没有任何并行和分布式经验的程序员可以轻松地利用大型分布式系统的资源。

我们实现的MapReduce可运行在大型商业集群机器上,且具有高度可扩展性:一个典型的MapReduce计算能够处理数千台机器上的TB级别的数据。程序员会发现这个系统非常易用:目前已经实现了数百个MapReduce程序,且每天都在google集群上执行着数千个MapReduce任务。

1. Introduction

在过去5年里,作者和google的许多其他人已经实现了数百种特殊用途的计算,这些计算处理大量的原始数据,如抓取的文档、web请求日志等,以及处理各种派生数据,如倒排索引、web文档的图形结构的各种表示,每台主机抓取的页面数量的摘要、给定***中***频繁的查询集等。大多数这样的计算在概念上是很直白简单的,然后,输入数据量通常很大,为了确保在合理的时间内完成,必须将计算分布到成百上千台机器上,这样就会出现如何处理并行计算、分布数据以及处理故障等问题,为了解决这些问题,不得不使用大量复杂的代码来处理原本简单的计算,进而让其变得异常模糊。

为了应对这种复杂性,我们设计了一个新的抽象,它允许我们表达尝试执行的简单计算,但是把并行、容错、数据分布以及负载均衡的混乱细节隐藏在库中。这个抽象灵感是来自Lisp和许多其他函数式语言的map和reduce原语。我们发现,大多数计算涉及将map操作运用到输入的每个逻辑“记录”,以算出一组中间键/值对,然后对共享同一键的所有值应用reduce操作,以便适当的组合派生数据。我们使用带有用户指定的map和reduce操作的功能模型,使我们能够轻松地并行化大型计算,并将重新执行作为容错的主要机制。

这项工作的主要贡献是实现了一个简单而强大的接口,它可以实现大规模计算的自动并行化和分布,并结合该接口的实现,在大型商用pc集群上实现高性能。

第二节描述了基本的编程模型,并给出了一些示例。第三节描述了针对基于集群的计算环境定制的MapReduce接口实现。第四节描述了我们认为有用的编程模型的几个改进。第五节对各种任务的实现进行了性能测试。第六节探索了MapReduce在Google中的运用,包括我们使用它作为基础的经验重写了生产环境中的索引系统。第7节讨论了相关的和未来的工作。

2. Programming Model

计算接受一组输入键值对,并产生一组输出键值对。MapReduce库的用户将计算表达为两个函数:Map 和 Reduce。

Map,由用户编写,接受一个输入对并产生一组中间键值对。MapReduce库将所有和相同中间键I相关的值组合在一起,并将其传递给Reduce函数。

同样由用户编写的Reduce函数接受一个中间键I和该键的一组值。它将这些值合并在一起,形成了一个可能更小的值集。通常,每次Reduce调用只会产生0个或者1个输出值。中间值通过迭代器提供给用户的reduce函数。这允许我们处理大到内存无法容纳的值列表。

2.1 Example

考虑计算一个大型文档集合中每个单词出现次数的问题。用户将编写类似以下伪代码的代码:

map(String key, String value):
 // key: document name
 // value: document contents
 for each word w in value:
  EmitIntermediate(w, "1");
reduce(String key, Iterator values):
 // key : a word
 // values: a list of counts
 int result = 0;
 for each v in values:
  result += ParseInt(v);
  Emit(AsString(result));

Map函数发出每个单词加上相关的出现次数(在这个简单示例中只有1)。Reduce函数对特定单词的所有的计数求和。

此外,用户编写代码,用输入和输出文件的名称以及可选的调优参数填充mapreduce规范对象。然后用户调用Mapreduce函数,将规范对象传给它。用户代码要和MapReduce库链接在一起(c++实现)。附录A给出了这个例子的完整代码。

2.2 Types

尽管之前的伪代码是根据字符串输入和输出编写的,但从概念上来讲,用户提供的map和reduce函数具有相关类型:

map (k1, v1)  -> list(k2, v2)
reduce (k2, list(V2))  -> list(v2)

比如,输入和键值和输出的键值来自不同的域,此外,中间键值和输出键值来自相同的域。

我们C++实现将字符串传递给用户定义的函数,并接受其输入的字符串,用户可以自己在代码中将字符串转换成想要的类型。

2.3 More Examples

接下来是一些有趣程序的简单示例,它们能够轻松地表示为MapReduce计算。

分布式Grep:如果map函数匹配到给定模式,会发出匹配到的行。Reduce是一个恒等函数,只负责透传中间值。

URL访问频率统计:map函数处理网页请求日志并输出. reduce函数将相同URL的所有值加在一起,并输出键值对

反向网页连接图:map函数在名为source的页面中找到的每个指向目标url的链接,并输出键值对。reduce函数把所有与给定的目标URL关联的所有源URL拼接起来,并发出键值对。

每台主机的术语向量:术语向量将一个文档或一组文档中出现的***重要的单词总结为的键值对的列表。map函数为每一个输入文档发出(其中hostname从文档的url中提取出来)。给定主机的每个文档术语向量被传递给reduce函数,reduce将同一主机的术语向量汇总,丢掉不常见的术语,并发出***终的键值对。

倒排索引:map函数解析每个文档并发出一系列键值对。reduce函数接受给定单词的所有键值对,根据文档ID进行排序并发出键值对。所有输出键值对的集合组成了一个简单的倒排索引。很容易扩散这个计算过程,以便同时记录单词的位置信息

分布式排序:map函数从每条记录中提出键,并发出键值对。reduce函数不做任何修改的进行转发。这个计算依赖4.1节中讨论的分区工具以及4.2节中描述的排序属性。

3. Implementation

MapReduce接口的多种不同实现是有可能的,正确的选择取决于环境。例如,一种实现可能适用于小型共享内存机器,另一种可能适用于大型NUMA多处理器,还有一种适用于大型网络集群。

本节描述的实现是针对google广泛使用的计算环境:大型商用PC集群通过交换式以太网连接在一起:在我们的环境中:

1) 使用的机器通常是x86双处理器,每台机器2-4GB的内存。

2)商业网络使用的硬件:通常在机器级别上是100M/s或者1G/s,但从总体双向带宽的平均值来看要少得多。

3)集群由数百台或数千台机器组成,因此机器故障是很常见的。

4)存储是由直接连接在单台机器上的廉价IDE磁盘提供。内部开发的分布式文件系统用于管理存储在这些磁盘上的数据。文件系统使用副本集方式在不可靠的硬件上提供可用性和可靠性。

5)用户分配工作给调度系统。每项工作包含一组任务,并由调度器映射到集群中的一组可用机器上。

3.1 Execution Overview

通过自动将输入数据分割M份,来将Map调用分布在多台机器上。输入分割可以由多台机器并行处理。Reduce调用通过使用分区函数(例如hash(key) mod R)将中间键空间划分为R块来分发。分区的数量R和分区函数由用户指定。

图1展示了我们实现的MapReduce的总体流程,当用户调用MapReduce函数时,会发生以下系列动作(图一中编号标签对应下面列表中的数字)

1) 用户程序中MapReduce库首先会把输入文件分成M块,每块通常是16到64M(可由用户通过可选参数控制)。然后在一组机器上启动该程序的多个副本。

2)其中,主节点上的程序备份是比较特殊的。其他工作节点的任务都由主节点分配。有M个map任务和R个reduce任务要分配。主节点会选择一个空闲工作,并为其分配一个map或者reduce任务。

3)分配到map任务的工作节点会读取对应输入分割的内容。从输入数据中解析出键值对,然后传递给用户定义的Map函数。该Map函数产生的中间键值对会缓存在内存中。

4)缓存的键值对会定期写入本地磁盘,通过分区函数划分成R个区域。这些缓存键值对在本地磁盘中的位置需要上传给主节点,由主节点负责将这些位置信息转发给reduce工作节点。

5)当一个reduce节点被主节点告知这些位置信息后,它会用远程调用从map节点的本地磁盘中读取缓存数据。当一个reduce节点读取完全部的中间数据后,会按中间键对数据进行排序,以便所有出现的相同键被分在一组。排序是必要的,因为通常有许多不同的键映射到同一个reduce任务。如果中间数据量太大,内存无法容纳,则使用外部排序。

6)reduce工作节点会遍历已排序的中间数据,每当遇到一个***的中间键,就会将键和对应的中间值集合传递给用户的Reduce函数。reduce函数的输出会被附加到这个reduce分区的***终输出文件中。

7)当所有的map和reduce任务完成后,主节点会唤醒对应的用户程序。***此,用户程序中的mapreduce函数调用返回到用户代码。

成功完成之后,mapreduce执行的输出在R份输出文件中可用(每一个reduce任务一个,文件名由用户指定)。通常,用户不需要将R份输出文件组合成一个文件,他们通常将这些文件作为输入传给另一个mapreduce调用,或者由另一个能够处理输入为多个分区文件的分布式应用程序来使用他们。

3.2 Master Data Structures

主节点保留了几个数据结构,用来存储每个map和reduce任务的状态(空闲,进行中,或者完成),以及每台工作机器的标识(对于非空闲任务)。

主节点是将中间文件区域的位置信息从map任务传递给reduce任务的通道,因此,对于每一个完成的map任务,主节点会存储由map任务产生的R中间文件区域的位置和大小。消息被增量的推送到正在执行reduce任务的工作节点。

3.3 Fault Tolerance

由于MapReduce库的设计目的是帮助处理使用成百上千台机器的大量数据,因此必须优雅地容纳机器故障。

Worker Failure

主节点会定期ping每个工作节点,如果在一定时间内没有收到该节点的回应,就会将其标记为故障节点。任何由该节点完成的map任务都会被重置为初始空闲状态,进而能够被调度到其他节点。同样地,任何在故障节点上正在被处理的map或reduce任务,也会被重置为空闲状态,并有资格重新调度。

已完成的map任务发生故障时需要重新执行,因为这些任务的输出数据存储在故障机器的本地磁盘上,因此无法访问。完成的reduce任务不需要重新执行,是因为它的输出数据存储在全局文件系统中。

当一个map任务首先被节点A执行,然后由被节点B执行(因为A故障了),所有执行reduce任务的节点都会被通知重新执行。任何还有从节点A读取数据的reduce任务,会从节点B读取数据。

MapReduce对大规模机器故障是有具有弹性的。例如:在一个mapreduce操作期间,由于运行集群上的网络维护会导致同时80台机器在几分钟内不可用,MapReduce主节点会无脑地重新执行由这些机器完成的任务,并继续向前推进,***终完成MapReduce操作。

Master Failure

让主节点将上述重要数据结构做出检查点备份是一件很容易的事情,如果主节点任务挂掉了,可以利用***近一次检查点状态启动一个新的副本。但是,考虑到只有一个主节点,其发生故障的可能性比较低。因此,如果主节点挂掉,目前的处理方式是则直接终止MapReduce的计算。客户端可以检查主节点的状态,如果需要的话,可以重试MapReduce操作。

Semantics in the Presence of Failures

当用户提供的map和reduce操作符是其输入值的确定性函数时,我们分布式系统实现产生的输出与整个程序的非错误顺序执行所产生的输出是一样的。

我们依赖map和reduce任务输出的原子提交来实现这个属性,每个处理中的任务会将它的输出写入私有临时文件中。一个reduce任务生成这样一份文件,一个map任务会有R份这样的文件(每个reduce任务生成一个)。每当一个map任务完成,工作节点会发送一个消息给主节点,消息中会包含R份临时文件的名字。如果主节点收到已经完成的map任务的完成信息,就会忽略该信息。同时,会在主节点数据结构中记录R份文件的名字。

每当一个reduce任务完成,reduce节点就会自动将临时输出文件命名为***终输出文件。如果在多台机器上执行相同的reduce任务,则将对相同的***终输出文件执行多个命名操作。我们依赖于底层文件系统提供的原子命名操作来保证***终文件系统状态只包含一个reduce任务执行所产生的数据。

***大多数map和reduce操作符都是确定的,在这种情况下,我们的语义相当于顺序执行,这就让程序员很容易推断他们程序的行为。

当一个map和/或reduce操作不确定时,我们提供较弱但仍然合理的语义,当存在不确定性操作符的情况下,特性reduce任务R1的输出相当于非确定性程序顺序执行对R1产生的输出。然而,不同的reduce任务R2产生的输出可能对应于不确定程序的不同顺序执行所产生对R2的输出。

给定map任务M和reduce任务R1和R2,设e(Ri)为提交的Ri执行(只考虑一次执行),弱语义就会产生是因为e(R1)可能读取一次M执行产生的输出,e(R2)可能会读取另一次M执行所产生的输出。

3.4 Localiy

网络带宽在我们的计算环境中是相对稀缺的资源。我们通过利用输出数据存储在组成集群的机器的本地磁盘这一事实来节省网络带宽。GFS会将每个文件分割成64M的块,并将每块数据的多份拷贝(通常3份)存储在不同的机器。MapReduce主节点会将输入文件的位置信息纳入考虑,并试图将map任务调度到包含对应输入文件拷贝的机器上,如果失败,它会尝试在该任务输入数据的副本附近调度map任务,(例如,在与工作机器和包含数据的机器处在相同网络交换机上)。当在一个集群中相当一部份工作节点运行大型mapreduce操作时,通常大部份输入数据都在本地读取,不需要消耗任何网络带宽。

3.5 Task Granularity

如上所述,我们会将map阶段细分为M个片段,将reduce阶段分成R个片段,理想情况下,M和R应该比工作机器的数量大得多,会让每台机器执行许多不同的任务可以改善动态负载均衡,同时当一个工作节点出现故障时可以加快恢复:它完成的许多map任务可以分散到所有其他工作机器上。

在我们的实现中M和R的大小是有实际限制的,因为主节点必须做出O(M+R)的调度决策,并如上所述内存中保持O(M*R)个状态,(然而,内存使用的常量因素很小:状态的O(M*R)块由每个map任务/reduce任务对大约一个字节的数据组成)

此外,R经常受到用户约束,因为每个reduce任务的输出***终会在一个单独的输出文件中。在实践中,我们倾向于选择M,这样每个单独任务大约有16到64M的输出数据(因此上述描述的局部性优化是***有效的),我们让R是我们期待使用的工作机器数量的一个小倍数。我们经常使用2000台工作机器,在M=200,000,R=5,000的情况下执行MapReduce计算。

3.6 Backup Tasks

导致mapreduce操作总时间延长的常见原因之一是“掉队者”:一台机器在完成计算中***后map和reduce任务中的一个时,花费了异常长的时间。“掉队者”出现有很多原因。例如:具有坏磁盘的机器可能会遇到频繁的可纠正错误,从而使其读取性能从读性能从30M/s降为1MB/s。集群调度系统可能已经调度了机器上的其他任务,由于CPU、内存,本地磁盘或者网络带宽的竞争导致它执行MapReduce代码速度更慢。我们***近遇到的一个问题是机器初始化代码的一个bug,导致处理器缓存被禁用:受影响机器上的计算速度减慢了一百多倍。

我们有一个通用机制来缓解掉队者的问题,当一个MapReduce操作接近完成时,主节点调度剩余正在执行的任务执行备份,每当任务完成还是备份执行完成,任务都会被标记为完成。我们已经对这种机制进行了调优,使它通常只增加操作所使用的计算资源几个百分点。我们发现这个机制大大减少了完成大型MapReduce操作的时间。例如,当禁掉备份任务的机制,5.3节中描述的排序程序要多花44%的时间来完成。

4 Refinements

虽然简单编写Map和Reduce函数提供的基础功能足以满足大多数需求,但我们发现有一些扩展很有用。本节我们将介绍这些特性。

4.1 Partitioning Functions

MapReduce用户指定他们想要的reduce任务/输出文件的数量(R),使用中间键上的分区函数在这些任务之间进行分区。提供了一个使用哈希(hash(key) mod R)的默认分区函数,这往往会导致相当均衡的分区。但是,在某些情况下,根据键的其他函数来对数据进行分区更有用。例如,有时输出键是URL,我们希望单个主机的所有条目***终都在相同的数据文件中,为了支持像这样的情况,mapreduce库的用户可以提供一个特殊分区函数。例如:使用hash(Hostname(urlkey)) mod R作为分区函数来让相同主机的所有URL***终被放到一个输出文件中。

4.2 Ordering Guarantees

我们保证在同一个分区中,中间键/值对会按键递增顺序处理。这个排序保证可以很容地为每个分区很生成排序的输出文件,当输入文件的格式需要支持按键进行高效的按键随机访问查找,或者使用输出数据的用户发现拥有排序数据处理起来更为方便时,这是很有用的。

4.3 Combiner Function

在某些情况下,每个map任务产生的中间键存在显著的重复,并且用户指定的reduce函数是可交换和关联的。2.1节中的单子计数就是一个很好的例子。由于单词频率倾向于遵循Zipf分布,每个map任务会产生成百上千条形式为的记录,所有这些计数将通过网络发送给单独的reduce任务,然后由reduce函数加起来得到一个数字。我们允许用户指定一个可选的组合函数,该函数在数据通过网络被发送之前对其进行部份合并。

组合函数在每台执行map任务的机器上被执行,通常组合函数和reduce函数的实现可以用一份代码。reduce函数和组合函数的***区别是mapreduce库如何处理函数的输出。reduce函数的输出被写进一个***终的输出文件,组合函数的输出被写进一个中间文件,这个文件将会被发送给reduce任务。

部份组合显著加快了某些类型的mapreduce操作,附录A中给出了使用组合器的例子。

4.4 Input and Output Types

MapReduce库提供了支持以几种不同的格式读取输入数据。例如,文本形式的输入会将每一行视为一个键值对:键是文件中的偏移量,值是该行的内容。另一种常见的支持格式存储按键排序和键值对序列。每个输入类型实现都知道如何将自己划分为有意义的范围,以便作为单独的map任务进行处理。(例如,文本格式的范围分割确保范围分割只发生在行边界处)。用户可以提供简单阅读器接口的实现增加对新输入类型的支持,尽管大多数用户只使用少数预定义输入类型中的一种。

读取器不一定需提供从文件读取的数据。例如,从数据库中读取记录或者从内存中映射的数据结构读取记录的读取器是很容易定义的。

以类似的方式,我们支持一组输出类型来生成不同格式的数据,用户代码同样可以很容的添加对新输出类型的支持。

4.5 Side-effects

在某些情况下,mapreduce用户发现从他们的map和/或reduce操作符生成的辅助文件,作为附加输出很方便。我们依赖应用程序编写器来保证这些副作用的原子化和幂等化。通常,应用程序写入临时文件,并在完全生成该文件后对文件进行原子的命名操作。

我们不支持单个任务生成的多个输出文件的原子两阶段提交。因此,产生具有跨文件一致性要求的多个输出文件的任务应该是确定的。这种限制在实践中从来没有成为问题。

4.6 Skipping Bad Records

有时,因为用户代码的bug会导致map或reduce函数在某些记录上必然崩溃,这些bug会导致mapreduce操作无法完成,通常的做法是修复bug,但有时没有那么容易。可能bug存在于没法获得源代码的第三方库中。此外,有时忽略一些记录也是可以接受的,例如对大型数据集中进行统计分析时,我们提供一个可选的执行模式,其中mapreduce库检测到造成确定崩溃的记录时,会跳过这些记录并向前推进。

每一个工作进程会安装一个捕捉段污染或总线错误的信号处理器,在调用用户map或reduce操作前,mapreduce库会将参数的序列号存储在全局变量中。如果用户代码生成一个信号,信号处理器就会发送一个包含序列号的“last gasp”UDP包给mapreduce主节点。当主节点在一个特定的记录上看到不止一个失败时,它就表示在下次重新执行对应的map或者reduce任务时应该跳过这些记录。

4.7 Local Execution

map或reduce的调试问题非常的诡异,因为实际的计算一个分布式系统中,这个系统的机器是在由主节点在几千台机器中动态分配的。为了方便调试、性能监控以及小型的测试,我们开发了一个mapreduce库可选的实现,可以在本地机器上顺序执行mapreduce操作的所有工作。用户可以进行控制将计算限制在特定的map任务中,用户可以用一些特殊的标记来调用他们的程序,进而能够方便的使用他们认为有用的任何调试或者测试工具(例如:gdb)

4.8 Status Information

主节点运行一个内部http服务并导出一组状态页面供人们使用,状态页面会展示计算的进程,例如有多少任务已经完成,有多少任务正在在进行中,输入字节数,中间数据字节数,输出字节数,处理速率等。这些页面还包含了每个任务生成的标准错误和标准输出文件的链接。用户可以根据这些数据来预测计算还需多长时间,以及是否应该在计算中增加更多的资源,这些页面也可以用来确定计算何时比预期慢得多。

此外,***状态页面展示了哪些工作节点出现了故障,以及他们出现故障时正在执行哪些map和reduce任务。当试图诊断用户代码中的bug时,此信息非常有用。

4.9 Counters

mapreduce库提供一个计数器工具来计算各种事件发生的次数。例如,用户代码可能想要统计处理的单词总数或者索引的德国文档的数量等。

使用这个工具,用户代码创建一个命名计数器对象,然后在map和/或reduce函数中适当地增加计数。例如:

Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
 for each word w in contents:
  if(IsCapitalized(w)):
   uppercase->Increment();
  EmitIntermediate(w, "l");

来自各个工作节点的统计值会定期传给主节点(在ping响应的基础上),主节点从成功的map和reduce任务中聚合统计值,并在mapreduce操作完成时返回给用户代码。当前计数器值也会展示在主节点状态页面,以便人们可以观察实时计算的进度。当聚合计数器值时,主节点会消除重复执行相同map或reduce任务的影响,以避免重复计算。(重复执行可能来自我们使用备份任务和由于失败而重新执行任务)。

一些计数器值由mapreduce库自动维护,例如处理的输入键值对的数量和产生的输出键值对的数量。

用户发现计数器功能对于检查mapreduce操作的行为非常有用。例如,在一些mapreduce操作中,用户代码希望确保生成的输出对数量正好等于处理的输入对数量,或者处理的德语文档的比例在处理的文档总数的可容忍的比例范围内。

5 Performance

本节我们将在大型集群机器上运行的两个计算来测量Mapreduce的性能,一个计算是搜索大约1T的数据来寻找一个特定模式,另一个计算是对大约1T的数据进行排序。

这两个程序代表来MapReduce用户编写的实际程序的一个大子集:一类程序将数据从一个表示变换为另一个表示,另一类程序从一个大数据集中提出少量感兴趣的数据。

5.1 Cluster Configuration

所有程序都在一个由大约1800台机器组成的集群上执行,每台机器有2个2GHz Intel Xeon处理器,支持超线程,4GB内存,2个160GB的IDE磁盘,和千兆以太网链路。这些机器被安排在一个两级树状交换网络中,根节点的总带宽大约100-200gbps,所有的机器都在同一个托管设备中,因此任何一对机器之间的往返时间都不到一毫秒。

在4GB内存中,大约有1-1.5GB是由集群上运行的其他任务保留,这些程序在一个星期天的下午被执行,当时CPU,磁盘和网络大多处于空闲状态。

5.2 Grep

grep程序会扫描10^10条100字节的记录,搜索相对罕见的3个字符模式(该模式出现在92337条记录中),输入被分割成大约64M的块(M=15000),整个输出被放在同一个文件中(R=1)

图2展示了计算随时间的进展,Y轴表示扫描输入数据的速率,当更多的机器被分配到这个mapreduce计算时,速率逐渐上升,当分配了1764个工作节点时,峰值超过30GB/s,当map任务完成时,速率开始下降,并在计算后大约80s***后达到0,整个计算从开始到结束大约需要150s,包括了大约一分钟的启动开销,开销是由于要将程序传播到所有工作机器,以及延迟与GFS交互以打开1000个输入文件集并获得局部性优化所需的信息。

5.3 Sort

排序程序对10^10条100字节的记录进行排序(大约有1T数据),该程序是在TeraSort基准测试后建模的。

排序程序由不到50行用户代码组成。3行map函数从文本行中提取10字节的排序键,然后将键和原始文本行作为中间键值对发出。我们使用一个内置的identity函数作为Reduce操作符,该函数将中间键值对不加改变的传递给输出键值对。将***终的排序输出写入一组双向复制的GFS文件(即,将2T写入程序的输出)

和前面一样,输入数据被分割成64M的块(M=15000),我们将排序后的输出分割成4000个文件(R=4000),分区函数使用键的初始字节来将其隔离为R个片段中的一个。

这个基准测试的分区函数内置了键分布的知识,在一般排序程序中,我们将添加一个传递前的MapReduce操作,这个操作将收集键的样本,并使用采样键的分布来计算***终排序传递的分割点。

图三(a)中展示了排序程序正常执行的进度,左上角的图展示了读取输入的速率,速率峰值大约13GB/s,并且很快就消失,因为所有的map任务在200秒前就完成了。注意输入速率要低于grep,这是因为排序map任务花了一半的时间和IO带宽将中间输出写入本地磁盘,而grep对应的中间输出的大小可以忽略不计。

左中图展现了数据通过网络从map任务传输到reduce任务的速率,当***个map任务完成时数据转移就开始了,图中的***驼峰是***批大约1700个reduce任务(整个Mapreduce分配了大约1700台机器,每台机器一次***多执行一个reduce任务),在大约300s的计算中,***批reduce任务中的一些完成了,我们开始传输为剩下的reduce任务传输数据,所有的数据传输在计算后大约600s完成。

左下角的图显示了reduce任务将排序数据写入***终输出文件的速率,在***个传输数据周期结束和写入周期开始之间有一个延迟,因为机器忙着对中间数据进行排序,写入数据持续在2-4GB/s之间,所有的写操作在计算完成后大约850s完成,包括启动开销在内,整个计算耗时891秒,这与TeraSort基准测试目前报告的***好结果1057秒相似。

有几件事情需要注意:由于我们的局部性优化,输入数据的速率要高于传输数据速率和输出速率:大部份数据从本地磁盘读取,绕过了相对带宽受限的网络,传输数据速率要高于输出速率,因为输出阶段需要写入已排序数据的两个副本(出于可靠性和可用的原因,我们创造了两个输出副本)。我们编写两个副本,因为这是底层文件系统提供的可靠性和可用性机制,如果底层文件系统使用擦除编码而不是复制,则写入数据所需的网络带宽将会减少。

5.4 Effect of Backup Tasks

在图三(b)中,我们展示了禁用备份任务的排序程序的执行,执行流和图三(a)相似,除了有一个非常长的尾部,几乎没有任何写活动发生,960s后除了5个reduce任务外,其他任务全部完成。然而,***后几名落伍者直到300s后才完成,整个计算花费1283秒,运行时增加了44%。

5.5 Machine Failures

在图三(c),我们展示排序程序的执行,在计算开始几分钟后,我们故意杀死了1746个工作进程中的200个。底层集群调度器立刻重新启动这些机器上的新工作进程(因为进程被终止,机器依然在正常运行)。

工作节点的崩掉会显示负的输入率,因此之前完成的一些map工作丢失了(因为相应的map进程被杀掉了),需要重新做。此map任务的重新执行相对较快,整个计算在933秒内完成,包括了启动开销(仅比正常执行时间增加了5%)

6 Experience

我们在2003年2月份写了mapreduce库的***个版本,并在2003年8月份对其进行了重大增强,包括局部性优化、跨工作机器执行任务的动态负载均衡等,从那以后,我们就惊喜地发现,mapreduce库对我们所处理的各种问题是如此广泛地适用。它已被用于google内部的广泛领域,包括:

  • 大规模机器学习问题

  • 谷歌新闻和Froogle产品的聚类问题

  • 提取用来生成流行查询报告的数据(比如:google zeitgeist)

  • 提取用于新实验和产品的网页属性(比如,从大型网页语料库中提取地理位置信息用于本地化搜索)

  • 大规模图形计算

    图四展示了随着时间的推移,检入主要源代码管理系统的独立mapreduce程序数量的显著增加,从2003年初的0到2004年9月底的近900个独立实例。mapreduce之所以能如此成功,是因为它使得编写一个简单的程序并在半小时内在数千台机器上高效运行成为了可能,大大加快了开发和原型的周期。此外,它让没有任何分布式和/或平行系统经验的程序员能够轻松地利用大量资源。

    在每个作业结束时,mapreduce库会记录该作业使用的计算资源统计数据。在表1中,我们展示了2004年8月在google运行的mapreduce作业子集的一些统计数据。

    6.1 Large-Scale Indexing

    到目前为止,MapReduce***重要的用途之一是完全重写了生产索引系统,该系统生产用于 google页面搜索服务的数据结构。索引系统将由我们的爬虫系统检索到的大量文文档集作为输入,存储在一组GFS文件中。这些文档的原始内容超过20TB的数据,索引过程以5到10个mapreduce操作的顺序运行。使用mapreduce(而不是此前版本的索引系统中的ad-hoc分布式传递)有以下几个好处:

    • 索引代码更简单,更小,更容易理解,因为处理容错、分布式和并行化的代码隐藏在mapreduce库中,例如,当使用mapreduce表达后,计算的一个阶段的代码大小从大约3800行c++减少到大约700行。

    • mapreduce库的性能足够好,我们可以将概念上不相关的计算分开,而不是将他们混合在一起,以避免额外的数据传递,这让更改索引过程变得更容易,例如,在旧系统上需要花掉几个月才能完成的一个更改,在新系统中只需要几天就可以实现。

    • 索引过程变得更易于操作,因为大多数由于机器故障、慢机器、以及网络故障引起的问题都由mapreduce库自动处理,无需人工干预。此外,通过想索引集群中添加新机器,可以很容易地提升索引过程的性能。

      7 Related Work

      许多系统也提供了受限的变成模型,并利用这些约束自动并行化计算。例如,可以在N个处理器上运行并行前缀计算,在logN时间内对N元素数组的所有前缀计算一个关联函数。mapreduce可以被认为是基于我们在现实世界中大量计算的经验对这些模型的简化和提炼。更重要的是,我们提供了可扩展到数千个处理器的容错实现。相比之下,大多数并行处理系统只能在小规模机器上运行,并且需要程序员自己处理机器故障。

      批量同步编程和一些MPI原语提供了更高水平的抽象,使程序员更容易编写并行程序,这些系统与mapreduce之间一个关键区别是,mapreduce利用了受限程序模型自动并行化用户程序,并且提供透明的容错。

      我们局部性优化的灵感来源于活动磁盘等技术,其中计算被推入靠近本地磁盘的处理元素,以减少IO子系统或网络发送数据量,我们在直接连接少量磁盘的普通处理器上运行,而不是直接在磁盘控制器处理器上运行,但一般方法是相似的。

      我们的备份任务机制类似于Charlotte系统中此阿勇的急切调度机制,简单的急切调度的缺点之一是,如果一个给定的任务导致重复的失败,整个计算无法完成,我们用跳过坏记录的机制修复了这个问题的一些实例。

      mapreduce的实现依赖于内部集群管理系统,该系统负责在大量共享机器上分发和运行用户任务,集群管理系统虽然不是本文的重点,但在精神上与Condor等其他系统类似。

      排序工具作为mapreduce的一部份,在操作上类似于NOW-Sort,源机器对待排序的数据进行分区,并将其发送给R个reduce工作节点的一个,每个reduce节点在本地(如果可能的话,在内存中)对数据进行排序,当然,NOW-Sort没有用户自定义的map和reduce函数,而正是这些自定义函数让我们的库具有广泛的适用性。

      River提供了一个编程模型,其中进程通过在分布式队列上发送数据来相互通信,类似MapReduce,River系统即使是在异构硬件或系统扰动引入的不均匀性情况下,也试图提供良好的均等性能。River通过小心调度磁盘和网络传输来实现均衡的完成时间,MapReduce则采用了不同的方法,通过限制程序模型,MapReduce框架能够将问题分割成大量细粒度任务,这些任务被动态的调度到可用的工作节点,以便效率更高的节点处理更多的任务,受限编程模型也允许我们调度作业结束时安排任务的冗余执行,进而大大减少了存在不一致性(例如,减缓或者卡住工作节点)的完成时间。

      BAD-FS有一个不同于mapreduce的编程模型,和mapreduce不同的是,它致力于广域网上执行任务,但是,有两个基本相似之处:1)都用冗余执行来恢复由于故障而丢失的数据。2)都使用局部化调度来减少通过拥挤的网络链路传输的数据量

      TACC是一个为了简化高可用网络服务构建而设计的系统,和MapReduce一样,它利用重新执行机制来实现容错。

      8 Conclusions

      MapReduce编程模型已经成功地因为许多不同目的在google被使用,我们将这种成功归结为几个原因,首先,该模型易用,即使是对没有任何并行和分布式系统经验的程序员也是如此,因为其隐藏了并行化,容错,局部性优化,以及负载均衡的细节。其次,大量问题可以很容易地表达为MapReduce计算,例如,Mapreduce用来为google生产网页搜索服务生成数据,用于排序,数据挖掘,机器学习和许多其他系统。第三,我们开发的Mapreduce计算能够扩展到由上千台机器组成的大型机器集群,该实现有效地利用了这些机器资源,因此适用于google遇到的许多大规模计算问题。

      从本项工作中我们学到了一些东西,首先,限制编程模型使并行化和分布式计算变得容易,并使这种计算具有容错性。其次,网络带宽是一种稀缺资源,因此,我们系统的许多优化都是致力于减少通过网络传输的数据量:局部性优化让我们从本地磁盘读数据,并将中间数据的单个副本写入本地磁盘,从而节省网络带宽。第三,冗余执行可用来减少慢速机器的影响,以及处理机器故障和数据丢失。


发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

点击启动AI问答
Draggable Icon