其实流程算起来也不算复杂,所以就直接用代码注释来做吧
这里边涉及这么几个方法:
1、public List<InputSplit> getSplits(JobContext job), 这个由客户端调用来获得当前Job的所有分片(split),然后发送给JobTracker(新API中应该是ResourceManager),而JobTracker根据这些分片的存储位置来给TaskTracker分配map任务去处理这些分片。这个方法用到了后边的listStatus,然后根据得到的这些文件信息,从FileSystem那里去拉取这些组成这些文件的块的信息(BlockLocation),使用的是getFileBlockLocation(file,start,len),这个方法是与使用的文件系统实现相关的(FileSystem,LocalFileSystem,DistributedFileSystem)
/**
* Generate the list of files and make them into FileSplits.
*/
public List<InputSplit> getSplits(JobContext job
) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job); //2
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
/*Return an array containing hostnames, offset and size of portions of the given
file. For a nonexistent file or regions, null will be returned. This call is
most helpful with DFS, where it returns hostnames of machines that contain the
given file. The FileSystem will simply return an elt containing 'localhost'.*/
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize); //3
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //4
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
//这里使用的是FileSplit,在RecordReader实现中拿到Split的时候就可以向下转型,从而拿到一些分片的信息
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
2、protected List<FileStatus> listStatus(JobContext job), 先根据“mapred.input.dir”的配置值去得到用户指定的所有Path。然后根据这个JobContext的Configuration得到FileSystem(当然,更可能是
DistributedFileSystem )。最后应用用户可能设置了的PathFilter,通过FileSystem获取所有这些Path所代表的File(FileStatus)。注:这个方法的东西相当多,很多内容还十分陌生。
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat: matches) {
//可以看到,这里没有对遍历进行递归,但是在权威指南里边提到可以通过设置mapred.input.dir.recursive来
//修改这一行为,暂时还不太清楚是如何实现的,在项目中查找这一字符串应该能找到一些线索
if (globStat.isDir()) {
for(FileStatus stat: fs.listStatus(globStat.getPath(),
inputFilter)) {
result.add(stat);
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
3、protected long computeSplitSize(long blockSize, long minSize, long maxSize),计算出当前Job所配置的分片最大尺寸。
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
4、protected int getBlockIndex(BlockLocation[] blkLocations,
long offset), 由于组成文件的块的信息已经获得了,只需要根据offset来计算所在的那个块就行了。
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." +
fileLength + ")");
}
分享到:
相关推荐
Hadoop存储与计算分离实践
Hadoop在相似度计算中的优化_林述民
java操作hadoop之mapreduce计算整数的最大值和最小值实战源码,附带全部所需jar包,欢迎下载一起学习。
在Hadoop MapReduce环境中,如果能预知作业的执行时间,...该方法已在Hadoop-0.20.2中实现,并在一个包含19个节点的Linux集群中进行了验证.实验结果表明,在最好情况下,根据该方法预测的执行时间和真实执行时间的误差约2%.
关于Hadoop的环境搭建,进行分布式计算的入门知识,很适合分布式计算的初学者。
Hadoop 迭代式计算框架 Guagua 是 PayPal 的一个开源机器学习框架 Shifu 的子项目。Guagua 主要解决了模型训练的分布式问题。同时 Guagua 并没有将自己局限在分类模型,Guagua 是一个基于 Hadoop 的迭代式计算框架,...
文档较详尽的讲述了MR的简介,MR初学分析示例(有代码)、MR特性,MR的执行过程(有代码),MR单元测试介绍(有代码)、HA的架构和配置、同时也向大众推荐了两本书。其中部分有较为详尽的链接以供参考。
hadoop集群配置流程以及用到的配置文件,hadoop2.8.4、hbase2.1.0、zookeeper3.4.12
Hadoop使用常见问题以及解决方法.doc Hadoop使用常见问题以及解决方法.doc
Hadoop使用常见问题以及解决方法,简单实用
第十七讲hadoop分布计算配置.pptx
计算Hadoop:快速部署Hadoop集群 详细的Hadoop集群部署文档,对您绝对有用~
云栖大会,Hadoop存储与计算分离实践
基于多元线性回归模型的Hadoop集群节点性能计算方法.pdf
hadoop客户端需要和namenode进行交互,而namenode中存放的是datanode的文件属性,且都是在内存中,如果小文件过多,namenode是存放不了的; 3、多用户写入,任意修改文件。 Hadoop适合于一次计算,多次读取的场景...
Hadoop
Hadoop大数据计算平台搭建实践.pdf
同时,通过实际案例研究,展示了Hadoop在实际场景中的应用和效果。 适用人群: 本论文适合计算机科学与技术、软件工程等相关专业的本科专科毕业生,以及对大数据处理和分析感兴趣的学习者。 使用场景及目标: 本...
针对原生的Hadoop云平台处理海洋环境信息可视化效率不高的问题,提出了一种GPU嵌入Hadoop云平台的并行计算框架。该框架以原生Hadoop为基础,GPU并行计算与MapReduce相结合,实现了高效的海洋流场可视化和特征可视化...
安装hadoop的时候或者使用的时候,会出现hadoop常见问题及解决方法