Commit b485cd2b authored by Nick Thomas

Elasticsearch: allow initial indexing to proceed within Sidekiq

This commit introduces a `gitlab:elastic:index_repositories_async` rake task,
which can be used to enqueue a set of indexing jobs into Sidekiq. These will
be scheduled in the usual manner, in the `sidekiq` queue.

In theory, this allows Sidekiq's concurrency and distributed execution features
to be used to spread and control the load of indexing repositories.
parent 7fb9d56a
# Sidekiq worker that runs the Elasticsearch repository indexer over a range
# of projects selected by ID.
#
# Each project is handled independently: an error raised while indexing one
# project is logged as a warning and the worker moves on to the next project.
class ElasticBatchProjectIndexerWorker
  include Sidekiq::Worker
  include Gitlab::CurrentSettings

  sidekiq_options queue: :elasticsearch, retry: 2

  # Indexes the repository of every selected project.
  #
  # start        - lowest project ID to include (inclusive); nil means no lower bound.
  # finish       - highest project ID to include (inclusive); nil means no upper bound.
  # update_index - when truthy, every project in the range is considered;
  #                otherwise only projects without an index_status row are.
  def perform(start, finish, update_index = false)
    projects = build_relation(start, finish, update_index)
    indexer = Gitlab::Elastic::Indexer.new

    projects.find_each do |project|
      repository = project.repository
      next unless repository.exists? && !repository.empty?

      begin
        logger.info "Indexing #{project.name_with_namespace} (ID=#{project.id})..."

        index_status = project.index_status || project.build_index_status
        head_commit = repository.commit

        # Nothing to do when there is no head commit or the recorded commit
        # already matches the current head.
        if !head_commit || index_status.last_commit == head_commit.sha
          logger.info("Skipped".color(:yellow))
          next
        end

        indexer.run(
          project.id,
          repository.path_to_repo,
          index_status.last_commit
        )

        # During indexing the new commits can be pushed,
        # the last_commit parameter only indicates that at least this commit is in index
        index_status.last_commit = head_commit.sha
        index_status.indexed_at = DateTime.now
        index_status.save

        logger.info("Done!".color(:green))
      rescue => err
        # Deliberately best-effort: log and continue with the next project.
        logger.warn("#{err.message}, trace - #{err.backtrace}")
      end
    end
  end

  # Builds the Project relation to iterate over.
  #
  # start, finish - optional inclusive project ID bounds (nil skips the bound).
  # update_index  - when falsy, the relation is restricted to projects that
  #                 have no index_statuses row; when truthy no such
  #                 restriction is applied, so projects that already have an
  #                 index status are included as well.
  #
  # Returns the relation.
  def build_relation(start, finish, update_index)
    relation = Project.includes(:index_status)

    # The filter must apply when we are NOT updating: a plain run only picks
    # up projects without an index status, an update run revisits all of them.
    unless update_index
      relation = relation.where('index_statuses.id IS NULL').references(:index_statuses)
    end

    table = Project.arel_table
    relation = relation.where(table[:id].gteq(start)) if start
    relation = relation.where(table[:id].lteq(finish)) if finish

    relation
  end
end
......@@ -58,3 +58,4 @@
- [project_update_repository_storage, 1]
- [admin_emails, 1]
- [geo_repository_update, 1]
- [elastic_batch_project_indexer, 1]
......@@ -9,50 +9,26 @@ namespace :gitlab do
Rake::Task["gitlab:elastic:index_database"].invoke
end
desc "GitLab | Elasticsearch | Index project repositories"
task index_repositories: :environment do
projects = if ENV['UPDATE_INDEX']
Project
else
Project.includes(:index_status).
where("index_statuses.id IS NULL").
references(:index_statuses)
end
projects = apply_project_filters(projects)
desc "GitLab | Elasticsearch | Index project repositories in the background"
task index_repositories_async: :environment do
print "Enqueuing project repositories in batches of #{batch_size}"
indexer = Gitlab::Elastic::Indexer.new
projects.find_each(batch_size: 300) do |project|
repository = project.repository
project_id_batches do |start, finish|
ElasticBatchProjectIndexerWorker.perform_async(start, finish, ENV['UPDATE_INDEX'])
print "."
end
if repository.exists? && !repository.empty?
puts "Indexing #{project.name_with_namespace} (ID=#{project.id})..."
puts "OK"
end
index_status = IndexStatus.find_or_create_by(project: project)
desc "GitLab | Elasticsearch | Index project repositories"
task index_repositories: :environment do
print "Indexing project repositories..."
begin
head_commit = repository.commit
if !head_commit || index_status.last_commit == head_commit.sha
puts "Skipped".color(:yellow)
next
end
indexer.run(
project.id,
repository.path_to_repo,
index_status.last_commit
)
# During indexing the new commits can be pushed,
# the last_commit parameter only indicates that at least this commit is in index
index_status.update(last_commit: head_commit.sha, indexed_at: DateTime.now)
puts "Done!".color(:green)
rescue StandardError => e
puts "#{e.message}, trace - #{e.backtrace}"
end
end
Sidekiq::Logging.logger = Logger.new(STDOUT)
project_id_batches do |start, finish|
puts [start, finish].inspect
ElasticBatchProjectIndexerWorker.new.perform(start, finish, ENV['UPDATE_INDEX'])
end
end
......@@ -116,6 +92,23 @@ namespace :gitlab do
puts "Index recreated".color(:green)
end
# Number of projects per batch: the integer value of the BATCH environment
# variable when it is set, 300 otherwise.
def batch_size
  configured = ENV['BATCH']
  return 300 if configured.nil?

  configured.to_i
end
# Walks the selected projects in batches of `batch_size` and yields the first
# and last project ID of each batch (ordered by ID) to the given block.
#
# Environment variables read:
#   UPDATE_INDEX   - when unset, only projects without an index_statuses row
#                    are selected; when set, all projects are selected.
#   ID_FROM, ID_TO - passed to in_batches as the `start` / `finish` bounds.
def project_id_batches(&blk)
  relation = Project

  # The "no index status yet" filter belongs to the non-update run; an update
  # run has to revisit projects that already have an index status.
  unless ENV['UPDATE_INDEX']
    relation = relation.includes(:index_status).where('index_statuses.id IS NULL').references(:index_statuses)
  end

  relation.all.in_batches(of: batch_size, start: ENV['ID_FROM'], finish: ENV['ID_TO']) do |batch|
    ids = batch.reorder(:id).pluck(:id)
    yield ids.first, ids.last
  end
end
def apply_project_filters(projects)
if ENV['ID_FROM']
projects = projects.where("projects.id >= ?", ENV['ID_FROM'])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment