Commit 9188a762 authored by Douwe Maan's avatar Douwe Maan

Merge branch '2467-geo-implement-consumer-aka-geologcursor-for-event-log' into 'master'

Geo: Implement GeoLogCursor for event log

Closes #2467

See merge request !1988
parents 221ab608 d50ae9fc
class Geo::EventLogState < Geo::BaseRegistry
self.primary_key = :event_id
validates :event_id, presence: true
def self.last_processed
order(event_id: :desc).first
end
end
#!/usr/bin/env ruby
# vim: ft=ruby
require 'rubygems'
require 'bundler/setup'
# loads rails environment / initializers
require "#{File.dirname(__FILE__)}/../config/environment"
require 'optparse'
class GeoLogCursorOptionParser
def self.parse(argv)
options = { full_scan: false }
version = Gitlab::Geo::LogCursor::Daemon::VERSION
op = OptionParser.new
op.banner = 'GitLab Geo: Log Cursor'
op.separator ''
op.separator 'Usage: ./geo_log_cursor [options]'
op.separator ''
op.on('-f', '--full-scan', 'Performs full-scan to lookup for un-replicated data') { options[:full_scan] = true }
op.separator 'Common options:'
op.on('-h', '--help') do
puts op.to_s
exit
end
op.on('-v', '--version') do
puts version
exit
end
op.separator ''
op.parse!(argv)
options
end
end
if $0 == __FILE__
options = GeoLogCursorOptionParser.parse(ARGV)
Gitlab::Geo::LogCursor::Daemon.new(options).run!
end
---
title: GeoLogCursor is part of a new experimental Geo replication system
merge_request: 1988
author:
class CreateEventLogState < ActiveRecord::Migration
def change
create_table :event_log_states, id: false do |t|
t.integer :event_id, limit: 8, primary_key: true
end
end
end
class AddNeedsResyncToProjectRegistry < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_column_with_default(:project_registry, :resync_repository, :boolean, default: true)
add_column_with_default(:project_registry, :resync_wiki, :boolean, default: true)
end
def down
remove_columns :project_registry, :resync_repository, :resync_wiki
end
end
......@@ -11,11 +11,14 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20170526214010) do
ActiveRecord::Schema.define(version: 20170606155045) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
create_table "event_log_states", primary_key: "event_id", force: :cascade do |t|
end
create_table "file_registry", force: :cascade do |t|
t.string "file_type", null: false
t.integer "file_id", null: false
......@@ -32,7 +35,10 @@ ActiveRecord::Schema.define(version: 20170526214010) do
t.datetime "last_repository_synced_at"
t.datetime "last_repository_successful_sync_at"
t.datetime "created_at", null: false
t.boolean "resync_repository", default: true, null: false
t.boolean "resync_wiki", default: true, null: false
end
add_index "project_registry", ["project_id"], name: "index_project_registry_on_project_id", using: :btree
end
......@@ -10,13 +10,21 @@ module Gitlab
# ExclusiveLease.
#
class ExclusiveLease
LUA_CANCEL_SCRIPT = <<-EOS.freeze
LUA_CANCEL_SCRIPT = <<~EOS.freeze
local key, uuid = KEYS[1], ARGV[1]
if redis.call("get", key) == uuid then
redis.call("del", key)
end
EOS
LUA_RENEW_SCRIPT = <<~EOS.freeze
local key, uuid, ttl = KEYS[1], ARGV[1], ARGV[2]
if redis.call("get", key) == uuid then
redis.call("expire", key, ttl)
return uuid
end
EOS
def self.cancel(key, uuid)
Gitlab::Redis.with do |redis|
redis.eval(LUA_CANCEL_SCRIPT, keys: [redis_key(key)], argv: [uuid])
......@@ -42,6 +50,15 @@ module Gitlab
end
end
# Try to renew an existing lease. Return lease UUID on success,
# false if the lease is taken by a different UUID or inexistent.
def renew
Gitlab::Redis.with do |redis|
result = redis.eval(LUA_RENEW_SCRIPT, keys: [@redis_key], argv: [@uuid, @timeout])
result == @uuid
end
end
# Returns true if the key for this lease is set.
def exists?
Gitlab::Redis.with do |redis|
......
module Gitlab
module Geo
module LogCursor
class Daemon
VERSION = '0.1.0'.freeze
POOL_WAIT = 5.seconds.freeze
BATCH_SIZE = 250
attr_reader :options
def initialize(options = {})
@options = options
@exit = false
end
def run!
trap_signals
full_scan! if options[:full_scan]
until exit?
Events.fetch_in_batches do |batch|
handle_events(batch)
end
return if exit?
# When no new event is found sleep for a few moments
sleep(POOL_WAIT)
end
end
# Execute routines to verify the required initial data is available
# and mark non-replicated data as requiring replication.
def full_scan!
# This is slow and can be improved in the future by using PostgreSQL FDW
# so we can query with a LEFT JOIN and have a list of
# Projects without corresponding ProjectRegistry in the DR database
# See: https://robots.thoughtbot.com/postgres-foreign-data-wrapper (requires PG 9.6)
$stdout.print 'Searching for non replicated projects...'
Project.select(:id).find_in_batches(batch_size: BATCH_SIZE) do |batch|
$stdout.print '.'
project_ids = batch.map(&:id)
existing = ::Geo::ProjectRegistry.where(project_id: project_ids).pluck(:project_id)
missing_projects = project_ids - existing
Rails.logger.debug("Missing projects: #{missing_projects}")
missing_projects.each do |id|
::Geo::ProjectRegistry.create(project_id: id)
end
end
$stdout.puts 'Done!'
puts
end
def handle_events(batch)
batch.each do |event|
# Update repository
if event.repository_updated_event
handle_repository_update(event.repository_updated_event)
end
end
end
private
def trap_signals
trap(:TERM) do
quit!
end
trap(:INT) do
quit!
end
end
# Safe shutdown
def quit!
$stdout.puts 'Exiting...'
@exit = true
end
def handle_repository_update(updated_event)
registry = ::Geo::ProjectRegistry.find_or_initialize_by(project_id: project_id)
case updated_event.source
when 'repository'
registry.resync_repository = true
when 'wiki'
registry.resync_wiki = true
end
registry.save!
end
def exit?
@exit
end
end
end
end
end
module Gitlab
module Geo
module LogCursor
# Manages events from primary database and store state in the DR database
class Events
BATCH_SIZE = 50
NAMESPACE = 'geo:gitlab'.freeze
LEASE_TIMEOUT = 5.minutes.freeze
LEASE_KEY = 'geo_log_cursor_processed'.freeze
# fetches up to BATCH_SIZE next events and keep track of batches
def self.fetch_in_batches
try_obtain_lease do
::Geo::EventLog.where('id > ?', last_processed).find_in_batches(batch_size: BATCH_SIZE) do |batch|
yield batch
save_processed(batch.last.id)
break unless renew_lease!
end
end
end
# saves last replicated event
def self.save_processed(event_id)
event_state = ::Geo::EventLogState.last || ::Geo::EventLogState.new
event_state.update!(event_id: event_id)
end
# @return [Integer] id of last replicated event
def self.last_processed
last = ::Geo::EventLogState.last_processed.try(:id)
return last if last
::Geo::EventLog.any? ? ::Geo::EventLog.last.id : -1
end
# private methods
def self.try_obtain_lease
lease = exclusive_lease.try_obtain
unless lease
$stdout.puts 'Cannot obtain an exclusive lease. There must be another process already in execution.'
return
end
begin
yield lease
ensure
Gitlab::ExclusiveLease.cancel(LEASE_KEY, lease)
end
end
def self.renew_lease!
exclusive_lease.renew
end
def self.exclusive_lease
@lease ||= Gitlab::ExclusiveLease.new(LEASE_KEY, timeout: LEASE_TIMEOUT)
end
private_class_method :try_obtain_lease, :exclusive_lease
end
end
end
end
require 'spec_helper'
load File.expand_path('../../bin/geo_log_cursor', __dir__)
describe 'scripts/geo_log_cursor' do
describe GeoLogCursorOptionParser do
it 'parses -f and --full-scan' do
%w[-f --full-scan].each do |flag|
options = described_class.parse(%W[foo #{flag} bar])
expect(options[:full_scan]).to eq true
end
end
end
end
FactoryGirl.define do
factory :geo_event_log, class: Geo::EventLog do
repository_updated_event factory: :geo_repository_update_event
end
factory :geo_repository_update_event, class: Geo::RepositoryUpdatedEvent do
source 0
branches_affected 0
tags_affected 0
project factory: :empty_project
end
end
FactoryGirl.define do
factory :geo_event_log_state, class: Geo::EventLogState do
sequence(:event_id)
end
end
......@@ -19,6 +19,19 @@ describe Gitlab::ExclusiveLease, type: :redis do
end
end
describe '#renew' do
it 'returns true when we have the existing lease' do
lease = described_class.new(unique_key, timeout: 3600)
expect(lease.try_obtain).to be_present
expect(lease.renew).to be_truthy
end
it 'returns false when we dont have a lease' do
lease = described_class.new(unique_key, timeout: 3600)
expect(lease.renew).to be_falsey
end
end
describe '#exists?' do
it 'returns true for an existing lease' do
lease = described_class.new(unique_key, timeout: 3600)
......
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Daemon, lib: true do
describe '#run!' do
before do
allow(subject).to receive(:exit?) { true }
end
it 'traps signals' do
allow(subject).to receive(:exit?) { true }
expect(subject).to receive(:trap_signals)
subject.run!
end
context 'when the command-line defines full_scan: true' do
subject { described_class.new(full_scan: true) }
it 'executes a full-scan' do
expect(subject).to receive(:full_scan!)
subject.run!
end
end
end
end
require 'spec_helper'
describe Gitlab::Geo::LogCursor::Events, lib: true do
describe '.fetch_in_batches' do
let!(:event_log) { create(:geo_event_log) }
before do
allow(described_class).to receive(:last_processed) { -1 }
end
it 'yields a group of events' do
expect { |b| described_class.fetch_in_batches(&b) }.to yield_with_args([event_log])
end
it 'saves processed files after yielding' do
expect(described_class).to receive(:save_processed)
described_class.fetch_in_batches { |batch| batch }
end
it 'skips execution if cannot achieve a lease' do
expect_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain) { }
expect { |b| described_class.fetch_in_batches(&b) }.not_to yield_control
end
end
describe '.save_processed' do
it 'saves a new entry in geo_event_log_state' do
expect { described_class.save_processed(1) }.to change(Geo::EventLogState, :count).by(1)
expect(Geo::EventLogState.last.event_id).to eq(1)
end
it 'removes older entries from geo_event_log_state' do
create(:geo_event_log_state)
expect { described_class.save_processed(2) }.to change(Geo::EventLogState, :count).by(0)
expect(Geo::EventLogState.last.event_id).to eq(2)
end
end
describe '.last_processed' do
context 'when system has not generated any event yet' do
it 'returns -1' do
expect(described_class.last_processed).to eq(-1)
end
end
context 'when there are existing events already but no event_log_state' do
let!(:event_log) { create(:geo_event_log) }
it 'returns last event id' do
expect(described_class.last_processed).to eq(event_log.id)
end
end
context 'when there is already an event_log_state' do
let!(:event_log_state) { create(:geo_event_log_state) }
it 'returns last event from event_log_state' do
expect(described_class.last_processed).to eq(event_log_state.id)
end
end
end
end
require 'spec_helper'
RSpec.describe Geo::EventLogState, type: :model do
describe 'validations' do
it { is_expected.to validate_presence_of(:event_id) }
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment