Handle repositories changed events in Geo::LogCursor

parent bea90eb7
...@@ -2,8 +2,6 @@ module Geo ...@@ -2,8 +2,6 @@ module Geo
class EventLog < ActiveRecord::Base class EventLog < ActiveRecord::Base
include Geo::Model include Geo::Model
delegate :project_id, to: :event, allow_nil: true
belongs_to :repository_updated_event, belongs_to :repository_updated_event,
class_name: 'Geo::RepositoryUpdatedEvent', class_name: 'Geo::RepositoryUpdatedEvent',
foreign_key: :repository_updated_event_id foreign_key: :repository_updated_event_id
...@@ -26,5 +24,11 @@ module Geo ...@@ -26,5 +24,11 @@ module Geo
repository_renamed_event || repository_renamed_event ||
repositories_changed_event repositories_changed_event
end end
def project_id
return nil unless event.respond_to?(:project_id)
event.project_id
end
end end
end end
module Geo
class RepositoriesCleanUpWorker
include Sidekiq::Worker
include GeoQueue
def perform(geo_node_id)
geo_node = GeoNode.find(geo_node_id)
rescue ActiveRecord::RecordNotFound => e
Gitlab::Geo::Logger.error(
class: self.class.name,
message: 'Could not find Geo node, skipping repositories clean up',
geo_node_id: geo_node_id,
error: e
)
end
end
end
...@@ -70,14 +70,15 @@ module Gitlab ...@@ -70,14 +70,15 @@ module Gitlab
end end
def handle_events(batch) def handle_events(batch)
batch.each do |event| batch.each do |event_log|
next unless can_replay?(event) next unless can_replay?(event_log)
# Update repository if event_log.repository_updated_event
if event.repository_updated_event handle_repository_update(event_log.repository_updated_event)
handle_repository_update(event.repository_updated_event) elsif event_log.repository_deleted_event
elsif event.repository_deleted_event handle_repository_delete(event_log.repository_deleted_event)
handle_repository_delete(event.repository_deleted_event) elsif event_log.repositories_changed_event
handle_repositories_changed(event_log.repositories_changed_event)
end end
end end
end end
...@@ -100,10 +101,15 @@ module Gitlab ...@@ -100,10 +101,15 @@ module Gitlab
@exit = true @exit = true
end end
def can_replay?(event) def exit?
@exit
end
def can_replay?(event_log)
return true if event_log.project_id.nil?
return true if Gitlab::Geo.current_node.project_ids.nil? return true if Gitlab::Geo.current_node.project_ids.nil?
Gitlab::Geo.current_node.project_ids.include?(event.project_id) Gitlab::Geo.current_node.project_ids.include?(event_log.project_id)
end end
def handle_repository_update(updated_event) def handle_repository_update(updated_event)
...@@ -147,8 +153,24 @@ module Gitlab ...@@ -147,8 +153,24 @@ module Gitlab
::Geo::ProjectRegistry.where(project_id: deleted_event.project_id).delete_all ::Geo::ProjectRegistry.where(project_id: deleted_event.project_id).delete_all
end end
def exit? def handle_repositories_changed(changed_event)
@exit return unless Gitlab::Geo.current_node.id == changed_event.geo_node_id
job_id = ::Geo::RepositoriesCleanUpWorker.perform_in(1.hour, changed_event.geo_node_id)
if job_id
log_info('Scheduled repositories clean up for Geo node', geo_node_id: changed_event.geo_node_id, job_id: job_id)
else
log_error('Could not schedule repositories clean up for Geo node', geo_node_id: changed_event.geo_node_id)
end
end
def log_info(message, params = {})
Gitlab::Geo::Logger.info({ class: self.class.name, message: message }.merge(params))
end
def log_error(message, params = {})
Gitlab::Geo::Logger.error({ class: self.class.name, message: message }.merge(params))
end end
end end
end end
......
...@@ -81,6 +81,33 @@ describe Gitlab::Geo::LogCursor::Daemon do ...@@ -81,6 +81,33 @@ describe Gitlab::Geo::LogCursor::Daemon do
end end
end end
context 'when replaying a repositories changed event' do
let(:geo_node) { create(:geo_node) }
let(:repositories_changed_event) { create(:geo_repositories_changed_event, geo_node: geo_node) }
let(:event_log) { create(:geo_event_log, repositories_changed_event: repositories_changed_event) }
let!(:event_log_state) { create(:geo_event_log_state, event_id: event_log.id - 1) }
before do
allow(subject).to receive(:exit?).and_return(false, true)
end
it 'schedules a GeoRepositoryDestroyWorker when event node is the current node' do
allow(Gitlab::Geo).to receive(:current_node).and_return(geo_node)
expect(Geo::RepositoriesCleanUpWorker).to receive(:perform_in).with(1.hour, geo_node.id)
subject.run!
end
it 'does not schedule a GeoRepositoryDestroyWorker when event node is not the current node' do
allow(Gitlab::Geo).to receive(:current_node).and_return(build(:geo_node))
expect(Geo::RepositoriesCleanUpWorker).not_to receive(:perform_in)
subject.run!
end
end
context 'when node have namespace restrictions' do context 'when node have namespace restrictions' do
let(:geo_node) { create(:geo_node, :current) } let(:geo_node) { create(:geo_node, :current) }
let(:group_1) { create(:group) } let(:group_1) { create(:group) }
......
...@@ -47,7 +47,14 @@ RSpec.describe Geo::EventLog, type: :model do ...@@ -47,7 +47,14 @@ RSpec.describe Geo::EventLog, type: :model do
expect(subject.project_id).to be_nil expect(subject.project_id).to be_nil
end end
it 'returns event#project_id when an event is present' do it 'returns nil when an event does not respond to project_id' do
repositories_changed_event = build(:geo_repositories_changed_event)
subject.repositories_changed_event = repositories_changed_event
expect(subject.project_id).to be_nil
end
it 'returns event#project_id when an event respond to project_id' do
repository_updated_event = build(:geo_repository_updated_event) repository_updated_event = build(:geo_repository_updated_event)
subject.repository_updated_event = repository_updated_event subject.repository_updated_event = repository_updated_event
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment