diff --git a/nil/internal/collate/scheduler.go b/nil/internal/collate/scheduler.go index 558cf158c..e81cd960e 100644 --- a/nil/internal/collate/scheduler.go +++ b/nil/internal/collate/scheduler.go @@ -2,7 +2,6 @@ package collate import ( "context" - "errors" "fmt" "time" @@ -41,7 +40,6 @@ type Params struct { ZeroState string ZeroStateConfig *execution.ZeroStateConfig - MainKeysOutPath string Topology ShardTopology } @@ -78,15 +76,12 @@ func NewScheduler(txFabric db.DB, pool txnpool.Pool, params Params, networkManag }, nil } -func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error { +func (s *Scheduler) Run(ctx context.Context, syncer *Syncer, consensus Consensus) error { + syncer.WaitComplete() + s.logger.Info().Msg("Starting collation...") s.consensus = consensus - // At first generate zero-state if needed - if err := s.generateZeroState(ctx); err != nil { - return err - } - // Enable handler for snapshot relaying SetBootstrapHandler(ctx, s.networkManager, s.params.ShardId, s.txFabric) @@ -112,43 +107,6 @@ func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error { } } -func (s *Scheduler) generateZeroState(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, s.params.Timeout) - defer cancel() - - roTx, err := s.txFabric.CreateRoTx(ctx) - if err != nil { - return err - } - defer roTx.Rollback() - - if _, err := db.ReadLastBlockHash(roTx, s.params.ShardId); !errors.Is(err, db.ErrKeyNotFound) { - // error or nil if last block found - return err - } - - if len(s.params.MainKeysOutPath) != 0 && s.params.ShardId == types.BaseShardId { - if err := execution.DumpMainKeys(s.params.MainKeysOutPath); err != nil { - return err - } - } - - s.logger.Info().Msg("Generating zero-state...") - - gen, err := execution.NewBlockGenerator(ctx, s.params.BlockGeneratorParams, s.txFabric) - if err != nil { - return err - } - defer gen.Rollback() - - block, err := gen.GenerateZeroState(s.params.ZeroState, s.params.ZeroStateConfig) - if err != nil { - return err - } - - return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{Block: block}) -} - func (s *Scheduler) BuildProposal(ctx context.Context) (*execution.Proposal, error) { collator := newCollator(s.params, s.params.Topology, s.pool, s.logger) proposal, err := collator.GenerateProposal(ctx, s.txFabric) diff --git a/nil/internal/collate/syncer.go b/nil/internal/collate/syncer.go index 2f2deac4d..0a61345fd 100644 --- a/nil/internal/collate/syncer.go +++ b/nil/internal/collate/syncer.go @@ -50,6 +50,7 @@ type Syncer struct { lastBlockHash common.Hash validatorPublicKey []byte + waitForSync *sync.WaitGroup } func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) { @@ -57,6 +58,9 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy return nil, errors.New("validator public key is required") } + var waitForSync sync.WaitGroup + waitForSync.Add(1) + return &Syncer{ config: cfg, topic: topicShardBlocks(cfg.ShardId), @@ -67,6 +71,7 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy Logger(), lastBlockNumber: types.BlockNumber(math.MaxUint64), validatorPublicKey: crypto.CompressPubkey(cfg.ValidatorPublicKey), + waitForSync: &waitForSync, }, nil } @@ -101,12 +106,18 @@ func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) { return err != nil, nil } +func (s *Syncer) WaitComplete() { + s.waitForSync.Wait() +} + func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { - if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil { - return err - } else if snapIsRequired { - if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil { - return fmt.Errorf("failed to fetch snapshot: %w", err) + if s.config.ReplayBlocks { + if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil { + return err + } else if snapIsRequired { + if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil { + return fmt.Errorf("failed to fetch snapshot: %w", err) + } } } @@ -115,10 +126,13 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { wgFetch.Done() wgFetch.Wait() - if s.config.ReplayBlocks { - if err := s.generateZerostate(ctx); err != nil { - return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err) - } + if err := s.generateZerostate(ctx); err != nil { + return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err) + } + + if s.networkManager == nil { + s.waitForSync.Done() + return nil } err := s.readLastBlockNumber(ctx) @@ -133,9 +147,16 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error { s.logger.Info().Msg("Starting sync") + s.fetchBlocks(ctx) + s.waitForSync.Done() + + if ctx.Err() != nil { + return nil + } + sub, err := s.networkManager.PubSub().Subscribe(s.topic) if err != nil { - return err + return fmt.Errorf("Failed to subscribe to %s: %w", s.topic, err) } defer sub.Close() @@ -342,20 +363,35 @@ func (s *Syncer) saveDirectly(ctx context.Context, blocks []*types.BlockWithExtr } func (s *Syncer) generateZerostate(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) + defer cancel() + if empty, err := s.shardIsEmpty(ctx); err != nil { return err } else if !empty { return nil } + if len(s.config.BlockGeneratorParams.MainKeysOutPath) != 0 && s.config.ShardId == types.BaseShardId { + if err := execution.DumpMainKeys(s.config.BlockGeneratorParams.MainKeysOutPath); err != nil { + return err + } + } + + s.logger.Info().Msg("Generating zero-state...") + gen, err := execution.NewBlockGenerator(ctx, s.config.BlockGeneratorParams, s.db) if err != nil { return err } defer gen.Rollback() - _, err = gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig) - return err + block, err := gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig) + if err != nil { + return err + } + + return PublishBlock(ctx, s.networkManager, s.config.ShardId, &types.BlockWithExtractedData{Block: block}) } func validateRepliedBlock( diff --git a/nil/internal/execution/block_generator.go b/nil/internal/execution/block_generator.go index d89ceb06e..7b6492bef 100644 --- a/nil/internal/execution/block_generator.go +++ b/nil/internal/execution/block_generator.go @@ -16,12 +16,13 @@ import ( ) type BlockGeneratorParams struct { - ShardId types.ShardId - NShards uint32 - TraceEVM bool - Timer common.Timer - GasBasePrice types.Value - GasPriceScale float64 + ShardId types.ShardId + NShards uint32 + TraceEVM bool + Timer common.Timer + GasBasePrice types.Value + GasPriceScale float64 + MainKeysOutPath string } type Proposal struct { diff --git a/nil/services/nilservice/service.go b/nil/services/nilservice/service.go index 504d9ff3b..69f26d4bb 100644 --- a/nil/services/nilservice/service.go +++ b/nil/services/nilservice/service.go @@ -244,7 +244,7 @@ func createArchiveSyncers(cfg *Config, nm *network.Manager, database db.DB, logg logger.Error(). Err(err). Stringer(logging.FieldShardId, shardId). - Msg("Collator goroutine failed") + Msg("Syncer goroutine failed") return err } return nil @@ -464,32 +464,68 @@ func createShards( collatorTickPeriod := time.Millisecond * time.Duration(cfg.CollatorTickPeriodMs) syncerTimeout := syncTimeoutFactor * collatorTickPeriod - funcs := make([]concurrent.Func, cfg.NShards) + funcs := make([]concurrent.Func, 0, 2*cfg.NShards) pools := make(map[types.ShardId]txnpool.Pool) var wgFetch sync.WaitGroup - wgFetch.Add(int(cfg.NShards) - len(cfg.GetMyShards())) + wgFetch.Add(int(cfg.NShards)) for i := range cfg.NShards { - shard := types.ShardId(i) + shardId := types.ShardId(i) - pKey, err := cfg.LoadValidatorPrivateKey(shard) + pKey, err := cfg.LoadValidatorPrivateKey(shardId) if err != nil { return nil, nil, err } - if cfg.IsShardActive(shard) { - txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shard), networkManager) + zeroState := execution.DefaultZeroStateConfig + zeroStateConfig := cfg.ZeroState + if len(cfg.ZeroStateYaml) != 0 { + zeroState = cfg.ZeroStateYaml + } + + syncerCfg := collate.SyncerConfig{ + ShardId: shardId, + ReplayBlocks: shardId.IsMainShard() || cfg.IsShardActive(shardId), + Timeout: syncerTimeout, + BlockGeneratorParams: cfg.BlockGeneratorParams(shardId), + ValidatorPublicKey: &pKey.PublicKey, + ZeroState: zeroState, + ZeroStateConfig: zeroStateConfig, + } + + if int(shardId) < len(cfg.BootstrapPeers) { + syncerCfg.BootstrapPeer = &cfg.BootstrapPeers[shardId] + } + + syncer, err := collate.NewSyncer(syncerCfg, database, networkManager) + if err != nil { + return nil, nil, err + } + + funcs = append(funcs, func(ctx context.Context) error { + if err := syncer.Run(ctx, &wgFetch); err != nil { + logger.Error(). + Err(err). + Stringer(logging.FieldShardId, shardId). + Msg("Syncer goroutine failed") + return err + } + return nil + }) + + if cfg.IsShardActive(shardId) { + txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shardId), networkManager) if err != nil { return nil, nil, err } - collator, err := createActiveCollator(shard, cfg, collatorTickPeriod, database, networkManager, txnPool) + collator, err := createActiveCollator(shardId, cfg, collatorTickPeriod, database, networkManager, txnPool) if err != nil { return nil, nil, err } consensus := ibft.NewConsensus(&ibft.ConsensusParams{ - ShardId: shard, + ShardId: shardId, Db: database, Scheduler: collator, NetManager: networkManager, @@ -499,55 +535,19 @@ func createShards( return nil, nil, err } - pools[shard] = txnPool - funcs[i] = func(ctx context.Context) error { - if err := collator.Run(ctx, consensus); err != nil { + pools[shardId] = txnPool + funcs = append(funcs, func(ctx context.Context) error { + if err := collator.Run(ctx, syncer, consensus); err != nil { logger.Error(). Err(err). - Stringer(logging.FieldShardId, shard). + Stringer(logging.FieldShardId, shardId). Msg("Collator goroutine failed") return err } return nil - } - } else { - zeroState := execution.DefaultZeroStateConfig - zeroStateConfig := cfg.ZeroState - if len(cfg.ZeroStateYaml) != 0 { - zeroState = cfg.ZeroStateYaml - } - - config := collate.SyncerConfig{ - ShardId: shard, - ReplayBlocks: shard.IsMainShard(), - Timeout: syncerTimeout, - BlockGeneratorParams: cfg.BlockGeneratorParams(shard), - ValidatorPublicKey: &pKey.PublicKey, - ZeroState: zeroState, - ZeroStateConfig: zeroStateConfig, - } - if int(shard) < len(cfg.BootstrapPeers) { - config.BootstrapPeer = &cfg.BootstrapPeers[shard] - } - if networkManager == nil { - return nil, nil, errors.New("trying to start syncer without network configuration") - } - - syncer, err := collate.NewSyncer(config, database, networkManager) - if err != nil { - return nil, nil, err - } - - funcs[i] = func(ctx context.Context) error { - if err := syncer.Run(ctx, &wgFetch); err != nil { - logger.Error(). - Err(err). - Stringer(logging.FieldShardId, shard). - Msg("Syncer goroutine failed") - return err - } - return nil - } + }) + } else if networkManager == nil { + return nil, nil, errors.New("trying to start syncer without network configuration") } } @@ -557,18 +557,18 @@ func createShards( func createActiveCollator(shard types.ShardId, cfg *Config, collatorTickPeriod time.Duration, database db.DB, networkManager *network.Manager, txnPool txnpool.Pool) (*collate.Scheduler, error) { collatorCfg := collate.Params{ BlockGeneratorParams: execution.BlockGeneratorParams{ - ShardId: shard, - NShards: cfg.NShards, - TraceEVM: cfg.TraceEVM, - Timer: common.NewTimer(), - GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice), - GasPriceScale: cfg.GasPriceScale, + ShardId: shard, + NShards: cfg.NShards, + TraceEVM: cfg.TraceEVM, + Timer: common.NewTimer(), + GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice), + GasPriceScale: cfg.GasPriceScale, + MainKeysOutPath: cfg.MainKeysOutPath, }, CollatorTickPeriod: collatorTickPeriod, Timeout: collatorTickPeriod, ZeroState: execution.DefaultZeroStateConfig, ZeroStateConfig: cfg.ZeroState, - MainKeysOutPath: cfg.MainKeysOutPath, Topology: collate.GetShardTopologyById(cfg.Topology), } if len(cfg.ZeroStateYaml) != 0 {