From 313610a777781d74147b9baee45c29a288f0c11e Mon Sep 17 00:00:00 2001 From: Oleg Babin Date: Wed, 29 Jan 2025 18:12:33 +0400 Subject: [PATCH 1/2] make syncer permanent In case of several validators it should be able to be in-sync. So we need to create syncer even for active shards. --- nil/internal/collate/scheduler.go | 42 +-------- nil/internal/collate/syncer.go | 61 ++++++++++--- nil/internal/execution/block_generator.go | 13 +-- nil/services/nilservice/service.go | 102 +++++++++++----------- 4 files changed, 110 insertions(+), 108 deletions(-) diff --git a/nil/internal/collate/scheduler.go b/nil/internal/collate/scheduler.go index 26227d4d..2ae91f15 100644 --- a/nil/internal/collate/scheduler.go +++ b/nil/internal/collate/scheduler.go @@ -2,7 +2,6 @@ package collate import ( "context" - "errors" "time" "github.com/NilFoundation/nil/nil/common" @@ -37,7 +36,6 @@ type Params struct { ZeroState string ZeroStateConfig *execution.ZeroStateConfig - MainKeysOutPath string Topology ShardTopology } @@ -75,15 +73,12 @@ func (s *Scheduler) Validator() *Validator { } } -func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error { +func (s *Scheduler) Run(ctx context.Context, syncer *Syncer, consensus Consensus) error { + syncer.WaitComplete() + s.logger.Info().Msg("Starting collation...") s.consensus = consensus - // At first generate zero-state if needed - if err := s.generateZeroState(ctx); err != nil { - return err - } - // Enable handler for snapshot relaying SetBootstrapHandler(ctx, s.networkManager, s.params.ShardId, s.txFabric) @@ -109,37 +104,6 @@ func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error { } } -func (s *Scheduler) generateZeroState(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, s.params.Timeout) - defer cancel() - - if _, err := s.readLastBlockHash(ctx); !errors.Is(err, db.ErrKeyNotFound) { - // error or nil if last block found - return err - } - - if len(s.params.MainKeysOutPath) != 0 && s.params.ShardId == types.BaseShardId { - if err := execution.DumpMainKeys(s.params.MainKeysOutPath); err != nil { - return err - } - } - - s.logger.Info().Msg("Generating zero-state...") - - gen, err := execution.NewBlockGenerator(ctx, s.params.BlockGeneratorParams, s.txFabric) - if err != nil { - return err - } - defer gen.Rollback() - - block, err := gen.GenerateZeroState(s.params.ZeroState, s.params.ZeroStateConfig) - if err != nil { - return err - } - - return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{Block: block}) -} - func (s *Scheduler) doCollate(ctx context.Context) error { id, err := s.readLastBlockId(ctx) if err != nil { diff --git a/nil/internal/collate/syncer.go b/nil/internal/collate/syncer.go index d241a168..548e9afe 100644 --- a/nil/internal/collate/syncer.go +++ b/nil/internal/collate/syncer.go @@ -47,9 +47,14 @@ type Syncer struct { lastBlockNumber types.BlockNumber lastBlockHash common.Hash + + waitForSync *sync.WaitGroup } func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) { + var waitForSync sync.WaitGroup + waitForSync.Add(1) + return &Syncer{ config: cfg, topic: topicShardBlocks(cfg.ShardId), @@ -59,6 +64,7 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy Stringer(logging.FieldShardId, cfg.ShardId). 
Logger(), lastBlockNumber: types.BlockNumber(math.MaxUint64), + waitForSync: &waitForSync, }, nil } @@ -93,12 +99,18 @@ func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) { return err != nil, nil } +func (s *Syncer) WaitComplete() { + s.waitForSync.Wait() +} + func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { - if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil { - return err - } else if snapIsRequired { - if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil { - return fmt.Errorf("failed to fetch snapshot: %w", err) + if s.config.ReplayBlocks { + if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil { + return err + } else if snapIsRequired { + if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil { + return fmt.Errorf("failed to fetch snapshot: %w", err) + } } } @@ -107,10 +119,13 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { wgFetch.Done() wgFetch.Wait() - if s.config.ReplayBlocks { - if err := s.generateZerostate(ctx); err != nil { - return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err) - } + if err := s.generateZerostate(ctx); err != nil { + return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err) + } + + if s.networkManager == nil { + s.waitForSync.Done() + return nil } err := s.readLastBlockNumber(ctx) @@ -125,9 +140,16 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { s.logger.Info().Msg("Starting sync") + s.fetchBlocks(ctx) + s.waitForSync.Done() + + if ctx.Err() != nil { + return nil + } + sub, err := s.networkManager.PubSub().Subscribe(s.topic) if err != nil { - return err + return fmt.Errorf("Failed to subscribe to %s: %w", s.topic, err) } defer sub.Close() @@ -333,20 +355,35 @@ func (s *Syncer) saveDirectly(ctx context.Context, blocks []*types.BlockWithExtr } func (s *Syncer) generateZerostate(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) + defer cancel() + if empty, err := s.shardIsEmpty(ctx); err != nil { return err } else if !empty { return nil } + if len(s.config.BlockGeneratorParams.MainKeysOutPath) != 0 && s.config.ShardId == types.BaseShardId { + if err := execution.DumpMainKeys(s.config.BlockGeneratorParams.MainKeysOutPath); err != nil { + return err + } + } + + s.logger.Info().Msg("Generating zero-state...") + gen, err := execution.NewBlockGenerator(ctx, s.config.BlockGeneratorParams, s.db) if err != nil { return err } defer gen.Rollback() - _, err = gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig) - return err + block, err := gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig) + if err != nil { + return err + } + + return PublishBlock(ctx, s.networkManager, s.config.ShardId, &types.BlockWithExtractedData{Block: block}) } func validateRepliedBlock( diff --git a/nil/internal/execution/block_generator.go b/nil/internal/execution/block_generator.go index 34a12deb..62a51091 100644 --- a/nil/internal/execution/block_generator.go +++ b/nil/internal/execution/block_generator.go @@ -16,12 +16,13 @@ import ( ) type BlockGeneratorParams struct { - ShardId types.ShardId - NShards uint32 - TraceEVM bool - Timer common.Timer - GasBasePrice types.Value - GasPriceScale float64 + ShardId types.ShardId + NShards uint32 + TraceEVM bool + Timer common.Timer + GasBasePrice types.Value + GasPriceScale float64 + MainKeysOutPath 
string } type Proposal struct { diff --git a/nil/services/nilservice/service.go b/nil/services/nilservice/service.go index fc7eb557..24f6c9b5 100644 --- a/nil/services/nilservice/service.go +++ b/nil/services/nilservice/service.go @@ -240,7 +240,7 @@ func createArchiveSyncers(cfg *Config, nm *network.Manager, database db.DB, logg logger.Error(). Err(err). Stringer(logging.FieldShardId, shardId). - Msg("Collator goroutine failed") + Msg("Syncer goroutine failed") return err } return nil @@ -460,11 +460,11 @@ func createShards( collatorTickPeriod := time.Millisecond * time.Duration(cfg.CollatorTickPeriodMs) syncerTimeout := syncTimeoutFactor * collatorTickPeriod - funcs := make([]concurrent.Func, cfg.NShards) + funcs := make([]concurrent.Func, 0, 2*cfg.NShards) pools := make(map[types.ShardId]txnpool.Pool) var wgFetch sync.WaitGroup - wgFetch.Add(int(cfg.NShards) - len(cfg.GetMyShards())) + wgFetch.Add(int(cfg.NShards)) pKey, err := cfg.LoadValidatorPrivateKey() if err != nil { @@ -476,6 +476,42 @@ func createShards( blockVerifier := signer.NewBlockVerifier(shardId, cfg.Validators[shardId]) + zeroState := execution.DefaultZeroStateConfig + zeroStateConfig := cfg.ZeroState + if len(cfg.ZeroStateYaml) != 0 { + zeroState = cfg.ZeroStateYaml + } + + syncerCfg := collate.SyncerConfig{ + ShardId: shardId, + ReplayBlocks: shardId.IsMainShard() || cfg.IsShardActive(shardId), + Timeout: syncerTimeout, + BlockGeneratorParams: cfg.BlockGeneratorParams(shardId), + BlockVerifier: blockVerifier, + ZeroState: zeroState, + ZeroStateConfig: zeroStateConfig, + } + + if int(shardId) < len(cfg.BootstrapPeers) { + syncerCfg.BootstrapPeer = &cfg.BootstrapPeers[shardId] + } + + syncer, err := collate.NewSyncer(syncerCfg, database, networkManager) + if err != nil { + return nil, nil, err + } + + funcs = append(funcs, func(ctx context.Context) error { + if err := syncer.Run(ctx, &wgFetch); err != nil { + logger.Error(). + Err(err). + Stringer(logging.FieldShardId, shardId). + Msg("Syncer goroutine failed") + return err + } + return nil + }) + if cfg.IsShardActive(shardId) { txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shardId), networkManager) if err != nil { @@ -496,8 +532,8 @@ func createShards( } pools[shardId] = txnPool - funcs[i] = func(ctx context.Context) error { - if err := collator.Run(ctx, consensus); err != nil { + funcs = append(funcs, func(ctx context.Context) error { + if err := collator.Run(ctx, syncer, consensus); err != nil { logger.Error(). Err(err). Stringer(logging.FieldShardId, shardId). @@ -505,45 +541,9 @@ func createShards( return err } return nil - } - } else { - zeroState := execution.DefaultZeroStateConfig - zeroStateConfig := cfg.ZeroState - if len(cfg.ZeroStateYaml) != 0 { - zeroState = cfg.ZeroStateYaml - } - - config := collate.SyncerConfig{ - ShardId: shardId, - ReplayBlocks: shardId.IsMainShard(), - Timeout: syncerTimeout, - BlockGeneratorParams: cfg.BlockGeneratorParams(shardId), - BlockVerifier: blockVerifier, - ZeroState: zeroState, - ZeroStateConfig: zeroStateConfig, - } - if int(shardId) < len(cfg.BootstrapPeers) { - config.BootstrapPeer = &cfg.BootstrapPeers[shardId] - } - if networkManager == nil { - return nil, nil, errors.New("trying to start syncer without network configuration") - } - - syncer, err := collate.NewSyncer(config, database, networkManager) - if err != nil { - return nil, nil, err - } - - funcs[i] = func(ctx context.Context) error { - if err := syncer.Run(ctx, &wgFetch); err != nil { - logger.Error(). - Err(err). - Stringer(logging.FieldShardId, shardId). 
- Msg("Syncer goroutine failed") - return err - } - return nil - } + }) + } else if networkManager == nil { + return nil, nil, errors.New("trying to start syncer without network configuration") } } @@ -553,18 +553,18 @@ func createShards( func createActiveCollator(shard types.ShardId, cfg *Config, collatorTickPeriod time.Duration, database db.DB, networkManager *network.Manager, txnPool txnpool.Pool) *collate.Scheduler { collatorCfg := collate.Params{ BlockGeneratorParams: execution.BlockGeneratorParams{ - ShardId: shard, - NShards: cfg.NShards, - TraceEVM: cfg.TraceEVM, - Timer: common.NewTimer(), - GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice), - GasPriceScale: cfg.GasPriceScale, + ShardId: shard, + NShards: cfg.NShards, + TraceEVM: cfg.TraceEVM, + Timer: common.NewTimer(), + GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice), + GasPriceScale: cfg.GasPriceScale, + MainKeysOutPath: cfg.MainKeysOutPath, }, CollatorTickPeriod: collatorTickPeriod, Timeout: collatorTickPeriod, ZeroState: execution.DefaultZeroStateConfig, ZeroStateConfig: cfg.ZeroState, - MainKeysOutPath: cfg.MainKeysOutPath, Topology: collate.GetShardTopologyById(cfg.Topology), } if len(cfg.ZeroStateYaml) != 0 { From e0056218bff0790d967704e706ba5760a302fa5b Mon Sep 17 00:00:00 2001 From: Oleg Babin Date: Thu, 30 Jan 2025 10:04:30 +0400 Subject: [PATCH 2/2] drop lastBlockNumber/Hash from syncer Syncer works at the same time with consensus and block generation. So it's possible that we get new blocks via a network that was previously committed via consensus. In future we can cache it somehow but it should be done in separate module that keeps fresh blockchain state. --- nil/internal/collate/scheduler.go | 11 ----- nil/internal/collate/syncer.go | 67 +++++++++++++++---------------- 2 files changed, 32 insertions(+), 46 deletions(-) diff --git a/nil/internal/collate/scheduler.go b/nil/internal/collate/scheduler.go index 2ae91f15..62eb39b5 100644 --- a/nil/internal/collate/scheduler.go +++ b/nil/internal/collate/scheduler.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" "github.com/NilFoundation/nil/nil/internal/execution" @@ -113,16 +112,6 @@ func (s *Scheduler) doCollate(ctx context.Context) error { return s.consensus.RunSequence(ctx, id.Uint64()+1) } -func (s *Scheduler) readLastBlockHash(ctx context.Context) (common.Hash, error) { - roTx, err := s.txFabric.CreateRoTx(ctx) - if err != nil { - return common.Hash{}, err - } - defer roTx.Rollback() - - return db.ReadLastBlockHash(roTx, s.params.ShardId) -} - func (s *Scheduler) readLastBlockId(ctx context.Context) (types.BlockNumber, error) { roTx, err := s.txFabric.CreateRoTx(ctx) if err != nil { diff --git a/nil/internal/collate/syncer.go b/nil/internal/collate/syncer.go index 548e9afe..522b5547 100644 --- a/nil/internal/collate/syncer.go +++ b/nil/internal/collate/syncer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "math" "slices" "sync" "time" @@ -45,9 +44,6 @@ type Syncer struct { logger zerolog.Logger - lastBlockNumber types.BlockNumber - lastBlockHash common.Hash - waitForSync *sync.WaitGroup } @@ -63,40 +59,33 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy logger: logging.NewLogger("sync").With(). Stringer(logging.FieldShardId, cfg.ShardId). 
Logger(), - lastBlockNumber: types.BlockNumber(math.MaxUint64), - waitForSync: &waitForSync, + waitForSync: &waitForSync, }, nil } -func (s *Syncer) readLastBlockNumber(ctx context.Context) error { +func (s *Syncer) readLastBlock(ctx context.Context) (*types.Block, common.Hash, error) { rotx, err := s.db.CreateRoTx(ctx) if err != nil { - return err + return nil, common.EmptyHash, err } defer rotx.Rollback() - lastBlock, lastBlockHash, err := db.ReadLastBlock(rotx, s.config.ShardId) + + block, hash, err := db.ReadLastBlock(rotx, s.config.ShardId) if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return err + return nil, common.EmptyHash, err } if err == nil { - s.lastBlockNumber = lastBlock.Id - s.lastBlockHash = lastBlockHash + return block, hash, err } - return nil + return nil, common.EmptyHash, nil } func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) { - rotx, err := s.db.CreateRoTx(ctx) + block, _, err := s.readLastBlock(ctx) if err != nil { return false, err } - defer rotx.Rollback() - - _, err = db.ReadLastBlockHash(rotx, s.config.ShardId) - if err != nil && !errors.Is(err, db.ErrKeyNotFound) { - return false, err - } - return err != nil, nil + return block == nil, nil } func (s *Syncer) WaitComplete() { @@ -128,15 +117,15 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { return nil } - err := s.readLastBlockNumber(ctx) + block, hash, err := s.readLastBlock(ctx) if err != nil { return fmt.Errorf("failed to read last block number: %w", err) } s.logger.Debug(). - Stringer(logging.FieldBlockHash, s.lastBlockHash). - Uint64(logging.FieldBlockNumber, s.lastBlockNumber.Uint64()). - Msgf("Initialized sync proposer at starting block") + Stringer(logging.FieldBlockHash, hash). + Uint64(logging.FieldBlockNumber, uint64(block.Id)). + Msg("Initialized sync proposer at starting block") s.logger.Info().Msg("Starting sync") @@ -191,17 +180,22 @@ func (s *Syncer) processTopicTransaction(ctx context.Context, data []byte) (bool Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)). Msg("Received block") - if block.Id != s.lastBlockNumber+1 { + lastBlock, lastHash, err := s.readLastBlock(ctx) + if err != nil { + return false, err + } + + if block.Id != lastBlock.Id+1 { s.logger.Debug(). Stringer(logging.FieldBlockNumber, block.Id). - Msgf("Received block is out of order with the last block %d", s.lastBlockNumber) + Msgf("Received block is out of order with the last block %d", lastBlock.Id) // todo: queue the block for later processing return false, nil } - if block.PrevBlock != s.lastBlockHash { - txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", s.lastBlockHash, block.PrevBlock) + if block.PrevBlock != lastHash { + txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", lastHash, block.PrevBlock) s.logger.Error(). Stringer(logging.FieldBlockNumber, block.Id). Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)). 
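
Because the syncer now runs concurrently with consensus and block generation, a lastBlockNumber/lastBlockHash pair cached at startup can silently go stale, so the ordering and parent-hash checks above are made against a head freshly read from the database on every incoming block. Below is a minimal, self-contained sketch of that pattern; Block, store, and tryApply are hypothetical stand-ins, not the real types/db packages, where the head is read inside a read-only transaction.

// Simplified sketch: re-read the chain head from storage for every incoming
// block instead of trusting a field cached at startup, because consensus may
// have advanced the head concurrently.
package main

import (
	"fmt"
	"sync"
)

// Block is a stand-in for types.Block; Hash is a plain field here, while the
// real code derives it via block.Hash(shardId).
type Block struct {
	Id        uint64
	Hash      [32]byte
	PrevBlock [32]byte
}

// store is a stand-in for the DB; the real syncer uses a read-only transaction.
type store struct {
	mu   sync.RWMutex
	head *Block
}

func (s *store) readLastBlock() *Block {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.head
}

func (s *store) write(b *Block) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.head = b
}

// tryApply mirrors the checks in processTopicTransaction: the head is read
// fresh on every call, so a block already committed by consensus is rejected
// as out of order instead of tripping a stale in-memory counter.
func tryApply(s *store, b *Block) error {
	last := s.readLastBlock()
	if b.Id != last.Id+1 {
		return fmt.Errorf("out of order: have %d, got %d", last.Id, b.Id)
	}
	if b.PrevBlock != last.Hash {
		return fmt.Errorf("prev hash mismatch at block %d", b.Id)
	}
	s.write(b)
	return nil
}

func main() {
	genesis := &Block{Id: 0}
	st := &store{head: genesis}

	next := &Block{Id: 1, PrevBlock: genesis.Hash, Hash: [32]byte{1}}
	fmt.Println(tryApply(st, next))                                // <nil>
	fmt.Println(tryApply(st, &Block{Id: 1}))                       // out of order: have 1, got 1
	fmt.Println(tryApply(st, &Block{Id: 3, PrevBlock: next.Hash})) // out of order: have 1, got 3
}

The trade-off is an extra read per incoming block; the commit message flags caching as future work for a separate module that keeps the fresh blockchain state.
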
@@ -243,11 +237,16 @@ func (s *Syncer) fetchBlocksRange(ctx context.Context) []*types.BlockWithExtract s.logger.Debug().Msgf("Found %d peers to fetch block from:\n%v", len(peers), peers) + lastBlock, _, err := s.readLastBlock(ctx) + if err != nil { + return nil + } + for _, p := range peers { - s.logger.Debug().Msgf("Requesting block %d from peer %s", s.lastBlockNumber+1, p) + s.logger.Debug().Msgf("Requesting block %d from peer %s", lastBlock.Id+1, p) const count = 100 - blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, s.lastBlockNumber+1, count) + blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, lastBlock.Id+1, count) if err == nil { return blocks } @@ -296,11 +295,9 @@ func (s *Syncer) saveBlocks(ctx context.Context, blocks []*types.BlockWithExtrac } } - s.lastBlockNumber = blocks[len(blocks)-1].Block.Id - s.lastBlockHash = blocks[len(blocks)-1].Block.Hash(s.config.ShardId) - + lastBlockNumber := blocks[len(blocks)-1].Block.Id s.logger.Debug(). - Stringer(logging.FieldBlockNumber, s.lastBlockNumber). + Stringer(logging.FieldBlockNumber, lastBlockNumber). Msg("Blocks written") return nil
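
The scheduler change in the first commit boils down to a WaitGroup handshake: a syncer is created for every shard, it releases the WaitGroup once its initial catch-up is finished, and Scheduler.Run blocks on WaitComplete before starting collation. Here is a self-contained sketch of that handshake with stand-in types; the real Syncer.Run also generates the zero-state, fetches missing blocks from peers, and then keeps following the pubsub topic, and the real Scheduler.Run additionally takes a Consensus instance.

// Minimal sketch of the Syncer/Scheduler handshake introduced by the patch,
// using stand-in types instead of the real collate package.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Syncer struct {
	waitForSync *sync.WaitGroup
}

func NewSyncer() *Syncer {
	var wg sync.WaitGroup
	wg.Add(1) // released exactly once, after the initial catch-up
	return &Syncer{waitForSync: &wg}
}

// WaitComplete blocks callers until the initial sync has finished.
func (s *Syncer) WaitComplete() { s.waitForSync.Wait() }

func (s *Syncer) Run(ctx context.Context) error {
	// ... generate zero-state, fetch missing blocks from peers ...
	time.Sleep(50 * time.Millisecond) // stand-in for the initial catch-up
	s.waitForSync.Done()              // from here on, the syncer only follows new blocks
	// ... subscribe to the shard topic and apply incoming blocks ...
	<-ctx.Done()
	return nil
}

type Scheduler struct{}

// Run mirrors the new signature: collation starts only after the syncer
// reports that the node has caught up.
func (s *Scheduler) Run(ctx context.Context, syncer *Syncer) error {
	syncer.WaitComplete()
	fmt.Println("starting collation")
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	syncer := NewSyncer()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); _ = syncer.Run(ctx) }()
	go func() { defer wg.Done(); _ = (&Scheduler{}).Run(ctx, syncer) }()
	wg.Wait()
}

Any number of goroutines can block in WaitComplete; they all unblock on the single Done issued after the initial fetch.
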
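
service.go also resizes wgFetch from NShards minus the node's own shards to the full NShards, since every shard now runs a syncer. That WaitGroup acts as a barrier: each syncer calls Done after fetching its snapshot and then Wait, so no shard proceeds to zero-state generation before every shard has its data. A small illustration of the barrier, assuming a hypothetical runSyncer in place of the real Syncer.Run:

// Sketch of the cross-shard fetch barrier: Done reports this shard's fetch as
// finished, Wait blocks until every other shard has reported too.
package main

import (
	"fmt"
	"sync"
	"time"
)

func runSyncer(shard int, wgFetch *sync.WaitGroup) {
	// Stand-in for FetchSnapshot: each shard takes a different amount of time.
	time.Sleep(time.Duration(shard*10) * time.Millisecond)
	fmt.Printf("shard %d: snapshot fetched\n", shard)

	wgFetch.Done() // this shard is done fetching
	wgFetch.Wait() // barrier: wait for all other shards

	fmt.Printf("shard %d: generating zero-state\n", shard)
}

func main() {
	const nShards = 4

	var wgFetch sync.WaitGroup
	wgFetch.Add(nShards) // one slot per shard, as in wgFetch.Add(int(cfg.NShards))

	var done sync.WaitGroup
	done.Add(nShards)
	for shard := 0; shard < nShards; shard++ {
		go func(shard int) {
			defer done.Done()
			runSyncer(shard, &wgFetch)
		}(shard)
	}
	done.Wait()
	// Every "generating zero-state" line prints only after all
	// "snapshot fetched" lines have appeared.
}
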