From 2cb02139edd7d19b43215537ef8423fb957b7cfe Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Thu, 17 Nov 2022 14:36:58 -0800 Subject: [PATCH 01/11] indexserver: add debug endpoint for deleting repository shards --- cmd/zoekt-sourcegraph-indexserver/debug.go | 4 + cmd/zoekt-sourcegraph-indexserver/main.go | 100 +++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/cmd/zoekt-sourcegraph-indexserver/debug.go b/cmd/zoekt-sourcegraph-indexserver/debug.go index c407e6a37..b27f00230 100644 --- a/cmd/zoekt-sourcegraph-indexserver/debug.go +++ b/cmd/zoekt-sourcegraph-indexserver/debug.go @@ -95,6 +95,10 @@ func debugCmd() *ffcli.Command { "wget -q -O - http://localhost:6072/metrics -sS | grep index_shard_merging_running". It is only possible to trigger one merge operation at a time. + wget -q -O - http://localhost:6072/debug/delete?id=[REPOSITORY_ID] + delete all of the shards associated with the given repository id. You can find the id associated with a + repository via the "/debug/indexed" route. + wget -q -O - http://localhost:6072/debug/queue list the repositories in the indexing queue, sorted by descending priority. diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 23b521591..0eaf96248 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -646,6 +646,7 @@ func (s *Server) addDebugHandlers(mux *http.ServeMux) { // on "/". mux.Handle("/", http.HandlerFunc(s.handleReIndex)) + mux.Handle("/debug/delete", http.HandlerFunc(s.handleDebugDelete)) mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) @@ -757,6 +758,105 @@ func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { } } +func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { + rawID := r.URL.Query().Get("id") + if rawID == "" { + http.Error(w, "URL parameter 'id' must be specified", http.StatusBadRequest) + return + } + + id64, err := strconv.ParseUint(rawID, 10, 32) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse repository id %q as uint32: %s", rawID, err), http.StatusBadRequest) + return + } + + repoID := uint32(id64) + + s.queue.mu.Lock() + defer s.queue.mu.Unlock() + + item := s.queue.get(repoID) + if item == nil { + http.Error(w, fmt.Sprintf("no repository found for id %q", rawID), http.StatusBadRequest) + return + } + + var deletionError error + + repoName := item.opts.Name + s.muIndexDir.With(repoName, func() { + o := s.indexArgs(item.opts).BuildOptions() + deletionError = deleteShards(o) + }) + + if deletionError != nil { + http.Error(w, fmt.Sprintf("while deleting shards for repository id %q: %s", rawID, deletionError), http.StatusInternalServerError) + return + } +} + +// deleteShards deletes all the shards that are associated with the repository specified +// in the build options. +// +// Users must hold the indexDir lock for this repository before calling deleteShards. +func deleteShards(options *build.Options) error { + shardPaths := options.FindAllShards() + + // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic + // works correctly. + // + // Example: - repoA_v16.00002.zoekt + // - repoA_v16.00001.zoekt + // - repoA_v16.00000.zoekt + // + // zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard + // is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order + // (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA". + // + // If this function were to crash while deleting repoA and we only deleted the 0th shard, then shard's 1 & 2 would never + // be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested). + // + // Ensuring that we delete shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent + // state behind even if we crash. + + sort.Slice(shardPaths, func(i, j int) bool { + return shardPaths[i] > shardPaths[j] + }) + + for _, shard := range shardPaths { + // Is this repository inside a compound shard? If so, set a tombstone + // instead of deleting the shard outright. + if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") { + if !strings.HasSuffix(shard, ".zoekt") { + continue + } + + err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID) + if err != nil { + return fmt.Errorf("setting tombstone in shard %q: %w", shard, err) + } + + continue + } + + metaFile := shard + ".meta" + if _, err := os.Stat(metaFile); err == nil { + err := os.Remove(metaFile) + if err != nil { + return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) + } + } + + err := os.Remove(shard) + if err != nil { + return fmt.Errorf("deleting shard %q: %w", shard, err) + } + } + + return nil +} + // handleDebugMerge triggers a merge even if shard merging is not enabled. Users // can run this command during periods of low usage (evenings, weekends) to // trigger an initial merge run. In the steady-state, merges happen rarely, even From 76a18b65187bc179bd12bc35b2cb8f5fd2f02b51 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Fri, 18 Nov 2022 17:48:07 -0800 Subject: [PATCH 02/11] add tests --- cmd/zoekt-sourcegraph-indexserver/main.go | 25 +- .../main_test.go | 233 +++++++++++++++++- 2 files changed, 248 insertions(+), 10 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 0eaf96248..5bb1fe7a1 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -6,10 +6,12 @@ import ( "bytes" "context" "encoding/json" + "errors" "flag" "fmt" "html/template" "io" + "io/fs" "log" "math" "math/rand" @@ -814,10 +816,10 @@ func deleteShards(options *build.Options) error { // is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order // (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA". // - // If this function were to crash while deleting repoA and we only deleted the 0th shard, then shard's 1 & 2 would never + // If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never // be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested). // - // Ensuring that we delete shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent + // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent // state behind even if we crash. sort.Slice(shardPaths, func(i, j int) bool { @@ -840,17 +842,24 @@ func deleteShards(options *build.Options) error { continue } + err := os.Remove(shard) + if err != nil { + return fmt.Errorf("deleting shard %q: %w", shard, err) + } + + // remove the metadata file associated with the shard (if any) metaFile := shard + ".meta" - if _, err := os.Stat(metaFile); err == nil { - err := os.Remove(metaFile) - if err != nil { - return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) + if _, err := os.Stat(metaFile); err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue } + + return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err) } - err := os.Remove(shard) + err = os.Remove(metaFile) if err != nil { - return fmt.Errorf("deleting shard %q: %w", shard, err) + return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) } } diff --git a/cmd/zoekt-sourcegraph-indexserver/main_test.go b/cmd/zoekt-sourcegraph-indexserver/main_test.go index 5ec4f2481..e52d962e9 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/main_test.go @@ -4,18 +4,23 @@ import ( "context" "flag" "fmt" - sglog "github.com/sourcegraph/log" - "github.com/sourcegraph/log/logtest" "io" "log" "net/http" "net/http/httptest" "net/url" "os" + "path/filepath" + "sort" + "strconv" "strings" "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + sglog "github.com/sourcegraph/log" + "github.com/sourcegraph/log/logtest" + "github.com/sourcegraph/zoekt/build" "github.com/sourcegraph/zoekt" ) @@ -88,6 +93,127 @@ func TestListRepoIDs(t *testing.T) { } } +func TestDeleteShards(t *testing.T) { + remainingRepoA := zoekt.Repository{ID: 1, Name: "A"} + remainingRepoB := zoekt.Repository{ID: 2, Name: "B"} + repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"} + + t.Run("delete repository from set of normal shards", func(t *testing.T) { + indexDir := t.TempDir() + + // map of repoID -> list of associated shard paths + metadata paths + shardMap := make(map[uint32][]string) + + // setup: create shards for each repository, and populate the shard map + for _, r := range []zoekt.Repository{ + remainingRepoA, + remainingRepoB, + repositoryToDelete, + } { + shards := createTestNormalShard(t, indexDir, r, 3) + + for _, shard := range shards { + // create stub meta file + metaFile := shard + ".meta" + f, err := os.Create(metaFile) + if err != nil { + t.Fatalf("creating metadata file %q: %s", metaFile, err) + } + + f.Close() + + shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile) + } + } + + // run test: delete repository + options := &build.Options{ + IndexDir: indexDir, + RepositoryDescription: repositoryToDelete, + } + options.SetDefaults() + + err := deleteShards(options) + if err != nil { + t.Errorf("unexpected error when deleting shards: %s", err) + } + + // run assertions: gather all the shards + meta files that remain and + // check to see that only the files associated with the "remaining" repositories + // are present + var actualShardFiles []string + + for _, pattern := range []string{"*.zoekt", "*.meta"} { + files, err := filepath.Glob(filepath.Join(indexDir, pattern)) + if err != nil { + t.Fatalf("globbing indexDir: %s", err) + } + + actualShardFiles = append(actualShardFiles, files...) + } + + var expectedShardFiles []string + expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...) + expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...) + + sort.Strings(actualShardFiles) + sort.Strings(expectedShardFiles) + + if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" { + t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff) + } + }) + + t.Run("delete repository from compound shard", func(t *testing.T) { + indexDir := t.TempDir() + + // setup: enable shard merging for compound shards + t.Setenv("SRC_ENABLE_SHARD_MERGING", "1") + + // setup: create compound shard with all repositories + repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} + shard := createTestCompoundShard(t, indexDir, repositories) + + // run test: delete repository + options := &build.Options{ + IndexDir: indexDir, + RepositoryDescription: repositoryToDelete, + } + options.SetDefaults() + + err := deleteShards(options) + if err != nil { + t.Errorf("unexpected error when deleting shards: %s", err) + } + + // verify: read the compound shard, and ensure that only + // the repositories that we expect are in the shard (and the deleted one has been tombstoned) + actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard) + if err != nil { + t.Fatalf("reading repository metadata from shard: %s", err) + } + + expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB} + + sort.Slice(actualRepositories, func(i, j int) bool { + return actualRepositories[i].ID < actualRepositories[j].ID + }) + + sort.Slice(expectedRepositories, func(i, j int) bool { + return expectedRepositories[i].ID < expectedRepositories[j].ID + }) + + opts := []cmp.Option{ + cmpopts.IgnoreUnexported(zoekt.Repository{}), + cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"), + cmpopts.EquateEmpty(), + } + if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" { + t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff) + } + }) +} + func TestListRepoIDs_Error(t *testing.T) { msg := "deadbeaf deadbeaf" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -124,6 +250,109 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func createTestNormalShard(t *testing.T, indexDir string, r zoekt.Repository, numShards int, optFns ...func(options *build.Options)) []string { + t.Helper() + + if err := os.MkdirAll(filepath.Dir(indexDir), 0700); err != nil { + t.Fatal(err) + } + + o := build.Options{ + IndexDir: indexDir, + RepositoryDescription: r, + ShardMax: 75, // create a new shard every 75 bytes + } + o.SetDefaults() + + for _, fn := range optFns { + fn(&o) + } + + b, err := build.NewBuilder(o) + if err != nil { + t.Fatalf("NewBuilder: %v", err) + } + + if numShards == 0 { + // We have to make at least 1 shard. + numShards = 1 + } + + for i := 0; i < numShards; i++ { + // Create entries (file + contents) that are ~100 bytes each. + // This (along with our shardMax setting of 75 bytes) means that each shard + // will contain at most one of these. + fileName := strconv.Itoa(i) + document := zoekt.Document{Name: fileName, Content: []byte(strings.Repeat("A", 100))} + for _, branch := range o.RepositoryDescription.Branches { + document.Branches = append(document.Branches, branch.Name) + } + + err := b.Add(document) + if err != nil { + t.Fatalf("failed to add file %q to builder: %s", fileName, err) + } + } + + if err := b.Finish(); err != nil { + t.Fatalf("Finish: %v", err) + } + + return o.FindAllShards() +} + +func createTestCompoundShard(t *testing.T, indexDir string, repositories []zoekt.Repository, optFns ...func(options *build.Options)) string { + t.Helper() + + var shardNames []string + + for _, r := range repositories { + // create an isolated scratch space to store normal shards for this repository + scratchDir := t.TempDir() + + // create shards that'll be merged later + createTestNormalShard(t, scratchDir, r, 1, optFns...) + + // discover file names for all the normal shards we created + // note: this only looks in the immediate 'scratchDir' folder and doesn't recurse + shards, err := filepath.Glob(filepath.Join(scratchDir, "*.zoekt")) + if err != nil { + t.Fatalf("while globbing %q to find normal shards: %s", scratchDir, err) + } + + shardNames = append(shardNames, shards...) + } + + // load the normal shards that we created + var files []zoekt.IndexFile + for _, shard := range shardNames { + f, err := os.Open(shard) + if err != nil { + t.Fatalf("opening shard file: %s", err) + } + defer f.Close() + + indexFile, err := zoekt.NewIndexFile(f) + if err != nil { + t.Fatalf("creating index file: %s", err) + } + defer indexFile.Close() + + files = append(files, indexFile) + } + + // merge all the simple shards into a compound shard + tmpName, dstName, err := zoekt.Merge(indexDir, files...) + if err != nil { + t.Fatalf("merging index files into compound shard: %s", err) + } + if err := os.Rename(tmpName, dstName); err != nil { + t.Fatal(err) + } + + return dstName +} + func TestCreateEmptyShard(t *testing.T) { dir := t.TempDir() From 828806c8500069b65fdf5f78a32ca78a62dc67b8 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Fri, 18 Nov 2022 17:53:59 -0800 Subject: [PATCH 03/11] provide example shell pipeline in documentation --- cmd/zoekt-sourcegraph-indexserver/debug.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/debug.go b/cmd/zoekt-sourcegraph-indexserver/debug.go index b27f00230..512bed753 100644 --- a/cmd/zoekt-sourcegraph-indexserver/debug.go +++ b/cmd/zoekt-sourcegraph-indexserver/debug.go @@ -96,8 +96,13 @@ func debugCmd() *ffcli.Command { to trigger one merge operation at a time. wget -q -O - http://localhost:6072/debug/delete?id=[REPOSITORY_ID] - delete all of the shards associated with the given repository id. You can find the id associated with a - repository via the "/debug/indexed" route. + delete all of the shards associated with the given repository id. + + You can find the id associated with a repository via the "/debug/indexed" route. + If you need to delete multiple repositories at once, you can create a small shell pipeline. See the following example + (that removes the first listed repository from the ""/debug/indexed" route for inspiration): + + > wget -q -O - http://localhost:6072/debug/indexed | awk '{print $1}' | tail -n +2 | head -n 1 | xargs -I {} -- wget -q -O - "http://localhost:6072/debug/delete?id={}" wget -q -O - http://localhost:6072/debug/queue list the repositories in the indexing queue, sorted by descending priority. From e53fadaec5e674fafc956594f600e4729b015ec4 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Mon, 28 Nov 2022 10:52:39 -0800 Subject: [PATCH 04/11] move logic to cleanup.go --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 71 ++++++++++ .../cleanup_test.go | 121 +++++++++++++++++ cmd/zoekt-sourcegraph-indexserver/main.go | 104 +++------------ .../main_test.go | 123 ------------------ 4 files changed, 209 insertions(+), 210 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index ef01a4f62..abe28e367 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -1,7 +1,9 @@ package main import ( + "errors" "fmt" + "io/fs" "log" "os" "os/exec" @@ -13,6 +15,7 @@ import ( "github.com/grafana/regexp" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sourcegraph/zoekt/build" "gopkg.in/natefinch/lumberjack.v2" "github.com/sourcegraph/zoekt" @@ -515,3 +518,71 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) { } return tombstones, nil } + +// deleteShards deletes all the shards that are associated with the repository specified +// in the build options. +// +// Users must hold the indexDir lock for this repository before calling deleteShards. +func deleteShards(options *build.Options) error { + shardPaths := options.FindAllShards() + + // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic + // works correctly. + // + // Example: - repoA_v16.00002.zoekt + // - repoA_v16.00001.zoekt + // - repoA_v16.00000.zoekt + // + // zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard + // is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order + // (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA". + // + // If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never + // be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested). + // + // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent + // state behind even if we crash. + + sort.Slice(shardPaths, func(i, j int) bool { + return shardPaths[i] > shardPaths[j] + }) + + for _, shard := range shardPaths { + // Is this repository inside a compound shard? If so, set a tombstone + // instead of deleting the shard outright. + if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") { + if !strings.HasSuffix(shard, ".zoekt") { + continue + } + + err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID) + if err != nil { + return fmt.Errorf("setting tombstone in shard %q: %w", shard, err) + } + + continue + } + + err := os.Remove(shard) + if err != nil { + return fmt.Errorf("deleting shard %q: %w", shard, err) + } + + // remove the metadata file associated with the shard (if any) + metaFile := shard + ".meta" + if _, err := os.Stat(metaFile); err != nil { + if errors.Is(err, fs.ErrNotExist) { + continue + } + + return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err) + } + + err = os.Remove(metaFile) + if err != nil { + return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) + } + } + + return nil +} diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go index f6e532804..90c7cad13 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go @@ -455,6 +455,127 @@ func TestCleanupCompoundShards(t *testing.T) { } } +func TestDeleteShards(t *testing.T) { + remainingRepoA := zoekt.Repository{ID: 1, Name: "A"} + remainingRepoB := zoekt.Repository{ID: 2, Name: "B"} + repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"} + + t.Run("delete repository from set of normal shards", func(t *testing.T) { + indexDir := t.TempDir() + + // map of repoID -> list of associated shard paths + metadata paths + shardMap := make(map[uint32][]string) + + // setup: create shards for each repository, and populate the shard map + for _, r := range []zoekt.Repository{ + remainingRepoA, + remainingRepoB, + repositoryToDelete, + } { + shards := createTestNormalShard(t, indexDir, r, 3) + + for _, shard := range shards { + // create stub meta file + metaFile := shard + ".meta" + f, err := os.Create(metaFile) + if err != nil { + t.Fatalf("creating metadata file %q: %s", metaFile, err) + } + + f.Close() + + shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile) + } + } + + // run test: delete repository + options := &build.Options{ + IndexDir: indexDir, + RepositoryDescription: repositoryToDelete, + } + options.SetDefaults() + + err := deleteShards(options) + if err != nil { + t.Errorf("unexpected error when deleting shards: %s", err) + } + + // run assertions: gather all the shards + meta files that remain and + // check to see that only the files associated with the "remaining" repositories + // are present + var actualShardFiles []string + + for _, pattern := range []string{"*.zoekt", "*.meta"} { + files, err := filepath.Glob(filepath.Join(indexDir, pattern)) + if err != nil { + t.Fatalf("globbing indexDir: %s", err) + } + + actualShardFiles = append(actualShardFiles, files...) + } + + var expectedShardFiles []string + expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...) + expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...) + + sort.Strings(actualShardFiles) + sort.Strings(expectedShardFiles) + + if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" { + t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff) + } + }) + + t.Run("delete repository from compound shard", func(t *testing.T) { + indexDir := t.TempDir() + + // setup: enable shard merging for compound shards + t.Setenv("SRC_ENABLE_SHARD_MERGING", "1") + + // setup: create compound shard with all repositories + repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} + shard := createTestCompoundShard(t, indexDir, repositories) + + // run test: delete repository + options := &build.Options{ + IndexDir: indexDir, + RepositoryDescription: repositoryToDelete, + } + options.SetDefaults() + + err := deleteShards(options) + if err != nil { + t.Errorf("unexpected error when deleting shards: %s", err) + } + + // verify: read the compound shard, and ensure that only + // the repositories that we expect are in the shard (and the deleted one has been tombstoned) + actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard) + if err != nil { + t.Fatalf("reading repository metadata from shard: %s", err) + } + + expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB} + + sort.Slice(actualRepositories, func(i, j int) bool { + return actualRepositories[i].ID < actualRepositories[j].ID + }) + + sort.Slice(expectedRepositories, func(i, j int) bool { + return expectedRepositories[i].ID < expectedRepositories[j].ID + }) + + opts := []cmp.Option{ + cmpopts.IgnoreUnexported(zoekt.Repository{}), + cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"), + cmpopts.EquateEmpty(), + } + if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" { + t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff) + } + }) +} + // createCompoundShard returns a path to a compound shard containing repos with // ids. Use optsFns to overwrite fields of zoekt.Repository for all repos. func createCompoundShard(t *testing.T, dir string, ids []uint32, optFns ...func(in *zoekt.Repository)) string { diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 5bb1fe7a1..446904eba 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -6,12 +6,10 @@ import ( "bytes" "context" "encoding/json" - "errors" "flag" "fmt" "html/template" "io" - "io/fs" "log" "math" "math/rand" @@ -760,6 +758,23 @@ func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) { } } +// handleDebugMerge triggers a merge even if shard merging is not enabled. Users +// can run this command during periods of low usage (evenings, weekends) to +// trigger an initial merge run. In the steady-state, merges happen rarely, even +// on busy instances, and users can rely on automatic merging instead. +func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { + + // A merge operation can take very long, depending on the number merges and the + // target size of the compound shards. We run the merge in the background and + // return immediately to the user. + // + // We track the status of the merge with metricShardMergingRunning. + go func() { + s.doMerge() + }() + _, _ = w.Write([]byte("merging enqueued\n")) +} + func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { rawID := r.URL.Query().Get("id") if rawID == "" { @@ -798,91 +813,6 @@ func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { } } -// deleteShards deletes all the shards that are associated with the repository specified -// in the build options. -// -// Users must hold the indexDir lock for this repository before calling deleteShards. -func deleteShards(options *build.Options) error { - shardPaths := options.FindAllShards() - - // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic - // works correctly. - // - // Example: - repoA_v16.00002.zoekt - // - repoA_v16.00001.zoekt - // - repoA_v16.00000.zoekt - // - // zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard - // is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order - // (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA". - // - // If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never - // be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested). - // - // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent - // state behind even if we crash. - - sort.Slice(shardPaths, func(i, j int) bool { - return shardPaths[i] > shardPaths[j] - }) - - for _, shard := range shardPaths { - // Is this repository inside a compound shard? If so, set a tombstone - // instead of deleting the shard outright. - if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") { - if !strings.HasSuffix(shard, ".zoekt") { - continue - } - - err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID) - if err != nil { - return fmt.Errorf("setting tombstone in shard %q: %w", shard, err) - } - - continue - } - - err := os.Remove(shard) - if err != nil { - return fmt.Errorf("deleting shard %q: %w", shard, err) - } - - // remove the metadata file associated with the shard (if any) - metaFile := shard + ".meta" - if _, err := os.Stat(metaFile); err != nil { - if errors.Is(err, fs.ErrNotExist) { - continue - } - - return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err) - } - - err = os.Remove(metaFile) - if err != nil { - return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) - } - } - - return nil -} - -// handleDebugMerge triggers a merge even if shard merging is not enabled. Users -// can run this command during periods of low usage (evenings, weekends) to -// trigger an initial merge run. In the steady-state, merges happen rarely, even -// on busy instances, and users can rely on automatic merging instead. -func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { - - // A merge operation can take very long, depending on the number merges and the - // target size of the compound shards. We run the merge in the background and - // return immediately to the user. - // - // We track the status of the merge with metricShardMergingRunning. - go func() { - s.doMerge() - }() - _, _ = w.Write([]byte("merging enqueued\n")) -} - func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { indexed := listIndexed(s.IndexDir) diff --git a/cmd/zoekt-sourcegraph-indexserver/main_test.go b/cmd/zoekt-sourcegraph-indexserver/main_test.go index e52d962e9..77867ebb1 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/main_test.go @@ -11,13 +11,11 @@ import ( "net/url" "os" "path/filepath" - "sort" "strconv" "strings" "testing" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" sglog "github.com/sourcegraph/log" "github.com/sourcegraph/log/logtest" "github.com/sourcegraph/zoekt/build" @@ -93,127 +91,6 @@ func TestListRepoIDs(t *testing.T) { } } -func TestDeleteShards(t *testing.T) { - remainingRepoA := zoekt.Repository{ID: 1, Name: "A"} - remainingRepoB := zoekt.Repository{ID: 2, Name: "B"} - repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"} - - t.Run("delete repository from set of normal shards", func(t *testing.T) { - indexDir := t.TempDir() - - // map of repoID -> list of associated shard paths + metadata paths - shardMap := make(map[uint32][]string) - - // setup: create shards for each repository, and populate the shard map - for _, r := range []zoekt.Repository{ - remainingRepoA, - remainingRepoB, - repositoryToDelete, - } { - shards := createTestNormalShard(t, indexDir, r, 3) - - for _, shard := range shards { - // create stub meta file - metaFile := shard + ".meta" - f, err := os.Create(metaFile) - if err != nil { - t.Fatalf("creating metadata file %q: %s", metaFile, err) - } - - f.Close() - - shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile) - } - } - - // run test: delete repository - options := &build.Options{ - IndexDir: indexDir, - RepositoryDescription: repositoryToDelete, - } - options.SetDefaults() - - err := deleteShards(options) - if err != nil { - t.Errorf("unexpected error when deleting shards: %s", err) - } - - // run assertions: gather all the shards + meta files that remain and - // check to see that only the files associated with the "remaining" repositories - // are present - var actualShardFiles []string - - for _, pattern := range []string{"*.zoekt", "*.meta"} { - files, err := filepath.Glob(filepath.Join(indexDir, pattern)) - if err != nil { - t.Fatalf("globbing indexDir: %s", err) - } - - actualShardFiles = append(actualShardFiles, files...) - } - - var expectedShardFiles []string - expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...) - expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...) - - sort.Strings(actualShardFiles) - sort.Strings(expectedShardFiles) - - if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" { - t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff) - } - }) - - t.Run("delete repository from compound shard", func(t *testing.T) { - indexDir := t.TempDir() - - // setup: enable shard merging for compound shards - t.Setenv("SRC_ENABLE_SHARD_MERGING", "1") - - // setup: create compound shard with all repositories - repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} - shard := createTestCompoundShard(t, indexDir, repositories) - - // run test: delete repository - options := &build.Options{ - IndexDir: indexDir, - RepositoryDescription: repositoryToDelete, - } - options.SetDefaults() - - err := deleteShards(options) - if err != nil { - t.Errorf("unexpected error when deleting shards: %s", err) - } - - // verify: read the compound shard, and ensure that only - // the repositories that we expect are in the shard (and the deleted one has been tombstoned) - actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard) - if err != nil { - t.Fatalf("reading repository metadata from shard: %s", err) - } - - expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB} - - sort.Slice(actualRepositories, func(i, j int) bool { - return actualRepositories[i].ID < actualRepositories[j].ID - }) - - sort.Slice(expectedRepositories, func(i, j int) bool { - return expectedRepositories[i].ID < expectedRepositories[j].ID - }) - - opts := []cmp.Option{ - cmpopts.IgnoreUnexported(zoekt.Repository{}), - cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"), - cmpopts.EquateEmpty(), - } - if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" { - t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff) - } - }) -} - func TestListRepoIDs_Error(t *testing.T) { msg := "deadbeaf deadbeaf" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { From 1ab8eb7b90d5a6af911190728cc59c7ea1fd2a11 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Mon, 28 Nov 2022 15:08:50 -0800 Subject: [PATCH 05/11] wip - add logic to cleanup() to handle stranded metadata files --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 52 ++++++++++---- .../cleanup_test.go | 70 ++++++++++++++++--- 2 files changed, 98 insertions(+), 24 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index abe28e367..b6e208bce 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -175,6 +175,38 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) } } + // remove any Zoekt metadata files in the given dir that don't have an + // associated shard file + metaFiles, err := filepath.Glob(filepath.Join(indexDir, "*.meta")) + if err != nil { + log.Printf("failed to glob %q for stranded metadata files: %s", indexDir, err) + } else { + for _, metaFile := range metaFiles { + shard := strings.TrimSuffix(metaFile, ".meta") + _, err := os.Stat(shard) + if err == nil { + // metadata file has associated shard + continue + } + + if !errors.Is(err, fs.ErrNotExist) { + log.Printf("failed to stat metadata file %q: %s", metaFile, err) + continue + } + + // metadata doesn't have an associated shard file, remove the metadata file + + err = os.Remove(metaFile) + if err != nil { + log.Printf("failed to remove stranded metadata file %q: %s", metaFile, err) + continue + } else { + log.Printf("removed stranded metadata file: %s", metaFile) + } + + } + } + metricCleanupDuration.Observe(time.Since(start).Seconds()) } @@ -563,24 +595,16 @@ func deleteShards(options *build.Options) error { continue } - err := os.Remove(shard) + paths, err := zoekt.IndexFilePaths(shard) if err != nil { - return fmt.Errorf("deleting shard %q: %w", shard, err) + return fmt.Errorf("listing files for shard %q: %w", shard, err) } - // remove the metadata file associated with the shard (if any) - metaFile := shard + ".meta" - if _, err := os.Stat(metaFile); err != nil { - if errors.Is(err, fs.ErrNotExist) { - continue + for _, p := range paths { + err := os.Remove(p) + if err != nil { + return fmt.Errorf("deleting %q: %w", p, err) } - - return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err) - } - - err = os.Remove(metaFile) - if err != nil { - return fmt.Errorf("deleting metadata file %q: %w", metaFile, err) } } diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go index 90c7cad13..c65ac6b23 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go @@ -19,6 +19,7 @@ import ( ) func TestCleanup(t *testing.T) { + mk := func(name string, n int, mtime time.Time) shard { return shard{ RepoID: fakeID(name), @@ -28,6 +29,11 @@ func TestCleanup(t *testing.T) { RepoTombstone: false, } } + + mkMeta := func(name string, n int) string { + return fmt.Sprintf("%s_v%d.%05d.zoekt.meta", url.QueryEscape(name), 15, n) + } + // We don't use getShards so that we have two implementations of the same // thing (ie pick up bugs in one) glob := func(pattern string) []shard { @@ -56,14 +62,16 @@ func TestCleanup(t *testing.T) { recent := now.Add(-time.Hour) old := now.Add(-25 * time.Hour) cases := []struct { - name string - repos []string - index []shard - trash []shard - tmps []string - - wantIndex []shard - wantTrash []shard + name string + repos []string + indexMetaFiles []string + index []shard + trash []shard + tmps []string + + wantIndex []shard + wantIndexMetaFiles []string + wantTrash []shard }{{ name: "noop", }, { @@ -96,6 +104,13 @@ func TestCleanup(t *testing.T) { index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, wantIndex: []shard{mk("foo", 0, recent)}, wantTrash: []shard{mk("bar", 0, now)}, + }, { + name: "remove metafiles with no associated shards", + repos: []string{"foo", "bar"}, + index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, + indexMetaFiles: []string{mkMeta("foo", 0), mkMeta("foo", 1), mkMeta("bar", 0)}, + wantIndex: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, + wantIndexMetaFiles: []string{mkMeta("foo", 0), mkMeta("bar", 0)}, }, { name: "clean old .tmp files", tmps: []string{"recent.tmp", "old.tmp"}, @@ -134,6 +149,12 @@ func TestCleanup(t *testing.T) { t.Fatal(err) } } + for _, f := range tt.indexMetaFiles { + path := filepath.Join(dir, f) + if _, err := os.Create(path); err != nil { + t.Fatal(err) + } + } var repoIDs []uint32 for _, name := range tt.repos { @@ -141,12 +162,41 @@ func TestCleanup(t *testing.T) { } cleanup(dir, repoIDs, now, false) - if d := cmp.Diff(tt.wantIndex, glob(filepath.Join(dir, "*.zoekt"))); d != "" { + actualIndexShards := glob(filepath.Join(dir, "*.zoekt")) + + sort.Slice(actualIndexShards, func(i, j int) bool { + return actualIndexShards[i].Path < actualIndexShards[j].Path + }) + sort.Slice(tt.wantIndex, func(i, j int) bool { + return tt.wantIndex[i].Path < tt.wantIndex[j].Path + }) + + if d := cmp.Diff(tt.wantIndex, actualIndexShards); d != "" { t.Errorf("unexpected index (-want, +got):\n%s", d) } - if d := cmp.Diff(tt.wantTrash, glob(filepath.Join(dir, ".trash", "*.zoekt"))); d != "" { + + actualTrashShards := glob(filepath.Join(dir, ".trash", "*.zoekt")) + + sort.Slice(actualTrashShards, func(i, j int) bool { + return actualTrashShards[i].Path < actualTrashShards[j].Path + }) + + sort.Slice(tt.wantTrash, func(i, j int) bool { + return tt.wantTrash[i].Path < tt.wantTrash[j].Path + }) + if d := cmp.Diff(tt.wantTrash, actualTrashShards); d != "" { t.Errorf("unexpected trash (-want, +got):\n%s", d) } + + actualIndexMetaFiles := globBase(filepath.Join(dir, "*.meta")) + + sort.Strings(actualIndexMetaFiles) + sort.Strings(tt.wantIndexMetaFiles) + + if d := cmp.Diff(tt.wantIndexMetaFiles, actualIndexMetaFiles, cmpopts.EquateEmpty()); d != "" { + t.Errorf("unexpected metadata files (-want, +got):\n%s", d) + } + if tmps := globBase(filepath.Join(dir, "*.tmp")); len(tmps) > 0 { t.Errorf("unexpected tmps: %v", tmps) } From 76ae04c3850e18faf9ae56352e5b985ff5354594 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Tue, 29 Nov 2022 11:39:54 -0800 Subject: [PATCH 06/11] switch deleteShards implementation to search across all shards in indexDir (instead of using buildOptions) --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 42 ++++++++++++------- .../cleanup_test.go | 34 ++++++++------- cmd/zoekt-sourcegraph-indexserver/main.go | 21 ++++------ 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index b6e208bce..286598fdc 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -15,7 +15,6 @@ import ( "github.com/grafana/regexp" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/sourcegraph/zoekt/build" "gopkg.in/natefinch/lumberjack.v2" "github.com/sourcegraph/zoekt" @@ -551,12 +550,21 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) { return tombstones, nil } -// deleteShards deletes all the shards that are associated with the repository specified -// in the build options. +// deleteShards deletes all the shards in indexDir that are associated with +// the given repoID. If the repository specified by repoID happens to be in a +// compound shard, the repository is tombstoned instead. // -// Users must hold the indexDir lock for this repository before calling deleteShards. -func deleteShards(options *build.Options) error { - shardPaths := options.FindAllShards() +// deleteShards returns errRepositoryNotFound if the repository specified by repoID +// isn't present in indexDir. +// +// Users must hold the global indexDir lock before calling deleteShards. +func deleteShards(indexDir string, repoID uint32) error { + shardMap := getShards(indexDir) + + shards, ok := shardMap[repoID] + if !ok { + return errRepositoryNotFound + } // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic // works correctly. @@ -575,29 +583,31 @@ func deleteShards(options *build.Options) error { // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent // state behind even if we crash. - sort.Slice(shardPaths, func(i, j int) bool { - return shardPaths[i] > shardPaths[j] + sort.Slice(shards, func(i, j int) bool { + return shards[i].Path > shards[j].Path }) - for _, shard := range shardPaths { + for _, s := range shards { + shardPath := s.Path + // Is this repository inside a compound shard? If so, set a tombstone // instead of deleting the shard outright. - if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") { - if !strings.HasSuffix(shard, ".zoekt") { + if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shardPath), "compound-") { + if !strings.HasSuffix(shardPath, ".zoekt") { continue } - err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID) + err := zoekt.SetTombstone(shardPath, repoID) if err != nil { - return fmt.Errorf("setting tombstone in shard %q: %w", shard, err) + return fmt.Errorf("setting tombstone in shard %q: %w", shardPath, err) } continue } - paths, err := zoekt.IndexFilePaths(shard) + paths, err := zoekt.IndexFilePaths(shardPath) if err != nil { - return fmt.Errorf("listing files for shard %q: %w", shard, err) + return fmt.Errorf("listing files for shard %q: %w", shardPath, err) } for _, p := range paths { @@ -610,3 +620,5 @@ func deleteShards(options *build.Options) error { return nil } + +var errRepositoryNotFound = errors.New("repository not found") diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go index c65ac6b23..931c603d0 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "net/url" "os" @@ -539,13 +540,7 @@ func TestDeleteShards(t *testing.T) { } // run test: delete repository - options := &build.Options{ - IndexDir: indexDir, - RepositoryDescription: repositoryToDelete, - } - options.SetDefaults() - - err := deleteShards(options) + err := deleteShards(indexDir, repositoryToDelete.ID) if err != nil { t.Errorf("unexpected error when deleting shards: %s", err) } @@ -586,14 +581,7 @@ func TestDeleteShards(t *testing.T) { repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} shard := createTestCompoundShard(t, indexDir, repositories) - // run test: delete repository - options := &build.Options{ - IndexDir: indexDir, - RepositoryDescription: repositoryToDelete, - } - options.SetDefaults() - - err := deleteShards(options) + err := deleteShards(indexDir, repositoryToDelete.ID) if err != nil { t.Errorf("unexpected error when deleting shards: %s", err) } @@ -624,6 +612,22 @@ func TestDeleteShards(t *testing.T) { t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff) } }) + + t.Run("returns errRepositoryNotFound if the repoID isn't in indexDir", func(t *testing.T) { + indexDir := t.TempDir() + + // setup: create compound shard with all repositories + repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} + for _, r := range repositories { + createTestNormalShard(t, indexDir, r, 3) + } + + // test: delete some random repository and check to see if we get the expected error + err := deleteShards(indexDir, 7777777) + if !errors.Is(err, errRepositoryNotFound) { + t.Errorf("expected errRepositoryNotFound when deleting shards, got: %s", err) + } + }) } // createCompoundShard returns a path to a compound shard containing repos with diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 446904eba..99cc87d81 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "flag" "fmt" "html/template" @@ -790,24 +791,18 @@ func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { repoID := uint32(id64) - s.queue.mu.Lock() - defer s.queue.mu.Unlock() - - item := s.queue.get(repoID) - if item == nil { - http.Error(w, fmt.Sprintf("no repository found for id %q", rawID), http.StatusBadRequest) - return - } - var deletionError error - repoName := item.opts.Name - s.muIndexDir.With(repoName, func() { - o := s.indexArgs(item.opts).BuildOptions() - deletionError = deleteShards(o) + s.muIndexDir.Global(func() { + deletionError = deleteShards(s.IndexDir, repoID) }) if deletionError != nil { + if errors.Is(deletionError, errRepositoryNotFound) { + http.Error(w, fmt.Sprintf("repository id %q not found", rawID), http.StatusBadRequest) + return + } + http.Error(w, fmt.Sprintf("while deleting shards for repository id %q: %s", rawID, deletionError), http.StatusInternalServerError) return } From 4d58e64bf9a9b49a0fefedb0ebe8ca07f3d4d6ff Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Tue, 29 Nov 2022 12:06:47 -0800 Subject: [PATCH 07/11] wire up shardslog --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index 286598fdc..fc44e2690 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -588,26 +588,16 @@ func deleteShards(indexDir string, repoID uint32) error { }) for _, s := range shards { - shardPath := s.Path - // Is this repository inside a compound shard? If so, set a tombstone // instead of deleting the shard outright. - if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shardPath), "compound-") { - if !strings.HasSuffix(shardPath, ".zoekt") { - continue - } - - err := zoekt.SetTombstone(shardPath, repoID) - if err != nil { - return fmt.Errorf("setting tombstone in shard %q: %w", shardPath, err) - } - + if zoekt.ShardMergingEnabled() && maybeSetTombstone([]shard{s}, repoID) { + shardsLog(indexDir, "tomb", []shard{s}) continue } - paths, err := zoekt.IndexFilePaths(shardPath) + paths, err := zoekt.IndexFilePaths(s.Path) if err != nil { - return fmt.Errorf("listing files for shard %q: %w", shardPath, err) + return fmt.Errorf("listing files for shard %q: %w", s.Path, err) } for _, p := range paths { @@ -616,6 +606,8 @@ func deleteShards(indexDir string, repoID uint32) error { return fmt.Errorf("deleting %q: %w", p, err) } } + + shardsLog(indexDir, "delete", []shard{s}) } return nil From 4d8cf415dfb8a8448357770d048cd589f17dce65 Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Wed, 30 Nov 2022 15:01:32 -0800 Subject: [PATCH 08/11] factor out cleanup and delete logic to shared function --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 128 ++++++++---------- .../cleanup_test.go | 57 ++++---- cmd/zoekt-sourcegraph-indexserver/main.go | 20 +-- 3 files changed, 87 insertions(+), 118 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index fc44e2690..75a1a1784 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -94,23 +94,7 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool) } log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " ")) - // We may be in both normal and compound shards in this case. First - // tombstone the compound shards so we don't just rm them. - simple := shards[:0] - for _, s := range shards { - if shardMerging && maybeSetTombstone([]shard{s}, repo) { - shardsLog(indexDir, "tombname", []shard{s}) - } else { - simple = append(simple, s) - } - } - - if len(simple) == 0 { - continue - } - - removeAll(simple...) - shardsLog(indexDir, "removename", simple) + deleteOrTombstone(indexDir, repo, shardMerging, shards...) } // index: Move missing repos from trash into index @@ -326,11 +310,47 @@ func removeAll(shards ...shard) { // potential for a partial index for a repo. However, this should be // exceedingly rare due to it being a mix of partial failure on something in // trash + an admin re-adding a repository. - for _, shard := range shards { + + if len(shards) == 0 { + return + } + + // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic + // works correctly + ensure that we don't leave behind partial state. + // + // Example: - repoA_v16.00002.zoekt + // - repoA_v16.00001.zoekt + // - repoA_v16.00000.zoekt + // + // zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard + // is present (repoA_v16.00000.zoekt). + // - If it's present, then it gathers all rest of the shards names in ascending order (...00001.zoekt, ...00002.zoekt). + // - If it's missing, then zoekt assumes that it never indexed "repoA" (the remaining data from shards 1 & 2 is effectively invisible) + // + // If this function were to crash while deleting repoA, and we only deleted the 0th shard, then : + // - zoekt would think that there is no data for that repository (despite the partial data from + // - it's possible for zoekt to show inconsistent state when re-indexing the repository (zoekt incorrectly + // associates the data from shards 1 and 2 with the "new" shard 0 data (from a newer commit)) + // + // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent + // state behind even if we crash. + + var sortedShards []shard + for _, s := range shards { + sortedShards = append(sortedShards, s) + } + + sort.Slice(sortedShards, func(i, j int) bool { + return sortedShards[i].Path > sortedShards[j].Path + }) + + for _, shard := range sortedShards { paths, err := zoekt.IndexFilePaths(shard.Path) if err != nil { debug.Printf("failed to remove shard %s: %v", shard.Path, err) + } + for _, p := range paths { if err := os.Remove(p); err != nil { debug.Printf("failed to remove shard file %s: %v", p, err) @@ -550,67 +570,25 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) { return tombstones, nil } -// deleteShards deletes all the shards in indexDir that are associated with -// the given repoID. If the repository specified by repoID happens to be in a -// compound shard, the repository is tombstoned instead. +// deleteOrTombstone deletes the provided shards in indexDir that are associated with +// the given repoID. // -// deleteShards returns errRepositoryNotFound if the repository specified by repoID -// isn't present in indexDir. -// -// Users must hold the global indexDir lock before calling deleteShards. -func deleteShards(indexDir string, repoID uint32) error { - shardMap := getShards(indexDir) - - shards, ok := shardMap[repoID] - if !ok { - return errRepositoryNotFound - } - - // Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic - // works correctly. - // - // Example: - repoA_v16.00002.zoekt - // - repoA_v16.00001.zoekt - // - repoA_v16.00000.zoekt - // - // zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard - // is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order - // (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA". - // - // If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never - // be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested). - // - // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent - // state behind even if we crash. - - sort.Slice(shards, func(i, j int) bool { - return shards[i].Path > shards[j].Path - }) - +// If one of the provided shards is a compound shard and the repository is contained within it, +// the repository is tombstoned instead. +func deleteOrTombstone(indexDir string, repoID uint32, shardMerging bool, shards ...shard) { + var simple []shard for _, s := range shards { - // Is this repository inside a compound shard? If so, set a tombstone - // instead of deleting the shard outright. - if zoekt.ShardMergingEnabled() && maybeSetTombstone([]shard{s}, repoID) { - shardsLog(indexDir, "tomb", []shard{s}) - continue - } - - paths, err := zoekt.IndexFilePaths(s.Path) - if err != nil { - return fmt.Errorf("listing files for shard %q: %w", s.Path, err) - } - - for _, p := range paths { - err := os.Remove(p) - if err != nil { - return fmt.Errorf("deleting %q: %w", p, err) - } + if shardMerging && maybeSetTombstone([]shard{s}, repoID) { + shardsLog(indexDir, "tombname", []shard{s}) + } else { + simple = append(simple, s) } + } - shardsLog(indexDir, "delete", []shard{s}) + if len(simple) == 0 { + return } - return nil + removeAll(simple...) + shardsLog(indexDir, "removename", simple) } - -var errRepositoryNotFound = errors.New("repository not found") diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go index 931c603d0..965404132 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go @@ -1,7 +1,6 @@ package main import ( - "errors" "fmt" "net/url" "os" @@ -514,8 +513,11 @@ func TestDeleteShards(t *testing.T) { t.Run("delete repository from set of normal shards", func(t *testing.T) { indexDir := t.TempDir() - // map of repoID -> list of associated shard paths + metadata paths - shardMap := make(map[uint32][]string) + // map of repoID -> list of paths for associated shard files + metadata files + shardFilesMap := make(map[uint32][]string) + + // map of repoID -> list of associated shard structs + shardStructMap := make(map[uint32][]shard) // setup: create shards for each repository, and populate the shard map for _, r := range []zoekt.Repository{ @@ -523,11 +525,11 @@ func TestDeleteShards(t *testing.T) { remainingRepoB, repositoryToDelete, } { - shards := createTestNormalShard(t, indexDir, r, 3) + shardPaths := createTestNormalShard(t, indexDir, r, 3) - for _, shard := range shards { + for _, p := range shardPaths { // create stub meta file - metaFile := shard + ".meta" + metaFile := p + ".meta" f, err := os.Create(metaFile) if err != nil { t.Fatalf("creating metadata file %q: %s", metaFile, err) @@ -535,15 +537,17 @@ func TestDeleteShards(t *testing.T) { f.Close() - shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile) + shardFilesMap[r.ID] = append(shardFilesMap[r.ID], p, metaFile) + shardStructMap[r.ID] = append(shardStructMap[r.ID], shard{ + RepoID: repositoryToDelete.ID, + RepoName: repositoryToDelete.Name, + Path: p, + }) } } // run test: delete repository - err := deleteShards(indexDir, repositoryToDelete.ID) - if err != nil { - t.Errorf("unexpected error when deleting shards: %s", err) - } + deleteOrTombstone(indexDir, repositoryToDelete.ID, false, shardStructMap[repositoryToDelete.ID]...) // run assertions: gather all the shards + meta files that remain and // check to see that only the files associated with the "remaining" repositories @@ -560,8 +564,8 @@ func TestDeleteShards(t *testing.T) { } var expectedShardFiles []string - expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...) - expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...) + expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoA.ID]...) + expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoB.ID]...) sort.Strings(actualShardFiles) sort.Strings(expectedShardFiles) @@ -579,16 +583,18 @@ func TestDeleteShards(t *testing.T) { // setup: create compound shard with all repositories repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} - shard := createTestCompoundShard(t, indexDir, repositories) + compoundShard := createTestCompoundShard(t, indexDir, repositories) - err := deleteShards(indexDir, repositoryToDelete.ID) - if err != nil { - t.Errorf("unexpected error when deleting shards: %s", err) + s := shard{ + RepoID: repositoryToDelete.ID, + RepoName: repositoryToDelete.Name, + Path: compoundShard, } + deleteOrTombstone(indexDir, repositoryToDelete.ID, true, s) // verify: read the compound shard, and ensure that only // the repositories that we expect are in the shard (and the deleted one has been tombstoned) - actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard) + actualRepositories, _, err := zoekt.ReadMetadataPathAlive(compoundShard) if err != nil { t.Fatalf("reading repository metadata from shard: %s", err) } @@ -613,21 +619,6 @@ func TestDeleteShards(t *testing.T) { } }) - t.Run("returns errRepositoryNotFound if the repoID isn't in indexDir", func(t *testing.T) { - indexDir := t.TempDir() - - // setup: create compound shard with all repositories - repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete} - for _, r := range repositories { - createTestNormalShard(t, indexDir, r, 3) - } - - // test: delete some random repository and check to see if we get the expected error - err := deleteShards(indexDir, 7777777) - if !errors.Is(err, errRepositoryNotFound) { - t.Errorf("expected errRepositoryNotFound when deleting shards, got: %s", err) - } - }) } // createCompoundShard returns a path to a compound shard containing repos with diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 99cc87d81..fe75bb49a 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "flag" "fmt" "html/template" @@ -791,19 +790,20 @@ func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { repoID := uint32(id64) - var deletionError error - + repositoryNotFound := false s.muIndexDir.Global(func() { - deletionError = deleteShards(s.IndexDir, repoID) - }) - - if deletionError != nil { - if errors.Is(deletionError, errRepositoryNotFound) { - http.Error(w, fmt.Sprintf("repository id %q not found", rawID), http.StatusBadRequest) + shardMap := getShards(s.IndexDir) + shards, ok := shardMap[repoID] + if !ok { + repositoryNotFound = true return } - http.Error(w, fmt.Sprintf("while deleting shards for repository id %q: %s", rawID, deletionError), http.StatusInternalServerError) + deleteOrTombstone(s.IndexDir, repoID, s.shardMerging, shards...) + }) + + if repositoryNotFound { + http.Error(w, fmt.Sprintf("repository id %q not found", rawID), http.StatusBadRequest) return } } From 3270e03f0148c58c233f6de8b683c0c5c13319ed Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Thu, 8 Dec 2022 12:47:34 -0800 Subject: [PATCH 09/11] Update cmd/zoekt-sourcegraph-indexserver/cleanup.go Co-authored-by: Keegan Carruthers-Smith --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index 75a1a1784..28ee80d7b 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -335,10 +335,7 @@ func removeAll(shards ...shard) { // Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent // state behind even if we crash. - var sortedShards []shard - for _, s := range shards { - sortedShards = append(sortedShards, s) - } + sortedShards := append([]shard{}, shards...) sort.Slice(sortedShards, func(i, j int) bool { return sortedShards[i].Path > sortedShards[j].Path From 83f68e7e1ccd671fe4621a1287ab639e784b405c Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Thu, 8 Dec 2022 12:47:41 -0800 Subject: [PATCH 10/11] Update cmd/zoekt-sourcegraph-indexserver/cleanup.go Co-authored-by: Keegan Carruthers-Smith --- cmd/zoekt-sourcegraph-indexserver/cleanup.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go index 28ee80d7b..ea5f3cd26 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go @@ -345,7 +345,6 @@ func removeAll(shards ...shard) { paths, err := zoekt.IndexFilePaths(shard.Path) if err != nil { debug.Printf("failed to remove shard %s: %v", shard.Path, err) - } for _, p := range paths { From 4c2077d96b46cc40d7a3dbc6182af7097e8a7eee Mon Sep 17 00:00:00 2001 From: Geoffrey Gilmore Date: Thu, 8 Dec 2022 13:10:22 -0800 Subject: [PATCH 11/11] handler: write successful message upon deleting a repository --- cmd/zoekt-sourcegraph-indexserver/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index fe75bb49a..91bef9c94 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -806,6 +806,9 @@ func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("repository id %q not found", rawID), http.StatusBadRequest) return } + + _, _ = w.Write([]byte(fmt.Sprintf("deleted repository %q\n", rawID))) + } func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {