Commit c7b28a64 authored by Gabriel Mazetto's avatar Gabriel Mazetto

WIP: Repository synchronization between primary and secondary nodes

parent 064d34e4
......@@ -27,4 +27,8 @@ class GeoNode < ActiveRecord::Base
def url
uri.to_s
end
def notify_url
# TODO: Notification endpoint
end
end
module Geo
class BaseService
NAMESPACE = :geo
protected
def redis_connection
redis_config_file = Rails.root.join('config', 'resque.yml')
redis_url_string = if File.exists?(redis_config_file)
YAML.load_file(redis_config_file)[Rails.env]
else
'redis://localhost:6379'
end
Redis::Namespace.new(NAMESPACE, redis: Redis.new(url: redis_url_string))
end
end
end
require_relative 'base_service'
module Geo
class EnqueueUpdateService < BaseService
attr_reader :project
def initialize(project)
@project = project
@redis = redis_connection
end
def execute
@redis.rpush('updated_projects', @project.id)
end
end
end
require_relative 'base_service'
module Geo
class NotifyNodesService < BaseService
include HTTParty
BATCH_SIZE = 250
# HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout
def initialize
@redis = redis_connection
end
def execute
queue_size = @redis.llen('updated_projects')
return if queue_size == 0
if queue_size > BATCH_SIZE
batch_size = BATCH_SIZE
else
batch_size = queue_size
end
projects = []
@redis.multi do |redis|
projects = redis.lrange(0, batch_size-1)
redis.ltrim(0, batch_size-1)
end
::Gitlab::Geo.secondary_nodes.each do |node|
notify_updated_projects(node, projects.value)
end
end
private
def notify_updated_projects(node, projects)
self.post(node.notify_url,
body: projects.to_json,
headers: {
"Content-Type" => "application/json",
"X-Gitlab-Geo-Event" => "Update Repositories"
})
# TODO: Authentication
[(response.code >= 200 && response.code < 300), ActionView::Base.full_sanitizer.sanitize(response.to_s)]
end
end
end
class ScheduleRepoUpdateService
attr_reader :projects
def initialize(projects)
@projects = projects
end
def execute
@projects.each do |project_id|
# TODO: async repository fetch
end
end
end
class GeoRepositoryUpdateWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
sidekiq_options queue: :gitlab_shell
attr_accessor :project, :repository, :current_user
def perform(project_id)
@project = Project.find(project_id)
# @current_user = @project.mirror_user || @project.creator
# Projects::UpdateMirrorService.new(@project, @current_user).execute
end
end
......@@ -37,6 +37,9 @@ class PostReceive
return false
end
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_update(project) if Gitlab::Geo.enabled?
if Gitlab::Git.tag_ref?(ref)
GitTagPushService.new.execute(project, @user, oldrev, newrev, ref)
else
......
......@@ -9,7 +9,11 @@ module Gitlab
end
def self.primary_node
RequestStore.store[:geo_node_primary] ||= GeoNode.find_by(primary: true)
RequestStore.store[:geo_primary_node] ||= GeoNode.find_by(primary: true)
end
def self.secondary_nodes
RequestStore.store[:geo_secondary_nodes] ||= GeoNode.find_by(primary: false)
end
def self.enabled?
......@@ -23,5 +27,9 @@ module Gitlab
def self.geo_node?(host:, port:)
GeoNode.where(host: host, port: port).exists?
end
def self.notify_update(project)
::Geo::EnqueueUpdateService.new(project).execute
end
end
end
......@@ -68,4 +68,15 @@ describe Gitlab::Geo, lib: true do
expect(described_class.geo_node?(host: 'inexistent', port: 1234)).to be_falsey
end
end
describe 'notify_update' do
let(:project) { FactoryGirl.build(:project) }
it 'delegates to NotifyService' do
expect(Geo::EnqueueUpdateService).to receive(:new).with(project).and_call_original
expect_any_instance_of(Geo::EnqueueUpdateService).to receive(:execute)
described_class.notify_update(project)
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment