make syncer permanent
With several validators, a node must be able to stay in sync with the others, so we need to create a syncer even for active shards.
olegrok committed Jan 30, 2025
1 parent 70e4d16 commit 522cafd
Showing 4 changed files with 117 additions and 122 deletions.
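Before the file-by-file diff, a note on the core mechanism: the syncer now owns a `sync.WaitGroup` that is released exactly once, after the initial catch-up, and `Scheduler.Run` blocks on it via `syncer.WaitComplete()` before starting collation. The sketch below is a minimal, self-contained illustration of that handoff; the names (`syncer`, `newSyncer`) are stand-ins, not the repository's actual API.

```go
// Minimal sketch of the sync-then-collate handoff introduced by this
// commit, using only the standard library. Names are illustrative.
package main

import (
	"fmt"
	"sync"
	"time"
)

type syncer struct {
	synced sync.WaitGroup // counterpart of Syncer.waitForSync
}

func newSyncer() *syncer {
	s := &syncer{}
	s.synced.Add(1) // released exactly once, after the initial catch-up
	return s
}

func (s *syncer) Run() {
	time.Sleep(100 * time.Millisecond) // stands in for snapshot fetch and block replay
	s.synced.Done()                    // signal: local state has caught up with peers
	// the real Syncer then keeps consuming blocks from pubsub
}

// WaitComplete blocks until the initial sync has finished,
// mirroring the new call at the top of Scheduler.Run.
func (s *syncer) WaitComplete() { s.synced.Wait() }

func main() {
	s := newSyncer()
	go s.Run()
	s.WaitComplete() // collator starts only once the node is in sync
	fmt.Println("in sync; starting collation")
}
```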
48 changes: 3 additions & 45 deletions nil/internal/collate/scheduler.go
@@ -2,7 +2,6 @@ package collate

import (
"context"
"errors"
"fmt"
"time"

@@ -41,7 +40,6 @@ type Params struct {

ZeroState string
ZeroStateConfig *execution.ZeroStateConfig
MainKeysOutPath string

Topology ShardTopology
}
@@ -78,15 +76,12 @@ func NewScheduler(txFabric db.DB, pool txnpool.Pool, params Params, networkManag
}, nil
}

func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error {
func (s *Scheduler) Run(ctx context.Context, syncer *Syncer, consensus Consensus) error {
syncer.WaitComplete()

s.logger.Info().Msg("Starting collation...")
s.consensus = consensus

// At first generate zero-state if needed
if err := s.generateZeroState(ctx); err != nil {
return err
}

// Enable handler for snapshot relaying
SetBootstrapHandler(ctx, s.networkManager, s.params.ShardId, s.txFabric)

@@ -112,43 +107,6 @@ func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error {
}
}

func (s *Scheduler) generateZeroState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.params.Timeout)
defer cancel()

roTx, err := s.txFabric.CreateRoTx(ctx)
if err != nil {
return err
}
defer roTx.Rollback()

if _, err := db.ReadLastBlockHash(roTx, s.params.ShardId); !errors.Is(err, db.ErrKeyNotFound) {
// error or nil if last block found
return err
}

if len(s.params.MainKeysOutPath) != 0 && s.params.ShardId == types.BaseShardId {
if err := execution.DumpMainKeys(s.params.MainKeysOutPath); err != nil {
return err
}
}

s.logger.Info().Msg("Generating zero-state...")

gen, err := execution.NewBlockGenerator(ctx, s.params.BlockGeneratorParams, s.txFabric)
if err != nil {
return err
}
defer gen.Rollback()

block, err := gen.GenerateZeroState(s.params.ZeroState, s.params.ZeroStateConfig)
if err != nil {
return err
}

return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{Block: block})
}

func (s *Scheduler) BuildProposal(ctx context.Context) (*execution.Proposal, error) {
collator := newCollator(s.params, s.params.Topology, s.pool, s.logger)
proposal, err := collator.GenerateProposal(ctx, s.txFabric)
60 changes: 48 additions & 12 deletions nil/internal/collate/syncer.go
@@ -50,13 +50,17 @@ type Syncer struct {
lastBlockHash common.Hash

validatorPublicKey []byte
waitForSync *sync.WaitGroup
}

func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) {
if cfg.ValidatorPublicKey == nil {
return nil, errors.New("validator public key is required")
}

var waitForSync sync.WaitGroup
waitForSync.Add(1)

return &Syncer{
config: cfg,
topic: topicShardBlocks(cfg.ShardId),
@@ -67,6 +71,7 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) {
Logger(),
lastBlockNumber: types.BlockNumber(math.MaxUint64),
validatorPublicKey: crypto.CompressPubkey(cfg.ValidatorPublicKey),
waitForSync: &waitForSync,
}, nil
}

@@ -101,12 +106,18 @@ func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) {
return err != nil, nil
}

func (s *Syncer) WaitComplete() {
s.waitForSync.Wait()
}

func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if snapIsRequired {
if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
return fmt.Errorf("failed to fetch snapshot: %w", err)
if s.config.ReplayBlocks {
if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if snapIsRequired {
if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
return fmt.Errorf("failed to fetch snapshot: %w", err)
}
}
}

@@ -115,10 +126,13 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
wgFetch.Done()
wgFetch.Wait()

if s.config.ReplayBlocks {
if err := s.generateZerostate(ctx); err != nil {
return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
}
if err := s.generateZerostate(ctx); err != nil {
return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
}

if s.networkManager == nil {
s.waitForSync.Done()
return nil
}

err := s.readLastBlockNumber(ctx)
@@ -133,9 +147,16 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {

s.logger.Info().Msg("Starting sync")

s.fetchBlocks(ctx)
s.waitForSync.Done()

if ctx.Err() != nil {
return nil
}

sub, err := s.networkManager.PubSub().Subscribe(s.topic)
if err != nil {
return err
return fmt.Errorf("Failed to subscribe to %s: %w", s.topic, err)
}
defer sub.Close()

@@ -342,20 +363,35 @@ func (s *Syncer) saveDirectly(ctx context.Context, blocks []*types.BlockWithExtr
}

func (s *Syncer) generateZerostate(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

if empty, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if !empty {
return nil
}

if len(s.config.BlockGeneratorParams.MainKeysOutPath) != 0 && s.config.ShardId == types.BaseShardId {
if err := execution.DumpMainKeys(s.config.BlockGeneratorParams.MainKeysOutPath); err != nil {
return err
}
}

s.logger.Info().Msg("Generating zero-state...")

gen, err := execution.NewBlockGenerator(ctx, s.config.BlockGeneratorParams, s.db)
if err != nil {
return err
}
defer gen.Rollback()

_, err = gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig)
return err
block, err := gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig)
if err != nil {
return err
}

return PublishBlock(ctx, s.networkManager, s.config.ShardId, &types.BlockWithExtractedData{Block: block})
}

func validateRepliedBlock(
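The `wgFetch.Done()` / `wgFetch.Wait()` pair in `Syncer.Run` above, together with `wgFetch.Add(int(cfg.NShards))` in `service.go` below, forms an N-party barrier: each shard's syncer checks in after its snapshot fetch and then waits for all the others, so no shard proceeds to zero-state generation until every snapshot is in place. A minimal sketch of that barrier, with an illustrative shard count:

```go
// Sketch of the cross-shard snapshot barrier (the wgFetch pattern).
// The shard count and the fetch body are illustrative.
package main

import (
	"fmt"
	"sync"
)

func runShardSyncer(id int, barrier *sync.WaitGroup) {
	fmt.Printf("shard %d: snapshot fetched\n", id)
	barrier.Done() // this shard is past the fetch phase...
	barrier.Wait() // ...but waits until every other shard is, too
	fmt.Printf("shard %d: all snapshots present, generating zero-state\n", id)
}

func main() {
	const nShards = 4
	var wgFetch sync.WaitGroup
	wgFetch.Add(nShards) // one slot per shard, as in the new service.go

	var done sync.WaitGroup
	done.Add(nShards)
	for i := 0; i < nShards; i++ {
		go func(id int) {
			defer done.Done()
			runShardSyncer(id, &wgFetch)
		}(i)
	}
	done.Wait()
}
```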
13 changes: 7 additions & 6 deletions nil/internal/execution/block_generator.go
@@ -16,12 +16,13 @@ import (
)

type BlockGeneratorParams struct {
ShardId types.ShardId
NShards uint32
TraceEVM bool
Timer common.Timer
GasBasePrice types.Value
GasPriceScale float64
ShardId types.ShardId
NShards uint32
TraceEVM bool
Timer common.Timer
GasBasePrice types.Value
GasPriceScale float64
MainKeysOutPath string
}

type Proposal struct {
118 changes: 59 additions & 59 deletions nil/services/nilservice/service.go
@@ -244,7 +244,7 @@ func createArchiveSyncers(cfg *Config, nm *network.Manager, database db.DB, logg
logger.Error().
Err(err).
Stringer(logging.FieldShardId, shardId).
Msg("Collator goroutine failed")
Msg("Syncer goroutine failed")
return err
}
return nil
@@ -464,32 +464,68 @@ func createShards(
collatorTickPeriod := time.Millisecond * time.Duration(cfg.CollatorTickPeriodMs)
syncerTimeout := syncTimeoutFactor * collatorTickPeriod

funcs := make([]concurrent.Func, cfg.NShards)
funcs := make([]concurrent.Func, 0, 2*cfg.NShards)
pools := make(map[types.ShardId]txnpool.Pool)

var wgFetch sync.WaitGroup
wgFetch.Add(int(cfg.NShards) - len(cfg.GetMyShards()))
wgFetch.Add(int(cfg.NShards))

for i := range cfg.NShards {
shard := types.ShardId(i)
shardId := types.ShardId(i)

pKey, err := cfg.LoadValidatorPrivateKey(shard)
pKey, err := cfg.LoadValidatorPrivateKey(shardId)
if err != nil {
return nil, nil, err
}

if cfg.IsShardActive(shard) {
txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shard), networkManager)
zeroState := execution.DefaultZeroStateConfig
zeroStateConfig := cfg.ZeroState
if len(cfg.ZeroStateYaml) != 0 {
zeroState = cfg.ZeroStateYaml
}

syncerCfg := collate.SyncerConfig{
ShardId: shardId,
ReplayBlocks: shardId.IsMainShard() || cfg.IsShardActive(shardId),
Timeout: syncerTimeout,
BlockGeneratorParams: cfg.BlockGeneratorParams(shardId),
ValidatorPublicKey: &pKey.PublicKey,
ZeroState: zeroState,
ZeroStateConfig: zeroStateConfig,
}

if int(shardId) < len(cfg.BootstrapPeers) {
syncerCfg.BootstrapPeer = &cfg.BootstrapPeers[shardId]
}

syncer, err := collate.NewSyncer(syncerCfg, database, networkManager)
if err != nil {
return nil, nil, err
}

funcs = append(funcs, func(ctx context.Context) error {
if err := syncer.Run(ctx, &wgFetch); err != nil {
logger.Error().
Err(err).
Stringer(logging.FieldShardId, shardId).
Msg("Syncer goroutine failed")
return err
}
return nil
})

if cfg.IsShardActive(shardId) {
txnPool, err := txnpool.New(ctx, txnpool.NewConfig(shardId), networkManager)
if err != nil {
return nil, nil, err
}
collator, err := createActiveCollator(shard, cfg, collatorTickPeriod, database, networkManager, txnPool)
collator, err := createActiveCollator(shardId, cfg, collatorTickPeriod, database, networkManager, txnPool)
if err != nil {
return nil, nil, err
}

consensus := ibft.NewConsensus(&ibft.ConsensusParams{
ShardId: shard,
ShardId: shardId,
Db: database,
Scheduler: collator,
NetManager: networkManager,
@@ -499,55 +535,19 @@ func createShards(
return nil, nil, err
}

pools[shard] = txnPool
funcs[i] = func(ctx context.Context) error {
if err := collator.Run(ctx, consensus); err != nil {
pools[shardId] = txnPool
funcs = append(funcs, func(ctx context.Context) error {
if err := collator.Run(ctx, syncer, consensus); err != nil {
logger.Error().
Err(err).
Stringer(logging.FieldShardId, shard).
Stringer(logging.FieldShardId, shardId).
Msg("Collator goroutine failed")
return err
}
return nil
}
} else {
zeroState := execution.DefaultZeroStateConfig
zeroStateConfig := cfg.ZeroState
if len(cfg.ZeroStateYaml) != 0 {
zeroState = cfg.ZeroStateYaml
}

config := collate.SyncerConfig{
ShardId: shard,
ReplayBlocks: shard.IsMainShard(),
Timeout: syncerTimeout,
BlockGeneratorParams: cfg.BlockGeneratorParams(shard),
ValidatorPublicKey: &pKey.PublicKey,
ZeroState: zeroState,
ZeroStateConfig: zeroStateConfig,
}
if int(shard) < len(cfg.BootstrapPeers) {
config.BootstrapPeer = &cfg.BootstrapPeers[shard]
}
if networkManager == nil {
return nil, nil, errors.New("trying to start syncer without network configuration")
}

syncer, err := collate.NewSyncer(config, database, networkManager)
if err != nil {
return nil, nil, err
}

funcs[i] = func(ctx context.Context) error {
if err := syncer.Run(ctx, &wgFetch); err != nil {
logger.Error().
Err(err).
Stringer(logging.FieldShardId, shard).
Msg("Syncer goroutine failed")
return err
}
return nil
}
})
} else if networkManager == nil {
return nil, nil, errors.New("trying to start syncer without network configuration")
}
}

@@ -557,18 +557,18 @@ func createShards(
func createActiveCollator(shard types.ShardId, cfg *Config, collatorTickPeriod time.Duration, database db.DB, networkManager *network.Manager, txnPool txnpool.Pool) (*collate.Scheduler, error) {
collatorCfg := collate.Params{
BlockGeneratorParams: execution.BlockGeneratorParams{
ShardId: shard,
NShards: cfg.NShards,
TraceEVM: cfg.TraceEVM,
Timer: common.NewTimer(),
GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice),
GasPriceScale: cfg.GasPriceScale,
ShardId: shard,
NShards: cfg.NShards,
TraceEVM: cfg.TraceEVM,
Timer: common.NewTimer(),
GasBasePrice: types.NewValueFromUint64(cfg.GasBasePrice),
GasPriceScale: cfg.GasPriceScale,
MainKeysOutPath: cfg.MainKeysOutPath,
},
CollatorTickPeriod: collatorTickPeriod,
Timeout: collatorTickPeriod,
ZeroState: execution.DefaultZeroStateConfig,
ZeroStateConfig: cfg.ZeroState,
MainKeysOutPath: cfg.MainKeysOutPath,
Topology: collate.GetShardTopologyById(cfg.Topology),
}
if len(cfg.ZeroStateYaml) != 0 {
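To summarize the `createShards` restructuring: every shard now gets a syncer goroutine, and active shards additionally get a collator goroutine that shares the shard's syncer, which is why `funcs` is now built with `append` and capacity `2*cfg.NShards`. The sketch below shows only that wiring shape; the task type and constructors are placeholders, not the repository's `collate.Syncer` / `collate.Scheduler` API.

```go
// Illustrative wiring of the per-shard task list after this commit:
// one syncer task per shard, plus a collator task per active shard.
package main

import (
	"context"
	"fmt"
)

type task func(ctx context.Context) error

func buildShardTasks(nShards int, isActive func(int) bool) []task {
	// capacity 2*nShards mirrors `make([]concurrent.Func, 0, 2*cfg.NShards)`
	tasks := make([]task, 0, 2*nShards)
	for i := 0; i < nShards; i++ {
		shardId := i // capture a per-iteration value for the closures below
		tasks = append(tasks, func(ctx context.Context) error {
			fmt.Printf("shard %d: syncer running\n", shardId)
			return nil
		})
		if isActive(shardId) {
			tasks = append(tasks, func(ctx context.Context) error {
				fmt.Printf("shard %d: collator running\n", shardId)
				return nil
			})
		}
	}
	return tasks
}

func main() {
	for _, t := range buildShardTasks(3, func(id int) bool { return id != 2 }) {
		_ = t(context.Background())
	}
}
```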
