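Iceberg supports two columnar storage formats, ORC and Parquet. Compared with Parquet, ORC has some advantages in query performance and ACID support. Given the query performance requirements of the lakehouse and the ACID requirements of the data lake, this note surveys ORC in preparation for a later Flink + Iceberg + ORC demo test.
Focus of the survey: ORC file encoding, file layout, and index support.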
File Layout
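An ORC file is divided into three main parts:
- Header: identifies the file type
- Body: contains the row data and indexes, as shown in the figure below
- Tail: contains file-level information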
ORC Specification v1
File Tail
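Because distributed storage systems today generally provide only append-only semantics, an ORC file keeps a region at the end of the file that holds file-level (top-level) information.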
source code
The sections of the file tail are (and their protobuf message type):
- encrypted stripe statistics: list of ColumnarStripeStatistics
- stripe statistics: Metadata
- footer: Footer
- postscript: PostScript
- psLen: byte
postscript
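The postscript is never compressed. It holds the information needed to parse the rest of the file tail, such as the lengths of the Footer and Metadata sections and the compression kind.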
|
message PostScript {
// the length of the footer section in bytes
optional uint64 footerLength = 1;
// the kind of generic compression used
optional CompressionKind compression = 2;
// the maximum size of each compression chunk
optional uint64 compressionBlockSize = 3;
// the version of the writer
repeated uint32 version = 4 [packed = true];
// the length of the metadata section in bytes
optional uint64 metadataLength = 5;
// the fixed string "ORC"
optional string magic = 8000;
}
...
enum CompressionKind {
NONE = 0;
ZLIB = 1;
SNAPPY = 2;
LZO = 3;
LZ4 = 4;
ZSTD = 5;
}
|
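Reading the tail starts from the last byte of the file. The Python sketch below illustrates this under two assumptions: the whole file is already in memory as `bytes`, and a small hand-written protobuf varint reader stands in for generated protobuf classes. The function names (`read_varint`, `parse_postscript`, `locate_tail`) are illustrative and not part of any ORC library. The footer and metadata bytes located this way would still need to be decompressed when `compression` is not NONE.

```python
def read_varint(buf, pos):
    """Decode one protobuf base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:
            return result, pos
        shift += 7


def parse_postscript(ps):
    """Return {field number: value} for varint and length-delimited fields."""
    fields = {}
    pos = 0
    while pos < len(ps):
        key, pos = read_varint(ps, pos)
        field_no, wire_type = key >> 3, key & 0x07
        if wire_type == 0:        # varint: footerLength, compression, ...
            value, pos = read_varint(ps, pos)
        elif wire_type == 2:      # length-delimited: packed version, magic
            size, pos = read_varint(ps, pos)
            value = ps[pos:pos + size]
            pos += size
        else:
            raise ValueError(f"unexpected wire type {wire_type}")
        fields[field_no] = value
    return fields


def locate_tail(data):
    """Walk the tail backwards: psLen -> postscript -> footer -> metadata."""
    ps_len = data[-1]                      # psLen is the final byte of the file
    ps_end = len(data) - 1
    ps_start = ps_end - ps_len
    fields = parse_postscript(data[ps_start:ps_end])
    footer_start = ps_start - fields.get(1, 0)        # footerLength = 1
    metadata_start = footer_start - fields.get(5, 0)  # metadataLength = 5
    return {
        "postscript": (ps_start, ps_end),
        "footer": (footer_start, ps_start),
        "metadata": (metadata_start, footer_start),
        "compression": fields.get(2, 0),   # 0 is NONE in CompressionKind
        "magic": fields.get(8000),
    }
```

Each value in the returned dictionary is a half-open `(start, end)` byte range, so `data[start:end]` yields the raw section bytes.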
The footer contains the file's schema, the number of rows in the file, and column-level statistics.
|
message Footer {
// the length of the file header in bytes (always 3)
optional uint64 headerLength = 1;
// the length of the file header and body in bytes
optional uint64 contentLength = 2;
// the information about the stripes
repeated StripeInformation stripes = 3;
// the schema information
repeated Type types = 4;
// the user metadata that was added
repeated UserMetadataItem metadata = 5;
// the total number of rows in the file
optional uint64 numberOfRows = 6;
// the statistics of each column across the file
repeated ColumnStatistics statistics = 7;
// the maximum number of rows in each index entry
optional uint32 rowIndexStride = 8;
// Each implementation that writes ORC files should register for a code
// 0 = ORC Java
// 1 = ORC C++
// 2 = Presto
// 3 = Scritchley Go from https://github.com/scritchley/orc
// 4 = Trino
optional uint32 writer = 9;
// information about the encryption in this file
optional Encryption encryption = 10;
// the number of bytes in the encrypted stripe statistics
optional uint64 stripeStatisticsLength = 11;
}
|
- Stripe Information
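The data in the body is divided into multiple stripes, and a row may not span stripes. Each stripe contains **row indexes, row data, and a stripe footer**. Within a stripe, indexes and data are stored column by column.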
|
message StripeInformation {
// the start of the stripe within the file
optional uint64 offset = 1;
// the length of the indexes in bytes
optional uint64 indexLength = 2;
// the length of the data in bytes
optional uint64 dataLength = 3;
// the length of the footer in bytes
optional uint64 footerLength = 4;
// the number of rows in the stripe
optional uint64 numberOfRows = 5;
// If this is present, the reader should use this value for the encryption
// stripe id for setting the encryption IV. Otherwise, the reader should
// use one larger than the previous stripe's encryptStripeId.
// For unmerged ORC files, the first stripe will use 1 and the rest of the
// stripes won't have it set. For merged files, the stripe information
// will be copied from their original files and thus the first stripe of
// each of the input files will reset it to 1.
// Note that 1 was chosen, because protobuf v3 doesn't serialize
// primitive types that are the default (eg. 0).
optional uint64 encryptStripeId = 6;
// For each encryption variant, the new encrypted local key to use until we
// find a replacement.
repeated bytes encryptedLocalKeys = 7;
}
|
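The four length and offset fields of `StripeInformation` are enough to find each part of a stripe. The sketch below assumes the parts are laid out in the order index, data, footer, starting at `offset`; the function name `stripe_sections` is illustrative.

```python
def stripe_sections(offset, index_length, data_length, footer_length):
    """Return half-open (start, end) byte ranges for the three parts of a stripe."""
    index_start = offset
    data_start = index_start + index_length
    footer_start = data_start + data_length
    stripe_end = footer_start + footer_length
    return {
        "index": (index_start, data_start),
        "data": (data_start, footer_start),
        "footer": (footer_start, stripe_end),
    }
```

A reader that only needs the stripe footer can therefore seek straight to `offset + indexLength + dataLength` without touching the index or data bytes.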
All rows in an ORC file must share the same type schema. Nested data types are organized as a tree.
Example in Hive DDL:
|
create table Foobar (
myInt int,
myMap map<string,
struct<myString : string,
myDouble: double>>,
myTime timestamp
);
|
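The type tree is flattened into the `types` list of the footer by a pre-order traversal, with each type taking the next id. The sketch below applies that traversal to the `Foobar` table; the tuple-based tree representation and the name `flatten_types` are assumptions made for illustration only.

```python
def flatten_types(node, out=None):
    """Pre-order traversal. node = (kind, [(field_name, child_node), ...])."""
    if out is None:
        out = []
    kind, children = node
    entry = {"id": len(out), "kind": kind, "subtypes": [], "fieldNames": []}
    out.append(entry)
    for name, child in children:
        entry["subtypes"].append(len(out))   # id the child is about to receive
        if kind == "STRUCT":                 # only structs carry field names
            entry["fieldNames"].append(name)
        flatten_types(child, out)
    return out


# The Foobar table: the root is an implicit struct over the top-level columns.
foobar = ("STRUCT", [
    ("myInt", ("INT", [])),
    ("myMap", ("MAP", [
        ("key", ("STRING", [])),
        ("value", ("STRUCT", [
            ("myString", ("STRING", [])),
            ("myDouble", ("DOUBLE", [])),
        ])),
    ])),
    ("myTime", ("TIMESTAMP", [])),
])

types = flatten_types(foobar)
```

With this input the root struct gets id 0 with subtypes 1, 2 and 7, and the map at id 2 has subtypes 3 (key) and 4 (value).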
Supported types:
|
message Type {
enum Kind {
BOOLEAN = 0;
BYTE = 1;
SHORT = 2;
INT = 3;
LONG = 4;
FLOAT = 5;
DOUBLE = 6;
STRING = 7;
BINARY = 8;
TIMESTAMP = 9;
LIST = 10;
MAP = 11;
STRUCT = 12;
UNION = 13;
DECIMAL = 14;
DATE = 15;
VARCHAR = 16;
CHAR = 17;
TIMESTAMP_INSTANT = 18;
}
// the kind of this type
required Kind kind = 1;
// the type ids of any subcolumns for list, map, struct, or union
repeated uint32 subtypes = 2 [packed=true];
// the list of field names for struct
repeated string fieldNames = 3;
// the maximum length of the type for varchar or char in UTF-8 characters
optional uint32 maximumLength = 4;
// the precision and scale for decimal
optional uint32 precision = 5;
optional uint32 scale = 6;
}
|
Simple per-column statistics, which can be used for coarse-grained filtering and by the cost-based optimizer (CBO).
|
message ColumnStatistics {
// the number of values
optional uint64 numberOfValues = 1;
// At most one of these has a value for any column
optional IntegerStatistics intStatistics = 2;
optional DoubleStatistics doubleStatistics = 3;
optional StringStatistics stringStatistics = 4;
optional BucketStatistics bucketStatistics = 5;
optional DecimalStatistics decimalStatistics = 6;
optional DateStatistics dateStatistics = 7;
optional BinaryStatistics binaryStatistics = 8;
optional TimestampStatistics timestampStatistics = 9;
optional bool hasNull = 10;
}
...
message IntegerStatistics {
optional sint64 minimum = 1;
optional sint64 maximum = 2;
optional sint64 sum = 3;
}
|
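As an illustration of coarse-grained filtering, the sketch below decides from the `minimum` and `maximum` of an `IntegerStatistics` whether a simple comparison predicate could match any value. The class `IntStats` and the function `might_match` are hypothetical names, not an ORC API, and null handling is left out.

```python
from dataclasses import dataclass


@dataclass
class IntStats:
    """Mirror of the minimum/maximum fields of IntegerStatistics."""
    minimum: int
    maximum: int


def might_match(stats, op, literal):
    """Return False only when no value in [minimum, maximum] can satisfy the predicate."""
    if op == "=":
        return stats.minimum <= literal <= stats.maximum
    if op == "<":
        return stats.minimum < literal
    if op == "<=":
        return stats.minimum <= literal
    if op == ">":
        return stats.maximum > literal
    if op == ">=":
        return stats.maximum >= literal
    raise ValueError(f"unsupported operator {op!r}")
```

A `False` result means the file, stripe, or row group described by the statistics can be skipped; a `True` result only means the data has to be read and checked.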
|
message UserMetadataItem {
// the user defined key
required string name = 1;
// the user defined binary value
required bytes value = 2;
}
|
|
message Metadata {
repeated StripeStatistics stripeStats = 1;
}
...
message StripeStatistics {
repeated ColumnStatistics colStats = 1;
}
|
Stripe
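The body is split into stripes. Stripes are large (typically ~200MB). Each stripe has three parts: Index Data, Row Data, and Stripe Footer.
Within a stripe, each column's data is stored as a set of streams placed next to each other in the file. For example:
- integer column: two streams, PRESENT (whether each value is non-null) + DATA (the values themselves)
- string column: PRESENT + DATA + LENGTH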
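The way these streams combine can be sketched as follows. The sketch works on values that have already been decoded (real streams are run-length encoded and possibly compressed), and the function names are illustrative.

```python
def decode_int_column(present, data):
    """present: one bool per row; data: values for the non-null rows only."""
    values = iter(data)
    return [next(values) if is_set else None for is_set in present]


def decode_string_column(present, data, lengths):
    """data: concatenated UTF-8 bytes; lengths: byte length of each non-null value."""
    out = []
    offset = 0
    sizes = iter(lengths)
    for is_set in present:
        if not is_set:
            out.append(None)
            continue
        size = next(sizes)
        out.append(data[offset:offset + size].decode("utf-8"))
        offset += size
    return out
```

Null rows take no space in DATA or LENGTH, which is why PRESENT must be consulted first to know which row each stored value belongs to.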
The stripe footer records the encoding of each column and information about each stream.
|
message StripeFooter {
// the location of each stream
repeated Stream streams = 1;
// the encoding of each column
repeated ColumnEncoding columns = 2;
optional string writerTimezone = 3;
// one for each column encryption variant
repeated StripeEncryptionVariant encryption = 4;
}
|
stream
A stream is a contiguous run of bytes in the file. The data in each stream is stored with an encoding chosen according to the column's type.
|
message Stream {
enum Kind {
// boolean stream of whether the next value is non-null
PRESENT = 0;
// the primary data stream
DATA = 1;
// the length of each value for variable length data
LENGTH = 2;
// the dictionary blob
DICTIONARY_DATA = 3;
// deprecated prior to Hive 0.11
// It was used to store the number of instances of each value in the
// dictionary
DICTIONARY_COUNT = 4;
// a secondary data stream
SECONDARY = 5;
// the index for seeking to particular row groups
ROW_INDEX = 6;
// original bloom filters used before ORC-101
BLOOM_FILTER = 7;
// bloom filters that consistently use utf8
BLOOM_FILTER_UTF8 = 8;
// Virtual stream kinds to allocate space for encrypted index and data.
ENCRYPTED_INDEX = 9;
ENCRYPTED_DATA = 10;
// stripe statistics streams
STRIPE_STATISTICS = 100;
// A virtual stream kind that is used for setting the encryption IV.
FILE_STATISTICS = 101;
}
required Kind kind = 1;
// the column id
optional uint32 column = 2;
// the number of bytes in the file
optional uint64 length = 3;
}
message ColumnEncoding {
enum Kind {
// the encoding is mapped directly to the stream using RLE v1
DIRECT = 0;
// the encoding uses a dictionary of unique values using RLE v1
DICTIONARY = 1;
// the encoding is direct using RLE v2
DIRECT_V2 = 2;
// the encoding is dictionary-based using RLE v2
DICTIONARY_V2 = 3;
}
required Kind kind = 1;
// for dictionary encodings, record the size of the dictionary
optional uint32 dictionarySize = 2;
}
|
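A `Stream` message carries a length but no offset, so a reader has to derive each stream's position. The sketch below assumes the streams are laid out back to back, in the order they are listed in the stripe footer, starting at the stripe's offset; `stream_offsets` is an illustrative name.

```python
def stream_offsets(stripe_offset, streams):
    """streams: list of (kind, column, length) in stripe-footer order.

    Returns (kind, column, start, end) with half-open byte ranges.
    """
    located = []
    pos = stripe_offset
    for kind, column, length in streams:
        located.append((kind, column, pos, pos + length))
        pos += length
    return located
```

Reading a single column then comes down to selecting the entries whose `column` matches and fetching only those byte ranges.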
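Index Support
ORC supports three levels of indexes:
| Level | Location in the ORC file | Contents |
| --- | --- | --- |
| file level | file footer | column-level statistics for the whole file |
| stripe level | file footer | column-level statistics for each stripe |
| row level | start of each stripe | column-level statistics for each row group, plus the start position of each row group |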
Row Level
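The row-level indexes are the Row Group Index and the Bloom Filter Index.
Row Group Index
The row group index consists of a ROW_INDEX stream for each primitive column, with one RowIndexEntry per row group.
By default, a row group holds 10,000 rows.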
|
message RowIndex {
repeated RowIndexEntry entry = 1;
}
message RowIndexEntry {
repeated uint64 positions = 1 [packed=true];
optional ColumnStatistics statistics = 2;
}
|
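The arithmetic behind row groups, and the way per-entry statistics narrow a read, can be sketched as below. The sketch assumes the default stride of 10,000 rows and an equality predicate on an integer column; the function names are illustrative.

```python
def row_group_count(rows_in_stripe, stride=10_000):
    """Number of RowIndexEntry items expected for a stripe (ceiling division)."""
    return (rows_in_stripe + stride - 1) // stride


def row_group_of(row_in_stripe, stride=10_000):
    """Zero-based row group that holds the given zero-based row of the stripe."""
    return row_in_stripe // stride


def row_groups_to_read(entries, literal):
    """entries: (minimum, maximum) per row group; keep groups that may hold literal."""
    return [i for i, (lo, hi) in enumerate(entries) if lo <= literal <= hi]
```

For the selected row groups, the `positions` of the matching RowIndexEntry tell the reader where to seek within each stream.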
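Bloom Filter Index
The Bloom filter index consists of a BLOOM_FILTER stream for each column that has Bloom filters enabled. Each stream holds one BloomFilter entry per row group.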
|
message BloomFilter {
optional uint32 numHashFunctions = 1;
repeated fixed64 bitset = 2;
optional bytes utf8bitset = 3;
}
message BloomFilterIndex {
repeated BloomFilter bloomFilter = 1;
}
|
Bloom filter streams are interlaced with row group indexes.
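To show what a reader does with `numHashFunctions` and the bitset, here is a minimal Bloom filter sketch. It is not ORC's implementation: SHA-256 is swapped in for hashing only because it is in the Python standard library, the bitset is a single Python integer, and the class name `SketchBloomFilter` and its default sizes are assumptions.

```python
import hashlib


class SketchBloomFilter:
    def __init__(self, num_bits=1024, num_hash_functions=3):
        self.num_bits = num_bits
        self.num_hash_functions = num_hash_functions
        self.bits = 0                      # bit i set means position i is marked

    def _positions(self, value):
        # Derive k positions from two 64-bit halves of one digest (double hashing).
        digest = hashlib.sha256(value).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.num_bits
                for i in range(self.num_hash_functions)]

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        # False is definitive; True may be a false positive.
        return all((self.bits >> pos) & 1 for pos in self._positions(value))
```

When `might_contain` returns False for the literal of an equality predicate, the corresponding row group can be skipped even if the literal falls inside the min/max range of the row group index.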
Data Access Path
Postscript -> Footer -> look up the corresponding StripeInformation -> Stripe Footer -> Stripe Index -> Row Group -> Column
References