05-HDFS(3)

发布时间 2023-06-29 00:21:29作者: tree6x7

1. 存储格式

1.1 前置说明

Hadoop 上的文件存储格式,肯定不会像 Windows 这么丰富,因为目前我们用 Hadoop 来存储、处理数据。我们不会用 Hadoop 来听歌、看电影或者打游戏。

在 Hadoop 中,没有默认的文件格式,格式的选择取决于其用途。而选择一种优秀、适合的数据存储格式是非常重要的。

后续我们要学习的,使用 HDFS 的应用程序(e.g. MapReduce、Spark)性能中的最大问题、瓶颈是在特定位置查找数据的时间写入到另一个位置的时间,而且管理大量数据的处理和存储也很复杂(e.g. 数据的格式会不断变化,原来一行有 12 列,后面要存储 20 列)。

l Hadoop 文件格式发展了好一段时间,这些文件存储格式可以解决大部分问题。我们在开发大数据中,选择合适的文件格式可能会带来一些明显的好处:

  1. 可以保证写入的速度
  2. 可以保证读取的速度
  3. 文件是可被切分的
  4. 对压缩支持友好
  5. 支持 schema 的更改

某些文件格式是为通用设计的,而其他文件则是针对更特定的场景,有些在设计时考虑了特定的数据特征。因此,确实有很多选择。

每种格式都有优点和缺点,数据处理的不同阶段可以使用不同的格式才会更有效率。通过选择一种格式,最大程度地发挥该存储格式的优势,最小化劣势。

BigData File Viewer

BigData File Viewer 是一个跨平台(Windows/MAC/Linux)桌面应用程序,用于查看常见的大数据二进制格式,例如 Parquet,ORC,AVRO 等。支持本地文件系统/HDFS/AWS S3 等。

https://github.com/Eugene-Mark/bigdata-file-viewer/

行式存储、列式存储

  • 行式存储(Row-Based):同一行数据存储在一起
  • 列式存储(Column-Based):同一列数据存储在一起

  • 「行存储」的写入是一次性完成,消耗的时间比列存储少,并且能够保证数据的完整性,缺点是数据读取过程中会产生冗余数据,如果只有少量数据,此影响可以忽略;数量大可能会影响到数据的处理效率。行适合插入、不适合查询
  • 「列存储」在写入效率、保证数据完整性上都不如行存储,它的优势是在读取过程,不会产生冗余数据,这对数据完整性要求不高的大数据处理领域,比如互联网,尤为重要。列适合查询,不适合插入

1.2 常见存储格式

a. Text File

文本格式是 Hadoop 生态系统内部和外部的最常见格式。通常按行存储,数据一行一行到排列,每一行都是一条记录。以回车换行符区分不同行数据。

最大缺点是不支持块级别压缩,因此在进行压缩时会带来较高的读取成本。

解析开销一般会比二进制格式高,尤其是 XML 和 JSON,它们的解析开销比 Textfile 还要大。

仅在需要从 Hadoop 中直接提取数据,或直接从文件中加载大量数据的情况下,才建议使用纯文本格式或 CSV。

b. Sequence File

Sequence 最初是为 MapReduce 设计的,因此和 MapReduce 集成很好。

在 Sequence File 中,每条数据记录(record)都是以 key、value 键值对进行序列化存储(二进制格式)。

Sequence File 中的数据是以二进制格式存储,这种格式所需的存储空间小于文本的格式。与文本文件一样,Sequence File 内部也不支持对键和值的结构指定格式编码。

序列化文件与文本文件相比更紧凑,支持 record 级、block 块级压缩。压缩的同时支持文件切分(下图的 Sync 即为拆分标记)。

通常把 Sequence File 作为中间数据存储格式。例如:将大量小文件合并放入到一个 Sequence File 中。

record 就是一个 kv 键值对,其中数据保存在 value 中,可以选择是否针对 value 进行压缩。block 就是多个 record 的集合,block 级别压缩性能更好。

缺点:

  • 对于具有 SQL 类型的 Hive 支持不好,需要读取和解压缩所有字段。
  • 不存储元数据,并且对 schema 扩展中的唯一方式是在末尾添加新字段。

c. Avro File

Apache Avro 是与语言无关的序列化系统,由 Hadoop 创始人 Doug Cutting 开发。

Avro 是基于行的存储格式,它在每个文件中都包含 JSON 格式的 schema 定义,从而提高了互操作性并允许 schema 的变化(删除/添加列)。除了支持可切分以外,还支持块压缩。

Avro 是一种自描述格式,它将数据的 schema 直接编码存储在文件中,可以用来存储复杂结构的数据。

适合于大量频繁写入宽表数据(字段多列多)的场景,其序列化反序列化很快。

优点:

  • Avro 是与语言无关的数据序列化系统;
  • Avro 将 schema 存储在 header 中,数据是自描述的;
  • 序列化和反序列化速度很快;
  • Avro 文件是可切分的、可压缩的,非常适合在 Hadoop 生态系统中进行数据存储。

缺点:

  • 如果我们只需要对数据文件中的少数列进行操作,行式存储效率较低。例如:我们读取 15 列中的 2 列数据,基于行式存储就需要读取数百万行的 15 列。而列式存储就会比行式存储方式高效;
  • 列式存储因为是将同一列(类)的数据存储在一起,压缩率要比行式存储高。

d. RC File

Hive Record Columnar File(记录列文件),这种类型的文件首先将数据按行划分为行组,然后在行组内部将数据存储在列中

RC File 是为基于 MapReduce 的数据仓库系统设计的数据存储结构。它结合了行存储和列存储的优点,可以满足快速数据加载和查询,有效利用存储空间以及适应高负载的需求。

RC File 是由二进制键/值对组成的 flat 文件,它与 Sequence File 有很多相似之处。支持压缩、切分但不支持 schema 扩展,如果要添加新的列,则必须重写文件,这会降低操作效率。

在数仓中执行分析时,这种面向列的存储非常有用。当我们使用面向列的存储类型时,执行分析很容易。

  • RC File 可将数据分为几组行,并且在其中将数据存储在列中;
  • RC File 首先将行水平划分为行拆分(Row Group),然后以列方式垂直划分每个行拆分(Columns);
  • RC File 将行拆分的元数据存储为 record 的 key,并将行拆分的所有数据存储 value;
  • 作为行存储,RC File 保证同一行中的数据位于同一节点中;
  • 作为列存储,RC File 可以利用列数据压缩,并跳过不必要的列读取。

e. ORC File

ORC File(Optimized Row Columnar)提供了比 RC File 更有效的文件格式。

它在内部将数据划分为默认大小为 250M 的 Stripe。每个条带均包含索引、数据和页脚。索引存储每列的最大值和最小值以及列中每一行的位置。

它并不是一个单纯的列式存储格式,仍然是首先根据 Stripe 分割整个表,在每一个 Stripe 内进行按列存储。

ORC 有多种文件压缩方式,并且有着很高的压缩比。文件是可切分(Split)的。ORC File 是以二进制方式存储的,所以是不可以直接读取。

优点:

  • 比 TextFile、Sequence File 和 RC File 具备更好的的性能;
  • 列数据单独存储;
  • 带类型的数据存储格式,使用类型专用的编码器;
  • 轻量级索引

缺点:

  • 与 RC 一样,ORC 也是不支持列扩展的。

f. Parquet File

Parquet 是面向分析型业务的列式存储格式,由 Twitter 和 Cloudera 合作开发,2015 年 5 月从 Apache 的孵化器里毕业成为 Apache 顶级项目。

Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。

和 ORCFile 一样,Parquet 也是基于列的二进制存储格式,可以存储嵌套的数据结构。

与 RC 和 ORC 文件不同,Parquet serdes 支持有限的 schema 扩展。在 Parquet 中,可以在结构的末尾添加新列。

Parquet 的存储模型主要由行组(Row Group)、列块(Column Chuck)、页(Page)组成。

在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。

行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。

Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。

文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。


Parquet VS ORC:

  • Parquet 与 RC/ORC 一样,也具有压缩和查询性能方面的优点,与非列文件格式相比,写入速度通常较慢;
  • ORC 文件格式压缩比 Parquet 要高,Parquet 文件的数据格式 Schema 要比 ORC 复杂,占用的空间也就越高;
  • ORC 文件格式的读取效率要比 Parquet 文件格式高;
  • 如果数据中有嵌套结构的数据,则 Parquet 会更好;
  • Hive 对 ORC 的支持更好,对 Parquet 支持不好,ORC 与 Hive 关联紧密。
  • Spark 对 Parquet 支持较好,对 ORC 支持不好;
  • 为了数据能够兼容更多的查询引擎,Parquet 也是一种较好的选择;
  • ORC 还可以支持 ACID、Update 操作等;

关于 Hive 对 Parquet 文件的支持的一个注意事项:Parquet 列名必须小写,这一点非常重要。如果 Parquet 文件包含大小写混合的列名,则 Hive 将无法读取该列。

g. Apache Arrow

Apache Arrow 是一个跨语言平台,是一种列式内存数据结构,主要用于构建数据系统。Apache Arrow 在 2016/2/17 作为顶级 Apache 项目引入。

Apache Arrow 发展非常迅速,并且在未来会有更好的发展空间。 它可以在系统之间进行高效且快速的数据交换,而无需进行序列化,而这些成本已与其他系统(e.g. Thrift、Avro、Protocol Buffers)相关联。

每一个系统实现,它的方法(method)都有自己的内存存储格式,在开发中 70%~80% 的时间浪费在了序列化和反序列化上。

Arrow 促进了许多组件之间的通信。 例如,使用 Python(pandas)读取复杂的文件并将其转换为 Spark DataFrame。

Arrow 是如何提升数据移动性能的?

  • 利用 Arrow 作为内存中数据表示的两个过程可以将数据从一种方法“重定向”到另一种方法,而无需序列化或反序列化。 例如,Spark 可以使用 Python 进程发送 Arrow 数据来执行用户定义的函数;
  • 无需进行反序列化,可以直接从启用了 Arrow 的数据存储系统中接收 Arrow 数据。例如,Kudu 可以将 Arrow 数据直接发送到 Impala 进行分析;
  • Arrow 的设计针对嵌套结构化数据(如 Impala、Spark Data 框架中)的分析性能进行了优化。

1.3 文件压缩格式

在 Hadoop 中,一般存储着非常大的文件,以及在存储 HDFS 块或运行 MapReduce 任务时,Hadoop 集群中节点之间的存在大量数据传输。 如果条件允许时,尽量减少文件大小,这将有助于减少存储需求以及减少网络上的数据传输

a. Hadoop 支持

Haodop 对文件压缩均实现 org.apache.hadoop.io.compress.CompressionCodec 接口,所有的实现类都在 org.apache.hadoop.io.compress 包下。

b. 压缩算法优劣指标

  • 压缩比 (原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5,显然压缩比越高越好)
  • 压缩/解压缩吞吐量 (每秒能压缩或解压缩多少 MB 的数据,吞吐量也是越高越好)
  • 压缩算法实现是否简单、开源
  • 是否为无损压缩
  • 压缩后的文件是否支持切分

c. 压缩算法比较

有不少的压缩算法可以应用到 Hadoop 中,但不同压缩算法有各自的特点。

压缩格式 工具 算法 文件扩展名 是否可切分 对应的编码解码器
DEFLATE DEFLATE .deflate org.apache.hadoop.io.compress.DefaultCodec
Gzip gzip gzip .gz org.apache.hadoop.io.compress.GzipCodec
bzip2 bzip2 bzip2 .bz2 org.apache.hadoop.io.compress.BZip2Codec
LZO lzop LZO .lzo 是(切分点索引) com.hadoop.compression.lzo.LzopCodec
LZ4 LZ4 .lz4 org.apache.hadoop.io.compress.Lz4Codec
Snappy Snappy .snappy org.apache.hadoop.io.compress.SnappyCodec

存放数据到 HDFS 中,可以选择指定的压缩方式,在 MapReduce 程序读取时,会根据扩展名自动解压。例如:如果文件扩展名为 .snappy,Hadoop 框架将自动使用 SnappyCodec 解压缩文件。

压缩比:

压缩、解压缩时间:

通过上图,我们可以看到哪些压缩算法压缩比更高。整体排序:Snappy < LZ4 < LZO < GZIP < BZIP2,但压缩比越高,压缩的时间也会更长。

d. 压缩如何抉择

既然压缩能够节省空间、而且可以提升 IO 效率,那么能否将所有数据都以压缩格式存储在 HDFS 中呢?

例如:BZIP2,而且文件是支持切分的。

如果选择 Gzip,就会出现以下情况:

  • 如果文件是不可切分的,只有一个 CPU 在处理所有的文件,其他的 CPU 都是空闲的。如果 HDFS 中的 block 和文件大小差不多还好,一个文件、一个块、一个 CPU。如果是一个很大的文件就会出现问题了。
  • bzip2 在压缩和解压缩数据方面实际上平均比 Gzip 差 3 倍,这对性能是有一定的影响的。如果我们需要频繁地查询数据,数据压缩一定会影响查询效率。
  • 如果不关心查询性能(没有任何 SLA)并且很少选择此数据,则 bzip2 可能是不错的选择。最好是对自己的数据进行基准测试,然后再做决定。

小结:

  • 压缩的合理使用可以提高 HDFS 存储效率;
  • 压缩解压缩意味着 CPU、内存需要参与编码;
  • 解码选择压缩算法时不能一味追求某一指标极致,综合考虑性价比较高的;
  • 文件的压缩解压需要程序或者工具的参与来对数据进行处理,大数据相关处理软件都支持直接设置。

2. 存储类型和存储策略

2.1 HDFS 异构存储类型

冷、热、温、冻数据

通常,公司或者组织总是有相当多的历史数据占用昂贵的存储空间。典型的数据使用模式是新传入的数据被应用程序大量使用,从而该数据被标记为"热"数据。随着时间的推移,存储的数据每周被访问几次,而不是一天几次,这时认为其是"暖"数据。在接下来的几周和几个月中,数据使用率下降得更多,成为"冷"数据。如果很少使用数据,例如每年查询一次或两次,这时甚至可以根据其年龄创建第四个数据分类,并将这组很少被查询的旧数据称为"冻结数据"。

Hadoop 允许将不是热数据或者活跃数据的数据分配到比较便宜的存储上,用于归档或冷存储。可以设置存储策略,将较旧的数据从昂贵的高性能存储上转移到性价比较低(较便宜)的存储设备上。

Hadoop 2.5 及以上版本都支持存储策略,在该策略下,不仅可以在默认的传统磁盘上存储 HDFS 数据,还可以在 SSD(固态硬盘)上存储数据。

什么是异构存储?

异构存储是 Hadoop2.6.0 版本出现的新特性,可以根据各个存储介质读写特性不同进行选择

例如冷热数据的存储,对冷数据采取容量大,读写性能不高的存储介质如机械硬盘,对于热数据,可使用 SSD 硬盘存储。

在读写效率上性能差距大。异构特性允许我们对不同文件选择不同的存储介质进行保存,以实现机器性能的最大化。

HDFS 中声明定义了 4 种异构存储类型

public enum StorageType {
  // sorted by the speed of the storage types, from fast to slow
  RAM_DISK(true),
  SSD(false),
  DISK(false),
  ARCHIVE(false),
  PROVIDED(false);
  
  // 表示该存储类型是否是临时的或易失性的
  private final boolean isTransient;

  StorageType(boolean isTransient) {
    this.isTransient = isTransient;
  }
}
  1. RAM_DISK:这种存储类型表示数据块存储在内存中。因为内存的读写速度比磁盘和固态硬盘都要快,所以这种存储类型适用于需要快速读写数据的场景。不过需要注意的是,这种存储类型的数据会在机器重启或停电时丢失,因此只适用于暂时存储数据的场景。
  2. SSD:这种存储类型表示数据块存储在固态硬盘上。固态硬盘的访问速度比传统的机械硬盘快,因此这种存储类型适用于对速度要求比较高的场景。
  3. DISK:这种存储类型表示数据块存储在常规的磁盘上。这是一种比较常见的存储介质类型。
  4. ARCHIVE:这种存储类型表示数据块存储在归档介质中。归档介质是一种低速的存储介质,适用于长期存储数据,但不适合频繁读写的场景。因此,这种存储类型一般用于存储备份数据或归档数据。
  5. PROVIDED:这种存储类型表示数据块是由外部的存储系统提供的,而不是由 Hadoop 系统管理的。这种存储类型通常用于集成外部存储系统,如 Amazon S3 或 Azure Blob Storage。

如何让 HDFS 知道集群中的数据存储目录是哪种类型存储介质?

配置属性时主动声明,HDFS 并没有自动检测的能力。

配置参数 dfs.datanode.data.dir,如果目录前没有带上[SSD] [DISK] [ARCHIVE] [RAM_DISK] 这 4 种类型中的任何一种,则默认是 DISK 类型 。

<property>
  <name>dfs.datanode.data.dir</name>
  <value>[DISK]xxx</value>
</property>

2.2 块存储类型选择策略

块存储指的是对 HDFS 文件的数据块副本储存。

对于数据的存储介质,HDFS 的 BlockStoragePolicySuite 类内部定义了 6 种策略:HOT、WARN、COLD、ALL_SSD、One_SSD、Lazy_Persistence 等存储策略。为了根据不同的存储策略将文件存储在不同的存储类型中,引入了一种新的存储策略概念。

前 3 种根据冷热数据区分,后 3 种根据磁盘性质区分。

  • HOT:策略的设计初衷是为了优化热点数据的访问速度,它会优先选择访问频率高的数据块,并将它们存储在速度更快的存储介质中,例如 RAM_DISKSSD。这样做的目的是提高热点数据的访问速度,从而加速整个系统的响应速度。
  • WARNWARN 策略更注重存储空间的利用率,它会考虑数据块的使用情况和空间利用率,避免过度占用存储空间。当存储空间越来越紧张时,WARN 策略会放宽对存储介质的要求,例如选择将数据块存储在相对较慢的 DISK 中,以便释放更多的 RAM_DISKSSD 存储空间,从而提高存储空间的利用率。
  • COLD:该策略优先选择存储在 ARCHIVE 中的数据块,用于存储访问频率较低的数据。这种策略适用于对存储成本有较高要求,且可以接受数据访问速度较慢的场景。
  • ALL_SSD:该策略选择存储在 SSD 中的数据块,用于加速所有数据的访问速度。这种策略适用于对数据访问速度有非常高要求的场景,但存储成本也非常高。
  • ONE_SSD:该策略选择存储在 SSD 中的单个数据块,用于加速该数据块的访问速度。这种策略适用于对某些数据块有非常高要求的场景,但不适用于加速整个数据集的访问速度。
  • LAZY_PERSISTENCE:该策略将数据块首先存储在 RAM_DISK 中,然后异步将其写入 DISK 中。这种策略适用于需要快速写入数据,但不需要立即持久化的场景。此外,由于数据块会被异步写入 DISK 中,因此可能会丢失一些数据。

尽管 HOTWARN 策略都优先选择 RAM_DISKSSDDISK 中的数据块,但它们的权衡点不同,前者更加注重数据块的访问频率,而后者更加注重存储空间的利用率。


(1)列出所有存储策略

hdfs storagepolicies -listPolicies

(2)设置存储策略

hdfs storagepolicies -setStoragePolicy -path <path> -policy <policy>

(3)取消存储策略

hdfs storagepolicies -unsetStoragePolicy -path <path>

(4)获取存储策略

hdfs storagepolicies -getStoragePolicy -path <path>

2.3 冷热温数据异构存储测试

(1)为了能够支撑不同类型的数据,我们需要在 hdfs-site.xml 中配置不同存储类型数据的位置。

<!--配置DataNode存储目录,指定存储介质类型 -->
<property>
  <name>dfs.datanode.data.dir</name>
  <value>[DISK]file://${hadoop.tmp.dir}/dfs/data,[ARCHIVE]file://${hadoop.tmp.dir}/dfs/data/archive</value>
</property>

(2)分发到不同另外两个节点中,然后重启 HDFS 集群。

配置好后,我们在 WebUI 的 DataNodes 页面中点击任意一个 DataNode 节点:

可以看到,现在配置的是两个目录,一个 StorageType 为 ARCHIVE、一个 StorageType 为 DISK。

(3)创建测试目录结构

hdfs dfs -mkdir -p /data/hdfs-test/data_phase/hot
hdfs dfs -mkdir -p /data/hdfs-test/data_phase/warm
hdfs dfs -mkdir -p /data/hdfs-test/data_phase/cold

(4)分别设置三个目录的存储策略

# 分别设置三个目录的存储策略
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/hot -policy HOT
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/warm -policy WARM
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/cold -policy COLD
# 查看当前 HDFS 支持的存储策略
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/hot
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/warm
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/cold 

(5)分别上传文件到三个目录中测试

hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/hot
hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/warm
hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/cold

(6)查看不同存储策略文件的 block 位置

# hot 目录中的 block,3 个 block 都在 DISK 磁盘
hdfs fsck /data/hdfs-test/data_phase/hot/profile -files -blocks -locations

# warm 目录中的 block,1 个 block 在 DISK 磁盘,另外两个在 ARCHIVE 磁盘
hdfs fsck /data/hdfs-test/data_phase/warm/profile -files -blocks -locations

# cold 目录中的 block,3 个 block 都在 ARCHIVE 磁盘
hdfs fsck /data/hdfs-test/data_phase/cold/profile -files -blocks -locations

2.4 HDFS 内存存储策略支持

HDFS 支持写入由 DataNode 管理的堆外内存。DataNode 异步地将内存中数据刷新到磁盘,从而减少代价较高的磁盘 IO 操作,这种写入称之为懒持久写入 Lazy Persist。

客户端进程向 NameNode 发起创建/写文件的请求。客户端请求到具体的 DataNode 后,DataNode 会把这些数据块写入 RAM 内存中,同时启动异步线程服务将内存数据持久化写到磁盘上。内存的异步持久化存储是指数据不是马上落盘,而是懒惰的、延时地进行处理

HDFS 为懒持久化写做了较大的持久性保证。在将副本保存到磁盘之前,如果节点重新启动,有非常小的几率会出现数据丢失。

该特性从 Hadoop 2.6.0 开始支持。

对目标文件目录设置 StoragePolicy 为 LAZY_PERSIST 的内存存储策略

(1)虚拟内存盘配置。将 tmpfs 挂载到目录 /mnt/dn-tmpfs/,并且限制内存使用大小为 1GB。

(2)内存存储介质设置。将机器中已经完成好的虚拟内存盘配置到 dfs.datanode.data.dir 中,其次还要带上 RAM_DISK 标签的配置项。

<!-- 配置 DataNode 存储目录,指定存储介质类型 -->
<property>
  <name>dfs.datanode.data.dir</name>
  <value>[RAM_DISK]/xxx/xxx, ...</value>
</property>

(3)参数设置优化

# 是否开启异构存储,默认开启
dfs.storage.policy.enabled=true
# 用于在数据节点上的内存中缓存块副本的内存量(以字节为单位)。默认情况下,此参数设置为 0,这将禁用内存中缓存。
# 内存值过小会导致内存中的总的可存储的数据块变少,但如果超过 DataNode 能承受的最大内存大小的话,部分内存块会被直接移出。
dfs.datanode.max.locked.memory=xxx

(4)在目录上设置存储策略

hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST

2.5 转账数据分层案例

银行每一天都有大量的转账、交易处理。用户每进行一笔交易或者转账,银行都需要将所有相关信息保存下来。银行拥有数 10 亿的用户。要保存的数据量可想而知。如果说有的数据,都同等对待,为了保证使用数据的性能采用高性能存储,这将是一笔不小的资源浪费。

实际上,超过一定时间的数据,数据访问的频率要低得多。例如:用户查询 5。年前的转账记录、要比查询 1 年类的转账记录频率要低得多。为了能够更好地利用资源,需要对数据进行分层。也就是不同时间范围的数据,放在不同的层(冷热温)中。

(1)创建存储数据目录

hdfs dfs -mkdir -p /source/bank/transfer/log_lte1y
hdfs dfs -mkdir -p /source/bank/transfer/log_gt1y

(2)指定存储策略

hdfs storagepolicies -setStoragePolicy -path /source/bank/transfer/log_lte1y -policy HOT
hdfs storagepolicies -setStoragePolicy -path /source/bank/transfer/log_gt1y -policy COLD

(3)上传文件测试

hdfs dfs -put ~/bank_record.csv /source/bank/transfer/log_lte1y/bank_record_2022_10.csv
hdfs dfs -put ~/bank_record.csv /source/bank/transfer/log_gt1y/bank_record_2021_07.csv

(4)查看目录存储策略

hdfs storagepolicies -getStoragePolicy -path /source/bank/transfer/log_lte1y
hdfs storagepolicies -getStoragePolicy -path /source/bank/transfer/log_gt1y

(5)查看文件情况

hdfs fsck /source/bank/transfer/log_gt1y/bank_record_2021_07.csv -files -blocks -locations

hdfs fsck /source/bank/transfer/log_lte1y/bank_record_2022_10.csv -files -blocks -locations

(6)假设现在到了 2023 年 10 年,我们可以将之前的数据移动到 log_gt1y。

# 文件移动 [log_lte1y -> log_gt1y]
hdfs dfs -mv /source/bank/transfer/log_lte1y/bank_record_2022_10.csv /source/bank/transfer/log_gt1y/bank_record_2022_10.csv

# 再次查看文件块分布,我们可以看到文件块依然放在原处
hdfs fsck /source/bank/transfer/log_gt1y/bank_record_2022_10.csv -files -blocks -locations

# 需要让它 HDFS 按照存储策略自行移动文件块
hdfs mover /hdfsdata

# 再次查看文件块分布,就已经符合设置的策略了

3. HDFS 小文件解决方案

3.1 Archive 归档

HDFS 并不擅长存储小文件,因为每个文件最少一个 block,每个 block 的元数据都会在 NameNode 占用内存,如果存在大量的小文件,它们会吃掉 NameNode 节点的大量内存。

Hadoop Archives 可以有效的处理以上问题,它可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件。

a. 创建 Archive

hadoop archive -archiveName <name> -p <parent> <src>* <dest>

其中 -archiveName 是指要创建的存档的名称。比如 test.har,archive 的名字的扩展名应该是 *.har-p 参数指定了归档文件中存储的文件路径前缀。

举个例子:-p /foo/bar a/b/c e/f/g,这里的 /foo/bara/b/ce/f/g 的父路径,所以完整路径为 /foo/bar/a/b/c/foo/bar/e/f/g

如果你只想存档一个目录 /smallfile 下的所有文件,使用如下命令就会在 /outputdir 目录下创建一个名为 test.har 的存档文件。

hadoop archive -archiveName test.har -p /smallfile /outputdir

【注】Archive 归档是通过 MapReduce 程序完成的,所以需要启动 YARN 集群。

b. 查看 Archive

查看归档之后的样子

首先我们来看下创建好的 har 文件。使用命令:hadoop fs -ls /outputdir/test.har

这里可以看到 har 文件包括:两个索引文件,多个 part 文件(本例只有 1 个)以及一个标识成功与否的文件。part 文件是多个原文件的集合, 通过 index 文件可以去找到原文件。

例如上述的三个小文件 1.txt 2.txt 3.txt 内容分别为 1 2 3。进行 archive 操作之后,三个小文件就归档到 test.har 里的 part-0 这一个文件里。

查看归档之前的样子

在查看 har 文件的时候,如果没有指定访问协议,默认使用的就是 hdfs://,此时所能看到的就是归档之后的样子。

此外,Archive 还提供了自己的 har uri 访问协议。如果用 har uri 去访问的话,索引、标识等文件就会隐藏起来,只显示创建档案之前的原文件。

$ har://scheme-hostname:port/archivepath/fileinarchive
# hadoop fs -ls har://hdfs-node1:8020/outputdir/test.har/
# hadoop fs -ls har:///outputdir/test.har
# hadoop fs -cat har:///outputdir/test.har/1.txt

c. 提取 Archive

# 按顺序解压存档(串行)
$ hadoop fs -cp har:///outputdir/test.har/* /smallfile1
# 要并行解压存档,使用 DistCp,对应大的归档文件可以提高效率
$ hadoop distcp har:///outputdir/test.har/* /smallfile2

d. 注意事项

  1. Hadoop archives 是特殊的档案格式。一个 Hadoop archive 对应一个文件系统目录。Hadoop archive 的扩展名是 *.har;
  2. 创建 archive 本质是运行一个 MapReduce 任务,所以应该在 Hadoop 集群上运行创建档案的命令;
  3. 创建 archive 文件要消耗和原文件一样多的硬盘空间;
  4. archive 文件不支持压缩,尽管 archive 文件看起来像已经被压缩过;
  5. archive 文件一旦创建就无法改变,要修改的话,需要创建新的 archive 文件。事实上,一般不会再对存档后的文件进行修改,因为它们是定期存档的,比如每周或每日;
  6. 当创建 archive 时,源文件不会被更改或删除,所以创建完 archive 后,需手动删除原文件。

3.2 Sequence File

a. 简述

Sequence File 是 Hadoop API 提供的一种二进制文件支持。这种二进制文件直接将 <key, value> 键值对序列化到文件中。

  • 优点
    • 二级制格式存储,比文本文件更紧凑;
    • 支持不同级别压缩(基于 Record 或 Block 压缩);
    • 文件可以拆分和并行处理,适用于 MapReduce。
  • 局限性
    • 二进制格式文件不方便查看;
    • 特定于 Hadoop,只有 Java API 可用于与之件进行交互。尚未提供多语言支持。

b. 格式

Hadoop Sequence File 是一个由二进制键/值对组成的。根据压缩类型,有 3 种不同的 Sequence File 格式:

  • 未压缩格式
  • record 压缩格式
  • block 压缩格式

Sequence File 由一个 header 和一个或多个 record 组成。以上 3 种格式均使用相同的 header 结构,如下所示:

前 3 个字节为 SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如 SEQ4 或 SEQ6)。Header 中其他也包括 key、value class 名字、 压缩细节、metadata、Sync marker。Sync Marker 同步标记,用于可以读取任意位置的数据。

(1)未压缩格式

未压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了 4 个部分:record length(记录长度)、key length(键长)、key、value。

每隔几个 record(100 字节左右)就有一个同步标记。

(2)基于 record 压缩格式

基于 record 压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了 4 个部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)

每隔几个 record(100 字节左右)就有一个同步标记。

(3)基于 block 压缩格式

基于 block 压缩的 Sequence File 文件由 header、block、sync 三个部分组成。

block 指的是 record block,可以理解为多个 record 记录组成的块。注意,这个 block 和 HDFS 中分块存储的 block(128M)是不同的概念。

block中包括:record 条数、压缩的 key 长度、压缩的 keys、压缩的 value 长度、压缩的 values。每隔一个 block 就有一个同步标记。

block 压缩比 record 压缩提供更好的压缩率。使用 Sequence File 时,通常首选块压缩。

c. 文件读写

(1)开发环境构建

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.4</version>
  </dependency>
</dependencies>

(2)SequenceFileWrite

package io.tree6x7.sequence;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;


public class SequenceFileWrite {

    private static final String[] DATA = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws Exception {
        // 设置客户端运行身份:以 root 去操作访问 HDFS
        System.setProperty("HADOOP_USER_NAME","root");
        // Configuration 用于指定相关参数属性
        Configuration conf = new Configuration();
        // sequence file key、value
        IntWritable key = new IntWritable();
        Text value = new Text();
        // 构造Writer参数属性
        SequenceFile.Writer writer = null;
        CompressionCodec Codec = new GzipCodec();
        SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(new Path("hdfs://node1:8020/seq.out"));
        SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
        SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,Codec);

        try {
            writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);

            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }

}

(3)SequenceFileRead

package io.tree6x7.sequence;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;


public class SequenceFileRead {
    public static void main(String[] args) throws IOException {
        // 设置客户端运行身份,以 root 去操作访问 HDFS
        System.setProperty("HADOOP_USER_NAME", "root");
        // Configuration 用于指定相关参数属性
        Configuration conf = new Configuration();

        SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(new Path("hdfs://node1:8020/seq.out"));
        // 这个参数表示读取的长度
        SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(174);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, option1, option2);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                // 是否返回了SyncMark同步标记
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

}

d. 合并小文件

可以使用 Sequence File 对小文件合并,即将文件名作为 key,文件内容作为 value 序列化到大文件中。例如,假设有 10,000 个 100KB 文件,那么我们可以编写一个程序将它们放入单个 Sequence File 中,如下所示,您可以在其中使用 filename 作为键,并使用 content 作为值。

package io.tree6x7.sequence;

import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MergeSmallFilesToSequenceFile {
    private Configuration configuration = new Configuration();
    private List<String> smallFilePaths = new ArrayList<String>();

    // 定义方法用来添加小文件的路径
    public void addInputPath(String inputPath) throws Exception {
        File file = new File(inputPath);
        // 给定路径是文件夹,则遍历文件夹,将子文件夹中的文件都放入smallFilePaths
        // 给定路径是文件,则把文件的路径放入smallFilePaths
        if (file.isDirectory()) {
            File[] files = FileUtil.listFiles(file);
            for (File sFile : files) {
                smallFilePaths.add(sFile.getPath());
                System.out.println("添加小文件路径:" + sFile.getPath());
            }
        } else {
            smallFilePaths.add(file.getPath());
            System.out.println("添加小文件路径:" + file.getPath());
        }
    }

    // 把 smallFilePaths 的小文件遍历读取,然后放入合并的 SequenceFile 容器中
    public void mergeFile() throws Exception {
        Writer.Option bigFile = Writer.file(new Path("D:\\bigfile.seq"));
        Writer.Option keyClass = Writer.keyClass(Text.class);
        Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
        // 构造 writer
        Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
        // 遍历读取小文件,逐个写入 SequenceFile
        Text key = new Text();
        for (String path : smallFilePaths) {
            File file = new File(path);
            long fileSize = file.length();// 获取文件的字节数大小
            byte[] fileContent = new byte[(int) fileSize];
            FileInputStream inputStream = new FileInputStream(file);
            // 把文件的二进制流加载到 fileContent 字节数组中去
            inputStream.read(fileContent, 0, (int) fileSize);
            String md5Str = DigestUtils.md5Hex(fileContent);
            System.out.println("merge小文件:" + path + ",md5:" + md5Str);
            key.set(path);
            // 把文件路径作为 key,文件内容做为 value,放入到 SequenceFile 中
            writer.append(key, new BytesWritable(fileContent));
        }
        writer.hflush();
        writer.close();
    }

    // 读取大文件中的小文件
    public void readMergedFile() throws Exception {
        Reader.Option file = Reader.file(new Path("D:\\bigfile.seq"));
        Reader reader = new Reader(configuration, file);
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        while (reader.next(key, value)) {
            byte[] bytes = value.copyBytes();
            String md5 = DigestUtils.md5Hex(bytes);
            String content = new String(bytes, Charset.forName("GBK"));
            System.out.println("读取到文件:" + key + ",md5:" + md5 + ",content:" + content);
        }
    }

    public static void main(String[] args) throws Exception {
        MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
        // 合并小文件
        // msf.addInputPath("D:\\datasets\\smallfile");
        // msf.mergeFile();
        // 读取大文件
        msf.readMergedFile();
    }
}