Skip to content

Commit 1cbb57b

Browse files
cteplaycations
authored andcommitted
Implement deferred task subscriptions (RooCodeInc#7517)
1 parent be4fe29 commit 1cbb57b

File tree

8 files changed

+106
-56
lines changed

8 files changed

+106
-56
lines changed

packages/cloud/src/bridge/BridgeOrchestrator.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ export interface BridgeOrchestratorOptions {
3131
export class BridgeOrchestrator {
3232
private static instance: BridgeOrchestrator | null = null
3333

34+
private static pendingTask: TaskLike | null = null
35+
3436
// Core
3537
private readonly userId: string
3638
private readonly socketBridgeUrl: string
@@ -116,6 +118,22 @@ export class BridgeOrchestrator {
116118
}
117119
}
118120

121+
/**
122+
* @TODO: What if subtasks also get spawned? We'd probably want deferred
123+
* subscriptions for those too.
124+
*/
125+
public static async subscribeToTask(task: TaskLike): Promise<void> {
126+
const instance = BridgeOrchestrator.instance
127+
128+
if (instance && instance.socketTransport.isConnected()) {
129+
console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
130+
await instance.subscribeToTask(task)
131+
} else {
132+
console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
133+
BridgeOrchestrator.pendingTask = task
134+
}
135+
}
136+
119137
private constructor(options: BridgeOrchestratorOptions) {
120138
this.userId = options.userId
121139
this.socketBridgeUrl = options.socketBridgeUrl
@@ -180,12 +198,27 @@ export class BridgeOrchestrator {
180198
const socket = this.socketTransport.getSocket()
181199

182200
if (!socket) {
183-
console.error("[BridgeOrchestrator] Socket not available after connect")
201+
console.error("[BridgeOrchestrator#handleConnect] Socket not available")
184202
return
185203
}
186204

187205
await this.extensionChannel.onConnect(socket)
188206
await this.taskChannel.onConnect(socket)
207+
208+
if (BridgeOrchestrator.pendingTask) {
209+
console.log(
210+
`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
211+
)
212+
213+
try {
214+
await this.subscribeToTask(BridgeOrchestrator.pendingTask)
215+
BridgeOrchestrator.pendingTask = null
216+
} catch (error) {
217+
console.error(
218+
`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
219+
)
220+
}
221+
}
189222
}
190223

191224
private handleDisconnect() {
@@ -261,6 +294,7 @@ export class BridgeOrchestrator {
261294
await this.taskChannel.cleanup(this.socketTransport.getSocket())
262295
await this.socketTransport.disconnect()
263296
BridgeOrchestrator.instance = null
297+
BridgeOrchestrator.pendingTask = null
264298
}
265299

266300
public async reconnect(): Promise<void> {

packages/cloud/src/bridge/TaskChannel.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,27 @@ export class TaskChannel extends BaseChannel<
163163
public async unsubscribeFromTask(taskId: string, _socket: Socket): Promise<void> {
164164
const task = this.subscribedTasks.get(taskId)
165165

166+
if (!task) {
167+
return
168+
}
169+
166170
await this.publish(TaskSocketEvents.LEAVE, { taskId }, (response: LeaveResponse) => {
167171
if (response.success) {
168-
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`, response)
172+
console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`)
169173
} else {
170174
console.error(`[TaskChannel#unsubscribeFromTask] failed to unsubscribe from ${taskId}`)
171175
}
172176

173177
// If we failed to unsubscribe then something is probably wrong and
174178
// we should still discard this task from `subscribedTasks`.
175-
if (task) {
176-
this.removeTaskListeners(task)
177-
this.subscribedTasks.delete(taskId)
178-
}
179+
this.removeTaskListeners(task)
180+
this.subscribedTasks.delete(taskId)
179181
})
180182
}
181183

182184
private setupTaskListeners(task: TaskLike): void {
183185
if (this.taskListeners.has(task.taskId)) {
184-
console.warn("[TaskChannel] Listeners already exist for task, removing old listeners:", task.taskId)
186+
console.warn(`[TaskChannel] Listeners already exist for task, removing old listeners for ${task.taskId}`)
185187
this.removeTaskListeners(task)
186188
}
187189

packages/cloud/src/bridge/__tests__/TaskChannel.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,7 @@ describe("TaskChannel", () => {
299299

300300
// Verify warning was logged
301301
expect(warnSpy).toHaveBeenCalledWith(
302-
"[TaskChannel] Listeners already exist for task, removing old listeners:",
303-
taskId,
302+
`[TaskChannel] Listeners already exist for task, removing old listeners for ${taskId}`,
304303
)
305304

306305
// Verify only one set of listeners exists

packages/types/npm/package.metadata.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@roo-code/types",
3-
"version": "1.64.0",
3+
"version": "1.65.0",
44
"description": "TypeScript type definitions for Roo Code.",
55
"publishConfig": {
66
"access": "public",

src/core/task/Task.ts

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
258258

259259
// Task Bridge
260260
enableBridge: boolean
261-
bridge: BridgeOrchestrator | null = null
262261

263262
// Streaming
264263
isWaitingForFirstChunk = false
@@ -1086,14 +1085,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
10861085
private async startTask(task?: string, images?: string[]): Promise<void> {
10871086
if (this.enableBridge) {
10881087
try {
1089-
this.bridge = this.bridge || BridgeOrchestrator.getInstance()
1090-
1091-
if (this.bridge) {
1092-
await this.bridge.subscribeToTask(this)
1093-
}
1088+
await BridgeOrchestrator.subscribeToTask(this)
10941089
} catch (error) {
10951090
console.error(
1096-
`[Task#startTask] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
1091+
`[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
10971092
)
10981093
}
10991094
}
@@ -1158,14 +1153,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
11581153
private async resumeTaskFromHistory() {
11591154
if (this.enableBridge) {
11601155
try {
1161-
this.bridge = this.bridge || BridgeOrchestrator.getInstance()
1162-
1163-
if (this.bridge) {
1164-
await this.bridge.subscribeToTask(this)
1165-
}
1156+
await BridgeOrchestrator.subscribeToTask(this)
11661157
} catch (error) {
11671158
console.error(
1168-
`[Task#resumeTaskFromHistory] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`,
1159+
`[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
11691160
)
11701161
}
11711162
}
@@ -1419,10 +1410,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14191410
}
14201411

14211412
public dispose(): void {
1422-
// Disposing task
1423-
console.log(`[Task] disposing task ${this.taskId}.${this.instanceId}`)
1413+
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)
14241414

1425-
// Remove all event listeners to prevent memory leaks
1415+
// Remove all event listeners to prevent memory leaks.
14261416
try {
14271417
this.removeAllListeners()
14281418
} catch (error) {
@@ -1442,13 +1432,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14421432
this.pauseInterval = undefined
14431433
}
14441434

1445-
// Unsubscribe from TaskBridge service.
1446-
if (this.bridge) {
1447-
this.bridge
1448-
.unsubscribeFromTask(this.taskId)
1449-
.catch((error: unknown) => console.error("Error unsubscribing from task bridge:", error))
1450-
1451-
this.bridge = null
1435+
if (this.enableBridge) {
1436+
BridgeOrchestrator.getInstance()
1437+
?.unsubscribeFromTask(this.taskId)
1438+
.catch((error) =>
1439+
console.error(
1440+
`[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1441+
),
1442+
)
14521443
}
14531444

14541445
// Release any terminals associated with this task.

src/core/task/__tests__/Task.dispose.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ describe("Task dispose method", () => {
134134

135135
// Verify dispose was called and logged
136136
expect(consoleLogSpy).toHaveBeenCalledWith(
137-
expect.stringContaining(`[Task] disposing task ${task.taskId}.${task.instanceId}`),
137+
expect.stringContaining(`[Task#dispose] disposing task ${task.taskId}.${task.instanceId}`),
138138
)
139139

140140
// Verify removeAllListeners was called first (before other cleanup)

src/core/webview/ClineProvider.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ export class ClineProvider
167167

168168
this.marketplaceManager = new MarketplaceManager(this.context, this.customModesManager)
169169

170+
// Forward <most> task events to the provider.
171+
// We do something fairly similar for the IPC-based API.
170172
this.taskCreationCallback = (instance: Task) => {
171173
this.emit(RooCodeEventName.TaskCreated, instance)
172174

@@ -348,18 +350,18 @@ export class ClineProvider
348350
let task = this.clineStack.pop()
349351

350352
if (task) {
353+
task.emit(RooCodeEventName.TaskUnfocused)
354+
351355
try {
352356
// Abort the running task and set isAbandoned to true so
353357
// all running promises will exit as well.
354358
await task.abortTask(true)
355359
} catch (e) {
356360
this.log(
357-
`[removeClineFromStack] encountered error while aborting task ${task.taskId}.${task.instanceId}: ${e.message}`,
361+
`[ClineProvider#removeClineFromStack] abortTask() failed ${task.taskId}.${task.instanceId}: ${e.message}`,
358362
)
359363
}
360364

361-
task.emit(RooCodeEventName.TaskUnfocused)
362-
363365
// Remove event listeners before clearing the reference.
364366
const cleanupFunctions = this.taskEventListeners.get(task)
365367

@@ -407,12 +409,6 @@ export class ClineProvider
407409
await this.getCurrentTask()?.resumePausedTask(lastMessage)
408410
}
409411

410-
// Clear the current task without treating it as a subtask.
411-
// This is used when the user cancels a task that is not a subtask.
412-
async clearTask() {
413-
await this.removeClineFromStack()
414-
}
415-
416412
resumeTask(taskId: string): void {
417413
// Use the existing showTaskWithId method which handles both current and historical tasks
418414
this.showTaskWithId(taskId).catch((error) => {
@@ -1365,6 +1361,16 @@ export class ClineProvider
13651361
await this.createTaskWithHistoryItem({ ...historyItem, rootTask, parentTask, preservedFCOState })
13661362
}
13671363

1364+
// Clear the current task without treating it as a subtask.
1365+
// This is used when the user cancels a task that is not a subtask.
1366+
async clearTask() {
1367+
if (this.clineStack.length > 0) {
1368+
const task = this.clineStack[this.clineStack.length - 1]
1369+
console.log(`[clearTask] clearing task ${task.taskId}.${task.instanceId}`)
1370+
await this.removeClineFromStack()
1371+
}
1372+
}
1373+
13681374
async updateCustomInstructions(instructions?: string) {
13691375
// User may be clearing the field.
13701376
await this.updateGlobalState("customInstructions", instructions || undefined)
@@ -1643,6 +1649,7 @@ export class ClineProvider
16431649
})
16441650
} catch (error) {
16451651
console.error("Failed to fetch marketplace data:", error)
1652+
16461653
// Send empty data on error to prevent UI from hanging
16471654
this.postMessageToWebview({
16481655
type: "marketplaceData",
@@ -2272,24 +2279,23 @@ export class ClineProvider
22722279
if (bridge) {
22732280
const currentTask = this.getCurrentTask()
22742281

2275-
if (currentTask && !currentTask.bridge) {
2282+
if (currentTask && !currentTask.enableBridge) {
22762283
try {
2277-
currentTask.bridge = bridge
2278-
await currentTask.bridge.subscribeToTask(currentTask)
2284+
currentTask.enableBridge = true
2285+
await BridgeOrchestrator.subscribeToTask(currentTask)
22792286
} catch (error) {
2280-
const message = `[ClineProvider#remoteControlEnabled] subscribeToTask failed - ${error instanceof Error ? error.message : String(error)}`
2287+
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`
22812288
this.log(message)
22822289
console.error(message)
22832290
}
22842291
}
22852292
} else {
22862293
for (const task of this.clineStack) {
2287-
if (task.bridge) {
2294+
if (task.enableBridge) {
22882295
try {
2289-
await task.bridge.unsubscribeFromTask(task.taskId)
2290-
task.bridge = null
2296+
await BridgeOrchestrator.getInstance()?.unsubscribeFromTask(task.taskId)
22912297
} catch (error) {
2292-
const message = `[ClineProvider#remoteControlEnabled] unsubscribeFromTask failed - ${error instanceof Error ? error.message : String(error)}`
2298+
const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`
22932299
this.log(message)
22942300
console.error(message)
22952301
}

src/extension/api.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,29 @@ export class API extends EventEmitter<RooCodeEvents> implements RooCodeAPI {
253253
this.taskMap.delete(task.taskId)
254254
})
255255

256-
// Optional:
257-
// RooCodeEventName.TaskFocused
258-
// RooCodeEventName.TaskUnfocused
259-
// RooCodeEventName.TaskActive
260-
// RooCodeEventName.TaskIdle
256+
task.on(RooCodeEventName.TaskFocused, () => {
257+
this.emit(RooCodeEventName.TaskFocused, task.taskId)
258+
})
259+
260+
task.on(RooCodeEventName.TaskUnfocused, () => {
261+
this.emit(RooCodeEventName.TaskUnfocused, task.taskId)
262+
})
263+
264+
task.on(RooCodeEventName.TaskActive, () => {
265+
this.emit(RooCodeEventName.TaskActive, task.taskId)
266+
})
267+
268+
task.on(RooCodeEventName.TaskInteractive, () => {
269+
this.emit(RooCodeEventName.TaskInteractive, task.taskId)
270+
})
271+
272+
task.on(RooCodeEventName.TaskResumable, () => {
273+
this.emit(RooCodeEventName.TaskResumable, task.taskId)
274+
})
275+
276+
task.on(RooCodeEventName.TaskIdle, () => {
277+
this.emit(RooCodeEventName.TaskIdle, task.taskId)
278+
})
261279

262280
// Subtask Lifecycle
263281

0 commit comments

Comments
 (0)