Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,19 @@ evmstate_test: | build deps evmstate
txparse: | build deps
$(ENV_SCRIPT) nim c $(NIM_PARAMS) "tools/txparse/[email protected]"

# build syncer debugging and analysis tools
SYNCER_TOOLS_DIR := tools/syncer
SYNCER_TOOLS := $(foreach name,trace inspect replay,syncer_test_client_$(name))
.PHONY: syncer-tools syncer-tools-clean $(SYNCER_TOOLS)
syncer-tools: $(SYNCER_TOOLS)
syncer-tools-clean:
rm -f $(foreach exe,$(SYNCER_TOOLS),build/$(exe))
$(SYNCER_TOOLS): | build deps rocksdb
echo -e $(BUILD_MSG) "build/$@"
$(ENV_SCRIPT) nim c $(NIM_PARAMS) -o:build/$@ "$(SYNCER_TOOLS_DIR)/[email protected]"

# usual cleaning
clean: | clean-common
clean: | clean-common syncer-tools-clean
rm -rf build/{nimbus,nimbus_execution_client,nimbus_portal_client,fluffy,portal_bridge,libverifproxy,nimbus_verified_proxy,$(TOOLS_CSV),$(PORTAL_TOOLS_CSV),all_tests,test_kvstore_rocksdb,test_rpc,all_portal_tests,all_history_network_custom_chain_tests,test_portal_testnet,utp_test_app,utp_test,*.dSYM}
rm -rf tools/t8n/{t8n,t8n_test}
rm -rf tools/evmstate/{evmstate,evmstate_test}
Expand Down
76 changes: 66 additions & 10 deletions execution_chain/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import
pkg/stew/[interval_set, sorted_set],
../core/chain,
../networking/p2p,
./beacon/worker/headers/headers_target,
./beacon/[beacon_desc, worker],
./beacon/worker/blocks/[blocks_fetch, blocks_import],
./beacon/worker/headers/[headers_fetch, headers_target],
./beacon/worker/update,
./[sync_sched, wire_protocol]

export
Expand All @@ -25,33 +27,62 @@ export
logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Interceptable handlers
# ------------------------------------------------------------------------------

proc schedDaemonCB(
ctx: BeaconCtxRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon") # async/template

proc schedStartCB(buddy: BeaconBuddyRef): bool =
return worker.start(buddy, "RunStart")

proc schedStopCB(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")

proc schedPoolCB(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
return worker.runPool(buddy, last, laps, "RunPool")

proc schedPeerCB(
buddy: BeaconBuddyRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer") # async/template

proc noOpFn(buddy: BeaconBuddyRef) = discard
proc noOpEx(self: BeaconHandlersSyncRef) = discard

# ------------------------------------------------------------------------------
# Virtual methods/interface, `mixin` functions
# ------------------------------------------------------------------------------

proc runSetup(ctx: BeaconCtxRef): bool =
worker.setup(ctx, "RunSetup")
return worker.setup(ctx, "RunSetup")

proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx, "RunRelease")

proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon")

proc runTicker(ctx: BeaconCtxRef) =
worker.runTicker(ctx, "RunTicker")


proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return await ctx.handler.schedDaemon(ctx)

proc runStart(buddy: BeaconBuddyRef): bool =
worker.start(buddy, "RunStart")
return buddy.ctx.handler.schedStart(buddy)

proc runStop(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")
buddy.ctx.handler.schedStop(buddy)

proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
worker.runPool(buddy, last, laps, "RunPool")
return buddy.ctx.handler.schedPool(buddy, last, laps)

proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer")
return await buddy.ctx.handler.schedPeer(buddy)

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -83,6 +114,25 @@ proc config*(
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.chain = chain

# Set up handlers so they can be overlayed
desc.ctx.pool.handlers = BeaconHandlersSyncRef(
version: 0,
activate: updateActivateCB,
suspend: updateSuspendCB,
schedDaemon: schedDaemonCB,
schedStart: schedStartCB,
schedStop: schedStopCB,
schedPool: schedPoolCB,
schedPeer: schedPeerCB,
getBlockHeaders: getBlockHeadersCB,
syncBlockHeaders: noOpFn,
getBlockBodies: getBlockBodiesCB,
syncBlockBodies: noOpFn,
importBlock: importBlockCB,
syncImportBlock: noOpFn,
startSync: noOpEx,
stopSync: noOpEx)

if not desc.lazyConfigHook.isNil:
desc.lazyConfigHook(desc)
desc.lazyConfigHook = nil
Expand All @@ -99,10 +149,16 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =

proc start*(desc: BeaconSyncRef): bool =
doAssert not desc.ctx.isNil
desc.startSync()
if desc.startSync():
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.startSync(w)
return true
# false

proc stop*(desc: BeaconSyncRef) {.async.} =
doAssert not desc.ctx.isNil
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.stopSync(w)
await desc.stopSync()

# ------------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions execution_chain/sync/beacon/beacon_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ type
## Instance descriptor, extends scheduler object
lazyConfigHook*: BeaconSyncConfigHook

BeaconHandlersSyncRef* = ref object of BeaconHandlersRef
## Add start/stop helpers to function list. By default, this functiona
## are no-ops.
startSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
stopSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}

# End
19 changes: 17 additions & 2 deletions execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@ import
../../../../networking/p2p,
../../../wire_protocol/types,
../[update, worker_desc],
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
./[blocks_fetch, blocks_helpers, blocks_unproc]

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

template importBlock(
buddy: BeaconBuddyRef;
blk: EthBlock;
effPeerID: Hash;
): Result[Duration,BeaconError] =
## Async/template
##
## Wrapper around `importBlock()` handler
##
let
ctx = buddy.ctx
rc = await ctx.handler.importBlock(buddy, blk, effPeerID)
ctx.handler.syncImportBlock(buddy) # debugging, trace, replay
rc

proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
return zeroHash32
Expand Down Expand Up @@ -201,7 +216,7 @@ template blocksImport*(

for n in 0 ..< blocks.len:
let nBn = blocks[n].header.number
discard (await buddy.importBlock(blocks[n], peerID)).valueOr:
buddy.importBlock(blocks[n], peerID).isOkOr:
if error.excp != ECancelledError:
isError = true

Expand Down
25 changes: 22 additions & 3 deletions execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,30 @@ import
../worker_desc,
./blocks_helpers

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Private helper
# -----------------------------------------------------------------------------

template getBlockBodies(
buddy: BeaconBuddyRef;
req: BlockBodiesRequest;
): Result[FetchBodiesData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockBodies()` handler
##
let rc = await buddy.ctx.handler.getBlockBodies(buddy, req)
buddy.ctx.handler.syncBlockBodies(buddy) # debugging, sync, replay
rc

# ------------------------------------------------------------------------------
# Private helpers
# Public handler
# -----------------------------------------------------------------------------

proc getBlockBodies(
proc getBlockBodiesCB*(
buddy: BeaconBuddyRef;
req: BlockBodiesRequest;
): Future[Result[FetchBodiesData,BeaconError]]
Expand Down Expand Up @@ -70,7 +89,7 @@ template fetchBodies*(
trace trEthSendSendingGetBlockBodies,
peer, nReq, bdyErrors=buddy.bdyErrors

let rc = await buddy.getBlockBodies(request)
let rc = buddy.getBlockBodies(request)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
7 changes: 5 additions & 2 deletions execution_chain/sync/beacon/worker/blocks/blocks_import.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import
../../../wire_protocol,
../worker_desc

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Public function
# Public handler
# ------------------------------------------------------------------------------

proc importBlock*(
proc importBlockCB*(
buddy: BeaconBuddyRef;
blk: EthBlock;
effPeerID: Hash;
Expand Down
23 changes: 21 additions & 2 deletions execution_chain/sync/beacon/worker/headers/headers_fetch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,30 @@ import
../worker_desc,
./headers_helpers

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Private helpers
# -----------------------------------------------------------------------------

template getBlockHeaders(
buddy: BeaconBuddyRef;
req: BlockHeadersRequest;
): Result[FetchHeadersData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockHeaders()` handler
##
let rc = await buddy.ctx.handler.getBlockHeaders(buddy, req)
buddy.ctx.handler.syncBlockHeaders(buddy) # debugging, sync, replay
rc

# ------------------------------------------------------------------------------
# Public handler
# ------------------------------------------------------------------------------

proc getBlockHeaders(
proc getBlockHeadersCB*(
buddy: BeaconBuddyRef;
req: BlockHeadersRequest;
): Future[Result[FetchHeadersData,BeaconError]]
Expand Down Expand Up @@ -88,7 +107,7 @@ template fetchHeadersReversed*(
trace trEthSendSendingGetBlockHeaders & " reverse", peer, req=ivReq,
nReq=req.maxResults, hash=topHash.toStr, hdrErrors=buddy.hdrErrors

let rc = await buddy.getBlockHeaders(req)
let rc = buddy.getBlockHeaders(req)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
6 changes: 3 additions & 3 deletions execution_chain/sync/beacon/worker/start_stop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
pkg/[chronicles, chronos, eth/common, metrics],
../../../networking/p2p,
../../wire_protocol,
./[blocks, headers, update, worker_desc]
./[blocks, headers, worker_desc]

type
SyncStateData = tuple
Expand Down Expand Up @@ -59,8 +59,8 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) =

# Set up the notifier informing when a new syncer session has started.
ctx.hdrCache.start proc() =
# Activates the syncer. Work will be picked up by peers when available.
ctx.updateActivateSyncer()
# This directive captures `ctx` for calling the activation handler.
ctx.handler.activate(ctx)

# Provide progress info call back handler
ctx.pool.chain.com.beaconSyncerProgress = proc(): SyncStateData =
Expand Down
39 changes: 20 additions & 19 deletions execution_chain/sync/beacon/worker/update.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,6 @@ declareGauge nec_sync_head, "" &
# Private functions, state handler helpers
# ------------------------------------------------------------------------------

proc updateSuspendSyncer(ctx: BeaconCtxRef) =
## Clean up sync target buckets, stop syncer activity, and and get ready
## for awaiting a new request from the `CL`.
##
ctx.hdrCache.clear()

ctx.pool.failedPeers.clear()
ctx.pool.seenData = false

ctx.hibernate = true

metrics.set(nec_sync_last_block_imported, 0)
metrics.set(nec_sync_head, 0)

info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies

proc commitCollectHeaders(ctx: BeaconCtxRef; info: static[string]): bool =
## Link header chain into `FC` module. Gets ready for block import.
##
Expand Down Expand Up @@ -227,7 +210,7 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =

# Final sync scrum layout reached or inconsistent/impossible state
if newState == idle:
ctx.updateSuspendSyncer()
ctx.handler.suspend(ctx)


proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
Expand All @@ -238,7 +221,7 @@ proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
# Public functions, call-back handler ready
# ------------------------------------------------------------------------------

proc updateActivateSyncer*(ctx: BeaconCtxRef) =
proc updateActivateCB*(ctx: BeaconCtxRef) =
## If in hibernate mode, accept a cache session and activate syncer
##
if ctx.hibernate and # only in idle mode
Expand Down Expand Up @@ -277,6 +260,24 @@ proc updateActivateSyncer*(ctx: BeaconCtxRef) =
head=ctx.chain.latestNumber.bnStr, state=ctx.hdrCache.state,
initTarget=ctx.pool.initTarget.isSome(), nSyncPeers=ctx.pool.nBuddies


proc updateSuspendCB*(ctx: BeaconCtxRef) =
## Clean up sync target buckets, stop syncer activity, and and get ready
## for a new sync request from the `CL`.
##
ctx.hdrCache.clear()

ctx.pool.failedPeers.clear()
ctx.pool.seenData = false

ctx.hibernate = true

metrics.set(nec_sync_last_block_imported, 0)
metrics.set(nec_sync_head, 0)

info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
Loading
Loading