-
Notifications
You must be signed in to change notification settings - Fork 4
RMB-Peer: Enable Observability & Better throughput and resilience under load #1413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
sameh-farouk
wants to merge
5
commits into
development-clean-shutdown-improvements
from
handle-back-pressure
Closed
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
fe5da5b
peer(connection): backpressure + observability improvements
sameh-farouk f02c9e1
format: fix gofmt
sameh-farouk 1d9bb25
format: fix gofmt
sameh-farouk 463cfd1
fix unused noop observer
sameh-farouk c350a4b
refactor: extract connection constants and minor readability refactor
sameh-farouk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package peer | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/rs/zerolog" | ||
"github.com/rs/zerolog/log" | ||
) | ||
|
||
const ( | ||
// WarnSamplePeriod controls burst sampling window for warn-level logs. | ||
WarnSamplePeriod = 5 * time.Second | ||
// WriteSpikeWarnThreshold defines the write duration threshold to warn for spikes. | ||
WriteSpikeWarnThreshold = 100 * time.Millisecond | ||
// WriterQueueWarnPercent defines when to warn on writer queue usage percentage. | ||
WriterQueueWarnPercent = 75 | ||
) | ||
|
||
// NewLogObserver builds a zerolog-based observer with the approved sampling. | ||
func NewLogObserver(url string) ConnObserver { | ||
// reader/output backpressure: 5s after first | ||
readerSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}} | ||
bpReaderLog := log.With().Str("url", url).Logger().Sample(&readerSampler) | ||
|
||
outputSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}} | ||
bpOutLog := log.With().Str("url", url).Logger().Sample(&outputSampler) | ||
|
||
// write spike: 5s after first | ||
spikeSampler := zerolog.LevelSampler{WarnSampler: &zerolog.BurstSampler{Burst: 1, Period: WarnSamplePeriod}} | ||
spikeLog := log.With().Str("url", url).Logger().Sample(&spikeSampler) | ||
|
||
return &logObserver{ | ||
rawdaGastan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
url: url, | ||
bpReaderLog: bpReaderLog, | ||
bpOutLog: bpOutLog, | ||
spikeLog: spikeLog, | ||
} | ||
} | ||
|
||
type logObserver struct { | ||
url string | ||
bpReaderLog zerolog.Logger | ||
bpOutLog zerolog.Logger | ||
spikeLog zerolog.Logger | ||
} | ||
|
||
func (o *logObserver) ReaderBackpressure(_ string, readerLen, readerCap int) { | ||
o.bpReaderLog.Warn(). | ||
Int("reader_len", readerLen).Int("reader_cap", readerCap). | ||
Msg("reader channel backpressure (blocking)") | ||
} | ||
|
||
func (o *logObserver) OutputBackpressure(_ string, outputLen, outputCap, outputChLen, outputChCap, writerLen, writerCap int) { | ||
o.bpOutLog.Warn(). | ||
Int("output_len", outputLen).Int("output_cap", outputCap). | ||
Int("outputCh_len", outputChLen).Int("outputCh_cap", outputChCap). | ||
Int("writer_len", writerLen).Int("writer_cap", writerCap). | ||
Msg("output channel backpressure (blocking)") | ||
} | ||
|
||
func (o *logObserver) MaybeWriteSpike(_ string, writeDur time.Duration, writerLen, writerCap, outputLen, outputCap, outputChLen, outputChCap int) { | ||
// Thresholds preserved from previous behavior: duration > 100ms or writer queue > 75% full. | ||
if writeDur > WriteSpikeWarnThreshold || (writerCap > 0 && writerLen*100 > writerCap*WriterQueueWarnPercent) { | ||
o.spikeLog.Warn(). | ||
Dur("write_dur", writeDur). | ||
Int("writer_len", writerLen).Int("writer_cap", writerCap). | ||
Int("output_len", outputLen).Int("output_cap", outputCap). | ||
Int("outputCh_len", outputChLen).Int("outputCh_cap", outputChCap). | ||
Msg("write spike / queue saturation") | ||
} | ||
} | ||
|
||
func (o *logObserver) Summary(url string, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) { | ||
log.Debug(). | ||
Str("url", url). | ||
Int64("reads", reads). | ||
Int64("writes", writes). | ||
Int64("write_errors", writeErrors). | ||
Dur("avg", avg). | ||
Dur("max", max). | ||
Int64("reconnections", reconnections). | ||
Msg("relay loop summary") | ||
} | ||
|
||
func (o *logObserver) Exit(url, reason string, err error, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) { | ||
log.Debug(). | ||
Str("url", url). | ||
Str("reason", reason). | ||
Err(err). | ||
Int64("reads", reads). | ||
Int64("writes", writes). | ||
Int64("write_errors", writeErrors). | ||
Dur("avg", avg). | ||
Dur("max", max). | ||
Int64("reconnections", reconnections). | ||
Msg("relay loop exited") | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package peer | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
// ConnObserver abstracts logging/observability from the transport logic. | ||
// Implementations should be lightweight and non-blocking. All methods are best-effort. | ||
// A no-op implementation is provided by default. | ||
|
||
type ConnObserver interface { | ||
// ReaderBackpressure is called when forwarding from the websocket reader to the internal reader channel blocks. | ||
ReaderBackpressure(url string, readerLen, readerCap int) | ||
rawdaGastan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// OutputBackpressure is called when forwarding from the internal outputCh to the peer-wide output blocks. | ||
OutputBackpressure(url string, outputLen, outputCap, outputChLen, outputChCap, writerLen, writerCap int) | ||
|
||
// MaybeWriteSpike receives each write result and may emit a spike warning based on internal thresholds. | ||
// Implementations decide whether to log (e.g., duration > 100ms or writer queue >75%). | ||
MaybeWriteSpike(url string, writeDur time.Duration, writerLen, writerCap, outputLen, outputCap, outputChLen, outputChCap int) | ||
|
||
// Summary logs the per-connection periodic summary. | ||
Summary(url string, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) | ||
|
||
// Exit logs a per-connection exit summary. | ||
Exit(url, reason string, err error, reads, writes, writeErrors int64, avg, max time.Duration, reconnections int64) | ||
} | ||
|
||
// A no-op implementation | ||
type noopObserver struct{} | ||
|
||
func (noopObserver) ReaderBackpressure(string, int, int) {} | ||
func (noopObserver) OutputBackpressure(string, int, int, int, int, int, int) {} | ||
func (noopObserver) MaybeWriteSpike(string, time.Duration, int, int, int, int, int, int) {} | ||
func (noopObserver) Summary(string, int64, int64, int64, time.Duration, time.Duration, int64) {} | ||
func (noopObserver) Exit(string, string, error, int64, int64, int64, time.Duration, time.Duration, int64) { | ||
} | ||
|
||
// Ensure noopObserver satisfies ConnObserver to avoid unused warnings | ||
// TODO: Currently NewLogObserver() is what NewConnection() currently sets by default; We meed to support swap implementations. | ||
var _ ConnObserver = (*noopObserver)(nil) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.