Commit 2981044f authored by Dmitriy Zaporozhets's avatar Dmitriy Zaporozhets

Merge branch 'feature/geo-sshkey-systemhook' into 'master'

Geo: Use System Hooks for key synchronization

This MR changes from our custom notification solution to use system hooks instead for GitLab Geo (#76).
We had also to change the way we authenticate API requests for Geo. (See !334 for reference).

With new implementation, we create one system hook for every secondary Geo Node, and there will be a single endpoint that will receive all events (more to be migrated to System Hook).

See merge request !332
parents 1efccf37 160a6f81
......@@ -13,6 +13,7 @@
class GeoNode < ActiveRecord::Base
belongs_to :geo_node_key, dependent: :destroy
belongs_to :oauth_application, class_name: 'Doorkeeper::Application', dependent: :destroy
belongs_to :system_hook, dependent: :destroy
default_values schema: 'http',
host: lambda { Gitlab.config.gitlab.host },
......@@ -20,7 +21,7 @@ class GeoNode < ActiveRecord::Base
relative_url_root: '',
primary: false
accepts_nested_attributes_for :geo_node_key
accepts_nested_attributes_for :geo_node_key, :system_hook
validates :host, host: true, presence: true, uniqueness: { case_sensitive: false, scope: :port }
validates :primary, uniqueness: { message: 'node already exists' }, if: :primary
......@@ -60,8 +61,8 @@ class GeoNode < ActiveRecord::Base
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_wikis").to_s
end
def notify_key_url
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/refresh_key").to_s
def geo_events_url
URI.join(uri, "#{uri.path}/", "api/#{API::API.version}/geo/receive_events").to_s
end
def oauth_callback_url
......@@ -84,6 +85,7 @@ class GeoNode < ActiveRecord::Base
def build_dependents
self.build_geo_node_key if geo_node_key.nil?
update_system_hook!
end
def update_dependents_attributes
......@@ -92,9 +94,8 @@ class GeoNode < ActiveRecord::Base
if self.primary?
self.oauth_application = nil
else
self.build_oauth_application if oauth_application.nil?
self.oauth_application.name = "Geo node: #{self.url}"
self.oauth_application.redirect_uri = oauth_callback_url
update_oauth_application!
update_system_hook!
end
end
......@@ -106,4 +107,16 @@ class GeoNode < ActiveRecord::Base
record.errors[:base] << 'Current node must be the primary node or you will be locking yourself out'
end
end
def update_oauth_application!
self.build_oauth_application if oauth_application.nil?
self.oauth_application.name = "Geo node: #{self.url}"
self.oauth_application.redirect_uri = oauth_callback_url
end
def update_system_hook!
self.build_system_hook if system_hook.nil?
self.system_hook.token = SecureRandom.hex(20) unless self.system_hook.token.present?
self.system_hook.url = geo_events_url if uri.present?
end
end
......@@ -57,7 +57,6 @@ class Key < ActiveRecord::Base
end
def add_to_shell
Gitlab::Geo.notify_key_change(id, key, :create) if Gitlab::Geo.primary?
GitlabShellWorker.perform_async(
:add_key,
shell_id,
......@@ -74,7 +73,6 @@ class Key < ActiveRecord::Base
end
def remove_from_shell
Gitlab::Geo.notify_key_change(id, key, :delete) if Gitlab::Geo.primary?
GitlabShellWorker.perform_async(
:remove_key,
shell_id,
......
module Geo
class NotifyKeyChangeService < BaseNotify
def initialize(key_id, key, action)
@id = key_id
@key = key
@action = action
end
def execute
key_change = { 'id' => @id, 'key' => @key, 'action' => @action }
content = { key_change: key_change }.to_json
::Gitlab::Geo.secondary_nodes.each do |node|
notify_url = node.notify_key_url
success, message = notify(notify_url, content)
unless success
error_message = "GitLab failed to notify #{node.url} to #{notify_url} : #{message}"
Rails.logger.error(error_message)
fail error_message # we must throw exception here to re-schedule job execution.
end
end
end
end
end
......@@ -2,10 +2,10 @@ module Geo
class ScheduleKeyChangeService
attr_reader :id, :key, :action
def initialize(key_change)
@id = key_change['id']
@key = key_change['key']
@action = key_change['action']
def initialize(params)
@id = params['id']
@key = params['key']
@action = params['event_name']
end
def execute
......
class GeoKeyChangeNotifyWorker
include Sidekiq::Worker
include GeoDynamicBackoff
sidekiq_options queue: :default
def perform(key_id, key, action)
Geo::NotifyKeyChangeService.new(key_id, key, action).execute
end
end
......@@ -8,11 +8,11 @@ class GeoKeyRefreshWorker
action = action.to_sym
case action
when :create
when :key_create
# ActiveRecord::RecordNotFound when not found (so job will retry)
key = Key.find(key_id)
key.add_to_shell
when :delete
when :key_destroy
# we are physically removing the key after model is removed
# so we must reconstruct ids to schedule removal
key = Key.new(id: key_id, key: key)
......
class AddSystemHookToGeoNode < ActiveRecord::Migration
def change
change_table :geo_nodes do |t|
t.references :system_hook
end
end
end
......@@ -11,7 +11,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20160413115152) do
ActiveRecord::Schema.define(version: 20160414064845) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
......@@ -420,6 +420,7 @@ ActiveRecord::Schema.define(version: 20160413115152) do
t.boolean "primary"
t.integer "geo_node_key_id"
t.integer "oauth_application_id"
t.integer "system_hook_id"
end
add_index "geo_nodes", ["geo_node_key_id"], name: "index_geo_nodes_on_geo_node_key_id", using: :btree
......
module API
class Geo < Grape::API
before { authenticated_as_admin! }
resource :geo do
# Enqueue a batch of IDs of modified projects to have their
# repositories updated
......@@ -9,6 +7,7 @@ module API
# Example request:
# POST /geo/refresh_projects
post 'refresh_projects' do
authenticated_as_admin!
required_attributes! [:projects]
::Geo::ScheduleRepoUpdateService.new(params[:projects]).execute
end
......@@ -19,17 +18,24 @@ module API
# Example request:
# POST /geo/refresh_wikis
post 'refresh_wikis' do
authenticated_as_admin!
required_attributes! [:projects]
::Geo::ScheduleWikiRepoUpdateService.new(params[:projects]).execute
end
# Enqueue a change operation for specific key ID
# Receive event streams from primary and enqueue changes
#
# Example request:
# POST /geo/refresh_key
post 'refresh_key' do
required_attributes! [:key_change]
::Geo::ScheduleKeyChangeService.new(params[:key_change]).execute
# POST /geo/receive_events
post 'receive_events' do
authenticate_by_gitlab_geo_token!
required_attributes! %w(event_name)
case params['event_name']
when 'key_create', 'key_destroy'
required_attributes! %w(key id)
::Geo::ScheduleKeyChangeService.new(params).execute
end
end
end
end
......
......@@ -112,6 +112,13 @@ module API
end
end
def authenticate_by_gitlab_geo_token!
token = headers['X-Gitlab-Token'].try(:chomp)
unless token && Devise.secure_compare(geo_token, token)
unauthorized!
end
end
def authenticated_as_admin!
forbidden! unless current_user.is_admin?
end
......@@ -377,6 +384,10 @@ module API
File.read(Gitlab.config.gitlab_shell.secret_file).chomp
end
def geo_token
Gitlab::Geo.current_node.system_hook.token
end
def handle_member_errors(errors)
error!(errors[:access_level], 422) if errors[:access_level].any?
not_found!(errors)
......
......@@ -42,10 +42,6 @@ module Gitlab
::Geo::EnqueueWikiUpdateService.new(project).execute
end
def self.notify_key_change(key_id, key, action)
GeoKeyChangeNotifyWorker.perform_async(key_id, key, action)
end
def self.bulk_notify_job
Sidekiq::Cron::Job.find('geo_bulk_notify_worker')
end
......
......@@ -2,7 +2,7 @@ module Gitlab
module Middleware
class ReadonlyGeo
DISALLOWED_METHODS = %w(POST PATCH PUT DELETE)
WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects api/v3/geo/refresh_wikis api/v3/geo/refresh_key)
WHITELISTED = %w(api/v3/internal api/v3/geo/refresh_projects api/v3/geo/refresh_wikis api/v3/geo/refresh_key api/v3/geo/receive_events)
APPLICATION_JSON = 'application/json'
def initialize(app)
......
......@@ -79,15 +79,4 @@ describe Gitlab::Geo, lib: true do
described_class.notify_project_update(project)
end
end
describe 'notify_ssh_key_change' do
let(:key) { FactoryGirl.build(:key) }
it 'schedule async notification' do
expect(GeoKeyChangeNotifyWorker).to receive(:perform_async).and_call_original
expect_any_instance_of(GeoKeyChangeNotifyWorker).to receive(:perform)
described_class.notify_key_change(key.id, key, 'create')
end
end
end
......@@ -3,7 +3,7 @@ require 'spec_helper'
describe GeoNode, type: :model do
subject(:new_node) { described_class.new(schema: 'https', host: 'localhost', port: 3000, relative_url_root: 'gitlab') }
subject(:new_primary_node) { described_class.new(schema: 'https', host: 'localhost', port: 3000, relative_url_root: 'gitlab', primary: true) }
subject(:empty_node) { described_class.new(schema: nil, host: nil, port: nil, relative_url_root: nil) }
subject(:empty_node) { described_class.new }
subject(:primary_node) { FactoryGirl.create(:geo_node, :primary) }
subject(:node) { FactoryGirl.create(:geo_node) }
......@@ -56,15 +56,12 @@ describe GeoNode, type: :model do
let(:geo_node_key_attributes) { FactoryGirl.build(:geo_node_key).attributes }
context 'on initialize' do
before(:each) do
new_node.geo_node_key_attributes = geo_node_key_attributes
end
it 'initializes a corresponding key' do
expect(new_node.geo_node_key).to be_present
end
it 'is valid' do
it 'is valid when required attributes are present' do
new_node.geo_node_key_attributes = geo_node_key_attributes
expect(new_node).to be_valid
end
end
......@@ -81,6 +78,16 @@ describe GeoNode, type: :model do
it 'has no oauth_application if it is a primary node' do
expect(primary_node.oauth_application).not_to be_present
end
it 'has a system_hook if it is a secondary node' do
expect(node.system_hook).to be_present
end
it 'generated system_hook has required attributes' do
expect(node.system_hook.url).to be_present
expect(node.system_hook.url).to eq(node.geo_events_url)
expect(node.system_hook.token).to be_present
end
end
end
......@@ -163,11 +170,11 @@ describe GeoNode, type: :model do
end
end
describe '#notify_key_url' do
let(:refresh_url) { 'https://localhost:3000/gitlab/api/v3/geo/refresh_key' }
describe '#geo_events_url' do
let(:events_url) { 'https://localhost:3000/gitlab/api/v3/geo/receive_events' }
it 'returns api url based on node uri' do
expect(new_node.notify_key_url).to eq(refresh_url)
expect(new_node.geo_events_url).to eq(events_url)
end
end
......
......@@ -4,6 +4,7 @@ describe API::API, api: true do
include ApiHelpers
let(:admin) { create(:admin) }
let(:user) { create(:user) }
let(:geo_node) { build(:geo_node) }
describe 'POST /geo/refresh_projects' do
before(:each) { allow_any_instance_of(::Geo::ScheduleRepoUpdateService).to receive(:execute) }
......@@ -19,22 +20,56 @@ describe API::API, api: true do
end
end
describe 'POST /geo/refresh_key' do
before(:each) { allow_any_instance_of(::Geo::ScheduleKeyChangeService).to receive(:execute) }
describe 'POST /geo/receive_events' do
before(:each) do
allow_any_instance_of(::Geo::ScheduleKeyChangeService).to receive(:execute)
allow(Gitlab::Geo).to receive(:current_node) { geo_node }
end
let(:geo_token_header) do
{ 'X-Gitlab-Token' => geo_node.system_hook.token }
end
let(:key_create_payload) do
{
'event_name' => 'key_create',
'created_at' => '2014-08-18 18:45:16 UTC',
'updated_at' => '2012-07-21T07:38:22Z',
'username' => 'root',
'key' => 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58FwqHUbebw2SdT7SP4FxZ0w+lAO/erhy2ylhlcW/tZ3GY3mBu9VeeiSGoGz8hCx80Zrz+aQv28xfFfKlC8XQFpCWwsnWnQqO2Lv9bS8V1fIHgMxOHIt5Vs+9CAWGCCvUOAurjsUDoE2ALIXLDMKnJxcxD13XjWdK54j6ZXDB4syLF0C2PnAQSVY9X7MfCYwtuFmhQhKaBussAXpaVMRHltie3UYSBUUuZaB3J4cg/7TxlmxcNd+ppPRIpSZAB0NI6aOnqoBCpimscO/VpQRJMVLr3XiSYeT6HBiDXWHnIVPfQc03OGcaFqOit6p8lYKMaP/iUQLm+pgpZqrXZ9vB john@localhost',
'id' => 1
}
end
let(:key_destroy_payload) do
{
'event_name' => 'key_destroy',
'created_at' => '2014-08-18 18:45:16 UTC',
'updated_at' => '2012-07-21T07:38:22Z',
'username' => 'root',
'key' => 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC58FwqHUbebw2SdT7SP4FxZ0w+lAO/erhy2ylhlcW/tZ3GY3mBu9VeeiSGoGz8hCx80Zrz+aQv28xfFfKlC8XQFpCWwsnWnQqO2Lv9bS8V1fIHgMxOHIt5Vs+9CAWGCCvUOAurjsUDoE2ALIXLDMKnJxcxD13XjWdK54j6ZXDB4syLF0C2PnAQSVY9X7MfCYwtuFmhQhKaBussAXpaVMRHltie3UYSBUUuZaB3J4cg/7TxlmxcNd+ppPRIpSZAB0NI6aOnqoBCpimscO/VpQRJMVLr3XiSYeT6HBiDXWHnIVPfQc03OGcaFqOit6p8lYKMaP/iUQLm+pgpZqrXZ9vB john@localhost',
'id' => 1
}
end
it 'enqueues on disk key creation if admin and correct params' do
post api('/geo/refresh_key', admin), key_change: { id: 1, action: 'create' }
post api('/geo/receive_events'), key_create_payload, geo_token_header
expect(response.status).to eq 201
end
it 'enqueues on disk key removal if admin and correct params' do
post api('/geo/refresh_key', admin), key_change: { id: 1, action: 'delete' }
post api('/geo/receive_events'), key_destroy_payload, geo_token_header
expect(response.status).to eq 201
end
it 'denies access if not admin' do
post api('/geo/refresh_key', user)
expect(response.status).to eq 403
it 'denies access if token is not present' do
post api('/geo/receive_events')
expect(response.status).to eq 401
end
it 'denies access if token is invalid' do
post api('/geo/receive_events'), nil, { 'X-Gitlab-Token' => 'nothing' }
expect(response.status).to eq 401
end
end
end
require 'spec_helper'
describe GeoKeyRefreshWorker do
subject(:key_create) { described_class.new.perform(key.id, key.key, 'create') }
subject(:key_delete) { described_class.new.perform(key.id, key.key, 'delete') }
subject(:key_create) { described_class.new.perform(key.id, key.key, 'key_create') }
subject(:key_delete) { described_class.new.perform(key.id, key.key, 'key_destroy') }
let(:key) { FactoryGirl.create(:key) }
context 'key creation' do
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment