The Iceberg community provides an official implementation of the Flink Connector, and the source-code reading in this chapter is based on it.
Overview of the Write and Commit Flow
Flink moves records through RowData -> distributeStream -> WriterStream -> CommitterStream. Before the commit, written data exists only as intermediate files; the commit writes the manifest, snapshot, and metadata files, after which the data files become visible to the outside world.
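Before diving into the internals, it helps to see how a job attaches the sink. Below is a minimal sketch, assuming a Hadoop-catalog table (the HDFS path is hypothetical); FlinkSink.forRowData(...).tableLoader(...).append() is the connector's builder entry point:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkExample {
  // Attach the Iceberg sink to an existing RowData stream; the path is hypothetical.
  static void attachSink(DataStream<RowData> input) {
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .append();
  }
}
```

append() ultimately calls chainIcebergOperators(), which assembles the distribute / writer / committer chain: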
```java
private <T> DataStreamSink<T> chainIcebergOperators() {
  Preconditions.checkArgument(inputCreator != null,
      "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
  Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");

  DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);

  if (table == null) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();

  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

  // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
  DataStream<RowData> distributeStream = distributeDataStream(
      rowDataInput, table.properties(), equalityFieldIds, table.spec(), table.schema(), flinkRowType);

  // Add parallel writers that append rows to files
  SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType,
      equalityFieldIds);

  // Add single-parallelism committer that commits files
  // after successful checkpoint or end of input
  SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);

  // Add dummy discard sink
  return appendDummySink(committerStream);
}
```
Source-Code Walkthrough of the Write Path
WriterStream
```java
private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
                                                             List<Integer> equalityFieldIds) {
  // Fallback to use upsert mode parsed from table properties if don't specify in job level.
  boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);

  // Validate the equality fields and partition fields if we enable the upsert mode.
  if (upsertMode) {
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    Preconditions.checkState(!equalityFieldIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    if (!table.spec().isUnpartitioned()) {
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);

  int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
  SingleOutputStreamOperator<WriteResult> writerStream = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(parallelism);
  if (uidPrefix != null) {
    writerStream = writerStream.uid(uidPrefix + "-writer");
  }

  return writerStream;
}
```
The WriterStream operator is derived from distributeStream; it consumes RowData and emits WriteResult. The conversion logic is encapsulated in the IcebergStreamWriter operator, whose per-record handling lives in processElement.
```java
private transient TaskWriter<T> writer;

@Override
public void processElement(StreamRecord<T> element) throws Exception {
  writer.write(element.getValue());
}
```
IcebergStreamWriter delegates the actual writing to a TaskWriter (e.g. PartitionedDeltaWriter or UnpartitionedWriter) created by a TaskWriterFactory (RowDataTaskWriterFactory).
```java
public TaskWriter<RowData> create() {
  Preconditions.checkNotNull(outputFileFactory,
      "The outputFileFactory shouldn't be null if we have invoked the initialize().");

  if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
    // Initialize a task writer to write INSERT only.
    if (spec.isUnpartitioned()) {
      return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
    } else {
      return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
          io, targetFileSizeBytes, schema, flinkSchema);
    }
  } else {
    // Initialize a task writer to write both INSERT and equality DELETE.
    if (spec.isUnpartitioned()) {
      return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
    } else {
      return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
          targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
    }
  }
}
```
The TaskWriter in turn hands each write off to a rolling writer (BaseRollingWriter).
```java
public void write(T row) throws IOException {
  PartitionKey partitionKey = partition(row);

  RollingFileWriter writer = writers.get(partitionKey);
  if (writer == null) {
    // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
    PartitionKey copiedKey = partitionKey.copy();
    writer = new RollingFileWriter(copiedKey);
    writers.put(copiedKey, writer);
  }

  writer.write(row);
}
```
The rolling writer holds an OutputFile produced by the OutputFileFactory as its current write target and, following a roll-over policy, automatically switches to a new OutputFile.
```java
public void write(T record) throws IOException {
  write(currentWriter, record);
  this.currentRows++;

  if (shouldRollToNewFile()) {
    closeCurrent();
    openCurrent();
  }
}
```
```java
private void openCurrent() {
  if (partitionKey == null) {
    // unpartitioned
    this.currentFile = fileFactory.newOutputFile();
  } else {
    // partitioned
    this.currentFile = fileFactory.newOutputFile(partitionKey);
  }
  this.currentWriter = newWriter(currentFile, partitionKey);
  this.currentRows = 0;
}
```
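The roll decision in shouldRollToNewFile() is driven mainly by the target file size, which the Flink writer reads from the table property write.target-file-size-bytes. A minimal sketch of tuning it (the 128 MB value is illustrative):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Roll to a new output file once roughly 128 MB has been written (value is illustrative).
static void setTargetFileSize(Table table) {
  table.updateProperties()
      .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, String.valueOf(128L * 1024 * 1024))
      .commit();
}
```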
Output files are named according to:
LocationProvider.newLocation(partitionId + taskId + operationId + fileCount + ext)
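As a rough illustration only, the sketch below is a hypothetical reconstruction of how those parts compose into a file name; the real pattern lives in OutputFileFactory and varies across Iceberg versions:

```java
// Hypothetical reconstruction of OutputFileFactory's file naming; the real pattern
// may differ by version. The LocationProvider then prefixes the table's data location.
static String generateFileName(int partitionId, int taskId, String operationId, int fileCount) {
  return String.format("%05d-%d-%s-%05d.parquet", partitionId, taskId, operationId, fileCount);
}
// generateFileName(1, 3, "a1b2c3", 42) -> "00001-3-a1b2c3-00042.parquet"
```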
The rolling writer delegates the actual write to a DataWriter constructed by the AppenderFactory. For Flink, the default DataWriter wraps a Parquet appender, which dispatches each value to a type-specific ParquetValueWriter.
```java
DataWriter<T> newWriter(EncryptedOutputFile file, StructLike partitionKey) {
  return appenderFactory.newDataWriter(file, format, partitionKey);
}
```
```java
@Override
public DataWriter<RowData> newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) {
  return new DataWriter<>(
      newAppender(file.encryptingOutputFile(), format), format,
      file.encryptingOutputFile().location(), spec, partition, file.keyMetadata());
}

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
  MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
  try {
    switch (format) {
      case AVRO:
        return Avro.writer...
      case ORC:
        return ORC.writer...
      case PARQUET:
        return Parquet.writer...
      default:
        throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
```
CommitterStream
The CommitterStream takes WriteResult as input and emits nothing. A WriteResult carries the data files produced by the WriterStream:
```java
public class WriteResult implements Serializable {
  private DataFile[] dataFiles;
  private DeleteFile[] deleteFiles;
  private CharSequence[] referencedDataFiles;
  ...
}
```
The core logic for committing data files is encapsulated in IcebergFilesCommitter. It buffers the files of the in-flight checkpoint in a List (writeResultsOfCurrentCkpt), tracks the serialized manifests of all checkpoints not yet committed to the table in a SortedMap keyed by checkpoint id (dataFilesPerCheckpoint), and persists that map across failures in Flink operator state as a ListState of SortedMap (checkpointsState).
```java
class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
  ...
  // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
  // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
  // example: the 1st checkpoint have 2 data files <1, <file0, file1>>, the 2nd checkpoint have 1 data files
  // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
  // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
  // iceberg table when the next checkpoint happen.
  private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();

  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
  // 'dataFilesPerCheckpoint'.
  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
  ...
}
```
```java
@Override
public void processElement(StreamRecord<WriteResult> element) {
  this.writeResultsOfCurrentCkpt.add(element.getValue());
}
```
WriteResults from upstream for the current checkpoint accumulate in writeResultsOfCurrentCkpt. During snapshotState they flow writeResultsOfCurrentCkpt -> dataFilesPerCheckpoint -> checkpointsState. Once notifyCheckpointComplete signals that the checkpoint has completed, the pending files up to that checkpoint are committed to the table.
```java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  super.snapshotState(context);
  long checkpointId = context.getCheckpointId();
  LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);

  // Update the checkpoint state.
  dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
  // Reset the snapshot state to the latest state.
  checkpointsState.clear();
  checkpointsState.add(dataFilesPerCheckpoint);

  jobIdState.clear();
  jobIdState.add(flinkJobId);

  // Clear the local buffer for current checkpoint.
  writeResultsOfCurrentCkpt.clear();
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  // It's possible that we have the following events:
  //   1. snapshotState(ckpId);
  //   2. snapshotState(ckpId+1);
  //   3. notifyCheckpointComplete(ckpId+1);
  //   4. notifyCheckpointComplete(ckpId);
  // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
  // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
  if (checkpointId > maxCommittedCheckpointId) {
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  }
}
```
The commit logic lives in commitUpToCheckpoint. Iceberg abstracts snapshot changes behind the SnapshotUpdate interface and resolves concurrent snapshot commits with an optimistic strategy: when concurrent commits conflict, the commit operation is retried. Each attempt generates a new Snapshot on top of the latest metadata, adds it to a new TableMetadata, and finally tries to commit via TableOperations.commit.
```java
public void commit() {
  // this is always set to the latest commit attempt's snapshot id.
  AtomicLong newSnapshotId = new AtomicLong(-1L);
  try {
    Tasks.foreach(ops)
        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
        .exponentialBackoff(
            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
            2.0 /* exponential */)
        .onlyRetryOn(CommitFailedException.class)
        .run(taskOps -> {
          // generate a new snapshot from the newly added data files, manifests and manifest list
          Snapshot newSnapshot = apply();
          newSnapshotId.set(newSnapshot.snapshotId());
          TableMetadata.Builder update = TableMetadata.buildFrom(base);
          if (base.snapshot(newSnapshot.snapshotId()) != null) {
            // this is a rollback operation
            update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
          } else if (stageOnly) {
            update.addSnapshot(newSnapshot);
          } else {
            update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
          }

          TableMetadata updated = update.build();
          if (updated.changes().isEmpty()) {
            // do not commit if the metadata has not changed. for example, this may happen when setting the current
            // snapshot to an ID that is already current. note that this check uses identity.
            return;
          }

          // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
          // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
          taskOps.commit(base, updated.withUUID()); // commit the table metadata; this is where a conflict may fail the attempt
        });
  } catch (CommitStateUnknownException commitStateUnknownException) {
    throw commitStateUnknownException;
  } catch (RuntimeException e) {
    Exceptions.suppressAndThrow(e, this::cleanAll);
  }

  try {
    LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

    // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
    // id in case another commit was added between this commit and the refresh.
    Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
    if (saved != null) {
      cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
      // also clean up unused manifest lists created by multiple attempts
      for (String manifestList : manifestLists) {
        if (!saved.manifestListLocation().equals(manifestList)) {
          deleteFile(manifestList);
        }
      }
    } else {
      // saved may not be present if the latest metadata couldn't be loaded due to eventual
      // consistency problems in refresh. in that case, don't clean up.
      LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
    }
  } catch (Throwable e) {
    LOG.warn("Failed to load committed table metadata or during cleanup, skipping further cleanup", e);
  }

  try {
    notifyListeners();
  } catch (Throwable e) {
    LOG.warn("Failed to notify event listeners", e);
  }
}
```
```java
private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                  String newFlinkJobId,
                                  long checkpointId) throws IOException {
  NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
  List<ManifestFile> manifests = Lists.newArrayList();
  NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
  for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
    if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
      // Skip the empty flink manifest.
      continue;
    }

    DeltaManifests deltaManifests = SimpleVersionedSerialization
        .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
    pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
    manifests.addAll(deltaManifests.manifests());
  }

  int totalFiles = pendingResults.values().stream()
      .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
  continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
  if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
    if (replacePartitions) {
      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
    } else {
      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
    }
    continuousEmptyCheckpoints = 0;
  }
  pendingMap.clear();

  // Delete the committed manifests.
  for (ManifestFile manifest : manifests) {
    try {
      table.io().deleteFile(manifest.path());
    } catch (Exception e) {
      // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
      String details = MoreObjects.toStringHelper(this)
          .add("flinkJobId", newFlinkJobId)
          .add("checkpointId", checkpointId)
          .add("manifestPath", manifest.path())
          .toString();
      LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
          details, e);
    }
  }
}
```
```java
public void commit(TableMetadata base, TableMetadata metadata) {
  // if the metadata is already out of date, reject it
  if (base != current()) {
    if (base != null) {
      throw new CommitFailedException("Cannot commit: stale table metadata");
    } else {
      // when current is non-null, the table exists. but when base is null, the commit is trying to create the table
      throw new AlreadyExistsException("Table already exists: %s", tableName());
    }
  }
  // if the metadata is not changed, return early
  if (base == metadata) {
    LOG.info("Nothing to commit.");
    return;
  }

  long start = System.currentTimeMillis();
  doCommit(base, metadata);
  deleteRemovedMetadataFiles(base, metadata);
  requestRefresh();

  LOG.info("Successfully committed to table {} in {} ms",
      tableName(),
      System.currentTimeMillis() - start);
}
```
Problems with the Write Path
Two problems stand out in the write and commit flow described above:
- Lots of small files
Streaming writes produce new files on every checkpoint, resulting in large numbers of small files. Object stores actually handle small files reasonably well, but each data file must be tracked in Iceberg's metadata files, so masses of small files bloat the metadata and increase the time needed to read and update it. How to address this?
  - Iceberg rewrite actions: data and metadata rewrites are supported, triggered by a Flink/Spark action (example below)
  - Snapshot expiration, which is configurable (see the sketch after the example)
```java
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
```
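Snapshot expiration is exposed through the ExpireSnapshots API; below is a minimal sketch that drops day-old snapshots while always retaining the last 10 (both thresholds are illustrative):

```java
import org.apache.iceberg.Table;

// Expire snapshots older than one day, but keep at least the 10 most recent ones.
static void expireOldSnapshots(Table table) {
  long oneDayAgoMs = System.currentTimeMillis() - 24L * 60 * 60 * 1000;
  table.expireSnapshots()
      .expireOlderThan(oneDayAgoMs)
      .retainLast(10)
      .commit();
}
```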
https://iceberg.apache.org/docs/latest/flink/
https://iceberg.apache.org/docs/latest/maintenance/
- Performance under highly concurrent writes
Every Iceberg write produces a new snapshot, and concurrent-commit conflicts are resolved with an optimistic strategy: conflicting commits are retried. Under heavy concurrent writes, conflicts cause many snapshot-commit requests to be retried, which costs performance.
Before committing a snapshot, Iceberg also has to read the previous snapshot and build the new one; this step is expensive, and its cost grows with the size of the metadata.
How to address this?
  - Batch commit: introduce a caching layer or an extra service that commits to the data lake in batches, lowering the concurrency of commit operations; the cache layer might also compact the accumulated data files (a sketch of the idea follows)
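No ready-made component is implied here; as a minimal sketch of the idea, a buffering service could fold the data files of many micro-batches into a single append, so one snapshot (and one optimistic commit) covers them all:

```java
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Hypothetical batching committer: upstream jobs hand over finished data files,
// and the service commits them as ONE snapshot instead of one per micro-batch.
static void commitBatched(Table table, List<DataFile> bufferedFiles) {
  AppendFiles append = table.newAppend();
  for (DataFile file : bufferedFiles) {
    append.appendFile(file);
  }
  append.commit(); // a single optimistic commit for the whole batch
}
```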
https://zhuanlan.zhihu.com/p/472617094
https://www.infoq.cn/article/hfft7c7ahoomgayjsouz
- The Flink Iceberg Connector supports neither hidden partitioning nor preprocessing of partition source fields.
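For contrast, hidden partitioning in Iceberg itself is declared through transforms on the partition spec, so no derived partition column ever appears in the data. A minimal sketch (the schema and field names are made up):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// A spec partitioned by day(ts): the partition value is derived from 'ts' at write time.
static PartitionSpec daySpec() {
  Schema schema = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.required(2, "ts", Types.TimestampType.withZone()));
  return PartitionSpec.builderFor(schema).day("ts").build();
}
```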