From 1b21424bd5d583a566d18cd82bf7580a8729d9bc Mon Sep 17 00:00:00 2001 From: "yannan.wyn" Date: Wed, 15 Apr 2026 10:38:03 +0800 Subject: [PATCH 1/3] fix(bthread): refactor sharded priority queue with lock-free MPSC inbound Replace single WorkStealingQueue priority path with per-tag sharded PriorityShard (MPSCQueue inbound + WSQ + atomic owner lifecycle). Steal_task includes salvage logic for ownerless shards to prevent task starvation. Gated by FLAGS_enable_bthread_priority_queue. --- src/bthread/task_control.cpp | 185 ++++++- src/bthread/task_control.h | 29 +- src/bthread/task_group.h | 3 + test/bthread_priority_queue_benchmark.cpp | 478 ++++++++++++++++++ .../bthread_priority_queue_owner_unittest.cpp | 264 ++++++++++ test/bthread_priority_queue_unittest.cpp | 248 +++++++++ 6 files changed, 1197 insertions(+), 10 deletions(-) create mode 100644 test/bthread_priority_queue_benchmark.cpp create mode 100644 test/bthread_priority_queue_owner_unittest.cpp create mode 100644 test/bthread_priority_queue_unittest.cpp diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index ba067e3976..be36a3e863 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -57,6 +57,7 @@ DEFINE_bool(parking_lot_no_signal_when_no_waiter, false, "ParkingLot doesn't signal when there is no waiter. 
" "In busy worker scenarios, signal overhead can be reduced."); DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority queue"); +DEFINE_int32(priority_queue_shards, 4, "Number of priority queue shards per tag"); DECLARE_int32(bthread_concurrency); DECLARE_int32(bthread_min_concurrency); @@ -151,6 +152,9 @@ TaskGroup* TaskControl::create_group(bthread_tag_t tag) { delete g; return NULL; } + if (_enable_priority_queue) { + bind_priority_owner(g, tag); + } return g; } @@ -205,7 +209,7 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _enable_priority_queue(FLAGS_enable_bthread_priority_queue) - , _priority_queues(FLAGS_task_group_ntags) + , _priority_shards(FLAGS_task_group_ntags) , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) , _tagged_pl(FLAGS_task_group_ntags) {} @@ -238,9 +242,23 @@ int TaskControl::init(int concurrency) { _tagged_worker_usage_second.push_back(new bvar::PerSecond>( "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); - if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { - LOG(ERROR) << "Fail to init _priority_q"; - return -1; + if (_enable_priority_queue) { + const int workers_per_tag = concurrency / FLAGS_task_group_ntags; + int nshard = std::min(workers_per_tag, FLAGS_priority_queue_shards); + if (nshard < 1) { + nshard = 1; + } + _priority_shards[i].reserve(nshard); + const size_t wsq_cap = BTHREAD_MAX_CONCURRENCY; + for (int s = 0; s < nshard; ++s) { + std::unique_ptr shard(new PriorityShard); + if (shard->wsq.init(wsq_cap) != 0) { + LOG(ERROR) << "Fail to init priority shard wsq, tag=" << i << " shard=" << s; + return -1; + } + // inbound is butil::MPSCQueue + _priority_shards[i].push_back(std::move(shard)); + } } } @@ -489,6 +507,9 @@ int TaskControl::_destroy_group(TaskGroup* g) { { BAIDU_SCOPED_LOCK(_modify_group_mutex); auto tag = g->tag(); + if 
(_enable_priority_queue) { + unbind_priority_owner(g, tag); + } auto& groups = tag_group(tag); const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed); for (size_t i = 0; i < ngroup; ++i) { @@ -528,8 +549,47 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - if (_priority_queues[tag].steal(tid)) { - return true; + // priority queue: owner-first, then steal from other shards + if (_enable_priority_queue && !_priority_shards[tag].empty()) { + auto& shards = _priority_shards[tag]; + const size_t nshard = shards.size(); + + // Owner-first: if current TaskGroup owns a shard, flush and pop + const int my_shard = tls_task_group->_priority_shard_index; + if (my_shard >= 0 && (size_t)my_shard < nshard) { + PriorityShard* shard = shards[my_shard].get(); + if (shard->owner.load(butil::memory_order_relaxed) == tls_task_group) { + static const size_t kFlushBatch = 8; + flush_priority_inbound(shard, kFlushBatch); + if (shard->wsq.pop(tid)) { + return true; + } + } + } + + // Steal from all shards (random start to avoid hot spot) + size_t start = butil::fast_rand() % nshard; + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + if (shards[idx]->wsq.steal(tid)) { + return true; + } + } + + // Salvage: drain ownerless shards' inbound to prevent task starvation. + // This handles the TOCTOU race where a producer enqueues after unbind + // finishes draining but before a new owner binds. 
+ for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + PriorityShard* shard = shards[idx].get(); + if (shard->owner.load(butil::memory_order_relaxed) == NULL && + !shard->draining.load(butil::memory_order_relaxed)) { + bthread_t salvaged; + if (shard->inbound.Dequeue(salvaged)) { + fallback_enqueue(tag, salvaged); + } + } + } } // 1: Acquiring fence is paired with releasing fence in _add_group to @@ -689,4 +749,117 @@ std::vector TaskControl::get_living_bthreads() { return living_bthread_ids; } +void TaskControl::push_priority_queue(bthread_tag_t tag, bthread_t tid) { + if (!_enable_priority_queue || _priority_shards[tag].empty()) { + fallback_enqueue(tag, tid); + return; + } + auto& shards = _priority_shards[tag]; + const size_t nshard = shards.size(); + + // thread_local round-robin, zero contention + static BAIDU_THREAD_LOCAL size_t tl_rr = 0; + size_t start = tl_rr++ % nshard; + + // Prefer shards that have an active owner (not draining) + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + if (shards[idx]->owner.load(butil::memory_order_relaxed) != NULL && + !shards[idx]->draining.load(butil::memory_order_relaxed)) { + shards[idx]->inbound.Enqueue(tid); + return; + } + } + + // All shards ownerless, fallback to round-robin pick + shards[start]->inbound.Enqueue(tid); +} + +void TaskControl::bind_priority_owner(TaskGroup* g, bthread_tag_t tag) { + auto& shards = _priority_shards[tag]; + if (shards.empty()) { + return; + } + const size_t nshard = shards.size(); + size_t start = butil::fast_rand() % nshard; + for (size_t i = 0; i < nshard; ++i) { + size_t idx = (start + i) % nshard; + // Skip shards being drained + if (shards[idx]->draining.load(butil::memory_order_acquire)) { + continue; + } + TaskGroup* expected = NULL; + if (shards[idx]->owner.compare_exchange_strong( + expected, g, + butil::memory_order_release, + butil::memory_order_relaxed)) { + g->_priority_shard_index = static_cast(idx); + return; + } + } + 
// All shards occupied, this group won't own a shard (will only steal)
+}
+
+void TaskControl::unbind_priority_owner(TaskGroup* g, bthread_tag_t tag) {
+    const int idx = g->_priority_shard_index;
+    if (idx < 0) {
+        return;
+    }
+    auto& shards = _priority_shards[tag];
+    if ((size_t)idx >= shards.size()) {
+        return;
+    }
+    PriorityShard* shard = shards[idx].get();
+    if (shard->owner.load(butil::memory_order_relaxed) != g) {
+        g->_priority_shard_index = -1;
+        return;
+    }
+
+    // Mark draining to prevent new owner from binding
+    shard->draining.store(true, butil::memory_order_release);
+    shard->owner.store(NULL, butil::memory_order_release);
+
+    // Drain inbound
+    bthread_t tid;
+    while (shard->inbound.Dequeue(tid)) {
+        fallback_enqueue(tag, tid);
+    }
+    // Drain wsq, steal since we're no longer the owner
+    while (shard->wsq.steal(&tid)) {
+        fallback_enqueue(tag, tid);
+    }
+
+    // Allow new owner to bind
+    shard->draining.store(false, butil::memory_order_release);
+    g->_priority_shard_index = -1;
+}
+
+void TaskControl::flush_priority_inbound(PriorityShard* shard, size_t max_batch) {
+    bthread_t tid;
+    for (size_t i = 0; i < max_batch; ++i) {
+        if (!shard->inbound.Dequeue(tid)) {
+            break;
+        }
+        if (!shard->wsq.push(tid)) {
+            // wsq full, push back won't work; fallback this task
+            fallback_enqueue(tls_task_group->tag(), tid);
+            break;
+        }
+    }
+}
+
+void TaskControl::fallback_enqueue(bthread_tag_t tag, bthread_t tid) {
+    // Clear BTHREAD_GLOBAL_PRIORITY flag to prevent re-entering priority queue
+    TaskMeta* m = TaskGroup::address_meta(tid);
+    if (m) {
+        m->attr.flags &= ~BTHREAD_GLOBAL_PRIORITY;
+    }
+    // Enqueue to a random group's remote_rq, the normal scheduling path
+    TaskGroup* g = choose_one_group(tag);
+    if (g) {
+        g->_remote_rq.push(tid);
+        signal_task(1, tag);
+    }
+}
+
 } // namespace bthread
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 4480daa677..cff1dcfc1c 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -35,6 +35,7 @@
 #include "bthread/task_tracer.h"
 #include "bthread/task_meta.h"           // TaskMeta
 #include "bthread/work_stealing_queue.h" // WorkStealingQueue
+#include "butil/containers/mpsc_queue.h" // MPSCQueue
 #include "bthread/parking_lot.h"
 
 DECLARE_int32(task_group_ntags);
@@ -42,6 +43,22 @@
 namespace bthread {
 
 class TaskGroup;
 
+// A sharded priority queue slot. Each shard has:
+// - wsq: WorkStealingQueue owned by exactly one TaskGroup (push/pop by owner, steal by others)
+// - inbound: MPSC queue for external producers (event dispatchers) to submit tasks
+// - owner: the TaskGroup that owns this shard (does flush + pop)
+// - draining: set during owner teardown to prevent new owner binding
+struct BAIDU_CACHELINE_ALIGNMENT PriorityShard {
+    WorkStealingQueue<bthread_t> wsq;
+
+    butil::MPSCQueue<bthread_t> inbound;
+
+    butil::atomic<TaskGroup*> owner;
+    butil::atomic<bool> draining;
+
+    PriorityShard() : owner(NULL), draining(false) {}
+};
+
 // Control all task groups
 class TaskControl {
 friend class TaskGroup;
@@ -101,9 +118,7 @@ friend bthread_t init_for_pthread_stack_trace();
     std::string stack_trace(bthread_t tid);
 #endif // BRPC_BTHREAD_TRACER
 
-    void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
-        _priority_queues[tag].push(tid);
-    }
+    void push_priority_queue(bthread_tag_t tag, bthread_t tid);
 
     std::vector<bthread_t> get_living_bthreads();
 
 private:
@@ -164,7 +179,13 @@ friend bthread_t init_for_pthread_stack_trace();
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
 
     bool _enable_priority_queue;
-    std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
+    std::vector<std::vector<std::unique_ptr<PriorityShard>>> _priority_shards; // [tag][shard]
+
+    // B2 priority queue helpers
+    void bind_priority_owner(TaskGroup* g, bthread_tag_t tag);
+    void unbind_priority_owner(TaskGroup* g, bthread_tag_t tag);
+    void flush_priority_inbound(PriorityShard* shard, size_t max_batch);
+    void fallback_enqueue(bthread_tag_t tag, bthread_t tid);
 
     size_t _pl_num_of_each_tag;
     std::vector<std::vector<ParkingLot>> _tagged_pl;
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 54140c0dc2..5b17099771 100644
--- 
a/src/bthread/task_group.h +++ b/src/bthread/task_group.h @@ -377,6 +377,9 @@ friend class TaskControl; // tag of this taskgroup bthread_tag_t _tag{BTHREAD_TAG_DEFAULT}; + // Index of the priority shard this TaskGroup owns (-1 = not bound) + int _priority_shard_index{-1}; + // Worker thread id. pthread_t _tid{}; }; diff --git a/test/bthread_priority_queue_benchmark.cpp b/test/bthread_priority_queue_benchmark.cpp new file mode 100644 index 0000000000..1ddd5f6c0f --- /dev/null +++ b/test/bthread_priority_queue_benchmark.cpp @@ -0,0 +1,478 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Microbenchmark for B2 priority queue operations: +// 1. inbound (MPSCQueue) enqueue+dequeue cost +// 2. inbound multi-producer contention +// 3. flush inbound -> wsq cost (owner side) +// 4. wsq push+pop cost (owner side) +// 5. wsq steal cost (non-owner side) +// 6. 
full pipeline: producer -> inbound -> flush -> pop + +#include +#include +#include +#include +#include "butil/time.h" +#include "bthread/bthread.h" +#include "bthread/work_stealing_queue.h" +#include "bthread/remote_task_queue.h" +#include "butil/containers/mpsc_queue.h" +#include "bthread/processor.h" + +using bthread::WorkStealingQueue; +using bthread::RemoteTaskQueue; + +static const size_t WSQ_CAP = 1024; +static const size_t INBOUND_CAP = 4096; + +// ============================================================ +// Benchmark 1: wsq push+pop (owner, single thread) +// ============================================================ +void bench_wsq_pop() { + WorkStealingQueue wsq; + wsq.init(WSQ_CAP); + + const int N = 500000; + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + wsq.push((bthread_t)i); + bthread_t tid; + wsq.pop(&tid); + } + tm.stop(); + printf(" wsq push+pop (owner): %6.1f ns/op (%d ops)\n", + tm.n_elapsed() / (double)N, N); +} + +// ============================================================ +// Benchmark 5: wsq steal (non-owner threads + owner push) +// ============================================================ +struct StealBenchArg { + WorkStealingQueue* wsq; + std::atomic* stop; + int stolen; +}; + +void* steal_bench_thread(void* arg) { + StealBenchArg* a = static_cast(arg); + a->stolen = 0; + while (!a->stop->load(std::memory_order_relaxed)) { + bthread_t tid; + if (a->wsq->steal(&tid)) { + ++a->stolen; + } else { + cpu_relax(); + } + } + return NULL; +} + +void bench_wsq_steal(int nstealer) { + WorkStealingQueue wsq; + wsq.init(WSQ_CAP); + + const int N = 200000; + std::atomic stop(false); + + std::vector threads(nstealer); + std::vector args(nstealer); + for (int i = 0; i < nstealer; ++i) { + args[i] = {&wsq, &stop, 0}; + pthread_create(&threads[i], NULL, steal_bench_thread, &args[i]); + } + + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + while (!wsq.push((bthread_t)i)) { + cpu_relax(); + } + } + 
stop.store(true, std::memory_order_release); + tm.stop(); + + int total_stolen = 0; + for (int i = 0; i < nstealer; ++i) { + pthread_join(threads[i], NULL); + total_stolen += args[i].stolen; + } + bthread_t tid; + int remaining = 0; + while (wsq.pop(&tid)) ++remaining; + + printf(" wsq steal (%d stealers): %6.1f ns/push (pushed=%d stolen=%d remain=%d)\n", + nstealer, tm.n_elapsed() / (double)N, N, total_stolen, remaining); +} + +// ============================================================ +// B2 Inbound (MPSCQueue, lock-free) +// ============================================================ + +// MPSC single-thread enqueue+dequeue +void bench_mpsc_single() { + butil::MPSCQueue q; + const int N = 200000; + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + q.Enqueue((bthread_t)i); + bthread_t out; + q.Dequeue(out); + } + tm.stop(); + printf(" mpsc enqueue+dequeue (single): %6.1f ns/op (%d ops)\n", + tm.n_elapsed() / (double)N, N); +} + +// MPSC multi-producer context +struct MpscBenchCtx { + butil::MPSCQueue* q; + std::atomic produced{0}; + std::atomic go{false}; + int total; + int per_producer; +}; + +void* mpsc_producer(void* arg) { + MpscBenchCtx* ctx = static_cast(arg); + while (!ctx->go.load(std::memory_order_acquire)) { + cpu_relax(); + } + for (int i = 0; i < ctx->per_producer; ++i) { + ctx->q->Enqueue((bthread_t)i); + ctx->produced.fetch_add(1, std::memory_order_relaxed); + } + return NULL; +} + +void bench_mpsc_multi(int nproducer) { + butil::MPSCQueue q; + const int PER_PRODUCER = 50000; + MpscBenchCtx ctx; + ctx.q = &q; + ctx.total = nproducer * PER_PRODUCER; + ctx.per_producer = PER_PRODUCER; + + std::vector threads(nproducer); + for (int i = 0; i < nproducer; ++i) { + pthread_create(&threads[i], NULL, mpsc_producer, &ctx); + } + + butil::Timer tm; + tm.start(); + ctx.go.store(true, std::memory_order_release); + + int drained = 0; + while (drained < ctx.total) { + bthread_t tid; + if (q.Dequeue(tid)) { + ++drained; + } else { + 
sched_yield(); + } + } + + for (int i = 0; i < nproducer; ++i) { + pthread_join(threads[i], NULL); + } + tm.stop(); + + printf(" mpsc push (%d producers): %6.1f ns/op (%d ops)\n", + nproducer, tm.n_elapsed() / (double)ctx.total, ctx.total); +} + +// MPSC flush -> wsq (same pattern as B2 but with MPSCQueue) +void bench_mpsc_flush(int batch_size) { + butil::MPSCQueue q; + WorkStealingQueue wsq; + wsq.init(WSQ_CAP); + + const int ROUNDS = 20000; + butil::Timer tm; + tm.start(); + for (int r = 0; r < ROUNDS; ++r) { + for (int i = 0; i < batch_size; ++i) { + q.Enqueue((bthread_t)i); + } + bthread_t tid; + for (int i = 0; i < batch_size; ++i) { + if (!q.Dequeue(tid)) break; + wsq.push(tid); + } + while (wsq.pop(&tid)) {} + } + tm.stop(); + int total_ops = ROUNDS * batch_size; + printf(" mpsc flush->wsq (batch=%2d): %6.1f ns/op (%d ops)\n", + batch_size, tm.n_elapsed() / (double)total_ops, total_ops); +} + +// MPSC full pipeline (same as bench_pipeline but with MPSCQueue) +void* mpsc_pipeline_producer(void* arg) { + MpscBenchCtx* ctx = static_cast(arg); + while (!ctx->go.load(std::memory_order_acquire)) { + cpu_relax(); + } + for (int i = 0; i < ctx->per_producer; ++i) { + ctx->q->Enqueue((bthread_t)i); + } + return NULL; +} + +void bench_mpsc_pipeline(int nproducer) { + butil::MPSCQueue q; + WorkStealingQueue wsq; + wsq.init(WSQ_CAP); + + const int PER_PRODUCER = 50000; + const int TOTAL = nproducer * PER_PRODUCER; + const int FLUSH_BATCH = 8; + + MpscBenchCtx ctx; + ctx.q = &q; + ctx.total = TOTAL; + ctx.per_producer = PER_PRODUCER; + + std::vector producers(nproducer); + for (int i = 0; i < nproducer; ++i) { + pthread_create(&producers[i], NULL, mpsc_pipeline_producer, &ctx); + } + + butil::Timer tm; + tm.start(); + ctx.go.store(true, std::memory_order_release); + + int consumed = 0; + while (consumed < TOTAL) { + bthread_t tid; + for (int i = 0; i < FLUSH_BATCH; ++i) { + if (!q.Dequeue(tid)) break; + if (!wsq.push(tid)) { + ++consumed; + break; + } + } + while 
(wsq.pop(&tid)) { + ++consumed; + } + if (consumed < TOTAL) { + cpu_relax(); + } + } + + for (int i = 0; i < nproducer; ++i) { + pthread_join(producers[i], NULL); + } + tm.stop(); + + printf(" mpsc pipeline (%d prod -> flush%d -> pop): %6.1f ns/task (%d tasks)\n", + nproducer, FLUSH_BATCH, tm.n_elapsed() / (double)TOTAL, TOTAL); +} + +// ============================================================ +// Baseline: normal bthread scheduling path primitives +// ============================================================ + +// Baseline A: _rq push+pop (local worker path: WSQ, same as owner wsq) +// This is identical to bench_wsq_pop — included for clarity in comparison. +void bench_baseline_rq() { + WorkStealingQueue rq; + rq.init(WSQ_CAP); + + const int N = 500000; + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + rq.push((bthread_t)i); + bthread_t tid; + rq.pop(&tid); + } + tm.stop(); + printf(" _rq push+pop (local worker): %6.1f ns/op (%d ops)\n", + tm.n_elapsed() / (double)N, N); +} + +// Baseline B: _remote_rq push+pop (non-worker path: mutex + BoundedQueue) +struct InboundBenchCtx { + RemoteTaskQueue* inbound; + std::atomic produced{0}; + std::atomic consumed{0}; + std::atomic go{false}; + int total; + int per_producer; +}; + +void* inbound_producer(void* arg) { + InboundBenchCtx* ctx = static_cast(arg); + while (!ctx->go.load(std::memory_order_acquire)) { + cpu_relax(); + } + for (int i = 0; i < ctx->per_producer; ++i) { + while (!ctx->inbound->push((bthread_t)i)) { + sched_yield(); + } + ctx->produced.fetch_add(1, std::memory_order_relaxed); + } + return NULL; +} +void bench_baseline_remote_rq_single() { + RemoteTaskQueue remote_rq; + remote_rq.init(INBOUND_CAP); + + const int N = 200000; + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + remote_rq.push((bthread_t)i); + bthread_t out; + remote_rq.pop(&out); + } + tm.stop(); + printf(" _remote_rq push+pop (single): %6.1f ns/op (%d ops)\n", + tm.n_elapsed() / (double)N, N); +} + 
+// Baseline C: _remote_rq multi-producer (same as inbound multi-producer) +void bench_baseline_remote_rq_multi(int nproducer) { + RemoteTaskQueue remote_rq; + remote_rq.init(INBOUND_CAP); + + const int PER_PRODUCER = 50000; + InboundBenchCtx ctx; + ctx.inbound = &remote_rq; + ctx.total = nproducer * PER_PRODUCER; + ctx.per_producer = PER_PRODUCER; + + std::vector threads(nproducer); + for (int i = 0; i < nproducer; ++i) { + pthread_create(&threads[i], NULL, inbound_producer, &ctx); + } + + butil::Timer tm; + tm.start(); + ctx.go.store(true, std::memory_order_release); + + int drained = 0; + while (drained < ctx.total) { + bthread_t tid; + if (remote_rq.pop(&tid)) { + ++drained; + } else { + sched_yield(); + } + } + + for (int i = 0; i < nproducer; ++i) { + pthread_join(threads[i], NULL); + } + tm.stop(); + + printf(" _remote_rq push (%d producers): %6.1f ns/op (%d ops)\n", + nproducer, tm.n_elapsed() / (double)ctx.total, ctx.total); +} + +// Baseline D: _rq steal (non-owner steals from another worker's _rq) +void bench_baseline_rq_steal(int nstealer) { + WorkStealingQueue rq; + rq.init(WSQ_CAP); + + const int N = 200000; + std::atomic stop(false); + + std::vector threads(nstealer); + std::vector args(nstealer); + for (int i = 0; i < nstealer; ++i) { + args[i] = {&rq, &stop, 0}; + pthread_create(&threads[i], NULL, steal_bench_thread, &args[i]); + } + + butil::Timer tm; + tm.start(); + for (int i = 0; i < N; ++i) { + while (!rq.push((bthread_t)i)) { + cpu_relax(); + } + } + stop.store(true, std::memory_order_release); + tm.stop(); + + int total_stolen = 0; + for (int i = 0; i < nstealer; ++i) { + pthread_join(threads[i], NULL); + total_stolen += args[i].stolen; + } + bthread_t tid; + int remaining = 0; + while (rq.pop(&tid)) ++remaining; + + printf(" _rq steal (%d stealers): %6.1f ns/push (pushed=%d stolen=%d remain=%d)\n", + nstealer, tm.n_elapsed() / (double)N, N, total_stolen, remaining); +} + +// ============================================================ +int 
main() { + printf("=== B2 Priority Queue Microbenchmark ===\n\n"); + + printf("--- B2: Inbound (MPSCQueue, lock-free) ---\n"); + bench_mpsc_single(); + bench_mpsc_multi(2); + bench_mpsc_multi(4); + bench_mpsc_multi(8); + + printf("\n--- B2: Flush inbound -> wsq ---\n"); + bench_mpsc_flush(4); + bench_mpsc_flush(8); + bench_mpsc_flush(16); + bench_mpsc_flush(32); + + printf("\n--- B2: WSQ owner push+pop ---\n"); + bench_wsq_pop(); + + printf("\n--- B2: WSQ steal (with concurrent push) ---\n"); + bench_wsq_steal(1); + bench_wsq_steal(2); + bench_wsq_steal(4); + bench_wsq_steal(8); + + printf("\n--- B2: Full pipeline (producer -> mpsc -> flush -> pop) ---\n"); + bench_mpsc_pipeline(1); + bench_mpsc_pipeline(2); + bench_mpsc_pipeline(4); + bench_mpsc_pipeline(8); + + printf("\n"); + printf("=== Baseline: Normal bthread scheduling path ===\n\n"); + + printf("--- Baseline: _rq push+pop (local worker, WSQ) ---\n"); + bench_baseline_rq(); + + printf("\n--- Baseline: _remote_rq push+pop (non-worker, mutex+BoundedQueue) ---\n"); + bench_baseline_remote_rq_single(); + bench_baseline_remote_rq_multi(2); + bench_baseline_remote_rq_multi(4); + bench_baseline_remote_rq_multi(8); + + printf("\n--- Baseline: _rq steal (non-owner steals from worker's WSQ) ---\n"); + bench_baseline_rq_steal(1); + bench_baseline_rq_steal(2); + bench_baseline_rq_steal(4); + bench_baseline_rq_steal(8); + + return 0; +} diff --git a/test/bthread_priority_queue_owner_unittest.cpp b/test/bthread_priority_queue_owner_unittest.cpp new file mode 100644 index 0000000000..af69b47c18 --- /dev/null +++ b/test/bthread_priority_queue_owner_unittest.cpp @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Tests for B2 priority queue owner dynamic changes: +// 1. New workers bind to available shards correctly +// 2. Priority tasks survive concurrency increases +// 3. Stress: concurrent priority tasks + concurrency scaling + +#include +#include +#include +#include +#include +#include +#include +#include "bthread/bthread.h" + +namespace { + +std::atomic g_count(0); +std::mutex g_mu; +std::set g_ids; + +void reset() { + g_count.store(0); + std::lock_guard lk(g_mu); + g_ids.clear(); +} + +struct Arg { + int id; + int sleep_us; // simulate work +}; + +void* priority_fn(void* a) { + Arg* arg = static_cast(a); + if (arg->sleep_us > 0) { + bthread_usleep(arg->sleep_us); + } + g_count.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard lk(g_mu); + g_ids.insert(arg->id); + } + delete arg; + return NULL; +} + +class OwnerDynamicTest : public ::testing::Test { +protected: + void SetUp() override { + reset(); + } +}; + +// Test 1: Add workers while priority tasks are in-flight. +// New workers should bind to available priority shards and help process tasks. 
+TEST_F(OwnerDynamicTest, add_workers_during_priority_tasks) { + int initial_concurrency = bthread_getconcurrency(); + + const int N = 500; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector tids; + tids.reserve(N); + + // Submit first batch of priority tasks (slow tasks to keep them in-flight) + for (int i = 0; i < N / 2; ++i) { + bthread_t tid; + Arg* arg = new Arg{i, 1000}; // 1ms sleep each + ASSERT_EQ(0, bthread_start_background(&tid, &attr, priority_fn, arg)); + tids.push_back(tid); + } + + // Increase concurrency — this creates new workers that should + // bind to priority shards + int new_concurrency = initial_concurrency + 4; + if (new_concurrency <= 1024) { + bthread_setconcurrency(new_concurrency); + } + + // Submit second batch + for (int i = N / 2; i < N; ++i) { + bthread_t tid; + Arg* arg = new Arg{i, 500}; // 0.5ms sleep + ASSERT_EQ(0, bthread_start_background(&tid, &attr, priority_fn, arg)); + tids.push_back(tid); + } + + // Join all + for (auto tid : tids) { + bthread_join(tid, NULL); + } + + EXPECT_EQ(N, g_count.load()); + { + std::lock_guard lk(g_mu); + EXPECT_EQ((size_t)N, g_ids.size()); + for (int i = 0; i < N; ++i) { + EXPECT_TRUE(g_ids.count(i)) << "Missing task " << i; + } + } +} + +// Test 2: Rapidly submit priority tasks while scaling concurrency up. +// Tests that no tasks are lost during owner binding transitions. 
+TEST_F(OwnerDynamicTest, rapid_submit_with_scaling) { + int cur = bthread_getconcurrency(); + + const int ROUNDS = 5; + const int TASKS_PER_ROUND = 100; + const int TOTAL = ROUNDS * TASKS_PER_ROUND; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector all_tids; + all_tids.reserve(TOTAL); + + for (int r = 0; r < ROUNDS; ++r) { + // Submit a batch of priority tasks + for (int i = 0; i < TASKS_PER_ROUND; ++i) { + bthread_t tid; + int id = r * TASKS_PER_ROUND + i; + Arg* arg = new Arg{id, 200}; + ASSERT_EQ(0, bthread_start_background(&tid, &attr, priority_fn, arg)); + all_tids.push_back(tid); + } + + // Try to scale up (will silently fail if already at max) + int next = cur + 2; + if (next <= 1024) { + bthread_setconcurrency(next); + cur = bthread_getconcurrency(); + } + } + + for (auto tid : all_tids) { + bthread_join(tid, NULL); + } + + EXPECT_EQ(TOTAL, g_count.load()); + { + std::lock_guard lk(g_mu); + EXPECT_EQ((size_t)TOTAL, g_ids.size()); + } +} + +// Test 3: Mixed priority and normal tasks with concurrent worker scaling. +// Producer threads submit tasks while main thread scales concurrency. 
+TEST_F(OwnerDynamicTest, concurrent_submit_and_scale) { + const int N_PRODUCERS = 3; + const int TASKS_PER_PRODUCER = 200; + const int TOTAL = N_PRODUCERS * TASKS_PER_PRODUCER; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::atomic started(0); + std::vector all_tids[N_PRODUCERS]; + std::vector producers; + + for (int p = 0; p < N_PRODUCERS; ++p) { + all_tids[p].resize(TASKS_PER_PRODUCER); + producers.emplace_back([&, p]() { + started.fetch_add(1); + while (started.load() < N_PRODUCERS) { + sched_yield(); + } + for (int i = 0; i < TASKS_PER_PRODUCER; ++i) { + int id = p * TASKS_PER_PRODUCER + i; + Arg* arg = new Arg{id, 100}; + bthread_start_background(&all_tids[p][i], &attr, priority_fn, arg); + } + }); + } + + // Concurrently scale workers while producers are running + int cur = bthread_getconcurrency(); + for (int i = 0; i < 3; ++i) { + usleep(5000); // 5ms + int next = cur + 2; + if (next <= 1024) { + bthread_setconcurrency(next); + cur = bthread_getconcurrency(); + } + } + + for (auto& t : producers) { + t.join(); + } + + // Join all bthreads + for (int p = 0; p < N_PRODUCERS; ++p) { + for (auto tid : all_tids[p]) { + bthread_join(tid, NULL); + } + } + + EXPECT_EQ(TOTAL, g_count.load()); + { + std::lock_guard lk(g_mu); + EXPECT_EQ((size_t)TOTAL, g_ids.size()); + } +} + +// Test 4: Stress test — high volume of priority tasks with scaling. 
+TEST_F(OwnerDynamicTest, stress_priority_with_scaling) { + const int N = 3000; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector tids(N); + int cur = bthread_getconcurrency(); + + for (int i = 0; i < N; ++i) { + Arg* arg = new Arg{i, 0}; // no sleep, pure throughput + ASSERT_EQ(0, bthread_start_background(&tids[i], &attr, priority_fn, arg)); + + // Scale up every 500 tasks + if (i % 500 == 499) { + int next = cur + 2; + if (next <= 1024) { + bthread_setconcurrency(next); + cur = bthread_getconcurrency(); + } + } + } + + for (int i = 0; i < N; ++i) { + bthread_join(tids[i], NULL); + } + + EXPECT_EQ(N, g_count.load()); + { + std::lock_guard lk(g_mu); + EXPECT_EQ((size_t)N, g_ids.size()); + } +} + +} // namespace + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + google::SetCommandLineOption("enable_bthread_priority_queue", "true"); + google::SetCommandLineOption("priority_queue_shards", "4"); + return RUN_ALL_TESTS(); +} diff --git a/test/bthread_priority_queue_unittest.cpp b/test/bthread_priority_queue_unittest.cpp new file mode 100644 index 0000000000..c87a1eec12 --- /dev/null +++ b/test/bthread_priority_queue_unittest.cpp @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include "bthread/bthread.h" +#include "bthread/processor.h" + +namespace { + +// Counter incremented by priority bthreads to verify execution +std::atomic g_priority_count(0); +// Mutex + set for collecting executed tids to verify no loss +std::mutex g_tid_mutex; +std::set g_executed_ids; + +void reset_globals() { + g_priority_count.store(0); + std::lock_guard lk(g_tid_mutex); + g_executed_ids.clear(); +} + +struct TaskArg { + int id; +}; + +void* priority_task_fn(void* arg) { + TaskArg* ta = static_cast(arg); + g_priority_count.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard lk(g_tid_mutex); + g_executed_ids.insert(ta->id); + } + delete ta; + return NULL; +} + +void* normal_task_fn(void* /*arg*/) { + // Just a normal task that does nothing, used as a filler + bthread_usleep(1000); + return NULL; +} + +class PriorityQueueTest : public ::testing::Test { +protected: + void SetUp() override { + reset_globals(); + } +}; + +// Test 1: End-to-end priority task submission and execution. +// Multiple producers submit priority tasks, verify all tasks are executed. +TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) { + const int N = 200; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector tids(N); + for (int i = 0; i < N; ++i) { + TaskArg* arg = new TaskArg{i}; + ASSERT_EQ(0, bthread_start_background(&tids[i], &attr, + priority_task_fn, arg)); + } + + for (int i = 0; i < N; ++i) { + bthread_join(tids[i], NULL); + } + + ASSERT_EQ(N, g_priority_count.load()); + std::lock_guard lk(g_tid_mutex); + ASSERT_EQ((size_t)N, g_executed_ids.size()); + for (int i = 0; i < N; ++i) { + ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i; + } +} + +// Test 2: Mix of priority and normal tasks, all complete correctly. 
+TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) {
+    const int N_PRIORITY = 100;
+    const int N_NORMAL = 100;
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids;
+    tids.reserve(N_PRIORITY + N_NORMAL);
+
+    for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) {
+        bthread_t tid;
+        if (i % 2 == 0 && (i / 2) < N_PRIORITY) {
+            // Priority task
+            TaskArg* arg = new TaskArg{i / 2};
+            ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr,
+                                                  priority_task_fn, arg));
+        } else {
+            // Normal task
+            ASSERT_EQ(0, bthread_start_background(&tid, NULL,
+                                                  normal_task_fn, NULL));
+        }
+        tids.push_back(tid);
+    }
+
+    for (auto tid : tids) {
+        bthread_join(tid, NULL);
+    }
+
+    ASSERT_EQ(N_PRIORITY, g_priority_count.load());
+}
+
+// Test 3: Concurrent producers submitting priority tasks from multiple pthreads.
+// Simulates multiple event dispatchers pushing to the priority queue.
+TEST_F(PriorityQueueTest, concurrent_producers_no_task_loss) {
+    const int NUM_PRODUCERS = 4;
+    const int TASKS_PER_PRODUCER = 50;
+    const int TOTAL = NUM_PRODUCERS * TASKS_PER_PRODUCER;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::atomic<int> started(0);
+    std::vector<pthread_t> producers(NUM_PRODUCERS);
+    std::vector<std::vector<bthread_t>> all_tids(NUM_PRODUCERS);
+
+    struct ProducerArg {
+        int producer_id;
+        int tasks_per_producer;
+        bthread_attr_t* attr;
+        std::vector<bthread_t>* tids;
+        std::atomic<int>* started;
+    };
+
+    auto producer_fn = [](void* arg) -> void* {
+        ProducerArg* pa = static_cast<ProducerArg*>(arg);
+        pa->started->fetch_add(1);
+        // Spin until all producers are ready
+        while (pa->started->load() < 4) {
+            cpu_relax();
+        }
+        pa->tids->resize(pa->tasks_per_producer);
+        for (int i = 0; i < pa->tasks_per_producer; ++i) {
+            int id = pa->producer_id * pa->tasks_per_producer + i;
+            TaskArg* ta = new TaskArg{id};
+            int rc = bthread_start_background(&(*pa->tids)[i], pa->attr,
+                                              priority_task_fn, ta);
+            EXPECT_EQ(0, rc);
+        }
+        
return NULL;
+    };
+
+    std::vector<ProducerArg> pargs(NUM_PRODUCERS);
+    for (int i = 0; i < NUM_PRODUCERS; ++i) {
+        pargs[i] = {i, TASKS_PER_PRODUCER, &attr, &all_tids[i], &started};
+        ASSERT_EQ(0, pthread_create(&producers[i], NULL, producer_fn, &pargs[i]));
+    }
+
+    for (int i = 0; i < NUM_PRODUCERS; ++i) {
+        pthread_join(producers[i], NULL);
+    }
+
+    // Join all bthreads
+    for (int i = 0; i < NUM_PRODUCERS; ++i) {
+        for (auto tid : all_tids[i]) {
+            bthread_join(tid, NULL);
+        }
+    }
+
+    ASSERT_EQ(TOTAL, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)TOTAL, g_executed_ids.size());
+}
+
+// Test 4: Priority tasks submitted with only 1 shard (degenerate case).
+// Verifies correctness when nshard=1.
+TEST_F(PriorityQueueTest, single_shard_correctness) {
+    // This test relies on FLAGS_priority_queue_shards being set before
+    // TaskControl init. Since TaskControl is already initialized by the
+    // time we run, we test with whatever shard count is configured.
+    // The key verification is no task loss.
+    const int N = 100;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids(N);
+    for (int i = 0; i < N; ++i) {
+        TaskArg* arg = new TaskArg{i};
+        ASSERT_EQ(0, bthread_start_background(&tids[i], &attr,
+                                              priority_task_fn, arg));
+    }
+    for (int i = 0; i < N; ++i) {
+        bthread_join(tids[i], NULL);
+    }
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+}
+
+// Test 5: Stress test with high volume of priority tasks. 
+TEST_F(PriorityQueueTest, stress_high_volume) {
+    const int N = 2000;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids(N);
+    for (int i = 0; i < N; ++i) {
+        TaskArg* arg = new TaskArg{i};
+        ASSERT_EQ(0, bthread_start_background(&tids[i], &attr,
+                                              priority_task_fn, arg));
+    }
+    for (int i = 0; i < N; ++i) {
+        bthread_join(tids[i], NULL);
+    }
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+}
+
+} // namespace
+
+int main(int argc, char* argv[]) {
+    testing::InitGoogleTest(&argc, argv);
+    // Enable priority queue before any bthread is created (triggers TaskControl init)
+    google::SetCommandLineOption("enable_bthread_priority_queue", "true");
+    google::SetCommandLineOption("priority_queue_shards", "4");
+    return RUN_ALL_TESTS();
+}

From cdeee5d1c6bf99d2d5f33f3ee011eb4fd1085588 Mon Sep 17 00:00:00 2001
From: "yannan.wyn"
Date: Thu, 16 Apr 2026 11:02:30 +0800
Subject: [PATCH 2/3] fix: comments and bazel compile issue

---
 src/bthread/task_control.cpp                  | 38 +++++++++++++------
 test/BUILD.bazel                              | 25 ++++++++++++
 .../bthread_priority_queue_owner_unittest.cpp |  2 +
 test/bthread_priority_queue_unittest.cpp      | 16 +++-----
 4 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index be36a3e863..5d7dacfe52 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -582,13 +582,29 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
     for (size_t i = 0; i < nshard; ++i) {
         size_t idx = (start + i) % nshard;
         PriorityShard* shard = shards[idx].get();
-        if (shard->owner.load(butil::memory_order_relaxed) == NULL &&
-            !shard->draining.load(butil::memory_order_relaxed)) {
-            bthread_t salvaged;
-            if (shard->inbound.Dequeue(salvaged)) {
-                fallback_enqueue(tag, salvaged);
-            }
+        if (shard->owner.load(butil::memory_order_relaxed) != NULL) {
+            
continue; + } + bool expected = false; + // Use CAS on draining to ensure only one thread dequeues at a time, + // preserving the MPSC single-consumer. + if (!shard->draining.compare_exchange_strong( + expected, true, + butil::memory_order_acquire, + butil::memory_order_relaxed)) { + continue; // Already draining + } + // Re-check owner after acquiring draining + // a new owner may have bound between first check and the CAS. + if (shard->owner.load(butil::memory_order_acquire) != NULL) { + shard->draining.store(false, butil::memory_order_release); + continue; + } + bthread_t salvaged; + while (shard->inbound.Dequeue(salvaged)) { + fallback_enqueue(tag, salvaged); } + shard->draining.store(false, butil::memory_order_release); } } @@ -771,8 +787,8 @@ void TaskControl::push_priority_queue(bthread_tag_t tag, bthread_t tid) { } } - // All shards ownerless, fallback to round-robin pick - shards[start]->inbound.Enqueue(tid); + // All shards ownerless, no consumer will drain inbound so just fallback + fallback_enqueue(tag, tid); } void TaskControl::bind_priority_owner(TaskGroup* g, bthread_tag_t tag) { @@ -854,11 +870,11 @@ void TaskControl::fallback_enqueue(bthread_tag_t tag, bthread_t tid) { if (m) { m->attr.flags &= ~BTHREAD_GLOBAL_PRIORITY; } - // Enqueue to a random group's remote_rq, thenormal scheduling path + // Enqueue via ready_to_run_remote which retries on queue-full, + // preventing silent task loss. 
TaskGroup* g = choose_one_group(tag); if (g) { - g->_remote_rq.push(tid); - signal_task(1, tag); + g->ready_to_run_remote(m); } } diff --git a/test/BUILD.bazel b/test/BUILD.bazel index b68b3fa08a..afdd443c10 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -240,6 +240,9 @@ cc_test( "bthread_butex_multi_tag_unittest.cpp", "bthread_rwlock_unittest.cpp", "bthread_semaphore_unittest.cpp", + # Have custom main() that conflicts with gtest_main + "bthread_priority_queue_unittest.cpp", + "bthread_priority_queue_owner_unittest.cpp", ], ), copts = COPTS, @@ -252,6 +255,28 @@ cc_test( ], ) +cc_test( + name = "bthread_priority_queue_test", + srcs = ["bthread_priority_queue_unittest.cpp"], + copts = COPTS, + deps = [ + ":sstream_workaround", + "//:brpc", + "@com_google_googletest//:gtest", + ], +) + +cc_test( + name = "bthread_priority_queue_owner_test", + srcs = ["bthread_priority_queue_owner_unittest.cpp"], + copts = COPTS, + deps = [ + ":sstream_workaround", + "//:brpc", + "@com_google_googletest//:gtest", + ], +) + cc_test( name = "brpc_prometheus_test", srcs = glob( diff --git a/test/bthread_priority_queue_owner_unittest.cpp b/test/bthread_priority_queue_owner_unittest.cpp index af69b47c18..82fe9c4bf8 100644 --- a/test/bthread_priority_queue_owner_unittest.cpp +++ b/test/bthread_priority_queue_owner_unittest.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include "bthread/bthread.h" namespace { diff --git a/test/bthread_priority_queue_unittest.cpp b/test/bthread_priority_queue_unittest.cpp index c87a1eec12..bbdfcfcaea 100644 --- a/test/bthread_priority_queue_unittest.cpp +++ b/test/bthread_priority_queue_unittest.cpp @@ -21,8 +21,8 @@ #include #include #include +#include #include "bthread/bthread.h" -#include "bthread/processor.h" namespace { @@ -152,8 +152,8 @@ TEST_F(PriorityQueueTest, concurrent_producers_no_task_loss) { ProducerArg* pa = static_cast(arg); pa->started->fetch_add(1); // Spin until all producers are ready - while 
(pa->started->load() < 4) { - cpu_relax(); + while (pa->started->load() < NUM_PRODUCERS) { + sched_yield(); } pa->tids->resize(pa->tasks_per_producer); for (int i = 0; i < pa->tasks_per_producer; ++i) { @@ -188,13 +188,9 @@ TEST_F(PriorityQueueTest, concurrent_producers_no_task_loss) { ASSERT_EQ((size_t)TOTAL, g_executed_ids.size()); } -// Test 4: Priority tasks submitted with only 1 shard (degenerate case). -// Verifies correctness when nshard=1. -TEST_F(PriorityQueueTest, single_shard_correctness) { - // This test relies on FLAGS_priority_queue_shards being set before - // TaskControl init. Since TaskControl is already initialized by the - // time we run, we test with whatever shard count is configured. - // The key verification is no task loss. +// Test 4: Priority tasks submitted with configured shard count. +// Verifies correctness with the current shard configuration. +TEST_F(PriorityQueueTest, configured_shards_correctness) { const int N = 100; bthread_attr_t attr = BTHREAD_ATTR_NORMAL; From 10e9b6b13a31c833e874032f015ce083a1ebf18f Mon Sep 17 00:00:00 2001 From: "yannan.wyn" Date: Thu, 16 Apr 2026 13:52:08 +0800 Subject: [PATCH 3/3] ci: re-trigger CI