Commit c0337aa7 authored by Douwe Maan

Merge branch 'gitlab-geo-wiki-sync' into 'master'

GitLab Geo: Wiki Synchronization

We should be able to replicate Wikis in Geo (#76) as we do with any git project.

Fixes https://gitlab.com/gitlab-org/gitlab-ee/issues/367

# Checklist

- [x] When a wiki is updated by git on a secondary node, we must enqueue and buffer this action in a specific Redis queue
- [x] When a wiki is updated by the web UI on a secondary node, we must enqueue and buffer this action in a specific Redis queue
- [x] When we poll for updated projects we must also query for updated Wikis
- [x] A secondary node must receive bulk notifications of updated Wikis on a different endpoint
- [x] When an updated wiki notification is received, it must generate async jobs for every wiki to be updated

See merge request !267
parents 9006f046 59ff067e
...@@ -45,6 +45,9 @@ class Projects::WikisController < Projects::ApplicationController ...@@ -45,6 +45,9 @@ class Projects::WikisController < Projects::ApplicationController
return render('empty') unless can?(current_user, :create_wiki, @project) return render('empty') unless can?(current_user, :create_wiki, @project)
if @page.update(content, format, message) if @page.update(content, format, message)
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(@project) if Gitlab::Geo.primary?
redirect_to( redirect_to(
namespace_project_wiki_path(@project.namespace, @project, @page), namespace_project_wiki_path(@project.namespace, @project, @page),
notice: 'Wiki was successfully updated.' notice: 'Wiki was successfully updated.'
...@@ -58,6 +61,9 @@ class Projects::WikisController < Projects::ApplicationController ...@@ -58,6 +61,9 @@ class Projects::WikisController < Projects::ApplicationController
@page = WikiPage.new(@project_wiki) @page = WikiPage.new(@project_wiki)
if @page.create(wiki_params) if @page.create(wiki_params)
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(@project) if Gitlab::Geo.primary?
redirect_to( redirect_to(
namespace_project_wiki_path(@project.namespace, @project, @page), namespace_project_wiki_path(@project.namespace, @project, @page),
notice: 'Wiki was successfully updated.' notice: 'Wiki was successfully updated.'
...@@ -80,8 +86,13 @@ class Projects::WikisController < Projects::ApplicationController ...@@ -80,8 +86,13 @@ class Projects::WikisController < Projects::ApplicationController
def destroy def destroy
@page = @project_wiki.find_page(params[:id]) @page = @project_wiki.find_page(params[:id])
@page.delete if @page if @page
@page.delete
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(@project) if Gitlab::Geo.primary?
end
redirect_to( redirect_to(
namespace_project_wiki_path(@project.namespace, @project, :home), namespace_project_wiki_path(@project.namespace, @project, :home),
notice: "Page was successfully deleted" notice: "Page was successfully deleted"
......
...@@ -52,10 +52,14 @@ class GeoNode < ActiveRecord::Base ...@@ -52,10 +52,14 @@ class GeoNode < ActiveRecord::Base
self.relative_url_root = new_uri.path != '/' ? new_uri.path : '' self.relative_url_root = new_uri.path != '/' ? new_uri.path : ''
end end
def notify_url def notify_projects_url
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_projects").to_s URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_projects").to_s
end end
def notify_wikis_url
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_wikis").to_s
end
def oauth_callback_url def oauth_callback_url
URI.join(uri, "#{uri.path}/", 'oauth/geo/callback').to_s URI.join(uri, "#{uri.path}/", 'oauth/geo/callback').to_s
end end
......
...@@ -54,6 +54,10 @@ class ProjectWiki ...@@ -54,6 +54,10 @@ class ProjectWiki
end end
end end
def repository_exists?
!!repository.exists?
end
def empty? def empty?
pages.empty? pages.empty?
end end
......
...@@ -196,7 +196,7 @@ class Repository ...@@ -196,7 +196,7 @@ class Repository
end end
def fetch_remote_forced!(remote) def fetch_remote_forced!(remote)
gitlab_shell.fetch_remote(path_with_namespace, remote, true) gitlab_shell.fetch_remote(path_with_namespace, remote, forced: true)
end end
def branch_names def branch_names
......
module Geo
  # Common base for Geo services: every instance starts with its own
  # update queue available in @queue.
  class BaseService
    # Builds the service and stores a new Gitlab::Geo::UpdateQueue in @queue.
    def initialize
      @queue = Gitlab::Geo::UpdateQueue.new
    end
  end
end
module Geo module Geo
class EnqueueUpdateService < Geo::BaseService class EnqueueProjectUpdateService
attr_reader :project attr_reader :project
def initialize(project) def initialize(project)
super() @queue = Gitlab::Geo::UpdateQueue.new('updated_projects')
@project = project @project = project
end end
......
module Geo
  # Pushes a "wiki updated" entry for a project onto the 'updated_wikis'
  # update queue.
  class EnqueueWikiUpdateService
    attr_reader :project

    # project - object responding to #id and #wiki (whose result responds
    #           to #url_to_repo)
    def initialize(project)
      @project = project
      @queue = Gitlab::Geo::UpdateQueue.new('updated_wikis')
    end

    # Stores the project id together with the wiki clone URL in the queue.
    # Returns whatever the queue's #store returns.
    def execute
      payload = { id: project.id, clone_url: project.wiki.url_to_repo }
      @queue.store(payload)
    end
  end
end
module Geo module Geo
class NotifyNodesService < Geo::BaseService class NotifyNodesService
include HTTParty include HTTParty
# HTTParty timeout # HTTParty timeout
default_timeout Gitlab.config.gitlab.webhook_timeout default_timeout Gitlab.config.gitlab.webhook_timeout
def initialize
@proj_queue = Gitlab::Geo::UpdateQueue.new('updated_projects')
@wiki_queue = Gitlab::Geo::UpdateQueue.new('updated_wikis')
end
def execute def execute
return if @queue.empty? process(@proj_queue, :notify_projects_url)
projects = @queue.fetch_batched_data process(@wiki_queue, :notify_wikis_url)
end
private
def process(queue, notify_url_method)
return if queue.empty?
projects = queue.fetch_batched_data
::Gitlab::Geo.secondary_nodes.each do |node| ::Gitlab::Geo.secondary_nodes.each do |node|
success, message = notify_updated_projects(node, projects) notify_url = node.send(notify_url_method.to_sym)
success, message = notify(notify_url, projects)
unless success unless success
Rails.logger.error("GitLab failed to notify #{node.url} : #{message}") Rails.logger.error("GitLab failed to notify #{node.url} to #{notify_url} : #{message}")
@queue.store_batched_data(projects) queue.store_batched_data(projects)
end end
end end
end end
private def notify(notify_url, projects)
response = self.class.post(notify_url,
def notify_updated_projects(node, projects)
response = self.class.post(node.notify_url,
body: { projects: projects }.to_json, body: { projects: projects }.to_json,
headers: { headers: {
'Content-Type' => 'application/json', 'Content-Type' => 'application/json',
......
module Geo
  # Schedules one GeoWikiRepositoryUpdateWorker job per entry of a batch of
  # updated wikis.
  class ScheduleWikiRepoUpdateService
    attr_reader :projects

    # projects - enumerable of hash-like entries read with the string keys
    #            'id' and 'clone_url'
    def initialize(projects)
      @projects = projects
    end

    # Enqueues an async wiki update job for every entry, in order.
    # Returns the result of iterating the collection with #each.
    def execute
      projects.each do |entry|
        project_id = entry['id']
        clone_url = entry['clone_url']

        GeoWikiRepositoryUpdateWorker.perform_async(project_id, clone_url)
      end
    end
  end
end
...@@ -9,13 +9,13 @@ class GeoRepositoryUpdateWorker ...@@ -9,13 +9,13 @@ class GeoRepositoryUpdateWorker
def perform(project_id, clone_url) def perform(project_id, clone_url)
@project = Project.find(project_id) @project = Project.find(project_id)
fetch_repository(@project, clone_url) fetch_repository(clone_url)
end end
private private
def fetch_repository(project, remote_url) def fetch_repository(remote_url)
project.create_repository unless project.repository_exists? @project.create_repository unless @project.repository_exists?
project.repository.fetch_geo_mirror(remote_url) @project.repository.fetch_geo_mirror(remote_url)
end end
end end
# Sidekiq worker that refreshes a project's wiki repository from a remote
# clone URL.
class GeoWikiRepositoryUpdateWorker
  include Sidekiq::Worker
  include Gitlab::ShellAdapter

  sidekiq_options queue: :default

  attr_accessor :project

  # project_id - id handed to Project.find
  # clone_url  - remote URL the wiki repository is fetched from
  def perform(project_id, clone_url)
    @project = Project.find(project_id)
    fetch_repository(clone_url)
  end

  private

  # Fetches the wiki repository from remote_url via fetch_geo_mirror, but
  # only when the underlying wiki reports that it exists.
  def fetch_repository(remote_url)
    # The second .wiki call returns a Gollum::Wiki, and it will always
    # create the physical repository when not found
    return unless project.wiki.wiki.exist?

    project.wiki.repository.fetch_geo_mirror(remote_url)
  end
end
class PostReceive class PostReceive
include Sidekiq::Worker include Sidekiq::Worker
include Gitlab::Identifier
sidekiq_options queue: :post_receive sidekiq_options queue: :post_receive
...@@ -11,67 +10,57 @@ class PostReceive ...@@ -11,67 +10,57 @@ class PostReceive
log("Check gitlab.yml config for correct gitlab_shell.repos_path variable. \"#{Gitlab.config.gitlab_shell.repos_path}\" does not match \"#{repo_path}\"") log("Check gitlab.yml config for correct gitlab_shell.repos_path variable. \"#{Gitlab.config.gitlab_shell.repos_path}\" does not match \"#{repo_path}\"")
end end
repo_path.gsub!(/\.git\z/, "") post_received = Gitlab::GitPostReceive.new(repo_path, identifier, changes)
repo_path.gsub!(/\A\//, "")
update_wiki_es_indexes(repo_path) if post_received.project.nil?
project = Project.find_with_namespace(repo_path)
if project.nil?
log("Triggered hook for non-existing project with full path \"#{repo_path} \"") log("Triggered hook for non-existing project with full path \"#{repo_path} \"")
return false return false
end end
changes = Base64.decode64(changes) unless changes.include?(" ") if post_received.wiki?
changes = utf8_encode_changes(changes) update_wiki_es_indexes(post_received)
changes = changes.lines
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(post_received.project) if Gitlab::Geo.enabled?
elsif post_received.regular_project?
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_project_update(post_received.project) if Gitlab::Geo.enabled?
process_project_changes(post_received)
else
log("Triggered hook for unidentifiable repository type with full path \"#{repo_path} \"")
false
end
end
changes.each do |change| def process_project_changes(post_received)
post_received.changes.each do |change|
oldrev, newrev, ref = change.strip.split(' ') oldrev, newrev, ref = change.strip.split(' ')
@user ||= identify(identifier, project, newrev) @user ||= post_received.identify(newrev)
unless @user unless @user
log("Triggered hook for non-existing user \"#{identifier} \"") log("Triggered hook for non-existing user \"#{post_received.identifier} \"")
return false return false
end end
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_update(project) if Gitlab::Geo.enabled?
if Gitlab::Git.tag_ref?(ref) if Gitlab::Git.tag_ref?(ref)
GitTagPushService.new.execute(project, @user, oldrev, newrev, ref) GitTagPushService.new.execute(post_received.project, @user, oldrev, newrev, ref)
else else
GitPushService.new(project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute GitPushService.new(post_received.project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute
end end
end end
end end
def utf8_encode_changes(changes) def update_wiki_es_indexes(post_received)
changes = changes.dup return unless Gitlab.config.elasticsearch.enabled
changes.force_encoding("UTF-8")
return changes if changes.valid_encoding?
# Convert non-UTF-8 branch/tag names to UTF-8 so they can be dumped as JSON.
detection = CharlockHolmes::EncodingDetector.detect(changes)
return changes unless detection && detection[:encoding]
CharlockHolmes::Converter.convert(changes, detection[:encoding], 'UTF-8') post_received.project.wiki.index_blobs
end end
private
def log(message) def log(message)
Gitlab::GitLogger.error("POST-RECEIVE: #{message}") Gitlab::GitLogger.error("POST-RECEIVE: #{message}")
end end
def update_wiki_es_indexes(repo_path)
return unless repo_path =~ /wiki\z/ && Gitlab.config.elasticsearch.enabled
project = Project.find_with_namespace(repo_path.gsub(/\.wiki\z/, ""))
if project
project.wiki.index_blobs
end
end
end end
...@@ -13,6 +13,16 @@ module API ...@@ -13,6 +13,16 @@ module API
required_attributes! [:projects] required_attributes! [:projects]
::Geo::ScheduleRepoUpdateService.new(params[:projects]).execute ::Geo::ScheduleRepoUpdateService.new(params[:projects]).execute
end end
# Enqueue a batch of IDs of wiki's projects to have their
# wiki repositories updated
#
# Example request:
# POST /refresh_wikis
post 'refresh_wikis' do
required_attributes! [:projects]
::Geo::ScheduleWikiRepoUpdateService.new(params[:projects]).execute
end
end end
end end
end end
...@@ -34,8 +34,12 @@ module Gitlab ...@@ -34,8 +34,12 @@ module Gitlab
GeoNode.where(host: host, port: port).exists? GeoNode.where(host: host, port: port).exists?
end end
def self.notify_update(project) def self.notify_project_update(project)
::Geo::EnqueueUpdateService.new(project).execute ::Geo::EnqueueProjectUpdateService.new(project).execute
end
def self.notify_wiki_update(project)
::Geo::EnqueueWikiUpdateService.new(project).execute
end end
def self.bulk_notify_job def self.bulk_notify_job
......
...@@ -3,10 +3,13 @@ module Gitlab ...@@ -3,10 +3,13 @@ module Gitlab
class UpdateQueue class UpdateQueue
BATCH_SIZE = 250 BATCH_SIZE = 250
NAMESPACE = 'geo:gitlab' NAMESPACE = 'geo:gitlab'
QUEUE = 'updated_projects'
def initialize(queue)
@queue = queue
end
def store(data) def store(data)
redis.rpush(QUEUE, data.to_json) redis.rpush(@queue, data.to_json)
expire_queue_size! expire_queue_size!
end end
...@@ -25,8 +28,8 @@ module Gitlab ...@@ -25,8 +28,8 @@ module Gitlab
bsize = batch_size bsize = batch_size
redis.multi do redis.multi do
projects = redis.lrange(QUEUE, 0, bsize - 1) projects = redis.lrange(@queue, 0, bsize - 1)
redis.ltrim(QUEUE, bsize, -1) redis.ltrim(@queue, bsize, -1)
end end
expire_queue_size! expire_queue_size!
...@@ -37,7 +40,7 @@ module Gitlab ...@@ -37,7 +40,7 @@ module Gitlab
redis.pipelined do redis.pipelined do
projects.reverse_each do |project| projects.reverse_each do |project|
# enqueue again to the head of the queue # enqueue again to the head of the queue
redis.lpush(QUEUE, project.to_json) redis.lpush(@queue, project.to_json)
end end
end end
expire_queue_size! expire_queue_size!
...@@ -56,17 +59,17 @@ module Gitlab ...@@ -56,17 +59,17 @@ module Gitlab
end end
def empty! def empty!
redis.del(QUEUE) redis.del(@queue)
end end
protected protected
def fetch(start, stop) def fetch(start, stop)
deserialize(redis.lrange(QUEUE, start, stop)) deserialize(redis.lrange(@queue, start, stop))
end end
def fetch_queue_size def fetch_queue_size
redis.llen(QUEUE) redis.llen(@queue)
end end
def expire_queue_size! def expire_queue_size!
......
module Gitlab
  # Value object built from the arguments of a post-receive hook: it
  # normalizes the repository path, decodes the pushed changes and resolves
  # which project the path belongs to and whether it is a regular or a wiki
  # repository.
  class GitPostReceive
    include Gitlab::Identifier

    attr_reader :repo_path, :identifier, :changes, :project

    # repo_path  - repository path; NOTE: the string is normalized IN PLACE
    #              (trailing ".git" and a leading "/" are stripped), so the
    #              caller's object is modified as well
    # identifier - identifier of whoever pushed, passed on to #identify
    # changes    - pushed changes, either plain text (contains a space) or
    #              Base64 encoded
    def initialize(repo_path, identifier, changes)
      # Both patterns are anchored, so a single substitution is all that
      # can ever happen.
      repo_path.sub!(/\.git\z/, '')
      repo_path.sub!(/\A\//, '')

      @repo_path = repo_path
      @identifier = identifier
      @changes = deserialize_changes(changes)

      retrieve_project_and_type
    end

    # True when the path was resolved through the ".wiki" fallback.
    def wiki?
      @type == :wiki
    end

    # True when the path was treated as a regular project repository.
    def regular_project?
      @type == :project
    end

    # Delegates to the included identifier lookup using this hook's
    # identifier and project.
    def identify(revision)
      super(identifier, project, revision)
    end

    private

    # Looks the path up as a regular project first. Only when nothing is
    # found and the path ends in ".wiki" is it retried without the suffix
    # and flagged as a wiki.
    def retrieve_project_and_type
      @type = :project
      @project = Project.find_with_namespace(@repo_path)
      return if @project || !@repo_path.end_with?('.wiki')

      @type = :wiki
      @project = Project.find_with_namespace(@repo_path.sub(/\.wiki\z/, ''))
    end

    # Returns the changes as an array of lines. Input without any space is
    # taken to be Base64 encoded and decoded first.
    def deserialize_changes(changes)
      raw = changes.include?(' ') ? changes : Base64.decode64(changes)

      utf8_encode_changes(raw).lines
    end

    # Returns a UTF-8 tagged copy of changes, converting from the detected
    # encoding when the bytes are not valid UTF-8 as they stand.
    def utf8_encode_changes(changes)
      text = changes.dup
      text.force_encoding('UTF-8')
      return text if text.valid_encoding?

      # Convert non-UTF-8 branch/tag names to UTF-8 so they can be dumped as JSON.
      detection = CharlockHolmes::EncodingDetector.detect(text)
      source_encoding = detection && detection[:encoding]
      return text unless source_encoding

      CharlockHolmes::Converter.convert(text, source_encoding, 'UTF-8')
    end
  end
end
...@@ -2,7 +2,7 @@ module Gitlab ...@@ -2,7 +2,7 @@ module Gitlab
module Middleware module Middleware
class ReadonlyGeo class ReadonlyGeo
DISALLOWED_METHODS = %w(POST PATCH PUT DELETE) DISALLOWED_METHODS = %w(POST PATCH PUT DELETE)
WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects) WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects api/v3/geo/refresh_wikis)
def initialize(app) def initialize(app)
@app = app @app = app
......
require 'spec_helper' require 'spec_helper'
describe Gitlab::Geo::UpdateQueue do describe Gitlab::Geo::UpdateQueue do
subject { described_class.new } subject { described_class.new('test_queue') }
let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } } let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } }
let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } } let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } }
let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 } let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 }
......
...@@ -69,14 +69,14 @@ describe Gitlab::Geo, lib: true do ...@@ -69,14 +69,14 @@ describe Gitlab::Geo, lib: true do
end end
end end
describe 'notify_update' do describe 'notify_project_update' do
let(:project) { FactoryGirl.build(:project) } let(:project) { FactoryGirl.build(:project) }
it 'delegates to NotifyService' do it 'delegates to NotifyService' do
expect(Geo::EnqueueUpdateService).to receive(:new).with(project).and_call_original expect(Geo::EnqueueProjectUpdateService).to receive(:new).with(project).and_call_original
expect_any_instance_of(Geo::EnqueueUpdateService).to receive(:execute) expect_any_instance_of(Geo::EnqueueProjectUpdateService).to receive(:execute)
described_class.notify_update(project) described_class.notify_project_update(project)
end end
end end
end end
...@@ -147,14 +147,21 @@ describe GeoNode, type: :model do ...@@ -147,14 +147,21 @@ describe GeoNode, type: :model do
end end
end end
describe '#notify_url' do describe '#notify_projects_url' do
let(:refresh_url) { 'https://localhost:3000/gitlab/api/v3/geo/refresh_projects' } let(:refresh_url) { 'https://localhost:3000/gitlab/api/v3/geo/refresh_projects' }
it 'returns api url based on node uri' do it 'returns api url based on node uri' do
expect(new_node.notify_url).to eq(refresh_url) expect(new_node.notify_projects_url).to eq(refresh_url)
end end
end end
describe '#notify_wikis_url' do
let(:refresh_url) { 'https://localhost:3000/gitlab/api/v3/geo/refresh_wikis' }
it 'returns api url based on node uri' do
expect(new_node.notify_wikis_url).to eq(refresh_url)
end
end
describe '#oauth_callback_url' do describe '#oauth_callback_url' do
let(:oauth_callback_url) { 'https://localhost:3000/gitlab/oauth/geo/callback' } let(:oauth_callback_url) { 'https://localhost:3000/gitlab/oauth/geo/callback' }
......
describe Geo::EnqueueUpdateService, service: true do describe Geo::EnqueueProjectUpdateService, service: true do
subject { Geo::EnqueueUpdateService.new(project) } subject { Geo::EnqueueProjectUpdateService.new(project) }
let(:project) { double(:project) } let(:project) { double(:project) }
let(:fake_url) { 'git@localhost:repo/path.git' } let(:fake_url) { 'git@localhost:repo/path.git' }
let(:fake_id) { 999 } let(:fake_id) { 999 }
......
# Verifies that executing the service stores the project id and the wiki
# clone URL in the service's queue.
describe Geo::EnqueueWikiUpdateService, service: true do
  subject { described_class.new(project) }

  let(:project) { double(:project) }
  let(:fake_url) { 'git@localhost:repo/path.git' }
  let(:fake_id) { 999 }
  let(:queue) { subject.instance_variable_get(:@queue) }

  before do
    # Clear the queue so each example only sees the entry it stored itself.
    queue.empty!

    expect(project).to receive_message_chain(:wiki, :url_to_repo).and_return(fake_url)
    expect(project).to receive(:id).and_return(fake_id)
  end

  describe '#execute' do
    let(:stored_data) { queue.first }

    before { subject.execute }

    it 'persists id and clone_url to redis queue' do
      expect(stored_data).to have_key('id')
      expect(stored_data).to have_key('clone_url')
    end

    it 'persisted id is equal to original' do
      expect(stored_data['id']).to eq(fake_id)
    end

    it 'persisted clone_url is equal to original' do
      expect(stored_data['clone_url']).to eq(fake_url)
    end
  end
end
...@@ -22,9 +22,9 @@ describe PostReceive do ...@@ -22,9 +22,9 @@ describe PostReceive do
end end
it "triggers wiki index update" do it "triggers wiki index update" do
allow(Gitlab.config.elasticsearch).to receive(:enabled).and_return(true)
expect(Project).to receive(:find_with_namespace).with(project.path_with_namespace).and_return(project)
expect(Project).to receive(:find_with_namespace).with("#{project.path_with_namespace}.wiki").and_return(nil) expect(Project).to receive(:find_with_namespace).with("#{project.path_with_namespace}.wiki").and_return(nil)
expect(Project).to receive(:find_with_namespace).with(project.path_with_namespace).and_return(project)
allow(Gitlab.config.elasticsearch).to receive(:enabled).and_return(true)
expect_any_instance_of(ProjectWiki).to receive(:index_blobs) expect_any_instance_of(ProjectWiki).to receive(:index_blobs)
repo_path = "#{pwd(project)}.wiki" repo_path = "#{pwd(project)}.wiki"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment