Commit 9d82cea3 authored by Valery Sizov's avatar Valery Sizov Committed by Michael Kozono

Geo: Publish Snippets replication events

When snippet is created or updated we have to create
a Geo event to replicate the change

This MR also adds a sceleton of the snippet replication feature
parent d7c8a30c
......@@ -13,6 +13,10 @@ class ApplicationRecord < ActiveRecord::Base
where(id: ids)
end
def self.primary_key_in(values)
where(primary_key => values)
end
def self.iid_in(iids)
where(iid: iids)
end
......
......@@ -131,3 +131,5 @@ class SnippetRepository < ApplicationRecord
action[:action] == :update && action[:content].nil?
end
end
SnippetRepository.prepend_if_ee('EE::SnippetRepository')
......@@ -58,3 +58,5 @@ module Snippets
end
end
end
Snippets::DestroyService.prepend_if_ee('EE::Snippets::DestroyService')
......@@ -75,10 +75,16 @@ class PostReceive # rubocop:disable Scalability/IdempotentWorker
return false unless user
replicate_snippet_changes(snippet)
expire_caches(post_received, snippet.repository)
Snippets::UpdateStatisticsService.new(snippet).execute
end
def replicate_snippet_changes(snippet)
# Used by Gitlab Geo
end
# Expire the repository status, branch, and tag cache once per push.
def expire_caches(post_received, repository)
repository.expire_status_cache if repository.empty?
......
---
title: 'Geo: Added DB tables for snippets replication'
merge_request: 38688
author:
type: added
---
name: geo_snippet_repository_replication
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/38688
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/224168
group: group::geo
type: development
default_enabled: false
\ No newline at end of file
......@@ -28,6 +28,7 @@ ActiveSupport::Inflector.inflections do |inflect|
terraform_state_registry
vulnerabilities_feedback
vulnerability_feedback
snippet_repository_registry
)
inflect.acronym 'EE'
inflect.acronym 'CSP'
......
# frozen_string_literal: true
class AddVerificationStateToSnippetRepository < ActiveRecord::Migration[6.0]
DOWNTIME = false
def change
change_table(:snippet_repositories) do |t|
t.integer :verification_retry_count, limit: 2
t.column :verification_retry_at, :datetime_with_timezone
t.column :verified_at, :datetime_with_timezone
t.binary :verification_checksum, using: 'verification_checksum::bytea'
# rubocop:disable Migration/AddLimitToTextColumns
t.text :verification_failure
# rubocop:enable Migration/AddLimitToTextColumns
end
end
end
# frozen_string_literal: true
class AddVerificationFailureToSnippetRepository < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
CONSTRAINT_NAME = 'snippet_repositories_verification_failure_text_limit'
def up
add_text_limit :snippet_repositories, :verification_failure, 255, constraint_name: CONSTRAINT_NAME
end
def down
remove_check_constraint(:snippet_repositories, CONSTRAINT_NAME)
end
end
# frozen_string_literal: true
class AddVerificationFailureIndexToSnippetRepository < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
add_concurrent_index :snippet_repositories, :verification_failure, where: "(verification_failure IS NOT NULL)", name: 'snippet_repositories_verification_failure_partial'
add_concurrent_index :snippet_repositories, :verification_checksum, where: "(verification_checksum IS NOT NULL)", name: 'snippet_repositories_verification_checksum_partial'
end
def down
remove_concurrent_index_by_name :snippet_repositories, 'snippet_repositories_verification_failure_partial'
remove_concurrent_index_by_name :snippet_repositories, 'snippet_repositories_verification_checksum_partial'
end
end
a48d2b0ae00630775a3385aab6b6ae7ac5ebc999228605293c2e824c3651b7da
\ No newline at end of file
a7e650ffb9db2068168b486f8cb27c6bcdaad3682309e7c7df7e3c24c34c0498
\ No newline at end of file
d51302497aafd672954e1ea7613e33036fd3a083f4b0f349b3710058f1db0810
\ No newline at end of file
......@@ -15667,7 +15667,13 @@ ALTER SEQUENCE public.smartcard_identities_id_seq OWNED BY public.smartcard_iden
CREATE TABLE public.snippet_repositories (
snippet_id bigint NOT NULL,
shard_id bigint NOT NULL,
disk_path character varying(80) NOT NULL
disk_path character varying(80) NOT NULL,
verification_retry_count smallint,
verification_retry_at timestamp with time zone,
verified_at timestamp with time zone,
verification_checksum bytea,
verification_failure text,
CONSTRAINT snippet_repositories_verification_failure_text_limit CHECK ((char_length(verification_failure) <= 255))
);
CREATE TABLE public.snippet_statistics (
......@@ -21374,6 +21380,10 @@ CREATE INDEX partial_index_deployments_for_legacy_successful_deployments ON publ
CREATE INDEX partial_index_deployments_for_project_id_and_tag ON public.deployments USING btree (project_id) WHERE (tag IS TRUE);
CREATE INDEX snippet_repositories_verification_checksum_partial ON public.snippet_repositories USING btree (verification_checksum) WHERE (verification_checksum IS NOT NULL);
CREATE INDEX snippet_repositories_verification_failure_partial ON public.snippet_repositories USING btree (verification_failure) WHERE (verification_failure IS NOT NULL);
CREATE UNIQUE INDEX snippet_user_mentions_on_snippet_id_and_note_id_index ON public.snippet_user_mentions USING btree (snippet_id, note_id);
CREATE UNIQUE INDEX snippet_user_mentions_on_snippet_id_index ON public.snippet_user_mentions USING btree (snippet_id) WHERE (note_id IS NULL);
......
......@@ -192,6 +192,12 @@ configuration option in `gitlab.yml`. These metrics are served from the
| `geo_merge_request_diffs_synced` | Gauge | 13.4 | Number of syncable merge request diffs synced on secondary | `url` |
| `geo_merge_request_diffs_failed` | Gauge | 13.4 | Number of syncable merge request diffs failed to sync on secondary | `url` |
| `geo_merge_request_diffs_registry` | Gauge | 13.4 | Number of merge request diffs in the registry | `url` |
| `geo_snippet_repositories` | Gauge | 13.4 | Number of snippets on primary | `url` |
| `geo_snippet_repositories_checksummed` | Gauge | 13.4 | Number of snippets checksummed on primary | `url` |
| `geo_snippet_repositories_checksum_failed` | Gauge | 13.4 | Number of snippets failed to calculate the checksum on primary | `url` |
| `geo_snippet_repositories_synced` | Gauge | 13.4 | Number of syncable snippets synced on secondary | `url` |
| `geo_snippet_repositories_failed` | Gauge | 13.4 | Number of syncable snippets failed on secondary | `url` |
| `geo_snippet_repositories_registry` | Gauge | 13.4 | Number of syncable snippets in the registry | `url` |
## Database load balancing metrics **(PREMIUM ONLY)**
......
......@@ -371,7 +371,13 @@ Example response:
"package_files_checksum_failed_count": 0,
"package_files_registry_count": 10,
"package_files_synced_count": 6,
"package_files_failed_count": 3
"package_files_failed_count": 3,
"snippet_repositories_count": 10,
"snippet_repositories_checksummed_count": 10,
"snippet_repositories_checksum_failed_count": 0,
"snippet_repositories_registry_count": 10,
"snippet_repositories_synced_count": 6,
"snippet_repositories_failed_count": 3
},
{
"geo_node_id": 2,
......@@ -460,6 +466,12 @@ Example response:
"terraform_states_registry_count": 10,
"terraform_states_synced_count": 6,
"terraform_states_failed_count": 3
"snippet_repositories_count": 10,
"snippet_repositories_checksummed_count": 10,
"snippet_repositories_checksum_failed_count": 0,
"snippet_repositories_registry_count": 10,
"snippet_repositories_synced_count": 6,
"snippet_repositories_failed_count": 3
}
]
```
......
......@@ -416,12 +416,20 @@ for verification state to the widgets table:
# frozen_string_literal: true
class AddVerificationFailureLimitToWidgets < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def change
add_text_limit :widgets, :verification_failure, 255
CONSTRAINT_NAME = 'widget_verification_failure_text_limit'
def up
add_text_limit :widget, :verification_failure, 255, constraint_name: CONSTRAINT_NAME
end
def down
remove_check_constraint(:widget, CONSTRAINT_NAME)
end
end
```
......
......@@ -26,12 +26,6 @@ module Geo
download
end
def handle_after_destroy
return unless self.class.enabled?
publish(:deleted, **deleted_params)
end
# Called by Gitlab::Geo::Replicator#consume
def consume_event_deleted(**params)
replicate_destroy(params)
......@@ -103,22 +97,12 @@ module Geo
).execute
end
def schedule_checksum_calculation
Geo::BlobVerificationPrimaryWorker.perform_async(replicable_name, model_record.id)
end
def created_params
{ model_record_id: model_record.id }
end
def deleted_params
{ model_record_id: model_record.id, blob_path: blob_path }
end
def needs_checksum?
return true unless model_record.respond_to?(:needs_checksum?)
model_record.needs_checksum?
def schedule_checksum_calculation
Geo::BlobVerificationPrimaryWorker.perform_async(replicable_name, model_record.id)
end
end
end
# frozen_string_literal: true
module Geo
module RepositoryReplicatorStrategy
extend ActiveSupport::Concern
include Delay
include Gitlab::Geo::LogHelpers
included do
event :updated
event :deleted
end
# Called by Gitlab::Geo::Replicator#consume
def consume_event_updated(**params)
# not implemented yet
end
# Called by Gitlab::Geo::Replicator#consume
def consume_event_deleted(**params)
# not implemented yet
end
end
end
# frozen_string_literal: true
module EE
module SnippetRepository
extend ActiveSupport::Concern
prepended do
include ::Gitlab::Geo::ReplicableModel
with_replicator Geo::SnippetRepositoryReplicator
end
class_methods do
def replicables_for_geo_node
# Not implemented yet. Should be responible for selective sync
none
end
end
end
end
......@@ -50,10 +50,12 @@ class Geo::BaseRegistry < Geo::TrackingBase
end
def self.find_registry_differences(range)
model_primary_key = self::MODEL_CLASS.primary_key.to_sym
source_ids = self::MODEL_CLASS
.replicables_for_geo_node
.id_in(range)
.pluck(self::MODEL_CLASS.arel_table[:id])
.primary_key_in(range)
.pluck(self::MODEL_CLASS.arel_table[model_primary_key])
tracked_ids = self.pluck_model_ids_in_range(range)
......
# frozen_string_literal: true
class Geo::SnippetRepositoryRegistry < Geo::BaseRegistry
include Geo::ReplicableRegistry
MODEL_CLASS = ::SnippetRepository
MODEL_FOREIGN_KEY = :snippet_repository_id
belongs_to :snippet_repository, class_name: 'SnippetRepository'
end
# frozen_string_literal: true
module Geo
class SnippetRepositoryReplicator < Gitlab::Geo::Replicator
include ::Geo::RepositoryReplicatorStrategy
def self.model
::SnippetRepository
end
def needs_checksum?
false
end
def self.replication_enabled_by_default?
false
end
end
end
# frozen_string_literal: true
module EE
module Snippets
module DestroyService
extend ActiveSupport::Concern
def attempt_destroy!
super
snippet.snippet_repository.replicator.handle_after_destroy if snippet.snippet_repository
end
end
end
end
# frozen_string_literal: true
# This service is deprecated. Don't add new resources to here.
# Use the new Self-Service Framework instead.
module Geo
class RepositoryUpdatedService
include ::Gitlab::Geo::ProjectLogHelpers
......
......@@ -7,6 +7,7 @@ module EE
# and be prepended in the `PostReceive` worker
module PostReceive
extend ActiveSupport::Concern
extend ::Gitlab::Utils::Override
private
......@@ -34,6 +35,15 @@ module EE
end
end
override :replicate_snippet_changes
def replicate_snippet_changes(snippet)
if ::Gitlab::Geo.primary?
# We don't use Geo::RepositoryUpdatedService anymore as
# it's already deprecated. See https://gitlab.com/groups/gitlab-org/-/epics/2809
snippet.snippet_repository.replicator.handle_after_update if snippet.snippet_repository
end
end
def audit_push?(project)
project.push_audit_events_enabled? && !::Gitlab::Database.read_only?
end
......
# frozen_string_literal: true
class CreateSnippetRepositoryRegistry < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
disable_ddl_transaction!
def up
create_table :snippet_repository_registry, id: :bigserial, force: :cascade do |t|
t.datetime_with_timezone :retry_at
t.datetime_with_timezone :last_synced_at
t.datetime_with_timezone :created_at, null: false
t.bigint :snippet_repository_id, null: false
t.integer :state, default: 0, null: false, limit: 2
t.integer :retry_count, default: 0, limit: 2
t.text :last_sync_failure
t.boolean :force_to_redownload
t.boolean :missing_on_primary
t.index :snippet_repository_id, name: :index_snippet_repository_registry_on_snippet_repository_id, unique: true
t.index :retry_at
t.index :state
end
add_text_limit :snippet_repository_registry, :last_sync_failure, 255
end
def down
drop_table :snippet_repository_registry
end
end
......@@ -181,6 +181,21 @@ ActiveRecord::Schema.define(version: 2020_08_27_120552) do
t.index ["wiki_verification_checksum_sha"], name: "idx_project_registry_on_wiki_checksum_sha_partial", where: "(wiki_verification_checksum_sha IS NULL)"
end
create_table "snippet_repository_registry", force: :cascade do |t|
t.datetime_with_timezone "retry_at"
t.datetime_with_timezone "last_synced_at"
t.datetime_with_timezone "created_at", null: false
t.bigint "snippet_repository_id", null: false
t.integer "state", limit: 2, default: 0, null: false
t.integer "retry_count", limit: 2, default: 0
t.text "last_sync_failure"
t.boolean "force_to_redownload"
t.boolean "missing_on_primary"
t.index ["retry_at"], name: "index_snippet_repository_registry_on_retry_at"
t.index ["snippet_repository_id"], name: "index_snippet_repository_registry_on_snippet_repository_id", unique: true
t.index ["state"], name: "index_snippet_repository_registry_on_state"
end
create_table "terraform_state_registry", force: :cascade do |t|
t.datetime_with_timezone "retry_at"
t.datetime_with_timezone "last_synced_at"
......
......@@ -22,7 +22,8 @@ module Gitlab
REPLICATOR_CLASSES = [
::Geo::MergeRequestDiffReplicator,
::Geo::PackageFileReplicator,
::Geo::TerraformStateReplicator
::Geo::TerraformStateReplicator,
::Geo::SnippetRepositoryReplicator
].freeze
def self.current_node
......
......@@ -19,6 +19,7 @@ module Gitlab
attr_reader :model_record_id
delegate :model, to: :class
delegate :replication_enabled_feature_key, to: :class
delegate :in_replicables_for_geo_node?, to: :model_record
class << self
......@@ -188,6 +189,10 @@ module Gitlab
const_get("::Geo::#{name}Replicator", false)
end
def self.replication_enabled_feature_key
:"geo_#{replicable_name}_replication"
end
# @param [ActiveRecord::Base] model_record
# @param [Integer] model_record_id
def initialize(model_record: nil, model_record_id: nil)
......@@ -284,12 +289,46 @@ module Gitlab
{ replicable_name: replicable_name, replicable_id: model_record_id }
end
protected
def handle_after_destroy
return unless self.class.enabled?
def self.replication_enabled_feature_key
:"geo_#{replicable_name}_replication"
publish(:deleted, **deleted_params)
end
def handle_after_update
return unless self.class.enabled?
publish(:updated, **updated_params)
end
def schedule_checksum_calculation
raise NotImplementedError
end
def created_params
event_params
end
def deleted_params
event_params
end
def updated_params
event_params
end
def event_params
{ model_record_id: model_record.id }
end
def needs_checksum?
return true unless model_record.respond_to?(:needs_checksum?)
model_record.needs_checksum?
end
protected
# Store an event on the database
#
# @example Create an event
......
# frozen_string_literal: true
FactoryBot.define do
factory :geo_snippet_repository_registry, class: 'Geo::SnippetRepositoryRegistry' do
snippet_repository
state { Geo::SnippetRepositoryRegistry.state_value(:pending) }
trait :synced do
state { Geo::SnippetRepositoryRegistry.state_value(:synced) }
last_synced_at { 5.days.ago }
end
trait :failed do
state { Geo::SnippetRepositoryRegistry.state_value(:failed) }
last_synced_at { 1.day.ago }
retry_count { 2 }
last_sync_failure { 'Random error' }
end
trait :started do
state { Geo::SnippetRepositoryRegistry.state_value(:started) }
last_synced_at { 1.day.ago }
retry_count { 0 }
end
end
end
......@@ -70,6 +70,14 @@
"terraform_states_failed_count",
"terraform_states_synced_count",
"terraform_states_synced_in_percentage",
"snippet_repositories_count",
"snippet_repositories_checksum_failed_count",
"snippet_repositories_checksummed_in_percentage",
"snippet_repositories_checksummed_count",
"snippet_repositories_registry_count",
"snippet_repositories_failed_count",
"snippet_repositories_synced_count",
"snippet_repositories_synced_in_percentage",
"repositories_verified_count",
"repositories_verification_failed_count",
"repositories_verified_in_percentage",
......@@ -175,6 +183,14 @@
"terraform_states_synced_count": { "type": ["integer", "null"] },
"terraform_states_synced_in_percentage": { "type": "string" },
"terraform_states_checksummed_in_percentage": { "type": "string" },
"snippet_repositories_count": { "type": ["integer", "null"] },
"snippet_repositories_checksummed_count": { "type": ["integer", "null"] },
"snippet_repositories_checksum_failed_count": { "type": ["integer", "null"] },
"snippet_repositories_registry_count": { "type": ["integer", "null"] },
"snippet_repositories_failed_count": { "type": ["integer", "null"] },
"snippet_repositories_synced_count": { "type": ["integer", "null"] },
"snippet_repositories_synced_in_percentage": { "type": "string" },
"snippet_repositories_checksummed_in_percentage": { "type": "string" },
"repositories_verified_count": { "type": ["integer", "null"] },
"repositories_verification_failed_count": { "type": ["integer", "null"] },
"repositories_verified_in_percentage": { "type": "string" },
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::SnippetRepositoryRegistry, :geo, type: :model do
let_it_be(:registry) { create(:geo_snippet_repository_registry) }
specify 'factory is valid' do
expect(registry).to be_valid
end
include_examples 'a Geo framework registry'
end
......@@ -1150,6 +1150,7 @@ RSpec.describe GeoNodeStatus, :geo do
Geo::MergeRequestDiffReplicator | :external_merge_request_diff | :geo_merge_request_diff_registry
Geo::PackageFileReplicator | :package_file | :geo_package_file_registry
Geo::TerraformStateReplicator | :terraform_state | :geo_terraform_state_registry
Geo::SnippetRepositoryReplicator | :snippet_repository | :geo_snippet_repository_registry
end
with_them do
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Geo::SnippetRepositoryReplicator do
let(:model_record) { build(:snippet_repository) }
include_examples 'a repository replicator'
end
......@@ -197,7 +197,7 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
end
before do
model_class.where(id: unused_registry_ids).delete_all
model_class.where(model_class.primary_key => unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
......@@ -222,7 +222,7 @@ RSpec.describe Geo::RegistryConsistencyService, :geo, :use_clean_rails_memory_st
end
before do
model_class.where(id: unused_registry_ids).delete_all
model_class.where(model_class.primary_key => unused_registry_ids).delete_all
end
it 'deletes unused registries', :sidekiq_inline do
......
......@@ -42,7 +42,7 @@ RSpec.shared_examples 'a blob replicator' do
context 'when replication feature flag is disabled' do
before do
stub_feature_flags("geo_#{replicator.replicable_name}_replication": false)
stub_feature_flags(replicator.replication_enabled_feature_key => false)
end
it 'does not schedule the checksum calculation' do
......@@ -71,7 +71,7 @@ RSpec.shared_examples 'a blob replicator' do
context 'when replication feature flag is disabled' do
before do
stub_feature_flags("geo_#{replicator.replicable_name}_replication": false)
stub_feature_flags(replicator.replication_enabled_feature_key => false)
end
it 'does not publish' do
......
# frozen_string_literal: true
# Include these shared examples in specs of Replicators that include
# BlobReplicatorStrategy.
#
# A let variable called model_record should be defined in the spec. It should be
# a valid, unpersisted instance of the model class.
#
RSpec.shared_examples 'a repository replicator' do
include EE::GeoHelpers
let_it_be(:primary) { create(:geo_node, :primary) }
let_it_be(:secondary) { create(:geo_node) }
subject(:replicator) { model_record.replicator }
before do
stub_current_geo_node(primary)
end
it_behaves_like 'a replicator'
# This could be included in each model's spec, but including it here is DRYer.
include_examples 'a replicable model'
describe '#handle_after_update' do
it 'creates a Geo::Event' do
expect do
replicator.handle_after_update
end.to change { ::Geo::Event.count }.by(1)
expect(::Geo::Event.last.attributes).to include(
"replicable_name" => replicator.replicable_name, "event_name" => "updated", "payload" => { "model_record_id" => replicator.model_record.id })
end
context 'when replication feature flag is disabled' do
before do
stub_feature_flags(replicator.replication_enabled_feature_key => false)
end
it 'does not publish' do
expect(replicator).not_to receive(:publish)
replicator.handle_after_update
end
end
end
describe '#handle_after_destroy' do
it 'creates a Geo::Event' do
expect do
replicator.handle_after_destroy
end.to change { ::Geo::Event.count }.by(1)
expect(::Geo::Event.last.attributes).to include(
"replicable_name" => replicator.replicable_name, "event_name" => "deleted", "payload" => { "model_record_id" => replicator.model_record.id })
end
context 'when replication feature flag is disabled' do
before do
stub_feature_flags("geo_#{replicator.replicable_name}_replication": false)
end
it 'does not publish' do
expect(replicator).not_to receive(:publish)
replicator.handle_after_destroy
end
end
end
describe '#model' do
let(:invoke_model) { replicator.class.model }
it 'is implemented' do
expect do
invoke_model
end.not_to raise_error
end
it 'is a Class' do
expect(invoke_model).to be_a(Class)
end
end
end
......@@ -8,5 +8,13 @@ FactoryBot.define do
snippet_repository.shard_name = snippet_repository.snippet.repository_storage
snippet_repository.disk_path = snippet_repository.snippet.disk_path
end
trait(:checksummed) do
verification_checksum { 'abc' }
end
trait(:checksum_failure) do
verification_failure { 'Could not calculate the checksum' }
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment