Commit 1cb77f73 authored by Douwe Maan's avatar Douwe Maan

Merge branch 'tc-geo-remove-system-hooks' into 'master'

Remove Geo system hooks

See merge request !2644
parents afa2cb2b 755b3acc
...@@ -48,9 +48,6 @@ class Projects::WikisController < Projects::ApplicationController ...@@ -48,9 +48,6 @@ class Projects::WikisController < Projects::ApplicationController
@page = WikiPages::UpdateService.new(@project, current_user, wiki_params).execute(@page) @page = WikiPages::UpdateService.new(@project, current_user, wiki_params).execute(@page)
if @page.valid? if @page.valid?
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(@project) if Gitlab::Geo.primary?
redirect_to( redirect_to(
project_wiki_path(@project, @page), project_wiki_path(@project, @page),
notice: 'Wiki was successfully updated.' notice: 'Wiki was successfully updated.'
...@@ -67,8 +64,6 @@ class Projects::WikisController < Projects::ApplicationController ...@@ -67,8 +64,6 @@ class Projects::WikisController < Projects::ApplicationController
@page = WikiPages::CreateService.new(@project, current_user, wiki_params).execute @page = WikiPages::CreateService.new(@project, current_user, wiki_params).execute
if @page.persisted? if @page.persisted?
# Triggers repository update on secondary nodes when Geo is enabled
Gitlab::Geo.notify_wiki_update(@project) if Gitlab::Geo.primary?
redirect_to( redirect_to(
project_wiki_path(@project, @page), project_wiki_path(@project, @page),
notice: 'Wiki was successfully updated.' notice: 'Wiki was successfully updated.'
......
class GeoNode < ActiveRecord::Base class GeoNode < ActiveRecord::Base
include Presentable include Presentable
include IgnorableColumn
ignore_column :system_hook_id
belongs_to :geo_node_key, dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent belongs_to :geo_node_key, dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent
belongs_to :oauth_application, class_name: 'Doorkeeper::Application', dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent belongs_to :oauth_application, class_name: 'Doorkeeper::Application', dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent
belongs_to :system_hook, dependent: :destroy # rubocop: disable Cop/ActiveRecordDependent
has_many :geo_node_namespace_links has_many :geo_node_namespace_links
has_many :namespaces, through: :geo_node_namespace_links has_many :namespaces, through: :geo_node_namespace_links
...@@ -14,7 +16,7 @@ class GeoNode < ActiveRecord::Base ...@@ -14,7 +16,7 @@ class GeoNode < ActiveRecord::Base
relative_url_root: lambda { Gitlab.config.gitlab.relative_url_root }, relative_url_root: lambda { Gitlab.config.gitlab.relative_url_root },
primary: false primary: false
accepts_nested_attributes_for :geo_node_key, :system_hook accepts_nested_attributes_for :geo_node_key
validates :host, host: true, presence: true, uniqueness: { case_sensitive: false, scope: :port } validates :host, host: true, presence: true, uniqueness: { case_sensitive: false, scope: :port }
validates :primary, uniqueness: { message: 'node already exists' }, if: :primary validates :primary, uniqueness: { message: 'node already exists' }, if: :primary
...@@ -64,18 +66,6 @@ class GeoNode < ActiveRecord::Base ...@@ -64,18 +66,6 @@ class GeoNode < ActiveRecord::Base
self.relative_url_root = new_uri.path != '/' ? new_uri.path : '' self.relative_url_root = new_uri.path != '/' ? new_uri.path : ''
end end
def notify_projects_url
geo_api_url('refresh_projects')
end
def notify_wikis_url
geo_api_url('refresh_wikis')
end
def geo_events_url
geo_api_url('receive_events')
end
def geo_transfers_url(file_type, file_id) def geo_transfers_url(file_type, file_id)
geo_api_url("transfers/#{file_type}/#{file_id}") geo_api_url("transfers/#{file_type}/#{file_id}")
end end
...@@ -198,7 +188,6 @@ class GeoNode < ActiveRecord::Base ...@@ -198,7 +188,6 @@ class GeoNode < ActiveRecord::Base
update_clone_url update_clone_url
else else
update_oauth_application! update_oauth_application!
update_system_hook!
end end
end end
...@@ -221,17 +210,6 @@ class GeoNode < ActiveRecord::Base ...@@ -221,17 +210,6 @@ class GeoNode < ActiveRecord::Base
self.oauth_application.redirect_uri = oauth_callback_url self.oauth_application.redirect_uri = oauth_callback_url
end end
def update_system_hook!
return if self.primary?
self.build_system_hook if system_hook.nil?
self.system_hook.token = SecureRandom.hex(20) unless self.system_hook.token.present?
self.system_hook.url = geo_events_url if uri.present?
self.system_hook.push_events = false
self.system_hook.tag_push_events = false
self.system_hook.repository_update_events = true
end
def expire_cache! def expire_cache!
Gitlab::Geo.expire_cache! Gitlab::Geo.expire_cache!
end end
......
module Geo
class EnqueueWikiUpdateService
attr_reader :project
def initialize(project)
@queue = Gitlab::Geo::UpdateQueue.new('updated_wikis')
@project = project
end
def execute
@queue.store({ id: @project.id, clone_url: @project.wiki.url_to_repo })
end
end
end
...@@ -11,6 +11,10 @@ module Geo ...@@ -11,6 +11,10 @@ module Geo
@new_path_with_namespace = new_path_with_namespace @new_path_with_namespace = new_path_with_namespace
end end
def async_execute
GeoRepositoryMoveWorker.perform_async(id, name, old_path_with_namespace, new_path_with_namespace)
end
def execute def execute
project = Project.find(id) project = Project.find(id)
project.expire_caches_before_rename(old_path_with_namespace) project.expire_caches_before_rename(old_path_with_namespace)
......
module Geo
class NotifyNodesService < BaseNotify
def initialize
@wiki_queue = Gitlab::Geo::UpdateQueue.new('updated_wikis')
end
def execute
process(@wiki_queue, :notify_wikis_url)
end
private
def process(queue, notify_url_method)
return if queue.empty?
projects = queue.fetch_batched_data
content = { projects: projects }.to_json
::Gitlab::Geo.secondary_nodes.each do |node|
next unless node.enabled?
notify_url = node.__send__(notify_url_method.to_sym) # rubocop:disable GitlabSecurity/PublicSend
success, details = notify(notify_url, content)
unless success
Gitlab::Geo::Logger.error(
class: self.class.name,
message: "GitLab failed to notify",
error: details,
node_url: node.url,
notify_url: notify_url)
queue.store_batched_data(projects)
end
end
end
end
end
module Geo
class RepositoryDestroyService
attr_reader :id, :name, :full_path, :storage_name
def initialize(id, name, full_path, storage_name)
@id = id
@name = name
@full_path = full_path
@storage_name = storage_name
end
def async_execute
GeoRepositoryDestroyWorker.perform_async(id, name, full_path, storage_name)
end
def execute
::Projects::DestroyService.new(deleted_project, nil).geo_replicate
end
private
def deleted_project
# We don't have access to the original model anymore, so we are
# rebuilding only what our service class requires
::Geo::DeletedProject.new(id: id,
name: name,
full_path: full_path,
repository_storage: storage_name)
end
end
end
module Geo
class RepositoryUpdateService
include Gitlab::Geo::ProjectLogHelpers
attr_reader :project, :clone_url, :logger
LEASE_TIMEOUT = 1.hour.freeze
LEASE_KEY_PREFIX = 'geo_repository_fetch'.freeze
def initialize(project, clone_url, logger = Rails.logger)
@project = project
@clone_url = clone_url
@logger = logger
end
def execute
try_obtain_lease do
project.create_repository unless project.repository_exists?
project.repository.after_create if project.empty_repo?
project.repository.fetch_geo_mirror(clone_url)
project.repository.expire_all_method_caches
project.repository.expire_branch_cache
project.repository.expire_content_cache
end
rescue Gitlab::Shell::Error => e
log_error('Error fetching repository for project', e)
rescue Gitlab::Git::Repository::NoRepository => e
log_error('Error invalid repository', e)
log_info('Invalidating cache for project')
project.repository.after_create
end
private
def try_obtain_lease
log_info('Trying to obtain lease to sync repository')
repository_lease = Gitlab::ExclusiveLease.new(lease_key, timeout: LEASE_TIMEOUT).try_obtain
unless repository_lease.present?
log_info('Could not obtain lease to sync repository')
return
end
begin
yield
ensure
log_info('Releasing leases to sync repository')
Gitlab::ExclusiveLease.cancel(lease_key, repository_lease)
end
end
def lease_key
@lease_key ||= "#{LEASE_KEY_PREFIX}:#{project.id}"
end
end
end
module Geo
class ScheduleKeyChangeService
attr_reader :id, :key, :action
def initialize(params)
@id = params['id']
@key = params['key']
@action = params['event_name']
end
def execute
GeoKeyRefreshWorker.perform_async(@id, @key, @action)
end
end
end
module Geo
class ScheduleRepoCreateService
attr_reader :id
def initialize(params)
@id = params['project_id']
end
def execute
GeoRepositoryCreateWorker.perform_async(id)
end
end
end
module Geo
class ScheduleRepoDestroyService
attr_reader :id, :name, :path_with_namespace
def initialize(params)
@id = params['project_id']
@name = params['name']
@path_with_namespace = params['path_with_namespace']
end
def execute
GeoRepositoryDestroyWorker.perform_async(id, name, path_with_namespace)
end
end
end
module Geo
class ScheduleRepoFetchService
def initialize(params)
@project_id = params[:project_id]
@remote_url = params[:project][:git_ssh_url]
end
def execute
GeoRepositoryFetchWorker.perform_async(@project_id, @remote_url)
end
end
end
module Geo
class ScheduleRepoMoveService
attr_reader :id, :name, :old_path_with_namespace, :path_with_namespace
def initialize(params)
@id = params['project_id']
@name = params['name']
@old_path_with_namespace = params['old_path_with_namespace']
@path_with_namespace = params['path_with_namespace']
end
def execute
GeoRepositoryMoveWorker.perform_async(id, name, old_path_with_namespace, path_with_namespace)
end
end
end
module Geo
class ScheduleRepoUpdateService
attr_reader :id, :clone_url, :push_data
def initialize(params)
@id = params[:project_id]
@clone_url = params[:project][:git_ssh_url]
@push_data = { 'type' => params[:object_kind], 'before' => params[:before],
'after' => params[:newref], 'ref' => params[:ref] }
end
def execute
GeoRepositoryUpdateWorker.perform_async(@id, @clone_url, @push_data)
end
end
end
module Geo
class ScheduleWikiRepoUpdateService
attr_reader :projects
def initialize(projects)
@projects = projects
end
def execute
@projects.each do |project|
next unless Gitlab::Geo.current_node&.projects_include?(project['id'].to_i)
GeoWikiRepositoryUpdateWorker.perform_async(project['id'], project['clone_url'])
end
end
end
end
...@@ -5,6 +5,11 @@ ...@@ -5,6 +5,11 @@
%p.light %p.light
With #{link_to 'GitLab Geo', help_page_path('gitlab-geo/README'), class: 'vlink'} you can install a special With #{link_to 'GitLab Geo', help_page_path('gitlab-geo/README'), class: 'vlink'} you can install a special
read-only and replicated instance anywhere. read-only and replicated instance anywhere.
Before you add nodes, follow the
#{link_to 'Setup instructions', help_page_path('gitlab-geo/README', anchor: 'setup-instructions'), class: 'vlink' }
in the
%strong exact order
they appear.
%hr %hr
......
require 'active_support/concern'
module GeoDynamicBackoff
extend ActiveSupport::Concern
included do
sidekiq_options retry: 55
sidekiq_retry_in do |count|
count <= 30 ? linear_backoff_strategy(count) : geometric_backoff_strategy(count)
end
end
class_methods do
private
def linear_backoff_strategy(count)
rand(1..20) + count
end
def geometric_backoff_strategy(count)
# This strategy is based on the original one from sidekiq
count = count - 30 # we must start counting after 30
(count**4) + 15 + (rand(30) * (count + 1))
end
end
end
class GeoBulkNotifyWorker
include Sidekiq::Worker
include CronjobQueue
def perform
Geo::NotifyNodesService.new.execute
end
end
class GeoKeyRefreshWorker
include Sidekiq::Worker
include ::GeoDynamicBackoff
include GeoQueue
def perform(key_id, key, action)
action = action.to_sym
case action
when :key_create
# ActiveRecord::RecordNotFound when not found (so job will retry)
key = Key.find(key_id)
key.add_to_shell
when :key_destroy
# we are physically removing the key after model is removed
# so we must reconstruct ids to schedule removal
key = Key.new(id: key_id, key: key)
key.remove_from_shell
else
raise "Invalid action: #{action}"
end
end
end
class GeoRepositoryCreateWorker
include Sidekiq::Worker
include GeoQueue
def perform(id)
project = Project.find(id)
project.ensure_storage_path_exists
project.create_repository unless project.repository_exists? || project.import?
end
end
...@@ -3,29 +3,7 @@ class GeoRepositoryDestroyWorker ...@@ -3,29 +3,7 @@ class GeoRepositoryDestroyWorker
include GeoQueue include GeoQueue
include Gitlab::ShellAdapter include Gitlab::ShellAdapter
def perform(id, name, full_path) def perform(id, name, full_path, storage_name)
repository_storage = probe_repository_storage(full_path) Geo::RepositoryDestroyService.new(id, name, full_path, storage_name).execute
# We don't have access to the original model anymore, so we are
# rebuilding only what our service class requires
project = ::Geo::DeletedProject.new(id: id, name: name, full_path: full_path, repository_storage: repository_storage)
::Projects::DestroyService.new(project, nil).geo_replicate
end
private
# Detect in which repository_storage the project repository is stored.
#
# As we don't have access to `repository_storage` from the data in the Hook notification
# we need to probe on all existing ones.
#
# if we don't find it means it has already been deleted and we just return
def probe_repository_storage(repo_path)
Gitlab.config.repositories.storages.each do |repository_storage, rs_data|
return repository_storage if gitlab_shell.exists?(rs_data['path'], repo_path + '.git')
end
nil
end end
end end
class GeoRepositoryFetchWorker
include Sidekiq::Worker
include ::GeoDynamicBackoff
include GeoQueue
include Gitlab::ShellAdapter
sidekiq_options queue: 'geo_repository_update'
def perform(project_id, clone_url)
project = Project.find(project_id)
Geo::RepositoryUpdateService.new(project, clone_url, logger).execute
end
end
class GeoRepositoryUpdateWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
attr_accessor :project
def perform(project_id, _clone_url, push_data = nil)
@project = Project.find(project_id)
@push_data = push_data
process_hooks if push_data # we should be compatible with old unprocessed data
end
private
def fetch_repository(remote_url)
@project.create_repository unless @project.repository_exists?
@project.repository.after_create if @project.empty_repo?
@project.repository.fetch_geo_mirror(remote_url)
end
def process_hooks
if @push_data['type'] == 'push'
branch = Gitlab::Git.ref_name(@push_data['ref'])
process_push(branch)
end
end
def process_push(branch)
@project.repository.after_push_commit(branch)
if push_remove_branch?
@project.repository.after_remove_branch
elsif push_to_new_branch?
@project.repository.after_create_branch
end
ProjectCacheWorker.perform_async(@project.id)
end
def push_remove_branch?
Gitlab::Git.branch_ref?(@push_data['ref']) && Gitlab::Git.blank_ref?(@push_data['after'])
end
def push_to_new_branch?
Gitlab::Git.branch_ref?(@push_data['ref']) && Gitlab::Git.blank_ref?(@push_data['before'])
end
end
class GeoWikiRepositoryUpdateWorker
include Sidekiq::Worker
include Gitlab::ShellAdapter
include GeoQueue
attr_accessor :project
def perform(project_id, clone_url)
@project = Project.find(project_id)
fetch_repository(clone_url)
end
private
def fetch_repository(remote_url)
# Second .wiki call returns a Gollum::Wiki, and it will always create the physical repository when not found
if @project.wiki.wiki.exist?
@project.wiki.repository.fetch_geo_mirror(remote_url)
end
end
end
---
title: Improve copy so users will set up SSH from DB for Geo
merge_request: 2644
author:
type: changed
...@@ -219,11 +219,6 @@ production: &base ...@@ -219,11 +219,6 @@ production: &base
ldap_sync_worker: ldap_sync_worker:
cron: "30 1 * * *" cron: "30 1 * * *"
# GitLab Geo nodes notification worker
# NOTE: This will only take effect if Geo is enabled
geo_bulk_notify_worker:
cron: "*/10 * * * * *"
# GitLab Geo repository sync worker # GitLab Geo repository sync worker
# NOTE: This will only take effect if Geo is enabled # NOTE: This will only take effect if Geo is enabled
geo_repository_sync_worker: geo_repository_sync_worker:
......
...@@ -425,9 +425,6 @@ Settings.cron_jobs['ldap_sync_worker']['job_class'] = 'LdapSyncWorker' ...@@ -425,9 +425,6 @@ Settings.cron_jobs['ldap_sync_worker']['job_class'] = 'LdapSyncWorker'
Settings.cron_jobs['ldap_group_sync_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['ldap_group_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['ldap_group_sync_worker']['cron'] ||= '0 * * * *' Settings.cron_jobs['ldap_group_sync_worker']['cron'] ||= '0 * * * *'
Settings.cron_jobs['ldap_group_sync_worker']['job_class'] = 'LdapAllGroupsSyncWorker' Settings.cron_jobs['ldap_group_sync_worker']['job_class'] = 'LdapAllGroupsSyncWorker'
Settings.cron_jobs['geo_bulk_notify_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_bulk_notify_worker']['cron'] ||= '*/10 * * * * *'
Settings.cron_jobs['geo_bulk_notify_worker']['job_class'] ||= 'GeoBulkNotifyWorker'
Settings.cron_jobs['geo_repository_sync_worker'] ||= Settingslogic.new({}) Settings.cron_jobs['geo_repository_sync_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_sync_worker']['cron'] ||= '*/5 * * * *' Settings.cron_jobs['geo_repository_sync_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'Geo::RepositorySyncWorker' Settings.cron_jobs['geo_repository_sync_worker']['job_class'] ||= 'Geo::RepositorySyncWorker'
......
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class RemoveSystemHookFromGeoNodes < ActiveRecord::Migration
DOWNTIME = false
def up
execute <<-EOF.strip_heredoc
DELETE FROM web_hooks
WHERE id IN (
SELECT system_hook_id
FROM geo_nodes
);
EOF
remove_reference :geo_nodes, :system_hook
end
def down
add_column :geo_nodes, :system_hook_id, :integer
end
end
...@@ -683,7 +683,6 @@ ActiveRecord::Schema.define(version: 20170824162758) do ...@@ -683,7 +683,6 @@ ActiveRecord::Schema.define(version: 20170824162758) do
t.boolean "primary" t.boolean "primary"
t.integer "geo_node_key_id" t.integer "geo_node_key_id"
t.integer "oauth_application_id" t.integer "oauth_application_id"
t.integer "system_hook_id"
t.boolean "enabled", default: true, null: false t.boolean "enabled", default: true, null: false
t.string "access_key" t.string "access_key"
t.string "encrypted_secret_access_key" t.string "encrypted_secret_access_key"
......
...@@ -53,13 +53,16 @@ screen. ...@@ -53,13 +53,16 @@ screen.
## Communication ## Communication
Previous implementation (GitLab =< 8.6.x) used custom code to handle ### Custom code (GitLab 8.6 and earlier)
In GitLab versions before 8.6 custom code is used to handle
notification from **Primary** to **Secondary** by HTTP requests. notification from **Primary** to **Secondary** by HTTP requests.
We decided to move away from custom code and integrate by using ### System hooks (GitLab 8.7 till 9.5)
**System Webhooks**, as we have more people using them, so any
improvements we make to this communication layer, many other will Later was decided to move away from custom code and integrate by using
benefit from. **System Webhooks**. More people are using them, so many would benefit from
improvements made to this communication layer.
There is a specific **internal** endpoint in our api code (Grape), There is a specific **internal** endpoint in our api code (Grape),
that receives all requests from this System Hooks: that receives all requests from this System Hooks:
...@@ -67,6 +70,13 @@ that receives all requests from this System Hooks: ...@@ -67,6 +70,13 @@ that receives all requests from this System Hooks:
We switch and filter from each event by the `event_name` field. We switch and filter from each event by the `event_name` field.
### Geo Log Cursor (GitLab 10.0 and up)
Since GitLab 10.0, **System Webhooks** are no longer used, and Geo Log
Cursor is used instead. The Log Cursor traverses the `Geo::EventLog`
to see if there are changes since the last time the log was checked
and will handle repository updates, deletes, changes & renames.
## Readonly ## Readonly
......
...@@ -92,8 +92,8 @@ If you installed GitLab using the Omnibus packages (highly recommended): ...@@ -92,8 +92,8 @@ If you installed GitLab using the Omnibus packages (highly recommended):
secondary node for the moment. secondary node for the moment.
1. [Upload the GitLab License](../user/admin_area/license.md) to the **primary** Geo Node to unlock GitLab Geo. 1. [Upload the GitLab License](../user/admin_area/license.md) to the **primary** Geo Node to unlock GitLab Geo.
1. [Setup the database replication](database.md) (`primary (read-write) <-> secondary (read-only)` topology). 1. [Setup the database replication](database.md) (`primary (read-write) <-> secondary (read-only)` topology).
1. [Configure GitLab](configuration.md) to set the primary and secondary nodes.
1. [Configure SSH authorizations to use the database](ssh.md) 1. [Configure SSH authorizations to use the database](ssh.md)
1. [Configure GitLab](configuration.md) to set the primary and secondary nodes.
1. Optional: [Configure a secondary LDAP server](../administration/auth/ldap.md) for the secondary. See [notes on LDAP](#ldap). 1. Optional: [Configure a secondary LDAP server](../administration/auth/ldap.md) for the secondary. See [notes on LDAP](#ldap).
1. [Follow the after setup steps](after_setup.md). 1. [Follow the after setup steps](after_setup.md).
...@@ -108,6 +108,7 @@ If you installed GitLab from source: ...@@ -108,6 +108,7 @@ If you installed GitLab from source:
else in the secondary node for the moment. else in the secondary node for the moment.
1. [Upload the GitLab License](../user/admin_area/license.md) you purchased for GitLab Enterprise Edition to unlock GitLab Geo. 1. [Upload the GitLab License](../user/admin_area/license.md) you purchased for GitLab Enterprise Edition to unlock GitLab Geo.
1. [Setup the database replication](database_source.md) (`primary (read-write) <-> secondary (read-only)` topology). 1. [Setup the database replication](database_source.md) (`primary (read-write) <-> secondary (read-only)` topology).
1. [Configure SSH authorizations to use the database](ssh.md)
1. [Configure GitLab](configuration_source.md) to set the primary and secondary 1. [Configure GitLab](configuration_source.md) to set the primary and secondary
nodes. nodes.
1. [Follow the after setup steps](after_setup.md). 1. [Follow the after setup steps](after_setup.md).
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
By default, GitLab manages an `authorized_keys` file, which contains all the By default, GitLab manages an `authorized_keys` file, which contains all the
public SSH keys for users allowed to access GitLab. However, to maintain a public SSH keys for users allowed to access GitLab. However, to maintain a
single source of truth, Geo needs to be configured to peform SSH fingerprint single source of truth, Geo needs to be configured to perform SSH fingerprint
lookups via database lookup. This approach is also much faster than scanning a lookups via database lookup. This approach is also much faster than scanning a
file. file.
......
...@@ -18,9 +18,6 @@ module EE ...@@ -18,9 +18,6 @@ module EE
if ::Gitlab::Geo.primary? if ::Gitlab::Geo.primary?
# Create wiki repository updated event on Geo event log # Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(project, source: Geo::RepositoryUpdatedEvent::WIKI).create ::Geo::RepositoryUpdatedEventStore.new(project, source: Geo::RepositoryUpdatedEvent::WIKI).create
# Triggers repository update on secondary nodes
::Gitlab::Geo.notify_wiki_update(project)
end end
end end
end end
......
...@@ -24,9 +24,6 @@ module EE ...@@ -24,9 +24,6 @@ module EE
if ::Gitlab::Geo.enabled? if ::Gitlab::Geo.enabled?
# Create wiki repository updated event on Geo event log # Create wiki repository updated event on Geo event log
::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: Geo::RepositoryUpdatedEvent::WIKI).create ::Geo::RepositoryUpdatedEventStore.new(post_received.project, source: Geo::RepositoryUpdatedEvent::WIKI).create
# Triggers repository update on secondary nodes
::Gitlab::Geo.notify_wiki_update(post_received.project)
end end
end end
......
...@@ -38,57 +38,6 @@ module API ...@@ -38,57 +38,6 @@ module API
present GeoNodeStatus.new(id: Gitlab::Geo.current_node.id), with: Entities::GeoNodeStatus present GeoNodeStatus.new(id: Gitlab::Geo.current_node.id), with: Entities::GeoNodeStatus
end end
# Enqueue a batch of IDs of wiki's projects to have their
# wiki repositories updated
#
# Example request:
# POST /geo/refresh_wikis
post 'refresh_wikis' do
authenticated_as_admin!
require_node_to_be_enabled!
required_attributes! [:projects]
::Geo::ScheduleWikiRepoUpdateService.new(params[:projects]).execute
end
# Receive event streams from primary and enqueue changes
#
# Example request:
# POST /geo/receive_events
post 'receive_events' do
authenticate_by_gitlab_geo_token!
require_node_to_be_enabled!
check_node_restricted_project_ids!
required_attributes! %w(event_name)
case params['event_name']
when 'key_create', 'key_destroy'
required_attributes! %w(key id)
::Geo::ScheduleKeyChangeService.new(params).execute
when 'repository_update'
required_attributes! %w(event_name project_id project)
::Geo::ScheduleRepoFetchService.new(params).execute
when 'push'
required_attributes! %w(event_name project_id project)
::Geo::ScheduleRepoUpdateService.new(params).execute
when 'tag_push'
required_attributes! %w(event_name project_id project)
::Geo::ScheduleWikiRepoUpdateService.new(params).execute
when 'project_create'
required_attributes! %w(event_name project_id)
::Geo::ScheduleRepoCreateService.new(params).execute
when 'project_destroy'
required_attributes! %w(event_name project_id path_with_namespace)
::Geo::ScheduleRepoDestroyService.new(params).execute
when 'project_rename'
required_attributes! %w(event_name project_id path_with_namespace old_path_with_namespace)
::Geo::ScheduleRepoMoveService.new(params).execute
when 'project_transfer'
required_attributes! %w(event_name project_id path_with_namespace old_path_with_namespace)
::Geo::ScheduleRepoMoveService.new(params).execute
end
end
end end
helpers do helpers do
...@@ -111,14 +60,6 @@ module API ...@@ -111,14 +60,6 @@ module API
def require_node_to_be_secondary! def require_node_to_be_secondary!
forbidden! 'Geo node is not secondary node.' unless Gitlab::Geo.current_node&.secondary? forbidden! 'Geo node is not secondary node.' unless Gitlab::Geo.current_node&.secondary?
end end
def check_node_restricted_project_ids!
return unless params.key?(:project_id)
unless Gitlab::Geo.current_node&.projects_include?(params[:project_id].to_i)
not_found!
end
end
end end
end end
end end
...@@ -161,13 +161,6 @@ module API ...@@ -161,13 +161,6 @@ module API
end end
end end
def authenticate_by_gitlab_geo_token!
token = headers['X-Gitlab-Token'].try(:chomp)
unless token && Devise.secure_compare(geo_token, token)
unauthorized!
end
end
def authenticated_as_admin! def authenticated_as_admin!
authenticate! authenticate!
forbidden! unless current_user.admin? forbidden! unless current_user.admin?
......
...@@ -11,7 +11,6 @@ module Gitlab ...@@ -11,7 +11,6 @@ module Gitlab
geo_oauth_application geo_oauth_application
).freeze ).freeze
PRIMARY_JOBS = %i(bulk_notify_job).freeze
SECONDARY_JOBS = %i(repository_sync_job file_download_job).freeze SECONDARY_JOBS = %i(repository_sync_job file_download_job).freeze
def self.current_node def self.current_node
...@@ -66,14 +65,6 @@ module Gitlab ...@@ -66,14 +65,6 @@ module Gitlab
GeoNode.where(host: host, port: port).exists? GeoNode.where(host: host, port: port).exists?
end end
def self.notify_wiki_update(project)
::Geo::EnqueueWikiUpdateService.new(project).execute
end
def self.bulk_notify_job
Sidekiq::Cron::Job.find('geo_bulk_notify_worker')
end
def self.repository_sync_job def self.repository_sync_job
Sidekiq::Cron::Job.find('geo_repository_sync_worker') Sidekiq::Cron::Job.find('geo_repository_sync_worker')
end end
...@@ -93,7 +84,6 @@ module Gitlab ...@@ -93,7 +84,6 @@ module Gitlab
end end
def self.disable_all_geo_jobs! def self.disable_all_geo_jobs!
PRIMARY_JOBS.each { |job| self.__send__(job).try(:disable!) } # rubocop:disable GitlabSecurity/PublicSend
SECONDARY_JOBS.each { |job| self.__send__(job).try(:disable!) } # rubocop:disable GitlabSecurity/PublicSend SECONDARY_JOBS.each { |job| self.__send__(job).try(:disable!) } # rubocop:disable GitlabSecurity/PublicSend
end end
......
...@@ -128,14 +128,14 @@ module Gitlab ...@@ -128,14 +128,14 @@ module Gitlab
def handle_repository_delete(event) def handle_repository_delete(event)
deleted_event = event.repository_deleted_event deleted_event = event.repository_deleted_event
# Once we remove system hooks we can refactor
# GeoRepositoryDestroyWorker to avoid doing this
full_path = File.join(deleted_event.repository_storage_path, full_path = File.join(deleted_event.repository_storage_path,
deleted_event.deleted_path) deleted_event.deleted_path)
job_id = ::GeoRepositoryDestroyWorker.perform_async( job_id = ::Geo::RepositoryDestroyService
deleted_event.project_id, .new(deleted_event.project_id,
deleted_event.deleted_project_name, deleted_event.deleted_project_name,
full_path) full_path,
deleted_event.repository_storage_name)
.async_execute
log_event_info(event.created_at, log_event_info(event.created_at,
message: "Deleted project", message: "Deleted project",
project_id: deleted_event.project_id, project_id: deleted_event.project_id,
...@@ -164,8 +164,9 @@ module Gitlab ...@@ -164,8 +164,9 @@ module Gitlab
old_path = renamed_event.old_path_with_namespace old_path = renamed_event.old_path_with_namespace
new_path = renamed_event.new_path_with_namespace new_path = renamed_event.new_path_with_namespace
job_id = ::GeoRepositoryMoveWorker.perform_async( job_id = ::Geo::MoveRepositoryService
renamed_event.project_id, "", old_path, new_path) .new(renamed_event.project_id, "", old_path, new_path)
.async_execute
log_event_info(event.created_at, log_event_info(event.created_at,
message: "Renaming project", message: "Renaming project",
......
module Gitlab
module Geo
class UpdateQueue
BATCH_SIZE = 250
NAMESPACE = 'geo:gitlab'.freeze
def initialize(queue)
@queue = queue
end
def store(data)
redis.rpush(@queue, data.to_json)
expire_queue_size!
end
def first
data = fetch(0, 0)
data.first unless data.empty?
end
def last
data = fetch(-1, -1)
data.first unless data.empty?
end
def fetch_batched_data
projects = []
bsize = batch_size
redis.multi do
projects = redis.lrange(@queue, 0, bsize - 1)
redis.ltrim(@queue, bsize, -1)
end
expire_queue_size!
deserialize(projects.value)
end
def store_batched_data(projects)
redis.pipelined do
projects.reverse_each do |project|
# enqueue again to the head of the queue
redis.lpush(@queue, project.to_json)
end
end
expire_queue_size!
end
def batch_size
queue_size > BATCH_SIZE ? BATCH_SIZE : queue_size
end
def queue_size
@queue_size ||= fetch_queue_size
end
def empty?
queue_size == 0
end
def empty!
redis.del(@queue)
end
protected
def fetch(start, stop)
deserialize(redis.lrange(@queue, start, stop))
end
def fetch_queue_size
redis.llen(@queue)
end
def expire_queue_size!
@queue_size = nil
end
def deserialize(data)
data.map! { |item| JSON.parse(item) } unless data.empty?
data
end
def redis
self.class.redis
end
def self.redis_connection
::Redis::Namespace.new(NAMESPACE, redis: ::Redis.new(url: Gitlab::Redis::SharedState.url))
end
def self.redis
@redis ||= redis_connection
end
end
end
end
...@@ -7,7 +7,7 @@ module Gitlab ...@@ -7,7 +7,7 @@ module Gitlab
def initialize(app) def initialize(app)
@app = app @app = app
@whitelisted = internal_routes + geo_routes @whitelisted = internal_routes
end end
def call(env) def call(env)
...@@ -36,11 +36,6 @@ module Gitlab ...@@ -36,11 +36,6 @@ module Gitlab
API_VERSIONS.flat_map { |version| "api/v#{version}/internal" } API_VERSIONS.flat_map { |version| "api/v#{version}/internal" }
end end
def geo_routes
geo_routes = %w(refresh_wikis receive_events)
API_VERSIONS.flat_map { |version| geo_routes.map { |route| "api/v#{version}/geo/#{route}" } }
end
def disallowed_request? def disallowed_request?
DISALLOWED_METHODS.include?(@env['REQUEST_METHOD']) && !whitelisted_routes DISALLOWED_METHODS.include?(@env['REQUEST_METHOD']) && !whitelisted_routes
end end
......
require "spec_helper"
require Rails.root.join("db", "post_migrate", "20170811082658_remove_system_hook_from_geo_nodes.rb")
describe RemoveSystemHookFromGeoNodes, :migration do
let(:geo_nodes) { table(:geo_nodes) }
before do
allow_any_instance_of(WebHookService).to receive(:execute)
create(:system_hook)
geo_nodes.create! attributes_for(:geo_node, :primary, :current)
geo_nodes.create! attributes_for(:geo_node, system_hook_id: create(:system_hook).id)
end
it 'destroy all system hooks for secondary nodes' do
expect do
migrate!
end.to change { SystemHook.count }.by(-1)
end
end
...@@ -30,12 +30,6 @@ describe WikiPages::CreateService do ...@@ -30,12 +30,6 @@ describe WikiPages::CreateService do
service.execute service.execute
end end
it 'triggers wiki update on secondary nodes' do
expect(Gitlab::Geo).to receive(:notify_wiki_update).with(instance_of(Project))
service.execute
end
end end
end end
end end
...@@ -23,12 +23,6 @@ describe WikiPages::DestroyService do ...@@ -23,12 +23,6 @@ describe WikiPages::DestroyService do
service.execute(page) service.execute(page)
end end
it 'triggers wiki update on secondary nodes' do
expect(Gitlab::Geo).to receive(:notify_wiki_update).with(instance_of(Project))
service.execute(page)
end
end end
end end
end end
...@@ -31,12 +31,6 @@ describe WikiPages::UpdateService do ...@@ -31,12 +31,6 @@ describe WikiPages::UpdateService do
service.execute(page) service.execute(page)
end end
it 'triggers wiki update on secondary nodes' do
expect(Gitlab::Geo).to receive(:notify_wiki_update).with(instance_of(Project))
service.execute(page)
end
end end
end end
end end
...@@ -76,7 +76,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do ...@@ -76,7 +76,7 @@ describe Gitlab::Geo::LogCursor::Daemon, :postgresql do
repository_deleted_event.deleted_path) repository_deleted_event.deleted_path)
expect(::GeoRepositoryDestroyWorker).to receive(:perform_async) expect(::GeoRepositoryDestroyWorker).to receive(:perform_async)
.with(project_id, project_name, full_path) .with(project_id, project_name, full_path, project.repository_storage)
subject.run! subject.run!
end end
......
require 'spec_helper'
describe Gitlab::Geo::UpdateQueue do
subject { described_class.new('test_queue') }
let(:dummy_data) { { 'id' => 1, 'clone_url' => 'git@localhost:repo/path.git' } }
let(:dummy_data2) { { 'id' => 99, 'clone_url' => 'git@localhost:other_repo/path.git' } }
let(:multiple_dummy_data) { [dummy_data, dummy_data2] * 10 }
before do
subject.empty!
end
describe '#store' do
before do
subject.store(dummy_data)
end
it 'stores data to the queue' do
expect(subject).not_to be_empty
end
it 'stored data is equal to original' do
expect(subject.first).to eq(dummy_data)
end
end
context 'when queue has elements' do
before do
subject.store(dummy_data)
subject.store(dummy_data2)
end
describe '#first' do
it { expect(subject.first).to eq(dummy_data) }
end
describe '#last' do
it { expect(subject.last).to eq(dummy_data2) }
end
end
describe '#fetch_batched_data' do
before do
subject.store_batched_data(multiple_dummy_data)
end
it 'returns same stored data' do
expect(subject.fetch_batched_data).to eq(multiple_dummy_data)
end
end
describe '#store_batched_data' do
let(:ordered_data) { [{ 'a' => 1 }, { 'a' => 2 }, { 'a' => 3 }, { 'a' => 4 }, { 'a' => 5 }] }
it 'stores multiple items to the queue' do
expect { subject.store_batched_data(multiple_dummy_data) }.to change { subject.batch_size }.by(multiple_dummy_data.size)
end
it 'returns data in equal order to original' do
subject.store_batched_data(ordered_data)
expect(subject.first).to eq(ordered_data.first)
expect(subject.last).to eq(ordered_data.last)
end
end
describe '#batch_size' do
before do
allow(subject).to receive(:queue_size) { queue_size }
end
context 'when queue size is smaller than BATCH_SIZE' do
let(:queue_size) { described_class::BATCH_SIZE - 20 }
it 'equals to the queue size' do
expect(subject.batch_size).to eq(queue_size)
end
end
context 'when queue size is bigger than BATCH_SIZE' do
let(:queue_size) { described_class::BATCH_SIZE + 20 }
it 'equals to the BATCH_SIZE' do
expect(subject.batch_size).to eq(described_class::BATCH_SIZE)
end
end
end
describe '#queue_size' do
it 'returns the ammount of items in queue' do
expect { subject.store(dummy_data) }.to change { subject.queue_size }.by(1)
end
end
describe '#empty?' do
it 'returns true when empty' do
is_expected.to be_empty
end
it 'returns false when there are enqueue data' do
subject.store(dummy_data)
is_expected.not_to be_empty
end
end
end
...@@ -176,7 +176,7 @@ describe Gitlab::Geo do ...@@ -176,7 +176,7 @@ describe Gitlab::Geo do
end end
describe '.configure_cron_jobs!' do describe '.configure_cron_jobs!' do
JOBS = %w(ldap_test geo_bulk_notify_worker geo_repository_sync_worker geo_file_download_dispatch_worker).freeze JOBS = %w(ldap_test geo_repository_sync_worker geo_file_download_dispatch_worker).freeze
def init_cron_job(job_name, class_name) def init_cron_job(job_name, class_name)
job = Sidekiq::Cron::Job.new( job = Sidekiq::Cron::Job.new(
...@@ -202,7 +202,6 @@ describe Gitlab::Geo do ...@@ -202,7 +202,6 @@ describe Gitlab::Geo do
described_class.configure_cron_jobs! described_class.configure_cron_jobs!
expect(described_class.bulk_notify_job).to be_enabled
expect(described_class.repository_sync_job).not_to be_enabled expect(described_class.repository_sync_job).not_to be_enabled
expect(described_class.file_download_job).not_to be_enabled expect(described_class.file_download_job).not_to be_enabled
expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled
...@@ -215,7 +214,6 @@ describe Gitlab::Geo do ...@@ -215,7 +214,6 @@ describe Gitlab::Geo do
described_class.configure_cron_jobs! described_class.configure_cron_jobs!
expect(Sidekiq::Cron::Job.find('ldap_test')).not_to be_enabled expect(Sidekiq::Cron::Job.find('ldap_test')).not_to be_enabled
expect(described_class.bulk_notify_job).not_to be_enabled
expect(described_class.repository_sync_job).to be_enabled expect(described_class.repository_sync_job).to be_enabled
expect(described_class.file_download_job).to be_enabled expect(described_class.file_download_job).to be_enabled
end end
...@@ -226,7 +224,6 @@ describe Gitlab::Geo do ...@@ -226,7 +224,6 @@ describe Gitlab::Geo do
described_class.configure_cron_jobs! described_class.configure_cron_jobs!
expect(described_class.bulk_notify_job).not_to be_enabled
expect(described_class.repository_sync_job).not_to be_enabled expect(described_class.repository_sync_job).not_to be_enabled
expect(described_class.file_download_job).not_to be_enabled expect(described_class.file_download_job).not_to be_enabled
expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled expect(Sidekiq::Cron::Job.find('ldap_test')).to be_enabled
......
...@@ -65,12 +65,12 @@ describe Gitlab::Middleware::ReadonlyGeo do ...@@ -65,12 +65,12 @@ describe Gitlab::Middleware::ReadonlyGeo do
expect(subject).to disallow_request expect(subject).to disallow_request
end end
it 'expects a POST Geo request to be allowed after a disallowed request' do it 'expects a internal POST request to be allowed after a disallowed request' do
response = request.post('/test_request') response = request.post('/test_request')
expect(response).to be_a_redirect expect(response).to be_a_redirect
response = request.post("/api/#{API::API.version}/geo/refresh_wikis") response = request.post("/api/#{API::API.version}/internal")
expect(response).not_to be_a_redirect expect(response).not_to be_a_redirect
end end
...@@ -97,8 +97,8 @@ describe Gitlab::Middleware::ReadonlyGeo do ...@@ -97,8 +97,8 @@ describe Gitlab::Middleware::ReadonlyGeo do
expect(subject).not_to disallow_request expect(subject).not_to disallow_request
end end
it 'expects a POST Geo request to be allowed' do it 'expects a GET status request to be allowed' do
response = request.post("/api/#{API::API.version}/geo/refresh_wikis") response = request.get("/api/#{API::API.version}/geo/status")
expect(response).not_to be_a_redirect expect(response).not_to be_a_redirect
expect(subject).not_to disallow_request expect(subject).not_to disallow_request
......
...@@ -49,19 +49,6 @@ describe GeoNode, type: :model do ...@@ -49,19 +49,6 @@ describe GeoNode, type: :model do
end end
end end
context 'system hooks' do
it 'primary does not create a system hook' do
expect(primary_node.system_hook).to be_nil
end
it 'secondary creates a system hook with repository update events' do
hook = new_node.system_hook
expect(hook.push_events).to be_falsey
expect(hook.tag_push_events).to be_falsey
expect(hook.repository_update_events).to be_truthy
end
end
context 'prevent locking yourself out' do context 'prevent locking yourself out' do
subject do subject do
GeoNode.new(host: Gitlab.config.gitlab.host, GeoNode.new(host: Gitlab.config.gitlab.host,
...@@ -97,19 +84,6 @@ describe GeoNode, type: :model do ...@@ -97,19 +84,6 @@ describe GeoNode, type: :model do
expect(node.oauth_application).to be_persisted expect(node.oauth_application).to be_persisted
end end
it 'has a system_hook if it is a secondary node' do
expect(node.system_hook).to be_present
end
it 'generated system_hook has required attributes' do
expect(node.system_hook.url).to be_present
expect(node.system_hook.url).to eq(node.geo_events_url)
expect(node.system_hook.token).to be_present
expect(node.system_hook.push_events).to be_falsey
expect(node.system_hook.tag_push_events).to be_falsey
expect(node.system_hook.repository_update_events).to be_truthy
end
context 'when is a primary node' do context 'when is a primary node' do
it 'has no oauth_application' do it 'has no oauth_application' do
expect(primary_node.oauth_application).not_to be_present expect(primary_node.oauth_application).not_to be_present
...@@ -232,30 +206,6 @@ describe GeoNode, type: :model do ...@@ -232,30 +206,6 @@ describe GeoNode, type: :model do
end end
end end
describe '#notify_projects_url' do
let(:refresh_url) { "https://localhost:3000/gitlab/api/#{api_version}/geo/refresh_projects" }
it 'returns api url based on node uri' do
expect(new_node.notify_projects_url).to eq(refresh_url)
end
end
describe '#notify_wikis_url' do
let(:refresh_url) { "https://localhost:3000/gitlab/api/#{api_version}/geo/refresh_wikis" }
it 'returns api url based on node uri' do
expect(new_node.notify_wikis_url).to eq(refresh_url)
end
end
describe '#geo_events_url' do
let(:events_url) { "https://localhost:3000/gitlab/api/#{api_version}/geo/receive_events" }
it 'returns api url based on node uri' do
expect(new_node.geo_events_url).to eq(events_url)
end
end
describe '#geo_transfers_url' do describe '#geo_transfers_url' do
let(:transfers_url) { "https://localhost:3000/gitlab/api/#{api_version}/geo/transfers/lfs/1" } let(:transfers_url) { "https://localhost:3000/gitlab/api/#{api_version}/geo/transfers/lfs/1" }
......
...@@ -15,161 +15,6 @@ describe API::Geo do ...@@ -15,161 +15,6 @@ describe API::Geo do
allow(Gitlab::Geo).to receive(:current_node) { secondary_node } allow(Gitlab::Geo).to receive(:current_node) { secondary_node }
end end
describe 'POST /geo/receive_events authentication' do
it 'denies access if token is not present' do
post api('/geo/receive_events')
expect(response).to have_http_status(401)
end
it 'denies access if token is invalid' do
post api('/geo/receive_events'), nil, { 'X-Gitlab-Token' => 'nothing' }
expect(response).to have_http_status(401)
end
end
describe 'POST /geo/refresh_wikis disabled node' do
it 'responds with forbidden' do
secondary_node.enabled = false
post api('/geo/refresh_wikis', admin), nil
expect(response).to have_http_status(403)
end
end
describe 'POST /geo/receive_events disabled node' do
it 'responds with forbidden' do
secondary_node.enabled = false
post api('/geo/receive_events'), nil, geo_token_header
expect(response).to have_http_status(403)
end
end
describe 'POST /geo/receive_events when node has namespace restrictions' do
let(:synced_group) { create(:group) }
let(:secondary_node) { create(:geo_node, namespaces: [synced_group]) }
let(:push_payload) do
{
'event_name' => 'push',
'project' => {
'git_ssh_url' => 'git@example.com:mike/diaspora.git'
}
}
end
before do
allow(Gitlab::Geo).to receive(:current_node) { secondary_node }
allow_any_instance_of(::Geo::ScheduleRepoUpdateService).to receive(:execute)
allow_any_instance_of(::Geo::ScheduleRepoFetchService).to receive(:execute)
end
it 'responds with not found for projects that do not belong to selected namespaces to replicate' do
unsynced_project = create(:project)
post api('/geo/receive_events'), push_payload.merge('project_id' => unsynced_project.id), geo_token_header
expect(response).to have_http_status(404)
end
it 'responds with success for projects that belong to selected namespaces to replicate' do
project_in_synced_group = create(:project, group: synced_group)
post api('/geo/receive_events'), push_payload.merge('project_id' => project_in_synced_group.id), geo_token_header
expect(response).to have_http_status(201)
end
end
describe 'POST /geo/receive_events key events' do
before do
allow_any_instance_of(::Geo::ScheduleKeyChangeService).to receive(:execute)
end
let(:key_create_payload) do
{
'event_name' => 'key_create',
'created_at' => '2014-08-18 18:45:16 UTC',
'updated_at' => '2012-07-21T07:38:22Z',
'username' => 'root',
'key' => 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58FwqHUbebw2SdT7SP4FxZ0w+lAO/erhy2ylhlcW/tZ3GY3mBu9VeeiSGoGz8hCx80Zrz+aQv28xfFfKlC8XQFpCWwsnWnQqO2Lv9bS8V1fIHgMxOHIt5Vs+9CAWGCCvUOAurjsUDoE2ALIXLDMKnJxcxD13XjWdK54j6ZXDB4syLF0C2PnAQSVY9X7MfCYwtuFmhQhKaBussAXpaVMRHltie3UYSBUUuZaB3J4cg/7TxlmxcNd+ppPRIpSZAB0NI6aOnqoBCpimscO/VpQRJMVLr3XiSYeT6HBiDXWHnIVPfQc03OGcaFqOit6p8lYKMaP/iUQLm+pgpZqrXZ9vB john@localhost',
'id' => 1
}
end
let(:key_destroy_payload) do
{
'event_name' => 'key_destroy',
'created_at' => '2014-08-18 18:45:16 UTC',
'updated_at' => '2012-07-21T07:38:22Z',
'username' => 'root',
'key' => 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58FwqHUbebw2SdT7SP4FxZ0w+lAO/erhy2ylhlcW/tZ3GY3mBu9VeeiSGoGz8hCx80Zrz+aQv28xfFfKlC8XQFpCWwsnWnQqO2Lv9bS8V1fIHgMxOHIt5Vs+9CAWGCCvUOAurjsUDoE2ALIXLDMKnJxcxD13XjWdK54j6ZXDB4syLF0C2PnAQSVY9X7MfCYwtuFmhQhKaBussAXpaVMRHltie3UYSBUUuZaB3J4cg/7TxlmxcNd+ppPRIpSZAB0NI6aOnqoBCpimscO/VpQRJMVLr3XiSYeT6HBiDXWHnIVPfQc03OGcaFqOit6p8lYKMaP/iUQLm+pgpZqrXZ9vB john@localhost',
'id' => 1
}
end
it 'enqueues on disk key creation if admin and correct params' do
post api('/geo/receive_events'), key_create_payload, geo_token_header
expect(response).to have_http_status(201)
end
it 'enqueues on disk key removal if admin and correct params' do
post api('/geo/receive_events'), key_destroy_payload, geo_token_header
expect(response).to have_http_status(201)
end
end
describe 'POST /geo/receive_events push events' do
before do
allow_any_instance_of(::Geo::ScheduleRepoUpdateService).to receive(:execute)
allow_any_instance_of(::Geo::ScheduleRepoFetchService).to receive(:execute)
end
let(:push_payload) do
{
'event_name' => 'push',
'project_id' => 1,
'project' => {
'git_ssh_url' => 'git@example.com:mike/diaspora.git'
}
}
end
it 'starts refresh process if admin and correct params' do
post api('/geo/receive_events'), push_payload, geo_token_header
expect(response).to have_http_status(201)
end
end
describe 'POST /geo/receive_events push_tag events' do
before do
allow_any_instance_of(::Geo::ScheduleWikiRepoUpdateService).to receive(:execute)
end
let(:tag_push_payload) do
{
'event_name' => 'tag_push',
'project_id' => 1,
'project' => {
'git_ssh_url' => 'git@example.com:mike/diaspora.git'
}
}
end
it 'starts refresh process if admin and correct params' do
post api('/geo/receive_events'), tag_push_payload, geo_token_header
expect(response).to have_http_status(201)
end
end
describe 'GET /geo/transfers/attachment/1' do describe 'GET /geo/transfers/attachment/1' do
let!(:secondary_node) { create(:geo_node) } let!(:secondary_node) { create(:geo_node) }
let(:note) { create(:note, :with_attachment) } let(:note) { create(:note, :with_attachment) }
......
require 'spec_helper'
describe Geo::EnqueueWikiUpdateService do
subject { described_class.new(project) }
let(:project) { double(:project) }
let(:fake_url) { 'git@localhost:repo/path.git' }
let(:fake_id) { 999 }
let(:queue) { subject.instance_variable_get(:@queue) }
before do
queue.empty!
expect(project).to receive_message_chain(:wiki, :url_to_repo) { fake_url }
expect(project).to receive(:id) { fake_id }
end
describe '#execute' do
let(:stored_data) { queue.first }
before do
subject.execute
end
it 'persists id and clone_url to redis queue' do
expect(stored_data).to have_key('id')
expect(stored_data).to have_key('clone_url')
end
it 'persisted id is equal to original' do
expect(stored_data['id']).to eq(fake_id)
end
it 'persisted clone_url is equal to original' do
expect(stored_data['clone_url']).to eq(fake_url)
end
end
end
...@@ -17,4 +17,18 @@ describe Geo::MoveRepositoryService do ...@@ -17,4 +17,18 @@ describe Geo::MoveRepositoryService do
expect(File.directory?("#{full_new_path}.git")).to be_truthy expect(File.directory?("#{full_new_path}.git")).to be_truthy
end end
end end
describe '#async_execute' do
it 'starts the worker' do
expect(GeoRepositoryMoveWorker).to receive(:perform_async)
subject.async_execute
end
it 'returns job id' do
allow(GeoRepositoryMoveWorker).to receive(:perform_async).and_return('foo')
expect(subject.async_execute).to eq('foo')
end
end
end end
require 'spec_helper'
describe Geo::RepositoryDestroyService do
let(:project) { create(:project_empty_repo) }
subject(:service) { described_class.new(project.id, project.name, project.disk_path, project.repository_storage) }
describe '#async_execute' do
it 'starts the worker' do
expect(GeoRepositoryDestroyWorker).to receive(:perform_async)
subject.async_execute
end
end
describe '#execute' do
it 'delegates project removal to Projects::DestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate)
service.execute
end
context 'legacy storage project' do
it 'removes the repository from disk' do
project.delete
expect(project.gitlab_shell.exists?(project.repository_storage_path, "#{project.disk_path}.git")).to be_truthy
service.execute
expect(project.gitlab_shell.exists?(project.repository_storage_path, "#{project.disk_path}.git")).to be_falsey
end
end
context 'hashed storage project' do
let(:project) { create(:project_empty_repo, :hashed) }
it 'removes the repository from disk' do
project.delete
expect(project.gitlab_shell.exists?(project.repository_storage_path, "#{project.disk_path}.git")).to be_truthy
service.execute
expect(project.gitlab_shell.exists?(project.repository_storage_path, "#{project.disk_path}.git")).to be_falsey
end
end
end
end
require 'spec_helper'
describe Geo::RepositoryUpdateService do
let(:project) { create(:project) }
let(:clone_url) { project.ssh_url_to_repo }
subject { described_class.new(project, clone_url) }
describe '#execute' do
before do
allow_any_instance_of(Gitlab::Geo).to receive_messages(secondary?: true)
allow(project.repository).to receive(:fetch_geo_mirror).and_return(true)
allow(project).to receive(:repository_exists?) { false }
allow(project).to receive(:empty_repo?) { true }
allow(project.repository).to receive(:expire_all_method_caches)
allow(project.repository).to receive(:expire_branch_cache)
allow(project.repository).to receive(:expire_content_cache)
end
it 'releases the lease' do
expect(Gitlab::ExclusiveLease).to receive(:cancel).once.and_call_original
subject.execute
end
it 'creates a new repository' do
expect(project).to receive(:create_repository)
subject.execute
end
it 'executes after_create hook' do
expect(project.repository).to receive(:after_create).at_least(:once)
subject.execute
end
it 'fetches the Geo mirror' do
expect(project.repository).to receive(:fetch_geo_mirror)
subject.execute
end
it 'expires repository caches' do
expect(project.repository).to receive(:expire_all_method_caches)
expect(project.repository).to receive(:expire_branch_cache)
expect(project.repository).to receive(:expire_content_cache)
subject.execute
end
it 'rescues Gitlab::Shell::Error failures' do
expect(project.repository).to receive(:fetch_geo_mirror).and_raise(Gitlab::Shell::Error)
expect { subject.execute }.not_to raise_error
end
it 'rescues Gitlab::Git::Repository::NoRepository failures and fires after_create hook' do
expect(project.repository).to receive(:fetch_geo_mirror).and_raise(Gitlab::Git::Repository::NoRepository)
expect_any_instance_of(Repository).to receive(:after_create)
expect { subject.execute }.not_to raise_error
end
end
end
require 'spec_helper'
describe Geo::ScheduleKeyChangeService do
subject(:key_create) { described_class.new('id' => 1, 'key' => key.key, 'action' => :create) }
subject(:key_delete) { described_class.new('id' => 1, 'key' => key.key, 'action' => :delete) }
let(:key) { FactoryGirl.build(:key) }
before do
allow_any_instance_of(GeoKeyRefreshWorker).to receive(:perform)
end
context 'key creation' do
it 'executes action' do
expect(key_create.execute).to be_truthy
end
end
context 'key removal' do
it 'executes action' do
expect(key_delete.execute).to be_truthy
end
end
end
require 'spec_helper'
describe Geo::ScheduleRepoCreateService do
let(:project) { create(:project, :repository) }
subject { described_class.new(project_id: project.id) }
describe '#execute' do
it 'schedules the repository creation' do
Sidekiq::Worker.clear_all
Sidekiq::Testing.fake! do
expect { subject.execute }.to change(GeoRepositoryCreateWorker.jobs, :size).by(1)
end
end
end
end
require 'spec_helper'
describe Geo::ScheduleRepoUpdateService do
include RepoHelpers
let(:user) { create :user }
let(:project) { create :project, :repository }
let(:blankrev) { Gitlab::Git::BLANK_SHA }
let(:oldrev) { sample_commit.parent_id }
let(:newrev) { sample_commit.id }
let(:ref) { 'refs/heads/master' }
let(:service) { execute_push_service(project, user, oldrev, newrev, ref) }
before do
project.team << [user, :master]
end
subject { described_class.new(service.push_data) }
context 'parsed push_data' do
it 'includes required params' do
expect(subject.push_data).to include('type', 'before', 'after', 'ref')
end
end
context '#execute' do
let(:push_data) { service.push_data }
let(:args) do
[
project.id,
push_data[:project][:git_ssh_url],
{
'type' => push_data[:object_kind],
'before' => push_data[:before],
'after' => push_data[:newref],
'ref' => push_data[:ref]
}
]
end
it 'schedule update service' do
expect(GeoRepositoryUpdateWorker).to receive(:perform_async).with(*args)
subject.execute
end
end
def execute_push_service(project, user, oldrev, newrev, ref)
service = GitPushService.new(project, user, oldrev: oldrev, newrev: newrev, ref: ref)
service.execute
service
end
end
require 'spec_helper'
describe Geo::ScheduleWikiRepoUpdateService do
describe '#execute' do
let(:group) { create(:group) }
let(:project_1) { create(:project) }
let(:project_2) { create(:project, group: group) }
let(:projects) do
[
{ 'id' => project_1.id, 'clone_url' => 'git@example.com:mike/diaspora.git' },
{ 'id' => project_2.id, 'clone_url' => 'git@example.com:asd/vim.git' }
]
end
subject { described_class.new(projects) }
it "enqueues a batch of IDs of wiki's projects to have their wiki repositories updated" do
create(:geo_node, :current)
expect(GeoWikiRepositoryUpdateWorker).to receive(:perform_async)
.once.with(project_1.id, 'git@example.com:mike/diaspora.git').and_return(spy)
expect(GeoWikiRepositoryUpdateWorker).to receive(:perform_async)
.once.with(project_2.id, 'git@example.com:asd/vim.git').and_return(spy)
subject.execute
end
context 'when node has namespace restrictions' do
it "does not enqueue IDs of wiki's projects that do not belong to selected namespaces to replicate" do
create(:geo_node, :current, namespaces: [group])
expect(GeoWikiRepositoryUpdateWorker).not_to receive(:perform_async)
.with(project_1.id, 'git@example.com:mike/diaspora.git')
expect(GeoWikiRepositoryUpdateWorker).to receive(:perform_async)
.once.with(project_2.id, 'git@example.com:asd/vim.git').and_return(spy)
subject.execute
end
end
end
end
require 'spec_helper'
describe GeoDynamicBackoff do
class TestWorkerBackOff
include Sidekiq::Worker
include GeoDynamicBackoff
def perform(options)
false
end
end
let(:worker) do
TestWorkerBackOff
end
context 'retry strategy' do
it 'sets a custom strategy for retrying' do
expect(worker.sidekiq_retry_in_block).to be_a(Proc)
end
it 'when retry_count is in 1..30, retries with linear_backoff_strategy' do
expect(worker).to receive(:linear_backoff_strategy)
worker.sidekiq_retry_in_block.call(1)
expect(worker).to receive(:linear_backoff_strategy)
worker.sidekiq_retry_in_block.call(30)
end
it 'when retry_count is > 30, retries with geometric_backoff_strategy' do
expect(worker).to receive(:geometric_backoff_strategy)
worker.sidekiq_retry_in_block.call(31)
end
end
context '.linear_backoff_strategy' do
it 'returns rand + retry_count' do
allow(worker).to receive(:rand).and_return(1)
expect(worker.sidekiq_retry_in_block.call(1)).to eq(2)
end
end
context '.geometric_backoff_strategy' do
it 'when retry_count is 31 for a fixed rand()=1 returns 18' do
allow(worker).to receive(:rand).and_return(1)
expect(worker.sidekiq_retry_in_block.call(31)).to eq(18)
end
it 'when retry_count is 32 for a fixed rand()=1 returns 18' do
allow(worker).to receive(:rand).and_return(1)
expect(worker.sidekiq_retry_in_block.call(32)).to eq(34)
end
end
end
require 'spec_helper'
describe GeoKeyRefreshWorker do
subject(:key_create) { described_class.new.perform(key.id, key.key, 'key_create') }
subject(:key_delete) { described_class.new.perform(key.id, key.key, 'key_destroy') }
let(:key) { FactoryGirl.create(:key) }
context 'key creation' do
it 'adds key to shell' do
expect(Key).to receive(:find).with(key.id) { key }
expect(key).to receive(:add_to_shell)
expect { key_create }.not_to raise_error
end
end
context 'key removal' do
it 'removes key from the shell' do
expect(Key).to receive(:new).with(id: key.id, key: key.key) { key }
expect(key).to receive(:remove_from_shell)
expect { key_delete }.not_to raise_error
end
end
end
require 'spec_helper'
describe GeoRepositoryCreateWorker do
let(:user) { create :user }
let(:project) { create :project, :repository }
let(:perform!) { subject.perform(project.id) }
before do
expect(Project).to receive(:find).at_least(:once).with(project.id) { project }
end
context 'when no repository' do
before do
expect(project).to receive(:repository_exists?) { false }
end
it 'creates the repository' do
expect(project).to receive(:create_repository)
perform!
end
it 'does not create the repository when its being imported' do
expect(project).to receive(:import?) { true }
expect(project).not_to receive(:create_repository)
perform!
end
end
context 'when repository exists' do
before do
expect(project).to receive(:repository_exists?) { true }
end
it 'does not try to create the repository again' do
expect(project).not_to receive(:create_repository)
perform!
end
end
end
require 'spec_helper' require 'spec_helper'
describe GeoRepositoryDestroyWorker do describe GeoRepositoryDestroyWorker do
let!(:project) { create :project_empty_repo } let(:project) { create(:project) }
let!(:path) { project.repository.full_path }
let!(:remove_path) { path.sub(/\.git\Z/, "+#{project.id}+deleted.git") }
let(:perform!) { subject.perform(project.id, project.name, path) }
it 'delegates project removal to Projects::DestroyService' do it 'delegates project removal to Geo::RepositoryDestroyService' do
expect_any_instance_of(EE::Projects::DestroyService).to receive(:geo_replicate) expect_any_instance_of(Geo::RepositoryDestroyService).to receive(:execute)
perform! described_class.new.perform(project.id, project.name, project.path, 'default')
end
context 'sidekiq execution' do
before do
project.delete
end
it 'removes the repository from disk' do
expect(project.gitlab_shell.exists?(project.repository_storage_path, path + '.git')).to be_truthy
Sidekiq::Testing.inline! { perform! }
expect(project.gitlab_shell.exists?(project.repository_storage_path, path + '.git')).to be_falsey
expect(project.gitlab_shell.exists?(project.repository_storage_path, remove_path + '.git')).to be_falsey
end
end
describe '#probe_repository_storage' do
it 'returns a repository_storage when repository can be found' do
expect(subject.send(:probe_repository_storage, project.full_path)).to eq('default')
end
it 'returns nil when repository cannot be found in any existing repository_storage' do
expect(subject.send(:probe_repository_storage, 'nonexistent/project')).to eq(nil)
end
end end
end end
require 'spec_helper'
describe GeoRepositoryFetchWorker do
describe '#perform' do
let(:project) { create(:project) }
it 'delegates to Geo::RepositoryUpdateService' do
expect_any_instance_of(Geo::RepositoryUpdateService).to receive(:execute)
perform
end
end
def perform
subject.perform(project.id, project.ssh_url_to_repo)
end
end
require 'spec_helper'
describe GeoRepositoryUpdateWorker do
include RepoHelpers
let(:user) { create :user }
let(:project) { create :project, :repository }
let(:blankrev) { Gitlab::Git::BLANK_SHA }
let(:oldrev) { sample_commit.parent_id }
let(:newrev) { sample_commit.id }
let(:ref) { 'refs/heads/master' }
let(:service) { execute_push_service(project, user, oldrev, newrev, ref) }
let(:push_data) { service.push_data }
let(:parsed_push_data) do
{
'type' => push_data[:object_kind],
'before' => push_data[:before],
'after' => push_data[:after],
'ref' => push_data[:ref]
}
end
let(:clone_url) { push_data[:project][:git_ssh_url] }
let(:performed) { subject.perform(project.id, clone_url, parsed_push_data) }
before do
project.team << [user, :master]
expect(Project).to receive(:find).at_least(:once).with(project.id) { project }
end
context 'when empty repository' do
before do
allow(project.repository).to receive(:fetch_geo_mirror)
allow(project).to receive(:empty_repo?) { true }
end
it 'executes after_create hook' do
expect(project.repository).to receive(:after_create).at_least(:once)
performed
end
end
context '#process_hooks' do
before do
allow(subject).to receive(:fetch_repository)
end
it 'calls if push_data is present' do
expect(subject).to receive(:process_hooks)
performed
end
context 'when no push_data is present' do
let(:parsed_push_data) { nil }
it 'skips process_hooks' do
expect(subject).not_to receive(:process_hooks)
performed
end
end
end
context '#process_push' do
before do
allow(subject).to receive(:fetch_repository)
end
it 'executes after_push_commit' do
expect(project.repository).to receive(:after_push_commit).at_least(:once).with('master')
performed
end
context 'when removing branch' do
it 'executes after_remove_branch' do
allow(subject).to receive(:push_remove_branch?) { true }
expect(project.repository).to receive(:after_remove_branch)
performed
end
end
context 'when updating a new branch' do
it 'executes after_create_branch' do
allow(subject).to receive(:push_to_new_branch?) { true }
expect(project.repository).to receive(:after_create_branch)
performed
end
end
end
def execute_push_service(project, user, oldrev, newrev, ref)
service = GitPushService.new(project, user, oldrev: oldrev, newrev: newrev, ref: ref)
service.execute
service
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment