Skip to content

Commit 07ad953

Browse files
committed
complete WAL implementation
Signed-off-by: Mohammed Al Sahaf <[email protected]>
1 parent 030ade0 commit 07ad953

File tree

2 files changed, with 273 additions and 201 deletions.

modules/logging/netwriter.go

Lines changed: 112 additions & 77 deletions
Original file line number | Diff line number | Diff line change
@@ -58,15 +58,15 @@ type NetWriter struct {
5858
// Buffer size for the WAL flush channel.
5959
BufferSize int `json:"buffer_size,omitempty"`
6060

61-
logger *slog.Logger
62-
addr caddy.NetworkAddress
63-
wal *wal.WAL
64-
walDir string
65-
flushCtx context.Context
66-
flushCtxCancel context.CancelFunc
67-
flushWg sync.WaitGroup
68-
lastProcessedChunk uint32
69-
mu sync.RWMutex
61+
logger *slog.Logger
62+
addr caddy.NetworkAddress
63+
wal *wal.WAL
64+
walDir string
65+
flushCtx context.Context
66+
flushCtxCancel context.CancelFunc
67+
flushWg sync.WaitGroup
68+
lastProcessedOffset int64
69+
mu sync.RWMutex
7070
}
7171

7272
// CaddyModule returns the Caddy module information.
@@ -126,7 +126,9 @@ func (nw *NetWriter) WriterKey() string {
126126
// OpenWriter opens a new network connection and sets up the WAL.
127127
func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
128128
// Set up WAL directory
129-
nw.walDir = filepath.Join(caddy.AppDataDir(), "wal", "netwriter", nw.addr.String())
129+
baseDir := caddy.AppDataDir()
130+
131+
nw.walDir = filepath.Join(baseDir, "wal", "netwriter", nw.addr.String())
130132
if err := os.MkdirAll(nw.walDir, 0o755); err != nil {
131133
return nil, fmt.Errorf("failed to create WAL directory: %v", err)
132134
}
@@ -141,8 +143,17 @@ func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
141143
}
142144
nw.wal = w
143145

144-
// Load last processed chunk position from metadata file if it exists
145-
nw.loadLastProcessedChunk()
146+
// Load last processed offset from metadata file if it exists
147+
nw.loadLastProcessedOffset()
148+
149+
// If SoftStart is disabled, test the connection immediately
150+
if !nw.SoftStart {
151+
testConn, err := net.DialTimeout(nw.addr.Network, nw.addr.JoinHostPort(0), time.Duration(nw.DialTimeout))
152+
if err != nil {
153+
return nil, fmt.Errorf("failed to connect to log destination (SoftStart disabled): %v", err)
154+
}
155+
testConn.Close()
156+
}
146157

147158
// Create the writer wrapper
148159
writer := &netWriterConn{
@@ -157,41 +168,50 @@ func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
157168
return writer, nil
158169
}
159170

160-
// loadLastProcessedChunk loads the last processed chunk position from a metadata file
161-
func (nw *NetWriter) loadLastProcessedChunk() {
171+
// loadLastProcessedOffset loads the last processed offset from a metadata file
172+
func (nw *NetWriter) loadLastProcessedOffset() {
162173
metaFile := filepath.Join(nw.walDir, "last_processed")
163174
data, err := os.ReadFile(metaFile)
164175
if err != nil {
165-
nw.lastProcessedChunk = 0
176+
// Use -1 to indicate "no entries processed yet"
177+
nw.lastProcessedOffset = -1
178+
nw.logger.Debug("no last processed offset file found, starting from beginning", "file", metaFile, "error", err)
166179
return
167180
}
168181

169-
var chunk uint32
170-
if _, err := fmt.Sscanf(string(data), "%d", &chunk); err != nil {
171-
nw.lastProcessedChunk = 0
182+
var offset int64
183+
if _, err := fmt.Sscanf(string(data), "%d", &offset); err != nil {
184+
// Use -1 to indicate "no entries processed yet"
185+
nw.lastProcessedOffset = -1
172186
return
173187
}
174188

175-
nw.lastProcessedChunk = chunk
176-
nw.logger.Info("loaded last processed chunk", "block", chunk)
189+
nw.lastProcessedOffset = offset
190+
nw.logger.Debug("loaded last processed offset", "offset", offset)
177191
}
178192

179-
// saveLastProcessedChunk saves the last processed chunk position to a metadata file
180-
func (nw *NetWriter) saveLastProcessedChunk(chunk uint32) {
193+
// saveLastProcessedOffset saves the last processed offset to a metadata file
194+
func (nw *NetWriter) saveLastProcessedOffset(cp *wal.ChunkPosition) {
195+
// Create a unique offset by combining segment, block, and chunk offset
196+
offset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | int64(cp.ChunkOffset)
197+
181198
nw.mu.Lock()
182-
nw.lastProcessedChunk = chunk
199+
nw.lastProcessedOffset = offset
183200
nw.mu.Unlock()
184201

185202
metaFile := filepath.Join(nw.walDir, "last_processed")
186-
data := fmt.Sprintf("%d", chunk)
203+
data := fmt.Sprintf("%d", offset)
187204
if err := os.WriteFile(metaFile, []byte(data), 0o600); err != nil {
188-
nw.logger.Error("failed to save last processed chunk", "error", err)
205+
nw.logger.Error("failed to save last processed offset", "error", err)
206+
} else {
207+
nw.logger.Debug("saved last processed offset", "offset", offset)
189208
}
190209
}
191210

192211
// backgroundFlusher runs in the background and flushes WAL entries to the network
193212
func (nw *NetWriter) backgroundFlusher() {
194213
defer nw.flushWg.Done()
214+
nw.logger.Debug("background flusher started")
195215

196216
var conn net.Conn
197217
var connMu sync.RWMutex
@@ -225,6 +245,15 @@ func (nw *NetWriter) backgroundFlusher() {
225245
}
226246

227247
_, err := currentConn.Write(data)
248+
if err != nil {
249+
// Connection failed, clear it so reconnection logic kicks in
250+
connMu.Lock()
251+
if conn == currentConn {
252+
conn.Close()
253+
conn = nil
254+
}
255+
connMu.Unlock()
256+
}
228257
return err
229258
}
230259

@@ -237,41 +266,8 @@ func (nw *NetWriter) backgroundFlusher() {
237266
}
238267
}
239268

240-
// Set up WAL reader
241-
reader := nw.wal.NewReader()
242-
243-
// Skip already processed entries
244-
nw.mu.RLock()
245-
lastChunk := nw.lastProcessedChunk
246-
nw.mu.RUnlock()
247-
248-
if lastChunk > 0 {
249-
nw.logger.Info("skipping already processed entries", "lastProcessedBlock", lastChunk)
250-
// Skip already processed entries
251-
skipped := 0
252-
for {
253-
data, cp, err := reader.Next()
254-
if err == io.EOF {
255-
break
256-
}
257-
if err != nil {
258-
nw.logger.Error("error reading WAL during skip", "error", err)
259-
break
260-
}
261-
262-
// Skip entries that have already been processed
263-
if cp.BlockNumber <= lastChunk {
264-
skipped++
265-
continue
266-
}
267-
268-
// This is a new entry, process it
269-
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
270-
nw.logger.Error("error processing WAL entry", "error", err)
271-
}
272-
}
273-
nw.logger.Info("skipped processed entries", "count", skipped)
274-
}
269+
// Process any existing entries in the WAL immediately
270+
nw.processWALEntries(writeToConn)
275271

276272
ticker := time.NewTicker(100 * time.Millisecond) // Check for new entries every 100ms
277273
defer ticker.Stop()
@@ -283,7 +279,7 @@ func (nw *NetWriter) backgroundFlusher() {
283279
select {
284280
case <-nw.flushCtx.Done():
285281
// Flush remaining entries before shutting down
286-
nw.flushRemainingEntries(reader, writeToConn)
282+
nw.flushRemainingWALEntries(writeToConn)
287283

288284
connMu.Lock()
289285
if conn != nil {
@@ -294,51 +290,74 @@ func (nw *NetWriter) backgroundFlusher() {
294290

295291
case <-ticker.C:
296292
// Process available WAL entries
297-
nw.processAvailableEntries(reader, writeToConn)
293+
nw.processWALEntries(writeToConn)
298294

299295
case <-reconnectTicker.C:
300296
// Try to reconnect if we don't have a connection
301297
connMu.RLock()
302298
hasConn := conn != nil
303299
connMu.RUnlock()
304300

301+
nw.logger.Debug("reconnect ticker fired", "hasConn", hasConn)
305302
if !hasConn {
306303
if err := dial(); err != nil {
307304
nw.logger.Debug("reconnection attempt failed", "error", err)
305+
} else {
306+
// Successfully reconnected, process any buffered WAL entries
307+
nw.logger.Info("reconnected, processing buffered WAL entries")
308+
nw.processWALEntries(writeToConn)
308309
}
309310
}
310311
}
311312
}
312313
}
313314

314-
// processAvailableEntries processes all available entries in the WAL
315-
func (nw *NetWriter) processAvailableEntries(reader *wal.Reader, writeToConn func([]byte) error) {
315+
// processWALEntries processes all available entries in the WAL using a fresh reader
316+
func (nw *NetWriter) processWALEntries(writeToConn func([]byte) error) {
317+
// Create a fresh reader to see all current entries
318+
reader := nw.wal.NewReader()
319+
320+
processed := 0
321+
skipped := 0
322+
nw.logger.Debug("processing available WAL entries")
316323
for {
317324
data, cp, err := reader.Next()
318325
if err == io.EOF {
326+
if processed > 0 {
327+
nw.logger.Debug("processed WAL entries", "processed", processed, "skipped", skipped)
328+
}
319329
break
320330
}
321331
if err != nil {
322332
if err == wal.ErrClosed {
333+
nw.logger.Debug("WAL closed during processing")
323334
return
324335
}
325336
nw.logger.Error("error reading from WAL", "error", err)
326337
break
327338
}
328339

329-
// Check if we've already processed this block
340+
// Check if we've already processed this entry
330341
nw.mu.RLock()
331-
lastProcessed := nw.lastProcessedChunk
342+
lastProcessedOffset := nw.lastProcessedOffset
332343
nw.mu.RUnlock()
333344

334-
if cp.BlockNumber <= lastProcessed {
345+
// Create current entry offset for comparison
346+
currentOffset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | int64(cp.ChunkOffset)
347+
nw.logger.Debug("found WAL entry", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset, "currentOffset", currentOffset, "lastProcessedOffset", lastProcessedOffset, "size", len(data))
348+
349+
if currentOffset <= lastProcessedOffset {
335350
// Already processed, skip
351+
nw.logger.Debug("skipping already processed entry", "currentOffset", currentOffset, "lastProcessedOffset", lastProcessedOffset)
352+
skipped++
336353
continue
337354
}
338355

339356
if err := nw.processWALEntry(data, cp, writeToConn); err != nil {
340357
nw.logger.Error("error processing WAL entry", "error", err)
341358
// Don't break here - we want to continue processing other entries
359+
} else {
360+
processed++
342361
}
343362
}
344363
}
@@ -351,16 +370,19 @@ func (nw *NetWriter) processWALEntry(data []byte, cp *wal.ChunkPosition, writeTo
351370
return err
352371
}
353372

354-
// Mark this block as processed
355-
nw.saveLastProcessedChunk(cp.BlockNumber)
356-
nw.logger.Debug("processed WAL entry", "blockNumber", cp.BlockNumber)
373+
// Mark this entry as processed
374+
nw.saveLastProcessedOffset(cp)
375+
nw.logger.Debug("processed WAL entry", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset, "data", string(data))
357376
return nil
358377
}
359378

360-
// flushRemainingEntries flushes all remaining entries during shutdown
361-
func (nw *NetWriter) flushRemainingEntries(reader *wal.Reader, writeToConn func([]byte) error) {
379+
// flushRemainingWALEntries flushes all remaining entries during shutdown
380+
func (nw *NetWriter) flushRemainingWALEntries(writeToConn func([]byte) error) {
362381
nw.logger.Info("flushing remaining WAL entries during shutdown")
363382

383+
// Create a fresh reader for shutdown processing
384+
reader := nw.wal.NewReader()
385+
364386
count := 0
365387
for {
366388
data, cp, err := reader.Next()
@@ -372,12 +394,15 @@ func (nw *NetWriter) flushRemainingEntries(reader *wal.Reader, writeToConn func(
372394
break
373395
}
374396

375-
// Check if we've already processed this block
397+
// Check if we've already processed this entry
376398
nw.mu.RLock()
377-
lastProcessed := nw.lastProcessedChunk
399+
lastProcessedOffset := nw.lastProcessedOffset
378400
nw.mu.RUnlock()
379401

380-
if cp.BlockNumber <= lastProcessed {
402+
// Create current entry offset for comparison
403+
currentOffset := (int64(cp.SegmentId) << 32) | (int64(cp.BlockNumber) << 16) | int64(cp.ChunkOffset)
404+
405+
if currentOffset <= lastProcessedOffset {
381406
// Already processed, skip
382407
continue
383408
}
@@ -394,8 +419,8 @@ func (nw *NetWriter) flushRemainingEntries(reader *wal.Reader, writeToConn func(
394419
time.Sleep(time.Second)
395420
}
396421
} else {
397-
nw.saveLastProcessedChunk(cp.BlockNumber)
398-
nw.logger.Debug("flushed WAL entry during shutdown", "blockNumber", cp.BlockNumber)
422+
nw.saveLastProcessedOffset(cp)
423+
nw.logger.Debug("flushed WAL entry during shutdown", "segmentId", cp.SegmentId, "blockNumber", cp.BlockNumber, "chunkOffset", cp.ChunkOffset)
399424
break
400425
}
401426
}
@@ -415,15 +440,25 @@ type netWriterConn struct {
415440
// Write writes data to the WAL (non-blocking)
416441
func (w *netWriterConn) Write(p []byte) (n int, err error) {
417442
if w.nw.wal == nil {
443+
w.nw.logger.Error("WAL not initialized")
418444
return 0, errors.New("WAL not initialized")
419445
}
420446

447+
w.nw.logger.Debug("writing to WAL", "size", len(p))
448+
421449
// Write to WAL - this should be fast and non-blocking
422450
_, err = w.nw.wal.Write(p)
423451
if err != nil {
452+
w.nw.logger.Error("failed to write to WAL", "error", err)
424453
return 0, fmt.Errorf("failed to write to WAL: %v", err)
425454
}
426455

456+
// Sync WAL to ensure data is available for reading
457+
if err = w.nw.wal.Sync(); err != nil {
458+
w.nw.logger.Error("failed to sync WAL", "error", err)
459+
}
460+
461+
w.nw.logger.Debug("wrote data to WAL", "size", len(p))
427462
return len(p), nil
428463
}
429464

0 commit comments

Comments
 (0)