Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
# Release notes 1.1.3
# Release notes 1.2.0

## What's New

This release contains substantial revisions to the SDK flow control feature first released in v1.1.0.
See the v1.1.0 release notes for more details.

It has now received a substantial amount of testing including long running tests and backwards compability testing.

These features should be used with version 1.6.6 or newer of OpenZiti.

It is still considered experimental, and the feature and APIs may still change, however Go SDK
users who are multi-plexing connections, are encouraged to try it out.

Once it has undergone sufficient soak time in a production environment, it will marked as stable.

## Issues Fixed and Dependency Updates

* github.com/openziti/sdk-golang: [v1.1.2 -> v1.1.3](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.1.3)
* github.com/openziti/sdk-golang: [v1.1.2 -> v1.2.0](https://github.com/openziti/sdk-golang/compare/v1.1.2...v1.2.0)
* [Issue #765](https://github.com/openziti/sdk-golang/issues/765) - Allow independent close of xgress send and receive
* [Issue #763](https://github.com/openziti/sdk-golang/issues/763) - Use a go-routine pool for payload ingest
* [Issue #761](https://github.com/openziti/sdk-golang/issues/761) - Use cmap.ConcurrentMap for message multiplexer
* [Issue #754](https://github.com/openziti/sdk-golang/issues/754) - panic: unaligned 64-bit atomic operation when running on 32-bit raspberry pi
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1
1.2
78 changes: 66 additions & 12 deletions xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package xgress

import (
"context"
"github.com/michaelquigley/pfxlog"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math"
"os"
"slices"
"sync/atomic"
"time"
Expand Down Expand Up @@ -52,6 +53,7 @@ type LinkSendBuffer struct {
closeWhenEmpty atomic.Bool
inspectRequests chan *sendBufferInspectEvent
blockedSince time.Time
closeStart time.Time
}

type txPayload struct {
Expand Down Expand Up @@ -111,22 +113,39 @@ func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer {
inspectRequests: make(chan *sendBufferInspectEvent, 1),
}

go buffer.run()
return buffer
}

func (buffer *LinkSendBuffer) CloseWhenEmpty() bool {
pfxlog.ContextLogger(buffer.x.Label()).Debug("close when empty")
return buffer.closeWhenEmpty.CompareAndSwap(false, true)
}

func (buffer *LinkSendBuffer) BufferPayload(payload *Payload) (func(), error) {
txPayload := &txPayload{payload: payload, age: math.MaxInt64, x: buffer.x}

select {
case buffer.newlyBuffered <- txPayload:
pfxlog.ContextLogger(buffer.x.Label()).Debugf("buffered [%d]", payload.GetSequence())
return txPayload.markSent, nil
case <-buffer.closeNotify:
return nil, errors.Errorf("payload buffer closed")
return nil, ErrWriteClosed
}
}

func (buffer *LinkSendBuffer) BufferPayloadWithDeadline(payload *Payload, ctx context.Context) (func(), error) {
txPayload := &txPayload{payload: payload, age: math.MaxInt64, x: buffer.x}

for {
select {
case <-ctx.Done():
return nil, os.ErrDeadlineExceeded
case buffer.newlyBuffered <- txPayload:
pfxlog.ContextLogger(buffer.x.Label()).Debugf("buffered [%d]", payload.GetSequence())
return txPayload.markSent, nil
case <-buffer.closeNotify:
return nil, ErrWriteClosed
}
}
}

Expand All @@ -151,10 +170,15 @@ func (buffer *LinkSendBuffer) metrics() Metrics {
}

func (buffer *LinkSendBuffer) Close() {
pfxlog.ContextLogger(buffer.x.Label()).Debugf("[%p] closing", buffer)
if buffer.closed.CompareAndSwap(false, true) {
pfxlog.ContextLogger(buffer.x.Label()).Debugf("[%p] closing", buffer)
close(buffer.closeNotify)
}
buffer.x.closeIfRxAndTxDone()
}

func (buffer *LinkSendBuffer) IsClosed() bool {
return buffer.closed.Load()
}

func (buffer *LinkSendBuffer) isBlocked() bool {
Expand Down Expand Up @@ -211,7 +235,7 @@ func (buffer *LinkSendBuffer) run() {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
buffer.cleanupMetrics()
return
default:
}
Expand All @@ -232,7 +256,7 @@ func (buffer *LinkSendBuffer) run() {
log.Tracef("buffering payload %v with size %v. payload buffer size: %v",
txPayload.payload.Sequence, len(txPayload.payload.Data), buffer.linkSendBufferSize)
case <-buffer.closeNotify:
buffer.close()
buffer.cleanupMetrics()
return
default:
}
Expand All @@ -245,9 +269,7 @@ func (buffer *LinkSendBuffer) run() {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
buffer.retransmit()
if buffer.closeWhenEmpty.Load() && len(buffer.buffer) == 0 && !buffer.x.Closed() && buffer.x.IsEndOfCircuitSent() {
go buffer.x.Close()
}
buffer.checkForClose()

case txPayload := <-buffered:
buffer.buffer[txPayload.payload.GetSequence()] = txPayload
Expand All @@ -259,15 +281,46 @@ func (buffer *LinkSendBuffer) run() {

case <-retransmitTicker.C:
buffer.retransmit()
buffer.checkForClose()

case <-buffer.closeNotify:
buffer.close()
buffer.cleanupMetrics()
if len(buffer.buffer) > 0 {
isCircuitEnd := false
if len(buffer.buffer) == 1 {
for _, p := range buffer.buffer {
isCircuitEnd = p.payload.IsCircuitEndFlagSet() || p.payload.IsFlagEOFSet()
}
}
if !isCircuitEnd {
log.WithField("payloadCount", len(buffer.buffer)).Warn("closing while buffer contains unacked payloads")
}
}
return
}
}
}

func (buffer *LinkSendBuffer) close() {
func (buffer *LinkSendBuffer) checkForClose() {
if buffer.closeWhenEmpty.Load() {
if buffer.closeStart.IsZero() {
buffer.closeStart = time.Now()
}
closeDuration := time.Since(buffer.closeStart)

if (len(buffer.buffer) == 0 && closeDuration > 5*time.Second) || closeDuration > buffer.x.Options.MaxCloseWait {
buffer.Close()
} else if len(buffer.buffer) == 1 && closeDuration > 5*time.Second {
for _, p := range buffer.buffer {
if p.payload.IsCircuitEndFlagSet() || p.payload.IsFlagEOFSet() {
buffer.Close()
}
}
}
}
}

func (buffer *LinkSendBuffer) cleanupMetrics() {
if buffer.blockedByLocalWindow {
buffer.metrics().BufferUnblockedByLocalWindow()
}
Expand Down Expand Up @@ -358,7 +411,7 @@ func (buffer *LinkSendBuffer) retransmit() {
}

if retransmitted > 0 {
log.Debugf("retransmitted [%d] payloads, [%d] buffered, linkSendBufferSize: %d", retransmitted, len(buffer.buffer), buffer.linkSendBufferSize)
log.WithField("circuitId", buffer.x.circuitId).Debugf("retransmitted [%d] payloads, [%d] buffered, linkSendBufferSize: %d", retransmitted, len(buffer.buffer), buffer.linkSendBufferSize)
}
buffer.lastRetransmitTime = now
}
Expand All @@ -379,6 +432,7 @@ func (buffer *LinkSendBuffer) inspect() *SendBufferDetail {
timeSinceLastRetransmit := time.Duration(time.Now().UnixMilli()-buffer.lastRetransmitTime) * time.Millisecond
result := &SendBufferDetail{
WindowSize: buffer.windowsSize,
QueuedPayloadCount: len(buffer.buffer),
LinkSendBufferSize: buffer.linkSendBufferSize,
LinkRecvBufferSize: buffer.linkRecvBufferSize,
Accumulator: buffer.accumulator,
Expand Down
10 changes: 10 additions & 0 deletions xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const (
PayloadFlagCircuitStart Flag = 4
PayloadFlagChunk Flag = 8
PayloadFlagRetransmit Flag = 16
PayloadFlagEOF Flag = 32
PayloadFlagWriteFailed Flag = 64
)

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
Expand Down Expand Up @@ -308,6 +310,14 @@ func (payload *Payload) IsCircuitEndFlagSet() bool {
return isFlagSet(payload.Flags, PayloadFlagCircuitEnd)
}

func (payload *Payload) IsFlagEOFSet() bool {
return isFlagSet(payload.Flags, PayloadFlagEOF)
}

func (payload *Payload) IsFlagWriteFailedSet() bool {
return isFlagSet(payload.Flags, PayloadFlagWriteFailed)
}

func (payload *Payload) IsCircuitStartFlagSet() bool {
return isFlagSet(payload.Flags, PayloadFlagCircuitStart)
}
Expand Down
3 changes: 2 additions & 1 deletion xgress/minimal_payload_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package xgress

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -177,7 +178,7 @@ func (self *testIntermediary) ForwardAcknowledgement(ack *Acknowledgement, addre
self.acker.SendAck(ack, address)
}

func (self *testIntermediary) ForwardPayload(payload *Payload, x *Xgress) {
func (self *testIntermediary) ForwardPayload(payload *Payload, x *Xgress, ctx context.Context) {
m := payload.Marshall()
self.payloadTransformer.Tx(m, nil)
b, err := self.msgs.GetMarshaller()(m)
Expand Down
3 changes: 2 additions & 1 deletion xgress/ordering_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package xgress

import (
"context"
"encoding/binary"
"github.com/openziti/channel/v4"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (n noopReceiveHandler) GetPayloadIngester() *PayloadIngester {

func (n noopReceiveHandler) ForwardAcknowledgement(*Acknowledgement, Address) {}

func (n noopReceiveHandler) ForwardPayload(*Payload, *Xgress) {}
func (n noopReceiveHandler) ForwardPayload(*Payload, *Xgress, context.Context) {}

func (n noopReceiveHandler) ForwardControlMessage(*Control, *Xgress) {}

Expand Down
Loading
Loading