Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ahash = "0.7.2"
dot = "0.1.4"
dyn-clonable = "0.9.0"
opentelemetry = { version = "0.22.0", features = ["trace", "metrics"] }
num_cpus = "1.11"

[features]
mem = ["pegasus_memory/mem"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {

fn check_ready(&mut self) -> TaskState {
let _g = crate::worker_id::guard(self.id);
if self.is_finished && self.peer_guard.load(Ordering::SeqCst) == 0 {
return TaskState::Finished;
}
if self.check_cancel() {
self.sink.set_cancel_hook(true);
return TaskState::Finished;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::time::Duration;

use pegasus::api::{Collect, IterCondition, Iteration, Map, Sink};
use pegasus::api::{Collect, HasAny, IterCondition, Iteration, Map, Sink};
use pegasus::JobConf;

static CORE_POOL_SIZE: &'static str = "PEGASUS_CORE_POOL_SIZE";

/// test binary merge pipeline;
#[test]
fn timeout_test_01() {
Expand Down Expand Up @@ -133,3 +135,60 @@ fn timeout_resubmit_test() {
result.sort();
assert_eq!(result, [20, 20]);
}

#[test]
fn timeout_caused_by_large_job() {
let core = ::std::env::var(CORE_POOL_SIZE)
.map(|value| {
value
.parse::<usize>()
.unwrap_or_else(|_| num_cpus::get())
})
.unwrap_or_else(|_| num_cpus::get());
let mut conf_1 = JobConf::new("test");
conf_1.time_limit = 5000;
conf_1.set_workers(core as u32);
let mut conf_2 = JobConf::new("block_job");
conf_2.set_workers(core as u32);
let mut results_1 = pegasus::run(conf_1, || {
|input, output| {
let worker_id = input.get_worker_index();
input
.input_from(vec![0u32])?
.map(move |i| {
if worker_id != 0 {
std::thread::sleep(Duration::from_millis(2000));
}
Ok(i)
})?
.any()?
.sink_into(output)
}
})
.expect("submit job failure;");
let _ = pegasus::run(conf_2, || {
|input, output| {
let worker_id = input.get_worker_index();
input
.input_from(vec![0u32])?
.map(|i| {
std::thread::sleep(Duration::from_millis(4000));
Ok(i)
})?
.any()?
.sink_into(output)
}
})
.expect("submit job failure;");
let mut count = 0;
while let Some(result) = results_1.next() {
if let Ok(data) = result {
count += 1;
} else {
let err = result.err().unwrap();
assert_eq!(err.to_string(), "Job is canceled;".to_string());
break;
}
}
assert!(!results_1.is_cancel());
}
Loading