Skip to content

Commit b25f10f

Browse files
authored
feat: add tail worker that incorporates the queue (#14)
* add tail worker that incorporates the queue * deploy also the tail worker
1 parent 047bb3c commit b25f10f

File tree

5 files changed

+66
-9
lines changed

5 files changed

+66
-9
lines changed

confidence-cloudflare-resolver/deployer/script.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,11 @@ RUSTFLAGS='--cfg getrandom_backend="wasm_js"' worker-build --release
309309

310310
# only deploy if NO_DEPLOY is not set
311311
if test -z "$NO_DEPLOY"; then
312+
# deploy the tail worker
313+
cd tail
314+
wrangler deploy
315+
# deploy the resolver worker
316+
cd ..
312317
wrangler deploy
313318
else
314319
echo "NO_DEPLOY is set, skipping deploy"

confidence-cloudflare-resolver/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
214214
&& aggregated.client_resolve_info.is_empty())
215215
{
216216
let converted = serde_json::to_string(&aggregated)?;
217-
FLAGS_LOGS_QUEUE.get().unwrap().send(converted).await?;
217+
console_log!("FLAGS_LOGS_QUEUE:{}", converted);
218218
}
219219

220220
response
@@ -249,7 +249,6 @@ pub async fn consume_flag_logs_queue(
249249

250250
Ok(())
251251
}
252-
253252
fn get_token(client_id: &str, client_secret: &str) -> String {
254253
let combined = format!("{}:{}", client_id, client_secret);
255254
let encoded = STANDARD.encode(combined.as_bytes());
@@ -268,7 +267,6 @@ async fn send_flags_logs(
268267
client_secret: &str,
269268
message: WriteFlagLogsRequest,
270269
) -> Result<Response> {
271-
console_log!("Sending logs {:?}", message);
272270
let resolve_url = "https://resolver.confidence.dev/v1/flagLogs:write";
273271
let mut init = RequestInit::new();
274272
let headers = Headers::new();
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
export default {
2+
async tail(events, env, ctx) {
3+
// Retrieve the queue binding
4+
const queue = env.flag_logs_queue;
5+
if (!queue) {
6+
console.error("LOG_QUEUE binding is missing");
7+
return;
8+
}
9+
10+
for (const event of events) {
11+
try {
12+
// Filter events from confidence-cloudflare-resolver only
13+
if (event.scriptName !== "confidence-cloudflare-resolver") {
14+
continue;
15+
}
16+
17+
// Process each log entry in the event
18+
for (const logEntry of event.logs || []) {
19+
try {
20+
// Check if the log message starts with FLAGS_LOGS_QUEUE:
21+
const messageStr = Array.isArray(logEntry.message)
22+
? logEntry.message.join(' ')
23+
: String(logEntry.message || '');
24+
25+
if (messageStr.startsWith('FLAGS_LOGS_QUEUE:')) {
26+
// Remove the FLAGS_LOGS_QUEUE: prefix
27+
const cleanedMessage = messageStr.substring('FLAGS_LOGS_QUEUE:'.length).trim();
28+
29+
try {
30+
// Parse the JSON payload to validate it's valid JSON
31+
JSON.parse(cleanedMessage);
32+
await queue.send(cleanedMessage);
33+
} catch (parseErr) {
34+
console.error("Failed to parse JSON payload:", parseErr);
35+
console.error("Raw message was:", cleanedMessage);
36+
}
37+
}
38+
} catch (logErr) {
39+
console.error("Failed to process log entry:", logErr);
40+
}
41+
}
42+
} catch (err) {
43+
console.error("Failed to process event:", err);
44+
}
45+
}
46+
}
47+
};
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
name = "confidence-resolver-tail"
2+
type = "javascript"
3+
compatibility_date = "2025-01-28"
4+
workers_dev = true
5+
main = "src/index.js"
6+
7+
[observability]
8+
enabled = true
9+
10+
[[queues.producers]]
11+
queue = "flag-logs-queue"
12+
binding = "flag_logs_queue"

confidence-cloudflare-resolver/wrangler.toml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
name = "confidence-cloudflare-resolver"
22
main = "build/worker/shim.mjs"
33
compatibility_date = "2025-01-28"
4+
tail_consumers = [{service = "confidence-resolver-tail"}]
45

56
[observability]
67
enabled = true
78

8-
9-
10-
[[queues.producers]]
11-
queue = "flag-logs-queue"
12-
binding = "flag_logs_queue"
13-
149
[[queues.consumers]]
1510
queue = "flag-logs-queue"
1611
max_batch_size = 100 # max number of messages in a batch

0 commit comments

Comments
 (0)