Commit 52bc4c43 authored by Dmitry Gruzd

Move ProcessBookkeepingService to sharded approach

This change moves Elastic::ProcessBookkeepingService and
Elastic::ProcessInitialBookkeepingService to sharded ZSETs instead of
one big ZSET in Redis, which often showed up in Redis slowlogs.
The sharding code uses SHA-256 as the hashing function.
parent 83b0a4b7
# frozen_string_literal: true

module Elastic
  class BookkeepingShardService
    def self.shard_number(number_of_shards:, data:)
      Digest::SHA256.hexdigest(data).hex % number_of_shards
    end
  end
end
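Since the service is a pure function of its inputs, the same serialized reference always maps to the same shard. A minimal sketch of the computation, using the DocumentReference serialization format seen in the specs below (where this particular reference hashes to shard 9):

require 'digest'

# Serialized document reference, in the format produced by
# Gitlab::Elastic::DocumentReference.serialize (see the specs below).
data = 'Issue 0 issue_0 project_1'

# Same computation as Elastic::BookkeepingShardService.shard_number:
# SHA-256 output is uniformly distributed, so `.hex % 16` spreads items
# evenly and deterministically across the 16 shards.
shard = Digest::SHA256.hexdigest(data).hex % 16
# => 9, matching the shard_number spec below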
@@ -2,11 +2,23 @@
 module Elastic
   class ProcessBookkeepingService
-    REDIS_SET_KEY = 'elastic:incremental:updates:0:zset'
-    REDIS_SCORE_KEY = 'elastic:incremental:updates:0:score'
-    LIMIT = 10_000
+    SHARD_LIMIT = 1_000
+    SHARDS_NUMBER = 16
+    SHARDS = 0.upto(SHARDS_NUMBER - 1).to_a
 
     class << self
+      def shard_number(data)
+        Elastic::BookkeepingShardService.shard_number(number_of_shards: SHARDS_NUMBER, data: data)
+      end
+
+      def redis_set_key(shard_number)
+        "elastic:incremental:updates:#{shard_number}:zset"
+      end
+
+      def redis_score_key(shard_number)
+        "elastic:incremental:updates:#{shard_number}:score"
+      end
+
       # Add some records to the processing queue. Items must be serializable to
       # a Gitlab::Elastic::DocumentReference
       def track!(*items)
@@ -14,19 +26,25 @@ module Elastic
       items.map! { |item| ::Gitlab::Elastic::DocumentReference.serialize(item) }
 
+      items_by_shard = items.group_by { |item| shard_number(item) }
+
       with_redis do |redis|
-        # Efficiently generate a guaranteed-unique score for each item
-        max = redis.incrby(self::REDIS_SCORE_KEY, items.size)
-        min = (max - items.size) + 1
-
-        (min..max).zip(items).each_slice(1000) do |group|
-          logger.debug(class: self.name,
-                       redis_set: self::REDIS_SET_KEY,
-                       message: 'track_items',
-                       count: group.count,
-                       tracked_items_encoded: group.to_json)
-
-          redis.zadd(self::REDIS_SET_KEY, group)
+        items_by_shard.each do |shard_number, shard_items|
+          set_key = redis_set_key(shard_number)
+
+          # Efficiently generate a guaranteed-unique score for each item
+          max = redis.incrby(redis_score_key(shard_number), shard_items.size)
+          min = (max - shard_items.size) + 1
+
+          (min..max).zip(shard_items).each_slice(1000) do |group|
+            logger.debug(class: self.name,
+                         redis_set: set_key,
+                         message: 'track_items',
+                         count: group.count,
+                         tracked_items_encoded: group.to_json)
+
+            redis.zadd(set_key, group)
+          end
         end
       end
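One subtlety worth noting: each shard keeps its own score counter, and INCRBY atomically reserves a block of `shard_items.size` scores from it, so concurrent trackers on the same shard can never hand out the same score. A hypothetical walk-through, assuming a fresh shard whose counter is 0:

# Hypothetical: tracking 3 items against a shard whose score counter is 0.
max = 3                   # redis.incrby(score_key, 3) returns the new counter
min = (max - 3) + 1       # => 1
(min..max).to_a           # => [1, 2, 3]: unique, ascending, one per item
# ZADD stores each item with its score; re-adding an existing member only
# updates its score, which is why re-tracked items move to the back of the
# queue (covered by the ordering spec further down).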
@@ -34,14 +52,39 @@
     end
 
     def queue_size
-      with_redis { |redis| redis.zcard(self::REDIS_SET_KEY) }
+      with_redis do |redis|
+        SHARDS.sum do |shard_number|
+          redis.zcard(redis_set_key(shard_number))
+        end
+      end
+    end
+
+    def queued_items
+      {}.tap do |hash|
+        with_redis do |redis|
+          each_queued_items_by_shard(redis) do |shard_number, specs|
+            hash[shard_number] = specs if specs.present?
+          end
+        end
+      end
     end
 
     def clear_tracking!
       with_redis do |redis|
         Gitlab::Instrumentation::RedisClusterValidator.allow_cross_slot_commands do
-          redis.unlink(self::REDIS_SET_KEY, self::REDIS_SCORE_KEY)
+          keys = SHARDS.map { |m| [redis_set_key(m), redis_score_key(m)] }.flatten
+
+          redis.unlink(*keys)
         end
       end
     end
+
+    def each_queued_items_by_shard(redis)
+      SHARDS.each do |shard_number|
+        set_key = redis_set_key(shard_number)
+        specs = redis.zrangebyscore(set_key, '-inf', '+inf', limit: [0, SHARD_LIMIT], with_scores: true)
+
+        yield shard_number, specs
+      end
+    end
   end
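The new `queued_items` reader returns a hash keyed by shard number, where each value is the `[member, score]` pairs ZRANGEBYSCORE returned for that shard, capped at SHARD_LIMIT; empty shards are dropped by the `if specs.present?` guard. After tracking three references it looks like this (values taken from the `.queued_items` spec below):

# Return shape of Elastic::ProcessBookkeepingService.queued_items,
# per the spec at the bottom of this diff:
{
  4 => [["Issue 1 issue_1 project_1", 1.0]],
  6 => [["Issue 2 issue_2 project_1", 1.0]],
  9 => [["Issue 0 issue_0 project_1", 1.0]]
}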
@@ -88,41 +131,55 @@
     def execute_with_redis(redis)
       start_time = Time.current
 
-      specs = redis.zrangebyscore(self.class::REDIS_SET_KEY, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
-      return 0 if specs.empty?
-
-      first_score = specs.first.last
-      last_score = specs.last.last
-
-      logger.info(
-        message: 'bulk_indexing_start',
-        records_count: specs.count,
-        first_score: first_score,
-        last_score: last_score
-      )
-
-      refs = deserialize_all(specs)
+      specs_buffer = []
+      scores = {}
+
+      self.class.each_queued_items_by_shard(redis) do |shard_number, specs|
+        next if specs.empty?
+
+        set_key = self.class.redis_set_key(shard_number)
+        first_score = specs.first.last
+        last_score = specs.last.last
+
+        logger.info(
+          message: 'bulk_indexing_start',
+          redis_set: set_key,
+          records_count: specs.count,
+          first_score: first_score,
+          last_score: last_score
+        )
+
+        specs_buffer += specs
+        scores[set_key] = [first_score, last_score, specs.count]
+      end
+
+      return 0 if specs_buffer.blank?
+
+      refs = deserialize_all(specs_buffer)
       refs.preload_database_records.each { |ref| submit_document(ref) }
       failures = bulk_indexer.flush
 
       # Re-enqueue any failures so they are retried
       self.class.track!(*failures) if failures.present?
 
       # Remove all the successes
-      redis.zremrangebyscore(self.class::REDIS_SET_KEY, first_score, last_score)
-
-      records_count = specs.count
-
-      logger.info(
-        message: 'bulk_indexing_end',
-        records_count: records_count,
-        failures_count: failures.count,
-        first_score: first_score,
-        last_score: last_score,
-        bulk_execution_duration_s: Time.current - start_time
-      )
+      scores.each do |set_key, (first_score, last_score, count)|
+        redis.zremrangebyscore(set_key, first_score, last_score)
+
+        logger.info(
+          message: 'bulk_indexing_end',
+          redis_set: set_key,
+          records_count: count,
+          first_score: first_score,
+          last_score: last_score,
+          failures_count: failures.count,
+          bulk_execution_duration_s: Time.current - start_time
+        )
+      end
 
-      records_count
+      specs_buffer.count
     end
 
     def deserialize_all(specs)
...
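Net effect on throughput: a single pass now drains at most SHARDS_NUMBER * SHARD_LIMIT = 16 * 1_000 = 16_000 items (versus the old single-set LIMIT of 10_000), while each ZRANGEBYSCORE touches a sorted set roughly one sixteenth the size. A hypothetical trace of one pass with two busy shards:

# Hypothetical trace of one execute_with_redis pass:
# 1. each_queued_items_by_shard reads up to SHARD_LIMIT pairs per shard:
#      shard 3 -> 1_000 specs, scores 5.0..1004.0
#      shard 7 ->    42 specs, scores 1.0..42.0
# 2. All 1_042 specs are deserialized and flushed in one bulk-indexer pass.
# 3. Failures are re-tracked; INCRBY hands them scores above last_score.
# 4. Per-shard cleanup trims only the processed score window, so both the
#    re-tracked failures and anything enqueued mid-flight survive:
#      redis.zremrangebyscore('elastic:incremental:updates:3:zset', 5.0, 1004.0)
#      redis.zremrangebyscore('elastic:incremental:updates:7:zset', 1.0, 42.0)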
@@ -2,8 +2,6 @@
 module Elastic
   class ProcessInitialBookkeepingService < Elastic::ProcessBookkeepingService
-    REDIS_SET_KEY = 'elastic:bulk:initial:0:zset'
-    REDIS_SCORE_KEY = 'elastic:bulk:initial:0:score'
-
     INDEXED_PROJECT_ASSOCIATIONS = [
       :issues,
       :merge_requests,
@@ -13,6 +11,14 @@ module Elastic
     ].freeze
 
     class << self
+      def redis_set_key(shard_number)
+        "elastic:bulk:initial:#{shard_number}:zset"
+      end
+
+      def redis_score_key(shard_number)
+        "elastic:bulk:initial:#{shard_number}:score"
+      end
+
       def backfill_projects!(*projects)
         track!(*projects)
...
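Because all queue logic in the parent goes through `redis_set_key`/`redis_score_key`, the initial-indexing subclass only swaps the key templates: both queues shard identically but live in disjoint Redis namespaces. For an arbitrary shard:

# Same shard math, different key namespaces (shard 9 chosen arbitrarily):
Elastic::ProcessBookkeepingService.redis_set_key(9)
# => "elastic:incremental:updates:9:zset"
Elastic::ProcessInitialBookkeepingService.redis_set_key(9)
# => "elastic:bulk:initial:9:zset"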
---
title: Use shards for Advanced Search indexing
merge_request: 55047
author:
type: performance
@@ -3,26 +3,27 @@
 require 'spec_helper'
 
 RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
-  around do |example|
-    described_class.with_redis do |redis|
-      @redis = redis
-      example.run
-    end
-  end
-
-  let(:zset) { 'elastic:incremental:updates:0:zset' }
-  let(:redis) { @redis }
   let(:ref_class) { ::Gitlab::Elastic::DocumentReference }
   let(:fake_refs) { Array.new(10) { |i| ref_class.new(Issue, i, "issue_#{i}", 'project_1') } }
   let(:issue) { fake_refs.first }
   let(:issue_spec) { issue.serialize }
 
+  describe '.shard_number' do
+    it 'returns correct shard number' do
+      shard = described_class.shard_number(ref_class.serialize(fake_refs.first))
+
+      expect(shard).to eq(9)
+    end
+  end
+
   describe '.track' do
     it 'enqueues a record' do
       described_class.track!(issue)
 
-      spec, score = redis.zrange(zset, 0, 0, with_scores: true).first
+      shard = described_class.shard_number(issue_spec)
+      spec, score = described_class.queued_items[shard].first
 
       expect(spec).to eq(issue_spec)
       expect(score).to eq(1.0)
@@ -32,11 +33,24 @@ RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
       described_class.track!(*fake_refs)
 
       expect(described_class.queue_size).to eq(fake_refs.size)
+      expect(described_class.queued_items.keys).to contain_exactly(0, 1, 3, 4, 6, 8, 9, 10, 13)
     end
 
+    it 'orders items based on when they were added and moves them to the back of the queue if they were added again' do
+      shard_number = 9
+      item1_in_shard = ref_class.new(Issue, 0, 'issue_0', 'project_1')
+      item2_in_shard = ref_class.new(Issue, 8, 'issue_8', 'project_1')
+
+      described_class.track!(item1_in_shard)
+      described_class.track!(item2_in_shard)
+
+      expect(described_class.queued_items[shard_number][0]).to eq([item1_in_shard.serialize, 1.0])
+      expect(described_class.queued_items[shard_number][1]).to eq([item2_in_shard.serialize, 2.0])
+
-      (spec1, score1), (_, score2), _ = redis.zrange(zset, 0, -1, with_scores: true)
-      expect(score1).to be < score2
-      expect(spec1).to eq(issue_spec)
+      described_class.track!(item1_in_shard)
+
+      expect(described_class.queued_items[shard_number][0]).to eq([item2_in_shard.serialize, 2.0])
+      expect(described_class.queued_items[shard_number][1]).to eq([item1_in_shard.serialize, 3.0])
     end
 
     it 'enqueues 10 identical records as 1 entry' do
@@ -59,8 +73,20 @@ RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
       described_class.track!(*fake_refs)
 
       expect(described_class.queue_size).to eq(fake_refs.size)
+    end
+  end
+
+  describe '.queued_items' do
+    it 'reports queued items' do
+      expect(described_class.queued_items).to be_empty
+
+      described_class.track!(*fake_refs.take(3))
 
-      expect { redis.zadd(zset, 0, 'foo') }.to change(described_class, :queue_size).by(1)
+      expect(described_class.queued_items).to eq(
+        4 => [["Issue 1 issue_1 project_1", 1.0]],
+        6 => [["Issue 2 issue_2 project_1", 1.0]],
+        9 => [["Issue 0 issue_0 project_1", 1.0]]
+      )
     end
   end
@@ -99,10 +125,26 @@ RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
   end
 
   describe '#execute' do
-    let(:limit) { 5 }
+    let(:shard_limit) { 5 }
+    let(:shard_number) { 2 }
+    let(:limit) { shard_limit * shard_number }
 
     before do
-      stub_const('Elastic::ProcessBookkeepingService::LIMIT', limit)
+      stub_const('Elastic::ProcessBookkeepingService::SHARD_LIMIT', shard_limit)
+      stub_const('Elastic::ProcessBookkeepingService::SHARDS_NUMBER', shard_number)
+    end
+
+    context 'limit is less than refs count' do
+      let(:shard_limit) { 2 }
+
+      it 'processes only up to limit' do
+        described_class.track!(*fake_refs)
+
+        expect(described_class.queue_size).to eq(fake_refs.size)
+        allow_processing(*fake_refs)
+
+        expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit)
+      end
     end
 
     it 'submits a batch of documents' do
@@ -137,7 +179,8 @@ RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
       expect { described_class.new.execute }.to change(described_class, :queue_size).by(-limit + 1)
 
-      serialized = redis.zrange(zset, -1, -1).first
+      shard = described_class.shard_number(failed.serialize)
+      serialized = described_class.queued_items[shard].first[0]
 
       expect(ref_class.deserialize(serialized)).to eq(failed)
     end
@@ -172,5 +215,13 @@ RSpec.describe Elastic::ProcessBookkeepingService, :clean_gitlab_redis_shared_state do
       expect(indexer).to receive(:flush) { failures }
     end
   end
+
+  def allow_processing(*refs, failures: [])
+    expect_next_instance_of(::Gitlab::Elastic::BulkIndexer) do |indexer|
+      refs.each { |ref| allow(indexer).to receive(:process).with(anything) }
+      expect(indexer).to receive(:flush) { failures }
+    end
+  end
 end