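Iceberg supports two columnar file formats, ORC and Parquet. Compared with Parquet, ORC has some advantages in query performance and ACID support. With the query performance of the lakehouse and the ACID requirements of the data lake in mind, this note surveys ORC in preparation for a Flink + Iceberg + ORC demo test.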

Focus of the survey: how ORC files are encoded and organized, and what index support they offer.

File Layout

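An ORC file can be divided into three major parts:

  • Header: identifies the file type
  • Body: contains the Row Data and Indexes, as shown in the figure below
  • Tail: contains file-level information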

[Figure: ORC file layout. Source: ORC Specification v1]

File Tail

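Because today's distributed storage systems generally offer only append-only semantics, an ORC file keeps a region at the end of the file that holds file-level (top-level) information. The protobuf messages quoted below are taken from the ORC source code.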

The sections of the file tail are (and their protobuf message type):

  • encrypted stripe statistics: list of ColumnarStripeStatistics
  • stripe statistics: Metadata
  • footer: Footer
  • postscript: PostScript
  • psLen: byte

postscript

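The PostScript is never compressed. It holds the information needed to parse the rest of the file tail, such as the lengths of the Footer and Metadata sections and the compression kind.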

message PostScript {
 // the length of the footer section in bytes
 optional uint64 footerLength = 1;
 // the kind of generic compression used
 optional CompressionKind compression = 2;
 // the maximum size of each compression chunk
 optional uint64 compressionBlockSize = 3;
 // the version of the writer
 repeated uint32 version = 4 [packed = true];
 // the length of the metadata section in bytes
 optional uint64 metadataLength = 5;
 // the fixed string "ORC"
 optional string magic = 8000;
}

...

enum CompressionKind {
 NONE = 0;
 ZLIB = 1;
 SNAPPY = 2;
 LZO = 3;
 LZ4 = 4;
 ZSTD = 5;
}
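
The way a reader uses psLen and the PostScript to find the other tail sections can be sketched as follows. This is a minimal illustration, not the ORC reader's actual code: it assumes a file without encrypted stripe statistics, only locates the Footer and Metadata byte ranges (it does not decompress or decode them), and uses a small hand-written protobuf field parser. The helper names `encode_varint`, `read_varint`, `parse_fields` and `locate_tail` are hypothetical, and the input is a synthetic byte string rather than a real ORC file.

```python
def encode_varint(value):
    """Encode a non-negative int as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)   # more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def read_varint(buf, pos):
    """Decode one varint from buf starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def parse_fields(buf):
    """Parse varint (wire type 0) and length-delimited (wire type 2) fields."""
    fields, pos = {}, 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = bytes(buf[pos:pos + length])
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields.setdefault(number, []).append(value)
    return fields


def locate_tail(file_bytes):
    """Work backwards from the last byte to the tail sections."""
    ps_len = file_bytes[-1]                       # psLen: last byte of the file
    ps_start = len(file_bytes) - 1 - ps_len
    ps = parse_fields(file_bytes[ps_start:-1])    # the PostScript is uncompressed
    footer_start = ps_start - ps[1][0]            # footerLength = 1
    metadata_start = footer_start - ps.get(5, [0])[0]  # metadataLength = 5
    return {
        "compression": ps.get(2, [0])[0],         # CompressionKind = 2
        "magic": ps[8000][0],                     # magic = 8000
        "postscript_start": ps_start,
        "footer_start": footer_start,
        "metadata_start": metadata_start,
    }


# Synthetic file: 3-byte header, 20-byte body, 4-byte metadata, 10-byte footer.
postscript = (
    encode_varint(1 << 3) + encode_varint(10)      # footerLength = 10
    + encode_varint(2 << 3) + encode_varint(1)     # compression = ZLIB
    + encode_varint(5 << 3) + encode_varint(4)     # metadataLength = 4
    + encode_varint((8000 << 3) | 2) + encode_varint(3) + b"ORC"  # magic
)
fake_file = (b"ORC" + bytes(20) + bytes(4) + bytes(10)
             + postscript + bytes([len(postscript)]))
tail = locate_tail(fake_file)
print(tail)
```

Reading from the end of the file is what makes the append-only layout workable: the writer can emit stripes first and only write the summary sections once their sizes are known.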

The Footer contains the file's schema, the number of rows in the file, and column-level statistics.

message Footer {
 // the length of the file header in bytes (always 3)
 optional uint64 headerLength = 1;
 // the length of the file header and body in bytes
 optional uint64 contentLength = 2;
 // the information about the stripes
 repeated StripeInformation stripes = 3;
 // the schema information
 repeated Type types = 4;
 // the user metadata that was added
 repeated UserMetadataItem metadata = 5;
 // the total number of rows in the file
 optional uint64 numberOfRows = 6;
 // the statistics of each column across the file
 repeated ColumnStatistics statistics = 7;
 // the maximum number of rows in each index entry
 optional uint32 rowIndexStride = 8;
 // Each implementation that writes ORC files should register for a code
 // 0 = ORC Java
 // 1 = ORC C++
 // 2 = Presto
 // 3 = Scritchley Go from https://github.com/scritchley/orc
 // 4 = Trino
 optional uint32 writer = 9;
 // information about the encryption in this file
 optional Encryption encryption = 10;
 // the number of bytes in the encrypted stripe statistics
 optional uint64 stripeStatisticsLength = 11;
}
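  • Stripe Information: the data in the Body is divided into multiple stripes, and a row is never split across stripes. Each stripe contains three parts: indexes of rows, row data, and a stripe footer. Within a stripe, both indexes and data are stored by column.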
message StripeInformation {
 // the start of the stripe within the file
 optional uint64 offset = 1;
 // the length of the indexes in bytes
 optional uint64 indexLength = 2;
 // the length of the data in bytes
 optional uint64 dataLength = 3;
 // the length of the footer in bytes
 optional uint64 footerLength = 4;
 // the number of rows in the stripe
 optional uint64 numberOfRows = 5;
 // If this is present, the reader should use this value for the encryption
 // stripe id for setting the encryption IV. Otherwise, the reader should
 // use one larger than the previous stripe's encryptStripeId.
 // For unmerged ORC files, the first stripe will use 1 and the rest of the
 // stripes won't have it set. For merged files, the stripe information
 // will be copied from their original files and thus the first stripe of
 // each of the input files will reset it to 1.
 // Note that 1 was choosen, because protobuf v3 doesn't serialize
 // primitive types that are the default (eg. 0).
 optional uint64 encryptStripeId = 6;
 // For each encryption variant, the new encrypted local key to use until we
 // find a replacement.
 repeated bytes encryptedLocalKeys = 7;
}
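
Since a stripe is laid out as index data, then row data, then the stripe footer, the four length and offset fields of StripeInformation are enough to compute the byte range of each part. The sketch below illustrates that arithmetic; the `StripeInfo` dataclass and `stripe_sections` function are hypothetical names, and the numbers are made up.

```python
from dataclasses import dataclass


@dataclass
class StripeInfo:
    """Mirror of the size-related fields of the StripeInformation message."""
    offset: int
    index_length: int
    data_length: int
    footer_length: int
    number_of_rows: int


def stripe_sections(stripe):
    """Return half-open byte ranges (start, end) for each part of a stripe."""
    index_start = stripe.offset
    data_start = index_start + stripe.index_length
    footer_start = data_start + stripe.data_length
    end = footer_start + stripe.footer_length
    return {
        "index": (index_start, data_start),
        "data": (data_start, footer_start),
        "footer": (footer_start, end),
    }


# Two back-to-back stripes; the first one starts right after the 3-byte header.
stripes = [
    StripeInfo(offset=3, index_length=100, data_length=1000,
               footer_length=50, number_of_rows=20000),
    StripeInfo(offset=1153, index_length=80, data_length=900,
               footer_length=40, number_of_rows=15000),
]
for s in stripes:
    print(stripe_sections(s))
```

A reader that only needs the stripe footer can therefore seek straight to `footer_start` without touching the index or data bytes.
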
  • Type Information

All rows in an ORC file must share the same type schema. Nested data types are organized as a tree. An example, written as a Hive DDL:

create table Foobar (
  myInt int,
  myMap map<string,
            struct<myString : string,
                   myDouble: double>>,
  myTime timestamp
);

Supported types:

message Type {
 enum Kind {
 BOOLEAN = 0;
 BYTE = 1;
 SHORT = 2;
 INT = 3;
 LONG = 4;
 FLOAT = 5;
 DOUBLE = 6;
 STRING = 7;
 BINARY = 8;
 TIMESTAMP = 9;
 LIST = 10;
 MAP = 11;
 STRUCT = 12;
 UNION = 13;
 DECIMAL = 14;
 DATE = 15;
 VARCHAR = 16;
 CHAR = 17;
 TIMESTAMP_INSTANT = 18;
 }
 // the kind of this type
 required Kind kind = 1;
 // the type ids of any subcolumns for list, map, struct, or union
 repeated uint32 subtypes = 2 [packed=true];
 // the list of field names for struct
 repeated string fieldNames = 3;
 // the maximum length of the type for varchar or char in UTF-8 characters
 optional uint32 maximumLength = 4;
 // the precision and scale for decimal
 optional uint32 precision = 5;
 optional uint32 scale = 6;
}
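
The Footer stores the type tree as a flat list (`repeated Type types`), where each entry refers to its children through `subtypes`. The ORC specification describes the flattening as a pre-order traversal in which each type receives the next id. The sketch below applies that rule to the Foobar table; the tuple-based tree representation and the `flatten` function are hypothetical, chosen only to keep the example short.

```python
def flatten(node, types=None):
    """Flatten a (kind, field_names, children) tree in pre-order.

    Each type gets the next free id; a parent records its children's ids
    in "subtypes", mirroring the Type message.
    """
    if types is None:
        types = []
    kind, field_names, children = node
    entry = {"id": len(types), "kind": kind,
             "fieldNames": field_names, "subtypes": []}
    types.append(entry)
    for child in children:
        entry["subtypes"].append(len(types))  # id the child is about to get
        flatten(child, types)
    return types


# Type tree of the Foobar table; the root of a table is a struct.
foobar = ("STRUCT", ["myInt", "myMap", "myTime"], [
    ("INT", [], []),
    ("MAP", [], [
        ("STRING", [], []),
        ("STRUCT", ["myString", "myDouble"], [
            ("STRING", [], []),
            ("DOUBLE", [], []),
        ]),
    ]),
    ("TIMESTAMP", [], []),
])

types = flatten(foobar)
for t in types:
    print(t["id"], t["kind"], t["subtypes"], t["fieldNames"])
```

The resulting ids are the column ids used elsewhere in the file, for example in the `column` field of a Stream.
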
  • Column Statistics

Simple per-column statistics, which can be used for coarse-grained filtering and for cost-based optimization (CBO).

message ColumnStatistics {
 // the number of values
 optional uint64 numberOfValues = 1;
 // At most one of these has a value for any column
 optional IntegerStatistics intStatistics = 2;
 optional DoubleStatistics doubleStatistics = 3;
 optional StringStatistics stringStatistics = 4;
 optional BucketStatistics bucketStatistics = 5;
 optional DecimalStatistics decimalStatistics = 6;
 optional DateStatistics dateStatistics = 7;
 optional BinaryStatistics binaryStatistics = 8;
 optional TimestampStatistics timestampStatistics = 9;
 optional bool hasNull = 10;
}

...

message IntegerStatistics {
 optional sint64 minimum = 1;
 optional sint64 maximum = 2;
 optional sint64 sum = 3;
}
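
As an illustration of coarse-grained filtering, the sketch below checks whether a range predicate can possibly match a column, given only its minimum and maximum. The `IntStats` class and `may_contain_range` function are hypothetical stand-ins for IntegerStatistics; this is not the API of any ORC reader.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IntStats:
    """Simplified stand-in for IntegerStatistics."""
    minimum: Optional[int]
    maximum: Optional[int]


def may_contain_range(stats, low, high):
    """Return False only when no value in [low, high] can be present."""
    if stats.minimum is None or stats.maximum is None:
        return True  # no statistics recorded, so nothing can be ruled out
    return stats.maximum >= low and stats.minimum <= high


stats = IntStats(minimum=10, maximum=50)
print(may_contain_range(stats, 60, 70))   # range lies above the maximum
print(may_contain_range(stats, 40, 45))   # range overlaps [10, 50]
```

The check is conservative: a True result only means the data has to be read, while a False result allows the whole file, stripe, or row group to be skipped.
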
  • User Metadata: user-defined key-value pairs
message UserMetadataItem {
 // the user defined key
 required string name = 1;
 // the user defined binary value
 required bytes value = 2;
}
  • File Metadata
message Metadata {
 repeated StripeStatistics stripeStats = 1;
}

...

message StripeStatistics {
 repeated ColumnStatistics colStats = 1;
}

Stripe

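The data in the Body is split into stripes. Stripes are large (typically ~200MB). Each stripe has three parts: Index Data, Row Data, and the Stripe Footer.

Within a stripe, each column's data is stored as a set of streams placed next to each other in the file, for example:

  • integer column: two streams, PRESENT (whether each value is non-null) + DATA (the actual values)
  • string column: PRESENT + DATA + LENGTH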
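
The split of a column into streams can be illustrated with plain Python lists. This is a conceptual sketch only: real ORC streams are additionally run-length encoded and compressed, which is omitted here, and the function names are hypothetical.

```python
def encode_ints(column):
    """Split an integer column into a PRESENT stream and a DATA stream."""
    present = [v is not None for v in column]
    data = [v for v in column if v is not None]   # nulls take no space in DATA
    return present, data


def decode_ints(present, data):
    """Rebuild the column by consuming DATA only where PRESENT is set."""
    values = iter(data)
    return [next(values) if is_set else None for is_set in present]


def encode_strings(column):
    """Split a string column into PRESENT, DATA (bytes) and LENGTH streams."""
    present = [v is not None for v in column]
    encoded = [v.encode("utf-8") for v in column if v is not None]
    return present, b"".join(encoded), [len(b) for b in encoded]


def decode_strings(present, data, lengths):
    """Cut the DATA bytes back into values using the LENGTH stream."""
    out, pos, remaining = [], 0, iter(lengths)
    for is_set in present:
        if not is_set:
            out.append(None)
            continue
        n = next(remaining)
        out.append(data[pos:pos + n].decode("utf-8"))
        pos += n
    return out


ints = [7, None, 3, None, None, 42]
present, data = encode_ints(ints)
print(present, data)

strings = ["ab", None, "", "xyz"]
s_present, s_data, s_lengths = encode_strings(strings)
print(s_present, s_data, s_lengths)
```

Keeping null flags, lengths and values in separate streams means each stream holds homogeneous data, which is what makes the per-type encodings effective.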

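The Stripe Footer contains the encoding of each column and the directory of streams, including the location of each stream.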

message StripeFooter {
 // the location of each stream
 repeated Stream streams = 1;
 // the encoding of each column
 repeated ColumnEncoding columns = 2;
 optional string writerTimezone = 3;
 // one for each column encryption variant
 repeated StripeEncryptionVariant encryption = 4;
}

stream

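A stream is a contiguous sequence of bytes in the file. The data in each stream is stored with an encoding chosen according to the column's type.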

message Stream {
 enum Kind {
   // boolean stream of whether the next value is non-null
   PRESENT = 0;
   // the primary data stream
   DATA = 1;
   // the length of each value for variable length data
   LENGTH = 2;
   // the dictionary blob
   DICTIONARY_DATA = 3;
   // deprecated prior to Hive 0.11
   // It was used to store the number of instances of each value in the
   // dictionary
   DICTIONARY_COUNT = 4;
   // a secondary data stream
   SECONDARY = 5;
   // the index for seeking to particular row groups
   ROW_INDEX = 6;
   // original bloom filters used before ORC-101
   BLOOM_FILTER = 7;
   // bloom filters that consistently use utf8
   BLOOM_FILTER_UTF8 = 8;

   // Virtual stream kinds to allocate space for encrypted index and data.
   ENCRYPTED_INDEX = 9;
   ENCRYPTED_DATA = 10;

   // stripe statistics streams
   STRIPE_STATISTICS = 100;
   // A virtual stream kind that is used for setting the encryption IV.
   FILE_STATISTICS = 101;
 }
 required Kind kind = 1;
 // the column id
 optional uint32 column = 2;
 // the number of bytes in the file
 optional uint64 length = 3;
}

message ColumnEncoding {
 enum Kind {
 // the encoding is mapped directly to the stream using RLE v1
 DIRECT = 0;
 // the encoding uses a dictionary of unique values using RLE v1
 DICTIONARY = 1;
 // the encoding is direct using RLE v2
 DIRECT_V2 = 2;
 // the encoding is dictionary-based using RLE v2
 DICTIONARY_V2 = 3;
 }
 required Kind kind = 1;
 // for dictionary encodings, record the size of the dictionary
 optional uint32 dictionarySize = 2;
}
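
To show how the DICTIONARY encoding kinds relate to the stream kinds, the sketch below dictionary-encodes a string column into DICTIONARY_DATA, LENGTH and DATA. It is a simplified illustration under several assumptions: the integer streams are left as plain lists rather than RLE v1/v2, nulls are ignored, the dictionary is sorted with Python's default string ordering, and the function names are hypothetical.

```python
def dictionary_encode(values):
    """Encode strings as indexes into a dictionary of unique values."""
    dictionary = sorted(set(values))
    index_of = {value: i for i, value in enumerate(dictionary)}
    encoded = [value.encode("utf-8") for value in dictionary]
    return {
        "DICTIONARY_DATA": b"".join(encoded),       # the dictionary blob
        "LENGTH": [len(b) for b in encoded],        # length of each dictionary entry
        "DATA": [index_of[v] for v in values],      # one dictionary index per row
        "dictionarySize": len(dictionary),
    }


def dictionary_decode(streams):
    """Rebuild the column from the three streams."""
    blob, pos, dictionary = streams["DICTIONARY_DATA"], 0, []
    for n in streams["LENGTH"]:
        dictionary.append(blob[pos:pos + n].decode("utf-8"))
        pos += n
    return [dictionary[i] for i in streams["DATA"]]


column = ["cn", "us", "cn", "de", "us", "cn"]
streams = dictionary_encode(column)
print(streams)
```

Dictionary encoding pays off when a column has few distinct values: each repeated string is replaced by a small integer, and the integer stream itself compresses well under run-length encoding.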

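Index Support

ORC supports indexes at three levels:

Level        | Location in the ORC file          | Contents
file level   | file tail (Footer)                | column-level statistics for the whole file
stripe level | file tail (Metadata)              | column-level statistics for each stripe
row level    | start of each stripe (Index Data) | column-level statistics for each row group, plus the start position of each row group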

Row Level

There are two kinds of row-level index: the Row Group Index and the Bloom Filter Index.

Row Group Index

The index consists of the ROW_INDEX streams of the primitive-type columns. Each row group has one RowIndexEntry.

By default, a row group consists of 10,000 rows.

message RowIndex {
 repeated RowIndexEntry entry = 1;
}

message RowIndexEntry {
 repeated uint64 positions = 1 [packed=true];
 optional ColumnStatistics statistics = 2;
}
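
The sketch below shows how the per-row-group statistics could be used to decide which row ranges of a stripe need to be read for a range predicate. It assumes the default stride of 10,000 rows, represents each RowIndexEntry by a `(minimum, maximum)` pair only, and does not model the `positions` field, which a real reader uses to seek into each stream. `select_row_ranges` is a hypothetical name.

```python
ROW_INDEX_STRIDE = 10_000  # default number of rows per row group


def select_row_ranges(entries, rows_in_stripe, low, high,
                      stride=ROW_INDEX_STRIDE):
    """Return merged half-open row ranges whose statistics overlap [low, high]."""
    ranges = []
    for i, (minimum, maximum) in enumerate(entries):
        if maximum < low or minimum > high:
            continue                                  # row group cannot match
        start = i * stride
        end = min(start + stride, rows_in_stripe)     # last group may be short
        if ranges and ranges[-1][1] == start:
            ranges[-1] = (ranges[-1][0], end)         # merge adjacent groups
        else:
            ranges.append((start, end))
    return ranges


# Four row groups in a stripe of 35,000 rows.
entries = [(1, 100), (90, 250), (300, 400), (180, 220)]
print(select_row_ranges(entries, 35_000, 200, 260))
print(select_row_ranges(entries, 35_000, 95, 100))
```

Min/max pruning of this kind works best when the data is sorted or clustered on the filtered column, because then the value ranges of different row groups overlap less.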

Bloom Filter Index

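The index consists of the BLOOM_FILTER streams of the columns for which bloom filters are enabled. Each such column has a BloomFilterIndex, which holds one BloomFilter per row group.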

message BloomFilter {
 optional uint32 numHashFunctions = 1;
 repeated fixed64 bitset = 2;
 optional bytes utf8bitset = 3;
}

message BloomFilterIndex {
 repeated BloomFilter bloomFilter = 1;
}

Bloom filter streams are interlaced with row group indexes.
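
How a bloom filter helps with equality predicates can be sketched as follows. This is a generic bloom filter, not ORC's implementation: it derives its hash positions from SHA-256 via `hashlib` purely for convenience, whereas ORC uses its own hash function and bitset layout. The class and function names are hypothetical.

```python
import hashlib


class SimpleBloomFilter:
    """Generic bloom filter with k positions derived from two base hashes."""

    def __init__(self, num_bits, num_hash_functions):
        self.num_bits = num_bits
        self.num_hash_functions = num_hash_functions
        self.bits = 0  # a Python int used as the bitset

    def _positions(self, value):
        digest = hashlib.sha256(value).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.num_bits
                for i in range(self.num_hash_functions)]

    def add(self, value):
        for p in self._positions(value):
            self.bits |= 1 << p

    def might_contain(self, value):
        """False means definitely absent; True means possibly present."""
        return all((self.bits >> p) & 1 for p in self._positions(value))


def candidate_row_groups(filters, value):
    """Indexes of the row groups that may contain value."""
    return [i for i, f in enumerate(filters) if f.might_contain(value)]


# One filter per row group of a single column.
row_groups = [[b"alice", b"bob"], [b"carol"], []]
filters = []
for rows in row_groups:
    bloom = SimpleBloomFilter(num_bits=1024, num_hash_functions=3)
    for value in rows:
        bloom.add(value)
    filters.append(bloom)

print(candidate_row_groups(filters, b"carol"))
```

Unlike min/max statistics, a bloom filter can rule out a row group even when the searched value falls inside the group's value range, at the cost of occasional false positives.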

Data Access Path

PostScript -> Footer -> look up the relevant StripeInformation -> Stripe Footer -> Stripe Index -> Row Group -> Column

References