Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make syncer permanent #40

Merged
merged 2 commits into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 3 additions & 50 deletions nil/internal/collate/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package collate

import (
"context"
"errors"
"time"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/internal/execution"
Expand Down Expand Up @@ -37,7 +35,6 @@ type Params struct {

ZeroState string
ZeroStateConfig *execution.ZeroStateConfig
MainKeysOutPath string

Topology ShardTopology
}
Expand Down Expand Up @@ -75,15 +72,12 @@ func (s *Scheduler) Validator() *Validator {
}
}

func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error {
func (s *Scheduler) Run(ctx context.Context, syncer *Syncer, consensus Consensus) error {
syncer.WaitComplete()

s.logger.Info().Msg("Starting collation...")
s.consensus = consensus

// At first generate zero-state if needed
if err := s.generateZeroState(ctx); err != nil {
return err
}

// Enable handler for snapshot relaying
SetBootstrapHandler(ctx, s.networkManager, s.params.ShardId, s.txFabric)

Expand All @@ -109,37 +103,6 @@ func (s *Scheduler) Run(ctx context.Context, consensus Consensus) error {
}
}

func (s *Scheduler) generateZeroState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.params.Timeout)
defer cancel()

if _, err := s.readLastBlockHash(ctx); !errors.Is(err, db.ErrKeyNotFound) {
// error or nil if last block found
return err
}

if len(s.params.MainKeysOutPath) != 0 && s.params.ShardId == types.BaseShardId {
if err := execution.DumpMainKeys(s.params.MainKeysOutPath); err != nil {
return err
}
}

s.logger.Info().Msg("Generating zero-state...")

gen, err := execution.NewBlockGenerator(ctx, s.params.BlockGeneratorParams, s.txFabric)
if err != nil {
return err
}
defer gen.Rollback()

block, err := gen.GenerateZeroState(s.params.ZeroState, s.params.ZeroStateConfig)
if err != nil {
return err
}

return PublishBlock(ctx, s.networkManager, s.params.ShardId, &types.BlockWithExtractedData{Block: block})
}

func (s *Scheduler) doCollate(ctx context.Context) error {
id, err := s.readLastBlockId(ctx)
if err != nil {
Expand All @@ -149,16 +112,6 @@ func (s *Scheduler) doCollate(ctx context.Context) error {
return s.consensus.RunSequence(ctx, id.Uint64()+1)
}

func (s *Scheduler) readLastBlockHash(ctx context.Context) (common.Hash, error) {
roTx, err := s.txFabric.CreateRoTx(ctx)
if err != nil {
return common.Hash{}, err
}
defer roTx.Rollback()

return db.ReadLastBlockHash(roTx, s.params.ShardId)
}

func (s *Scheduler) readLastBlockId(ctx context.Context) (types.BlockNumber, error) {
roTx, err := s.txFabric.CreateRoTx(ctx)
if err != nil {
Expand Down
122 changes: 78 additions & 44 deletions nil/internal/collate/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -45,11 +44,13 @@ type Syncer struct {

logger zerolog.Logger

lastBlockNumber types.BlockNumber
lastBlockHash common.Hash
waitForSync *sync.WaitGroup
}

func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Syncer, error) {
var waitForSync sync.WaitGroup
waitForSync.Add(1)

return &Syncer{
config: cfg,
topic: topicShardBlocks(cfg.ShardId),
Expand All @@ -58,47 +59,47 @@ func NewSyncer(cfg SyncerConfig, db db.DB, networkManager *network.Manager) (*Sy
logger: logging.NewLogger("sync").With().
Stringer(logging.FieldShardId, cfg.ShardId).
Logger(),
lastBlockNumber: types.BlockNumber(math.MaxUint64),
waitForSync: &waitForSync,
}, nil
}

func (s *Syncer) readLastBlockNumber(ctx context.Context) error {
func (s *Syncer) readLastBlock(ctx context.Context) (*types.Block, common.Hash, error) {
rotx, err := s.db.CreateRoTx(ctx)
if err != nil {
return err
return nil, common.EmptyHash, err
}
defer rotx.Rollback()
lastBlock, lastBlockHash, err := db.ReadLastBlock(rotx, s.config.ShardId)

block, hash, err := db.ReadLastBlock(rotx, s.config.ShardId)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return err
return nil, common.EmptyHash, err
}
if err == nil {
s.lastBlockNumber = lastBlock.Id
s.lastBlockHash = lastBlockHash
return block, hash, err
}
return nil
return nil, common.EmptyHash, nil
}

func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) {
rotx, err := s.db.CreateRoTx(ctx)
block, _, err := s.readLastBlock(ctx)
if err != nil {
return false, err
}
defer rotx.Rollback()
return block == nil, nil
}

_, err = db.ReadLastBlockHash(rotx, s.config.ShardId)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return false, err
}
return err != nil, nil
func (s *Syncer) WaitComplete() {
s.waitForSync.Wait()
}

func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if snapIsRequired {
if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
return fmt.Errorf("failed to fetch snapshot: %w", err)
if s.config.ReplayBlocks {
if snapIsRequired, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if snapIsRequired {
if err := FetchSnapshot(ctx, s.networkManager, s.config.BootstrapPeer, s.config.ShardId, s.db); err != nil {
return fmt.Errorf("failed to fetch snapshot: %w", err)
}
}
}

Expand All @@ -107,27 +108,37 @@ func (s *Syncer) Run(ctx context.Context, wgFetch *sync.WaitGroup) error {
wgFetch.Done()
wgFetch.Wait()

if s.config.ReplayBlocks {
if err := s.generateZerostate(ctx); err != nil {
return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
}
if err := s.generateZerostate(ctx); err != nil {
return fmt.Errorf("Failed to generate zerostate for shard %s: %w", s.config.ShardId, err)
}

err := s.readLastBlockNumber(ctx)
if s.networkManager == nil {
s.waitForSync.Done()
return nil
}

block, hash, err := s.readLastBlock(ctx)
if err != nil {
return fmt.Errorf("failed to read last block number: %w", err)
}

s.logger.Debug().
Stringer(logging.FieldBlockHash, s.lastBlockHash).
Uint64(logging.FieldBlockNumber, s.lastBlockNumber.Uint64()).
Msgf("Initialized sync proposer at starting block")
Stringer(logging.FieldBlockHash, hash).
Uint64(logging.FieldBlockNumber, uint64(block.Id)).
Msg("Initialized sync proposer at starting block")

s.logger.Info().Msg("Starting sync")

s.fetchBlocks(ctx)
s.waitForSync.Done()

if ctx.Err() != nil {
return nil
}

sub, err := s.networkManager.PubSub().Subscribe(s.topic)
if err != nil {
return err
return fmt.Errorf("Failed to subscribe to %s: %w", s.topic, err)
}
defer sub.Close()

Expand Down Expand Up @@ -169,17 +180,22 @@ func (s *Syncer) processTopicTransaction(ctx context.Context, data []byte) (bool
Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)).
Msg("Received block")

if block.Id != s.lastBlockNumber+1 {
lastBlock, lastHash, err := s.readLastBlock(ctx)
if err != nil {
return false, err
}

if block.Id != lastBlock.Id+1 {
s.logger.Debug().
Stringer(logging.FieldBlockNumber, block.Id).
Msgf("Received block is out of order with the last block %d", s.lastBlockNumber)
Msgf("Received block is out of order with the last block %d", lastBlock.Id)

// todo: queue the block for later processing
return false, nil
}

if block.PrevBlock != s.lastBlockHash {
txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", s.lastBlockHash, block.PrevBlock)
if block.PrevBlock != lastHash {
txn := fmt.Sprintf("Prev block hash mismatch: expected %x, got %x", lastHash, block.PrevBlock)
s.logger.Error().
Stringer(logging.FieldBlockNumber, block.Id).
Stringer(logging.FieldBlockHash, block.Hash(s.config.ShardId)).
Expand Down Expand Up @@ -221,11 +237,16 @@ func (s *Syncer) fetchBlocksRange(ctx context.Context) []*types.BlockWithExtract

s.logger.Debug().Msgf("Found %d peers to fetch block from:\n%v", len(peers), peers)

lastBlock, _, err := s.readLastBlock(ctx)
if err != nil {
return nil
}

for _, p := range peers {
s.logger.Debug().Msgf("Requesting block %d from peer %s", s.lastBlockNumber+1, p)
s.logger.Debug().Msgf("Requesting block %d from peer %s", lastBlock.Id+1, p)

const count = 100
blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, s.lastBlockNumber+1, count)
blocks, err := RequestBlocks(ctx, s.networkManager, p, s.config.ShardId, lastBlock.Id+1, count)
if err == nil {
return blocks
}
Expand Down Expand Up @@ -274,11 +295,9 @@ func (s *Syncer) saveBlocks(ctx context.Context, blocks []*types.BlockWithExtrac
}
}

s.lastBlockNumber = blocks[len(blocks)-1].Block.Id
s.lastBlockHash = blocks[len(blocks)-1].Block.Hash(s.config.ShardId)

lastBlockNumber := blocks[len(blocks)-1].Block.Id
s.logger.Debug().
Stringer(logging.FieldBlockNumber, s.lastBlockNumber).
Stringer(logging.FieldBlockNumber, lastBlockNumber).
Msg("Blocks written")

return nil
Expand Down Expand Up @@ -333,20 +352,35 @@ func (s *Syncer) saveDirectly(ctx context.Context, blocks []*types.BlockWithExtr
}

func (s *Syncer) generateZerostate(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

if empty, err := s.shardIsEmpty(ctx); err != nil {
return err
} else if !empty {
return nil
}

if len(s.config.BlockGeneratorParams.MainKeysOutPath) != 0 && s.config.ShardId == types.BaseShardId {
if err := execution.DumpMainKeys(s.config.BlockGeneratorParams.MainKeysOutPath); err != nil {
return err
}
}

s.logger.Info().Msg("Generating zero-state...")

gen, err := execution.NewBlockGenerator(ctx, s.config.BlockGeneratorParams, s.db)
if err != nil {
return err
}
defer gen.Rollback()

_, err = gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig)
return err
block, err := gen.GenerateZeroState(s.config.ZeroState, s.config.ZeroStateConfig)
if err != nil {
return err
}

return PublishBlock(ctx, s.networkManager, s.config.ShardId, &types.BlockWithExtractedData{Block: block})
}

func validateRepliedBlock(
Expand Down
13 changes: 7 additions & 6 deletions nil/internal/execution/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
)

type BlockGeneratorParams struct {
ShardId types.ShardId
NShards uint32
TraceEVM bool
Timer common.Timer
GasBasePrice types.Value
GasPriceScale float64
ShardId types.ShardId
NShards uint32
TraceEVM bool
Timer common.Timer
GasBasePrice types.Value
GasPriceScale float64
MainKeysOutPath string
}

type Proposal struct {
Expand Down
Loading