Hadoop中如何处理大量小文件的问题?代码举例讲解

在Hadoop中处理大量小文件的问题的主要方式是:

1、 合并小文件:

  • 使用文件合并工具如CombineFileInputFormat统一合并小文件。
  • 自定义InputFormat实现小文件合并。

2、 序列化和缩小小文件:

  • 使用SequenceFileOutputFormat将小文件序列化成SequenceFile。
  • 自定义OutputFormat实现小文件压缩和编码。

3、 在MapReduce中处理小文件:

  • 自定义InputFormat将多个小文件作为一个split输入。
  • 在Mapper中对多个小文件进行聚合。

4、 使用CombineFileInputFormat示例:

  • 设置minimumSplitSize和maximumSplitSize:
<property>
  <name>mapreduce.input.fileinputformat.split.maxsize</name> 
  <value>536870912</value> 
</property>
<property>  
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>536870912</value>  
</property> 
  • 在作业中设置InputFormatClass:
job.setInputFormatClass(CombineFileInputFormat.class);

5、 自定义InputFormat示例:

  • 继承FileInputFormat,重写getSplits()方法:
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<FileSplit> splits = new ArrayList<>();
    long maxSize = 536870912; // 512MB
    long totalSize = 0;
    List<FileStatus> files = listStatus(context);

    for(FileStatus file : files) {
      if((totalSize + file.getLen()) <= maxSize) {
        totalSize += file.getLen();
      } else {
        splits.add(new FileSplit(files, 0, totalSize, new String[0])); 
        totalSize = file.getLen();
      }
    }
    splits.add(new FileSplit(files, 0, totalSize, new String[0]));

    return splits;
  }  
}
  • 在作业中设置InputFormatClass:
job.setInputFormatClass(CustomInputFormat.class); 

大文件处理的主要作用是:

  1. 避免产生过多的Map任务导致资源浪费。
  2. 合理设置块大小提高 NameNode 和 DataNode 的性能。
  3. 统一处理小文件更易于业务实现。

来看一些简单示例:

  1. CombineFileInputFormat – 配置:
<property>  
  <name>mapreduce.input.fileinputformat.split.maxsize</name> 
  <value>536870912</value> 
</property>
<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>536870912</value>  
</property>
  • 设置InputFormatClass:
job.setInputFormatClass(CombineFileInputFormat.class);
  1. 自定义InputFormat:
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<FileSplit> splits = new ArrayList<>();
    long maxSize = 536870912; // 512MB
    long totalSize = 0;
    List<FileStatus> files = listStatus(context);

    for(FileStatus file : files) {
      if((totalSize + file.getLen()) <= maxSize) {
        totalSize += file.getLen();
      } else {
        splits.add(new FileSplit(files, 0, totalSize, new String[0])); 
        totalSize = file.getLen();
      } 
    }
    splits.add(new FileSplit(files, 0, totalSize, new String[0]));

    return splits;
  }  

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new CombineFileRecordReader((FileSplit)split, context);
  }
}
  • 重写createRecordReader()方法返回CombineFileRecordReader。
  • 在作业中设置InputFormatClass:
job.setInputFormatClass(CustomInputFormat.class);  
  1. SequenceFileOutputFormat – 设置OutputFormatClass:
job.setOutputFormatClass(SequenceFileOutputFormat.class); 
  • Mapper输出的key和value类型必须是WritableComparable。

所以通过InputFormat设置最小最大分片大小、自定义InputFormat实现文件合并,或使用SequenceFileOutputFormat序列化文件,我们可以高效解决Hadoop中的小文件问题。

自定义InputFormat是解决小文件问题的最佳方式,因为它可以最大限度满足我们的业务需求。SequenceFileOutputFormat也是一种简单高效的小文件压缩方法,尤其适合 Reduce端的输出文件。