Skip to content

Commit f8250fb

Browse files
authored
fix(interactive): Fix bugs of check ready (#4381)
#4380
1 parent 3b0e2f7 commit f8250fb

File tree

3 files changed

+64
-1
lines changed

3 files changed

+64
-1
lines changed

interactive_engine/executor/engine/pegasus/pegasus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ ahash = "0.7.2"
2727
dot = "0.1.4"
2828
dyn-clonable = "0.9.0"
2929
opentelemetry = { version = "0.22.0", features = ["trace", "metrics"] }
30+
num_cpus = "1.11"
3031

3132
[features]
3233
mem = ["pegasus_memory/mem"]

interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {
293293

294294
fn check_ready(&mut self) -> TaskState {
295295
let _g = crate::worker_id::guard(self.id);
296+
if self.is_finished && self.peer_guard.load(Ordering::SeqCst) == 0 {
297+
return TaskState::Finished;
298+
}
296299
if self.check_cancel() {
297300
self.sink.set_cancel_hook(true);
298301
return TaskState::Finished;

interactive_engine/executor/engine/pegasus/pegasus/tests/timeout_test.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
1616
use std::time::Duration;
1717

18-
use pegasus::api::{Collect, IterCondition, Iteration, Map, Sink};
18+
use pegasus::api::{Collect, HasAny, IterCondition, Iteration, Map, Sink};
1919
use pegasus::JobConf;
2020

21+
/// Name of the environment variable this test file reads to decide how many
/// workers each job is configured with.
static CORE_POOL_SIZE: &str = "PEGASUS_CORE_POOL_SIZE";
22+
2123
/// test binary merge pipeline;
2224
#[test]
2325
fn timeout_test_01() {
@@ -133,3 +135,60 @@ fn timeout_resubmit_test() {
133135
result.sort();
134136
assert_eq!(result, [20, 20]);
135137
}
138+
139+
/// Runs a time-limited job alongside a second, slower job that uses the same
/// number of workers, and asserts that the first job's result stream is not
/// reported as canceled afterwards.
///
/// Both jobs are configured with one worker per core, where the core count
/// comes from the `CORE_POOL_SIZE` environment variable when it is set to a
/// valid number and from `num_cpus::get()` otherwise.
///
/// NOTE(review): presumably the point is that the second job keeps the shared
/// executor pool busy so the first job's workers are scheduled late — confirm
/// against the `check_ready` change this test accompanies.
#[test]
fn timeout_caused_by_large_job() {
    // An unset, non-unicode, or non-numeric variable falls back to the CPU count.
    let core = ::std::env::var(CORE_POOL_SIZE)
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or_else(num_cpus::get);

    let mut conf_1 = JobConf::new("test");
    // NOTE(review): looks like milliseconds, matching the sleeps below — confirm.
    conf_1.time_limit = 5000;
    // `set_workers` is called with a `u32`; worker counts are expected to be
    // far below `u32::MAX`, so the narrowing cast is kept as in the original.
    conf_1.set_workers(core as u32);

    let mut conf_2 = JobConf::new("block_job");
    conf_2.set_workers(core as u32);

    // First job: every worker except index 0 sleeps for 2 seconds in `map`.
    let mut results_1 = pegasus::run(conf_1, || {
        |input, output| {
            let worker_id = input.get_worker_index();
            input
                .input_from(vec![0u32])?
                .map(move |i| {
                    if worker_id != 0 {
                        std::thread::sleep(Duration::from_millis(2000));
                    }
                    Ok(i)
                })?
                .any()?
                .sink_into(output)
        }
    })
    .expect("submit job failure;");

    // Second job: every worker sleeps for 4 seconds; its results are not inspected.
    let _ = pegasus::run(conf_2, || {
        |input, output| {
            input
                .input_from(vec![0u32])?
                .map(|i| {
                    std::thread::sleep(Duration::from_millis(4000));
                    Ok(i)
                })?
                .any()?
                .sink_into(output)
        }
    })
    .expect("submit job failure;");

    // Drain the first job's results. The only error tolerated here is the
    // cancellation message; any other error text fails the assertion.
    while let Some(result) = results_1.next() {
        if let Err(err) = result {
            assert_eq!(err.to_string(), "Job is canceled;".to_string());
            break;
        }
    }
    assert!(!results_1.is_cancel());
}

0 commit comments

Comments
 (0)