1. 存储格式
1.1 前置说明
Hadoop 上的文件存储格式,肯定不会像 Windows 这么丰富,因为目前我们用 Hadoop 来存储、处理数据。我们不会用 Hadoop 来听歌、看电影或者打游戏。
在 Hadoop 中,没有默认的文件格式,格式的选择取决于其用途。而选择一种优秀、适合的数据存储格式是非常重要的。
后续我们要学习的,使用 HDFS 的应用程序(e.g. MapReduce、Spark)性能中的最大问题、瓶颈是在特定位置查找数据的时间和写入到另一个位置的时间,而且管理大量数据的处理和存储也很复杂(e.g. 数据的格式会不断变化,原来一行有 12 列,后面要存储 20 列)。
l Hadoop 文件格式发展了好一段时间,这些文件存储格式可以解决大部分问题。我们在开发大数据中,选择合适的文件格式可能会带来一些明显的好处:
- 可以保证写入的速度
- 可以保证读取的速度
- 文件是可被切分的
- 对压缩支持友好
- 支持 schema 的更改
某些文件格式是为通用设计的,而其他文件则是针对更特定的场景,有些在设计时考虑了特定的数据特征。因此,确实有很多选择。
每种格式都有优点和缺点,数据处理的不同阶段可以使用不同的格式才会更有效率。通过选择一种格式,最大程度地发挥该存储格式的优势,最小化劣势。
BigData File Viewer
BigData File Viewer 是一个跨平台(Windows/MAC/Linux)桌面应用程序,用于查看常见的大数据二进制格式,例如 Parquet,ORC,AVRO 等。支持本地文件系统/HDFS/AWS S3 等。
https://github.com/Eugene-Mark/bigdata-file-viewer/
行式存储、列式存储
- 行式存储(Row-Based):同一行数据存储在一起
- 列式存储(Column-Based):同一列数据存储在一起
- 「行存储」的写入是一次性完成,消耗的时间比列存储少,并且能够保证数据的完整性,缺点是数据读取过程中会产生冗余数据,如果只有少量数据,此影响可以忽略;数量大可能会影响到数据的处理效率。行适合插入、不适合查询。
- 「列存储」在写入效率、保证数据完整性上都不如行存储,它的优势是在读取过程,不会产生冗余数据,这对数据完整性要求不高的大数据处理领域,比如互联网,尤为重要。列适合查询,不适合插入。
1.2 常见存储格式
a. Text File
文本格式是 Hadoop 生态系统内部和外部的最常见格式。通常按行存储,数据一行一行到排列,每一行都是一条记录。以回车换行符区分不同行数据。
最大缺点是不支持块级别压缩,因此在进行压缩时会带来较高的读取成本。
解析开销一般会比二进制格式高,尤其是 XML 和 JSON,它们的解析开销比 Textfile 还要大。
仅在需要从 Hadoop 中直接提取数据,或直接从文件中加载大量数据的情况下,才建议使用纯文本格式或 CSV。
b. Sequence File
Sequence 最初是为 MapReduce 设计的,因此和 MapReduce 集成很好。
在 Sequence File 中,每条数据记录(record)都是以 key、value 键值对进行序列化存储(二进制格式)。
Sequence File 中的数据是以二进制格式存储,这种格式所需的存储空间小于文本的格式。与文本文件一样,Sequence File 内部也不支持对键和值的结构指定格式编码。
序列化文件与文本文件相比更紧凑,支持 record 级、block 块级压缩。压缩的同时支持文件切分(下图的 Sync
即为拆分标记)。
通常把 Sequence File 作为中间数据存储格式。例如:将大量小文件合并放入到一个 Sequence File 中。
record 就是一个 kv 键值对,其中数据保存在 value 中,可以选择是否针对 value 进行压缩。block 就是多个 record 的集合,block 级别压缩性能更好。
缺点:
- 对于具有 SQL 类型的 Hive 支持不好,需要读取和解压缩所有字段。
- 不存储元数据,并且对 schema 扩展中的唯一方式是在末尾添加新字段。
c. Avro File
Apache Avro 是与语言无关的序列化系统,由 Hadoop 创始人 Doug Cutting 开发。
Avro 是基于行的存储格式,它在每个文件中都包含 JSON 格式的 schema 定义,从而提高了互操作性并允许 schema 的变化(删除/添加列)。除了支持可切分以外,还支持块压缩。
Avro 是一种自描述格式,它将数据的 schema 直接编码存储在文件中,可以用来存储复杂结构的数据。
适合于大量频繁写入宽表数据(字段多列多)的场景,其序列化反序列化很快。
优点:
- Avro 是与语言无关的数据序列化系统;
- Avro 将 schema 存储在 header 中,数据是自描述的;
- 序列化和反序列化速度很快;
- Avro 文件是可切分的、可压缩的,非常适合在 Hadoop 生态系统中进行数据存储。
缺点:
- 如果我们只需要对数据文件中的少数列进行操作,行式存储效率较低。例如:我们读取 15 列中的 2 列数据,基于行式存储就需要读取数百万行的 15 列。而列式存储就会比行式存储方式高效;
- 列式存储因为是将同一列(类)的数据存储在一起,压缩率要比行式存储高。
d. RC File
Hive Record Columnar File(记录列文件),这种类型的文件首先将数据按行划分为行组,然后在行组内部将数据存储在列中。
RC File 是为基于 MapReduce 的数据仓库系统设计的数据存储结构。它结合了行存储和列存储的优点,可以满足快速数据加载和查询,有效利用存储空间以及适应高负载的需求。
RC File 是由二进制键/值对组成的 flat 文件,它与 Sequence File 有很多相似之处。支持压缩、切分但不支持 schema 扩展,如果要添加新的列,则必须重写文件,这会降低操作效率。
在数仓中执行分析时,这种面向列的存储非常有用。当我们使用面向列的存储类型时,执行分析很容易。
- RC File 可将数据分为几组行,并且在其中将数据存储在列中;
- RC File 首先将行水平划分为行拆分(Row Group),然后以列方式垂直划分每个行拆分(Columns);
- RC File 将行拆分的元数据存储为 record 的 key,并将行拆分的所有数据存储 value;
- 作为行存储,RC File 保证同一行中的数据位于同一节点中;
- 作为列存储,RC File 可以利用列数据压缩,并跳过不必要的列读取。
e. ORC File
ORC File(Optimized Row Columnar)提供了比 RC File 更有效的文件格式。
它在内部将数据划分为默认大小为 250M 的 Stripe。每个条带均包含索引、数据和页脚。索引存储每列的最大值和最小值以及列中每一行的位置。
它并不是一个单纯的列式存储格式,仍然是首先根据 Stripe 分割整个表,在每一个 Stripe 内进行按列存储。
ORC 有多种文件压缩方式,并且有着很高的压缩比。文件是可切分(Split)的。ORC File 是以二进制方式存储的,所以是不可以直接读取。
优点:
- 比 TextFile、Sequence File 和 RC File 具备更好的的性能;
- 列数据单独存储;
- 带类型的数据存储格式,使用类型专用的编码器;
- 轻量级索引
缺点:
- 与 RC 一样,ORC 也是不支持列扩展的。
f. Parquet File
Parquet 是面向分析型业务的列式存储格式,由 Twitter 和 Cloudera 合作开发,2015 年 5 月从 Apache 的孵化器里毕业成为 Apache 顶级项目。
Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。
和 ORCFile 一样,Parquet 也是基于列的二进制存储格式,可以存储嵌套的数据结构。
与 RC 和 ORC 文件不同,Parquet serdes 支持有限的 schema 扩展。在 Parquet 中,可以在结构的末尾添加新列。
Parquet 的存储模型主要由行组(Row Group)、列块(Column Chuck)、页(Page)组成。
在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。
文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。
Parquet VS ORC:
- Parquet 与 RC/ORC 一样,也具有压缩和查询性能方面的优点,与非列文件格式相比,写入速度通常较慢;
- ORC 文件格式压缩比 Parquet 要高,Parquet 文件的数据格式 Schema 要比 ORC 复杂,占用的空间也就越高;
- ORC 文件格式的读取效率要比 Parquet 文件格式高;
- 如果数据中有嵌套结构的数据,则 Parquet 会更好;
- Hive 对 ORC 的支持更好,对 Parquet 支持不好,ORC 与 Hive 关联紧密。
- Spark 对 Parquet 支持较好,对 ORC 支持不好;
- 为了数据能够兼容更多的查询引擎,Parquet 也是一种较好的选择;
- ORC 还可以支持 ACID、Update 操作等;
关于 Hive 对 Parquet 文件的支持的一个注意事项:Parquet 列名必须小写,这一点非常重要。如果 Parquet 文件包含大小写混合的列名,则 Hive 将无法读取该列。
g. Apache Arrow
Apache Arrow 是一个跨语言平台,是一种列式内存数据结构,主要用于构建数据系统。Apache Arrow 在 2016/2/17 作为顶级 Apache 项目引入。
Apache Arrow 发展非常迅速,并且在未来会有更好的发展空间。 它可以在系统之间进行高效且快速的数据交换,而无需进行序列化,而这些成本已与其他系统(e.g. Thrift、Avro、Protocol Buffers)相关联。
每一个系统实现,它的方法(method)都有自己的内存存储格式,在开发中 70%~80% 的时间浪费在了序列化和反序列化上。
Arrow 促进了许多组件之间的通信。 例如,使用 Python(pandas)读取复杂的文件并将其转换为 Spark DataFrame。
Arrow 是如何提升数据移动性能的?
- 利用 Arrow 作为内存中数据表示的两个过程可以将数据从一种方法“重定向”到另一种方法,而无需序列化或反序列化。 例如,Spark 可以使用 Python 进程发送 Arrow 数据来执行用户定义的函数;
- 无需进行反序列化,可以直接从启用了 Arrow 的数据存储系统中接收 Arrow 数据。例如,Kudu 可以将 Arrow 数据直接发送到 Impala 进行分析;
- Arrow 的设计针对嵌套结构化数据(如 Impala、Spark Data 框架中)的分析性能进行了优化。
1.3 文件压缩格式
在 Hadoop 中,一般存储着非常大的文件,以及在存储 HDFS 块或运行 MapReduce 任务时,Hadoop 集群中节点之间的存在大量数据传输。 如果条件允许时,尽量减少文件大小,这将有助于减少存储需求以及减少网络上的数据传输。
a. Hadoop 支持
Haodop 对文件压缩均实现 org.apache.hadoop.io.compress.CompressionCodec
接口,所有的实现类都在 org.apache.hadoop.io.compress
包下。
b. 压缩算法优劣指标
- 压缩比 (原先占 100 份空间的东西经压缩之后变成了占 20 份空间,那么压缩比就是 5,显然压缩比越高越好)
- 压缩/解压缩吞吐量 (每秒能压缩或解压缩多少 MB 的数据,吞吐量也是越高越好)
- 压缩算法实现是否简单、开源
- 是否为无损压缩
- 压缩后的文件是否支持切分
c. 压缩算法比较
有不少的压缩算法可以应用到 Hadoop 中,但不同压缩算法有各自的特点。
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 | 对应的编码解码器 |
---|---|---|---|---|---|
DEFLATE | 无 | DEFLATE | .deflate | 否 | org.apache.hadoop.io.compress.DefaultCodec |
Gzip | gzip | gzip | .gz | 否 | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | bzip2 | bzip2 | .bz2 | 是 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | lzop | LZO | .lzo | 是(切分点索引) | com.hadoop.compression.lzo.LzopCodec |
LZ4 | 无 | LZ4 | .lz4 | 否 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | 无 | Snappy | .snappy | 否 | org.apache.hadoop.io.compress.SnappyCodec |
存放数据到 HDFS 中,可以选择指定的压缩方式,在 MapReduce 程序读取时,会根据扩展名自动解压。例如:如果文件扩展名为 .snappy
,Hadoop 框架将自动使用 SnappyCodec 解压缩文件。
压缩比:
压缩、解压缩时间:
通过上图,我们可以看到哪些压缩算法压缩比更高。整体排序:Snappy < LZ4 < LZO < GZIP < BZIP2,但压缩比越高,压缩的时间也会更长。
d. 压缩如何抉择
既然压缩能够节省空间、而且可以提升 IO 效率,那么能否将所有数据都以压缩格式存储在 HDFS 中呢?
例如:BZIP2,而且文件是支持切分的。
如果选择 Gzip,就会出现以下情况:
- 如果文件是不可切分的,只有一个 CPU 在处理所有的文件,其他的 CPU 都是空闲的。如果 HDFS 中的 block 和文件大小差不多还好,一个文件、一个块、一个 CPU。如果是一个很大的文件就会出现问题了。
- bzip2 在压缩和解压缩数据方面实际上平均比 Gzip 差 3 倍,这对性能是有一定的影响的。如果我们需要频繁地查询数据,数据压缩一定会影响查询效率。
- 如果不关心查询性能(没有任何 SLA)并且很少选择此数据,则 bzip2 可能是不错的选择。最好是对自己的数据进行基准测试,然后再做决定。
小结:
- 压缩的合理使用可以提高 HDFS 存储效率;
- 压缩解压缩意味着 CPU、内存需要参与编码;
- 解码选择压缩算法时不能一味追求某一指标极致,综合考虑性价比较高的;
- 文件的压缩解压需要程序或者工具的参与来对数据进行处理,大数据相关处理软件都支持直接设置。
2. 存储类型和存储策略
2.1 HDFS 异构存储类型
冷、热、温、冻数据
通常,公司或者组织总是有相当多的历史数据占用昂贵的存储空间。典型的数据使用模式是新传入的数据被应用程序大量使用,从而该数据被标记为"热"数据。随着时间的推移,存储的数据每周被访问几次,而不是一天几次,这时认为其是"暖"数据。在接下来的几周和几个月中,数据使用率下降得更多,成为"冷"数据。如果很少使用数据,例如每年查询一次或两次,这时甚至可以根据其年龄创建第四个数据分类,并将这组很少被查询的旧数据称为"冻结数据"。
Hadoop 允许将不是热数据或者活跃数据的数据分配到比较便宜的存储上,用于归档或冷存储。可以设置存储策略,将较旧的数据从昂贵的高性能存储上转移到性价比较低(较便宜)的存储设备上。
Hadoop 2.5 及以上版本都支持存储策略,在该策略下,不仅可以在默认的传统磁盘上存储 HDFS 数据,还可以在 SSD(固态硬盘)上存储数据。
什么是异构存储?
异构存储是 Hadoop2.6.0 版本出现的新特性,可以根据各个存储介质读写特性不同进行选择。
例如冷热数据的存储,对冷数据采取容量大,读写性能不高的存储介质如机械硬盘,对于热数据,可使用 SSD 硬盘存储。
在读写效率上性能差距大。异构特性允许我们对不同文件选择不同的存储介质进行保存,以实现机器性能的最大化。
HDFS 中声明定义了 4 种异构存储类型
public enum StorageType {
// sorted by the speed of the storage types, from fast to slow
RAM_DISK(true),
SSD(false),
DISK(false),
ARCHIVE(false),
PROVIDED(false);
// 表示该存储类型是否是临时的或易失性的
private final boolean isTransient;
StorageType(boolean isTransient) {
this.isTransient = isTransient;
}
}
RAM_DISK
:这种存储类型表示数据块存储在内存中。因为内存的读写速度比磁盘和固态硬盘都要快,所以这种存储类型适用于需要快速读写数据的场景。不过需要注意的是,这种存储类型的数据会在机器重启或停电时丢失,因此只适用于暂时存储数据的场景。SSD
:这种存储类型表示数据块存储在固态硬盘上。固态硬盘的访问速度比传统的机械硬盘快,因此这种存储类型适用于对速度要求比较高的场景。DISK
:这种存储类型表示数据块存储在常规的磁盘上。这是一种比较常见的存储介质类型。ARCHIVE
:这种存储类型表示数据块存储在归档介质中。归档介质是一种低速的存储介质,适用于长期存储数据,但不适合频繁读写的场景。因此,这种存储类型一般用于存储备份数据或归档数据。PROVIDED
:这种存储类型表示数据块是由外部的存储系统提供的,而不是由 Hadoop 系统管理的。这种存储类型通常用于集成外部存储系统,如 Amazon S3 或 Azure Blob Storage。
如何让 HDFS 知道集群中的数据存储目录是哪种类型存储介质?
配置属性时主动声明,HDFS 并没有自动检测的能力。
配置参数 dfs.datanode.data.dir
,如果目录前没有带上[SSD] [DISK] [ARCHIVE] [RAM_DISK]
这 4 种类型中的任何一种,则默认是 DISK 类型 。
<property>
<name>dfs.datanode.data.dir</name>
<value>[DISK]xxx</value>
</property>
2.2 块存储类型选择策略
块存储指的是对 HDFS 文件的数据块副本储存。
对于数据的存储介质,HDFS 的 BlockStoragePolicySuite 类内部定义了 6 种策略:HOT、WARN、COLD、ALL_SSD、One_SSD、Lazy_Persistence 等存储策略。为了根据不同的存储策略将文件存储在不同的存储类型中,引入了一种新的存储策略概念。
前 3 种根据冷热数据区分,后 3 种根据磁盘性质区分。
HOT
:策略的设计初衷是为了优化热点数据的访问速度,它会优先选择访问频率高的数据块,并将它们存储在速度更快的存储介质中,例如RAM_DISK
或SSD
。这样做的目的是提高热点数据的访问速度,从而加速整个系统的响应速度。WARN
:WARN
策略更注重存储空间的利用率,它会考虑数据块的使用情况和空间利用率,避免过度占用存储空间。当存储空间越来越紧张时,WARN
策略会放宽对存储介质的要求,例如选择将数据块存储在相对较慢的DISK
中,以便释放更多的RAM_DISK
或SSD
存储空间,从而提高存储空间的利用率。COLD
:该策略优先选择存储在ARCHIVE
中的数据块,用于存储访问频率较低的数据。这种策略适用于对存储成本有较高要求,且可以接受数据访问速度较慢的场景。ALL_SSD
:该策略选择存储在SSD
中的数据块,用于加速所有数据的访问速度。这种策略适用于对数据访问速度有非常高要求的场景,但存储成本也非常高。ONE_SSD
:该策略选择存储在SSD
中的单个数据块,用于加速该数据块的访问速度。这种策略适用于对某些数据块有非常高要求的场景,但不适用于加速整个数据集的访问速度。LAZY_PERSISTENCE
:该策略将数据块首先存储在RAM_DISK
中,然后异步将其写入DISK
中。这种策略适用于需要快速写入数据,但不需要立即持久化的场景。此外,由于数据块会被异步写入DISK
中,因此可能会丢失一些数据。
尽管 HOT
和 WARN
策略都优先选择 RAM_DISK
、SSD
或 DISK
中的数据块,但它们的权衡点不同,前者更加注重数据块的访问频率,而后者更加注重存储空间的利用率。
(1)列出所有存储策略
hdfs storagepolicies -listPolicies
(2)设置存储策略
hdfs storagepolicies -setStoragePolicy -path <path> -policy <policy>
(3)取消存储策略
hdfs storagepolicies -unsetStoragePolicy -path <path>
(4)获取存储策略
hdfs storagepolicies -getStoragePolicy -path <path>
2.3 冷热温数据异构存储测试
(1)为了能够支撑不同类型的数据,我们需要在 hdfs-site.xml 中配置不同存储类型数据的位置。
<!--配置DataNode存储目录,指定存储介质类型 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>[DISK]file://${hadoop.tmp.dir}/dfs/data,[ARCHIVE]file://${hadoop.tmp.dir}/dfs/data/archive</value>
</property>
(2)分发到不同另外两个节点中,然后重启 HDFS 集群。
配置好后,我们在 WebUI 的 DataNodes 页面中点击任意一个 DataNode 节点:
可以看到,现在配置的是两个目录,一个 StorageType 为 ARCHIVE、一个 StorageType 为 DISK。
(3)创建测试目录结构
hdfs dfs -mkdir -p /data/hdfs-test/data_phase/hot
hdfs dfs -mkdir -p /data/hdfs-test/data_phase/warm
hdfs dfs -mkdir -p /data/hdfs-test/data_phase/cold
(4)分别设置三个目录的存储策略
# 分别设置三个目录的存储策略
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/hot -policy HOT
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/warm -policy WARM
hdfs storagepolicies -setStoragePolicy -path /data/hdfs-test/data_phase/cold -policy COLD
# 查看当前 HDFS 支持的存储策略
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/hot
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/warm
hdfs storagepolicies -getStoragePolicy -path /data/hdfs-test/data_phase/cold
(5)分别上传文件到三个目录中测试
hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/hot
hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/warm
hdfs dfs -put /etc/profile /data/hdfs-test/data_phase/cold
(6)查看不同存储策略文件的 block 位置
# hot 目录中的 block,3 个 block 都在 DISK 磁盘
hdfs fsck /data/hdfs-test/data_phase/hot/profile -files -blocks -locations
# warm 目录中的 block,1 个 block 在 DISK 磁盘,另外两个在 ARCHIVE 磁盘
hdfs fsck /data/hdfs-test/data_phase/warm/profile -files -blocks -locations
# cold 目录中的 block,3 个 block 都在 ARCHIVE 磁盘
hdfs fsck /data/hdfs-test/data_phase/cold/profile -files -blocks -locations
2.4 HDFS 内存存储策略支持
HDFS 支持写入由 DataNode 管理的堆外内存。DataNode 异步地将内存中数据刷新到磁盘,从而减少代价较高的磁盘 IO 操作,这种写入称之为懒持久写入 Lazy Persist。
客户端进程向 NameNode 发起创建/写文件的请求。客户端请求到具体的 DataNode 后,DataNode 会把这些数据块写入 RAM 内存中,同时启动异步线程服务将内存数据持久化写到磁盘上。内存的异步持久化存储是指数据不是马上落盘,而是懒惰的、延时地进行处理。
HDFS 为懒持久化写做了较大的持久性保证。在将副本保存到磁盘之前,如果节点重新启动,有非常小的几率会出现数据丢失。
该特性从 Hadoop 2.6.0 开始支持。
对目标文件目录设置 StoragePolicy 为 LAZY_PERSIST 的内存存储策略
(1)虚拟内存盘配置。将 tmpfs 挂载到目录 /mnt/dn-tmpfs/,并且限制内存使用大小为 1GB。
(2)内存存储介质设置。将机器中已经完成好的虚拟内存盘配置到 dfs.datanode.data.dir
中,其次还要带上 RAM_DISK 标签的配置项。
<!-- 配置 DataNode 存储目录,指定存储介质类型 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>[RAM_DISK]/xxx/xxx, ...</value>
</property>
(3)参数设置优化
# 是否开启异构存储,默认开启
dfs.storage.policy.enabled=true
# 用于在数据节点上的内存中缓存块副本的内存量(以字节为单位)。默认情况下,此参数设置为 0,这将禁用内存中缓存。
# 内存值过小会导致内存中的总的可存储的数据块变少,但如果超过 DataNode 能承受的最大内存大小的话,部分内存块会被直接移出。
dfs.datanode.max.locked.memory=xxx
(4)在目录上设置存储策略
hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST
2.5 转账数据分层案例
银行每一天都有大量的转账、交易处理。用户每进行一笔交易或者转账,银行都需要将所有相关信息保存下来。银行拥有数 10 亿的用户。要保存的数据量可想而知。如果说有的数据,都同等对待,为了保证使用数据的性能采用高性能存储,这将是一笔不小的资源浪费。
实际上,超过一定时间的数据,数据访问的频率要低得多。例如:用户查询 5。年前的转账记录、要比查询 1 年类的转账记录频率要低得多。为了能够更好地利用资源,需要对数据进行分层。也就是不同时间范围的数据,放在不同的层(冷热温)中。
(1)创建存储数据目录
hdfs dfs -mkdir -p /source/bank/transfer/log_lte1y
hdfs dfs -mkdir -p /source/bank/transfer/log_gt1y
(2)指定存储策略
hdfs storagepolicies -setStoragePolicy -path /source/bank/transfer/log_lte1y -policy HOT
hdfs storagepolicies -setStoragePolicy -path /source/bank/transfer/log_gt1y -policy COLD
(3)上传文件测试
hdfs dfs -put ~/bank_record.csv /source/bank/transfer/log_lte1y/bank_record_2022_10.csv
hdfs dfs -put ~/bank_record.csv /source/bank/transfer/log_gt1y/bank_record_2021_07.csv
(4)查看目录存储策略
hdfs storagepolicies -getStoragePolicy -path /source/bank/transfer/log_lte1y
hdfs storagepolicies -getStoragePolicy -path /source/bank/transfer/log_gt1y
(5)查看文件情况
hdfs fsck /source/bank/transfer/log_gt1y/bank_record_2021_07.csv -files -blocks -locations
hdfs fsck /source/bank/transfer/log_lte1y/bank_record_2022_10.csv -files -blocks -locations
(6)假设现在到了 2023 年 10 年,我们可以将之前的数据移动到 log_gt1y。
# 文件移动 [log_lte1y -> log_gt1y]
hdfs dfs -mv /source/bank/transfer/log_lte1y/bank_record_2022_10.csv /source/bank/transfer/log_gt1y/bank_record_2022_10.csv
# 再次查看文件块分布,我们可以看到文件块依然放在原处
hdfs fsck /source/bank/transfer/log_gt1y/bank_record_2022_10.csv -files -blocks -locations
# 需要让它 HDFS 按照存储策略自行移动文件块
hdfs mover /hdfsdata
# 再次查看文件块分布,就已经符合设置的策略了
3. HDFS 小文件解决方案
3.1 Archive 归档
HDFS 并不擅长存储小文件,因为每个文件最少一个 block,每个 block 的元数据都会在 NameNode 占用内存,如果存在大量的小文件,它们会吃掉 NameNode 节点的大量内存。
Hadoop Archives 可以有效的处理以上问题,它可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件。
a. 创建 Archive
hadoop archive -archiveName <name> -p <parent> <src>* <dest>
其中 -archiveName
是指要创建的存档的名称。比如 test.har,archive 的名字的扩展名应该是 *.har
。-p
参数指定了归档文件中存储的文件路径前缀。
举个例子:-p /foo/bar a/b/c e/f/g
,这里的 /foo/bar
是 a/b/c
与 e/f/g
的父路径,所以完整路径为 /foo/bar/a/b/c
与 /foo/bar/e/f/g
。
如果你只想存档一个目录 /smallfile 下的所有文件,使用如下命令就会在 /outputdir 目录下创建一个名为 test.har 的存档文件。
hadoop archive -archiveName test.har -p /smallfile /outputdir
【注】Archive 归档是通过 MapReduce 程序完成的,所以需要启动 YARN 集群。
b. 查看 Archive
查看归档之后的样子
首先我们来看下创建好的 har 文件。使用命令:hadoop fs -ls /outputdir/test.har
这里可以看到 har 文件包括:两个索引文件,多个 part 文件(本例只有 1 个)以及一个标识成功与否的文件。part 文件是多个原文件的集合, 通过 index 文件可以去找到原文件。
例如上述的三个小文件 1.txt 2.txt 3.txt 内容分别为 1 2 3。进行 archive 操作之后,三个小文件就归档到 test.har 里的 part-0 这一个文件里。
查看归档之前的样子
在查看 har 文件的时候,如果没有指定访问协议,默认使用的就是 hdfs://
,此时所能看到的就是归档之后的样子。
此外,Archive 还提供了自己的 har uri 访问协议。如果用 har uri 去访问的话,索引、标识等文件就会隐藏起来,只显示创建档案之前的原文件。
$ har://scheme-hostname:port/archivepath/fileinarchive
# hadoop fs -ls har://hdfs-node1:8020/outputdir/test.har/
# hadoop fs -ls har:///outputdir/test.har
# hadoop fs -cat har:///outputdir/test.har/1.txt
c. 提取 Archive
# 按顺序解压存档(串行)
$ hadoop fs -cp har:///outputdir/test.har/* /smallfile1
# 要并行解压存档,使用 DistCp,对应大的归档文件可以提高效率
$ hadoop distcp har:///outputdir/test.har/* /smallfile2
d. 注意事项
- Hadoop archives 是特殊的档案格式。一个 Hadoop archive 对应一个文件系统目录。Hadoop archive 的扩展名是 *.har;
- 创建 archive 本质是运行一个 MapReduce 任务,所以应该在 Hadoop 集群上运行创建档案的命令;
- 创建 archive 文件要消耗和原文件一样多的硬盘空间;
- archive 文件不支持压缩,尽管 archive 文件看起来像已经被压缩过;
- archive 文件一旦创建就无法改变,要修改的话,需要创建新的 archive 文件。事实上,一般不会再对存档后的文件进行修改,因为它们是定期存档的,比如每周或每日;
- 当创建 archive 时,源文件不会被更改或删除,所以创建完 archive 后,需手动删除原文件。
3.2 Sequence File
a. 简述
Sequence File 是 Hadoop API 提供的一种二进制文件支持。这种二进制文件直接将 <key, value>
键值对序列化到文件中。
- 优点
- 二级制格式存储,比文本文件更紧凑;
- 支持不同级别压缩(基于 Record 或 Block 压缩);
- 文件可以拆分和并行处理,适用于 MapReduce。
- 局限性
- 二进制格式文件不方便查看;
- 特定于 Hadoop,只有 Java API 可用于与之件进行交互。尚未提供多语言支持。
b. 格式
Hadoop Sequence File 是一个由二进制键/值对组成的。根据压缩类型,有 3 种不同的 Sequence File 格式:
- 未压缩格式
- record 压缩格式
- block 压缩格式
Sequence File 由一个 header 和一个或多个 record 组成。以上 3 种格式均使用相同的 header 结构,如下所示:
前 3 个字节为 SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如 SEQ4 或 SEQ6)。Header 中其他也包括 key、value class 名字、 压缩细节、metadata、Sync marker。Sync Marker 同步标记,用于可以读取任意位置的数据。
(1)未压缩格式
未压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了 4 个部分:record length(记录长度)、key length(键长)、key、value。
每隔几个 record(100 字节左右)就有一个同步标记。
(2)基于 record 压缩格式
基于 record 压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了 4 个部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)。
每隔几个 record(100 字节左右)就有一个同步标记。
(3)基于 block 压缩格式
基于 block 压缩的 Sequence File 文件由 header、block、sync 三个部分组成。
block 指的是 record block,可以理解为多个 record 记录组成的块。注意,这个 block 和 HDFS 中分块存储的 block(128M)是不同的概念。
block中包括:record 条数、压缩的 key 长度、压缩的 keys、压缩的 value 长度、压缩的 values。每隔一个 block 就有一个同步标记。
block 压缩比 record 压缩提供更好的压缩率。使用 Sequence File 时,通常首选块压缩。
c. 文件读写
(1)开发环境构建
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
(2)SequenceFileWrite
package io.tree6x7.sequence;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
public class SequenceFileWrite {
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws Exception {
// 设置客户端运行身份:以 root 去操作访问 HDFS
System.setProperty("HADOOP_USER_NAME","root");
// Configuration 用于指定相关参数属性
Configuration conf = new Configuration();
// sequence file key、value
IntWritable key = new IntWritable();
Text value = new Text();
// 构造Writer参数属性
SequenceFile.Writer writer = null;
CompressionCodec Codec = new GzipCodec();
SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(new Path("hdfs://node1:8020/seq.out"));
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,Codec);
try {
writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
(3)SequenceFileRead
package io.tree6x7.sequence;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class SequenceFileRead {
public static void main(String[] args) throws IOException {
// 设置客户端运行身份,以 root 去操作访问 HDFS
System.setProperty("HADOOP_USER_NAME", "root");
// Configuration 用于指定相关参数属性
Configuration conf = new Configuration();
SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(new Path("hdfs://node1:8020/seq.out"));
// 这个参数表示读取的长度
SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(174);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf, option1, option2);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
// 是否返回了SyncMark同步标记
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}
d. 合并小文件
可以使用 Sequence File 对小文件合并,即将文件名作为 key,文件内容作为 value 序列化到大文件中。例如,假设有 10,000 个 100KB 文件,那么我们可以编写一个程序将它们放入单个 Sequence File 中,如下所示,您可以在其中使用 filename 作为键,并使用 content 作为值。
package io.tree6x7.sequence;
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MergeSmallFilesToSequenceFile {
private Configuration configuration = new Configuration();
private List<String> smallFilePaths = new ArrayList<String>();
// 定义方法用来添加小文件的路径
public void addInputPath(String inputPath) throws Exception {
File file = new File(inputPath);
// 给定路径是文件夹,则遍历文件夹,将子文件夹中的文件都放入smallFilePaths
// 给定路径是文件,则把文件的路径放入smallFilePaths
if (file.isDirectory()) {
File[] files = FileUtil.listFiles(file);
for (File sFile : files) {
smallFilePaths.add(sFile.getPath());
System.out.println("添加小文件路径:" + sFile.getPath());
}
} else {
smallFilePaths.add(file.getPath());
System.out.println("添加小文件路径:" + file.getPath());
}
}
// 把 smallFilePaths 的小文件遍历读取,然后放入合并的 SequenceFile 容器中
public void mergeFile() throws Exception {
Writer.Option bigFile = Writer.file(new Path("D:\\bigfile.seq"));
Writer.Option keyClass = Writer.keyClass(Text.class);
Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
// 构造 writer
Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
// 遍历读取小文件,逐个写入 SequenceFile
Text key = new Text();
for (String path : smallFilePaths) {
File file = new File(path);
long fileSize = file.length();// 获取文件的字节数大小
byte[] fileContent = new byte[(int) fileSize];
FileInputStream inputStream = new FileInputStream(file);
// 把文件的二进制流加载到 fileContent 字节数组中去
inputStream.read(fileContent, 0, (int) fileSize);
String md5Str = DigestUtils.md5Hex(fileContent);
System.out.println("merge小文件:" + path + ",md5:" + md5Str);
key.set(path);
// 把文件路径作为 key,文件内容做为 value,放入到 SequenceFile 中
writer.append(key, new BytesWritable(fileContent));
}
writer.hflush();
writer.close();
}
// 读取大文件中的小文件
public void readMergedFile() throws Exception {
Reader.Option file = Reader.file(new Path("D:\\bigfile.seq"));
Reader reader = new Reader(configuration, file);
Text key = new Text();
BytesWritable value = new BytesWritable();
while (reader.next(key, value)) {
byte[] bytes = value.copyBytes();
String md5 = DigestUtils.md5Hex(bytes);
String content = new String(bytes, Charset.forName("GBK"));
System.out.println("读取到文件:" + key + ",md5:" + md5 + ",content:" + content);
}
}
public static void main(String[] args) throws Exception {
MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
// 合并小文件
// msf.addInputPath("D:\\datasets\\smallfile");
// msf.mergeFile();
// 读取大文件
msf.readMergedFile();
}
}