Commit f10222a6 authored by Furkan Ayhan's avatar Furkan Ayhan

Apply review changes

parent 13036f0a
...@@ -12,13 +12,14 @@ module Ci ...@@ -12,13 +12,14 @@ module Ci
class << self class << self
def sync!(event) def sync!(event)
traversal_ids = event.namespace.self_and_ancestor_ids(hierarchy_order: :desc) namespace = event.namespace
traversal_ids = namespace.self_and_ancestor_ids(hierarchy_order: :desc)
upsert({ namespace_id: event.namespace_id, traversal_ids: traversal_ids }, upsert({ namespace_id: event.namespace_id, traversal_ids: traversal_ids },
unique_by: :namespace_id) unique_by: :namespace_id)
# TODO: after fully implemented `sync_traversal_ids` FF, we will not need this method. # It won't be necessary once we remove `sync_traversal_ids`.
# However, we also need to change the PG trigger to reflect `namespaces.traversal_ids` changes # More info: https://gitlab.com/gitlab-org/gitlab/-/issues/347541
sync_children_namespaces!(event.namespace_id, traversal_ids) sync_children_namespaces!(event.namespace_id, traversal_ids)
end end
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
module Ci module Ci
class ProcessSyncEventsService class ProcessSyncEventsService
include Gitlab::Utils::StrongMemoize include Gitlab::Utils::StrongMemoize
include ExclusiveLeaseGuard
BATCH_SIZE = 1000 BATCH_SIZE = 1000
...@@ -14,30 +15,44 @@ module Ci ...@@ -14,30 +15,44 @@ module Ci
def execute def execute
return unless ::Feature.enabled?(:ci_namespace_project_mirrors, default_enabled: :yaml) return unless ::Feature.enabled?(:ci_namespace_project_mirrors, default_enabled: :yaml)
process_events # preventing parallel processing over the same event table
try_obtain_lease { process_events }
enqueue_worker_if_there_still_event enqueue_worker_if_there_still_event
end end
private private
def process_events def process_events
events = @sync_event_class events = @sync_event_class.preload_synced_relation.first(BATCH_SIZE)
.preload_synced_relation
.order_by_id_asc
.limit(BATCH_SIZE)
.to_a
return if events.empty? return if events.empty?
min = events.first first = events.first
max = events.last last_processed = nil
begin
events.each do |event|
@sync_class.sync!(event)
events.each { |event| @sync_class.sync!(event) } last_processed = event
@sync_event_class.id_in(min.id..max.id).delete_all end
ensure
# remove events till the one that was last succesfully processed
@sync_event_class.id_in(first.id..last_processed.id).delete_all if last_processed
end
end end
def enqueue_worker_if_there_still_event def enqueue_worker_if_there_still_event
@sync_event_class.enqueue_worker if @sync_event_class.exists? @sync_event_class.enqueue_worker if @sync_event_class.exists?
end end
def lease_key
"#{super}::#{@sync_event_class}"
end
def lease_timeout
1.minute
end
end end
end end
# frozen_string_literal: true # frozen_string_literal: true
module Namespaces module Namespaces
# This worker can be called multiple times at the same time but only one of them can
# process events at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
# `until_executing` here is to reduce redundant worker enqueuing.
class ProcessSyncEventsWorker class ProcessSyncEventsWorker
include ApplicationWorker include ApplicationWorker
...@@ -9,8 +12,8 @@ module Namespaces ...@@ -9,8 +12,8 @@ module Namespaces
feature_category :sharding feature_category :sharding
urgency :high urgency :high
deduplicate :until_executed
idempotent! idempotent!
deduplicate :until_executing
def perform def perform
::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute ::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute
......
# frozen_string_literal: true # frozen_string_literal: true
module Projects module Projects
# This worker can be called multiple times at the same time but only one of them can
# process events at a time. This is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
# `until_executing` here is to reduce redundant worker enqueuing.
class ProcessSyncEventsWorker class ProcessSyncEventsWorker
include ApplicationWorker include ApplicationWorker
...@@ -9,8 +12,8 @@ module Projects ...@@ -9,8 +12,8 @@ module Projects
feature_category :sharding feature_category :sharding
urgency :high urgency :high
deduplicate :until_executed
idempotent! idempotent!
deduplicate :until_executing
def perform def perform
::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute ::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute
......
...@@ -1540,8 +1540,10 @@ RSpec.describe User do ...@@ -1540,8 +1540,10 @@ RSpec.describe User do
allow(user).to receive(:update_highest_role) allow(user).to receive(:update_highest_role)
end end
# Namespace#schedule_sync_event_worker => Sidekiq calls `SecureRandom.hex(12)` to generate `jid` allow_next_instance_of(Namespaces::UserNamespace) do |namespace|
expect(SecureRandom).to receive(:hex).with(12).and_call_original allow(namespace).to receive(:schedule_sync_event_worker)
end
expect(SecureRandom).to receive(:hex).with(no_args).and_return('3b8ca303') expect(SecureRandom).to receive(:hex).with(no_args).and_return('3b8ca303')
user = create(:user) user = create(:user)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment