Skip to content

Commit

Permalink
feat(cannon): Preload validators (#337)
Browse files Browse the repository at this point in the history
* feat(cannon): Preload validators

* perf: Improve cache expiration time to 5 minutes
  • Loading branch information
samcm authored Jun 19, 2024
1 parent 723187b commit a340f24
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 26 deletions.
48 changes: 28 additions & 20 deletions pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
backoff "github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -155,7 +154,7 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) {
return nil
}

events, err := b.processEpoch(ctx, phase0.Epoch(epochToProcess))
events, slot, err := b.processEpoch(ctx, phase0.Epoch(epochToProcess))
if err != nil {
b.log.WithError(err).WithField("epoch", epochToProcess).Error("Failed to process epoch")

Expand All @@ -164,6 +163,27 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) {
return err
}

// Be a good citizen and clean up the validator cache for the current epoch
b.beacon.DeleteValidatorsFromCache(xatuethv1.SlotAsString(slot))

if !processingFinalizedEpoch {
// If we are backfilling, the next epoch to be processed is the one before this epoch, so prewarm the validator cache for it.
spec, err := b.beacon.Node().Spec()
if err != nil {
return errors.Wrap(err, "failed to fetch spec")
}

nextSlot := slot - spec.SlotsPerEpoch
if nextSlot > 0 {
b.log.WithFields(logrus.Fields{
"next_state_id": nextSlot,
"current_state_id": slot,
}).Info("Prewarming validators cache")

b.beacon.LazyLoadValidators(xatuethv1.SlotAsString(nextSlot))
}
}

for _, fn := range b.onEventsCallbacks {
if err := fn(ctx, events); err != nil {
span.SetStatus(codes.Error, err.Error())
Expand Down Expand Up @@ -215,37 +235,25 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) {
}
}

func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) {
func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, phase0.Slot, error) {
ctx, span := observability.Tracer().Start(ctx,
"BeaconValidatorsDeriver.processEpoch",
trace.WithAttributes(attribute.Int64("epoch", int64(epoch))),
)
defer span.End()

validatorStates, err := b.getValidatorsClient(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch validator states")
}

spec, err := b.beacon.Node().Spec()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch spec")
return nil, 0, errors.Wrap(err, "failed to fetch spec")
}

boundarySlot := phase0.Slot(uint64(epoch) * uint64(spec.SlotsPerEpoch))

validatorsResponse, err := validatorStates.Validators(ctx, &api.ValidatorsOpts{
State: xatuethv1.SlotAsString(boundarySlot),
Common: api.CommonOpts{
Timeout: 6 * time.Minute, // If we can't fetch 1 epoch of validators within 1 epoch we will never catch up.
},
})
validatorsMap, err := b.beacon.GetValidators(ctx, xatuethv1.SlotAsString(boundarySlot))
if err != nil {
return nil, errors.Wrap(err, "failed to fetch validator states")
return nil, 0, errors.Wrap(err, "failed to fetch validator states")
}

validatorsMap := validatorsResponse.Data

// Chunk the validators per the configured chunk size
chunkSize := b.cfg.ChunkSize

Expand Down Expand Up @@ -278,13 +286,13 @@ func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0
WithField("epoch", epoch).
Error("Failed to create event from validator state")

return nil, err
return nil, 0, err
}

allEvents = append(allEvents, event)
}

return allEvents, nil
return allEvents, boundarySlot, nil
}

func (b *BeaconValidatorsDeriver) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) {
Expand Down
121 changes: 115 additions & 6 deletions pkg/cannon/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (
"sync"
"time"

client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/xatu/pkg/cannon/ethereum/services"
"github.com/ethpandaops/xatu/pkg/networks"
Expand Down Expand Up @@ -36,6 +40,11 @@ type BeaconNode struct {
blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock]
blockPreloadChan chan string
blockPreloadSem chan struct{}

validatorsSfGroup *singleflight.Group
validatorsCache *ttlcache.Cache[string, map[phase0.ValidatorIndex]*apiv1.Validator]
validatorsPreloadChan chan string
validatorsPreloadSem chan struct{}
}

func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) {
Expand Down Expand Up @@ -72,20 +81,28 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.

// Create a buffered channel (semaphore) to limit the number of concurrent goroutines.
sem := make(chan struct{}, config.BlockPreloadWorkers)
validatorsSem := make(chan struct{}, 1)

return &BeaconNode{
config: config,
log: log.WithField("module", "cannon/ethereum/beacon"),
beacon: node,
services: svcs,
sfGroup: &singleflight.Group{},
config: config,
log: log.WithField("module", "cannon/ethereum/beacon"),
beacon: node,
services: svcs,
sfGroup: &singleflight.Group{},
validatorsSfGroup: &singleflight.Group{},
blockCache: ttlcache.New(
ttlcache.WithTTL[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheTTL.Duration),
ttlcache.WithCapacity[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheSize),
),
blockPreloadChan: make(chan string, config.BlockPreloadQueueSize),
blockPreloadSem: sem,
metrics: NewMetrics(namespace, name),
validatorsCache: ttlcache.New(
ttlcache.WithTTL[string, map[phase0.ValidatorIndex]*apiv1.Validator](5*time.Minute),
ttlcache.WithCapacity[string, map[phase0.ValidatorIndex]*apiv1.Validator](4),
),
validatorsPreloadChan: make(chan string, 2),
validatorsPreloadSem: validatorsSem,
metrics: NewMetrics(namespace, name),
}, nil
}

Expand Down Expand Up @@ -155,6 +172,15 @@ func (b *BeaconNode) Start(ctx context.Context) error {
}()
}

go func() {
for identifier := range b.validatorsPreloadChan {
b.log.WithField("identifier", identifier).Trace("Preloading validators")

//nolint:errcheck // We don't care about errors here.
b.GetValidators(ctx, identifier)
}
}()

select {
case err := <-errs:
return err
Expand Down Expand Up @@ -307,3 +333,86 @@ func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) {

b.blockPreloadChan <- identifier
}

// GetValidators returns the validators for the given state identifier, keyed by
// validator index. The result is served from the internal validators cache when an
// entry exists; otherwise it is fetched from the beacon node and then cached.
//
// Concurrent calls for the same identifier are collapsed into one upstream request
// via singleflight, and every upstream request must first acquire
// validatorsPreloadSem. Waiting for that semaphore is abandoned as soon as ctx is
// done, in which case ctx.Err() is returned.
func (b *BeaconNode) GetValidators(ctx context.Context, identifier string) (map[phase0.ValidatorIndex]*apiv1.Validator, error) {
	ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetValidators", trace.WithAttributes(attribute.String("identifier", identifier)))
	defer span.End()

	// Check the cache first.
	if item := b.validatorsCache.Get(identifier); item != nil {
		b.log.WithField("identifier", identifier).Debug("Validator cache hit")

		span.SetAttributes(attribute.Bool("cached", true))

		return item.Value(), nil
	}

	span.SetAttributes(attribute.Bool("cached", false))

	// Use singleflight so that only one request per identifier is in flight at a time.
	// NOTE: the closure runs with the ctx of whichever caller started the flight, so
	// callers sharing the result also share that caller's cancellation.
	x, err, shared := b.validatorsSfGroup.Do(identifier, func() (any, error) {
		span.AddEvent("Acquiring semaphore...")

		// Acquire the semaphore before proceeding, but do not keep waiting for it
		// once the caller has gone away: the holder may be busy for minutes.
		select {
		case b.validatorsPreloadSem <- struct{}{}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		defer func() { <-b.validatorsPreloadSem }()

		span.AddEvent("Semaphore acquired. Fetching validators from beacon api...")

		// Named provider (not client) so the imported client package is not shadowed.
		provider, err := b.getValidatorsClient(ctx)
		if err != nil {
			return nil, err
		}

		// Not in the cache, so fetch it.
		validatorsResponse, err := provider.Validators(ctx, &api.ValidatorsOpts{
			State: identifier,
			Common: api.CommonOpts{
				Timeout: 6 * time.Minute, // If we can't fetch 1 epoch of validators within 1 epoch we will never catch up.
			},
		})
		if err != nil {
			return nil, err
		}

		validators := validatorsResponse.Data

		span.AddEvent("Validators fetched from beacon node.")

		// Add it to the cache.
		b.validatorsCache.Set(identifier, validators, 5*time.Minute)

		return validators, nil
	})
	if err != nil {
		span.SetStatus(codes.Error, err.Error())

		return nil, err
	}

	span.AddEvent("Validators fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared)))

	return x.(map[phase0.ValidatorIndex]*apiv1.Validator), nil
}

// LazyLoadValidators queues stateID on the validators preload channel unless the
// validators cache already holds an entry for it. The send blocks while the
// channel is full.
func (b *BeaconNode) LazyLoadValidators(stateID string) {
	if cached := b.validatorsCache.Get(stateID); cached == nil {
		b.validatorsPreloadChan <- stateID
	}
}

// getValidatorsClient returns the beacon node's underlying service as a
// client.ValidatorsProvider, or an error when the service does not implement
// that interface. ctx is currently unused.
func (b *BeaconNode) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) {
	validatorsProvider, ok := b.beacon.Service().(client.ValidatorsProvider)
	if !ok {
		return nil, errors.New("validator states client not found")
	}

	return validatorsProvider, nil
}

// DeleteValidatorsFromCache removes the entry stored under stateID from the
// validators cache.
func (b *BeaconNode) DeleteValidatorsFromCache(stateID string) {
	b.validatorsCache.Delete(stateID)
}

0 comments on commit a340f24

Please sign in to comment.