30 changes: 30 additions & 0 deletions rmb-sdk-go/peer/README.md
@@ -114,7 +114,37 @@ app.WithHandler("sub", func(ctx context.Context, payload []byte) (interface{}, e
})
```

### Handler expectations and recommended patterns

- __Handler callback semantics__
- The `Peer` calls the configured handler synchronously from its processing loop: `handler(ctx, peer, env, err)`.
- If you do heavy or blocking work inside your handler, you should offload it to a goroutine or a bounded worker pool to avoid stalling the peer loop.
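A minimal sketch of the offload pattern, assuming the four-argument handler shape described above (the parameter types are indicative, and `process` is a hypothetical work function):

```go
// Bounded worker pool: at most maxWorkers jobs run concurrently, so a burst
// of requests cannot stall the peer loop or spawn unbounded goroutines.
sem := make(chan struct{}, maxWorkers) // maxWorkers is illustrative, e.g. 8

handler := func(ctx context.Context, p *peer.Peer, env *types.Envelope, err error) {
	select {
	case sem <- struct{}{}: // acquire a slot; blocks briefly when the pool is full
	case <-ctx.Done():
		return
	}
	go func() {
		defer func() { <-sem }() // release the slot when done
		process(ctx, env, err)   // hypothetical heavy or blocking work
	}()
}
```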

- __Server-side: use `Router.Serve`__
- `Router.Serve` is designed for servers. It immediately spawns a goroutine per incoming request and runs middlewares/handlers there, then replies via `peer.SendResponse(...)`.
- This decouples heavy handler work from the peer loop. Register handlers with `router.SubRoute(...).WithHandler(...)`.
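A sketch of server-side registration (route and command names are illustrative; the handler signature matches the snippet above):

```go
// Each registered command runs in its own goroutine under Router.Serve,
// and the returned value is sent back via peer.SendResponse.
router := peer.NewRouter()
calc := router.SubRoute("calculator")
calc.WithHandler("add", func(ctx context.Context, payload []byte) (interface{}, error) {
	var nums []float64
	if err := json.Unmarshal(payload, &nums); err != nil {
		return nil, err
	}
	sum := 0.0
	for _, n := range nums {
		sum += n
	}
	return sum, nil
})
// Pass router.Serve as the peer's handler when constructing the Peer.
```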

- __Client-side: use `RpcClient`__
- `RpcClient` wraps a `Peer` and correlates responses to callers via `uid`. Its internal handler is fast and non-blocking.
- Prefer `RpcClient.Call(ctx, twin, fn, data, &result)` over wiring your own response handler. Always pass a context with timeout/deadline.
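A sketch of a single call with a deadline, assuming an `rpc` client built as in the quick reference below (twin ID, command path, and payload are illustrative):

```go
callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

var sum float64
// Blocks only this goroutine until the response arrives or callCtx expires.
if err := rpc.Call(callCtx, destTwin, "calculator.add", []float64{1, 2, 3}, &sum); err != nil {
	log.Error().Err(err).Msg("call failed or timed out")
}
```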

- __Concurrency and ordering__
- With `Router.Serve`, each request is processed in its own goroutine, whether it targets the same command as other in-flight requests or a different one; there are no ordering guarantees across requests.
- `RpcClient` handles responses without blocking the peer loop; each call blocks only the caller goroutine until its response arrives or `ctx` cancels.
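Fanning out calls from several goroutines is therefore safe; a sketch (twin IDs and command are illustrative):

```go
var wg sync.WaitGroup
for _, twin := range []uint32{42, 43, 44} { // illustrative twin IDs
	wg.Add(1)
	go func(twin uint32) {
		defer wg.Done()
		callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
		var status string
		// Each Call blocks only its own goroutine.
		if err := rpc.Call(callCtx, twin, "status.get", nil, &status); err != nil {
			log.Error().Uint32("twin", twin).Err(err).Msg("call failed")
		}
	}(twin)
}
wg.Wait()
```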

- __Backpressure__
- Sending a response uses `peer.SendResponse(...)`, which ultimately writes to a relay connection. Under sustained load, the write path applies backpressure: only the handler goroutine sending that response blocks, while other requests continue.
- The peer employs bounded channel buffering on ingress and per-connection IO to improve burst tolerance while preserving backpressure.

### Shutdown

- Cancel the parent context you passed to `NewPeer(...)` (or `NewRpcClient(...)`) to request shutdown.
- Then call `p.Wait()` (or `rpc.Wait()`) to block until all goroutines have exited (including connection workers).
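A sketch of the full lifecycle (the `NewPeer` argument order is indicative; check your SDK version for the exact signature):

```go
ctx, cancel := context.WithCancel(context.Background())

p, err := peer.NewPeer(ctx, mnemonics, subManager, router.Serve)
if err != nil {
	log.Fatal().Err(err).Msg("failed to start peer")
}

// ... serve traffic ...

cancel() // request shutdown
p.Wait() // block until connection workers and handler goroutines have exited
```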

### Quick reference

- __Server__
- Build routes with `router := peer.NewRouter()` and `router.SubRoute(...).WithHandler(...)`.

- __Client__
- Use `rpc, _ := peer.NewRpcClient(ctx, mnemonics, subManager, opts...)` and call `rpc.Call(...)` with a timeout.
127 changes: 99 additions & 28 deletions rmb-sdk-go/peer/connection.go
Expand Up @@ -19,6 +19,10 @@ import (
const (
pongWait = 40 * time.Second
pingInterval = 20 * time.Second
// SummaryInterval controls how often the per-connection summary is logged.
SummaryInterval = time.Minute
// BackpressureProbeInterval is used for periodic diagnostics while blocked on channel sends.
BackpressureProbeInterval = 100 * time.Millisecond
)

var errTimeout = fmt.Errorf("connection timeout")
@@ -33,12 +37,14 @@ var (

// InnerConnection holds the required state to create a self healing websocket connection to the rmb relay.
type InnerConnection struct {
twinID uint32
session string
identity substrate.Identity
url string
writer chan send
connected int32 // 1 when loop is active with an open websocket
twinID uint32
session string
identity substrate.Identity
url string
writer chan send
connected int32 // 1 when loop is active with an open websocket
connectionsTotal int64 // total successful connections (initial + reconnects)
observer ConnObserver
}

type send struct {
@@ -71,11 +77,13 @@ func (r Reader) Read() []byte {
// NewConnection creates a new InnerConnection instance
func NewConnection(identity substrate.Identity, url string, session string, twinID uint32) InnerConnection {
return InnerConnection{
twinID: twinID,
identity: identity,
url: url,
session: session,
writer: make(chan send), // TODO: it should be buffered
twinID: twinID,
identity: identity,
url: url,
session: session,
writer: make(chan send, 64), // buffered to smooth short spikes from concurrent senders (bounded for backpressure)
observer: NewLogObserver(url),
connectionsTotal: 0,
}
}

@@ -93,7 +101,11 @@ func (c *InnerConnection) reader(ctx context.Context, cancel context.CancelFunc,
for {
typ, data, err := con.ReadMessage()
if err != nil {
if websocket.IsCloseError(err) || websocket.IsUnexpectedCloseError(err) || err == io.EOF {
// If we're shutting down, prefer a clean context-canceled reason over transport error noise
if ctx.Err() != nil {
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
} else if websocket.IsCloseError(err) || websocket.IsUnexpectedCloseError(err) || err == io.EOF {
exitReason = "close"
exitErr = err
} else {
@@ -112,12 +124,22 @@
return
}

select {
case <-ctx.Done():
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
return
case reader <- data:
// Backpressure-aware send into reader channel: never drop.
{
delivered := false
for !delivered {
select {
case <-ctx.Done():
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
return
case reader <- data:
// delivered
delivered = true
case <-time.After(BackpressureProbeInterval):
c.observer.ReaderBackpressure(c.url, len(reader), cap(reader))
}
}
}
}
}
@@ -150,6 +172,7 @@ func (c *InnerConnection) send(ctx context.Context, data []byte) error {
func (c *InnerConnection) loop(ctx context.Context, con *websocket.Conn, output chan []byte) error {
var exitReason string
var exitErr error
var stats ConnStats

// Attempt a graceful close handshake on exit
defer func() {
@@ -158,7 +181,9 @@
} else {
log.Debug().Str("url", c.url).Str("reason", exitReason).Err(exitErr).Msg("relay loop exited")
}

recons := c.reconnections()
stats.Exit(c.observer, c.url, exitReason, exitErr, recons)
// Gracefully close the websocket: send a normal close frame, then close the connection.
deadline := time.Now().Add(1 * time.Second)
_ = con.WriteControl(
websocket.CloseMessage,
@@ -185,31 +210,65 @@
return nil
})

outputCh := make(chan []byte) // TODO: it should be buffered
defer close(outputCh)
outputCh := make(chan []byte, 1024)

atomic.AddInt64(&c.connectionsTotal, 1)

go c.reader(local, cancel, con, outputCh)

lastPong := time.Now()
summaryTicker := time.NewTicker(SummaryInterval)
defer summaryTicker.Stop()
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()
logSummary := func() {
stats.Summary(c.observer, c.url, c.reconnections())
}
for {
select {
case <-summaryTicker.C:
logSummary()
case <-ctx.Done():
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
return ctx.Err()
case <-local.Done():
exitReason = ErrLocalCanceled.Error()
exitErr = ErrLocalCanceled
return nil // error happened with the connection, return nil to try again
case data := <-outputCh:
// TODO: can we protect the loop from stalling by using a short timeout with logging
output <- data
return nil
case data, ok := <-outputCh:
if !ok {
exitReason = ErrLocalCanceled.Error()
exitErr = ErrLocalCanceled
return nil
}
delivered := false
for !delivered {
select {
case output <- data:
stats.OnDelivered()
delivered = true
case <-time.After(BackpressureProbeInterval):
c.observer.OutputBackpressure(c.url, len(output), cap(output), len(outputCh), cap(outputCh), len(c.writer), cap(c.writer))
case <-ctx.Done():
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
return ctx.Err()
}
}
lastPong = time.Now()
case <-ctx.Done():
exitReason = ErrContextCanceled.Error()
exitErr = ctx.Err()
return ctx.Err()
case sent := <-c.writer:
// Write the message to the websocket transport.
writeStart := time.Now()
err := con.WriteMessage(websocket.BinaryMessage, sent.data)
writeDur := time.Since(writeStart)
stats.OnWriteResult(writeDur, err)
// Spike diagnostics: warn (sampled) if write is slow or writer queue is saturated
c.observer.MaybeWriteSpike(c.url, writeDur, len(c.writer), cap(c.writer), len(output), cap(output), len(outputCh), cap(outputCh))
// Try to notify the sender about the write result. If notification fails,
// log and continue; it's not a transport failure.
if replyErr := sent.reply(ctx, err); replyErr != nil {
log.Warn().Err(replyErr).Msg("failed to deliver write result to sender")
}
@@ -237,11 +296,23 @@
}
}

// reconnections returns the number of reconnections that happened after the initial successful connection.
// It is computed as max(connectionsTotal-1, 0).
func (c *InnerConnection) reconnections() int64 {
total := atomic.LoadInt64(&c.connectionsTotal)
if total <= 1 {
return 0
}
return total - 1
}

// Start initiates the websocket connection
func (c *InnerConnection) Start(ctx context.Context, output chan []byte, wg *sync.WaitGroup) {
if wg != nil {
wg.Add(1)
}

// Run the self-healing connection loop in the background.
go func() {
defer func() {
if r := recover(); r != nil {
@@ -315,7 +386,7 @@ func (c *InnerConnection) connect() (*websocket.Conn, error) {
log.Debug().Str("url", c.url).Msg("connecting")

dialer := websocket.Dialer{
HandshakeTimeout: 5 * time.Second,
HandshakeTimeout: 10 * time.Second,
Proxy: http.ProxyFromEnvironment,
}

97 changes: 97 additions & 0 deletions rmb-sdk-go/peer/connection_logger.go
@@ -0,0 +1,97 @@
package peer

import (
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const (
// WarnSamplePeriod controls burst sampling window for warn-level logs.
WarnSamplePeriod = 5 * time.Second
// WriteSpikeWarnThreshold defines the write duration threshold to warn for spikes.
WriteSpikeWarnThreshold = 100 * time.Millisecond
// WriterQueueWarnPercent defines when to warn on writer queue usage percentage.
WriterQueueWarnPercent = 75
)

// NewLogObserver builds a zerolog-based observer with burst-sampled warn logging.
func NewLogObserver(url string) ConnObserver {
// reader/output backpressure warnings: sampled to at most one per WarnSamplePeriod
readerSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}}
bpReaderLog := log.With().Str("url", url).Logger().Sample(&readerSampler)

outputSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}}
bpOutLog := log.With().Str("url", url).Logger().Sample(&outputSampler)

// write spike warnings: sampled to at most one per WarnSamplePeriod
spikeSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}}
spikeLog := log.With().Str("url", url).Logger().Sample(&spikeSampler)

return &logObserver{
url: url,
bpReaderLog: bpReaderLog,
bpOutLog: bpOutLog,
spikeLog: spikeLog,
}
}

type logObserver struct {
url string
bpReaderLog zerolog.Logger
bpOutLog zerolog.Logger
spikeLog zerolog.Logger
}

func (o *logObserver) ReaderBackpressure(_ string, readerLen, readerCap int) {
o.bpReaderLog.Warn().
Int("reader_len", readerLen).Int("reader_cap", readerCap).
Msg("reader channel backpressure (blocking)")
}

func (o *logObserver) OutputBackpressure(_ string, outputLen, outputCap, outputChLen, outputChCap, writerLen, writerCap int) {
o.bpOutLog.Warn().
Int("output_len", outputLen).Int("output_cap", outputCap).
Int("outputCh_len", outputChLen).Int("outputCh_cap", outputChCap).
Int("writer_len", writerLen).Int("writer_cap", writerCap).
Msg("output channel backpressure (blocking)")
}

func (o *logObserver) MaybeWriteSpike(_ string, writeDur time.Duration, writerLen, writerCap, outputLen, outputCap, outputChLen, outputChCap int) {
// Thresholds preserved from previous behavior: duration > 100ms or writer queue > 75% full.
if writeDur > WriteSpikeWarnThreshold || (writerCap > 0 && writerLen*100 > writerCap*WriterQueueWarnPercent) {
o.spikeLog.Warn().
Dur("write_dur", writeDur).
Int("writer_len", writerLen).Int("writer_cap", writerCap).
Int("output_len", outputLen).Int("output_cap", outputCap).
Int("outputCh_len", outputChLen).Int("outputCh_cap", outputChCap).
Msg("write spike / queue saturation")
}
}

func (o *logObserver) Summary(url string, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) {
log.Debug().
Str("url", url).
Int64("reads", reads).
Int64("writes", writes).
Int64("write_errors", writeErrors).
Dur("avg", avg).
Dur("max", max).
Int64("reconnections", reconnections).
Msg("relay loop summary")
}

func (o *logObserver) Exit(url, reason string, err error, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) {
log.Debug().
Str("url", url).
Str("reason", reason).
Err(err).
Int64("reads", reads).
Int64("writes", writes).
Int64("write_errors", writeErrors).
Dur("avg", avg).
Dur("max", max).
Int64("reconnections", reconnections).
Msg("relay loop exited")
}
41 changes: 41 additions & 0 deletions rmb-sdk-go/peer/connection_observe.go
@@ -0,0 +1,41 @@
package peer

import (
"time"
)

// ConnObserver abstracts logging/observability from the transport logic.
// Implementations should be lightweight and non-blocking. All methods are best-effort.
// A no-op implementation is provided by default.

type ConnObserver interface {
// ReaderBackpressure is called when forwarding from the websocket reader to the internal reader channel blocks.
ReaderBackpressure(url string, readerLen, readerCap int)

// OutputBackpressure is called when forwarding from the internal outputCh to the peer-wide output blocks.
OutputBackpressure(url string, outputLen, outputCap, outputChLen, outputChCap, writerLen, writerCap int)

// MaybeWriteSpike receives each write result and may emit a spike warning based on internal thresholds.
// Implementations decide whether to log (e.g., duration > 100ms or writer queue >75%).
MaybeWriteSpike(url string, writeDur time.Duration, writerLen, writerCap, outputLen, outputCap, outputChLen, outputChCap int)

// Summary logs the per-connection periodic summary.
Summary(url string, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64)

// Exit logs a per-connection exit summary.
Exit(url, reason string, err error, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64)
}

// A no-op implementation
type noopObserver struct{}

func (noopObserver) ReaderBackpressure(string, int, int) {}
func (noopObserver) OutputBackpressure(string, int, int, int, int, int, int) {}
func (noopObserver) MaybeWriteSpike(string, time.Duration, int, int, int, int, int, int) {}
func (noopObserver) Summary(string, int64, int64, int64, time.Duration, time.Duration, int64) {}
func (noopObserver) Exit(string, string, error, int64, int64, int64, time.Duration, time.Duration, int64) {
}

// Ensure noopObserver satisfies ConnObserver to avoid unused warnings
// TODO: NewConnection() currently sets NewLogObserver() by default; we need to support swapping in other implementations.
var _ ConnObserver = (*noopObserver)(nil)
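As a sketch of what a non-logging implementation could look like once swapping is supported (see the TODO above), here is a hypothetical counter-based observer; only the interface itself is defined by this file, and the `sync/atomic` and `time` imports are assumed:

```go
// countingObserver tallies backpressure and spike events with atomic counters
// instead of logging; the counts could later be exported as metrics.
type countingObserver struct {
	readerBP, outputBP, spikes int64
}

func (c *countingObserver) ReaderBackpressure(_ string, _, _ int) {
	atomic.AddInt64(&c.readerBP, 1)
}

func (c *countingObserver) OutputBackpressure(_ string, _, _, _, _, _, _ int) {
	atomic.AddInt64(&c.outputBP, 1)
}

func (c *countingObserver) MaybeWriteSpike(_ string, d time.Duration, wl, wc, _, _, _, _ int) {
	// Reuse the same thresholds as the log observer.
	if d > WriteSpikeWarnThreshold || (wc > 0 && wl*100 > wc*WriterQueueWarnPercent) {
		atomic.AddInt64(&c.spikes, 1)
	}
}

func (c *countingObserver) Summary(string, int64, int64, int64, time.Duration, time.Duration, int64) {
}

func (c *countingObserver) Exit(string, string, error, int64, int64, int64, time.Duration, time.Duration, int64) {
}

var _ ConnObserver = (*countingObserver)(nil)
```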