Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 101 additions & 96 deletions packages/cloud/src/bridge/SocketTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export interface SocketTransportOptions {
onConnect?: () => void | Promise<void>
onDisconnect?: (reason: string) => void
onReconnect?: (attemptNumber: number) => void | Promise<void>
onError?: (error: Error) => void
logger?: {
log: (message: string, ...args: unknown[]) => void
error: (message: string, ...args: unknown[]) => void
Expand All @@ -23,7 +22,6 @@ export interface SocketTransportOptions {
export class SocketTransport {
private socket: Socket | null = null
private connectionState: ConnectionState = ConnectionState.DISCONNECTED
private retryAttempt: number = 0
private retryTimeout: NodeJS.Timeout | null = null
private hasConnectedOnce: boolean = false

Expand All @@ -45,6 +43,9 @@ export class SocketTransport {
}
}

// This is the initial connnect attempt. We need to implement our own
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "connnect" should be "connect"

Suggested change
// This is the initial connnect attempt. We need to implement our own
// This is the initial connect attempt. We need to implement our own

// infinite retry mechanism since Socket.io's automatic reconnection only
// kicks in after a successful initial connection.
public async connect(): Promise<void> {
if (this.connectionState === ConnectionState.CONNECTED) {
console.log(`[SocketTransport] Already connected`)
Expand All @@ -56,49 +57,25 @@ export class SocketTransport {
return
}

// Start connection attempt without blocking.
this.startConnectionAttempt()
}

private async startConnectionAttempt() {
this.retryAttempt = 0

try {
await this.connectWithRetry()
} catch (error) {
console.error(
`[SocketTransport] Unexpected error in connection loop: ${error instanceof Error ? error.message : String(error)}`,
)
}
}

private async connectWithRetry(): Promise<void> {
let attempt = 0
let delay = this.retryConfig.initialDelay

while (this.retryAttempt < this.retryConfig.maxInitialAttempts) {
try {
this.connectionState = this.retryAttempt === 0 ? ConnectionState.CONNECTING : ConnectionState.RETRYING

console.log(`[SocketTransport] Connection attempt ${this.retryAttempt + 1}`)

await this.connectSocket()
while (attempt < this.retryConfig.maxInitialAttempts) {
console.log(`[SocketTransport] Initial connect attempt ${attempt + 1}`)
this.connectionState = attempt === 0 ? ConnectionState.CONNECTING : ConnectionState.RETRYING

try {
await this._connect()
console.log(`[SocketTransport] Connected to ${this.options.url}`)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log message duplicates the one on line 103. Is this intentional, or could we remove one of them to reduce redundancy?


this.connectionState = ConnectionState.CONNECTED
this.retryAttempt = 0

this.clearRetryTimeouts()

if (this.options.onConnect) {
await this.options.onConnect()
}

return
} catch (error) {
this.retryAttempt++

console.error(`[SocketTransport] Connection attempt ${this.retryAttempt} failed:`, error)
break
} catch (_error) {
attempt++

if (this.socket) {
this.socket.disconnect()
Expand All @@ -107,40 +84,55 @@ export class SocketTransport {

console.log(`[SocketTransport] Waiting ${delay}ms before retry...`)

await this.delay(delay)
const promise = new Promise((resolve) => {
this.retryTimeout = setTimeout(resolve, delay)
})

await promise

delay = Math.min(delay * this.retryConfig.backoffMultiplier, this.retryConfig.maxDelay)
}
}

if (this.retryTimeout) {
clearTimeout(this.retryTimeout)
this.retryTimeout = null
}

if (this.connectionState === ConnectionState.CONNECTED) {
console.log(`[SocketTransport] Connected to ${this.options.url}`)
} else {
this.connectionState = ConnectionState.FAILED
console.error(`[SocketTransport] Failed to connect to ${this.options.url}, giving up`)
}
}

private async connectSocket(): Promise<void> {
private async _connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.socket = io(this.options.url, this.options.socketOptions)

const connectionTimeout = setTimeout(() => {
console.error(`[SocketTransport] Connection timeout`)
let connectionTimeout: NodeJS.Timeout | null = setTimeout(() => {
console.error(`[SocketTransport] failed to connect after ${this.CONNECTION_TIMEOUT}ms`)

if (this.connectionState !== ConnectionState.CONNECTED) {
this.socket?.disconnect()
reject(new Error("Connection timeout"))
}
}, this.CONNECTION_TIMEOUT)

// https://socket.io/docs/v4/client-api/#event-connect
this.socket.on("connect", async () => {
clearTimeout(connectionTimeout)
console.log(`[SocketTransport] on(connect)`)

const isReconnection = this.hasConnectedOnce

// If this is a reconnection (not the first connect), treat it as a
// reconnect. This handles server restarts where 'reconnect' event might not fire.
if (isReconnection) {
console.log(`[SocketTransport] Treating connect as reconnection (server may have restarted)`)
if (connectionTimeout) {
clearTimeout(connectionTimeout)
connectionTimeout = null
}

if (this.hasConnectedOnce) {
this.connectionState = ConnectionState.CONNECTED

if (this.options.onReconnect) {
// Call onReconnect to re-register instance.
await this.options.onReconnect(0)
}
}
Expand All @@ -149,9 +141,19 @@ export class SocketTransport {
resolve()
})

this.socket.on("disconnect", (reason: string) => {
console.log(`[SocketTransport] Disconnected (reason: ${reason})`)
// https://socket.io/docs/v4/client-api/#event-connect_error
this.socket.on("connect_error", (error) => {
if (connectionTimeout && this.connectionState !== ConnectionState.CONNECTED) {
console.error(`[SocketTransport] on(connect_error): ${error.message}`)
clearTimeout(connectionTimeout)
connectionTimeout = null
reject(error)
}
})

// https://socket.io/docs/v4/client-api/#event-disconnect
this.socket.on("disconnect", (reason, details) => {
console.log(`[SocketTransport] on(disconnect) (reason: ${reason}, details: ${JSON.stringify(details)})`)
this.connectionState = ConnectionState.DISCONNECTED

if (this.options.onDisconnect) {
Expand All @@ -162,77 +164,82 @@ export class SocketTransport {
const isManualDisconnect = reason === "io client disconnect"

if (!isManualDisconnect && this.hasConnectedOnce) {
// After successful initial connection, rely entirely on Socket.IO's
// reconnection.
console.log(`[SocketTransport] Socket.IO will handle reconnection (reason: ${reason})`)
// After successful initial connection, rely entirely on
// Socket.IO's reconnection logic.
console.log("[SocketTransport] will attempt to reconnect")
} else {
console.log("[SocketTransport] will *NOT* attempt to reconnect")
}
})

// Listen for reconnection attempts.
this.socket.on("reconnect_attempt", (attemptNumber: number) => {
console.log(`[SocketTransport] Socket.IO reconnect attempt:`, {
attemptNumber,
})
})
// https://socket.io/docs/v4/client-api/#event-error
// Fired upon a connection error.
this.socket.io.on("error", (error) => {
// Connection error.
if (connectionTimeout && this.connectionState !== ConnectionState.CONNECTED) {
console.error(`[SocketTransport] on(error): ${error.message}`)
clearTimeout(connectionTimeout)
connectionTimeout = null
reject(error)
}

this.socket.on("reconnect", (attemptNumber: number) => {
console.log(`[SocketTransport] Socket reconnected (attempt: ${attemptNumber})`)
// Post-connection error.
if (this.connectionState === ConnectionState.CONNECTED) {
console.error(`[SocketTransport] on(error): ${error.message}`)
}
})

// https://socket.io/docs/v4/client-api/#event-reconnect
// Fired upon a successful reconnection.
this.socket.io.on("reconnect", (attempt) => {
console.log(`[SocketTransport] on(reconnect) - ${attempt}`)
this.connectionState = ConnectionState.CONNECTED

if (this.options.onReconnect) {
this.options.onReconnect(attemptNumber)
this.options.onReconnect(attempt)
}
})

this.socket.on("reconnect_error", (error: Error) => {
console.error(`[SocketTransport] Socket.IO reconnect error:`, error)
// https://socket.io/docs/v4/client-api/#event-reconnect_attempt
// Fired upon an attempt to reconnect.
this.socket.io.on("reconnect_attempt", (attempt) => {
console.log(`[SocketTransport] on(reconnect_attempt) - ${attempt}`)
})

this.socket.on("reconnect_failed", () => {
console.error(`[SocketTransport] Socket.IO reconnection failed after all attempts`)
// https://socket.io/docs/v4/client-api/#event-reconnect_error
// Fired upon a reconnection attempt error.
this.socket.io.on("reconnect_error", (error) => {
console.error(`[SocketTransport] on(reconnect_error): ${error.message}`)
})

this.connectionState = ConnectionState.RETRYING
// https://socket.io/docs/v4/client-api/#event-reconnect_failed
// Fired when couldn't reconnect within `reconnectionAttempts`.
// Since we use infinite retries, this should never fire.
this.socket.io.on("reconnect_failed", () => {
console.error(`[SocketTransport] on(reconnect_failed) - giving up`)
this.connectionState = ConnectionState.FAILED
})

this.socket.on("error", (error) => {
console.error(`[SocketTransport] Socket error:`, error)
// This is a custom event fired by the server.
this.socket.on("auth_error", (error) => {
console.error(`[SocketTransport] on (auth_error):`, error)

if (this.connectionState !== ConnectionState.CONNECTED) {
if (connectionTimeout && this.connectionState !== ConnectionState.CONNECTED) {
clearTimeout(connectionTimeout)
reject(error)
}

if (this.options.onError) {
this.options.onError(error)
connectionTimeout = null
reject(new Error(error.message || "Authentication failed"))
}
})

this.socket.on("auth_error", (error) => {
console.error(`[SocketTransport] Authentication error:`, error)
clearTimeout(connectionTimeout)
reject(new Error(error.message || "Authentication failed"))
})
})
}

private delay(ms: number): Promise<void> {
return new Promise((resolve) => {
this.retryTimeout = setTimeout(resolve, ms)
})
}
public async disconnect(): Promise<void> {
console.log(`[SocketTransport] Disconnecting...`)

private clearRetryTimeouts() {
if (this.retryTimeout) {
clearTimeout(this.retryTimeout)
this.retryTimeout = null
}
}

public async disconnect(): Promise<void> {
console.log(`[SocketTransport] Disconnecting...`)

this.clearRetryTimeouts()

if (this.socket) {
this.socket.removeAllListeners()
Expand All @@ -241,7 +248,6 @@ export class SocketTransport {
}

this.connectionState = ConnectionState.DISCONNECTED

console.log(`[SocketTransport] Disconnected`)
}

Expand All @@ -258,15 +264,14 @@ export class SocketTransport {
}

public async reconnect(): Promise<void> {
console.log(`[SocketTransport] Manually reconnecting...`)

if (this.connectionState === ConnectionState.CONNECTED) {
console.log(`[SocketTransport] Already connected`)
return
}

console.log(`[SocketTransport] Manual reconnection requested`)

this.hasConnectedOnce = false

await this.disconnect()
await this.connect()
}
Expand Down
Loading