Skip to content

Commit 1d1654c

Browse files
authored
Targeted optimizations for BlockSTMv2 (#17336)
1 parent 68747d3 commit 1d1654c

File tree

6 files changed

+119
-28
lines changed

6 files changed

+119
-28
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,7 @@ quote = "1.0.18"
728728
rand = "0.7.3"
729729
rand_core = "0.5.1"
730730
random_word = "0.3.0"
731+
rapidhash = "1.4.0"
731732
rayon = "1.5.2"
732733
redis = { version = "0.22.3", features = [
733734
"tokio-comp",

aptos-move/block-executor/src/executor.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -137,24 +137,26 @@ where
137137
}
138138
}
139139

140+
// The bool in the result indicates whether execution result is a speculative abort.
140141
fn process_execution_result<'a>(
141142
execution_result: &'a ExecutionStatus<E::Output, E::Error>,
142143
read_set: &mut CapturedReads<T, ModuleId, CompiledModule, Module, AptosModuleExtension>,
143144
txn_idx: TxnIndex,
144-
) -> Result<Option<&'a E::Output>, PanicError> {
145+
) -> Result<(Option<&'a E::Output>, bool), PanicError> {
145146
match execution_result {
146147
ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => {
147-
Ok(Some(output))
148+
Ok((Some(output), false))
148149
},
149150
ExecutionStatus::SpeculativeExecutionAbortError(_msg) => {
150151
// TODO(BlockSTMv2): cleaner to rename or distinguish V2 early abort
151-
// from DeltaApplicationFailure.
152+
// from DeltaApplicationFailure. This is also why we return the bool
153+
// separately for now instead of relying on the read set.
152154
read_set.capture_delayed_field_read_error(&PanicOr::Or(
153155
MVDelayedFieldsError::DeltaApplicationFailure,
154156
));
155-
Ok(None)
157+
Ok((None, true))
156158
},
157-
ExecutionStatus::Abort(_err) => Ok(None),
159+
ExecutionStatus::Abort(_err) => Ok((None, false)),
158160
ExecutionStatus::DelayedFieldsCodeInvariantError(msg) => {
159161
Err(code_invariant_error(format!(
160162
"[Execution] At txn {}, failed with DelayedFieldsCodeInvariantError: {:?}",
@@ -365,9 +367,19 @@ where
365367
)));
366368
}
367369

368-
let maybe_output =
370+
let (maybe_output, is_speculative_failure) =
369371
Self::process_execution_result(&execution_result, &mut read_set, idx_to_execute)?;
370372

373+
if is_speculative_failure {
374+
// Recording in order to check the invariant that the final, committed incarnation
375+
// of each transaction is not a speculative failure.
376+
last_input_output.record_speculative_failure(idx_to_execute);
377+
// Ignoring module validation requirements since speculative failure
378+
// anyway requires re-execution.
379+
let _ = scheduler.finish_execution(abort_manager)?;
380+
return Ok(());
381+
}
382+
371383
Self::process_delayed_field_output(
372384
maybe_output,
373385
idx_to_execute,
@@ -510,7 +522,7 @@ where
510522
idx_to_execute, incarnation
511523
)));
512524
}
513-
let processed_output =
525+
let (processed_output, _) =
514526
Self::process_execution_result(&execution_result, &mut read_set, idx_to_execute)?;
515527

516528
let mut prev_modified_resource_keys = last_input_output
@@ -666,29 +678,31 @@ where
666678
// 2. The only possible time to take the read-set from txn_last_input_output
667679
// is in prepare_and_queue_commit_ready_txn (applying module publishing output).
668680
// However, required module validation necessarily occurs before the commit.
669-
let read_set = last_input_output.read_set(idx_to_validate).ok_or_else(|| {
670-
code_invariant_error(format!(
671-
"Prior read-set of txn {} incarnation {} not recorded for module verification",
672-
idx_to_validate, incarnation_to_validate
673-
))
674-
})?;
681+
let (read_set, is_speculative_failure) =
682+
last_input_output.read_set(idx_to_validate).ok_or_else(|| {
683+
code_invariant_error(format!(
684+
"Prior read-set of txn {} incarnation {} not recorded for module verification",
685+
idx_to_validate, incarnation_to_validate
686+
))
687+
})?;
675688
// Perform invariant checks or return early based on read set's incarnation.
676689
let blockstm_v2_incarnation = read_set.blockstm_v2_incarnation().ok_or_else(|| {
677690
code_invariant_error(
678691
"BlockSTMv2 must be enabled in CapturedReads when validating module reads",
679692
)
680693
})?;
694+
if blockstm_v2_incarnation > incarnation_to_validate || is_speculative_failure {
695+
// No need to validate as a newer incarnation has already been executed
696+
// and recorded its output, or the incarnation has resulted in a speculative
697+
// failure, which means there will be a further re-execution.
698+
return Ok(true);
699+
}
681700
if blockstm_v2_incarnation < incarnation_to_validate {
682701
return Err(code_invariant_error(format!(
683702
"For txn_idx {}, read set incarnation {} < incarnation to validate {}",
684703
idx_to_validate, blockstm_v2_incarnation, incarnation_to_validate
685704
)));
686705
}
687-
if blockstm_v2_incarnation > incarnation_to_validate {
688-
// No need to validate as a newer incarnation has already been executed
689-
// and recorded its output.
690-
return Ok(true);
691-
}
692706

693707
if !read_set.validate_module_reads(
694708
global_module_cache,
@@ -715,10 +729,14 @@ where
715729
skip_module_reads_validation: bool,
716730
) -> bool {
717731
let _timer = TASK_VALIDATE_SECONDS.start_timer();
718-
let read_set = last_input_output
732+
let (read_set, is_speculative_failure) = last_input_output
719733
.read_set(idx_to_validate)
720734
.expect("[BlockSTM]: Prior read-set must be recorded");
721735

736+
if is_speculative_failure {
737+
return false;
738+
}
739+
722740
assert!(
723741
!read_set.is_incorrect_use(),
724742
"Incorrect use must be handled after execution"
@@ -775,10 +793,14 @@ where
775793
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
776794
is_v2: bool,
777795
) -> Result<bool, PanicError> {
778-
let read_set = last_input_output
796+
let (read_set, is_speculative_failure) = last_input_output
779797
.read_set(txn_idx)
780798
.ok_or_else(|| code_invariant_error("Read set must be recorded"))?;
781799

800+
if is_speculative_failure {
801+
return Ok(false);
802+
}
803+
782804
if !read_set.validate_delayed_field_reads(versioned_cache.delayed_fields(), txn_idx)?
783805
|| (is_v2
784806
&& !read_set.validate_aggregator_v1_reads(

aptos-move/block-executor/src/txn_last_input_output.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use std::{
3131
collections::{BTreeSet, HashSet},
3232
fmt::Debug,
3333
iter::{empty, Iterator},
34-
sync::Arc,
34+
sync::{
35+
atomic::{AtomicBool, Ordering},
36+
Arc,
37+
},
3538
};
3639

3740
type TxnInput<T> = CapturedReads<T, ModuleId, CompiledModule, Module, AptosModuleExtension>;
@@ -66,6 +69,9 @@ pub struct TxnLastInputOutput<T: Transaction, O: TransactionOutput<Txn = T>, E:
6669

6770
// TODO: Consider breaking down the outputs when storing (avoid traversals, cache below).
6871
outputs: Vec<CachePadded<ArcSwapOption<ExecutionStatus<O, E>>>>, // txn_idx -> output.
72+
// Used to record if the latest incarnation of a txn was a failure due to the
73+
// speculative nature of parallel execution.
74+
speculative_failures: Vec<CachePadded<AtomicBool>>,
6975
}
7076

7177
impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
@@ -81,6 +87,9 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
8187
outputs: (0..num_txns)
8288
.map(|_| CachePadded::new(ArcSwapOption::empty()))
8389
.collect(),
90+
speculative_failures: (0..num_txns)
91+
.map(|_| CachePadded::new(AtomicBool::new(false)))
92+
.collect(),
8493
}
8594
}
8695

@@ -90,10 +99,15 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
9099
input: TxnInput<T>,
91100
output: ExecutionStatus<O, E>,
92101
) {
102+
self.speculative_failures[txn_idx as usize].store(false, Ordering::Relaxed);
93103
self.inputs[txn_idx as usize].store(Some(Arc::new(input)));
94104
self.outputs[txn_idx as usize].store(Some(Arc::new(output)));
95105
}
96106

107+
pub(crate) fn record_speculative_failure(&self, txn_idx: TxnIndex) {
108+
self.speculative_failures[txn_idx as usize].store(true, Ordering::Relaxed);
109+
}
110+
97111
pub fn fetch_exchanged_data(
98112
&self,
99113
key: &T::Key,
@@ -119,8 +133,13 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
119133
)
120134
}
121135

122-
pub(crate) fn read_set(&self, txn_idx: TxnIndex) -> Option<Arc<TxnInput<T>>> {
123-
self.inputs[txn_idx as usize].load_full()
136+
// Alongside the latest read set, returns the indicator of whether the latest
137+
// incarnation of the txn resulted in a speculative failure.
138+
pub(crate) fn read_set(&self, txn_idx: TxnIndex) -> Option<(Arc<TxnInput<T>>, bool)> {
139+
let input = self.inputs[txn_idx as usize].load_full()?;
140+
let speculative_failure =
141+
self.speculative_failures[txn_idx as usize].load(Ordering::Relaxed);
142+
Some((input, speculative_failure))
124143
}
125144

126145
// Should be called when txn_idx is committed, while holding commit lock.
@@ -539,7 +558,7 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
539558
}
540559

541560
pub(crate) fn get_txn_read_write_summary(&self, txn_idx: TxnIndex) -> ReadWriteSummary<T> {
542-
let read_set = self.read_set(txn_idx).expect("Read set must be recorded");
561+
let read_set = self.read_set(txn_idx).expect("Read set must be recorded").0;
543562

544563
let reads = read_set.get_read_summary();
545564
let writes = self.get_write_summary(txn_idx);

types/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ proptest = { workspace = true, optional = true }
5757
proptest-derive = { workspace = true, optional = true }
5858
quick_cache = { workspace = true }
5959
rand = { workspace = true }
60+
rapidhash = { workspace = true }
6061
rayon = { workspace = true }
6162
ref-cast = { workspace = true }
6263
ring = { workspace = true }

types/src/state_store/state_value.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
1010
use bytes::Bytes;
1111
#[cfg(any(test, feature = "fuzzing"))]
1212
use proptest::{arbitrary::Arbitrary, collection::vec, prelude::*};
13+
use rapidhash::rapidhash;
1314
use serde::{Deserialize, Deserializer, Serialize, Serializer};
1415

1516
#[derive(Deserialize, Serialize)]
@@ -178,14 +179,34 @@ impl PersistedStateValue {
178179
}
179180
}
180181

181-
#[derive(BCSCryptoHash, Clone, CryptoHasher, Debug, Eq, PartialEq)]
182+
#[derive(Clone, Debug, BCSCryptoHash, CryptoHasher)]
182183
pub struct StateValue {
183184
data: Bytes,
184185
metadata: StateValueMetadata,
186+
maybe_rapid_hash: Option<(u64, usize)>,
185187
}
186188

189+
impl PartialEq for StateValue {
190+
fn eq(&self, other: &Self) -> bool {
191+
// Fast path: if both have rapid hashes and they differ, values can't be equal
192+
if let (Some(hash1), Some(hash2)) = (&self.maybe_rapid_hash, &other.maybe_rapid_hash) {
193+
if hash1 != hash2 {
194+
return false;
195+
}
196+
}
197+
198+
// Full comparison: data and metadata
199+
self.data == other.data && self.metadata == other.metadata
200+
}
201+
}
202+
203+
impl Eq for StateValue {}
204+
187205
pub const ARB_STATE_VALUE_MAX_SIZE: usize = 100;
188206

207+
/// Threshold for computing rapid hash on StateValue data to optimize equality checks
208+
pub const RAPID_HASH_THRESHOLD: usize = 32;
209+
189210
#[cfg(any(test, feature = "fuzzing"))]
190211
impl Arbitrary for StateValue {
191212
type Parameters = ();
@@ -217,8 +238,17 @@ impl Serialize for StateValue {
217238
}
218239

219240
impl StateValue {
241+
/// Computes rapid hash if data is large enough, otherwise returns None
242+
fn compute_rapid_hash(data: &Bytes) -> Option<(u64, usize)> {
243+
(data.len() >= RAPID_HASH_THRESHOLD).then(|| (rapidhash(data), data.len()))
244+
}
245+
220246
fn to_persistable_form(&self) -> PersistedStateValue {
221-
let Self { data, metadata } = self.clone();
247+
let Self {
248+
data,
249+
metadata,
250+
maybe_rapid_hash: _,
251+
} = self.clone();
222252
let metadata = metadata.into_persistable();
223253
match metadata {
224254
None => PersistedStateValue::V0(data),
@@ -231,7 +261,12 @@ impl StateValue {
231261
}
232262

233263
pub fn new_with_metadata(data: Bytes, metadata: StateValueMetadata) -> Self {
234-
Self { data, metadata }
264+
let maybe_rapid_hash = Self::compute_rapid_hash(&data);
265+
Self {
266+
data,
267+
metadata,
268+
maybe_rapid_hash,
269+
}
235270
}
236271

237272
pub fn size(&self) -> usize {
@@ -249,6 +284,7 @@ impl StateValue {
249284
f: F,
250285
) -> anyhow::Result<StateValue> {
251286
self.data = f(self.data)?;
287+
self.maybe_rapid_hash = Self::compute_rapid_hash(&self.data);
252288
Ok(self)
253289
}
254290

@@ -258,6 +294,7 @@ impl StateValue {
258294

259295
pub fn set_bytes(&mut self, data: Bytes) {
260296
self.data = data;
297+
self.maybe_rapid_hash = Self::compute_rapid_hash(&self.data);
261298
}
262299

263300
pub fn metadata(&self) -> &StateValueMetadata {
@@ -273,7 +310,11 @@ impl StateValue {
273310
}
274311

275312
pub fn unpack(self) -> (StateValueMetadata, Bytes) {
276-
let Self { data, metadata } = self;
313+
let Self {
314+
data,
315+
metadata,
316+
maybe_rapid_hash: _,
317+
} = self;
277318

278319
(metadata, data)
279320
}

0 commit comments

Comments
 (0)