diff --git a/CMakeLists.txt b/CMakeLists.txt index 16308c44c..b4ce1684f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -150,6 +150,14 @@ else() set(stdexec_compiler_frontend ${CMAKE_CXX_COMPILER_ID}) endif() +# Build relacy tests by default only for GNU compiler and if tests are enabled +if(${STDEXEC_BUILD_TESTS} AND stdexec_compiler_frontend STREQUAL "GNU") + set(STDEXEC_BUILD_RELACY_TESTS_DEFAULT ON) +else() + set(STDEXEC_BUILD_RELACY_TESTS_DEFAULT OFF) +endif() +option(STDEXEC_BUILD_RELACY_TESTS "Build stdexec relacy tests" ${STDEXEC_BUILD_RELACY_TESTS_DEFAULT}) + set(stdexec_export_targets) # Define the main library diff --git a/include/exec/timed_thread_scheduler.hpp b/include/exec/timed_thread_scheduler.hpp index ac04b0341..fec93bb7c 100644 --- a/include/exec/timed_thread_scheduler.hpp +++ b/include/exec/timed_thread_scheduler.hpp @@ -43,6 +43,9 @@ namespace exec { stop }; + // Default ctor for __intrusive_mpsc_queue's internal stub node + constexpr timed_thread_operation_base() = default; + constexpr timed_thread_operation_base( void (*set_value)(timed_thread_operation_base*) noexcept, command_type command = command_type::schedule) noexcept @@ -50,7 +53,7 @@ namespace exec { , set_value_{set_value} { } - STDEXEC::__std::atomic next_{nullptr}; + STDEXEC::__std::atomic next_{nullptr}; command_type command_; void (*set_value_)(timed_thread_operation_base*) noexcept; }; diff --git a/include/stdexec/__detail/__atomic.hpp b/include/stdexec/__detail/__atomic.hpp index d97946b59..0b94ca77b 100644 --- a/include/stdexec/__detail/__atomic.hpp +++ b/include/stdexec/__detail/__atomic.hpp @@ -61,7 +61,7 @@ namespace STDEXEC::__std { using std::atomic_thread_fence; using std::atomic_signal_fence; -# if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY) +# if __cpp_lib_atomic_ref >= 2018'06L using std::atomic_ref; # else inline constexpr int __atomic_flag_map[] = { diff --git a/include/stdexec/__detail/__intrusive_mpsc_queue.hpp 
b/include/stdexec/__detail/__intrusive_mpsc_queue.hpp index fb89f618c..e349e3b3c 100644 --- a/include/stdexec/__detail/__intrusive_mpsc_queue.hpp +++ b/include/stdexec/__detail/__intrusive_mpsc_queue.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) Dmitiy V'jukov + * Copyright (c) Dmitry V'jukov * Copyright (c) 2024 Maikel Nadolski * Copyright (c) 2024 NVIDIA Corporation * @@ -24,59 +24,78 @@ #include "__atomic.hpp" +#include "stdexec/__detail/__config.hpp" + #include "./__spin_loop_pause.hpp" namespace STDEXEC { template class __intrusive_mpsc_queue; - template _Node::* _Next> + // _Node must be default_initializable only for the queue to construct an + // internal "stub" node - only the _Next data element is accessed internally. + template _Node::* _Next> + requires __std::default_initializable<_Node> class __intrusive_mpsc_queue<_Next> { - __std::atomic __back_{&__nil_}; - void* __front_{&__nil_}; - __std::atomic<_Node*> __nil_ = nullptr; - constexpr void push_back_nil() { - __nil_.store(nullptr, __std::memory_order_relaxed); - auto* __prev = static_cast<_Node*>(__back_.exchange(&__nil_, __std::memory_order_acq_rel)); - (__prev->*_Next).store(&__nil_, __std::memory_order_release); - } + __std::atomic<_Node*> __head_{&__stub_}; + _Node* __tail_{&__stub_}; + _Node __stub_{}; public: + + __intrusive_mpsc_queue() { + (__stub_.*_Next).store(nullptr, __std::memory_order_release); + } + constexpr auto push_back(_Node* __new_node) noexcept -> bool { (__new_node->*_Next).store(nullptr, __std::memory_order_relaxed); - void* __prev_back = __back_.exchange(__new_node, __std::memory_order_acq_rel); - bool __is_nil = __prev_back == static_cast(&__nil_); - if (__is_nil) { - __nil_.store(__new_node, __std::memory_order_release); - } else { - (static_cast<_Node*>(__prev_back)->*_Next).store(__new_node, __std::memory_order_release); - } - return __is_nil; + _Node* __prev = __head_.exchange(__new_node, __std::memory_order_acq_rel); + (__prev->*_Next).store(__new_node, 
__std::memory_order_release); + return __prev == &__stub_; } constexpr auto pop_front() noexcept -> _Node* { - if (__front_ == static_cast(&__nil_)) { - _Node* __next = __nil_.load(__std::memory_order_acquire); - if (!__next) { - return nullptr; + _Node* __tail = this->__tail_; + STDEXEC_ASSERT(__tail != nullptr); + _Node* __next = (__tail->*_Next).load(__std::memory_order_acquire); + // If tail is pointing to the stub node we need to advance it once more + if (&__stub_ == __tail) { + if (nullptr == __next) { + return nullptr; } - __front_ = __next; + this->__tail_ = __next; + __tail = __next; + __next = (__next->*_Next).load(__std::memory_order_acquire); + } + // Normal case: there is a next node and we can just advance the tail + if (nullptr != __next) { + this->__tail_ = __next; + return __tail; } - auto* __front = static_cast<_Node*>(__front_); - void* __next = (__front->*_Next).load(__std::memory_order_acquire); - if (__next) { - __front_ = __next; - return __front; + // Next is nullptr here means that either: + // 1) There are no more nodes in the queue + // 2) A producer is in the middle of adding a new node + const _Node* __head = this->__head_.load(__std::memory_order_acquire); + // A producer is in the middle of adding a new node + // we cannot return tail as we cannot link the next node yet + if (__tail != __head) { + return nullptr; } - STDEXEC_ASSERT(!__next); - push_back_nil(); - do { - __spin_loop_pause(); - __next = (__front->*_Next).load(__std::memory_order_acquire); - } while (!__next); - __front_ = __next; - return __front; + // No more nodes in the queue - we need to insert a stub node + // to be able to link to an eventual empty state (or new nodes) + push_back(&__stub_); + // Now re-attempt to load next + __next = (__tail->*_Next).load(__std::memory_order_acquire); + if (nullptr != __next) { + // Successfully linked either a new node or the stub node + this->__tail_ = __next; + return __tail; + } + // A producer is in the middle of adding a 
new node since next is still nullptr + // and not our stub node, thus we cannot link the next node yet + return nullptr; } }; -} // namespace STDEXEC \ No newline at end of file + +} // namespace STDEXEC diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a9194d69c..6bbbd846d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -62,6 +62,7 @@ set(stdexec_test_sources stdexec/algos/other/test_execute.cpp stdexec/detail/test_completion_signatures.cpp stdexec/detail/test_utility.cpp + stdexec/detail/test_intrusive_mpsc_queue.cpp stdexec/schedulers/test_task_scheduler.cpp stdexec/queries/test_env.cpp stdexec/queries/test_get_forward_progress_guarantee.cpp @@ -137,6 +138,10 @@ icm_add_build_failure_test( FOLDER test ) +if(STDEXEC_BUILD_RELACY_TESTS) + add_subdirectory(rrd) +endif() + # # Adding multiple tests with a glob # icm_glob_build_failure_tests( # PATTERN *_fail*.cpp diff --git a/test/rrd/CMakeLists.txt b/test/rrd/CMakeLists.txt new file mode 100644 index 000000000..74b7b39cc --- /dev/null +++ b/test/rrd/CMakeLists.txt @@ -0,0 +1,44 @@ +include(FetchContent) + +FetchContent_Declare( + relacy + GIT_REPOSITORY https://github.com/dvyukov/relacy + GIT_TAG master +) + +FetchContent_Populate(relacy) + +add_library(relacy INTERFACE) +target_include_directories(relacy INTERFACE + $ + $ + $ +) + +function(add_relacy_test target_name) + add_executable(${target_name} ${target_name}.cpp) + + target_link_libraries(${target_name} PRIVATE relacy) + target_include_directories(${target_name} PRIVATE ../../include) + target_include_directories(${target_name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + set_target_properties(${target_name} PROPERTIES + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF) + + add_test(NAME relacy-${target_name} COMMAND ${target_name}) +endfunction() + +set(relacy_tests + async_scope + intrusive_mpsc_queue + split + sync_wait +) + +foreach(test ${relacy_tests}) + add_relacy_test(${test}) +endforeach() + +# Target to build all 
relacy tests +add_custom_target(relacy-tests DEPENDS ${relacy_tests}) diff --git a/test/rrd/Makefile b/test/rrd/Makefile deleted file mode 100644 index d68321426..000000000 --- a/test/rrd/Makefile +++ /dev/null @@ -1,43 +0,0 @@ -# User-customizable variables: -CXX ?= c++ -CXX_STD ?= c++20 -CXXFLAGS ?= -DSTDEXEC_RELACY -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g -DEPFLAGS ?= -MD -MF $(@).d -MP -MT $(@) -build_dir = build - -.SECONDARY: - -test_programs = split async_scope sync_wait - -test_exe_files = $(foreach name,$(test_programs),$(build_dir)/$(name)) - -exe_files = $(test_exe_files) -o_files = $(exe_files:=.cpp.o) - -ansi_term_csi = [ -ansi_term_bold = $(ansi_term_csi)1m -ansi_term_green = $(ansi_term_csi)32m -ansi_term_red = $(ansi_term_csi)31m -ansi_term_reset = $(ansi_term_csi)m - -COMPILE.cpp = $(CXX) $(DEPFLAGS) $(CXXFLAGS) -c -LINK.cpp = $(CXX) $(CXXFLAGS) - -.PHONY: all -all: tests - -.PHONY: tests -tests: $(test_exe_files) - -$(build_dir)/%: $(build_dir)/%.cpp.o - $(LINK.cpp) $(^) -o $(@) - -$(build_dir)/%.cpp.o: %.cpp - @mkdir -p $(dir $(@)) - $(COMPILE.cpp) -o $(@) $(<) - -.PHONY: clean -clean: - rm -fr -- $(build_dir)/ - --include $(o_files:=.d) diff --git a/test/rrd/README.md b/test/rrd/README.md index 45d299028..799d343e4 100644 --- a/test/rrd/README.md +++ b/test/rrd/README.md @@ -15,13 +15,20 @@ STDEXEC library could needs to use `x.fetch_add(1)` to be compatible with Relacy ## Instructions -Run the following commands from within this directory (`./tests/rrd`). +Configure and build stdexec following the build instructions in the top level +[README.md](../../README.md). There are a couple relacy specific build and ctest +targets, though they are part of the standard build and ctest and will be run +automatically if cmake is configured with `-DSTDEXEC_BUILD_RELACY_TESTS=1`. +`STDEXEC_BUILD_RELACY_TESTS` is set by default for GCC today. + +Run the following on a Linux machine with GCC as the toolchain. 
``` -git clone -b STDEXEC https://github.com/dvyukov/relacy -CXX=g++-11 make -j 4 -./build/split -./build/async_scope +mkdir build && cd build +cmake .. +make relacy-tests -j 4 +ctest -R relacy # Run all relacy tests +./test/rrd/sync_wait # Run a specific relacy test directly ``` ## Recommended use @@ -35,8 +42,16 @@ out a more stable build on all environments/compilers, we should revisit this. ## Supported platforms The STDEXEC Relacy tests have been verified to build and run on - * Linux based GCC+11 with libstdc++ (`x86_64`) - * Mac with Apple Clang 15 with libc++ (`x86_64`) + * Linux-based GCC 11-14 with libstdc++ (`x86_64`) + * Mac with Apple Clang 15 and 17 with libc++ (`x86_64`) + +## Caveat + +Relacy relies on a less-than-robust approach to implement its runtime: it replaces +std:: names with its own versions, for example, std::atomic and std::mutex, as well +as pthread_* APIs. As libstdc++/libc++ evolve, newer versions may not be compatible with +Relacy. In these cases, changes to Relacy are needed to correctly intercept and replace +std:: names. -G++12 and newer are known to have issues that could be addressed with patches -to Relacy. +When new compiler and standard library versions are released, we will need to verify that +they can compile the stdexec Relacy tests before enabling them. diff --git a/test/rrd/async_scope.cpp b/test/rrd/async_scope.cpp index e471a5769..f21bc44f0 100644 --- a/test/rrd/async_scope.cpp +++ b/test/rrd/async_scope.cpp @@ -1,17 +1,26 @@ -#include "../../relacy/relacy_cli.hpp" -#include "../../relacy/relacy_std.hpp" +/* + * Copyright (c) 2025 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include #include #include #include #include -#include - -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - namespace ex = STDEXEC; using exec::async_scope; diff --git a/test/rrd/intrusive_mpsc_queue.cpp b/test/rrd/intrusive_mpsc_queue.cpp new file mode 100644 index 000000000..0646d965b --- /dev/null +++ b/test/rrd/intrusive_mpsc_queue.cpp @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include + +struct test_node { + std::atomic next_{nullptr}; + int value_{0}; + + test_node() = default; + explicit test_node(int val) : value_(val) {} +}; + +using test_queue = STDEXEC::__intrusive_mpsc_queue<&test_node::next_>; + +struct mpsc_single_producer_consumer : rl::test_suite { + test_queue queue; + test_node node1{42}; + test_node node2{100}; + int consumed_count{0}; + int values_sum{0}; + + void thread(unsigned thread_id) { + if (thread_id == 0) { + queue.push_back(&node1); + queue.push_back(&node2); + } else { + while (consumed_count < 2) { + test_node* node = queue.pop_front(); + if (node) { + values_sum += node->value_; + ++consumed_count; + } + } + } + } + + void after() { + RL_ASSERT(consumed_count == 2); + RL_ASSERT(values_sum == 142); // 42 + 100 + } +}; + +struct mpsc_two_producers : rl::test_suite { + test_queue queue; + test_node nodes[4] = {test_node{1}, test_node{2}, test_node{3}, test_node{4}}; + int consumed_count{0}; + bool seen[4]; + + void before() { + for (int i = 0; i < 4; ++i) { + seen[i] = false; + } + } + + void thread(unsigned thread_id) { + if (thread_id == 0) { + // Producer 1 + queue.push_back(&nodes[0]); + queue.push_back(&nodes[1]); + } else if (thread_id == 1) { + // Producer 2 + queue.push_back(&nodes[2]); + queue.push_back(&nodes[3]); + } else { + // Consumer + while (consumed_count < 4) { + test_node* node = queue.pop_front(); + if (node) { + int idx = node->value_ - 1; + RL_ASSERT(idx >= 0 && idx < 4); + bool was_seen = std::exchange(seen[idx], true); + RL_ASSERT(!was_seen); // Each node should be seen exactly once + ++consumed_count; + } + } + } + } + + void after() { + RL_ASSERT(consumed_count == 4); + for (int i = 0; i < 4; ++i) { + RL_ASSERT(seen[i]); + } + } +}; + +struct mpsc_push_return_value : rl::test_suite { + test_queue queue; + test_node node1{1}; + test_node node2{2}; + test_node node3{3}; + + void thread(unsigned thread_id) { + RL_ASSERT(queue.push_back(&node1)); + 
RL_ASSERT(!queue.push_back(&node2)); + RL_ASSERT(!queue.push_back(&node3)); + + queue.pop_front(); + RL_ASSERT(!queue.push_back(&node1)); + + queue.pop_front(); + queue.pop_front(); + queue.pop_front(); + + RL_ASSERT(queue.push_back(&node1)); + RL_ASSERT(!queue.push_back(&node2)); + RL_ASSERT(!queue.push_back(&node3)); + } +}; + +struct mpsc_fifo_order : rl::test_suite { + test_queue queue; + test_node nodes[3] = {test_node{1}, test_node{2}, test_node{3}}; + int order[3]; + int consumed_count{0}; + + void before() { + for (int i = 0; i < 3; ++i) { + order[i] = -1; + } + } + + void thread(unsigned thread_id) { + if (thread_id == 0) { + queue.push_back(&nodes[0]); + queue.push_back(&nodes[1]); + queue.push_back(&nodes[2]); + } else { + int pop_order = 0; + while (consumed_count < 3) { + test_node* node = queue.pop_front(); + if (node) { + int idx = node->value_ - 1; + order[idx] = pop_order++; + ++consumed_count; + } + } + } + } + + void after() { + RL_ASSERT(consumed_count == 3); + RL_ASSERT(order[0] == 0); + RL_ASSERT(order[1] == 1); + RL_ASSERT(order[2] == 2); + } +}; + +struct mpsc_pop_from_empty_never_returns_node : rl::test_suite { + test_queue queue; + test_node node{99}; + std::atomic pushed{false}; + + void thread(unsigned thread_id) { + if (thread_id == 0) { + queue.push_back(&node); + pushed.store(true); + } else { + while (!pushed.load()); + + test_node* node = queue.pop_front(); + RL_ASSERT(node->value_ == 99); + + for (int i = 0; i != 10; ++i) { + node = queue.pop_front(); + RL_ASSERT(node == nullptr); + } + } + } +}; + +struct mpsc_five_prod_one_cons : rl::test_suite { + test_queue queue; + test_node nodes[10] = { + // Producer 0: values 0-1 + test_node{0}, test_node{1}, + // Producer 1: values 10000-10001 + test_node{10000}, test_node{10001}, + // Producer 2: values 20000-20001 + test_node{20000}, test_node{20001}, + // Producer 3: values 30000-30001 + test_node{30000}, test_node{30001}, + // Producer 4: values 40000-40001 + test_node{40000}, 
test_node{40001} + }; + int consumed_count{0}; + bool seen[10]; + + void before() { + for (int i = 0; i < 10; ++i) { + seen[i] = false; + } + } + + void thread(unsigned thread_id) { + if (thread_id < 5) { + // Producer threads (0-4) + int base_idx = thread_id * 2; + queue.push_back(&nodes[base_idx]); + queue.push_back(&nodes[base_idx + 1]); + } else { + // Consumer thread (5) + while (consumed_count < 10) { + test_node* node = queue.pop_front(); + if (node) { + // Map value to index + int idx; + if (node->value_ < 10000) { + idx = node->value_; // 0 or 1 + } else { + int producer_id = node->value_ / 10000; + int item_in_producer = node->value_ % 10000; + idx = producer_id * 2 + item_in_producer; + } + RL_ASSERT(idx >= 0 && idx < 10); + bool was_seen = std::exchange(seen[idx], true); + RL_ASSERT(!was_seen); // Each node should be seen exactly once + ++consumed_count; + } + } + } + } + + void after() { + RL_ASSERT(consumed_count == 10); + for (int i = 0; i < 10; ++i) { + RL_ASSERT(seen[i]); + } + } +}; + +struct mpsc_five_producers_ordered : rl::test_suite { + static constexpr int ITEMS_PER_PRODUCER = 100; + static constexpr int NUM_PRODUCERS = 5; + static constexpr int TOTAL_ITEMS = ITEMS_PER_PRODUCER * NUM_PRODUCERS; + + test_queue queue; + test_node nodes[TOTAL_ITEMS]; + int consumed_count{0}; + int consumed_values[TOTAL_ITEMS]; + + void before() { + for (int i = 0; i < TOTAL_ITEMS; ++i) { + consumed_values[i] = -1; + // Initialize nodes with their values + // Producer 0: 1-100, Producer 1: 101-200, etc. 
+ nodes[i].value_ = i + 1; + } + } + + void thread(unsigned thread_id) { + if (thread_id < NUM_PRODUCERS) { + int start_idx = thread_id * ITEMS_PER_PRODUCER; + for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) { + queue.push_back(&nodes[start_idx + i]); + } + } else { + // int count = 0; + while (consumed_count < TOTAL_ITEMS) { + test_node* node = queue.pop_front(); + if (node) { + consumed_values[consumed_count] = node->value_; + ++consumed_count; + } + } + } + } + + void after() { + RL_ASSERT(consumed_count == TOTAL_ITEMS); + + // Check that each value appears exactly once + bool seen[TOTAL_ITEMS + 1] = {false}; // values are 1-500, so need 501 elements + for (int i = 0; i < TOTAL_ITEMS; ++i) { + int value = consumed_values[i]; + RL_ASSERT(value >= 1 && value <= TOTAL_ITEMS); + RL_ASSERT(!seen[value]); // Each value should appear exactly once + seen[value] = true; + } + + // Group consumed values into 5 arrays based on their range + int range_values[NUM_PRODUCERS][ITEMS_PER_PRODUCER]; + int range_counts[NUM_PRODUCERS] = {0}; + + for (int i = 0; i < TOTAL_ITEMS; ++i) { + int value = consumed_values[i]; + // Determine which producer this value belongs to (0-4) + int producer = (value - 1) / ITEMS_PER_PRODUCER; + RL_ASSERT(producer >= 0 && producer < NUM_PRODUCERS); + range_values[producer][range_counts[producer]++] = value; + } + + // Verify each producer contributed exactly ITEMS_PER_PRODUCER items + for (int producer = 0; producer < NUM_PRODUCERS; ++producer) { + RL_ASSERT(range_counts[producer] == ITEMS_PER_PRODUCER); + } + + // Verify each range is in ascending order + for (int producer = 0; producer < NUM_PRODUCERS; ++producer) { + int range_start = producer * ITEMS_PER_PRODUCER + 1; + for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) { + RL_ASSERT(range_values[producer][i] == range_start + i); + } + } + } +}; + +auto main(int argc, char** argv) -> int { + int iterations = argc > 1 ? 
strtol(argv[1], nullptr, 10) : 250000; + rl::test_params p; + p.iteration_count = iterations; + p.execution_depth_limit = 10000; + p.search_type = rl::random_scheduler_type; + +#define CHECK(x) if (!(x)) { std::cout << "Test " #x " failed\n"; return 1; } + + printf("Running mpsc_single_producer_consumer...\n"); + CHECK(rl::simulate(p)); + + printf("Running mpsc_two_producers...\n"); + CHECK(rl::simulate(p)); + + printf("Running mpsc_push_return_value...\n"); + CHECK(rl::simulate(p)); + + printf("Running mpsc_fifo_order...\n"); + CHECK(rl::simulate(p)); + + printf("Running five_prod_one_cons...\n"); + CHECK(rl::simulate(p)); + + printf("Running mpsc_pop_from_empty_never_returns_node...\n"); + CHECK(rl::simulate(p)); + + // Beefy test... + p.iteration_count = 5000; + printf("Running mpsc_five_producers_ordered...\n"); + CHECK(rl::simulate(p)); + + printf("All tests passed!\n"); + return 0; +} diff --git a/test/rrd/split.cpp b/test/rrd/split.cpp index 00482903d..d699d945f 100644 --- a/test/rrd/split.cpp +++ b/test/rrd/split.cpp @@ -1,13 +1,24 @@ -#include "../../relacy/relacy_cli.hpp" -#include "../../relacy/relacy_std.hpp" +/* + * Copyright (c) 2025 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include #include #include -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - namespace ex = STDEXEC; struct split_bug : rl::test_suite { diff --git a/test/rrd/stdexec_relacy.hpp b/test/rrd/stdexec_relacy.hpp new file mode 100644 index 000000000..3e509bb97 --- /dev/null +++ b/test/rrd/stdexec_relacy.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include "../../relacy/relacy_std.hpp" + +namespace std { + template struct atomic_ref; +} diff --git a/test/rrd/sync_wait.cpp b/test/rrd/sync_wait.cpp index 1d78d9528..242db6665 100644 --- a/test/rrd/sync_wait.cpp +++ b/test/rrd/sync_wait.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "../../relacy/relacy_std.hpp" +#include #include #include diff --git a/test/stdexec/detail/test_intrusive_mpsc_queue.cpp b/test/stdexec/detail/test_intrusive_mpsc_queue.cpp new file mode 100644 index 000000000..ebeac82d2 --- /dev/null +++ b/test/stdexec/detail/test_intrusive_mpsc_queue.cpp @@ -0,0 +1,85 @@ +#include +#include + +#include +#include +#include +#include +#include + +namespace { + + struct test_node { + std::atomic next_{nullptr}; + int value_{0}; + + test_node() = default; + + explicit test_node(int val) + : value_(val) { + } + }; + + using test_queue = STDEXEC::__intrusive_mpsc_queue<&test_node::next_>; + + TEST_CASE( + "intrusive_mpsc_queue with 2 producers and 1 consumer", + "[detail][intrusive_mpsc_queue]") { + test_queue queue; + constexpr int num_items_per_producer = 500; + constexpr int num_producers = 2; + constexpr int total_items = num_items_per_producer * num_producers; + + std::vector> nodes1; + std::vector> nodes2; + + for (int i = 0; i < num_items_per_producer; ++i) { + nodes1.push_back(std::make_unique(i * 1000)); + nodes2.push_back(std::make_unique(i * 1000 + 1)); + } + + std::atomic produced_count{0}; + + std::set consumed_addrs; + + std::jthread producer1([&]() { + for (int i = 0; i < num_items_per_producer; ++i) { + queue.push_back(nodes1[i].get()); + produced_count.fetch_add(1, 
std::memory_order_relaxed); + } + }); + + std::jthread producer2([&]() { + for (int i = 0; i < num_items_per_producer; ++i) { + queue.push_back(nodes2[i].get()); + produced_count.fetch_add(1, std::memory_order_relaxed); + } + }); + + std::set consumed; + std::jthread consumer([&]() { + int count = 0; + while (count < total_items) { + test_node* node = queue.pop_front(); + if (node) { + consumed.insert(node->value_); + ++count; + } else { + std::this_thread::yield(); + } + } + }); + + producer1.join(); + producer2.join(); + consumer.join(); + + REQUIRE(consumed.size() == total_items); + + for (int i = 0; i < num_items_per_producer; ++i) { + CHECK(consumed.count(i * 1000) == 1); + CHECK(consumed.count(i * 1000 + 1) == 1); + } + } + +} // namespace