Commit 61fb7906 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Refactored UpdateQueue to be more generic.

Renamed EnqueueUpdateService to be specific for Project Updates
parent 545b6b1d
module Geo
class BaseService
def initialize
@queue = Gitlab::Geo::UpdateQueue.new
end
end
end
module Geo module Geo
class EnqueueUpdateService < Geo::BaseService class EnqueueProjectUpdateService
attr_reader :project attr_reader :project
def initialize(project) def initialize(project)
super() @queue = Gitlab::Geo::UpdateQueue.new('updated_projects')
@project = project @project = project
end end
......
module Geo module Geo
class NotifyNodesService < Geo::BaseService class NotifyNodesService
include HTTParty include HTTParty
# HTTParty timeout # HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout default_timeout Gitlab.config.gitlab.webhook_timeout
def initialize
@queue = Gitlab::Geo::UpdateQueue.new('updated_projects')
end
def execute def execute
return if @queue.empty? return if @queue.empty?
projects = @queue.fetch_batched_data projects = @queue.fetch_batched_data
......
...@@ -35,7 +35,7 @@ module Gitlab ...@@ -35,7 +35,7 @@ module Gitlab
end end
def self.notify_update(project) def self.notify_update(project)
::Geo::EnqueueUpdateService.new(project).execute ::Geo::EnqueueProjectUpdateService.new(project).execute
end end
def self.bulk_notify_job def self.bulk_notify_job
......
...@@ -3,10 +3,13 @@ module Gitlab ...@@ -3,10 +3,13 @@ module Gitlab
class UpdateQueue class UpdateQueue
BATCH_SIZE = 250 BATCH_SIZE = 250
NAMESPACE = 'geo:gitlab' NAMESPACE = 'geo:gitlab'
QUEUE = 'updated_projects'
def initialize(queue)
@queue = queue
end
def store(data) def store(data)
redis.rpush(QUEUE, data.to_json) redis.rpush(@queue, data.to_json)
expire_queue_size! expire_queue_size!
end end
...@@ -25,8 +28,8 @@ module Gitlab ...@@ -25,8 +28,8 @@ module Gitlab
bsize = batch_size bsize = batch_size
redis.multi do redis.multi do
projects = redis.lrange(QUEUE, 0, bsize - 1) projects = redis.lrange(@queue, 0, bsize - 1)
redis.ltrim(QUEUE, bsize, -1) redis.ltrim(@queue, bsize, -1)
end end
expire_queue_size! expire_queue_size!
...@@ -37,7 +40,7 @@ module Gitlab ...@@ -37,7 +40,7 @@ module Gitlab
redis.pipelined do redis.pipelined do
projects.reverse_each do |project| projects.reverse_each do |project|
# enqueue again to the head of the queue # enqueue again to the head of the queue
redis.lpush(QUEUE, project.to_json) redis.lpush(@queue, project.to_json)
end end
end end
expire_queue_size! expire_queue_size!
...@@ -56,17 +59,17 @@ module Gitlab ...@@ -56,17 +59,17 @@ module Gitlab
end end
def empty! def empty!
redis.del(QUEUE) redis.del(@queue)
end end
protected protected
def fetch(start, stop) def fetch(start, stop)
deserialize(redis.lrange(QUEUE, start, stop)) deserialize(redis.lrange(@queue, start, stop))
end end
def fetch_queue_size def fetch_queue_size
redis.llen(QUEUE) redis.llen(@queue)
end end
def expire_queue_size! def expire_queue_size!
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::UpdateQueue do describe Gitlab::Geo::UpdateQueue do
subject { described_class.new } subject { described_class.new('test_queue') }
let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } } let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } }
let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } } let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } }
let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 } let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 }
......
...@@ -73,8 +73,8 @@ describe Gitlab::Geo, lib: true do ...@@ -73,8 +73,8 @@ describe Gitlab::Geo, lib: true do
let(:project) { FactoryGirl.build(:project) } let(:project) { FactoryGirl.build(:project) }
it 'delegates to NotifyService' do it 'delegates to NotifyService' do
expect(Geo::EnqueueUpdateService).to receive(:new).with(project).and_call_original expect(Geo::EnqueueProjectUpdateService).to receive(:new).with(project).and_call_original
expect_any_instance_of(Geo::EnqueueUpdateService).to receive(:execute) expect_any_instance_of(Geo::EnqueueProjectUpdateService).to receive(:execute)
described_class.notify_update(project) described_class.notify_update(project)
end end
......
describe Geo::EnqueueUpdateService, service: true do describe Geo::EnqueueProjectUpdateService, service: true do
subject { Geo::EnqueueUpdateService.new(project) } subject { Geo::EnqueueProjectUpdateService.new(project) }
let(:project) { double(:project) } let(:project) { double(:project) }
let(:fake_url) { 'git@localhost:repo/path.git' } let(:fake_url) { 'git@localhost:repo/path.git' }
let(:fake_id) { 999 } let(:fake_id) { 999 }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment