Skip to content

Commit a64427e

Browse files
ldanilekConvex, Inc.
authored andcommitted
[components] loop over components when applicable (#27860)
some parts of our system should be made component-aware in the most obvious way: by looping over all components. This applies to db-verifier and scheduled job garbage collection and usage. GitOrigin-RevId: 3bb3dad23084ad47c6968310bb8a1222e16b82b3
1 parent 1da5c43 commit a64427e

File tree

12 files changed

+121
-101
lines changed

12 files changed

+121
-101
lines changed

crates/application/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1758,7 +1758,7 @@ impl<RT: Runtime> Application<RT> {
17581758

17591759
let config_metadata = ConfigMetadata::from_file(config_file, auth_providers);
17601760

1761-
let (config_diff, schema) = ConfigModel::new(tx)
1761+
let (config_diff, schema) = ConfigModel::new(tx, ComponentId::TODO())
17621762
.apply(
17631763
config_metadata.clone(),
17641764
modules,

crates/application/src/scheduled_jobs/mod.rs

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
cmp,
23
collections::{
34
BTreeMap,
45
HashSet,
@@ -88,10 +89,7 @@ use model::{
8889
use parking_lot::Mutex;
8990
use sync_types::Timestamp;
9091
use usage_tracking::FunctionUsageTracker;
91-
use value::{
92-
ResolvedDocumentId,
93-
TableNamespace,
94-
};
92+
use value::ResolvedDocumentId;
9593

9694
use crate::{
9795
application_function_runner::ApplicationFunctionRunner,
@@ -897,51 +895,65 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
897895
async fn run(&self, backoff: &mut Backoff) -> anyhow::Result<()> {
898896
loop {
899897
let mut tx = self.database.begin(Identity::system()).await?;
900-
let namespace = TableNamespace::by_component_TODO();
901-
let now = self.rt.generate_timestamp()?;
902-
let index_query = Query::index_range(IndexRange {
903-
index_name: SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS.clone(),
904-
range: vec![IndexRangeExpression::Gt(
905-
COMPLETED_TS_FIELD.clone(),
906-
value::ConvexValue::Null,
907-
)],
908-
order: Order::Asc,
909-
})
910-
.limit(*SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE);
911-
let mut query_stream = ResolvedQuery::new(&mut tx, namespace, index_query)?;
912-
898+
let namespaces = tx
899+
.table_mapping()
900+
.namespaces_for_name(&SCHEDULED_JOBS_TABLE);
901+
let mut deleted_jobs = false;
913902
let mut next_job_wait = None;
914-
let mut jobs_to_delete = vec![];
915-
while let Some(doc) = query_stream.next(&mut tx, None).await? {
916-
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
917-
match job.state {
918-
ScheduledJobState::Success => (),
919-
ScheduledJobState::Failed(_) => (),
920-
ScheduledJobState::Canceled => (),
921-
_ => anyhow::bail!("Scheduled job to be garbage collected has the wrong state"),
922-
}
903+
for namespace in namespaces {
904+
let now = self.rt.generate_timestamp()?;
905+
let index_query = Query::index_range(IndexRange {
906+
index_name: SCHEDULED_JOBS_INDEX_BY_COMPLETED_TS.clone(),
907+
range: vec![IndexRangeExpression::Gt(
908+
COMPLETED_TS_FIELD.clone(),
909+
value::ConvexValue::Null,
910+
)],
911+
order: Order::Asc,
912+
})
913+
.limit(*SCHEDULED_JOB_GARBAGE_COLLECTION_BATCH_SIZE);
914+
let mut query_stream = ResolvedQuery::new(&mut tx, namespace, index_query)?;
915+
916+
let mut jobs_to_delete = vec![];
917+
while let Some(doc) = query_stream.next(&mut tx, None).await? {
918+
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
919+
match job.state {
920+
ScheduledJobState::Success => (),
921+
ScheduledJobState::Failed(_) => (),
922+
ScheduledJobState::Canceled => (),
923+
_ => anyhow::bail!(
924+
"Scheduled job to be garbage collected has the wrong state"
925+
),
926+
}
923927

924-
let completed_ts = match job.completed_ts {
925-
Some(completed_ts) => completed_ts,
926-
None => {
927-
anyhow::bail!("Could not get completed_ts of finished scheduled job");
928-
},
929-
};
930-
if completed_ts.add(*SCHEDULED_JOB_RETENTION)? > now {
931-
next_job_wait = Some(completed_ts.add(*SCHEDULED_JOB_RETENTION)? - now);
932-
break;
928+
let completed_ts = match job.completed_ts {
929+
Some(completed_ts) => completed_ts,
930+
None => {
931+
anyhow::bail!("Could not get completed_ts of finished scheduled job");
932+
},
933+
};
934+
if completed_ts.add(*SCHEDULED_JOB_RETENTION)? > now {
935+
let next_job_wait_ns = completed_ts.add(*SCHEDULED_JOB_RETENTION)? - now;
936+
next_job_wait = match next_job_wait {
937+
Some(next_job_wait) => Some(cmp::min(next_job_wait, next_job_wait_ns)),
938+
None => Some(next_job_wait_ns),
939+
};
940+
break;
941+
}
942+
jobs_to_delete.push(job.id());
933943
}
934-
jobs_to_delete.push(job.id());
935-
}
936-
if !jobs_to_delete.is_empty() {
937-
tracing::debug!(
938-
"Garbage collecting {} finished scheduled jobs",
939-
jobs_to_delete.len()
940-
);
941-
let mut model = SchedulerModel::new(&mut tx, namespace);
942-
for job_id in jobs_to_delete {
943-
model.delete(job_id).await?;
944+
if !jobs_to_delete.is_empty() {
945+
tracing::debug!(
946+
"Garbage collecting {} finished scheduled jobs",
947+
jobs_to_delete.len()
948+
);
949+
let mut model = SchedulerModel::new(&mut tx, namespace);
950+
for job_id in jobs_to_delete {
951+
model.delete(job_id).await?;
952+
}
953+
deleted_jobs = true;
944954
}
955+
}
956+
if deleted_jobs {
945957
self.database
946958
.commit_with_write_source(tx, "scheduled_job_gc")
947959
.await?;

crates/application/src/test_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl<RT: Runtime> Application<RT> {
338338
.await??;
339339
let schema_id = insert_validated_schema(&mut tx).await?;
340340

341-
ConfigModel::new(&mut tx)
341+
ConfigModel::new(&mut tx, ComponentId::test_user())
342342
.apply(
343343
ConfigMetadata::new(),
344344
test_source.clone(),

crates/application/src/tests/components.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use common::{
1515
},
1616
components::{
1717
ComponentFunctionPath,
18+
ComponentId,
1819
ComponentPath,
1920
Reference,
2021
Resource,
@@ -125,7 +126,7 @@ bar.invokeAction = async (requestId, argsStr) => {
125126
source_mapped: None,
126127
},
127128
);
128-
ConfigModel::new(&mut tx)
129+
ConfigModel::new(&mut tx, ComponentId::test_user())
129130
.apply(
130131
ConfigMetadata::new(),
131132
vec![module],

crates/isolate/src/test_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ impl<RT: Runtime, P: Persistence + Clone> UdfTest<RT, P> {
343343
)
344344
.await?;
345345
let mut tx = database.begin(Identity::system()).await?;
346-
ConfigModel::new(&mut tx)
346+
ConfigModel::new(&mut tx, ComponentId::test_user())
347347
.apply(
348348
ConfigMetadata::new(),
349349
modules,

crates/isolate/src/tests/internal.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
use common::{
77
components::{
88
CanonicalizedComponentFunctionPath,
9+
ComponentId,
910
ComponentPath,
1011
},
1112
types::{
@@ -57,9 +58,10 @@ async fn test_udf_visibility(rt: TestRuntime) -> anyhow::Result<()> {
5758
let post_internal_npm_version = Version::parse("1.0.0").unwrap();
5859

5960
let mut tx = t.database.begin(Identity::system()).await?;
60-
let (config_metadata, module_configs, _udf_config) = ConfigModel::new(&mut tx)
61-
.get_with_module_source(t.module_loader.as_ref())
62-
.await?;
61+
let (config_metadata, module_configs, _udf_config) =
62+
ConfigModel::new(&mut tx, ComponentId::test_user())
63+
.get_with_module_source(t.module_loader.as_ref())
64+
.await?;
6365
let modules_by_path = module_configs
6466
.iter()
6567
.map(|c| (c.path.clone().canonicalize(), c.clone()))
@@ -78,7 +80,7 @@ async fn test_udf_visibility(rt: TestRuntime) -> anyhow::Result<()> {
7880

7981
// Newer version + analyze results
8082
tx = t.database.begin(Identity::system()).await?;
81-
ConfigModel::new(&mut tx)
83+
ConfigModel::new(&mut tx, ComponentId::test_user())
8284
.apply(
8385
config_metadata.clone(),
8486
module_configs.clone(),

crates/local_backend/src/deploy_config.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use axum::{
2121
response::IntoResponse,
2222
};
2323
use common::{
24+
components::ComponentId,
2425
http::{
2526
extract::Json,
2627
HttpResponseError,
@@ -304,7 +305,7 @@ pub async fn get_config(
304305
.await?;
305306

306307
let mut tx = st.application.begin(identity).await?;
307-
let (config, modules, udf_config) = ConfigModel::new(&mut tx)
308+
let (config, modules, udf_config) = ConfigModel::new(&mut tx, ComponentId::TODO())
308309
.get_with_module_source(st.application.modules_cache())
309310
.await?;
310311
let config = ConvexObject::try_from(config)?;
@@ -334,8 +335,9 @@ pub async fn get_config_hashes(
334335
.await?;
335336

336337
let mut tx = st.application.begin(identity).await?;
337-
let (config, modules, udf_config) =
338-
ConfigModel::new(&mut tx).get_with_module_metadata().await?;
338+
let (config, modules, udf_config) = ConfigModel::new(&mut tx, ComponentId::TODO())
339+
.get_with_module_metadata()
340+
.await?;
339341
let module_hashes: Vec<_> = modules
340342
.into_iter()
341343
.map(|m| ModuleHashJson {

crates/model/src/config/index_test_utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66

77
use common::{
88
bootstrap_model::schema::SchemaState,
9+
components::ComponentId,
910
persistence::{
1011
NoopRetentionValidator,
1112
Persistence,
@@ -173,7 +174,7 @@ pub async fn apply_config(
173174
};
174175

175176
let mut tx = db.begin_system().await?;
176-
ConfigModel::new(&mut tx)
177+
ConfigModel::new(&mut tx, ComponentId::test_user())
177178
.apply(
178179
config_metadata,
179180
vec![],

crates/model/src/config/mod.rs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@ use database::{
2828
Transaction,
2929
};
3030
use sync_types::CanonicalizedModulePath;
31-
use value::{
32-
ResolvedDocumentId,
33-
TableNamespace,
34-
};
31+
use value::ResolvedDocumentId;
3532

3633
use self::module_loader::ModuleLoader;
3734
use crate::{
@@ -60,11 +57,12 @@ use crate::{
6057

6158
pub struct ConfigModel<'a, RT: Runtime> {
6259
pub tx: &'a mut Transaction<RT>,
60+
component: ComponentId,
6361
}
6462

6563
impl<'a, RT: Runtime> ConfigModel<'a, RT> {
66-
pub fn new(tx: &'a mut Transaction<RT>) -> Self {
67-
Self { tx }
64+
pub fn new(tx: &'a mut Transaction<RT>, component: ComponentId) -> Self {
65+
Self { tx, component }
6866
}
6967

7068
#[minitrace::trace]
@@ -84,41 +82,34 @@ impl<'a, RT: Runtime> ConfigModel<'a, RT> {
8482

8583
let source_package_id = match source_package {
8684
Some(source_package) => Some(
87-
SourcePackageModel::new(self.tx, TableNamespace::by_component_TODO())
85+
SourcePackageModel::new(self.tx, self.component.into())
8886
.put(source_package)
8987
.await?,
9088
),
9189
None => None,
9290
};
9391

94-
let cron_diff = CronModel::new(self.tx, ComponentId::TODO())
92+
let cron_diff = CronModel::new(self.tx, self.component)
9593
.apply(&analyze_results)
9694
.await?;
9795

98-
let (schema_diff, next_schema) =
99-
SchemaModel::new(self.tx, TableNamespace::by_component_TODO())
100-
.apply(schema_id)
101-
.await?;
96+
let (schema_diff, next_schema) = SchemaModel::new(self.tx, self.component.into())
97+
.apply(schema_id)
98+
.await?;
10299

103100
let index_diff = IndexModel::new(self.tx)
104-
.apply(TableNamespace::by_component_TODO(), &next_schema)
101+
.apply(self.component.into(), &next_schema)
105102
.await?;
106103

107104
let module_diff = ModuleModel::new(self.tx)
108-
.apply(
109-
ComponentId::TODO(),
110-
modules,
111-
source_package_id,
112-
analyze_results,
113-
)
105+
.apply(self.component, modules, source_package_id, analyze_results)
114106
.await?;
115107

116108
// Update auth info.
117109
let auth_diff = AuthInfoModel::new(self.tx).put(config.auth_info).await?;
118-
let udf_server_version_diff =
119-
UdfConfigModel::new(self.tx, TableNamespace::by_component_TODO())
120-
.set(new_config)
121-
.await?;
110+
let udf_server_version_diff = UdfConfigModel::new(self.tx, self.component.into())
111+
.set(new_config)
112+
.await?;
122113
let config_diff = ConfigDiff {
123114
module_diff,
124115
auth_diff,
@@ -148,7 +139,7 @@ impl<'a, RT: Runtime> ConfigModel<'a, RT> {
148139
}
149140
let mut config = ConfigMetadata::new();
150141
let modules: Vec<_> = ModuleModel::new(self.tx)
151-
.get_application_modules(ComponentId::TODO(), module_loader)
142+
.get_application_modules(self.component, module_loader)
152143
.await?
153144
.into_values()
154145
.collect();
@@ -162,7 +153,7 @@ impl<'a, RT: Runtime> ConfigModel<'a, RT> {
162153
config.auth_info = auth_info.into_iter().map(|doc| doc.into_value()).collect();
163154
}
164155

165-
let udf_config = UdfConfigModel::new(self.tx, TableNamespace::by_component_TODO())
156+
let udf_config = UdfConfigModel::new(self.tx, self.component.into())
166157
.get()
167158
.await?
168159
.map(|u| u.into_value());
@@ -186,7 +177,7 @@ impl<'a, RT: Runtime> ConfigModel<'a, RT> {
186177
}
187178
let mut config = ConfigMetadata::new();
188179
let modules = ModuleModel::new(self.tx)
189-
.get_application_metadata(ComponentId::TODO())
180+
.get_application_metadata(self.component)
190181
.await?;
191182

192183
// If we have an auth config module do not include auth_info in the config
@@ -198,7 +189,7 @@ impl<'a, RT: Runtime> ConfigModel<'a, RT> {
198189
config.auth_info = auth_info.into_iter().map(|doc| doc.into_value()).collect();
199190
}
200191

201-
let udf_config = UdfConfigModel::new(self.tx, TableNamespace::by_component_TODO())
192+
let udf_config = UdfConfigModel::new(self.tx, self.component.into())
202193
.get()
203194
.await?
204195
.map(|u| u.into_value());

0 commit comments

Comments
 (0)