diff --git a/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go b/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go index e536c8da..3c31c804 100644 --- a/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go +++ b/pkg/cannon/deriver/beacon/eth/v1/beacon_validators.go @@ -6,7 +6,6 @@ import ( "time" client "github.com/attestantio/go-eth2-client" - "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" backoff "github.com/cenkalti/backoff/v4" @@ -155,7 +154,7 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) { return nil } - events, err := b.processEpoch(ctx, phase0.Epoch(epochToProcess)) + events, slot, err := b.processEpoch(ctx, phase0.Epoch(epochToProcess)) if err != nil { b.log.WithError(err).WithField("epoch", epochToProcess).Error("Failed to process epoch") @@ -164,6 +163,27 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) { return err } + // Be a good citizen and clean up the validator cache for the current epoch + b.beacon.DeleteValidatorsFromCache(xatuethv1.SlotAsString(slot)) + + if !processingFinalizedEpoch { + // If we are backfilling we can prewarm the validator cache with the next epoch's validators + spec, err := b.beacon.Node().Spec() + if err != nil { + return errors.Wrap(err, "failed to fetch spec") + } + + nextSlot := slot - spec.SlotsPerEpoch + if nextSlot > 0 { + b.log.WithFields(logrus.Fields{ + "next_state_id": nextSlot, + "current_state_id": slot, + }).Info("Prewarming validators cache") + + b.beacon.LazyLoadValidators(xatuethv1.SlotAsString(nextSlot)) + } + } + for _, fn := range b.onEventsCallbacks { if err := fn(ctx, events); err != nil { span.SetStatus(codes.Error, err.Error()) @@ -215,37 +235,25 @@ func (b *BeaconValidatorsDeriver) run(rctx context.Context) { } } -func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, error) { +func (b 
*BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0.Epoch) ([]*xatu.DecoratedEvent, phase0.Slot, error) { ctx, span := observability.Tracer().Start(ctx, "BeaconValidatorsDeriver.processEpoch", trace.WithAttributes(attribute.Int64("epoch", int64(epoch))), ) defer span.End() - validatorStates, err := b.getValidatorsClient(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to fetch validator states") - } - spec, err := b.beacon.Node().Spec() if err != nil { - return nil, errors.Wrap(err, "failed to fetch spec") + return nil, 0, errors.Wrap(err, "failed to fetch spec") } boundarySlot := phase0.Slot(uint64(epoch) * uint64(spec.SlotsPerEpoch)) - validatorsResponse, err := validatorStates.Validators(ctx, &api.ValidatorsOpts{ - State: xatuethv1.SlotAsString(boundarySlot), - Common: api.CommonOpts{ - Timeout: 6 * time.Minute, // If we can't fetch 1 epoch of validators within 1 epoch we will never catch up. - }, - }) + validatorsMap, err := b.beacon.GetValidators(ctx, xatuethv1.SlotAsString(boundarySlot)) if err != nil { - return nil, errors.Wrap(err, "failed to fetch validator states") + return nil, 0, errors.Wrap(err, "failed to fetch validator states") } - validatorsMap := validatorsResponse.Data - // Chunk the validators per the configured chunk size chunkSize := b.cfg.ChunkSize @@ -278,13 +286,13 @@ func (b *BeaconValidatorsDeriver) processEpoch(ctx context.Context, epoch phase0 WithField("epoch", epoch). 
Error("Failed to create event from validator state") - return nil, err + return nil, 0, err } allEvents = append(allEvents, event) } - return allEvents, nil + return allEvents, boundarySlot, nil } func (b *BeaconValidatorsDeriver) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) { diff --git a/pkg/cannon/ethereum/beacon.go b/pkg/cannon/ethereum/beacon.go index 6237b221..62d0a3f2 100644 --- a/pkg/cannon/ethereum/beacon.go +++ b/pkg/cannon/ethereum/beacon.go @@ -6,7 +6,11 @@ import ( "sync" "time" + client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/api" + apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/beacon/pkg/beacon" "github.com/ethpandaops/xatu/pkg/cannon/ethereum/services" "github.com/ethpandaops/xatu/pkg/networks" @@ -36,6 +40,11 @@ type BeaconNode struct { blockCache *ttlcache.Cache[string, *spec.VersionedSignedBeaconBlock] blockPreloadChan chan string blockPreloadSem chan struct{} + + validatorsSfGroup *singleflight.Group + validatorsCache *ttlcache.Cache[string, map[phase0.ValidatorIndex]*apiv1.Validator] + validatorsPreloadChan chan string + validatorsPreloadSem chan struct{} } func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { @@ -72,20 +81,28 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. // Create a buffered channel (semaphore) to limit the number of concurrent goroutines. 
sem := make(chan struct{}, config.BlockPreloadWorkers) + validatorsSem := make(chan struct{}, 1) return &BeaconNode{ - config: config, - log: log.WithField("module", "cannon/ethereum/beacon"), - beacon: node, - services: svcs, - sfGroup: &singleflight.Group{}, + config: config, + log: log.WithField("module", "cannon/ethereum/beacon"), + beacon: node, + services: svcs, + sfGroup: &singleflight.Group{}, + validatorsSfGroup: &singleflight.Group{}, blockCache: ttlcache.New( ttlcache.WithTTL[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheTTL.Duration), ttlcache.WithCapacity[string, *spec.VersionedSignedBeaconBlock](config.BlockCacheSize), ), blockPreloadChan: make(chan string, config.BlockPreloadQueueSize), blockPreloadSem: sem, - metrics: NewMetrics(namespace, name), + validatorsCache: ttlcache.New( + ttlcache.WithTTL[string, map[phase0.ValidatorIndex]*apiv1.Validator](5*time.Minute), + ttlcache.WithCapacity[string, map[phase0.ValidatorIndex]*apiv1.Validator](4), + ), + validatorsPreloadChan: make(chan string, 2), + validatorsPreloadSem: validatorsSem, + metrics: NewMetrics(namespace, name), }, nil } @@ -155,6 +172,15 @@ func (b *BeaconNode) Start(ctx context.Context) error { }() } + go func() { + for identifier := range b.validatorsPreloadChan { + b.log.WithField("identifier", identifier).Trace("Preloading validators") + + //nolint:errcheck // We don't care about errors here. + b.GetValidators(ctx, identifier) + } + }() + select { case err := <-errs: return err @@ -307,3 +333,86 @@ func (b *BeaconNode) LazyLoadBeaconBlock(identifier string) { b.blockPreloadChan <- identifier } + +// GetValidators returns a list of validators by its identifier. Validators can be cached internally. 
+// On a cache miss the fetch is deduplicated per identifier through validatorsSfGroup and
+// serialized across identifiers by validatorsPreloadSem; a successful result is stored in
+// validatorsCache before it is returned. The returned map is the cached instance, not a
+// copy, so callers should treat it as read-only. If ctx is done before the semaphore can
+// be acquired, ctx.Err() is returned.
+func (b *BeaconNode) GetValidators(ctx context.Context, identifier string) (map[phase0.ValidatorIndex]*apiv1.Validator, error) {
+	ctx, span := observability.Tracer().Start(ctx, "ethereum.beacon.GetValidators", trace.WithAttributes(attribute.String("identifier", identifier)))
+
+	defer span.End()
+
+	// Check the cache first.
+	if item := b.validatorsCache.Get(identifier); item != nil {
+		b.log.WithField("identifier", identifier).Debug("Validator cache hit")
+
+		span.SetAttributes(attribute.Bool("cached", true))
+
+		return item.Value(), nil
+	}
+
+	span.SetAttributes(attribute.Bool("cached", false))
+
+	// Use singleflight to ensure we only make one request per identifier at a time.
+	x, err, shared := b.validatorsSfGroup.Do(identifier, func() (any, error) {
+		// Another flight for this identifier may have completed between the cache
+		// check above and this call starting; re-check so the fetch is not repeated.
+		if item := b.validatorsCache.Get(identifier); item != nil {
+			span.AddEvent("Validators found in cache after entering singleflight.")
+
+			return item.Value(), nil
+		}
+
+		span.AddEvent("Acquiring semaphore...")
+
+		// Acquire the semaphore, but give up if the caller's context ends first so a
+		// cancelled request does not stay queued behind a long-running fetch.
+		select {
+		case b.validatorsPreloadSem <- struct{}{}:
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+
+		defer func() { <-b.validatorsPreloadSem }()
+
+		span.AddEvent("Semaphore acquired. Fetching validators from beacon api...")
+
+		// Named provider (not client) so the imported client package is not shadowed.
+		provider, err := b.getValidatorsClient(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		// Not in the cache, so fetch it.
+		validatorsResponse, err := provider.Validators(ctx, &api.ValidatorsOpts{
+			State: identifier,
+			Common: api.CommonOpts{
+				Timeout: 6 * time.Minute, // If we can't fetch 1 epoch of validators within 1 epoch we will never catch up.
+			},
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		validators := validatorsResponse.Data
+
+		span.AddEvent("Validators fetched from beacon node.")
+
+		// Add it to the cache.
+		b.validatorsCache.Set(identifier, validators, 5*time.Minute)
+
+		return validators, nil
+	})
+	if err != nil {
+		span.SetStatus(codes.Error, err.Error())
+
+		return nil, err
+	}
+
+	span.AddEvent("Validators fetching complete.", trace.WithAttributes(attribute.Bool("shared", shared)))
+
+	// Checked assertion: report an error instead of panicking if the value is ever
+	// not the expected map type.
+	validators, ok := x.(map[phase0.ValidatorIndex]*apiv1.Validator)
+	if !ok {
+		err := errors.New("unexpected validators type returned from singleflight")
+
+		span.SetStatus(codes.Error, err.Error())
+
+		return nil, err
+	}
+
+	return validators, nil
+}
+
+// LazyLoadValidators queues stateID on validatorsPreloadChan unless the validators for
+// that state are already cached. The send blocks while the channel is full.
+func (b *BeaconNode) LazyLoadValidators(stateID string) {
+	if item := b.validatorsCache.Get(stateID); item != nil {
+		return
+	}
+
+	b.validatorsPreloadChan <- stateID
+}
+
+// getValidatorsClient returns the underlying beacon service as a client.ValidatorsProvider,
+// or an error if the service does not implement that interface.
+func (b *BeaconNode) getValidatorsClient(ctx context.Context) (client.ValidatorsProvider, error) {
+	if provider, isProvider := b.beacon.Service().(client.ValidatorsProvider); isProvider {
+		return provider, nil
+	}
+
+	return nil, errors.New("validator states client not found")
+}
+
+// DeleteValidatorsFromCache removes the cached validators entry for stateID.
+func (b *BeaconNode) DeleteValidatorsFromCache(stateID string) {
+	b.validatorsCache.Delete(stateID)
+}