The Iceberg community provides an official Flink Connector implementation; the source reading in this chapter is based on it.

Write and Commit Flow Overview

Flink writes through the chain RowData -> distributeStream -> writerStream -> committerStream. Before the commit, the data exists only as intermediate files; after the commit (which writes manifests, a snapshot and new table metadata), the data files become visible to the outside.

[Figure: overview of the Flink-to-Iceberg write and commit flow]

   private <T> DataStreamSink<T> chainIcebergOperators() {
      Preconditions.checkArgument(inputCreator != null,
          "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
      Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");

      DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);

      if (table == null) {
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
          this.table = loader.loadTable();
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
        }
      }

      // Find out the equality field id list based on the user-provided equality field column names.
      List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();

      // Convert the requested flink table schema to flink row type.
      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

      // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
      DataStream<RowData> distributeStream = distributeDataStream(
          rowDataInput, table.properties(), equalityFieldIds, table.spec(), table.schema(), flinkRowType);

      // Add parallel writers that append rows to files
      SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType,
          equalityFieldIds);

      // Add single-parallelism committer that commits files
      // after successful checkpoint or end of input
      SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);

      // Add dummy discard sink
      return appendDummySink(committerStream);
    }
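
For orientation, this builder chain is what a user-facing job wires up through the FlinkSink builder. A minimal usage sketch follows; the table path and parallelism are made-up placeholders, not values taken from this source:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// Assume 'rowDataStream' is an existing DataStream<RowData> built earlier in the job.
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .writeParallelism(4)   // parallelism of the Iceberg writer operators
    .append();             // append() eventually calls chainIcebergOperators() above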

Write Flow Source Code Analysis

WriterStream

   private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
                                                                 List<Integer> equalityFieldIds) {

      // Fallback to use upsert mode parsed from table properties if don't specify in job level.
      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
          UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);

      // Validate the equality fields and partition fields if we enable the upsert mode.
      if (upsertMode) {
        Preconditions.checkState(!overwrite,
            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
        Preconditions.checkState(!equalityFieldIds.isEmpty(),
            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
        if (!table.spec().isUnpartitioned()) {
          for (PartitionField partitionField : table.spec().fields()) {
            Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
                "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
                partitionField, equalityFieldColumns);
          }
        }
      }

      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);

      int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
      SingleOutputStreamOperator<WriteResult> writerStream = input
          .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
          .setParallelism(parallelism);
      if (uidPrefix != null) {
        writerStream = writerStream.uid(uidPrefix + "-writer");
      }
      return writerStream;
    }

The writerStream operator is transformed from distributeStream; it takes RowData as input and emits WriteResult. The transformation logic is encapsulated in the IcebergStreamWriter operator, and the per-record handling is in processElement.

    private transient TaskWriter<T> writer;
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
      writer.write(element.getValue());
    }
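
Nothing is emitted per record; the files accumulated by the TaskWriter are only sent downstream when a checkpoint barrier arrives (prepareSnapshotPreBarrier) or when the bounded input ends. A simplified sketch of that part of IcebergStreamWriter, with metrics and bookkeeping omitted and shapes approximate:

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // Close the current TaskWriter, emit its files as a WriteResult,
    // then start a fresh writer for the next checkpoint interval.
    flush();
    this.writer = taskWriterFactory.create();
  }

  private void flush() throws IOException {
    if (writer == null) {
      return;
    }
    // complete() closes all open files and returns the DataFiles / DeleteFiles written so far.
    WriteResult result = writer.complete();
    output.collect(new StreamRecord<>(result));
    this.writer = null;
  }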

IcebergStreamWriter delegates the actual writing to a TaskWriter (e.g. PartitionedDeltaWriter, UnpartitionedWriter) created by the TaskWriterFactory (RowDataTaskWriterFactory).

  public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(outputFileFactory,
        "The outputFileFactory shouldn't be null if we have invoked the initialize().");

    if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
      // Initialize a task writer to write INSERT only.
      if (spec.isUnpartitioned()) {
        return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
      } else {
        return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
            io, targetFileSizeBytes, schema, flinkSchema);
      }
    } else {
      // Initialize a task writer to write both INSERT and equality DELETE.
      if (spec.isUnpartitioned()) {
        return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
      } else {
        return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
      }
    }
  }

The TaskWriter in turn delegates the write to a rolling writer (BaseRollingWriter / RollingFileWriter).

  public void write(T row) throws IOException {
    PartitionKey partitionKey = partition(row);

    RollingFileWriter writer = writers.get(partitionKey);
    if (writer == null) {
      // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers.
      PartitionKey copiedKey = partitionKey.copy();
      writer = new RollingFileWriter(copiedKey);
      writers.put(copiedKey, writer);
    }

    writer.write(row);
  }
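
The write() above is the fanout path used for INSERT-only partitioned tables. When equality fields are configured, the delta writers (UnpartitionedDeltaWriter / PartitionedDeltaWriter, both built on BaseDeltaTaskWriter) additionally route the row kind to insert or delete writers. A simplified sketch of that dispatch, with field and helper names approximate:

  public void write(RowData row) throws IOException {
    RowDataDeltaWriter writer = route(row);   // pick the writer for this row's partition

    switch (row.getRowKind()) {
      case INSERT:
      case UPDATE_AFTER:
        if (upsert) {
          // In upsert mode, first emit an equality delete for the key of the incoming row.
          writer.deleteKey(keyProjection.wrap(row));
        }
        writer.write(row);
        break;

      case UPDATE_BEFORE:
        if (!upsert) {
          writer.delete(row);   // in upsert mode UPDATE_AFTER alone is sufficient
        }
        break;

      case DELETE:
        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row));
        } else {
          writer.delete(row);
        }
        break;

      default:
        throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
  }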

The rolling writer holds an OutputFile created by the OutputFileFactory as its current write target, and automatically rolls over to a new OutputFile according to its roll-over policy.

 public void write(T record) throws IOException {
      write(currentWriter, record);
      this.currentRows++;

      if (shouldRollToNewFile()) {
        closeCurrent();
        openCurrent();
      }
    }
    private void openCurrent() {
      if (partitionKey == null) {
        // unpartitioned
        this.currentFile = fileFactory.newOutputFile();
      } else {
        // partitioned
        this.currentFile = fileFactory.newOutputFile(partitionKey);
      }
      this.currentWriter = newWriter(currentFile, partitionKey);
      this.currentRows = 0;
    }
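
The roll-over check itself is a size test on the current file, sampled every few rows to avoid calling length() for every record; its shape is roughly the following (ROWS_DIVISOR is an internal sampling constant):

    private boolean shouldRollToNewFile() {
      // Check the file length only every ROWS_DIVISOR rows, and roll once the
      // current file has reached the configured target size.
      return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
    }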

The naming rule for each OutputFile is

LocationProvider.newLocation(partitionId + taskId + operationId + fileCountId + ext)

i.e. the file name encodes the partition id, task id, operation id and a per-task file counter plus the format extension (the exact pattern depends on the Iceberg version).

The rolling writer delegates the actual write to a DataWriter constructed by the AppenderFactory. For Flink the default format is Parquet, so the DataWriter wraps a Parquet appender, which in turn dispatches to different ParquetValueWriters depending on the column type.

   DataWriter<T> newWriter(EncryptedOutputFile file, StructLike partitionKey) {
      return appenderFactory.newDataWriter(file, format, partitionKey);
    }

  @Override
  public DataWriter<RowData> newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) {
    return new DataWriter<>(
        newAppender(file.encryptingOutputFile(), format), format,
        file.encryptingOutputFile().location(), spec, partition, file.keyMetadata());
  }

  @Override
  public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
    try {
      switch (format) {
        case AVRO:
          return Avro.writer...
        case ORC:
          return ORC.writer...
        case PARQUET:
          return Parquet.writer...
        default:
          throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
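
The elided PARQUET branch essentially builds an Iceberg Parquet appender whose per-column value writers come from FlinkParquetWriters; a rough sketch of that branch (builder options may differ slightly between versions):

        case PARQUET:
          return Parquet.write(outputFile)
              // FlinkParquetWriters maps RowData fields to type-specific ParquetValueWriters.
              .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
              .setAll(props)
              .metricsConfig(metricsConfig)
              .schema(schema)
              .overwrite()
              .build();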

CommitterStream

committerStream takes WriteResult as its input and emits nothing. A WriteResult carries the data files produced in the writer stream.

public class WriteResult implements Serializable {
  private DataFile[] dataFiles;
  private DeleteFile[] deleteFiles;
  private CharSequence[] referencedDataFiles;
    ...
}

The core logic for committing data files lives in IcebergFilesCommitter. It keeps a List (writeResultsOfCurrentCkpt) of the files produced for the checkpoint currently in progress, a SortedMap (dataFilesPerCheckpoint) of the completed files of all pending checkpoints that have not yet been committed to the Iceberg table, and a ListState of that sorted map (checkpointsState) that persists the pending commits in Flink state so they survive failover.

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
  ...

  // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
  // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
  // example: the 1st checkpoint have 2 data files <1, <file0, file1>>, the 2st checkpoint have 1 data files
  // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
  // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
  // iceberg table when the next checkpoint happen.
  private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();

  // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
  // 'dataFilesPerCheckpoint'.
  private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();

  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
  ...
}

 @Override
  public void processElement(StreamRecord<WriteResult> element) {
    this.writeResultsOfCurrentCkpt.add(element.getValue());
  }

The WriteResults that the upstream writers emit for the current checkpoint are buffered in writeResultsOfCurrentCkpt. On snapshotState they are flushed along writeResultsOfCurrentCkpt -> dataFilesPerCheckpoint -> checkpointsState (the buffered results are serialized into a Flink manifest and stored in state). When the checkpoint completes, notifyCheckpointComplete fires and the pending files are committed to the Iceberg table.

public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    long checkpointId = context.getCheckpointId();
    LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);

    // Update the checkpoint state.
    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
    // Reset the snapshot state to the latest state.
    checkpointsState.clear();
    checkpointsState.add(dataFilesPerCheckpoint);

    jobIdState.clear();
    jobIdState.add(flinkJobId);

    // Clear the local buffer for current checkpoint.
    writeResultsOfCurrentCkpt.clear();
  }

 @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // It's possible that we have the following events:
    //   1. snapshotState(ckpId);
    //   2. snapshotState(ckpId+1);
    //   3. notifyCheckpointComplete(ckpId+1);
    //   4. notifyCheckpointComplete(ckpId);
    // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
    // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
    if (checkpointId > maxCommittedCheckpointId) {
      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
      this.maxCommittedCheckpointId = checkpointId;
    }
  }
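
The committer also implements BoundedOneInput, so for bounded jobs the leftover files are committed when the input ends rather than at a checkpoint. The shape of that path is roughly as follows, treating end-of-input as a virtual checkpoint with id Long.MAX_VALUE:

  @Override
  public void endInput() throws IOException {
    // Flush the buffered write results as if a final checkpoint had been taken,
    // then commit everything that is still pending.
    long currentCheckpointId = Long.MAX_VALUE;
    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
    writeResultsOfCurrentCkpt.clear();

    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
  }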

The commit logic sits in commitUpToCheckpoint. Iceberg abstracts snapshot changes behind the SnapshotUpdate interface and resolves concurrent snapshot commits with an optimistic strategy: when a commit conflict occurs, the commit is retried. Each commit attempt produces a new Snapshot based on the latest metadata, adds it to a new TableMetadata, and finally tries to commit via TableOperations.commit.

  public void commit() {
    // this is always set to the latest commit attempt's snapshot id.
    AtomicLong newSnapshotId = new AtomicLong(-1L);
    try {
      Tasks.foreach(ops)
          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
          .exponentialBackoff(
              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
              2.0 /* exponential */)
          .onlyRetryOn(CommitFailedException.class)
          .run(taskOps -> {
            Snapshot newSnapshot = apply(); // generate a new snapshot from the newly added data files, manifests and manifest list
            newSnapshotId.set(newSnapshot.snapshotId());
            TableMetadata.Builder update = TableMetadata.buildFrom(base);
            if (base.snapshot(newSnapshot.snapshotId()) != null) {
              // this is a rollback operation
              update.setBranchSnapshot(newSnapshot.snapshotId(), SnapshotRef.MAIN_BRANCH);
            } else if (stageOnly) {
              update.addSnapshot(newSnapshot);
            } else {
              update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
            }

            TableMetadata updated = update.build();
            if (updated.changes().isEmpty()) {
              // do not commit if the metadata has not changed. for example, this may happen when setting the current
              // snapshot to an ID that is already current. note that this check uses identity.
              return;
            }

            // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
            // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
            taskOps.commit(base, updated.withUUID()); // commit table metadata, may cause commit failure
          });

    } catch (CommitStateUnknownException commitStateUnknownException) {
      throw commitStateUnknownException;
    } catch (RuntimeException e) {
      Exceptions.suppressAndThrow(e, this::cleanAll);
    }

    try {
      LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

      // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
      // id in case another commit was added between this commit and the refresh.
      Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
      if (saved != null) {
        cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
        // also clean up unused manifest lists created by multiple attempts
        for (String manifestList : manifestLists) {
          if (!saved.manifestListLocation().equals(manifestList)) {
            deleteFile(manifestList);
          }
        }
      } else {
        // saved may not be present if the latest metadata couldn't be loaded due to eventual
        // consistency problems in refresh. in that case, don't clean up.
        LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
      }

    } catch (Throwable e) {
      LOG.warn("Failed to load committed table metadata or during cleanup, skipping further cleanup", e);
    }

    try {
      notifyListeners();
    } catch (Throwable e) {
      LOG.warn("Failed to notify event listeners", e);
    }
  }
private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                    String newFlinkJobId,
                                    long checkpointId) throws IOException {
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
    List<ManifestFile> manifests = Lists.newArrayList();
    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
        // Skip the empty flink manifest.
        continue;
      }

      DeltaManifests deltaManifests = SimpleVersionedSerialization
          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
      pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
      manifests.addAll(deltaManifests.manifests());
    }

    int totalFiles = pendingResults.values().stream()
        .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
      if (replacePartitions) {
        replacePartitions(pendingResults, newFlinkJobId, checkpointId);
      } else {
        commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
      }
      continuousEmptyCheckpoints = 0;
    }
    pendingMap.clear();

    // Delete the committed manifests.
    for (ManifestFile manifest : manifests) {
      try {
        table.io().deleteFile(manifest.path());
      } catch (Exception e) {
        // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
        String details = MoreObjects.toStringHelper(this)
            .add("flinkJobId", newFlinkJobId)
            .add("checkpointId", checkpointId)
            .add("manifestPath", manifest.path())
            .toString();
        LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
            details, e);
      }
    }
  }
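
replacePartitions / commitDeltaTxn then turn the pending WriteResults into a concrete SnapshotUpdate (a dynamic-overwrite, append, or row-delta operation) and call commit() on it, which runs the snapshot commit() shown above and, through taskOps.commit, the TableOperations.commit shown below. As an illustration, the append-only case (no delete files) boils down to roughly the following; the snapshot summary keys are assumptions about the committer's bookkeeping and may differ by version:

    AppendFiles appendFiles = table.newAppend();

    for (WriteResult result : pendingResults.values()) {
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }

    // Record which Flink job / checkpoint produced this snapshot, so a restarted job can
    // recover the max committed checkpoint id and avoid re-committing the same files.
    appendFiles.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
    appendFiles.set("flink.job-id", newFlinkJobId);

    // Triggers the optimistic snapshot commit: manifests + manifest list + new metadata file.
    appendFiles.commit();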
public void commit(TableMetadata base, TableMetadata metadata) {
    // if the metadata is already out of date, reject it
    if (base != current()) {
      if (base != null) {
        throw new CommitFailedException("Cannot commit: stale table metadata");
      } else {
        // when current is non-null, the table exists. but when base is null, the commit is trying to create the table
        throw new AlreadyExistsException("Table already exists: %s", tableName());
      }
    }
    // if the metadata is not changed, return early
    if (base == metadata) {
      LOG.info("Nothing to commit.");
      return;
    }

    long start = System.currentTimeMillis();
    doCommit(base, metadata);
    deleteRemovedMetadataFiles(base, metadata);
    requestRefresh();

    LOG.info("Successfully committed to table {} in {} ms",
        tableName(),
        System.currentTimeMillis() - start);
  }

Write Path Issues

Looking at the write and commit flow above, the biggest problems are the following:

  1. Lots of small files

With streaming writes, new files are produced on every checkpoint interval, so large numbers of small files accumulate. Object stores actually handle small files fairly well, but a large number of small data files adds overhead to reading and updating Iceberg metadata: every data file has to be tracked in the metadata files, so many small files bloat the metadata and increase the time needed to process it. How to address this?

  • Iceberg rewrite actions: data files and metadata can be rewritten (compacted), triggered by a Flink / Spark action, as in the example below
  • Snapshot expiration: old snapshots can be expired and removed, with configurable retention (see the sketch after the rewrite example)
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .execute();
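
For snapshot expiration, the core table API can be called directly; a minimal sketch using the same table as above, with a one-day retention window picked purely for illustration:

// Expire snapshots older than one day; files no longer referenced by any
// retained snapshot become eligible for deletion.
long expireOlderThanMs = System.currentTimeMillis() - 24L * 60 * 60 * 1000;

table.expireSnapshots()
        .expireOlderThan(expireOlderThanMs)
        .commit();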

https://iceberg.apache.org/docs/latest/flink/
https://iceberg.apache.org/docs/latest/maintenance/

  2. Commit performance under highly concurrent writes

Every Iceberg write produces a new snapshot, and concurrent commits are handled with an optimistic concurrency strategy: conflicting commits are retried. Under highly concurrent writes, conflicts cause many snapshot commit requests to be retried, which costs performance.

In addition, before committing a snapshot Iceberg has to read the previous snapshot and build the new one on top of it; this step is expensive, and it gets more expensive as the metadata grows.

How to address this?

  • Batch commit: introduce a buffering layer or an extra service that batches commits into the data lake, lowering the concurrency of commit operations; that layer may also be able to compact the data files of multiple batches before committing

https://zhuanlan.zhihu.com/p/472617094
https://www.infoq.cn/article/hfft7c7ahoomgayjsouz

  3. The Flink Iceberg Connector does not support hidden partitioning, nor pre-processing (transforming) of partition fields.