Commit 71ff39c4 authored by Nick Thomas

Merge branch 'tc-geo-prune-events-correctly' into 'master'

Prune all the Geo event log tables correctly

Closes #6422

See merge request gitlab-org/gitlab-ee!6175
parents 62314647 37b1c7d3
......@@ -175,7 +175,6 @@
- geo:geo_repository_verification_primary_shard
- geo:geo_repository_verification_primary_single
- geo:geo_repository_verification_secondary_single
- geo:geo_truncate_event_log
- admin_emails
- create_github_webhook
......
......@@ -323,7 +323,7 @@ production: &base
# GitLab Geo prune event log worker
# NOTE: This will only take effect if Geo is enabled (primary node only)
geo_prune_event_log_worker:
cron: "0 */2 * * *"
cron: "*/5 * * * *"
# GitLab Geo repository sync worker
# NOTE: This will only take effect if Geo is enabled (secondary nodes only)
......
......@@ -317,7 +317,7 @@ Settings.cron_jobs['geo_file_download_dispatch_worker'] ||= Settingslogic.new({}
Settings.cron_jobs['geo_file_download_dispatch_worker']['cron'] ||= '*/1 * * * *'
Settings.cron_jobs['geo_file_download_dispatch_worker']['job_class'] ||= 'Geo::FileDownloadDispatchWorker'
Settings.cron_jobs['geo_prune_event_log_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '0 */2 * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['cron'] ||= '*/5 * * * *'
Settings.cron_jobs['geo_prune_event_log_worker']['job_class'] ||= 'Geo::PruneEventLogWorker'
Settings.cron_jobs['geo_repository_verification_primary_batch_worker'] ||= Settingslogic.new({})
Settings.cron_jobs['geo_repository_verification_primary_batch_worker']['cron'] ||= '*/1 * * * *'
......
......@@ -96,6 +96,7 @@ description: 'Learn how to contribute to GitLab.'
- [Verifying database capabilities](verifying_database_capabilities.md)
- [Database Debugging and Troubleshooting](database_debugging.md)
- [Query Count Limits](query_count_limits.md)
- [Database helper modules](database_helpers.md)
## Testing guides
......
# Database helpers

There are a number of useful helper modules defined in `/lib/gitlab/database/`.

## Subquery

In some cases it is not possible to perform an operation directly on a query.
For example:

```ruby
Geo::EventLog.where('id < 100').limit(10).delete_all
```

This will raise the following error:

> ActiveRecord::ActiveRecordError: delete_all doesn't support limit

One solution would be to wrap it in another `where`:

```ruby
Geo::EventLog.where(id: Geo::EventLog.where('id < 100').limit(10)).delete_all
```

This works with PostgreSQL, but MySQL raises this error:

> ActiveRecord::StatementInvalid: Mysql2::Error: This version of MySQL
> doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'

That query also performs poorly; using an `INNER JOIN` of the table with
itself is better.

So instead of this query:

```sql
SELECT geo_event_log.*
FROM geo_event_log
WHERE geo_event_log.id IN
  (SELECT geo_event_log.id
   FROM geo_event_log
   WHERE (id < 100)
   LIMIT 10)
```

It's better to write:

```sql
SELECT geo_event_log.*
FROM geo_event_log
INNER JOIN
  (SELECT geo_event_log.*
   FROM geo_event_log
   WHERE (id < 100)
   LIMIT 10) t2 ON geo_event_log.id = t2.id
```

This is where `Gitlab::Database::Subquery.self_join` can help you: the
statement above can be rewritten as:

```ruby
Gitlab::Database::Subquery.self_join(Geo::EventLog.where('id < 100').limit(10)).delete_all
```

This also works with MySQL, so you don't need to worry about that.
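
As a rough illustration (a hypothetical console session, not part of this merge request), you can inspect the SQL that `self_join` builds and call `delete_all` repeatedly to prune a table in bounded chunks; the model and the `id < 100` condition below are only examples:

```ruby
relation = Geo::EventLog.where('id < 100').limit(10)

# The helper returns an ordinary ActiveRecord relation, so `to_sql` shows the
# self-join it builds (output abbreviated and not guaranteed verbatim):
puts Gitlab::Database::Subquery.self_join(relation).to_sql
# SELECT "geo_event_log".* FROM "geo_event_log"
# INNER JOIN (SELECT "geo_event_log".* FROM "geo_event_log"
#             WHERE (id < 100) LIMIT 10) t2 ON "geo_event_log"."id" = t2."id"

# Deleting in a loop keeps every DELETE bounded by the LIMIT, similar to what
# Geo::Eventable's `delete_with_limit` does for the Geo event tables:
loop do
  deleted = Gitlab::Database::Subquery.self_join(relation).delete_all
  break if deleted.zero?
end
```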
# frozen_string_literal: true
module Geo
module Eventable
extend ActiveSupport::Concern
include ::EachBatch
included do
has_one :geo_event_log, class_name: 'Geo::EventLog'
end
class_methods do
def up_to_event(geo_event_log_id)
joins(:geo_event_log)
.where(Geo::EventLog.arel_table[:id].lteq(geo_event_log_id))
end
def delete_with_limit(limit)
::Gitlab::Database::Subquery.self_join(limit(limit)).delete_all
end
end
end
end
......@@ -3,6 +3,17 @@ module Geo
include Geo::Model
include ::EachBatch
EVENT_CLASSES = %w[Geo::RepositoryCreatedEvent
Geo::RepositoryUpdatedEvent
Geo::RepositoryDeletedEvent
Geo::RepositoryRenamedEvent
Geo::RepositoriesChangedEvent
Geo::HashedStorageMigratedEvent
Geo::HashedStorageAttachmentsEvent
Geo::LfsObjectDeletedEvent
Geo::JobArtifactDeletedEvent
Geo::UploadDeletedEvent].freeze
belongs_to :repository_created_event,
class_name: 'Geo::RepositoryCreatedEvent',
foreign_key: :repository_created_event_id
......@@ -47,6 +58,10 @@ module Geo
order(id: :desc).first
end
def self.event_classes
EVENT_CLASSES.map(&:constantize)
end
def self.includes_events
includes(reflections.keys)
end
......
module Geo
class HashedStorageAttachmentsEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :project
......
module Geo
class HashedStorageMigratedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :project
......
module Geo
class JobArtifactDeletedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :job_artifact, class_name: 'Ci::JobArtifact'
......
module Geo
class LfsObjectDeletedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :lfs_object
......
module Geo
class RepositoriesChangedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :geo_node
......
module Geo
class RepositoryCreatedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :project
......
module Geo
class RepositoryDeletedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :project
......
module Geo
class RepositoryRenamedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :project
......
module Geo
class RepositoryUpdatedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
REPOSITORY = 0
WIKI = 1
......
module Geo
class UploadDeletedEvent < ActiveRecord::Base
include Geo::Model
include Geo::Eventable
belongs_to :upload
......
......@@ -44,6 +44,7 @@ class GeoNode < ActiveRecord::Base
alias_method :repair, :save # the `update_dependents_attributes` hook will take care of it
scope :with_url_prefix, ->(prefix) { where('url LIKE ?', "#{prefix}%") }
scope :secondary_nodes, -> { where(primary: false) }
attr_encrypted :secret_access_key,
key: Settings.attr_encrypted_db_key_base_truncated,
......@@ -68,6 +69,34 @@ class GeoNode < ActiveRecord::Base
GeoNode.find_by(url: current_node_url)
end
def primary_node
find_by(primary: true)
end
def unhealthy_nodes
status_table = GeoNodeStatus.arel_table
query = status_table[:id].eq(nil)
.or(status_table[:cursor_last_event_id].eq(nil))
.or(status_table[:last_successful_status_check_at].eq(nil))
.or(status_table[:last_successful_status_check_at].lt(10.minutes.ago))
left_join_status.where(query)
end
def min_cursor_last_event_id
left_join_status.minimum(:cursor_last_event_id)
end
private
def left_join_status
join_statement = arel_table.join(GeoNodeStatus.arel_table, Arel::Nodes::OuterJoin)
.on(arel_table[:id].eq(GeoNodeStatus.arel_table[:geo_node_id]))
joins(join_statement.join_sources)
end
end
def current?
......
# frozen_string_literal: true
module Geo
class PruneEventLogService
include ::Gitlab::Geo::LogHelpers
include ::ExclusiveLeaseGuard
TOTAL_LIMIT = 50_000
LEASE_TIMEOUT = 4.minutes
attr_reader :event_log_min_id
def initialize(event_log_min_id)
@event_log_min_id = event_log_min_id
end
def execute
return if Gitlab::Database.read_only?
try_obtain_lease do
log_info('Prune Geo Event Log entries up to id', geo_event_log_id: event_log_min_id)
prunable_relations.reduce(TOTAL_LIMIT) do |limit, relation|
break if limit <= 0
break unless renew_lease!
limit - prune!(relation, limit).to_i
end
end
end
private
def lease_timeout
LEASE_TIMEOUT
end
def prunable_relations
Geo::EventLog.event_classes
end
def prune!(relation, limit)
unless delete_all?
relation = relation.up_to_event(event_log_min_id)
end
deleted = relation.delete_with_limit(limit)
if deleted.positive?
log_info('Rows pruned from Geo Event log',
relation: relation.name,
rows_deleted: deleted,
limit: limit)
end
deleted
end
def delete_all?
event_log_min_id == :all
end
end
end
# frozen_string_literal: true
module Geo
class PruneEventLogWorker
include ApplicationWorker
include CronjobQueue
include ExclusiveLeaseGuard
include ::Gitlab::Utils::StrongMemoize
include ::Gitlab::Geo::LogHelpers
LEASE_TIMEOUT = 5.minutes
# rubocop: disable CodeReuse/ActiveRecord
def perform
return if Gitlab::Database.read_only?
return unless Gitlab::Database.healthy?
try_obtain_lease do
unless ::GeoNode.any?
Geo::PruneEventLogService.new(:all).execute
return
end
unless prune?
log_info('Some nodes are not healthy, prune geo event log skipped', unhealthy_node_count: unhealthy_nodes.count)
return
end
Geo::PruneEventLogService.new(min_cursor_last_event_id).execute
end
end
def prune?
unhealthy_nodes.empty?
end
def min_cursor_last_event_id
::GeoNode.secondary_nodes.min_cursor_last_event_id
end
# rubocop: enable CodeReuse/ActiveRecord
def lease_timeout
LEASE_TIMEOUT
end
def unhealthy_nodes
::GeoNode.secondary_nodes.unhealthy_nodes
end
end
end
module Geo
class TruncateEventLogWorker
include ApplicationWorker
include GeoQueue
include ::Gitlab::Geo::LogHelpers
def perform
if Gitlab::Geo.secondary_nodes.any?
log_info('Some secondary nodes configured, Geo Event Log should not be truncated', geo_node_count: Gitlab::Geo.secondary_nodes.count)
else
log_info('Still no secondary nodes configured, truncating the Geo Event Log')
ActiveRecord::Base.connection.truncate(Geo::EventLog.table_name)
end
end
end
end
---
title: Prune all the Geo event log tables correctly
merge_request: 6175
author:
type: fixed
# frozen_string_literal: true
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class SchedulePruneOrphanedGeoEvents < ActiveRecord::Migration
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
return unless Gitlab::Database.postgresql?
BackgroundMigrationWorker.perform_async('PruneOrphanedGeoEvents')
end
def down
# NOOP
end
end
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
class PruneOrphanedGeoEvents
BATCH_SIZE = 50_000
RESCHEDULE_DELAY = 5.minutes
EVENT_TABLES = %w[geo_repository_created_events
geo_repository_updated_events
geo_repository_deleted_events
geo_repository_renamed_events
geo_repositories_changed_events
geo_hashed_storage_migrated_events
geo_hashed_storage_attachments_events
geo_lfs_object_deleted_events
geo_job_artifact_deleted_events
geo_upload_deleted_events].freeze
module PrunableEvent
extend ActiveSupport::Concern
include EachBatch
included do
scope :orphans, -> do
where(<<-SQL.squish)
NOT EXISTS (
SELECT 1
FROM geo_event_log
WHERE geo_event_log.#{geo_event_foreign_key} = #{table_name}.id
)
SQL
end
end
class_methods do
def geo_event_foreign_key
table_name.singularize.sub(/^geo_/, '') + '_id'
end
def delete_batch_of_orphans!
deleted = where(id: orphans.limit(BATCH_SIZE)).delete_all
vacuum! if deleted.positive?
deleted
end
def vacuum!
connection.execute("VACUUM #{table_name}")
rescue ActiveRecord::StatementInvalid => e
# ignore timeout, auto-vacuum will take care of it
raise unless e.message =~ /statement timeout/i
end
end
end
def perform(table_name = EVENT_TABLES.first)
deleted = prune_orphaned_rows(table_name)
table_name = next_table(table_name) if deleted.zero?
BackgroundMigrationWorker.perform_in(RESCHEDULE_DELAY, self.class.name, table_name) if table_name
end
def prune_orphaned_rows(table)
event_model(table).delete_batch_of_orphans!
end
def event_model(table)
Class.new(ActiveRecord::Base) do
include PrunableEvent
self.table_name = table
end
end
def next_table(table_name)
return nil if EVENT_TABLES.last == table_name
index = EVENT_TABLES.index(table_name)
return nil unless index
EVENT_TABLES[index + 1]
end
end
end
end
......@@ -18,17 +18,13 @@ module Gitlab
self.cache_value(:geo_node_current) { GeoNode.current_node }
end
# rubocop: disable CodeReuse/ActiveRecord
def self.primary_node
self.cache_value(:geo_primary_node) { GeoNode.find_by(primary: true) }
self.cache_value(:geo_primary_node) { GeoNode.primary_node }
end
# rubocop: enable CodeReuse/ActiveRecord
# rubocop: disable CodeReuse/ActiveRecord
def self.secondary_nodes
self.cache_value(:geo_secondary_nodes) { GeoNode.where(primary: false) }
self.cache_value(:geo_secondary_nodes) { GeoNode.secondary_nodes }
end
# rubocop: enable CodeReuse/ActiveRecord
def self.connected?
Gitlab::Database.postgresql? && GeoNode.connected? && GeoNode.table_exists?
......
......@@ -40,7 +40,7 @@ FactoryBot.define do
last_event_timestamp { Time.now.to_i }
cursor_last_event_id 1
cursor_last_event_timestamp { Time.now.to_i }
last_successful_status_check_timestamp { Time.now.beginning_of_day }
last_successful_status_check_timestamp { 2.minutes.ago }
version { Gitlab::VERSION }
revision { Gitlab.revision }
end
......
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::BackgroundMigration::PruneOrphanedGeoEvents, :migration, :postgresql, schema: 20180626125654 do
let(:event_table_name) { 'geo_repository_updated_events' }
let(:geo_event_log) { table(:geo_event_log) }
let(:geo_updated_events) { table(event_table_name) }
let(:namespace) { table(:namespaces).create(name: 'foo', path: 'foo') }
let(:project) { table(:projects).create(name: 'bar', path: 'path/to/bar', namespace_id: namespace.id) }
subject(:background_migration) { described_class.new }
describe 'PrunableEvent' do
subject(:prunable_event) do
Class.new(ActiveRecord::Base) do
include Gitlab::BackgroundMigration::PruneOrphanedGeoEvents::PrunableEvent
self.table_name = 'geo_repository_updated_events'
end
end
describe '.geo_event_foreign_key' do
it 'determines foreign key correctly' do
expect(subject.geo_event_foreign_key).to eq('repository_updated_event_id')
end
end
describe '.delete_batch_of_orphans!' do
it 'vacuums table after deleting rows' do
geo_updated_events.create!(project_id: project.id,
source: 0,
branches_affected: 0,
tags_affected: 0)
expect(subject).to receive(:vacuum!)
subject.delete_batch_of_orphans!
end
end
end
describe '#perform' do
before do
geo_updated_events.create!(project_id: project.id,
source: 0,
branches_affected: 0,
tags_affected: 0)
end
it 'takes the first table if no table is specified' do
expect(subject).to receive(:prune_orphaned_rows).with(described_class::EVENT_TABLES.first).and_call_original
subject.perform
end
it 'deletes orphans' do
expect { background_migration.perform(event_table_name) }.to change { Geo::RepositoryUpdatedEvent.count }.by(-1)
end
it 'reschedules itself with the same table if positive number of rows were pruned' do
allow(subject).to receive(:prune_orphaned_rows).and_return(5)
expect(BackgroundMigrationWorker).to receive(:perform_in).with(5.minutes, described_class.name, event_table_name)
subject.perform(event_table_name)
end
it 'reschedules itself with the next table if zero rows were pruned' do
allow(subject).to receive(:prune_orphaned_rows).and_return(0)
expect(BackgroundMigrationWorker).to receive(:perform_in).with(5.minutes, described_class.name, 'geo_repository_deleted_events')
subject.perform(event_table_name)
end
end
describe '#prune_orphaned_rows' do
it 'returns the number of pruned rows' do
event_model = spy(:event_model)
allow(event_model).to receive(:delete_batch_of_orphans!).and_return(555)
allow(subject).to receive(:event_model).and_return(event_model)
expect(subject.prune_orphaned_rows(event_table_name)).to eq(555)
end
end
describe '#next_table' do
it 'takes the next table in the array' do
expect(subject.next_table(described_class::EVENT_TABLES.first)).to eq(described_class::EVENT_TABLES.second)
end
it 'stops with the last table' do
expect(subject.next_table(described_class::EVENT_TABLES.last)).to be_nil
end
it 'cycles for EVENT_TABLES.count' do
table_name = 'geo_repository_created_events'
count = 0
loop do
count += 1
table_name = subject.next_table(table_name)
break unless table_name
end
expect(count).to eq(described_class::EVENT_TABLES.count)
end
end
end
# frozen_string_literal: true
require 'spec_helper'
require Rails.root.join('ee', 'db', 'post_migrate', '20180618193715_schedule_prune_orphaned_geo_events.rb')
describe SchedulePruneOrphanedGeoEvents, :migration do
describe '#up' do
it 'delegates work to Gitlab::BackgroundMigration::PruneOrphanedGeoEvents', :postgresql do
expect(BackgroundMigrationWorker).to receive(:perform_async).with('PruneOrphanedGeoEvents')
migrate!
end
end
end
require 'spec_helper'
RSpec.describe Geo::Eventable do
describe '.up_to_event' do
it 'finds only events up to the given geo event log id' do
events = create_list(:geo_event_log, 4, :updated_event)
expect(Geo::RepositoryUpdatedEvent.up_to_event(events.second.id)).to match_array(events.first(2).map(&:event))
end
end
describe '.delete_with_limit' do
it 'deletes a limited amount of rows' do
create_list(:geo_event_log, 4, :updated_event)
expect do
Geo::RepositoryUpdatedEvent.delete_with_limit(2)
end.to change { Geo::RepositoryUpdatedEvent.count }.by(-2)
end
end
end
......@@ -13,6 +13,14 @@ RSpec.describe Geo::EventLog, type: :model do
it { is_expected.to belong_to(:job_artifact_deleted_event).class_name('Geo::JobArtifactDeletedEvent').with_foreign_key('job_artifact_deleted_event_id') }
end
describe '.event_classes' do
it 'returns all event class reflections' do
reflections = described_class.reflections.map { |_k, v| v.class_name.constantize }
expect(described_class.event_classes).to contain_exactly(*reflections)
end
end
describe '#event' do
it 'returns nil when having no event associated' do
expect(subject.event).to be_nil
......
......@@ -85,6 +85,79 @@ describe GeoNode, type: :model do
end
end
describe '.primary_node' do
before do
create(:geo_node)
end
it 'returns the primary' do
primary = create(:geo_node, :primary)
expect(described_class.primary_node).to eq(primary)
end
it 'returns nil if there is no primary' do
expect(described_class.primary_node).to be_nil
end
end
describe '.secondary_nodes' do
before do
create(:geo_node, :primary)
end
it 'returns all secondary nodes' do
secondaries = create_list(:geo_node, 2)
expect(described_class.secondary_nodes).to match_array(secondaries)
end
it 'returns empty array if there are not any secondary nodes' do
expect(described_class.secondary_nodes).to be_empty
end
end
describe '.unhealthy_nodes' do
before do
create(:geo_node_status, :healthy)
end
subject(:unhealthy_nodes) { described_class.unhealthy_nodes }
it 'returns a node without status' do
geo_node = create(:geo_node)
expect(unhealthy_nodes).to contain_exactly(geo_node)
end
it 'returns a node not having a cursor last event id' do
geo_node_status = create(:geo_node_status, :healthy, cursor_last_event_id: nil)
expect(unhealthy_nodes).to contain_exactly(geo_node_status.geo_node)
end
it 'returns a node with missing status check timestamp' do
geo_node_status = create(:geo_node_status, :healthy, last_successful_status_check_at: nil)
expect(unhealthy_nodes).to contain_exactly(geo_node_status.geo_node)
end
it 'returns a node with an old status check timestamp' do
geo_node_status = create(:geo_node_status, :healthy, last_successful_status_check_at: 16.minutes.ago)
expect(unhealthy_nodes).to contain_exactly(geo_node_status.geo_node)
end
end
describe '.min_cursor_last_event_id' do
it 'returns the minimum of cursor_last_event_id across all nodes' do
create(:geo_node_status, cursor_last_event_id: 10)
create(:geo_node_status, cursor_last_event_id: 8)
expect(described_class.min_cursor_last_event_id).to eq(8)
end
end
describe '#repair' do
it 'creates an oauth application for a Geo secondary node' do
stub_current_geo_node(node)
......
# frozen_string_literal: true
require 'spec_helper'
describe Geo::PruneEventLogService do
include ExclusiveLeaseHelpers
let(:min_id) { :all }
let!(:events) { create_list(:geo_event_log, 5, :updated_event) }
let(:lease_key) { 'geo/prune_event_log_service' }
let(:lease_timeout) { described_class::LEASE_TIMEOUT }
subject(:service) { described_class.new(min_id) }
before do
stub_exclusive_lease(lease_key, timeout: lease_timeout, renew: true)
end
it 'logs error when it cannot obtain lease' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
expect(service).to receive(:log_error).with(/^Cannot obtain an exclusive lease/)
service.execute
end
it 'aborts when it cannot renew lease' do
stub_exclusive_lease(lease_key, timeout: lease_timeout, renew: false)
expect(service).not_to receive(:prune!)
service.execute
end
it 'prunes all event tables' do
expect(service).to receive(:prune!).with(Geo::RepositoryCreatedEvent, anything)
expect(service).to receive(:prune!).with(Geo::RepositoryUpdatedEvent, anything)
expect(service).to receive(:prune!).with(Geo::RepositoryDeletedEvent, anything)
expect(service).to receive(:prune!).with(Geo::RepositoryRenamedEvent, anything)
expect(service).to receive(:prune!).with(Geo::RepositoriesChangedEvent, anything)
expect(service).to receive(:prune!).with(Geo::HashedStorageMigratedEvent, anything)
expect(service).to receive(:prune!).with(Geo::HashedStorageAttachmentsEvent, anything)
expect(service).to receive(:prune!).with(Geo::LfsObjectDeletedEvent, anything)
expect(service).to receive(:prune!).with(Geo::JobArtifactDeletedEvent, anything)
expect(service).to receive(:prune!).with(Geo::UploadDeletedEvent, anything)
service.execute
end
it 'prunes max 50k records' do
expect(service).to receive(:prune!).and_return(20_000).ordered
expect(service).to receive(:prune!).with(anything, 30_000).and_return(20_000).ordered
expect(service).to receive(:prune!).with(anything, 10_000).and_return(9_000).ordered
expect(service).to receive(:prune!).with(anything, 1_000).and_return(1_000).ordered
expect(service).not_to receive(:prune!).ordered
service.execute
end
context 'event_log_min_id = :all' do
it 'prunes all events' do
expect { service.execute }.to change { Geo::EventLog.count }.by(-5)
end
it 'prunes all associated events' do
expect { service.execute }.to change { Geo::RepositoryUpdatedEvent.count }.by(-5)
end
end
context 'with event_log_min_id' do
let(:min_id) { events[1].id }
it 'prunes events up to the min id' do
expect { service.execute }.to change { Geo::EventLog.count }.by(-2)
end
it 'prunes all associated events' do
expect { service.execute }.to change { Geo::RepositoryUpdatedEvent.count }.by(-2)
end
end
describe '#prune!' do
it 'returns the number of rows pruned' do
expect(service.send(:prune!, Geo::RepositoryUpdatedEvent, 50_000)).to eq(5)
end
end
end
......@@ -2,20 +2,12 @@ require 'spec_helper'
describe Geo::PruneEventLogWorker, :geo do
include ::EE::GeoHelpers
include ExclusiveLeaseHelpers
subject(:worker) { described_class.new }
set(:primary) { create(:geo_node, :primary) }
set(:secondary) { create(:geo_node) }
let(:lease_key) { 'geo/prune_event_log_worker' }
let(:lease_timeout) { Geo::PruneEventLogWorker::LEASE_TIMEOUT }
before do
stub_exclusive_lease(lease_key, timeout: lease_timeout)
end
describe '#perform' do
context 'current node secondary' do
before do
......@@ -23,7 +15,7 @@ describe Geo::PruneEventLogWorker, :geo do
end
it 'does nothing' do
expect(worker).not_to receive(:try_obtain_lease)
expect(Geo::PruneEventLogService).not_to receive(:new)
worker.perform
end
......@@ -34,23 +26,49 @@ describe Geo::PruneEventLogWorker, :geo do
stub_current_geo_node(primary)
end
it 'does nothing when database is not feeling healthy' do
allow(Gitlab::Database).to receive(:healthy?).and_return(false)
expect(Geo::PruneEventLogService).not_to receive(:new)
worker.perform
end
it 'does checks if it should prune' do
expect(worker).to receive(:prune?)
worker.perform
end
it 'deletes also associated event table rows' do
create_list(:geo_event_log, 2, :updated_event)
create(:geo_node_status, :healthy, cursor_last_event_id: Geo::EventLog.last.id, geo_node_id: secondary.id)
expect { worker.perform }.to change { Geo::RepositoryUpdatedEvent.count }.by(-2)
end
it 'delegates pruning to Geo::PruneEventLogService' do
create(:geo_event_log, :updated_event)
create(:geo_node_status, :healthy, cursor_last_event_id: Geo::EventLog.last.id, geo_node_id: secondary.id)
prune_service = spy(:prune_service)
expect(Geo::PruneEventLogService).to receive(:new).with(Geo::EventLog.last.id).and_return(prune_service)
expect(prune_service).to receive(:execute)
worker.perform
end
context 'no Geo nodes' do
before do
secondary.destroy
primary.destroy
end
it 'deletes everything from the Geo event log' do
create_list(:geo_event_log, 2)
expect(Geo::PruneEventLogService).to receive(:new).with(:all).and_call_original
worker.perform
end
......@@ -58,36 +76,47 @@ describe Geo::PruneEventLogWorker, :geo do
context 'multiple secondary nodes' do
set(:secondary2) { create(:geo_node) }
let!(:events) { create_list(:geo_event_log, 5, :updated_event) }
it 'aborts when there is a node without status' do
create(:geo_node_status, :healthy, cursor_last_event_id: events.last.id, geo_node_id: secondary.id)
expect(worker).to receive(:log_info).with(/^Some nodes are not healthy/, unhealthy_node_count: 1)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
it 'aborts when there is an unhealthy node' do
create(:geo_node_status, :healthy, cursor_last_event_id: events.last.id, geo_node_id: secondary.id)
create(:geo_node_status, :unhealthy, geo_node_id: secondary2.id)
expect(worker).to receive(:log_info).with(/^Some nodes are not healthy/, unhealthy_node_count: 1)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
it 'aborts when there is a node with an old status' do
create(:geo_node_status, :healthy, cursor_last_event_id: events.last.id, geo_node_id: secondary.id)
create(:geo_node_status, :healthy, geo_node_id: secondary2.id, last_successful_status_check_at: 12.minutes.ago)
expect(worker).to receive(:log_info).with(/^Some nodes are not healthy/, unhealthy_node_count: 1)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
it 'aborts when there is a node with a healthy status without timestamp' do
create(:geo_node_status, :healthy, cursor_last_event_id: events.last.id, geo_node_id: secondary.id)
create(:geo_node_status, :healthy, geo_node_id: secondary2.id, last_successful_status_check_at: nil)
expect(worker).to receive(:log_info).with(/^Some nodes are not healthy/, unhealthy_node_count: 1)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
it 'takes the integer-minimum value of all cursor_last_event_ids' do
create(:geo_node_status, :healthy, cursor_last_event_id: events[3].id, geo_node_id: secondary.id)
create(:geo_node_status, :healthy, cursor_last_event_id: events.last.id, geo_node_id: secondary2.id)
expect(Geo::PruneEventLogService).to receive(:new).with(events[3].id).and_call_original
expect { worker.perform }.to change { Geo::EventLog.count }.by(-4)
end
......
require 'spec_helper'
describe Geo::TruncateEventLogWorker, :geo do
include ::EE::GeoHelpers
subject(:worker) { described_class.new }
set(:primary) { create(:geo_node, :primary) }
describe '#perform' do
context 'current node primary' do
before do
stub_current_geo_node(primary)
end
it 'deletes everything from the Geo event log' do
create_list(:geo_event_log, 2)
expect(ActiveRecord::Base.connection).to receive(:truncate).with('geo_event_log').and_call_original
expect { worker.perform }.to change { Geo::EventLog.count }.by(-2)
end
it 'deletes nothing when a secondary node exists' do
create(:geo_node)
create_list(:geo_event_log, 2)
expect { worker.perform }.not_to change { Geo::EventLog.count }
end
end
end
end
......@@ -99,6 +99,12 @@ module Gitlab
Gitlab::Database.postgresql_9_or_less? ? 'pg_last_xlog_replay_location' : 'pg_last_wal_replay_lsn'
end
def self.healthy?
return true unless postgresql?
!Postgresql::ReplicationSlot.lag_too_great?
end
def self.nulls_last_order(field, direction = 'ASC')
order = "#{field} #{direction}"
......
# frozen_string_literal: true
module Gitlab
module Database
module Subquery
class << self
def self_join(relation)
t = relation.arel_table
t2 = relation.arel.as('t2')
relation.unscoped.joins(t.join(t2).on(t[:id].eq(t2[:id])).join_sources.first)
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::Database::Subquery do
describe '.self_join' do
set(:project) { create(:project) }
it 'allows you to delete_all rows with WHERE and LIMIT' do
events = create_list(:event, 8, project: project)
expect do
described_class.self_join(Event.where('id < ?', events[5]).recent.limit(2)).delete_all
end.to change { Event.count }.by(-2)
end
end
end