diff --git a/pkg/sentry/ethereum/services/duties.go b/pkg/sentry/ethereum/services/duties.go index c511fe98..bb950dfd 100644 --- a/pkg/sentry/ethereum/services/duties.go +++ b/pkg/sentry/ethereum/services/duties.go @@ -30,6 +30,8 @@ type DutiesService struct { bootstrapped bool onReadyCallbacks []func(context.Context) error + + lastSyncState bool } func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService) DutiesService { @@ -47,13 +49,15 @@ func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *Met metadata: metadata, bootstrapped: false, + + lastSyncState: false, } } func (m *DutiesService) Start(ctx context.Context) error { go func() { operation := func() error { - if err := m.fetchRequiredEpochDuties(ctx); err != nil { + if err := m.fetchRequiredEpochDuties(ctx, false); err != nil { return err } @@ -79,32 +83,46 @@ func (m *DutiesService) Start(ctx context.Context) error { }() m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) { - if err := m.fetchRequiredEpochDuties(ctx); err != nil { + if err := m.fetchRequiredEpochDuties(ctx, true); err != nil { m.log.WithError(err).Warn("Failed to fetch required epoch duties") } + // Sleep for a bit to give the beacon node a chance to run its epoch transition. + // We don't really care about nice-to-have duties so the sleep here is fine. + // "Required" duties (aka the current epoch) will be refetched the moment that epoch + // starts. + time.Sleep(15 * time.Second) + //nolint:errcheck // We don't care about the error here m.fetchNiceToHaveEpochDuties(ctx) }) m.beacon.OnChainReOrg(ctx, func(ctx context.Context, ev *v1.ChainReorgEvent) error { - // Clear the cache for the reorged epoch and all future epochs. - for _, epoch := range m.beaconCommittees.Keys() { - if epoch >= ev.Epoch { - m.log.WithFields(logrus.Fields{ - "epoch": epoch, - "event": ev, - }).Info("Clearing beacon committee after reorg event") - - if err := m.fetchBeaconCommittee(ctx, epoch, true); err != nil { - m.log.WithError(err).WithFields(logrus.Fields{ - "epoch": epoch, - "event": ev, - }).Error("Failed to fetch new beacon committee after reorg") - } + m.log.Info("Chain reorg detected - refetching beacon committees") + + if err := m.fetchRequiredEpochDuties(ctx, true); err != nil { + m.log.WithError(err).Warn("Failed to fetch required epoch duties") + } + + return nil + }) + + m.beacon.OnSyncStatus(ctx, func(ctx context.Context, ev *beacon.SyncStatusEvent) error { + if ev.State.IsSyncing != m.lastSyncState { + m.log.WithFields(logrus.Fields{ + "is_syncing": ev.State.IsSyncing, + }).Info("Sync status changed - refetching beacon committees") + + if err := m.fetchRequiredEpochDuties(ctx, true); err != nil { + m.log. + WithError(err). + WithField("is_syncing", ev.State.IsSyncing). + Warn("Failed to fetch required epoch duties after a sync status change") } } + m.lastSyncState = ev.State.IsSyncing + return nil }) @@ -150,9 +168,6 @@ func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoc epochs := []phase0.Epoch{ phase0.Epoch(epochNumber - 1), - phase0.Epoch(epochNumber - 2), - phase0.Epoch(epochNumber - 3), - phase0.Epoch(epochNumber + 1), } @@ -181,14 +196,14 @@ func (m *DutiesService) Ready(ctx context.Context) error { return nil } -func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context) error { +func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCache ...bool) error { if m.metadata.Wallclock() == nil { return fmt.Errorf("metadata service is not ready") } for _, epoch := range m.RequiredEpochDuties(ctx) { - if duties := m.beaconCommittees.Get(epoch); duties == nil { - if err := m.fetchBeaconCommittee(ctx, epoch); err != nil { + if duties := m.beaconCommittees.Get(epoch); duties == nil || len(overrideCache) != 0 && overrideCache[0] { + if err := m.fetchBeaconCommittee(ctx, epoch, overrideCache...); err != nil { return err } } @@ -221,8 +236,8 @@ func (m *DutiesService) fireOnBeaconCommitteeSubscriptions(epoch phase0.Epoch, c } } -func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, skipCache ...bool) error { - if len(skipCache) != 0 && !skipCache[0] { +func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, overrideCache ...bool) error { + if len(overrideCache) != 0 && !overrideCache[0] { if duties := m.beaconCommittees.Get(epoch); duties != nil { return nil } @@ -231,6 +246,8 @@ func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.E m.mu.Lock() defer m.mu.Unlock() + m.log.WithField("epoch", epoch).WithField("override_cache", overrideCache).Debug("Fetching beacon committee") + committees, err := m.beacon.FetchBeaconCommittees(ctx, "head", epoch) if err != nil { m.log.WithError(err).Error("Failed to fetch beacon committees") diff --git a/pkg/sentry/ethereum/services/metadata.go b/pkg/sentry/ethereum/services/metadata.go index d7b9e868..aa448eb9 100644 --- a/pkg/sentry/ethereum/services/metadata.go +++ b/pkg/sentry/ethereum/services/metadata.go @@ -110,6 +110,10 @@ func (m *MetadataService) Ready(ctx context.Context) error { return errors.New("network name is not available") } + if m.wallclock == nil { + return errors.New("wallclock is not available") + } + return nil }