feat(physical-expr): add Partitioning::DynamicRange variant#23094
feat(physical-expr): add Partitioning::DynamicRange variant#23094avantgardnerio wants to merge 2 commits into
Conversation
Adds a new `Partitioning::DynamicRange(DynamicRangePartitioning)`
variant alongside the existing `Partitioning::Range`. Where `Range`
takes its split points as a plan-time constant (declared by a
`TableProvider` or computed by a planner from statistics),
`DynamicRange` describes the same model except the boundary set is
only known once an upstream operator has observed its actual data
range. The implementing operator is expected to discover the range
at `execute()` time and compute interior split points before it
routes the first row.
```rust
pub struct DynamicRangePartitioning {
ordering: LexOrdering,
partition_count: usize,
}
```
The number of output partitions is fixed at plan time so downstream
distribution requirements have a stable answer; only the split point
values are runtime-discovered.
This PR only adds the variant + plumbing. Behavior mirrors `Range`:
- `RepartitionExec`'s `repartitioned()`, `try_pushdown_sort()`, and
projection-pushdown sites return `not_impl_err!` for the new
variant, same as they already do for `Range`.
- `RepartitionExec`'s row-routing path was already a catch-all
`not_impl_err!` for non-Hash/non-RoundRobin variants, so no change
is needed there.
- `Partitioning::compatible_with`, `partition_count`, `project`, and
`PartialEq` arms are added; `DynamicRange` is treated symmetrically
to `Range`.
- FFI bridges to `UnknownPartitioning(n)`, same path `Range` takes
(per apache#22394).
- Proto serialization returns `not_impl_err!`. Proto plumbing for
`DynamicRange` will be added incrementally, mirroring how `Range`
landed in steps.
Three unit tests cover construction + display + partition_count,
`compatible_with` (same/different ordering, same/different
partition_count, single-partition, cross-variant), and `project`
preservation/degradation.
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
Adds a per-side halo distance to `DynamicRangePartitioning` so the
runtime-routing operator can declare "this bucket carries rows
outside its primary range by distance D" at plan time:
pub struct HaloSpec {
preceding: ScalarValue,
following: ScalarValue,
}
pub struct DynamicRangePartitioning {
ordering: LexOrdering,
partition_count: usize,
halo: Option<HaloSpec>, // <- new
}
Distances are in the leading sort key's domain — e.g. for `RANGE
BETWEEN 5 PRECEDING AND 3 FOLLOWING` over an `Int64` key, the halo
is `preceding=5, following=3`. ROWS-frame halo (a count of
neighboring rows) is intentionally not represented here; a separate
variant can be added later if motivated.
Builder pattern keeps the common case (no halo) terse:
DynamicRangePartitioning::new(ordering, k) // disjoint
DynamicRangePartitioning::new(ordering, k).with_halo(halo) // overlapping
This is the API hook for a downstream operator (e.g. a halo-strip
filter) to read `ExtremaKind::Expanded` extrema from the routing
operator: the routing operator publishes the primary range, the
filter trims the halo. Without halo, the partitioning produces
disjoint buckets and a downstream consumer sees
`ExtremaKind::Observed` extrema.
- `compatible_with`: halos must match (or both `None`) to be compatible.
- `project`: halo passes through unchanged. Halo is measured in the
leading sort key's domain; projection must keep that key's
`DataType` stable for the result to be valid.
- `Display`: shape `DynamicRange([{ordering}], {k})` without halo;
`DynamicRange([{ordering}], {k}, halo(preceding=P, following=F))`
with.
Three new tests: halo metadata + display, halo affecting
`compatible_with` (mismatch, plain-vs-halo asymmetry), halo preserved
through `project`.
|
Pushed Why the scope grew: an honest answer to "what do we need before a runtime range repartitioner is implementable" surfaced halo as a missing piece. The routing operator and the downstream halo-strip operator need to agree at plan time on how far each bucket extends beyond its primary range; the field on the partitioning type is the natural carrier for that agreement. Without it, the two operators would need a side channel. Shape: pub struct HaloSpec {
preceding: ScalarValue,
following: ScalarValue,
}
pub struct DynamicRangePartitioning {
ordering: LexOrdering,
partition_count: usize,
halo: Option<HaloSpec>,
}Distances are in the leading sort key's domain. Builder keeps the common (no-halo) case terse: This is the API hook the runtime range repartitioner needs to publish ROWS-frame halo (a count of neighbor rows rather than a domain distance) is intentionally not represented; a separate variant can be added later if motivated.
I'll update the discussion at #23093 to mirror this scope shift. |
| Partitioning::DynamicRange(_) => { | ||
| // Proto plumbing for DynamicRange is intentionally not | ||
| // implemented in the variant-introduction PR and will be | ||
| // added incrementally. See | ||
| // <https://github.com/apache/datafusion/issues/22395>. | ||
| return not_impl_err!( | ||
| "Serialization of DynamicRange partitioning is not implemented" | ||
| ); | ||
| } |
There was a problem hiding this comment.
Following the discussion in the GH issue & PR comments - I don't have a ton of feedback, as I'm still trying to understand the broader context (and Datafusion in general haha!) but if we get alignment here I'm happy to help out with this piece specifically :)
Which issue does this PR close?
Implements the proposal in #23093. (Not using
Closes #23093so the discussion thread can stay open for the broader API conversation.)Rationale for this change
See #23093 for full design rationale. Short version:
Partitioning::Range(landed in #22207) covers the declarative case where split points are known at plan time. This adds the symmetric runtime-discovered sibling — where the boundary set is only known once an upstream operator has observed its actual data range. The partition count stays fixed at plan time so downstream distribution requirements have a stable answer; only the split point values are runtime-discovered.Motivating downstream use case: parallelizing single-partition window functions (RANGE frames, no PARTITION BY) — see the spike at #23026.
What changes are included in this PR?
Variant introduction only — no execution slot in this PR.
Behavior mirrors
Rangeat every match site:Partitioning::partition_count,compatible_with,project,PartialEq,Displayarms added.DynamicRangePartitioninghasnew,ordering,partition_count,compatible_with,project,DisplaymirroringRangePartitioning(minus split-point validation, since there are no split points to validate at plan time).RepartitionExec'srepartitioned(),try_pushdown_sort(), and projection-pushdown sites returnnot_impl_err!for the new variant, same as they already do forRange.RepartitionExec's row-routing path was already a catch-allnot_impl_err!for non-Hash / non-RoundRobin variants, so no change is needed there.UnknownPartitioning(n), same pathRangetakes per Expose Range Partitioning Across FFI Boundaries #22394.not_impl_err!— proto plumbing forDynamicRangewill be added incrementally, mirroring howRangelanded in steps (Add PhysicalPartitioning::Rangeenum variant #22207 → Support logical protobuf serialization for range repartitioning #22787).Are these changes tested?
Three new tests in
datafusion/physical-expr/src/partitioning.rs::tests:test_dynamic_range_partitioning_metadata— construction,Display,partition_count, accessors.test_dynamic_range_partitioning_compatible_with— same ordering and same partition_count → compatible; different partition_count → not; different sort options → not; single-partition / single-partition → always compatible; through thePartitioningenum, including cross-variant (DynamicRange vs declared Range never compatible).test_dynamic_range_partitioning_project_preserves_or_degrades— projection preserves the ordering when the key survives; degrades toUnknownPartitioning(n)(preserving partition count) when the key is dropped.cargo clippy --all-features --all-targets -- -D warnings --no-depsclean.cargo fmt --allclean.cargo test -p datafusion-physical-expr --lib partitioning::: 20 pass (3 new).Are there any user-facing changes?
datafusion::physical_expr:DynamicRangePartitioning,Partitioning::DynamicRangeenum variant.Partitioningmay need a new arm — most upstream code already had one forRangeand can extend it. The crates updated in this PR (physical-plan,proto,ffi) cover the in-tree consumers.