Commit b62a5608 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Refactored UpdateQueue, related code and added specs

parent 3fa2f84f
module Geo
  # Common base for Geo services.
  #
  # Owns the shared update queue (@queue) that concrete services
  # (EnqueueUpdateService, NotifyNodesService) use to exchange pending
  # project updates. The Redis connection plumbing that used to live here
  # moved into Gitlab::Geo::UpdateQueue as part of this refactor, so this
  # base class no longer talks to Redis directly.
  class BaseService
    def initialize
      # Subclasses call `super()` to obtain the queue before their own setup.
      @queue = Gitlab::Geo::UpdateQueue.new
    end
  end
end
......@@ -3,12 +3,12 @@ module Geo
attr_reader :project
# project - the Project whose update should be propagated to secondary nodes.
#
# `super()` builds the shared @queue (see Geo::BaseService#initialize);
# the pre-refactor `@redis = redis_connection` assignment is gone because
# this service now stores through @queue instead of raw Redis.
def initialize(project)
  super()
  @project = project
end
# Enqueues this service's project onto the shared Geo update queue.
#
# Stores the minimal payload (id + clone URL) that secondary nodes need in
# order to fetch the updated repository; replaces the old raw
# `@redis.rpush('updated_projects', @project.id)` call.
def execute
  @queue.store({ id: @project.id, clone_url: @project.url_to_repo })
end
end
end
# NOTE(review): diff artifact — this span interleaves removed and added lines
# from commit b62a5608 and is not the final shape of the class. In the
# refactored version BATCH_SIZE/QUEUE and the initializer below are removed;
# the service inherits @queue (a Gitlab::Geo::UpdateQueue) from
# Geo::BaseService#initialize instead.
module Geo
# Notifies secondary Geo nodes about batches of updated projects over HTTP.
class NotifyNodesService < Geo::BaseService
include HTTParty
# Pre-refactor batching constants, superseded by Gitlab::Geo::UpdateQueue's own.
BATCH_SIZE = 250
QUEUE = 'updated_projects'
# HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout
# Pre-refactor initializer: built a raw namespaced Redis connection.
# Removed in the new design — TODO confirm against the post-commit file.
def initialize
@redis = redis_connection
end
# Delivers the current batch of updated projects to every secondary node.
#
# Pops up to one batch from the shared update queue and notifies each
# secondary. If a node cannot be notified, the batch is pushed back onto
# the head of the queue so a later run retries it.
#
# NOTE(review): when several nodes fail, the batch is re-enqueued once per
# failing node, duplicating entries — preserved from the original diff;
# confirm whether that is intended.
def execute
  return if @queue.empty?

  projects = @queue.fetch_batched_data

  ::Gitlab::Geo.secondary_nodes.each do |node|
    success, message = notify_updated_projects(node, projects)

    unless success
      Rails.logger.error("Gitlab Failed to notify #{node.url} : #{message}")
      @queue.store_batched_data(projects)
    end
  end
end
......@@ -55,14 +37,5 @@ module Geo
# TODO: should we ask admin user to be defined as part of configuration?
@private_token ||= User.find_by(admin: true).authentication_token
end
# Puts failed project ids back at the head of the queue so they are retried
# before anything newer; all pushes go through a single Redis pipeline to
# avoid one round-trip per project.
def reenqueue_projects(projects)
  @redis.pipelined do
    projects.each { |entry| @redis.lpush(QUEUE, entry) }
  end
end
end
end
......@@ -7,8 +7,8 @@ module Geo
end
# Enqueues one GeoRepositoryUpdateWorker job per pending project.
#
# Each entry in @projects is a Hash with 'id' and 'clone_url' keys, as
# produced by Gitlab::Geo::UpdateQueue#fetch_batched_data (the old code
# passed only a project id; the worker now also needs the clone URL).
def execute
  @projects.each do |project|
    GeoRepositoryUpdateWorker.perform_async(project['id'], project['clone_url'])
  end
end
end
......
......@@ -6,16 +6,16 @@ class GeoRepositoryUpdateWorker
attr_accessor :project
# Sidekiq entry point: looks up the project and mirrors it from the Geo
# primary.
#
# project_id - ID of the Project to update.
# clone_url  - URL of the primary repository to fetch from (new in this
#              refactor; repository creation moved into fetch_repository).
def perform(project_id, clone_url)
  @project = Project.find(project_id)

  fetch_repository(@project, clone_url)
end
private
# Ensures the project has a repository on disk, then fetches the latest
# changes from the Geo primary's remote_url into it.
#
# project    - a Project (the old signature took a bare repository).
# remote_url - clone URL of the primary's repository.
def fetch_repository(project, remote_url)
  # Create the repository first so the mirror fetch has somewhere to land.
  project.create_repository unless project.repository_exists?
  project.repository.fetch_geo_mirror(remote_url)
end
end
module Gitlab
  module Geo
    # Redis-backed FIFO queue of pending Geo project updates.
    #
    # Entries are JSON-serialized objects (in practice hashes such as
    # { 'id' => ..., 'clone_url' => ... }) kept in the 'updated_projects'
    # list under the :geo Redis namespace.
    class UpdateQueue
      BATCH_SIZE = 250
      NAMESPACE = :geo
      QUEUE = 'updated_projects'

      def initialize
        @redis = redis_connection
      end

      # Appends +data+ (any JSON-serializable object) to the tail of the queue.
      def store(data)
        # Two statements instead of `rpush(...) and expire_queue_size!`:
        # `and` for control flow hides that the push result is unchecked.
        @redis.rpush(QUEUE, data.to_json)
        expire_queue_size!
      end

      # Oldest enqueued item (deserialized), or nil when the queue is empty.
      def first
        data = fetch(0, 0)
        data.first unless data.empty?
      end

      # Newest enqueued item (deserialized), or nil when the queue is empty.
      def last
        data = fetch(-1, -1)
        data.first unless data.empty?
      end

      # Atomically pops up to BATCH_SIZE items from the head of the queue
      # and returns them deserialized.
      def fetch_batched_data
        projects = []
        bsize = batch_size

        # MULTI makes the read + trim atomic; inside the block lrange returns
        # a Redis::Future whose #value is only available after EXEC.
        @redis.multi do
          projects = @redis.lrange(QUEUE, 0, bsize - 1)
          @redis.ltrim(QUEUE, bsize, -1)
        end

        expire_queue_size!
        deserialize(projects.value)
      end

      # Pushes +projects+ back onto the head of the queue, preserving their
      # original order (used to re-enqueue a batch after a failed delivery).
      def store_batched_data(projects)
        @redis.pipelined do
          projects.reverse_each do |project|
            # enqueue again to the head of the queue
            @redis.lpush(QUEUE, project.to_json)
          end
        end
        expire_queue_size!
      end

      # Number of items the next fetch_batched_data call will take.
      def batch_size
        queue_size > BATCH_SIZE ? BATCH_SIZE : queue_size
      end

      # Memoized queue length; invalidated by expire_queue_size! after writes.
      def queue_size
        @queue_size ||= fetch_queue_size
      end

      def empty?
        queue_size == 0
      end

      # Removes every element from the queue.
      def empty!
        @redis.del(QUEUE)
      end

      protected

      def fetch(start, stop)
        deserialize(@redis.lrange(QUEUE, start, stop))
      end

      def fetch_queue_size
        @redis.llen(QUEUE)
      end

      def expire_queue_size!
        @queue_size = nil
      end

      def deserialize(data)
        data.map! { |item| JSON.parse(item) } unless data.empty?
        data
      end

      def redis_connection
        redis_config_file = Rails.root.join('config', 'resque.yml')

        # File.exist? — File.exists? is deprecated and removed in Ruby 3.2.
        redis_url_string = if File.exist?(redis_config_file)
                             YAML.load_file(redis_config_file)[Rails.env]
                           else
                             'redis://localhost:6379'
                           end

        Redis::Namespace.new(NAMESPACE, redis: Redis.new(url: redis_url_string))
      end
    end
  end
end
describe Gitlab::Geo::UpdateQueue do
  subject { described_class.new }

  let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } }
  let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } }
  let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 }

  # Each example starts from a clean queue.
  before(:each) { subject.empty! }

  describe '#store' do
    before(:each) { subject.store(dummy_data) }

    it 'stores data to the queue' do
      expect(subject).not_to be_empty
    end

    it 'stored data is equal to original' do
      expect(subject.first).to eq(dummy_data)
    end
  end

  context 'when queue has elements' do
    before(:each) do
      subject.store(dummy_data)
      subject.store(dummy_data2)
    end

    describe '#first' do
      it { expect(subject.first).to eq(dummy_data) }
    end

    describe '#last' do
      it { expect(subject.last).to eq(dummy_data2) }
    end
  end

  describe '#fetch_batched_data' do
    before(:each) { subject.store_batched_data(multiple_dummy_data) }

    it 'returns same stored data' do
      expect(subject.fetch_batched_data).to eq(multiple_dummy_data)
    end
  end

  describe '#store_batched_data' do
    let(:ordered_data) { [{ 'a' => 1 }, { 'a' => 2 }, { 'a' => 3 }, { 'a' => 4 }, { 'a' => 5 }] }

    it 'stores multiple items to the queue' do
      expect { subject.store_batched_data(multiple_dummy_data) }.to change { subject.batch_size }.by(multiple_dummy_data.size)
    end

    it 'returns data in equal order to original' do
      subject.store_batched_data(ordered_data)
      expect(subject.first).to eq(ordered_data.first)
      expect(subject.last).to eq(ordered_data.last)
    end
  end

  describe '#batch_size' do
    before(:each) { allow(subject).to receive(:queue_size) { queue_size } }

    context 'when queue size is smaller than BATCH_SIZE' do
      let(:queue_size) { described_class::BATCH_SIZE - 20 }

      it 'equals to the queue size' do
        expect(subject.batch_size).to eq(queue_size)
      end
    end

    context 'when queue size is bigger than BATCH_SIZE' do
      let(:queue_size) { described_class::BATCH_SIZE + 20 }

      it 'equals to the BATCH_SIZE' do
        expect(subject.batch_size).to eq(described_class::BATCH_SIZE)
      end
    end
  end

  describe '#queue_size' do
    # Fixed typo in the example description: "ammount" -> "amount".
    it 'returns the amount of items in queue' do
      expect { subject.store(dummy_data) }.to change { subject.queue_size }.by(1)
    end
  end

  describe '#empty?' do
    it 'returns true when empty' do
      is_expected.to be_empty
    end

    it 'returns false when there are enqueue data' do
      subject.store(dummy_data)
      is_expected.not_to be_empty
    end
  end
end
describe Geo::EnqueueUpdateService, service: true do
  subject { described_class.new(project) }

  let(:queue) { subject.instance_variable_get(:@queue) }
  let(:project) { double(:project) }
  let(:fake_id) { 999 }
  let(:fake_url) { 'git@localhost:repo/path.git' }

  before do
    queue.empty!
    expect(project).to receive(:url_to_repo).and_return(fake_url)
    expect(project).to receive(:id).and_return(fake_id)
  end

  describe '#execute' do
    let(:stored_data) { queue.first }

    before { subject.execute }

    it 'persists id and clone_url to redis queue' do
      expect(stored_data).to have_key('id')
      expect(stored_data).to have_key('clone_url')
    end

    it 'persisted id is equal to original' do
      expect(stored_data['id']).to eq(fake_id)
    end

    it 'persisted clone_url is equal to original' do
      expect(stored_data['clone_url']).to eq(fake_url)
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment