Skip to content

Commit

Permalink
indexserver: index new repos (#212)
Browse files Browse the repository at this point in the history
We only calculate options of repositories that have changed since our
last check. However, if we are informed of a new repository and the
repository hasn't changed, we will never fetch its options. This can
happen if our cluster grows or shrinks.

This commit expands the interface to have a "force" option to fetch
options. I don't like this design, and will address it in follow-up
commits. However, for now I'd like to fix the bug.

I think the reason this is becoming complicated is the state stored by
"IterateIndexOptions" around search configuration fingerprint. I think a
better solution is to add an API call to calculate a new fingerprint and
then make the setting/updating of the fingerprint more explicit.
  • Loading branch information
keegancsmith authored Nov 12, 2021
1 parent 8ea6c20 commit 3c5cea3
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
7 changes: 5 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ func (s *Server) Run() {
// IterateIndexOptions will only iterate over repositories that have
// changed since we last called list. However, we want to add all IDs
// back onto the queue just to check that what is on disk is still
// correct. This will use the last IndexOptions we stored in the queue.
s.queue.Bump(repos.IDs)
// correct. This will use the last IndexOptions we stored in the
// queue. The repositories not on the queue (missing) need a forced
// fetch of IndexOptions.
missing := s.queue.Bump(repos.IDs)
s.Sourcegraph.ForceIterateIndexOptions(s.queue.AddOrUpdate, missing...)

<-cleanupDone
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,31 @@ func (q *Queue) AddOrUpdate(opts IndexOptions) {
}

// Bump will take any repository in ids which is not on the queue and
// re-insert it with the last known IndexOptions.
func (q *Queue) Bump(ids []uint32) {
// re-insert it with the last known IndexOptions. Bump returns ids that are
// unknown to the queue.
func (q *Queue) Bump(ids []uint32) []uint32 {
q.mu.Lock()
defer q.mu.Unlock()

if q.items == nil {
q.init()
}

var missing []uint32
for _, id := range ids {
item, ok := q.items[id]
if ok && item.heapIdx < 0 {
if !ok {
missing = append(missing, id)
} else if item.heapIdx < 0 {
q.seq++
item.seq = q.seq
heap.Push(&q.pq, item)
metricQueueLen.Set(float64(len(q.pq)))
metricQueueCap.Set(float64(len(q.items)))
}
}

return missing
}

// Iterate will call f on each item known to the queue, including items that
Expand Down
7 changes: 5 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func TestQueue_Bump(t *testing.T) {
}

// Bump 2 and 3. 3 doesn't exist, so only 2 should exist.
queue.Bump([]uint32{2, 3})
missing := queue.Bump([]uint32{2, 3})
if d := cmp.Diff([]uint32{3}, missing); d != "" {
t.Errorf("unexpected missing (-want, +got):\n%s", d)
}

want := []IndexOptions{{RepoID: 2, Name: "bar"}}
var got []IndexOptions
Expand All @@ -113,7 +116,7 @@ func TestQueue_Bump(t *testing.T) {
}

if d := cmp.Diff(want, got); d != "" {
t.Fatalf("(-want, +got):\n%s", d)
t.Errorf("unexpected items bumped into the queue (-want, +got):\n%s", d)
}
}

Expand Down
33 changes: 33 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/sg.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,21 @@ type SourcegraphListResult struct {

// IterateIndexOptions best effort resolves the IndexOptions for RepoIDs. If
// any repository fails it internally logs.
//
// Note: this has a side-effect of setting a the "config fingerprint". The
// config fingerprint means we only calculate index options for repositories
// that have changed since the last call to IterateIndexOptions. If you want
// to force calculation of index options use Sourcegraph.GetIndexOptions.
IterateIndexOptions func(func(IndexOptions))
}

type Sourcegraph interface {
List(ctx context.Context, indexed []uint32) (*SourcegraphListResult, error)

// ForceIterateIndexOptions will best-effort calculate the index options for
// all of ids. If any repository fails it internally logs.
ForceIterateIndexOptions(func(IndexOptions), ...uint32)

// GetIndexOptions is deprecated but kept around until we improve our
// forceIndex code.
GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error)
Expand Down Expand Up @@ -133,6 +142,18 @@ func (s *sourcegraphClient) List(ctx context.Context, indexed []uint32) (*Source
}, nil
}

func (s *sourcegraphClient) ForceIterateIndexOptions(f func(IndexOptions), repos ...uint32) {
opts, err := s.GetIndexOptions(repos...)
if err != nil {
return
}
for _, o := range opts {
if o.Error == "" {
f(o.IndexOptions)
}
}
}

// indexOptionsItem wraps IndexOptions to also include an error returned by
// the API.
type indexOptionsItem struct {
Expand Down Expand Up @@ -272,6 +293,18 @@ func (sf sourcegraphFake) List(ctx context.Context, indexed []uint32) (*Sourcegr
}, nil
}

func (sf sourcegraphFake) ForceIterateIndexOptions(f func(IndexOptions), repos ...uint32) {
opts, err := sf.GetIndexOptions(repos...)
if err != nil {
return
}
for _, o := range opts {
if o.Error == "" {
f(o.IndexOptions)
}
}
}

func (sf sourcegraphFake) GetIndexOptions(repos ...uint32) ([]indexOptionsItem, error) {
reposIdx := map[uint32]int{}
for i, id := range repos {
Expand Down

0 comments on commit 3c5cea3

Please sign in to comment.