make syncer permanent
In the case of several validators, each node must be able to stay in sync.
So we need to create a syncer even for active shards.
olegrok committed Jan 29, 2025
1 parent 5d159b5 commit 056307e
Showing 3 changed files with 78 additions and 62 deletions.
9 changes: 3 additions & 6 deletions nil/internal/collate/scheduler.go
@@ -78,15 +78,12 @@ func NewScheduler(txFabric db.DB, pool txnpool.Pool, params Params, networkManag
 	}, nil
 }
 
-func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error {
+func (s *Scheduler) Run(ctx context.Context, syncer *Syncer, consensus Consensus) error {
+	syncer.WaitComplete()
+
 	s.logger.Info().Msg("Starting collation...")
 	s.consensus = consensus
 
-	// At first generate zero-state if needed
-	if err := s.generateZeroState(ctx); err != nil {
-		return err
-	}
-
 	// Enable handler for snapshot relaying
 	SetBootstrapHandler(ctx, s.networkManager, s.params.ShardId, s.txFabric)
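The new syncer parameter turns Scheduler.Run into the consumer of a one-shot latch: NewSyncer arms a sync.WaitGroup, Syncer.Run releases it exactly once after the initial catch-up, and WaitComplete() parks the collator until then. Below is a minimal standalone sketch of that handshake; oneShotLatch and the timing are illustrative, not the repository's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// oneShotLatch mirrors the Syncer's waitForSync field: armed once at
// construction, released once after the first catch-up, and waited on
// by any number of consumers.
type oneShotLatch struct{ wg sync.WaitGroup }

func newOneShotLatch() *oneShotLatch {
	l := &oneShotLatch{}
	l.wg.Add(1) // armed up front, as in NewSyncer
	return l
}

func (l *oneShotLatch) Release() { l.wg.Done() } // as after the catch-up in Syncer.Run
func (l *oneShotLatch) Wait()    { l.wg.Wait() } // as in syncer.WaitComplete()

func main() {
	latch := newOneShotLatch()

	go func() { // plays the role of Syncer.Run
		time.Sleep(100 * time.Millisecond) // pretend to replay missed blocks
		fmt.Println("syncer: initial catch-up done, switching to pubsub")
		latch.Release()
	}()

	latch.Wait() // plays the role of syncer.WaitComplete() in Scheduler.Run
	fmt.Println("collator: shard is in sync, starting collation")
}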
35 changes: 26 additions & 9 deletions nil/internal/collate/syncer.go
@@ -50,13 +50,17 @@ type Syncer struct {
 	lastBlockHash common.Hash
 
 	validatorPublicKey []byte
+	waitForSync        *sync.WaitGroup
 }
 
 func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) {
 	if cfg.ValidatorPublicKey == nil {
 		return nil, errors.New("validator public key is required")
 	}
 
+	var waitForSync sync.WaitGroup
+	waitForSync.Add(1)
+
 	return &Syncer{
 		config: cfg,
 		topic:  topicShardBlocks(cfg.ShardId),
@@ -67,6 +71,7 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy
 			Logger(),
 		lastBlockNumber:    types.BlockNumber(math.MaxUint64),
 		validatorPublicKey: crypto.CompressPubkey(cfg.ValidatorPublicKey),
+		waitForSync:        &waitForSync,
 	}, nil
 }

@@ -101,12 +106,18 @@ func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) {
 	return err != nil, nil
 }
 
+func (s *Syncer) WaitComplete() {
+	s.waitForSync.Wait()
+}
+
 func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
-	if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
-		return err
-	} else if snapIsRequired {
-		if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
-			return fmt.Errorf("failed to fetch snapshot: %w", err)
+	if s.config.ReplayBlocks {
+		if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
+			return err
+		} else if snapIsRequired {
+			if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
+				return fmt.Errorf("failed to fetch snapshot: %w", err)
+			}
 		}
 	}

@@ -115,10 +126,13 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
 	wgFetch.Done()
 	wgFetch.Wait()
 
-	if s.config.ReplayBlocks {
-		if err := s.generateZerostate(ctx); err != nil {
-			return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
-		}
+	if err := s.generateZerostate(ctx); err != nil {
+		return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
+	}
+
+	if s.networkManager == nil {
+		s.waitForSync.Done()
+		return nil
 	}
 
 	err := s.readLastBlockNumber(ctx)
@@ -133,6 +147,9 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
 
 	s.logger.Info().Msg("Starting sync")
 
+	s.fetchBlocks(ctx)
+	s.waitForSync.Done()
+
 	sub, err := s.networkManager.PubSub().Subscribe(s.topic)
 	if err != nil {
 		return err
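Two details above are easy to miss. First, waitForSync.Done() must fire on every path that leaves the catch-up phase — including the networkManager == nil early return — or WaitComplete() would block collators forever. Second, wgFetch is a separate all-shards rendezvous: each syncer calls Done and then Wait, so no shard generates its zero-state until every shard has finished snapshot fetching. A standalone sketch of that barrier, with a made-up shard count and a sleep standing in for the real fetch:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	const nShards = 4

	var wgFetch sync.WaitGroup
	wgFetch.Add(nShards) // mirrors wgFetch.Add(int(cfg.NShards)) in createShards

	var finished sync.WaitGroup
	finished.Add(nShards)
	for i := 0; i < nShards; i++ {
		go func(shard int) {
			defer finished.Done()
			// Pretend to fetch this shard's snapshot.
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			fmt.Printf("shard %d: snapshot fetched\n", shard)

			wgFetch.Done() // this shard is done fetching...
			wgFetch.Wait() // ...but waits until every other shard is too

			fmt.Printf("shard %d: all shards fetched, generating zero-state\n", shard)
		}(i)
	}
	finished.Wait()
}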
96 changes: 49 additions & 47 deletions nil/services/nilservice/service.go
@@ -464,32 +464,68 @@ func createShards(
 	collatorTickPeriod := time.Millisecond * time.Duration(cfg.CollatorTickPeriodMs)
 	syncerTimeout := syncTimeoutFactor * collatorTickPeriod
 
-	funcs := make([]concurrent.Func, cfg.NShards)
+	funcs := make([]concurrent.Func, 0, 2*cfg.NShards)
 	pools := make(map[types.ShardId]txnpool.Pool)
 
 	var wgFetch sync.WaitGroup
-	wgFetch.Add(int(cfg.NShards) - len(cfg.GetMyShards()))
+	wgFetch.Add(int(cfg.NShards))
 
 	for i := range cfg.NShards {
-		shard := types.ShardId(i)
+		shardId := types.ShardId(i)
 
-		pKey, err := cfg.LoadValidatorPrivateKey(shard)
+		pKey, err := cfg.LoadValidatorPrivateKey(shardId)
 		if err != nil {
 			return nil, nil, err
 		}
 
+		zeroState := execution.DefaultZeroStateConfig
+		zeroStateConfig := cfg.ZeroState
+		if len(cfg.ZeroStateYaml) != 0 {
+			zeroState = cfg.ZeroStateYaml
+		}
+
+		syncerCfg := collate.SyncerConfig{
+			ShardId:              shardId,
+			ReplayBlocks:         shardId.IsMainShard() || cfg.IsShardActive(shardId),
+			Timeout:              syncerTimeout,
+			BlockGeneratorParams: cfg.BlockGeneratorParams(shardId),
+			ValidatorPublicKey:   &pKey.PublicKey,
+			ZeroState:            zeroState,
+			ZeroStateConfig:      zeroStateConfig,
+		}
+
+		if int(shardId) < len(cfg.BootstrapPeers) {
+			syncerCfg.BootstrapPeer = &cfg.BootstrapPeers[shardId]
+		}
+
+		syncer, err := collate.NewSyncer(syncerCfg, database, networkManager)
+		if err != nil {
+			return nil, nil, err
+		}
+
-		if cfg.IsShardActive(shard) {
-			txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shard), networkManager)
+		funcs = append(funcs, func(ctx context.Context) error {
+			if err := syncer.Run(ctx, &wgFetch); err != nil {
+				logger.Error().
+					Err(err).
+					Stringer(logging.FieldShardId, shardId).
+					Msg("Syncer goroutine failed")
+				return err
+			}
+			return nil
+		})
+
+		if cfg.IsShardActive(shardId) {
+			txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shardId), networkManager)
 			if err != nil {
 				return nil, nil, err
 			}
-			collator, err := createActiveCollator(shard, cfg, collatorTickPeriod, database, networkManager, txnPool)
+			collator, err := createActiveCollator(shardId, cfg, collatorTickPeriod, database, networkManager, txnPool)
 			if err != nil {
 				return nil, nil, err
 			}
 
 			consensus := ibft.NewConsensus(&ibft.ConsensusParams{
-				ShardId:    shard,
+				ShardId:    shardId,
 				Db:         database,
 				Scheduler:  collator,
 				NetManager: networkManager,
@@ -499,55 +535,21 @@
 				return nil, nil, err
 			}
 
-			pools[shard] = txnPool
-			funcs[i] = func(ctx context.Context) error {
-				if err := collator.Run(ctx, consensus); err != nil {
+			pools[shardId] = txnPool
+			funcs = append(funcs, func(ctx context.Context) error {
+				if err := collator.Run(ctx, syncer, consensus); err != nil {
 					logger.Error().
 						Err(err).
-						Stringer(logging.FieldShardId, shard).
+						Stringer(logging.FieldShardId, shardId).
 						Msg("Collator goroutine failed")
 					return err
 				}
 				return nil
-			}
+			})
-		} else {
-			zeroState := execution.DefaultZeroStateConfig
-			zeroStateConfig := cfg.ZeroState
-			if len(cfg.ZeroStateYaml) != 0 {
-				zeroState = cfg.ZeroStateYaml
-			}
-
-			config := collate.SyncerConfig{
-				ShardId:              shard,
-				ReplayBlocks:         shard.IsMainShard(),
-				Timeout:              syncerTimeout,
-				BlockGeneratorParams: cfg.BlockGeneratorParams(shard),
-				ValidatorPublicKey:   &pKey.PublicKey,
-				ZeroState:            zeroState,
-				ZeroStateConfig:      zeroStateConfig,
-			}
-			if int(shard) < len(cfg.BootstrapPeers) {
-				config.BootstrapPeer = &cfg.BootstrapPeers[shard]
-			}
-			if networkManager == nil {
-				return nil, nil, errors.New("trying to start syncer without network configuration")
-			}
-
-			syncer, err := collate.NewSyncer(config, database, networkManager)
-			if err != nil {
-				return nil, nil, err
-			}
-
-			funcs[i] = func(ctx context.Context) error {
-				if err := syncer.Run(ctx, &wgFetch); err != nil {
-					logger.Error().
-						Err(err).
-						Stringer(logging.FieldShardId, shard).
-						Msg("Syncer goroutine failed")
-					return err
-				}
-				return nil
-			}
 		}
 	}

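The net effect in createShards: funcs now carries up to two goroutine bodies per shard — a syncer unconditionally and a collator only for active shards — hence the switch from indexed writes into a length-NShards slice to append into one with capacity 2*cfg.NShards. A hedged sketch of the same shape, with taskFunc, runAll, and isActive as made-up stand-ins for the repository's concurrent runner and config:

package main

import (
	"context"
	"fmt"
	"sync"
)

type taskFunc func(ctx context.Context) error

// runAll stands in for the concurrent runner that executes every
// collected func: start all, wait for all.
func runAll(ctx context.Context, funcs []taskFunc) {
	var wg sync.WaitGroup
	for _, f := range funcs {
		wg.Add(1)
		go func(f taskFunc) {
			defer wg.Done()
			_ = f(ctx)
		}(f)
	}
	wg.Wait()
}

func main() {
	const nShards = 3
	isActive := func(shard int) bool { return shard != 2 } // made-up predicate

	funcs := make([]taskFunc, 0, 2*nShards) // room for a syncer plus a collator per shard
	for i := 0; i < nShards; i++ {
		shardId := i

		// One syncer per shard, unconditionally — the point of this commit.
		funcs = append(funcs, func(ctx context.Context) error {
			fmt.Printf("shard %d: syncer running\n", shardId)
			return nil
		})

		// A collator only for shards this validator is responsible for.
		if isActive(shardId) {
			funcs = append(funcs, func(ctx context.Context) error {
				fmt.Printf("shard %d: collator running\n", shardId)
				return nil
			})
		}
	}

	runAll(context.Background(), funcs)
}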
