Skip to content

Commit ccdbccb

Browse files
feat: optimize epoch chain sync with specialized staged sync pipeline
1 parent 6eb9f4a commit ccdbccb

File tree

3 files changed

+127
-2
lines changed

3 files changed

+127
-2
lines changed

api/service/synchronize/stagedstreamsync/default_stages.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,29 @@ func DefaultStages(ctx context.Context,
189189
},
190190
}
191191
}
192+
193+
func EpochStages(ctx context.Context,
194+
seCfg StageEpochCfg,
195+
finishCfg StageFinishCfg,
196+
) []*Stage {
197+
198+
handlerStageEpochSync := NewStageEpoch(seCfg)
199+
handlerStageFinish := NewStageFinish(finishCfg)
200+
201+
return []*Stage{
202+
{
203+
ID: SyncEpoch,
204+
Description: "Sync only Last Block of Epoch",
205+
Handler: handlerStageEpochSync,
206+
RangeMode: OnlyShortRange,
207+
ChainExecutionMode: OnlyEpochChain,
208+
},
209+
{
210+
ID: Finish,
211+
Description: "Finalize Changes",
212+
Handler: handlerStageFinish,
213+
RangeMode: LongRangeAndShortRange,
214+
ChainExecutionMode: AllChains,
215+
},
216+
}
217+
}

api/service/synchronize/stagedstreamsync/downloader.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func NewDownloader(host p2p.Host,
5353
setNodeSyncStatus func(bool)) *Downloader {
5454

5555
config.fixValues()
56+
isEpochChain := !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID
5657

5758
protoCfg := protocolConfig(host, bc, nodeConfig, isBeaconNode, config)
5859
sp := streamSyncProtocol.NewProtocol(*protoCfg)
@@ -80,7 +81,13 @@ func NewDownloader(host p2p.Host,
8081
ctx, cancel := context.WithCancel(context.Background())
8182

8283
// create an instance of staged sync for the downloader
83-
stagedSyncInstance, err := CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
84+
var stagedSyncInstance *StagedStreamSync
85+
var err error
86+
if isEpochChain {
87+
stagedSyncInstance, err = CreateStagedEpochSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
88+
} else {
89+
stagedSyncInstance, err = CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus)
90+
}
8491
if err != nil {
8592
cancel()
8693
return nil

api/service/synchronize/stagedstreamsync/syncing.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ func CreateStagedSync(ctx context.Context,
151151
Str("SyncMode", config.SyncMode.String()).
152152
Bool("serverOnly", config.ServerOnly).
153153
Int("minStreams", config.MinStreams).
154-
Str("dbDir", dbDir).
155154
Msg(WrapStagedSyncMsg("staged stream sync created successfully"))
156155

157156
return New(
@@ -172,6 +171,94 @@ func CreateStagedSync(ctx context.Context,
172171
), nil
173172
}
174173

174+
// CreateStagedEpochSync creates an instance of staged sync for epoch chain
175+
func CreateStagedEpochSync(ctx context.Context,
176+
bc core.BlockChain,
177+
nodeConfig *nodeconfig.ConfigType,
178+
consensus *consensus.Consensus,
179+
dbDir string,
180+
protocol syncProtocol,
181+
config Config,
182+
isBeaconNode bool,
183+
logger zerolog.Logger,
184+
setNodeSyncStatus func(bool),
185+
) (*StagedStreamSync, error) {
186+
187+
logger.Info().
188+
Uint32("shard", bc.ShardID()).
189+
Bool("isBeaconNode", isBeaconNode).
190+
Bool("memdb", config.UseMemDB).
191+
Str("dbDir", dbDir).
192+
Bool("serverOnly", config.ServerOnly).
193+
Int("minStreams", config.MinStreams).
194+
Msg(WrapStagedSyncMsg("creating staged epoch sync"))
195+
196+
isExplorer := nodeConfig.Role() == nodeconfig.ExplorerNode
197+
isValidator := nodeConfig.Role() == nodeconfig.Validator
198+
isBeaconShard := true
199+
isEpochChain := true
200+
isBeaconValidator := false
201+
joinConsensus := false
202+
203+
var mainDB kv.RwDB
204+
if config.UseMemDB {
205+
mdbPath := getEpochDbPath(dbDir)
206+
logger.Info().
207+
Str("path", mdbPath).
208+
Msg(WrapStagedSyncMsg("creating epoch main db in memory"))
209+
mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen()
210+
} else {
211+
mdbPath := getEpochDbPath(dbDir)
212+
logger.Info().
213+
Str("path", mdbPath).
214+
Msg(WrapStagedSyncMsg("creating epoch main db in disk"))
215+
mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen()
216+
}
217+
218+
stageSyncEpochCfg := NewStageEpochCfg(bc, nil, logger)
219+
stageFinishCfg := NewStageFinishCfg(nil, logger)
220+
221+
// init stages order based on sync mode
222+
initStagesOrder(config.SyncMode)
223+
224+
epochStages := EpochStages(ctx,
225+
stageSyncEpochCfg,
226+
stageFinishCfg,
227+
)
228+
229+
logger.Info().
230+
Uint32("shard", bc.ShardID()).
231+
Bool("isEpochChain", isEpochChain).
232+
Bool("isExplorer", isExplorer).
233+
Bool("isValidator", isValidator).
234+
Bool("isBeaconShard", isBeaconShard).
235+
Bool("isBeaconValidator", isBeaconValidator).
236+
Bool("joinConsensus", joinConsensus).
237+
Bool("memdb", config.UseMemDB).
238+
Str("dbDir", dbDir).
239+
Str("SyncMode", config.SyncMode.String()).
240+
Bool("serverOnly", config.ServerOnly).
241+
Int("minStreams", config.MinStreams).
242+
Msg(WrapStagedSyncMsg("staged stream epoch sync created successfully"))
243+
244+
return New(
245+
bc,
246+
consensus,
247+
mainDB,
248+
epochStages,
249+
protocol,
250+
isEpochChain,
251+
isBeaconShard,
252+
isBeaconValidator,
253+
isExplorer,
254+
isValidator,
255+
joinConsensus,
256+
config,
257+
logger,
258+
setNodeSyncStatus,
259+
), nil
260+
}
261+
175262
// initDB inits the sync loop main database and create buckets
176263
func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error {
177264

@@ -236,6 +323,11 @@ func getBlockDbPath(shardID uint32, beacon bool, workerID int, dbDir string) str
236323
}
237324
}
238325

326+
// getEpochDbPath returns the path of the cache database which stores epoch blocks
327+
func getEpochDbPath(dbDir string) string {
328+
return filepath.Join(dbDir, "cache/epoch_db_main")
329+
}
330+
239331
func (s *StagedStreamSync) Debug(source string, msg interface{}) {
240332
// only log the msg in debug mode
241333
if !s.config.DebugMode {

0 commit comments

Comments
 (0)