Skip to content

timely-util: add ConsolidatingColumnBuilder#36340

Open
antiguru wants to merge 3 commits intoMaterializeInc:mainfrom
antiguru:columnar-consolidating-builder
Open

timely-util: add ConsolidatingColumnBuilder#36340
antiguru wants to merge 3 commits intoMaterializeInc:mainfrom
antiguru:columnar-consolidating-builder

Conversation

@antiguru
Copy link
Copy Markdown
Member

Motivation

Infrastructure for the columnar rendering migration. Today, render operators that produce updates use differential_dataflow::consolidation::ConsolidatingContainerBuilder<Vec<(D, T, R)>>, which emits Vec<(D, T, R)> containers. Downstream operators want Column<(D, T, R)>, so we currently re-convert around every operator. This PR adds a builder that consolidates and emits columnar containers directly. No call sites change here.

Description

ConsolidatingColumnBuilder<D, T, R> lives in src/timely-util/src/columnar/consolidate.rs. It vendors DD's staging-buffer logic — lazy Vec reserve to 2 * default_capacity::<(D, T, R)>(), consolidate_updates when full, drain the largest multiple-of-preferred_capacity prefix — and pushes the survivors into an inner ColumnBuilder<(D, T, R)>. The inner builder handles ~2MB-aligned chunking, so we don't manage that here.

extract delegates to the inner builder (does not flush partial state, matching DD). finish drains everything that survives consolidation, then delegates to the inner finish.

Not length-preserving — consolidation may drop tuples with diff that sums to zero — so no LengthPreservingContainerBuilder impl. Reusing drained current capacity is left for a later pass; behavior matches DD first.

Verification

Five new unit tests in the module: empty finish, single-push round-trip, threshold-triggered consolidation (all +1/-1 cancel), multi-key consolidation, and forced emission of multiple inner containers. cargo test -p mz-timely-util passes.

Vendor differential-dataflow's `ConsolidatingContainerBuilder` staging
logic into `mz-timely-util` and have it emit columnar containers via the
existing `ColumnBuilder`. Updates are buffered in a `Vec`, run through
`consolidate_updates`, and then drained into the inner column builder
which mints aligned `Column<(D, T, R)>` containers.

This is infrastructure for the columnar rendering migration; no call
sites change in this commit. Includes unit tests covering empty/finish,
single-push, threshold consolidation, multi-key consolidation, and
emission of multiple containers.
@antiguru antiguru requested a review from a team as a code owner April 30, 2026 16:01
Comment on lines +40 to +44
where
D: Data,
T: Data,
R: Semigroup + 'static,
(D, T, R): Columnar,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the type annotations here? Reduce to a minimum if possible.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — struct bounds reduced to just (D, T, R): Columnar (the only one any field actually requires). DD-style bounds (D: Data, T: Data, R: Semigroup + 'static) are kept on the impls that call consolidate_updates/column.push_into, where they're real.

Comment on lines +54 to +57
D: Data,
T: Data,
R: Semigroup + 'static,
(D, T, R): Columnar,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, minimal parameters.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same change applied here.

/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push_into(&mut self, item: (D, T, R)) {
let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A risk: the preferred capacity might be rather small, about 200 elements for (Row, Timestamp, Diff), which doesn't justify the cost of building a columnar structure. Think about either increasing this (what we do in the merge batcher, target 64KiB instead of 8KiB), or find an alternative design.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumped to a 64 KiB BUFFER_SIZE_BYTES constant matching ColumnationChunker in src/timely-util/src/columnation.rs. chunk_capacity::<(D, T, R)>() is the per-chunk item count; staging buffer is 2 * chunk_capacity so the consolidate-on-full path triggers at the same fill ratio as before, just at 64 KiB instead of 8 KiB.

Reduce struct and `Default` impl bounds to `(D, T, R): Columnar`; the
DD-style bounds are only needed by the impls that actually consolidate.

Replace `timely::container::buffer::default_capacity` (8 KiB target)
with a 64 KiB `BUFFER_SIZE_BYTES`, matching `ColumnationChunker`. With
the smaller target, the staging buffer for `(Row, Timestamp, Diff)` was
~200 elements, which doesn't justify the cost of building a columnar
container.
Two-level buffering instead of per-tuple push to an inner ColumnBuilder:

  1. AoS staging Vec (16k cap) for in-place `consolidate_updates`. Drains
     a multiple-of-half-cap so the leftover stays in staging — gives
     cross-batch consolidation the same way the prior impl's
     drain-multiple-of-chunk-size trick did.
  2. SoA accumulator: one sub-container per column
     (`D::Container`, `T::Container`, `R::Container`) via the columnar
     tuple decomposition. Drains do three sequential passes (one per
     column) so each pass writes a single cache-line stream and the
     compiler can vectorize for primitive containers. When the
     accumulator's serialized size reaches ~2 MiB it ships as
     `Column::Align`; the trailing partial on `finish` ships as
     `Column::Typed` to skip a serialize copy.

Also change `Column::Align(Region<u64>)` to `Column::Align(Vec<u64>)`
and serialize via `columnar::bytes::indexed::encode` (which builds the
buffer with `Vec::push`/`extend_from_slice`), so the column-mint path
no longer pre-zeroes 2 MiB it's about to overwrite. The lgalloc path
for these aligned buffers goes away with this — `set_enable_columnar_lgalloc`
stays as a no-op for source compatibility with the dyncfg flag.

Throughput on 100M `(u64, u64, i64)` tuples (lgalloc off, single thread,
median of three runs):

  workload          before      after      bare ColumnBuilder
  cancel-pairs      2.6 GB/s    7.1 GB/s   6.0 GB/s
  single-key        2.7 GB/s    8.3 GB/s   6.8 GB/s
  all-distinct      1.7 GB/s    2.7 GB/s   6.4 GB/s
  dup-pairs         2.0 GB/s    3.6 GB/s   6.5 GB/s

Cancel-heavy workloads now beat bare ColumnBuilder because most rows
die in the small staging Vec before reaching any column structure.
The all-distinct floor is the sort tax (~30% of cycles) plus an
unavoidable typed-accumulator → aligned-bytes memcpy (~17%) that
remains because we can't pre-size aligned output across multiple
staging drains.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant