Flink-Iceberg-Connector Write Process

October 10, 2022 · 1056 words · 5 min · Big Data Lake House Stream Compute Storage

The Iceberg community provides an official Flink Connector, and this chapter’s source code analysis is based on that.

Overview of the Write Submission Process

Flink writes data through RowData -> distributeStream -> WriterStream -> CommitterStream. Before data is committed, it is stored as intermediate files, which become visible to the system after being committed (through writing manifest, snapshot, and metadata files).

Flink-Iceberg Write Flow

private <T> DataStreamSink<T> chainIcebergOperators() {
    Preconditions.checkArgument(inputCreator != null,
        "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
    Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");

    DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);

    if (table == null) {
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
            this.table = loader.loadTable();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
        }
    }

    List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();

    RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

    DataStream<RowData> distributeStream = distributeDataStream(
        rowDataInput, table.properties(), equalityFieldIds, table.spec(), table.schema(), flinkRowType);

    SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType,
        equalityFieldIds);

    SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);

    return appendDummySink(committerStream);
}

Write Process Source Code Analysis

WriteStream

private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
                                                             List<Integer> equalityFieldIds) {
    boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
        UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);

    if (upsertMode) {
        Preconditions.checkState(!overwrite,
            "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
        Preconditions.checkState(!equalityFieldIds.isEmpty(),
            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
        if (!table.spec().isUnpartitioned()) {
            for (PartitionField partitionField : table.spec().fields()) {
                Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
                    "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
                    partitionField, equalityFieldColumns);
            }
        }
    }

    IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);

    int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
    SingleOutputStreamOperator<WriteResult> writerStream = input
        .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
        .setParallelism(parallelism);
    if (uidPrefix != null) {
        writerStream = writerStream.uid(uidPrefix + "-writer");
    }
    return writerStream;
}

The WriterStream operator is transformed from the distributeStream, with RowData as input and WriteResult as output. The transformation logic is encapsulated in the IcebergStreamWriter, which processes each element using processElement:

private transient TaskWriter<T> writer;
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    writer.write(element.getValue());
}

IcebergStreamWriter delegates the writing to a TaskWriter created by TaskWriterFactory. The specific type could be PartitionedDeltaWriter or UnpartitionedWriter:

public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(outputFileFactory,
        "The outputFileFactory shouldn't be null if we have invoked the initialize().");

    if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
        if (spec.isUnpartitioned()) {
            return new UnpartitionedWriter<>(spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
        } else {
            return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
                io, targetFileSizeBytes, schema, flinkSchema);
        }
    } else {
        if (spec.isUnpartitioned()) {
            return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
                targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
        } else {
            return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
                targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
        }
    }
}

CommitterStream

The CommitterStream receives WriteResult as input with no output. WriteResult contains the data files produced by WriteStream:

public class WriteResult implements Serializable {
  private DataFile[] dataFiles;
  private DeleteFile[] deleteFiles;
  private CharSequence[] referencedDataFiles;
  ...
}

The core logic for processing data file submissions is encapsulated in IcebergFilesCommitter. The IcebergFilesCommitter maintains a list of files that need to be committed for each checkpoint. Once a checkpoint completes, it tries to commit those files to Iceberg.

class IcebergFilesCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
    ...

    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
    ...
}

The processElement method stores WriteResult from upstream in writeResultsOfCurrentCkpt:

@Override
public void processElement(StreamRecord<WriteResult> element) {
    this.writeResultsOfCurrentCkpt.add(element.getValue());
}

During checkpointing (snapshotState), it saves the current checkpoint’s data in dataFilesPerCheckpoint. Later, once the checkpoint is completed (notifyCheckpointComplete), it commits the files:

public void snapshotState(StateSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);

    dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
    checkpointsState.clear();
    checkpointsState.add(dataFilesPerCheckpoint);

    jobIdState.clear();
    jobIdState.add(flinkJobId);

    writeResultsOfCurrentCkpt.clear();
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    if (checkpointId > maxCommittedCheckpointId) {
        commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
        this.maxCommittedCheckpointId = checkpointId;
    }
}

The commit logic is handled by commitUpToCheckpoint, which generates a new snapshot and adds it to Iceberg’s metadata:

private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                  String newFlinkJobId,
                                  long checkpointId) throws IOException {
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
    List<ManifestFile> manifests = Lists.newArrayList();
    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
        if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
            continue;
        }

        DeltaManifests deltaManifests = SimpleVersionedSerialization
            .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
        pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
        manifests.addAll(deltaManifests.manifests());
    }

    int totalFiles = pendingResults.values().stream()
        .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
    continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
    if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
        if (replacePartitions) {
            replacePartitions(pendingResults, newFlinkJobId, checkpointId);
        } else {
            commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
        }
        continuousEmptyCheckpoints = 0;
    }
    pendingMap.clear();

    for (ManifestFile manifest : manifests) {
        try {
            table.io().deleteFile(manifest.path());
        } catch (Exception e) {
            LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
                manifest.path(), e);
        }
    }
}
public void commit(TableMetadata base, TableMetadata metadata) {
    if (base != current()) {
        if (base != null) {
            throw new CommitFailedException("Cannot commit: stale table metadata");
        } else {
            throw new AlreadyExistsException("Table already exists: %s", tableName());
        }
    }
    if (base == metadata) {
        LOG.info("Nothing to commit.");
        return;
    }

    long start = System.currentTimeMillis();
    doCommit(base, metadata);
    deleteRemovedMetadataFiles(base, metadata);
    requestRefresh();

    LOG.info("Successfully committed to table {} in {} ms",
        tableName(),
        System.currentTimeMillis() - start);
}

Write Issues

1. Lots of Small Files

For streaming writes, new files are generated each time, resulting in a lot of small files. While object storage supports small files well, it may increase Iceberg metadata overhead, as metadata files need to keep track of each data file. This can cause metadata files to become large and impact performance.

Solution:

  • Iceberg Rewrite Action: Iceberg supports rewriting data and metadata files via Flink or Spark actions, which need to be triggered separately.
  • Snapshot Expiry: Configure snapshot expiration to periodically delete old snapshots.
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();

Iceberg Flink Documentation Iceberg Maintenance Documentation

2. Performance Issues with High Concurrency

Iceberg’s writing process creates a new snapshot for each commit and uses optimistic concurrency control to handle conflicts. In high-concurrency scenarios, this can lead to many commits being retried, impacting performance.

Solution:

  • Batch Commit: Introduce a caching layer or additional service to batch commits to the data lake, reducing the number of concurrent commit operations. This cache layer can also compact multiple data files before committing.

References: Optimizing Iceberg Writes for High Concurrency InfoQ Article on Iceberg Optimization

The Flink Iceberg Connector does not support hidden partitions or preprocessing of partition fields.