Skip to content

Commit 1d22e35

Browse files
committed
feat: Add CacheTask processing logic
## Description This pull request adds the CacheTask task type, completes the adaptation for CacheTask during gRPC download and piece processing, and finally adds a cache parameter to dfget for creating and managing CacheTask. ## Related Issue ### Changes - Added --cache parameter to dfget to create and manage CacheTask. - Added cache task and implemented its download logic. Implemented piece download execution for cache tasks in piece.rs and piece_download.rs. - Added handling for cache peer and cache task in dragonfly-client/grpc/scheduler.rs. - Added cache task support to dfdaemon gRPC servers. ## Motivation and Context To introduce the CacheTask task type to avoid disk interactions, thereby increasing response speed. Signed-off-by: fu220 <[email protected]>
1 parent fdd3fa7 commit 1d22e35

File tree

11 files changed

+4189
-271
lines changed

11 files changed

+4189
-271
lines changed

dragonfly-client-util/src/id_generator/mod.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ const SEED_PEER_SUFFIX: &str = "seed";
3131
/// PERSISTENT_CACHE_TASK_SUFFIX is the suffix of the persistent cache task.
3232
const PERSISTENT_CACHE_TASK_SUFFIX: &str = "persistent-cache-task";
3333

34+
/// CACHE_TASK_SUFFIX is the suffix of the cache task.
35+
const CACHE_TASK_SUFFIX: &str = "cache-task";
36+
3437
/// TaskIDParameter is the parameter of the task id.
3538
pub enum TaskIDParameter {
3639
/// Content uses the content to generate the task id.
@@ -59,6 +62,21 @@ pub enum PersistentCacheTaskIDParameter {
5962
},
6063
}
6164

65+
/// CacheTaskIDParameter is the parameter of the cache task id.
66+
pub enum CacheTaskIDParameter {
67+
/// Content uses the content to generate the cache task id.
68+
Content(String),
69+
/// URLBased uses the url, piece_length, tag, application and filtered_query_params to generate
70+
/// the cache task id.
71+
URLBased {
72+
url: String,
73+
piece_length: Option<u64>,
74+
tag: Option<String>,
75+
application: Option<String>,
76+
filtered_query_params: Vec<String>,
77+
},
78+
}
79+
6280
/// IDGenerator is used to generate the id for the resources.
6381
#[derive(Debug)]
6482
pub struct IDGenerator {
@@ -210,6 +228,69 @@ impl IDGenerator {
210228
}
211229
}
212230

231+
/// cache_task_id generates the cache task id.
232+
#[inline]
233+
pub fn cache_task_id(&self, parameter: CacheTaskIDParameter) -> Result<String> {
234+
match parameter {
235+
CacheTaskIDParameter::Content(content) => {
236+
Ok(hex::encode(Sha256::digest(content.as_bytes())))
237+
}
238+
CacheTaskIDParameter::URLBased {
239+
url,
240+
piece_length,
241+
tag,
242+
application,
243+
filtered_query_params,
244+
} => {
245+
// Filter the query parameters.
246+
let url = Url::parse(url.as_str()).or_err(ErrorType::ParseError)?;
247+
let query = url
248+
.query_pairs()
249+
.filter(|(k, _)| !filtered_query_params.contains(&k.to_string()));
250+
251+
let mut artifact_url = url.clone();
252+
if query.clone().count() == 0 {
253+
artifact_url.set_query(None);
254+
} else {
255+
artifact_url.query_pairs_mut().clear().extend_pairs(query);
256+
}
257+
258+
let artifact_url_str = artifact_url.to_string();
259+
let final_url = if artifact_url_str.ends_with('/') && artifact_url.path() == "/" {
260+
artifact_url_str.trim_end_matches('/').to_string()
261+
} else {
262+
artifact_url_str
263+
};
264+
265+
// Initialize the hasher.
266+
let mut hasher = Sha256::new();
267+
268+
// Add the url to generate the cache task id.
269+
hasher.update(final_url);
270+
271+
// Add the tag to generate the cache task id.
272+
if let Some(tag) = tag {
273+
hasher.update(tag);
274+
}
275+
276+
// Add the application to generate the cache task id.
277+
if let Some(application) = application {
278+
hasher.update(application);
279+
}
280+
281+
// Add the piece length to generate the cache task id.
282+
if let Some(piece_length) = piece_length {
283+
hasher.update(piece_length.to_string());
284+
}
285+
286+
hasher.update(TaskType::Cache.as_str_name().as_bytes());
287+
288+
// Generate the cache task id.
289+
Ok(hex::encode(hasher.finalize()))
290+
}
291+
}
292+
}
293+
213294
/// peer_id generates the peer id.
214295
#[inline]
215296
pub fn peer_id(&self) -> String {
@@ -232,6 +313,10 @@ impl IDGenerator {
232313
return TaskType::PersistentCache;
233314
}
234315

316+
if id.ends_with(CACHE_TASK_SUFFIX) {
317+
return TaskType::Cache;
318+
}
319+
235320
TaskType::Standard
236321
}
237322
}
@@ -434,6 +519,7 @@ mod tests {
434519
"some-task-id-persistent-cache-task",
435520
TaskType::PersistentCache,
436521
),
522+
("some-task-id-cache-task", TaskType::Cache),
437523
];
438524

439525
let generator = IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false);

dragonfly-client/src/bin/dfdaemon/main.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use dragonfly_client::grpc::{
2525
use dragonfly_client::health::Health;
2626
use dragonfly_client::metrics::Metrics;
2727
use dragonfly_client::proxy::Proxy;
28-
use dragonfly_client::resource::{persistent_cache_task::PersistentCacheTask, task::Task};
28+
use dragonfly_client::resource::{
29+
cache_task::CacheTask, persistent_cache_task::PersistentCacheTask, task::Task,
30+
};
2931
use dragonfly_client::stats::Stats;
3032
use dragonfly_client::tracing::init_tracing;
3133
use dragonfly_client_backend::BackendFactory;
@@ -218,6 +220,16 @@ async fn main() -> Result<(), anyhow::Error> {
218220
)?;
219221
let task = Arc::new(task);
220222

223+
// Initialize cache task manager.
224+
let cache_task = CacheTask::new(
225+
config.clone(),
226+
id_generator.clone(),
227+
storage.clone(),
228+
scheduler_client.clone(),
229+
backend_factory.clone(),
230+
)?;
231+
let cache_task = Arc::new(cache_task);
232+
221233
// Initialize persistent cache task manager.
222234
let persistent_cache_task = PersistentCacheTask::new(
223235
config.clone(),
@@ -279,6 +291,7 @@ async fn main() -> Result<(), anyhow::Error> {
279291
config.clone(),
280292
SocketAddr::new(config.upload.server.ip.unwrap(), config.upload.server.port),
281293
task.clone(),
294+
cache_task.clone(),
282295
persistent_cache_task.clone(),
283296
interface.clone(),
284297
shutdown.clone(),
@@ -290,6 +303,7 @@ async fn main() -> Result<(), anyhow::Error> {
290303
config.clone(),
291304
config.download.server.socket_path.clone(),
292305
task.clone(),
306+
cache_task.clone(),
293307
persistent_cache_task.clone(),
294308
shutdown.clone(),
295309
shutdown_complete_tx.clone(),

0 commit comments

Comments
 (0)