Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions .github/workflows/prerelease.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ jobs:
cmake --build build_artifacts --config RelWithDebInfo
- name: Test C++
run: |
build_artifacts/scripts/fork_union_test_cpp17
build_artifacts/scripts/fork_union_test_cpp20
build_artifacts/fork_union_test_cpp17
build_artifacts/fork_union_test_cpp20

# Rust
- name: Set up Rust
Expand Down Expand Up @@ -94,8 +94,8 @@ jobs:
cmake --build build_artifacts --config RelWithDebInfo
- name: Test C++
run: |
build_artifacts/scripts/fork_union_test_cpp17
build_artifacts/scripts/fork_union_test_cpp20
build_artifacts/fork_union_test_cpp17
build_artifacts/fork_union_test_cpp20

# Rust
- name: Set up Rust
Expand Down Expand Up @@ -125,8 +125,8 @@ jobs:
cmake --build build_artifacts --config RelWithDebInfo
- name: Test C++
run: |
build_artifacts/scripts/fork_union_test_cpp17
build_artifacts/scripts/fork_union_test_cpp20
build_artifacts/fork_union_test_cpp17
build_artifacts/fork_union_test_cpp20

# Rust
- name: Set up Rust
Expand All @@ -152,10 +152,12 @@ jobs:
choco install cmake
cmake -B build_artifacts -D CMAKE_BUILD_TYPE=RelWithDebInfo
cmake --build build_artifacts --config RelWithDebInfo
- name: List build artifacts
run: Get-ChildItem -Recurse .\build_artifacts
- name: Test C++
run: |
.\build_artifacts\scripts\fork_union_test_cpp17.exe
.\build_artifacts\scripts\fork_union_test_cpp20.exe
.\build_artifacts\fork_union_test_cpp17.exe
.\build_artifacts\fork_union_test_cpp20.exe

# Rust
- name: Set up Rust
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
build/
build_debug/
build_release/
Testing/

# Rust build artifacts
Cargo.lock
Expand Down
6 changes: 3 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "cppdbg",
"request": "launch",
"preLaunchTask": "Build: Debug with GDB",
"program": "${workspaceFolder}/build_debug/scripts/fork_union_test_cpp20",
"program": "${workspaceFolder}/build_debug/fork_union_test_cpp20",
"cwd": "${workspaceFolder}",
"args": [],
"setupCommands": [
Expand Down Expand Up @@ -49,7 +49,7 @@
"type": "cppdbg",
"request": "launch",
"preLaunchTask": "Build: Debug with LLDB",
"program": "${workspaceFolder}/build_debug/scripts/fork_union_test_cpp20",
"program": "${workspaceFolder}/build_debug/fork_union_test_cpp20",
"cwd": "${workspaceFolder}",
"args": [],
"setupCommands": [
Expand Down Expand Up @@ -78,7 +78,7 @@
"type": "cppdbg",
"request": "launch",
"preLaunchTask": "Build: Debug with GDB",
"program": "${workspaceFolder}/build_debug/scripts/fork_union_nbody",
"program": "${workspaceFolder}/build_debug/fork_union_nbody",
"cwd": "${workspaceFolder}",
"args": [],
"setupCommands": [
Expand Down
10 changes: 9 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.14)
project(
fork_union
VERSION 2.1.1
DESCRIPTION "OpenMP-style cross-platform fine-grained parallelism library"
DESCRIPTION "Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library"
LANGUAGES CXX
)

Expand Down Expand Up @@ -34,6 +34,14 @@ set_target_properties(
target_link_libraries(fork_union_dynamic PUBLIC fork_union)
target_link_libraries(fork_union_static PUBLIC fork_union)

# Place every executable directly in the top-level build directory (`CMAKE_BINARY_DIR`)
# instead of the binary subdirectory of the `CMakeLists.txt` that defines it, so tests
# and benchmarks can be launched as `<build_dir>/<executable>` on every platform.
# The unsuffixed variable alone is not enough on Windows: multi-config generators such
# as Visual Studio append a per-configuration subdirectory (e.g. `Release/`) to it,
# whereas the `_<CONFIG>` variants below are used as-is. See:
# https://stackoverflow.com/a/25328001
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE ${CMAKE_BINARY_DIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${CMAKE_BINARY_DIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELWITHDEBINFO ${CMAKE_BINARY_DIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_MINSIZEREL ${CMAKE_BINARY_DIR})

# Tests & benchmarking scripts
include(CTest)
if (BUILD_TESTING)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fork_union"
description = "OpenMP-style cross-platform fine-grained parallelism library"
description = "Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library"
version = "2.1.1"
edition = "2021"
authors = ["Ash Vardanian"]
Expand Down
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ search_result_t search(std::span<float, dimensions> query) {
fu::spin_mutex_t result_update; // ? Lighter `std::mutex` alternative w/out system calls

auto concurrent_searcher = [&](auto first_prong, std::size_t count) noexcept {
auto [index, _, colocation] = first_prong;
auto [first_index, _, colocation] = first_prong;
auto& vectors = colocation == 0 ? first_half : second_half;
search_result_t thread_local_result;
for (std::size_t task_index = first_index; task_index < first_index + count; ++task_index) {
Expand Down Expand Up @@ -355,6 +355,7 @@ Works in tight loops.
One of the most common parallel workloads is the N-body simulation ¹.
Implementations are available in both C++ and Rust in `scripts/nbody.cpp` and `scripts/nbody.rs`, respectively.
Both are lightweight and involve little logic outside of number-crunching, so both can be easily profiled with `time` and introspected with `perf` Linux tools.
Additional NUMA-aware Search examples are available in `scripts/search.rs`.

---

Expand Down Expand Up @@ -382,8 +383,8 @@ You can rerun those benchmarks with the following commands:
```bash
cmake -B build_release -D CMAKE_BUILD_TYPE=Release
cmake --build build_release --config Release
time NBODY_COUNT=128 NBODY_ITERATIONS=1000000 NBODY_BACKEND=fork_union_static build_release/scripts/fork_union_nbody
time NBODY_COUNT=128 NBODY_ITERATIONS=1000000 NBODY_BACKEND=fork_union_dynamic build_release/scripts/fork_union_nbody
time NBODY_COUNT=128 NBODY_ITERATIONS=1000000 NBODY_BACKEND=fork_union_static build_release/fork_union_nbody
time NBODY_COUNT=128 NBODY_ITERATIONS=1000000 NBODY_BACKEND=fork_union_dynamic build_release/fork_union_nbody
```

## Safety & Logic
Expand Down Expand Up @@ -424,16 +425,16 @@ To run the C++ tests, use CMake:

```bash
cmake -B build_release -D CMAKE_BUILD_TYPE=Release
cmake --build build_release --config Release
ctest -C build_release # run all tests
build_release/scripts/fork_union_nbody # run the benchmarks
cmake --build build_release --config Release -j
ctest --test-dir build_release # run all tests
build_release/fork_union_nbody # run the benchmarks
```

For C++ debug builds, consider using the VS Code debugger presets or the following commands:

```bash
cmake --build build_debug --config Debug # build with Debug symbols
build_debug/scripts/fork_union_test_cpp20 # run a single test executable
build_debug/fork_union_test_cpp20 # run a single test executable
```

To include NUMA, Huge Pages, and other optimizations on Linux, make sure to install dependencies:
Expand All @@ -450,7 +451,7 @@ To build with an alternative compiler, like LLVM Clang, use the following comman
sudo apt-get install libomp-15-dev clang++-15 # OpenMP version must match Clang
cmake -B build_debug -D CMAKE_BUILD_TYPE=Debug -D CMAKE_CXX_COMPILER=clang++-15
cmake --build build_debug --config Debug
build_debug/scripts/fork_union_test_cpp20
build_debug/fork_union_test_cpp20
```

For Rust, use the following command:
Expand Down
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() -> Result<(), cc::Error> {
}

if let Err(e) = build.try_compile("fork_union") {
print!("cargo:warning={}", e);
print!("cargo:warning={e}");
return Err(e);
}

Expand Down
2 changes: 1 addition & 1 deletion c/lib.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @brief OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @brief Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @file lib.cpp
* @author Ash Vardanian
* @date June 27, 2025
Expand Down
2 changes: 1 addition & 1 deletion include/fork_union.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @brief OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @brief Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @file fork_union.h
* @author Ash Vardanian
* @date June 17, 2025
Expand Down
2 changes: 1 addition & 1 deletion include/fork_union.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* @brief OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @brief Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
* @file fork_union.hpp
* @author Ash Vardanian
* @date May 2, 2025
Expand Down
79 changes: 41 additions & 38 deletions rust/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Low-latency OpenMP-style NUMA-aware thread pool for fork-join parallelism.
//! Low-latency OpenMP-style NUMA-aware cross-platform fine-grained parallelism library.
//!
//! Fork Union provides a minimalistic cross-platform thread-pool implementation and Parallel Algorithms,
//! avoiding dynamic memory allocations, exceptions, system calls, and heavy Compare-And-Swap instructions.
Expand Down Expand Up @@ -93,7 +93,7 @@ impl<T, const PAUSE: bool> BasicSpinMutex<T, PAUSE> {
/// let mut guard = mutex.lock();
/// *guard = 42;
/// ```
pub fn lock(&self) -> BasicSpinMutexGuard<T, PAUSE> {
pub fn lock(&self) -> BasicSpinMutexGuard<'_, T, PAUSE> {
while self
.locked
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
Expand Down Expand Up @@ -126,7 +126,7 @@ impl<T, const PAUSE: bool> BasicSpinMutex<T, PAUSE> {
/// println!("Lock is currently held by another thread");
/// };
/// ```
pub fn try_lock(&self) -> Option<BasicSpinMutexGuard<T, PAUSE>> {
pub fn try_lock(&self) -> Option<BasicSpinMutexGuard<'_, T, PAUSE>> {
if self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
Expand Down Expand Up @@ -743,7 +743,7 @@ impl ThreadPool {
/// // Work executes when _op is dropped
/// }
/// ```
pub fn for_threads<F>(&mut self, function: F) -> ForThreadsOperation<F>
pub fn for_threads<F>(&mut self, function: F) -> ForThreadsOperation<'_, F>
where
F: Fn(usize, usize) + Sync,
{
Expand Down Expand Up @@ -773,7 +773,7 @@ impl ThreadPool {
/// std::hint::black_box(result); // Prevent optimization
/// });
/// ```
pub fn for_n<F>(&mut self, n: usize, function: F) -> ForNOperation<F>
pub fn for_n<F>(&mut self, n: usize, function: F) -> ForNOperation<'_, F>
where
F: Fn(Prong) + Sync,
{
Expand Down Expand Up @@ -810,7 +810,7 @@ impl ThreadPool {
/// }
/// });
/// ```
pub fn for_n_dynamic<F>(&mut self, n: usize, function: F) -> ForNDynamicOperation<F>
pub fn for_n_dynamic<F>(&mut self, n: usize, function: F) -> ForNDynamicOperation<'_, F>
where
F: Fn(Prong) + Sync,
{
Expand Down Expand Up @@ -853,7 +853,7 @@ impl ThreadPool {
/// prong.thread_index, start_index, start_index + count);
/// });
/// ```
pub fn for_slices<F>(&mut self, n: usize, function: F) -> ForSlicesOperation<F>
pub fn for_slices<F>(&mut self, n: usize, function: F) -> ForSlicesOperation<'_, F>
where
F: Fn(Prong, usize) + Sync,
{
Expand Down Expand Up @@ -2366,32 +2366,38 @@ impl<T> RoundRobinVec<T> {
let threads_in_colocation = pool.count_threads_in(colocation_index);
let thread_local_index = pool.locate_thread_in(thread_index, colocation_index);

if node_len > current_len {
// Growing: construct new elements in parallel
let new_elements = node_len - current_len;
let split = IndexedSplit::new(new_elements, threads_in_colocation);
let range = split.get(thread_local_index);

unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = current_len + i;
core::ptr::write(ptr.add(idx), value.clone());
match node_len.cmp(&current_len) {
std::cmp::Ordering::Greater => {
// Growing: construct new elements in parallel
let new_elements = node_len - current_len;
let split = IndexedSplit::new(new_elements, threads_in_colocation);
let range = split.get(thread_local_index);

unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = current_len + i;
core::ptr::write(ptr.add(idx), value.clone());
}
}
}
} else if node_len < current_len {
// Shrinking: drop elements in parallel
let elements_to_drop = current_len - node_len;
let split = IndexedSplit::new(elements_to_drop, threads_in_colocation);
let range = split.get(thread_local_index);

unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = node_len + i;
core::ptr::drop_in_place(ptr.add(idx));
std::cmp::Ordering::Less => {
// Shrinking: drop elements in parallel
let elements_to_drop = current_len - node_len;
let split = IndexedSplit::new(elements_to_drop, threads_in_colocation);
let range = split.get(thread_local_index);

unsafe {
let ptr = node_vec.as_mut_ptr();
for i in range {
let idx = node_len + i;
core::ptr::drop_in_place(ptr.add(idx));
}
}
}
std::cmp::Ordering::Equal => {
// No change needed
}
}
}
});
Expand Down Expand Up @@ -2428,11 +2434,13 @@ impl<T> SafePtr<T> {
}

/// Returns a mutable reference to the element `index` slots past the wrapped pointer.
///
/// The reference is produced from a shared `&self` by offsetting the inner pointer
/// with `add(index)` and dereferencing it inside an `unsafe` block. Nothing in this
/// method checks `index` against a length or prevents two callers from obtaining
/// references to the same element.
///
/// NOTE(review): callers presumably must keep `index` within the underlying
/// allocation and ensure no overlapping access to the same element — confirm
/// against the call sites.
// Returning `&mut T` from `&self` is deliberate here, hence the lint is silenced.
#[allow(clippy::mut_from_ref)]
pub fn get_mut_at(&self, index: usize) -> &mut T {
unsafe { &mut *self.0.add(index) }
}

/// Returns a mutable reference to the element the wrapped pointer points at.
///
/// The reference is produced from a shared `&self` by dereferencing the inner
/// pointer inside an `unsafe` block; no null or aliasing check is performed here.
///
/// NOTE(review): callers presumably must guarantee the pointer is valid and that
/// no other reference to the same element is live — confirm against the call sites.
// Returning `&mut T` from `&self` is deliberate here, hence the lint is silenced.
#[allow(clippy::mut_from_ref)]
pub fn get_mut(&self) -> &mut T {
unsafe { &mut *self.0 }
}
Expand Down Expand Up @@ -2778,7 +2786,7 @@ where
/// The remaining chunks have size `floor(tasks / threads)`.
///
/// This ensures optimal load balancing across threads with minimal size variance.
/// See: https://lemire.me/blog/2025/05/22/dividing-an-array-into-fair-sized-chunks/
/// See: <https://lemire.me/blog/2025/05/22/dividing-an-array-into-fair-sized-chunks/>
#[derive(Debug, Clone)]
pub struct IndexedSplit {
quotient: usize,
Expand Down Expand Up @@ -2831,7 +2839,7 @@ mod tests {
#[test]
fn test_capabilities() {
let caps = capabilities_string();
std::println!("Capabilities: {:?}", caps);
std::println!("Capabilities: {caps:?}");
assert!(caps.is_some());
}

Expand All @@ -2843,11 +2851,7 @@ mod tests {
let qos = count_quality_levels();

std::println!(
"Cores: {}, NUMA: {}, Colocations: {}, QoS: {}",
cores,
numa,
colocations,
qos
"Cores: {cores}, NUMA: {numa}, Colocations: {colocations}, QoS: {qos}"
);
assert!(cores > 0);
}
Expand Down Expand Up @@ -2879,8 +2883,7 @@ mod tests {
for (i, flag) in visited.iter().enumerate() {
assert!(
flag.load(Ordering::Relaxed),
"thread {} never reached the callback",
i
"thread {i} never reached the callback"
);
}
}
Expand Down
Loading