Commit 5602d2f6 authored by mbergeron

Make the ElasticCommitIndexer idempotent

This commit deprecates two parameters on the worker so that the worker
can be marked as idempotent, which in turn enables job de-duplication.
parent 30cc0b78
...@@ -14,8 +14,8 @@ module Elastic ...@@ -14,8 +14,8 @@ module Elastic
end end
end end
def index_commits_and_blobs(from_rev: nil, to_rev: nil) def index_commits_and_blobs
::ElasticCommitIndexerWorker.perform_async(project.id, from_rev, to_rev) ::ElasticCommitIndexerWorker.perform_async(project.id)
end end
end end
end end
...@@ -8,8 +8,8 @@ module Elastic ...@@ -8,8 +8,8 @@ module Elastic
delegate(:delete_index_for_commits_and_blobs, :elastic_search, to: :__elasticsearch__) delegate(:delete_index_for_commits_and_blobs, :elastic_search, to: :__elasticsearch__)
def index_wiki_blobs(to_sha = nil) def index_wiki_blobs
ElasticCommitIndexerWorker.perform_async(project.id, nil, to_sha, true) ElasticCommitIndexerWorker.perform_async(project.id, nil, nil, true)
end end
end end
end end
...@@ -61,8 +61,7 @@ module EE ...@@ -61,8 +61,7 @@ module EE
after_transition started: :finished do |state, _| after_transition started: :finished do |state, _|
if state.project.use_elasticsearch? if state.project.use_elasticsearch?
state.run_after_commit do state.run_after_commit do
last_indexed_commit = state.project.index_status&.last_commit ElasticCommitIndexerWorker.perform_async(state.project_id)
ElasticCommitIndexerWorker.perform_async(state.project_id, last_indexed_commit)
end end
end end
end end
......
...@@ -18,7 +18,7 @@ module EE ...@@ -18,7 +18,7 @@ module EE
def enqueue_elasticsearch_indexing def enqueue_elasticsearch_indexing
return unless should_index_commits? return unless should_index_commits?
project.repository.index_commits_and_blobs(from_rev: oldrev, to_rev: newrev) project.repository.index_commits_and_blobs
end end
def enqueue_update_external_pull_requests def enqueue_update_external_pull_requests
......
...@@ -10,11 +10,9 @@ module EE ...@@ -10,11 +10,9 @@ module EE
super super
return unless project.use_elasticsearch? return unless project.use_elasticsearch?
return unless default_branch_changes.any?
# For all changes on the default branch (usually master) trigger an ES update project.wiki.index_wiki_blobs
default_branch_changes.each do |change|
project.wiki.index_wiki_blobs(change[:newrev])
end
end end
end end
end end
......
...@@ -485,7 +485,7 @@ ...@@ -485,7 +485,7 @@
:urgency: :throttled :urgency: :throttled
:resource_boundary: :unknown :resource_boundary: :unknown
:weight: 1 :weight: 1
:idempotent: :idempotent: true
- :name: elastic_full_index - :name: elastic_full_index
:feature_category: :global_search :feature_category: :global_search
:has_external_dependencies: :has_external_dependencies:
......
# frozen_string_literal: true # frozen_string_literal: true
class ElasticCommitIndexerWorker # rubocop:disable Scalability/IdempotentWorker class ElasticCommitIndexerWorker
include ApplicationWorker include ApplicationWorker
feature_category :global_search feature_category :global_search
sidekiq_options retry: 2 sidekiq_options retry: 2
urgency :throttled urgency :throttled
idempotent!
# Performs the commits and blobs indexation
#
# project_id - The ID of the project to index
# oldrev @deprecated - The revision to start indexing at (default: INDEXED_SHA)
# newrev @deprecated - The revision to stop indexing at (default: HEAD)
# wiki - Treat this project as a Wiki
#
# The indexation will cover all commits within INDEXED_SHA..HEAD
def perform(project_id, oldrev = nil, newrev = nil, wiki = false) def perform(project_id, oldrev = nil, newrev = nil, wiki = false)
return true unless Gitlab::CurrentSettings.elasticsearch_indexing? return true unless Gitlab::CurrentSettings.elasticsearch_indexing?
project = Project.find(project_id) project = Project.find(project_id)
return true unless project.use_elasticsearch? return true unless project.use_elasticsearch?
Gitlab::Elastic::Indexer.new(project, wiki: wiki).run(newrev) Gitlab::Elastic::Indexer.new(project, wiki: wiki).run
end end
end end
---
title: Make the ElasticCommitIndexer idempotent to enable job de-duplication.
merge_request: 31500
author: mbergeron
type: performance
...@@ -40,9 +40,7 @@ module Elastic ...@@ -40,9 +40,7 @@ module Elastic
def search_commit(query, page: 1, per: 20, options: {}) def search_commit(query, page: 1, per: 20, options: {})
page ||= 1 page ||= 1
fields = %w(message^10 sha^5 author.name^2 author.email^2 committer.name committer.email).map {|i| "commit.#{i}"} fields = %w(message^10 sha^5 author.name^2 author.email^2 committer.name committer.email).map {|i| "commit.#{i}"}
query_with_prefix = query.split(/\s+/).map { |s| s.gsub(SHA_REGEX) { |sha| "#{sha}*" } }.join(' ') query_with_prefix = query.split(/\s+/).map { |s| s.gsub(SHA_REGEX) { |sha| "#{sha}*" } }.join(' ')
query_hash = { query_hash = {
......
...@@ -16,7 +16,8 @@ module Gitlab ...@@ -16,7 +16,8 @@ module Gitlab
end end
end end
attr_reader :project, :index_status attr_reader :project, :index_status, :wiki
alias_method :index_wiki?, :wiki
def initialize(project, wiki: false) def initialize(project, wiki: false)
@project = project @project = project
...@@ -26,45 +27,52 @@ module Gitlab ...@@ -26,45 +27,52 @@ module Gitlab
@index_status = project.index_status @index_status = project.index_status
end end
def run(to_sha = nil) # Runs the indexation process, which is the following:
to_sha = nil if to_sha == Gitlab::Git::BLANK_SHA # - Purge the index for any unreachable commits;
# - Run the `gitlab-elasticsearch-indexer`;
head_commit = repository.try(:commit) # - Update the `index_status` for the associated project;
#
if repository.nil? || !repository.exists? || repository.empty? || head_commit.nil? # ref - Git ref up to which the indexation will run (default: HEAD)
update_index_status(Gitlab::Git::BLANK_SHA) def run(ref = 'HEAD')
return commit = find_indexable_commit(ref)
end return update_index_status(Gitlab::Git::BLANK_SHA) unless commit
repository.__elasticsearch__.elastic_writing_targets.each do |target| repository.__elasticsearch__.elastic_writing_targets.each do |target|
run_indexer!(to_sha, target) Sidekiq.logger.debug(message: "Indexation running for #{project.id} #{from_sha}..#{commit.sha}",
project_id: project.id,
wiki: index_wiki?)
run_indexer!(commit.sha, target)
end end
update_index_status(to_sha)
# update the index status only if all writes were successful
update_index_status(commit.sha)
true true
end end
private def find_indexable_commit(ref)
!repository.empty? && repository.commit(ref)
def wiki?
@wiki
end end
private
def repository def repository
wiki? ? project.wiki.repository : project.repository index_wiki? ? project.wiki.repository : project.repository
end end
def run_indexer!(to_sha, target) def run_indexer!(to_sha, target)
vars = build_envvars(to_sha, target) # This might happen when default branch has been reset or rebased.
base_sha = if purge_unreachable_commits_from_index!(to_sha, target)
if index_status && !repository_contains_last_indexed_commit? Gitlab::Git::EMPTY_TREE_ID
target.delete_index_for_commits_and_blobs(wiki: wiki?) else
from_sha
end end
vars = build_envvars(base_sha, to_sha, target)
path_to_indexer = Gitlab.config.elasticsearch.indexer_path path_to_indexer = Gitlab.config.elasticsearch.indexer_path
command = command =
if wiki? if index_wiki?
[path_to_indexer, "--blob-type=wiki_blob", "--skip-commits", project.id.to_s, repository_path] [path_to_indexer, "--blob-type=wiki_blob", "--skip-commits", project.id.to_s, repository_path]
else else
[path_to_indexer, project.id.to_s, repository_path] [path_to_indexer, project.id.to_s, repository_path]
...@@ -75,7 +83,19 @@ module Gitlab ...@@ -75,7 +83,19 @@ module Gitlab
raise Error, output unless status&.zero? raise Error, output unless status&.zero?
end end
def build_envvars(to_sha, target) # Remove all indexed data for commits and blobs for a project.
#
# @return: whether the index has been purged
def purge_unreachable_commits_from_index!(to_sha, target)
return false if last_commit_ancestor_of?(to_sha)
target.delete_index_for_commits_and_blobs(wiki: index_wiki?)
true
rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest => e
Gitlab::ErrorTracking.track_exception(e, project_id: project.id)
end
def build_envvars(from_sha, to_sha, target)
# We accept any form of settings, including string and array # We accept any form of settings, including string and array
# This is why JSON is needed # This is why JSON is needed
vars = { vars = {
...@@ -96,16 +116,14 @@ module Gitlab ...@@ -96,16 +116,14 @@ module Gitlab
end end
def last_commit def last_commit
if wiki? index_wiki? ? index_status&.last_wiki_commit : index_status&.last_commit
index_status&.last_wiki_commit
else
index_status&.last_commit
end
end end
def from_sha def from_sha
strong_memoize(:from_sha) do
repository_contains_last_indexed_commit? ? last_commit : Gitlab::Git::EMPTY_TREE_ID repository_contains_last_indexed_commit? ? last_commit : Gitlab::Git::EMPTY_TREE_ID
end end
end
def repository_contains_last_indexed_commit? def repository_contains_last_indexed_commit?
strong_memoize(:repository_contains_last_indexed_commit) do strong_memoize(:repository_contains_last_indexed_commit) do
...@@ -113,6 +131,15 @@ module Gitlab ...@@ -113,6 +131,15 @@ module Gitlab
end end
end end
def last_commit_ancestor_of?(to_sha)
return true if from_sha == Gitlab::Git::BLANK_SHA
return false unless repository_contains_last_indexed_commit?
# we always treat the `EMPTY_TREE_ID` as an ancestor to make sure
# we don't try to purge an empty index
from_sha == Gitlab::Git::EMPTY_TREE_ID || repository.ancestor?(from_sha, to_sha)
end
def repository_path def repository_path
"#{repository.disk_path}.git" "#{repository.disk_path}.git"
end end
...@@ -131,7 +158,7 @@ module Gitlab ...@@ -131,7 +158,7 @@ module Gitlab
# rubocop: disable CodeReuse/ActiveRecord # rubocop: disable CodeReuse/ActiveRecord
def update_index_status(to_sha) def update_index_status(to_sha)
head_commit = repository.try(:commit) raise "Invalid sha #{to_sha}" unless to_sha.present?
# An index_status should always be created, # An index_status should always be created,
# even if the repository is empty, so we know it's been looked at. # even if the repository is empty, so we know it's been looked at.
...@@ -142,17 +169,11 @@ module Gitlab ...@@ -142,17 +169,11 @@ module Gitlab
retry retry
end end
# Don't update the index status if we never reached HEAD
return if head_commit && to_sha && head_commit.sha != to_sha
sha = head_commit.try(:sha)
sha ||= Gitlab::Git::BLANK_SHA
attributes = attributes =
if wiki? if index_wiki?
{ last_wiki_commit: sha, wiki_indexed_at: Time.now } { last_wiki_commit: to_sha, wiki_indexed_at: Time.now }
else else
{ last_commit: sha, indexed_at: Time.now } { last_commit: to_sha, indexed_at: Time.now }
end end
@index_status.update(attributes) @index_status.update(attributes)
......
This diff is collapsed.
...@@ -33,13 +33,12 @@ describe ProjectWiki, :elastic do ...@@ -33,13 +33,12 @@ describe ProjectWiki, :elastic do
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
project.wiki.find_page('omega_page').delete project.wiki.find_page('omega_page').delete
last_commit = project.wiki.repository.commit.sha
expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer| expect_next_instance_of(Gitlab::Elastic::Indexer) do |indexer|
expect(indexer).to receive(:run).with(last_commit).and_call_original expect(indexer).to receive(:run).and_call_original
end end
project.wiki.index_wiki_blobs(last_commit) project.wiki.index_wiki_blobs
ensure_elasticsearch_index! ensure_elasticsearch_index!
end end
......
...@@ -52,7 +52,7 @@ describe ProjectImportState, type: :model do ...@@ -52,7 +52,7 @@ describe ProjectImportState, type: :model do
context 'no index status' do context 'no index status' do
it 'schedules a full index of the repository' do it 'schedules a full index of the repository' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id, nil) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id)
import_state.finish import_state.finish
end end
...@@ -61,8 +61,8 @@ describe ProjectImportState, type: :model do ...@@ -61,8 +61,8 @@ describe ProjectImportState, type: :model do
context 'with index status' do context 'with index status' do
let(:index_status) { IndexStatus.create!(project: project, indexed_at: Time.now, last_commit: 'foo') } let(:index_status) { IndexStatus.create!(project: project, indexed_at: Time.now, last_commit: 'foo') }
it 'schedules a progressive index of the repository' do it 'schedules a full index of the repository' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id, index_status.last_commit) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(import_state.project_id)
import_state.finish import_state.finish
end end
......
...@@ -55,7 +55,7 @@ describe Git::BranchPushService do ...@@ -55,7 +55,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
...@@ -95,7 +95,7 @@ describe Git::BranchPushService do ...@@ -95,7 +95,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
...@@ -110,7 +110,7 @@ describe Git::BranchPushService do ...@@ -110,7 +110,7 @@ describe Git::BranchPushService do
end end
it 'runs ElasticCommitIndexerWorker' do it 'runs ElasticCommitIndexerWorker' do
expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id, oldrev, newrev) expect(ElasticCommitIndexerWorker).to receive(:perform_async).with(project.id)
subject.execute subject.execute
end end
......
...@@ -28,7 +28,7 @@ describe Git::WikiPushService do ...@@ -28,7 +28,7 @@ describe Git::WikiPushService do
end end
it 'triggers a wiki update' do it 'triggers a wiki update' do
expect(project.wiki).to receive(:index_wiki_blobs).with("797823") expect(project.wiki).to receive(:index_wiki_blobs)
described_class.new(project, project.owner, changes: post_received.changes).execute described_class.new(project, project.owner, changes: post_received.changes).execute
end end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment