Skip to content

Commit 7446373

Browse files
GODRIVER-3363 Detect and discard closed idle connections. (#1815) [master] (#1842)
Co-authored-by: Steven Silvester <[email protected]>
1 parent 6057c7d commit 7446373

File tree

7 files changed

+358
-163
lines changed

7 files changed

+358
-163
lines changed

x/mongo/driver/topology/connection.go

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"fmt"
1515
"io"
1616
"net"
17+
"os"
1718
"strings"
1819
"sync"
1920
"sync/atomic"
@@ -56,7 +57,7 @@ type connection struct {
5657
nc net.Conn // When nil, the connection is closed.
5758
addr address.Address
5859
idleTimeout time.Duration
59-
idleDeadline atomic.Value // Stores a time.Time
60+
idleStart atomic.Value // Stores a time.Time
6061
desc description.Server
6162
helloRTT time.Duration
6263
compressor wiremessage.CompressorID
@@ -520,25 +521,65 @@ func (c *connection) close() error {
520521
return err
521522
}
522523

524+
// closed returns true if the connection has been closed by the driver.
523525
func (c *connection) closed() bool {
524526
return atomic.LoadInt64(&c.state) == connDisconnected
525527
}
526528

529+
// isAlive returns true if the connection is alive and ready to be used for an
530+
// operation.
531+
//
532+
// Note that the liveness check can be slow (at least 1ms), so isAlive only
533+
// checks the liveness of the connection if it's been idle for at least 10
534+
// seconds. For frequently in-use connections, a network error during an
535+
// operation will be the first indication of a dead connection.
536+
func (c *connection) isAlive() bool {
537+
if c.nc == nil {
538+
return false
539+
}
540+
541+
// If the connection has been idle for less than 10 seconds, skip the
542+
// liveness check.
543+
//
544+
// The 10-seconds idle bypass is based on the liveness check implementation
545+
// in the Python Driver. That implementation uses 1 second as the idle
546+
// threshold, but we chose to be more conservative in the Go Driver because
547+
// this is new behavior with unknown side-effects. See
548+
// https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985
549+
idleStart, ok := c.idleStart.Load().(time.Time)
550+
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
551+
return true
552+
}
553+
554+
// Set a 1ms read deadline and attempt to read 1 byte from the connection.
555+
// Expect it to block for 1ms then return a deadline exceeded error. If it
556+
// returns any other error, the connection is not usable, so return false.
557+
// If it doesn't return an error and actually reads data, the connection is
558+
// also not usable, so return false.
559+
//
560+
// Note that we don't need to un-set the read deadline because the "read"
561+
// and "write" methods always reset the deadlines.
562+
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
563+
if err != nil {
564+
return false
565+
}
566+
var b [1]byte
567+
_, err = c.nc.Read(b[:])
568+
return errors.Is(err, os.ErrDeadlineExceeded)
569+
}
570+
527571
func (c *connection) idleTimeoutExpired() bool {
528-
now := time.Now()
529-
if c.idleTimeout > 0 {
530-
idleDeadline, ok := c.idleDeadline.Load().(time.Time)
531-
if ok && now.After(idleDeadline) {
532-
return true
533-
}
572+
if c.idleTimeout == 0 {
573+
return false
534574
}
535575

536-
return false
576+
idleStart, ok := c.idleStart.Load().(time.Time)
577+
return ok && idleStart.Add(c.idleTimeout).Before(time.Now())
537578
}
538579

539-
func (c *connection) bumpIdleDeadline() {
580+
func (c *connection) bumpIdleStart() {
540581
if c.idleTimeout > 0 {
541-
c.idleDeadline.Store(time.Now().Add(c.idleTimeout))
582+
c.idleStart.Store(time.Now())
542583
}
543584
}
544585

x/mongo/driver/topology/connection_test.go

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/google/go-cmp/cmp"
2121
"go.mongodb.org/mongo-driver/v2/internal/assert"
22+
"go.mongodb.org/mongo-driver/v2/internal/require"
2223
"go.mongodb.org/mongo-driver/v2/mongo/address"
2324
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
2425
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
@@ -278,7 +279,7 @@ func TestConnection(t *testing.T) {
278279

279280
want := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A}
280281
err := conn.writeWireMessage(context.Background(), want)
281-
noerr(t, err)
282+
require.NoError(t, err)
282283
got := tnc.buf
283284
if !cmp.Equal(got, want) {
284285
t.Errorf("writeWireMessage did not write the proper bytes. got %v; want %v", got, want)
@@ -471,7 +472,7 @@ func TestConnection(t *testing.T) {
471472
conn.cancellationListener = listener
472473

473474
got, err := conn.readWireMessage(context.Background())
474-
noerr(t, err)
475+
require.NoError(t, err)
475476
if !cmp.Equal(got, want) {
476477
t.Errorf("did not read full wire message. got %v; want %v", got, want)
477478
}
@@ -1099,3 +1100,85 @@ func (tcl *testCancellationListener) assertCalledOnce(t *testing.T) {
10991100
assert.Equal(t, 1, tcl.numListen, "expected Listen to be called once, got %d", tcl.numListen)
11001101
assert.Equal(t, 1, tcl.numStopListening, "expected StopListening to be called once, got %d", tcl.numListen)
11011102
}
1103+
1104+
func TestConnection_IsAlive(t *testing.T) {
1105+
t.Parallel()
1106+
1107+
t.Run("uninitialized", func(t *testing.T) {
1108+
t.Parallel()
1109+
1110+
conn := newConnection("")
1111+
assert.False(t,
1112+
conn.isAlive(),
1113+
"expected isAlive for an uninitialized connection to always return false")
1114+
})
1115+
1116+
t.Run("connection open", func(t *testing.T) {
1117+
t.Parallel()
1118+
1119+
cleanup := make(chan struct{})
1120+
defer close(cleanup)
1121+
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
1122+
// Keep the connection open until the end of the test.
1123+
<-cleanup
1124+
_ = nc.Close()
1125+
})
1126+
1127+
conn := newConnection(address.Address(addr.String()))
1128+
err := conn.connect(context.Background())
1129+
require.NoError(t, err)
1130+
1131+
conn.idleStart.Store(time.Now().Add(-11 * time.Second))
1132+
assert.True(t,
1133+
conn.isAlive(),
1134+
"expected isAlive for an open connection to return true")
1135+
})
1136+
1137+
t.Run("connection closed", func(t *testing.T) {
1138+
t.Parallel()
1139+
1140+
conns := make(chan net.Conn)
1141+
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
1142+
conns <- nc
1143+
})
1144+
1145+
conn := newConnection(address.Address(addr.String()))
1146+
err := conn.connect(context.Background())
1147+
require.NoError(t, err)
1148+
1149+
// Close the connection before calling isAlive.
1150+
nc := <-conns
1151+
err = nc.Close()
1152+
require.NoError(t, err)
1153+
1154+
conn.idleStart.Store(time.Now().Add(-11 * time.Second))
1155+
assert.False(t,
1156+
conn.isAlive(),
1157+
"expected isAlive for a closed connection to return false")
1158+
})
1159+
1160+
t.Run("connection reads data", func(t *testing.T) {
1161+
t.Parallel()
1162+
1163+
cleanup := make(chan struct{})
1164+
defer close(cleanup)
1165+
addr := bootstrapConnections(t, 1, func(nc net.Conn) {
1166+
// Write some data to the connection before calling isAlive.
1167+
_, err := nc.Write([]byte{5, 0, 0, 0, 0})
1168+
require.NoError(t, err)
1169+
1170+
// Keep the connection open until the end of the test.
1171+
<-cleanup
1172+
_ = nc.Close()
1173+
})
1174+
1175+
conn := newConnection(address.Address(addr.String()))
1176+
err := conn.connect(context.Background())
1177+
require.NoError(t, err)
1178+
1179+
conn.idleStart.Store(time.Now().Add(-11 * time.Second))
1180+
assert.False(t,
1181+
conn.isAlive(),
1182+
"expected isAlive for an open connection that reads data to return false")
1183+
})
1184+
}

x/mongo/driver/topology/pool.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,11 @@ type reason struct {
169169
// connectionPerished checks if a given connection is perished and should be removed from the pool.
170170
func connectionPerished(conn *connection) (reason, bool) {
171171
switch {
172-
case conn.closed():
173-
// A connection would only be closed if it encountered a network error during an operation and closed itself.
172+
case conn.closed() || !conn.isAlive():
173+
// A connection would only be closed if it encountered a network error
174+
// during an operation and closed itself. If a connection is not alive
175+
// (e.g. the connection was closed by the server-side), it's also
176+
// considered a network error.
174177
return reason{
175178
loggerConn: logger.ReasonConnClosedError,
176179
event: event.ReasonError,
@@ -900,13 +903,15 @@ func (p *pool) checkInNoEvent(conn *connection) error {
900903
return nil
901904
}
902905

903-
// Bump the connection idle deadline here because we're about to make the connection "available".
904-
// The idle deadline is used to determine when a connection has reached its max idle time and
905-
// should be closed. A connection reaches its max idle time when it has been "available" in the
906-
// idle connections stack for more than the configured duration (maxIdleTimeMS). Set it before
907-
// we call connectionPerished(), which checks the idle deadline, because a newly "available"
908-
// connection should never be perished due to max idle time.
909-
conn.bumpIdleDeadline()
906+
// Bump the connection idle start time here because we're about to make the
907+
// connection "available". The idle start time is used to determine how long
908+
// a connection has been idle and when it has reached its max idle time and
909+
// should be closed. A connection reaches its max idle time when it has been
910+
// "available" in the idle connections stack for more than the configured
911+
// duration (maxIdleTimeMS). Set it before we call connectionPerished(),
912+
// which checks the idle deadline, because a newly "available" connection
913+
// should never be perished due to max idle time.
914+
conn.bumpIdleStart()
910915

911916
r, perished := connectionPerished(conn)
912917
if !perished && conn.pool.getState() == poolClosed {

0 commit comments

Comments
 (0)