spark读取hdfs文件(解析Spark读取hdfs的文件如何分区)

spark读取hdfs的文件是怎么分区的,读取代码如下:

val df = sc.textFile("data/wc.txt",3)

spark读取hdfs文件(解析Spark读取hdfs的文件如何分区)

闪闪发光的星星灰尘散景背景上的足迹

一.分析

spark读取hdfs的文件分区跟hadoop的分区完全相同,因为底层使用的就是Hadoop的TextInputFormat,考虑两内容:

1)关于文件分区数量计算:

指定的预分区数量是最小分区数量,如:代码中的参数3。

真正的分区计算: 每个分区字节数 = 文件字节数/预分区数量

如果没有整除,判断余数是否大于分区字节数 * 0.1,如果大于则会新增一个分区,剩余的放在这个分区。否则不会新加分区,把余数放到最后一个分区里。

2)分区数据如何读取:

分区读取数据是按行读取,但是会考虑文件的偏移量(offset)的设置。虽然第一个分区字节数不包含一整行,但是会读取一整行。当某个分区的偏移量全被之前的分读走了,这个分区就是空的。

注意:

1.当位移量读取了回撤换行,会把下一行的数据也会读取。

2.当读取多个文件时,会把所有文件字节加起来计算分区,但是读取的时候不会夸文件读取。

 

二.代码分析

1)读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下,FileInputFormat类中的方法:

public InputSplit[] getSplits(JobConf job, int numSplits)

throws IOException {

 

long totalSize = 0; // compute total size

for (FileStatus file: files) {// check we have valid files

if (file.isDirectory()) {

throw new IOException("Not a file: "+ file.getPath());

}

totalSize += file.getLen();

}

 

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.

FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

...

for (FileStatus file: files) {

...

if (isSplitable(fs, path)) {

long blockSize = file.getBlockSize();

long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 

...

 

}

protected long computeSplitSize(long goalSize, long minSize,long blockSize) {

return Math.max(minSize, Math.min(goalSize, blockSize));

 

2) 分区数据读取的代码LineReader

public LineRecordReader(Configuration job, FileSplit split,
    byte[] recordDelimiter) throws IOException {
  this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  if (isCompressedInput()) {
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn; // take pos from compressed stream
    } else {
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, recordDelimiter);
      filePosition = fileIn;
    }
  } else {
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, recordDelimiter, split.getLength());
    filePosition = fileIn;
  }

spark读取hdfs文件(解析Spark读取hdfs的文件如何分区)

三案例分析

案例:读取文件重新分区,再写入到文件

代码:

val conf = new Configuration()
val fs = FileSystem.getLocal(conf)

fs.delete(new Path("data/wc.out2"),true)

val df = sc.textFile("data/wc.txt",3)

df.saveAsTextFile("data/wc.out2")

 

 文件内容:
   d
   a
   y
   u
   e
   e
   t
 1)计算分区:
 总字节数:14;指定预分区为3.
 目标分区大小: goalSize = 14/3 =4, 余2; 2/4>0.1,所以会生成4个分区,每个分区大小是4个字节。每个分区读取偏移量入下:   0,4
   4,8
   8,12
   12, 13
2)分区读取内容
 按行读取
 文件内容的offset为:
 0 1
 2 3
 4 5
 6 7
 8 9
 10 11
 12

  第1个分区读取:0到4,读取前3行 ;注意:当位移量偏过换行符时,会把下一行的数据也会读取了
  第2个分区读取:4到8,因为4,5被读去了,从来6开始读到9。(因为按行读取所以读到9)
  第3个分区读取:8到12,因为8,9被读去了,从来10开始读到12。
  第4个分区读取:12到13 所以为

四.总结

spark读取hdfs文件分区比较复杂,需要仔细研究研究。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发表评论

登录后才能评论