Skip to content

refactor: centralize TopK heap boundary handling#23091

Open
kumarUjjawal wants to merge 3 commits into
apache:mainfrom
kumarUjjawal:refactor/topk-heap-boundary
Open

refactor: centralize TopK heap boundary handling#23091
kumarUjjawal wants to merge 3 commits into
apache:mainfrom
kumarUjjawal:refactor/topk-heap-boundary

Conversation

@kumarUjjawal

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Rationale for this change

TopK derives local heap-boundary data in multiple places. This refactor names that boundary and keeps full sort-key bytes, scalar threshold values, and prefix comparison tied to the same heap row.

What changes are included in this PR?

  • Add a private helper for the current local TopK heap boundary.
  • Use it when updating dynamic filter thresholds.
  • Use it for local prefix early-completion checks.
  • Remove the old duplicate scalar threshold extraction path.

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 22, 2026
@kumarUjjawal

Copy link
Copy Markdown
Contributor Author

run benchmark topk_sorted_tpch

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4769862277-612-tqlxx 6.12.68+ #1 SMP Sat May 2 07:49:07 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing refactor/topk-heap-boundary (2a96e34) to 9e68a86 (merge-base) diff using: topk_sorted_tpch
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected
Details

Comparing HEAD and refactor_topk-heap-boundary
--------------------
Benchmark run_topk_sorted_tpch.json
--------------------
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃                           HEAD ┃    refactor_topk-heap-boundary ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │    2.24 / 2.94 ±0.85 / 4.29 ms │    2.16 / 2.68 ±0.85 / 4.37 ms │ +1.10x faster │
│ Q2    │    2.33 / 2.47 ±0.13 / 2.64 ms │    2.37 / 2.49 ±0.09 / 2.60 ms │     no change │
│ Q3    │ 32.03 / 32.54 ±0.40 / 32.94 ms │ 31.94 / 32.10 ±0.20 / 32.47 ms │     no change │
│ Q4    │    2.66 / 2.78 ±0.11 / 2.97 ms │    2.64 / 2.73 ±0.13 / 2.98 ms │     no change │
│ Q5    │ 10.40 / 10.47 ±0.09 / 10.65 ms │ 10.09 / 10.26 ±0.12 / 10.43 ms │     no change │
│ Q6    │ 17.65 / 17.89 ±0.24 / 18.23 ms │ 17.44 / 17.72 ±0.21 / 18.05 ms │     no change │
│ Q7    │ 37.90 / 38.37 ±0.45 / 39.00 ms │ 37.62 / 38.19 ±0.54 / 39.08 ms │     no change │
│ Q8    │    3.49 / 3.88 ±0.64 / 5.16 ms │    3.42 / 3.82 ±0.74 / 5.30 ms │     no change │
│ Q9    │    4.87 / 4.92 ±0.04 / 4.98 ms │    4.79 / 5.41 ±1.14 / 7.68 ms │  1.10x slower │
│ Q10   │    6.97 / 7.00 ±0.02 / 7.03 ms │    7.00 / 7.20 ±0.32 / 7.84 ms │     no change │
│ Q11   │    3.82 / 3.92 ±0.07 / 4.03 ms │    3.79 / 3.86 ±0.07 / 3.98 ms │     no change │
└───────┴────────────────────────────────┴────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                          ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (HEAD)                          │ 127.18ms │
│ Total Time (refactor_topk-heap-boundary)   │ 126.46ms │
│ Average Time (HEAD)                        │  11.56ms │
│ Average Time (refactor_topk-heap-boundary) │  11.50ms │
│ Queries Faster                             │        1 │
│ Queries Slower                             │        1 │
│ Queries with No Change                     │        9 │
│ Queries with Failure                       │        0 │
└────────────────────────────────────────────┴──────────┘

Resource Usage

topk_sorted_tpch — base (merge-base)

Metric Value
Wall time 5.0s
Peak memory 260.0 KiB
Avg memory 43.3 KiB
CPU user 0.0s
CPU sys 0.0s
Peak spill 0 B

topk_sorted_tpch — branch

Metric Value
Wall time 5.0s
Peak memory 220.0 KiB
Avg memory 36.7 KiB
CPU user 0.0s
CPU sys 0.0s
Peak spill 0 B

File an issue against this benchmark runner

@geoffreyclaude

Copy link
Copy Markdown
Contributor

Nice cleanup. I'm wondering if TopKHeapBoundary should hold both the heap row and the batch it points to, instead of having threshold_values take &TopKHeap and look back into heap.store.

That would make it clearer that the sort-key bytes, scalar values, and prefix row all come from the same row.

@kumarUjjawal

Copy link
Copy Markdown
Contributor Author

Nice cleanup. I'm wondering if TopKHeapBoundary should hold both the heap row and the batch it points to, instead of having threshold_values take &TopKHeap and look back into heap.store.

That would make it clearer that the sort-key bytes, scalar values, and prefix row all come from the same row.

Thank you @geoffreyclaude for the feedback!

@kosiew kosiew left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kumarUjjawal

Looks 👍 to me

) -> Result<Vec<ScalarValue>> {
let mut scalar_values = Vec::with_capacity(sort_exprs.len());
for sort_expr in sort_exprs {
let expr = Arc::clone(&sort_expr.expr);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small cleanup suggestion: since expr is only used for this one evaluate call, the Arc clone can be avoided.

This could be written directly as:
let value = sort_expr.expr.evaluate(&batch_entry.batch.slice(self.row.index, 1))?;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Refactor: Centralize TopK heap-boundary encoding and checks

4 participants