26 changes: 26 additions & 0 deletions api/service/synchronize/stagedstreamsync/default_stages.go
@@ -189,3 +189,29 @@ func DefaultStages(ctx context.Context,
},
}
}

func EpochStages(ctx context.Context,
seCfg StageEpochCfg,
finishCfg StageFinishCfg,
) []*Stage {

handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageFinish := NewStageFinish(finishCfg)

return []*Stage{
{
ID: SyncEpoch,
Description: "Sync only Last Block of Epoch",
Handler: handlerStageEpochSync,
RangeMode: OnlyShortRange,
ChainExecutionMode: OnlyEpochChain,
},
{
ID: Finish,
Description: "Finalize Changes",
Handler: handlerStageFinish,
RangeMode: LongRangeAndShortRange,
ChainExecutionMode: AllChains,
},
}
}
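
A minimal sketch of how the new epoch pipeline could be exercised from a package test. The EpochStages constructor, the SyncEpoch and Finish IDs, and the two config types come from the diff above; the zero-value configs, the test itself, and the assumption that stage IDs are of type SyncStageID are illustrative only.

package stagedstreamsync

import (
	"context"
	"testing"
)

// TestEpochStagesOrder checks that the epoch pipeline contains exactly the
// SyncEpoch stage followed by the Finish stage, in that order.
func TestEpochStagesOrder(t *testing.T) {
	stages := EpochStages(context.Background(), StageEpochCfg{}, StageFinishCfg{})

	want := []SyncStageID{SyncEpoch, Finish}
	if len(stages) != len(want) {
		t.Fatalf("expected %d stages, got %d", len(want), len(stages))
	}
	for i, s := range stages {
		if s.ID != want[i] {
			t.Errorf("stage %d: expected %v, got %v", i, want[i], s.ID)
		}
	}
}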
9 changes: 8 additions & 1 deletion api/service/synchronize/stagedstreamsync/downloader.go
@@ -53,6 +53,7 @@ func NewDownloader(host p2p.Host,
setNodeSyncStatus func(bool)) *Downloader {

config.fixValues()
isEpochChain := !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID

protoCfg := protocolConfig(host, bc, nodeConfig, isBeaconNode, config)
sp := streamSyncProtocol.NewProtocol(*protoCfg)
@@ -80,7 +81,13 @@
ctx, cancel := context.WithCancel(context.Background())

// create an instance of staged sync for the downloader
stagedSyncInstance, err := CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
var stagedSyncInstance *StagedStreamSync
var err error
if isEpochChain {
stagedSyncInstance, err = CreateStagedEpochSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
} else {
stagedSyncInstance, err = CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
}
if err != nil {
cancel()
return nil
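
The selection above can be read as a single predicate. A sketch only, assuming the shard package is the github.com/harmony-one/harmony/shard package downloader.go already imports; the helper name is hypothetical.

// useEpochPipeline mirrors the branch in NewDownloader: the "epoch chain" is
// the beacon shard's chain as followed by a node that is not itself a beacon
// node, and it only needs epoch-boundary blocks, so the lightweight
// SyncEpoch -> Finish pipeline is enough.
func useEpochPipeline(isBeaconNode bool, shardID uint32) bool {
	return !isBeaconNode && shardID == shard.BeaconChainShardID
}

With such a helper, the branch reduces to calling CreateStagedEpochSync when useEpochPipeline(isBeaconNode, bc.ShardID()) is true and CreateStagedSync otherwise.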
37 changes: 25 additions & 12 deletions api/service/synchronize/stagedstreamsync/staged_stream_sync.go
@@ -334,36 +334,41 @@ func New(
setNodeSyncStatus func(bool),
) *StagedStreamSync {

forwardStages := make([]*Stage, len(StagesForwardOrder))
for i, stageIndex := range StagesForwardOrder {
var forwardStages []*Stage
for _, stageID := range StagesForwardOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
forwardStages[i] = s
if s.ID == stageID {
forwardStages = append(forwardStages, s)
break
}
}
}

revertStages := make([]*Stage, len(StagesRevertOrder))
for i, stageIndex := range StagesRevertOrder {
var revertStages []*Stage
for _, stageID := range StagesRevertOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
revertStages[i] = s
if s.ID == stageID {
revertStages = append(revertStages, s)
break
}
}
}

pruneStages := make([]*Stage, len(StagesCleanUpOrder))
for i, stageIndex := range StagesCleanUpOrder {
var pruneStages []*Stage
for _, stageID := range StagesCleanUpOrder {
for _, s := range stagesList {
if s.ID == stageIndex {
pruneStages[i] = s
if s.ID == stageID {
pruneStages = append(pruneStages, s)
break
}
}
}

// Validate that we have at least one stage
if len(forwardStages) == 0 {
logger.Error().Msg(WrapStagedSyncMsg("no valid stages found - this will cause sync failures"))
}

logPrefixes := make([]string, len(stagesList))
for i := range stagesList {
logPrefixes[i] = fmt.Sprintf("%d/%d %s", i+1, len(stagesList), stagesList[i].ID)
@@ -467,6 +472,14 @@ func (sss *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, fi

stage := sss.stages[sss.currentStage]

if stage == nil {
sss.logger.Error().
Uint("currentStage", sss.currentStage).
Int("totalStages", len(sss.stages)).
Msg(WrapStagedSyncMsg("stage is nil, skipping to next stage"))
return fmt.Errorf("stage is nil")
}

if stage.Disabled {
sss.logger.Trace().
Msg(WrapStagedSyncMsg(fmt.Sprintf("%s disabled. %s", stage.ID, stage.DisabledDescription)))
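
The three ordering loops above share one pattern; a compacted sketch, assuming the stage ID type in this package is SyncStageID (the helper itself is not part of the diff):

// orderStages walks a configured order and keeps only the stages that are
// actually present, appending instead of writing into a pre-sized slice.
// The previous code left nil holes whenever an ordered ID had no matching
// stage (as happens for block/state stages under the epoch pipeline), which
// is exactly what the new nil-stage guard in Run protects against.
func orderStages(order []SyncStageID, available []*Stage) []*Stage {
	var out []*Stage
	for _, id := range order {
		for _, s := range available {
			if s.ID == id {
				out = append(out, s)
				break
			}
		}
	}
	return out
}

New could then build its three slices as orderStages(StagesForwardOrder, stagesList), orderStages(StagesRevertOrder, stagesList), and orderStages(StagesCleanUpOrder, stagesList).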
101 changes: 100 additions & 1 deletion api/service/synchronize/stagedstreamsync/syncing.go
@@ -151,7 +151,6 @@ func CreateStagedSync(ctx context.Context,
Str("SyncMode", config.SyncMode.String()).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Str("dbDir", dbDir).
Msg(WrapStagedSyncMsg("staged stream sync created successfully"))

return New(
@@ -172,6 +171,101 @@
), nil
}

// CreateStagedEpochSync creates an instance of staged sync for epoch chain
func CreateStagedEpochSync(ctx context.Context,
bc core.BlockChain,
nodeConfig *nodeconfig.ConfigType,
consensus *consensus.Consensus,
dbDir string,
protocol syncProtocol,
config Config,
isBeaconNode bool,
logger zerolog.Logger,
setNodeSyncStatus func(bool),
) (*StagedStreamSync, error) {

logger.Info().
Uint32("shard", bc.ShardID()).
Bool("isBeaconNode", isBeaconNode).
Bool("memdb", config.UseMemDB).
Str("dbDir", dbDir).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("creating staged epoch sync"))

isExplorer := nodeConfig.Role() == nodeconfig.ExplorerNode
isValidator := nodeConfig.Role() == nodeconfig.Validator
isBeaconShard := true
isEpochChain := true
isBeaconValidator := false
joinConsensus := false

var mainDB kv.RwDB
if config.UseMemDB {
mdbPath := getEpochDbPath(dbDir)
logger.Info().
Str("path", mdbPath).
Msg(WrapStagedSyncMsg("creating epoch main db in memory"))
mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen()
} else {
mdbPath := getEpochDbPath(dbDir)
logger.Info().
Str("path", mdbPath).
Msg(WrapStagedSyncMsg("creating epoch main db in disk"))
mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen()
}

// Initialize database buckets for epoch sync
// Epoch sync doesn't need sub-databases, so we pass an empty slice
if errInitDB := initDB(ctx, mainDB, []kv.RwDB{}); errInitDB != nil {
logger.Error().Err(errInitDB).Msg("create staged epoch sync instance failed")
return nil, errInitDB
}

stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB, logger)
stageFinishCfg := NewStageFinishCfg(mainDB, logger)

// init stages order based on sync mode
initStagesOrder(config.SyncMode)

epochStages := EpochStages(ctx,
stageSyncEpochCfg,
stageFinishCfg,
)

logger.Info().
Uint32("shard", bc.ShardID()).
Bool("isEpochChain", isEpochChain).
Bool("isExplorer", isExplorer).
Bool("isValidator", isValidator).
Bool("isBeaconShard", isBeaconShard).
Bool("isBeaconValidator", isBeaconValidator).
Bool("joinConsensus", joinConsensus).
Bool("memdb", config.UseMemDB).
Str("dbDir", dbDir).
Str("SyncMode", config.SyncMode.String()).
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("staged stream epoch sync created successfully"))

return New(
bc,
consensus,
mainDB,
epochStages,
protocol,
isEpochChain,
isBeaconShard,
isBeaconValidator,
isExplorer,
isValidator,
joinConsensus,
config,
logger,
setNodeSyncStatus,
), nil
}

// initDB inits the sync loop main database and create buckets
func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error {

@@ -236,6 +330,11 @@ func getBlockDbPath(shardID uint32, beacon bool, workerID int, dbDir string) str
}
}

// getEpochDbPath returns the path of the cache database which stores epoch blocks
func getEpochDbPath(dbDir string) string {
return filepath.Join(dbDir, "cache/epoch_db_main")
}

func (s *StagedStreamSync) Debug(source string, msg interface{}) {
// only log the msg in debug mode
if !s.config.DebugMode {
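
For orientation, a sketch of where the epoch cache database lands and how it is opened. getEpochDbPath, the mdbx wrapper calls, and kv.RwDB are taken from this file; the data directory value and the helper are hypothetical, and the log/mdbx packages are assumed to be the erigon-lib ones syncing.go already imports.

// openEpochCacheDB opens <dbDir>/cache/epoch_db_main, the same database
// CreateStagedEpochSync uses when UseMemDB is disabled.
func openEpochCacheDB() kv.RwDB {
	dbDir := "/data/harmony"      // hypothetical node data directory
	path := getEpochDbPath(dbDir) // -> /data/harmony/cache/epoch_db_main
	return mdbx.NewMDBX(log.New()).Path(path).MustOpen()
}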
28 changes: 27 additions & 1 deletion core/blockchain_impl.go
@@ -90,6 +90,10 @@ var (
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

	// CrossLinkPendingQueueGauge monitors the current size of the pending crosslink queue
CrossLinkPendingQueueGauge = metrics.NewRegisteredGauge("chain/crosslink/pending_queue_size", nil)

// ErrCrosslinkNotFound is the error when no crosslink found
ErrCrosslinkNotFound = errors.New("crosslink not found")
// ErrZeroBytes is the error when it reads empty crosslink
@@ -2558,7 +2562,12 @@ func (bc *BlockChainImpl) ReadPendingCrossLinks() ([]types.CrossLink, error) {
bc.pendingCrossLinksMutex.Lock()
defer bc.pendingCrossLinksMutex.Unlock()

return bc.readPendingCrossLinks()
cls, err := bc.readPendingCrossLinks()
if err == nil {
// Update pending queue gauge with current size
bc.updatePendingCrossLinkQueueGauge(len(cls))
}
return cls, err
}

func (bc *BlockChainImpl) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, error) {
Expand All @@ -2568,10 +2577,18 @@ func (bc *BlockChainImpl) AddPendingCrossLinks(pendingCLs []types.CrossLink) (in
cls, err := bc.readPendingCrossLinks()
if err != nil || len(cls) == 0 {
err := bc.CachePendingCrossLinks(pendingCLs)
if err == nil {
// Update pending queue gauge with new size
bc.updatePendingCrossLinkQueueGauge(len(pendingCLs))
}
return len(pendingCLs), err
}
cls = append(cls, pendingCLs...)
err = bc.CachePendingCrossLinks(cls)
if err == nil {
// Update pending queue gauge with new size
bc.updatePendingCrossLinkQueueGauge(len(cls))
}
return len(cls), err
}

@@ -2603,9 +2620,18 @@ func (bc *BlockChainImpl) DeleteFromPendingCrossLinks(crossLinks []types.CrossLi
pendingCLs = append(pendingCLs, cl)
}
err = bc.CachePendingCrossLinks(pendingCLs)
if err == nil {
// Update pending queue gauge with new size after deletion
bc.updatePendingCrossLinkQueueGauge(len(pendingCLs))
}
return len(pendingCLs), err
}

// updatePendingCrossLinkQueueGauge updates the pending crosslink queue size gauge
func (bc *BlockChainImpl) updatePendingCrossLinkQueueGauge(size int) {
CrossLinkPendingQueueGauge.Update(int64(size))
}

func (bc *BlockChainImpl) IsSameLeaderAsPreviousBlock(block *types.Block) bool {
if IsEpochBlock(block) {
return false
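
The three call sites above repeat the same update-only-on-success pattern. A possible shared helper, sketched here and not part of the diff:

// cachePendingAndTrack persists the pending cross-link set and refreshes the
// gauge only if the cache write succeeded, so the metric never reports a
// queue size that was not actually stored.
func (bc *BlockChainImpl) cachePendingAndTrack(cls []types.CrossLink) (int, error) {
	if err := bc.CachePendingCrossLinks(cls); err != nil {
		return len(cls), err
	}
	bc.updatePendingCrossLinkQueueGauge(len(cls))
	return len(cls), nil
}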
11 changes: 11 additions & 0 deletions node/harmony/metrics.go
@@ -70,6 +70,16 @@ var (
},
)

	// CrossLinkPendingQueueGauge monitors the current size of the pending crosslink queue
CrossLinkPendingQueueGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "p2p",
Name: "crosslink_pending_queue_size",
Help: "current number of crosslinks in pending queue",
},
)

onceMetrics sync.Once
)

@@ -81,6 +91,7 @@ func initMetrics() {
nodeConsensusMessageCounterVec,
nodeNodeMessageCounterVec,
nodeCrossLinkMessageCounterVec,
CrossLinkPendingQueueGauge,
)
})
}
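
Note that this prometheus gauge shares its name with the go-metrics gauge added in core/blockchain_impl.go but is a separate metric. A sketch of how the node side could feed it (prometheus gauges take a float64); the helper name is made up and not part of the diff:

// setPendingCrossLinkQueueSize mirrors the pending-queue size into the
// prometheus gauge registered above.
func setPendingCrossLinkQueueSize(n int) {
	CrossLinkPendingQueueGauge.Set(float64(n))
}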
25 changes: 15 additions & 10 deletions node/harmony/node_cross_link.go
@@ -158,15 +158,15 @@ func (node *Node) processCrossLinkHeartbeatMessage(msgPayload []byte) error {
return errors.New("epoch chain current block not available")
}

// Check if epoch chain is synced to at least the heartbeat epoch
currentEpoch := epochCurrentBlock.Epoch()
currentEpochU64 := currentEpoch.Uint64()
if currentEpochU64 < hb.Epoch {
epochChainEpoch := epochCurrentBlock.Epoch().Uint64()

// epoch chain should be at least at the previous epoch
if epochChainEpoch < hb.Epoch-1 {
utils.Logger().Warn().
Uint64("currentEpoch", currentEpochU64).
Uint64("epochChainEpoch", epochChainEpoch).
Uint64("heartbeatEpoch", hb.Epoch).
Msg("[ProcessCrossLinkHeartbeatMessage] epoch chain not synced to heartbeat epoch, ignoring heartbeat")
return errors.Errorf("epoch chain not synced to heartbeat epoch: current=%d, heartbeat=%d", currentEpochU64, hb.Epoch)
return errors.Errorf("epoch chain not synced to heartbeat epoch: current=%d, heartbeat=%d", epochChainEpoch, hb.Epoch)
}

// Outdated signal.
@@ -199,7 +199,12 @@

state, err := epochChain.ReadShardState(cur.Epoch())
if err != nil {
return errors.WithMessagef(err, "cannot read shard state for epoch %d", cur.Epoch())
utils.Logger().Warn().
Err(err).
Uint64("heartbeatEpoch", hb.Epoch).
Uint64("epochChainEpoch", epochChainEpoch).
Msg("[ProcessCrossLinkHeartbeatMessage] cannot read shard state for heartbeat epoch from beacon chain")
return errors.WithMessagef(err, "cannot read shard state for epoch %d from beacon chain", hb.Epoch)
}
committee, err := state.FindCommitteeByID(shard.BeaconChainShardID)
if err != nil {
@@ -279,7 +284,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {

// Check if cross-link already exists in pending queue
if _, exists := existingCLs[cl.Hash()]; exists {
nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink"}).Inc()
nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink_pending_queue"}).Inc()
utils.Logger().Debug().
Str("crossLinkHash", cl.Hash().Hex()).
Uint64("beaconEpoch", node.Blockchain().CurrentHeader().Epoch().Uint64()).
@@ -293,7 +298,7 @@
// Check if cross-link already exists in blockchain
exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64())
if err == nil && exist != nil {
nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink"}).Inc()
nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink_already_processed"}).Inc()
utils.Logger().Debug().
Str("crossLinkHash", cl.Hash().Hex()).
Uint64("beaconEpoch", node.Blockchain().CurrentHeader().Epoch().Uint64()).
Expand All @@ -310,7 +315,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {

	// Allow processing cross-links up to one epoch ahead of the local beacon epoch
	// Cross-links from further in the future should not exist; they may be malicious, or the local chain may not be fully synced
if crossLinkEpoch > localEpoch {
if crossLinkEpoch > localEpoch+1 {
utils.Logger().Debug().
Str("crossLinkHash", cl.Hash().Hex()).
Uint64("crossLinkEpoch", crossLinkEpoch).
Expand Down