Commit 85c58de0 authored by Gabriel Mazetto's avatar Gabriel Mazetto

Fix how events are called and propagated

We know use `consume_event_#{event_name}` as the consume method pattern.

Some places were calling methods based on early version of our APIs.

Added extra coverage on missing parts of the framework.
parent 15548018
......@@ -20,7 +20,7 @@ module Geo
end
# Called by Gitlab::Geo::Replicator#consume
def consume_created_event
def consume_event_created(**params)
download
end
......@@ -50,7 +50,7 @@ module Geo
private
# Update checksum on main database
# Update checksum on Geo primary database
#
# @param [String] checksum value generated by the checksum routine
# @param [String] failure (optional) stacktrace from failed execution
......
......@@ -10,19 +10,19 @@ module Geo
def initialize(replicable_name, event_name, payload)
@replicable_name = replicable_name
@event_name = event_name
@payload = payload
@event_name = event_name.to_sym
@payload = payload.symbolize_keys
end
def execute
replicator.consume(event_name, payload)
replicator.consume(event_name, **payload)
end
private
def replicator
strong_memoize(:replicator) do
model_record_id = payload['model_record_id']
model_record_id = payload[:model_record_id]
replicator_class = ::Gitlab::Geo::Replicator.for_replicable_name(replicable_name)
replicator_class.new(model_record_id: model_record_id)
......
......@@ -4,6 +4,7 @@ module Geo
class EventWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
sidekiq_options retry: 3, dead: false
......
......@@ -33,9 +33,9 @@ module API
authorize_geo_transfer!(replicable_name: params[:replicable_name], id: params[:id])
decoded_params = jwt_decoder.decode
service = Geo::BlobUploadService.new(replicable_name: params[:replicable_name],
blob_id: params[:id],
decoded_params: decoded_params)
service = ::Geo::BlobUploadService.new(replicable_name: params[:replicable_name],
blob_id: params[:id],
decoded_params: decoded_params)
service.execute
end
......
......@@ -64,7 +64,7 @@ module Gitlab
def request_headers
request_data = {
replicable_name: replicable_name,
model_record_id: model_record.id
id: model_record.id
}
TransferRequest.new(request_data).headers
......
......@@ -2,7 +2,7 @@
module Gitlab
module Geo
# Geo Replicators are objects that knows to to replicator a replicable resource
# Geo Replicators are objects that know how to replicate a replicable resource
#
# A replicator is responsible for:
# - firing events (producer)
......@@ -45,7 +45,7 @@ module Gitlab
@events.include?(event_name.to_sym)
end
# Return the name of the replicator
# Return the name of the replicable, e.g. "package_file"
#
# This can be used to retrieve the replicator class again
# by using the `.for_replicable_name` method
......@@ -132,19 +132,19 @@ module Gitlab
# This method is called by the GeoLogCursor when reading the event from the queue
#
# @param [Symbol] event_name
# @param [Hash] params contextual data published with the event
def consume(event_name, **params)
# @param [Hash] event_data contextual data published with the event
def consume(event_name, **event_data)
raise ArgumentError, "Unsupported event: '#{event_name}'" unless self.class.event_supported?(event_name)
consume_method = "consume_#{event_name}".to_sym
raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless instance_method_defined?(consume_method)
consume_method = "consume_event_#{event_name}".to_sym
raise NotImplementedError, "Consume method not implemented: '#{consume_method}'" unless self.methods.include?(consume_method)
# Inject model_record based on included class
if model_record
params[:model_record] = model_record
event_data[:model_record] = model_record
end
send(consume_method, **params) # rubocop:disable GitlabSecurity/PublicSend
send(consume_method, **event_data) # rubocop:disable GitlabSecurity/PublicSend
end
# Return the name of the replicator
......@@ -199,16 +199,6 @@ module Gitlab
rescue ActiveRecord::RecordInvalid, NoMethodError => e
log_error("#{class_name} could not be created", e, params)
end
private
# Checks if method is implemented by current class (ignoring inherited methods)
#
# @param [Symbol] method_name
# @return [Boolean] whether method is implemented
def instance_method_defined?(method_name)
self.class.instance_methods(false).include?(method_name)
end
end
end
end
......@@ -25,8 +25,8 @@ describe Gitlab::Geo::LogCursor::Events::Event, :clean_gitlab_redis_shared_state
it "eventually calls Replicator#consume", :sidekiq_inline do
expect_next_instance_of(::Geo::PackageFileReplicator) do |replicator|
expect(replicator).to receive(:consume).with(
"created",
{ "model_record_id" => replicable.id }
:created,
{ model_record_id: replicable.id }
)
end
......
......@@ -30,7 +30,7 @@ describe Gitlab::Geo::Replicator do
protected
def publish_test(other:)
def consume_event_test(user:, other:)
true
end
end
......@@ -118,5 +118,19 @@ describe Gitlab::Geo::Replicator do
end
end
end
describe '#consume' do
subject { DummyReplicator.new }
it 'accepts valid attributes' do
expect { subject.consume(:test, user: 'something', other: 'something else') }.not_to raise_error
end
it 'calls corresponding method with specified named attributes' do
expect(subject).to receive(:consume_event_test).with(user: 'something', other: 'something else')
subject.consume(:test, user: 'something', other: 'something else')
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Geo::EventService do
include ::EE::GeoHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
let(:model_record) { create(:package_file, :npm) }
subject { described_class.new('package_file', 'created', { 'model_record_id' => model_record.id }) }
describe '#execute' do
before do
resource_url = primary.geo_retrieve_url('package_file', model_record.id.to_s)
content = model_record.file.open
File.unlink(model_record.file.path)
stub_request(:get, resource_url).to_return(status: 200, body: content)
stub_current_geo_node(secondary)
end
it 'executes the consume part of the replication' do
subject.execute
expect(model_record.file_exist?).to be_truthy
end
end
end
......@@ -76,7 +76,7 @@ RSpec.shared_examples 'a blob replicator' do
expect(service).to receive(:execute)
expect(::Geo::BlobDownloadService).to receive(:new).with(replicator: replicator).and_return(service)
replicator.consume_created_event
replicator.consume_event_created
end
end
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment