[python] Add predicate-driven bucket pruning for HASH_FIXED tables#7744

Open
TheR1sing3un wants to merge 1 commit into apache:master from TheR1sing3un:py-bucket-pruning-hash-fixed
Conversation

@TheR1sing3un (Member)

Purpose

Today, on a HASH_FIXED PK table with N buckets, a point query like

table.new_read_builder().with_filter(pb.equal('id', 'X')).new_scan().plan().splits()

scans every bucket: N manifest decodings, N file opens. Java's org.apache.paimon.operation.BucketSelectConverter solves this at the manifest-entry filter layer: an Equal/In predicate on the bucket-key columns derives a finite set of buckets the query can possibly hit, and entries outside that set are skipped before any stats decoding.

This PR ports the same pattern to pypaimon. PK point queries now touch a single bucket; PK IN queries touch only the buckets covering the literal set.
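For concreteness, a hedged sketch of the observable effect. The `new_read_builder` chain and the `s.bucket` attribute come from this PR's own snippets; the 8-bucket table and the `pb` builder are assumptions for illustration:

```python
# Illustrative only: assumes an existing HASH_FIXED PK table `table`
# with 8 buckets and a predicate builder `pb`, as in the snippet above.
predicate = pb.equal('id', 'X')
splits = table.new_read_builder().with_filter(predicate).new_scan().plan().splits()

# Before this PR: every bucket appears in the splits (8 here).
# After this PR:  only the bucket that 'X' hashes to.
buckets_hit = {s.bucket for s in splits}
assert len(buckets_hit) == 1
```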

Mechanism

pypaimon/read/scanner/bucket_select_converter.py (new):

  • Walk the predicate, isolate AND clauses that constrain bucket-key fields with equal / in, take the cartesian product of literal values (capped at MAX_VALUES=1000), hash each combination using _hash_bytes_by_words / _bucket_from_hash from pypaimon.write.row_key_extractor, and return a callable selector(bucket, total_buckets) -> bool (a simplified sketch follows this list).
  • Cached per total_buckets — handles the rescale case where bucket count varies between manifest entries.
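A minimal sketch of that construction, simplified from the PR text. `literals_per_key` stands in for the Equal/In literals the real code collects while walking the AND clauses, `hash_combination` stands in for the writer's _hash_bytes_by_words path, and `% total_buckets` stands in for _bucket_from_hash; the real code also defers hashing and fails open on serialization errors (see Soundness):

```python
from itertools import product
from typing import Callable, Dict, List, Optional

MAX_VALUES = 1000  # cartesian-product cap, as in the PR


def create_bucket_selector_sketch(
    literals_per_key: Dict[str, List[object]],
    bucket_key_fields: List[str],
    hash_combination: Callable[..., int],
) -> Optional[Callable[[int, int], bool]]:
    # Every bucket-key field must be constrained, else no pruning.
    if any(key not in literals_per_key for key in bucket_key_fields):
        return None

    value_lists = [literals_per_key[key] for key in bucket_key_fields]
    combination_count = 1
    for values in value_lists:
        combination_count *= len(values)
    if combination_count > MAX_VALUES:
        return None  # cartesian product too large: full scan

    hashes = [hash_combination(*combo) for combo in product(*value_lists)]

    cache: Dict[int, set] = {}  # bucket set per total_buckets (rescale case)

    def selector(bucket: int, total_buckets: int) -> bool:
        if total_buckets <= 0:
            return True  # fail open for legacy / special manifest entries
        if total_buckets not in cache:
            cache[total_buckets] = {h % total_buckets for h in hashes}
        return bucket in cache[total_buckets]

    return selector
```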

Conservative scope, deliberately narrower than Java's more general matching (worked examples follow the list):

  • Only HASH_FIXED tables (the caller is responsible for gating).
  • All bucket-key fields must be constrained — otherwise None, full scan.
  • Repeated constraints on the same column under top-level AND (id = 1 AND id = 2) → None. Java does the same rather than reasoning about unsatisfiability.
  • Cartesian product cap at MAX_VALUES = 1000 — above that, fall back to full scan.
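Worked examples of those rules, hedged: only pb.equal appears verbatim in this PR, so the In/range builder names and the one-argument create_bucket_selector call shape below are assumptions for illustration, not the real signatures:

```python
# Assuming bucket key = (id,). Builder names other than pb.equal and the
# one-argument call shape are illustrative placeholders.
create_bucket_selector(pb.equal('id', 1))            # selector: 1 combination
create_bucket_selector(pb.is_in('id', [1, 2, 3]))    # selector: 3 combinations
create_bucket_selector(pb.greater_than('id', 1))     # None: range predicate
create_bucket_selector(pb.and_([pb.equal('id', 1),
                                pb.equal('id', 2)])) # None: repeated key under AND
```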

pypaimon/read/scanner/file_scanner.py:

  • Enable the selector only when table.bucket_mode() == BucketMode.HASH_FIXED.
  • Derive the bucket-key fields by instantiating the writer's FixedBucketRowKeyExtractor and reading _bucket_key_fields. Reusing the writer class — rather than re-implementing bucket-key resolution on the read side — is the safety net against future write-side resolution changes that would otherwise silently break read/write hash agreement and lose data.
  • Apply the selector inside _filter_manifest_entry, after the bucket validity check and before partition / stats decoding, so it short-circuits the hot path on point queries; a sketch of the hook's shape follows.
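A hedged sketch of where the hook sits. Only _filter_manifest_entry and the entry's bucket / total_buckets fields are named in this PR; the helper names are placeholders:

```python
# Placeholder helper names; only _filter_manifest_entry and the
# entry.bucket / entry.total_buckets fields are taken from the PR text.
def _filter_manifest_entry(self, entry) -> bool:
    if not self._bucket_is_valid(entry):           # existing validity check
        return False
    selector = self._bucket_selector               # None unless HASH_FIXED
    if selector is not None and not selector(entry.bucket, entry.total_buckets):
        return False                               # pruned before stats decoding
    return self._partition_and_stats_match(entry)  # the expensive path
```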

Soundness

The selector returns a superset of the buckets containing matching rows. False-positive (over-keep) is fine; false-negative is silent data loss and must never happen.

  • total_buckets <= 0 (legacy / special manifest entries that the writer placed under a different convention) → fail open.
  • Any hashing / serialization error inside the deferred hash — e.g. pb.equal('id_bigint', 'foo') so GenericRowSerializer.to_bytes raises struct.error mid-scan — is caught and the selector fails open. Crashing the entire scan with an opaque error would be a worse user experience than silently skipping pruning, and the soundness contract is preserved. A sketch of this fail-open path follows.
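A minimal sketch of that fail-open path; `serialize_and_hash` is a stand-in for the GenericRowSerializer.to_bytes + _hash_bytes_by_words pipeline named above:

```python
import struct


def safe_bucket_set(combinations, total_buckets, serialize_and_hash):
    """Return the pruned bucket set, or None to disable pruning."""
    try:
        return {serialize_and_hash(combo) % total_buckets
                for combo in combinations}
    except (struct.error, TypeError, ValueError):
        # e.g. a STRING literal against a BIGINT bucket-key column:
        # serialization raises mid-scan, so keep every bucket instead
        # of crashing or (worse) dropping one.
        return None
```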

Linked issue

N/A — surfaced when running PK point lookups against tables with non-trivial bucket counts and seeing every bucket in the resulting splits list.

Tests

New pypaimon/tests/pushdown_bucket_test.py, three layers, 25 cases:

  • Layer 1 — Unit (17 cases): direct create_bucket_selector calls — Equal / In / OR-of-Equals / composite-key cartesian / unconstrained-key returns None / non-bucket-key returns None / range returns None / OR with non-bucket-key returns None / repeated AND on same key returns None / unrelated AND clause is unaffected / cartesian above cap returns None / null-only literal collapses to empty bucket set / no-predicate returns None / no-bucket-keys returns None / per-total_buckets cache (rescale) / total_buckets <= 0 fails open / type-mismatched literal fails open.
  • Layer 2 — Integration (5 cases): real PK tables (8 buckets), public API. Asserts BOTH result correctness AND that pruning fired ({s.bucket for s in splits} has the expected size). Includes a "selector must be None" assertion for the value-only predicate so a buggy selector that prunes wrongly but happens to keep the test rows still fails.
  • Layer 3 — Property (60 random trials, deterministic seed): random bucket counts × random PKs × random Equal/In, result == oracle. Uses seeded random.Random rather than hypothesis so the PR doesn't introduce a new dev dependency and the test stays Python 3.6 compatible. A sketch of one trial's oracle check follows.
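A hedged sketch of one Layer 3 trial; `bucket_of` and `make_selector` are stand-ins, and the real test drives the actual extractor and selector:

```python
import random


def check_trial(rng: random.Random, bucket_of, make_selector) -> None:
    total_buckets = rng.randint(1, 64)
    keys = [rng.randint(0, 10_000) for _ in range(rng.randint(1, 5))]
    selector = make_selector(keys)  # selector for an In-predicate on the PK
    # Oracle: the buckets that actually hold the queried keys.
    oracle = {bucket_of(key, total_buckets) for key in keys}
    selected = {b for b in range(total_buckets) if selector(b, total_buckets)}
    assert oracle <= selected  # superset semantics: no silent data loss
    assert selected <= oracle  # and exact for pure Equal/In


rng = random.Random(20240101)  # deterministic seed, per the PR (value illustrative)
```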

Local: pytest pypaimon/tests/pushdown_bucket_test.py → 25 passed; surrounding suites (predicates_test, reader_split_generator_test, reader_primary_key_test, identifier_test) → 73 passed, 2 failed (pre-existing lance environment issues unrelated to this PR); flake8 --config=dev/cfg.ini clean.

API and format

No public API change. No file format change. Read result is unchanged for any predicate; only the set of manifest entries actually decoded shrinks for predicates the selector can match.

Documentation

Inline docstrings on create_bucket_selector, _Selector, and FileScanner._init_bucket_selector document the selector contract (superset semantics, fail-open conditions, why bucket-key fields come from the writer's extractor) so future maintainers don't accidentally regress soundness.

Generative AI disclosure

Drafted with assistance from an AI coding tool; the design follows org.apache.paimon.operation.BucketSelectConverter and the soundness contract is exercised end-to-end by the three-layer test suite.

Commit message:

Mirrors Java's org.apache.paimon.operation.BucketSelectConverter at the
manifest-entry filter layer: PK Equal/In predicates derive a finite set
of buckets the query can hit, and entries outside that set are skipped.
PK point queries (`pk = 'X'`) now touch a single bucket instead of the
full bucket count.

bucket_select_converter.py
  Walk the predicate, isolate AND clauses that constrain bucket-key
  fields with Equal/In, take the cartesian product of literal values
  (capped at MAX_VALUES=1000), hash each combination using the writer's
  ``_hash_bytes_by_words`` / ``_bucket_from_hash`` from RowKeyExtractor,
  and return a callable selector. Cached per total_buckets to handle
  rescale.

  Conservative scope, deliberately narrower than Java's general
  flexibility:
    * Only HASH_FIXED tables (caller's responsibility to gate).
    * All bucket-key fields must be constrained, with Equal or In, in a
      single AND-of-OR-of-literals shape — otherwise None.
    * Repeated constraints on the same column under top-level AND
      (e.g. ``id = 1 AND id = 2``) → None. Java does the same rather
      than reasoning about unsatisfiability.
    * Cartesian product cap at MAX_VALUES=1000 — above that, fall back
      to full scan.

  Soundness contract:
    * Selector returns a SUPERSET of buckets containing matching rows.
      False-positive (over-keep) fine; false-negative is silent data
      loss and never happens.
    * total_buckets <= 0 (legacy / special manifest entries) → fail
      open: must NOT drop rows the writer placed under a different
      convention.
    * Any hashing/serialization error inside the deferred hash (e.g. a
      literal type that doesn't match the bucket-key column's atomic
      type — STRING literal on a BIGINT column makes
      GenericRowSerializer.to_bytes raise struct.error) is caught and
      the selector fails open. Crashing the entire scan with an opaque
      error is a worse user experience than silently skipping pruning.

file_scanner.py
  Enable the selector only for BucketMode.HASH_FIXED. Bucket-key fields
  are derived by instantiating the writer's FixedBucketRowKeyExtractor
  and reading back ``_bucket_key_fields``. Reusing the writer class
  (rather than re-implementing bucket-key resolution) is the safety net
  against future write-side resolution changes that would otherwise
  break read/write hash agreement.

  Apply the selector in ``_filter_manifest_entry`` after the bucket
  validity check and before partition / stats decoding — it's the
  cheapest possible discriminator and short-circuits the rest of the
  hot path on point queries.

Tests (pushdown_bucket_test.py, three layers, 25 cases):

  Layer 1 — Unit (17 cases): direct ``create_bucket_selector`` calls
    covering Equal / In / OR / composite-keys / cap / null literals /
    rescale / fail-open / type-mismatched-literal-fails-open.
  Layer 2 — Integration (5 cases): real PK tables, public API,
    asserts BOTH result correctness AND that pruning fired (split
    bucket count). Includes a "selector must be None" assertion for
    value-only predicates so a buggy selector that prunes wrongly but
    happens to keep the test rows would still fail.
  Layer 3 — Property (60 random trials, deterministic seed): random
    bucket counts × random PKs × random Equal/In; result == oracle.
    Uses seeded ``random.Random`` rather than hypothesis so we don't
    need a new dev dependency and stay Python 3.6 compatible.
