diff --git a/osu.ElasticIndexer/IndexQueueProcessor.cs b/osu.ElasticIndexer/IndexQueueProcessor.cs index 3df0d52..b62affd 100644 --- a/osu.ElasticIndexer/IndexQueueProcessor.cs +++ b/osu.ElasticIndexer/IndexQueueProcessor.cs @@ -38,21 +38,38 @@ internal IndexQueueProcessor(string index, OsuElasticClient elasticClient, Actio protected override void ProcessResults(IEnumerable items) { - ProcessableItemsBuffer buffer; + List additions = new List(); + List deletions = new List(); using (var conn = GetDatabaseConnection()) - buffer = new ProcessableItemsBuffer(conn, items); + { + Dictionary scores = ElasticModel.Find(conn, items.Select(i => i.ScoreId)).ToDictionary(s => s.id, s => s); + + foreach (var item in items) + { + if (scores.TryGetValue(item.ScoreId, out var score) && score.ShouldIndex) + { + item.Tags = (item.Tags ?? Array.Empty()).Concat(new[] { "action:add", $"type:{(score.is_legacy ? "legacy" : "normal")}", $"ruleset:{score.ruleset_id}" }).ToArray(); + additions.Add(score); + } + else + { + item.Tags = (item.Tags ?? Array.Empty()).Append("action:remove").ToArray(); + deletions.Add(item.ScoreId); + } + } + } - if (buffer.Additions.Any() || buffer.Deletions.Any()) + if (additions.Any() || deletions.Any()) { var bulkDescriptor = new BulkDescriptor() // Disabling exceptions streamlines error handling; all the relevant info will be in the response. // With exceptions, some of the source data gets lost when it's put into the exception message. .RequestConfiguration(r => r.ThrowExceptions(false)) .Index(index) - .IndexMany(buffer.Additions) + .IndexMany(additions) // type is needed for ids https://github.com/elastic/elasticsearch-net/issues/3500 - .DeleteMany(buffer.Deletions); + .DeleteMany(deletions); var response = elasticClient.Bulk(bulkDescriptor); handleResponse(response, items);