Commit f77970c2 authored by Mikolaj Wawrzyniak's avatar Mikolaj Wawrzyniak

Add background migration to fix missing services

Projects connected to shared k8s clusters are missing PrometheusServices.
We need to iterate over batches of project records and insert the missing
entries into the services table.
parent 1d05a0db
---
title: Migrate the database to activate projects prometheus service integration for
projects with prometheus installed on shared k8s cluster.
merge_request: 24684
author:
type: fixed
# frozen_string_literal: true
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
# Post-deployment migration that enqueues the
# `FixProjectsWithoutPrometheusService` background migration for ranges of
# `projects` ids, at most BATCH_SIZE ids per job, using a 2 minute interval
# between jobs.
class FixProjectsWithoutPrometheusService < ActiveRecord::Migration[6.0]
  include Gitlab::Database::MigrationHelpers

  DOWNTIME = false
  BATCH_SIZE = 50_000
  # Name of the background migration class that is enqueued for each id range.
  MIGRATION = 'FixProjectsWithoutPrometheusService'

  disable_ddl_transaction!

  # Migration-local model, isolated from the application's Project class.
  class Project < ActiveRecord::Base
    include EachBatch

    # Pin the table explicitly rather than relying on ActiveRecord inferring
    # it from the (nested) class name.
    self.table_name = 'projects'
  end

  def up
    queue_background_migration_jobs_by_range_at_intervals(Project, MIGRATION, 2.minutes, batch_size: BATCH_SIZE)
  end

  def down
    # no-op
  end
end
......@@ -12747,6 +12747,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200219184219
20200219193058
20200219193117
20200220115023
20200220180944
20200221023320
20200221074028
......
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# This migration creates missing services records
# for the projects within the given range of ids
class FixProjectsWithoutPrometheusService
# There is an important mismatch between the 15s timeout of a single query and the background migration worker's minimum lease of 2 minutes.
# To address it, the scheduled id range (sized for at least 2 minutes of processing) is processed in smaller portions that fit under the 15s limit.
# https://gitlab.com/gitlab-com/gl-infra/infrastructure/issues/9064#note_279857215
MAX_BATCH_SIZE = 1_000
# Column values for a newly inserted PrometheusService row, used when no
# template service row is found. The values are joined verbatim into a SQL
# SELECT list, which is why the string values carry their own single quotes.
DEFAULTS = {
'active' => true,
'properties' => "'{}'",
'type' => "'PrometheusService'",
'template' => false,
'push_events' => true,
'issues_events' => true,
'merge_requests_events' => true,
'tag_push_events' => true,
'note_events' => true,
'category' => "'monitoring'",
'default' => false,
'wiki_page_events' => true,
'pipeline_events' => true,
'confidential_issues_events' => true,
'commit_events' => true,
'job_events' => true,
'confidential_note_events' => true
}.freeze
module Migratable
module Applications
  # Stand-in for the Prometheus cluster application, kept local to the
  # migration so it stays isolated from application code.
  class Prometheus
    class << self
      # Integer status codes, keyed by state name, for the states this
      # migration refers to.
      #
      # @return [Hash{Symbol => Integer}] a new hash on every call
      def statuses
        { errored: -1, installed: 3, updated: 5 }
      end
    end
  end
end
# Migration model namespace isolated from application code.
# Migration-local model for the `clusters` table.
class Cluster < ActiveRecord::Base
  self.table_name = 'clusters'

  enum cluster_type: { instance_type: 1, group_type: 2 }

  # Tells whether the current scope contains at least one cluster joined to a
  # `clusters_applications_prometheus` row whose status is `installed` or
  # `updated`.
  #
  # @return [Boolean]
  def self.has_prometheus_application?
    status_codes = Applications::Prometheus.statuses
    prometheus_join =
      "INNER JOIN clusters_applications_prometheus ON clusters_applications_prometheus.cluster_id = clusters.id\n" \
      "AND clusters_applications_prometheus.status IN (#{status_codes[:installed]}, #{status_codes[:updated]})"

    joins(prometheus_join).exists?
  end
end
# Migration model namespace isolated from application code.
# Migration-local model for Prometheus rows of the `services` table.
# Single table inheritance is disabled so `type` is read as a plain column.
class PrometheusService < ActiveRecord::Base
  self.inheritance_column = :_type_disabled
  self.table_name = 'services'

  default_scope { where(type: type) }

  # @return [String] value of the `type` column identifying Prometheus services
  def self.type
    'PrometheusService'
  end

  # @return [PrometheusService, nil] the row flagged as template, if any
  def self.template
    find_by(template: true)
  end

  # SQL literals for the columns listed in DEFAULTS, in DEFAULTS key order,
  # taken from the template row when one exists and from DEFAULTS otherwise.
  # `template` is always forced to false and `active` to true.
  #
  # @return [Array] values ready to be joined into a SELECT list
  def self.values
    (template&.attributes_for_insert || DEFAULTS).merge('template' => false, 'active' => true).values
  end

  # This record's DEFAULTS columns rendered as SQL literals.
  #
  # Strings go through the connection's quoting so that embedded single
  # quotes (e.g. inside the JSON stored in `properties`) are escaped instead
  # of terminating the literal; nil becomes NULL rather than an empty
  # select-list item. Other values (booleans) are passed through unchanged.
  #
  # @return [Hash] column name => SQL literal
  def attributes_for_insert
    slice(DEFAULTS.keys).transform_values do |value|
      case value
      when String, nil then self.class.connection.quote(value)
      else value
      end
    end
  end
end
# Migration model namespace isolated from application code.
# Migration-local model for the `projects` table.
class Project < ActiveRecord::Base
  self.table_name = 'projects'

  # Select list for the INSERT ... SELECT: the project id, followed by the
  # service column literals, followed by the created_at/updated_at timestamps.
  scope :select_for_insert, -> {
    service_literals = PrometheusService.values.join(',')
    utc_now = "TIMEZONE('UTC', NOW())"

    select('id')
      .select(service_literals)
      .select("#{utc_now} as created_at", "#{utc_now} as updated_at")
  }

  # LEFT JOINs the Prometheus service rows whose project_id lies in the
  # given id range. Both bounds are coerced with Integer() before being
  # interpolated into the SQL.
  scope :with_prometheus_services, ->(from_id, to_id) {
    services_join =
      "LEFT JOIN services ON services.project_id = projects.id AND services.project_id BETWEEN #{Integer(from_id)} AND #{Integer(to_id)}\n" \
      "AND services.type = '#{PrometheusService.type}'"

    joins(services_join)
  }

  # Restricts to projects whose namespace is linked through `cluster_groups`
  # to a cluster with a Prometheus application in `installed` or `updated`
  # status.
  # NOTE(review): these are plain INNER JOINs, so a project presumably shows
  # up once per matching cluster when its group has several - confirm
  # whether callers need de-duplication.
  scope :with_group_prometheus_installed, -> {
    status_codes = Applications::Prometheus.statuses
    group_join = "INNER JOIN cluster_groups ON cluster_groups.group_id = projects.namespace_id"
    prometheus_join =
      "INNER JOIN clusters_applications_prometheus ON clusters_applications_prometheus.cluster_id = cluster_groups.cluster_id\n" \
      "AND clusters_applications_prometheus.status IN (#{status_codes[:installed]}, #{status_codes[:updated]})"

    joins(group_join).joins(prometheus_join)
  }
end
end
# Entry point of the background migration.
#
# Walks the inclusive id range in consecutive slices of at most
# MAX_BATCH_SIZE ids and hands the first and last id of each slice to
# #process_batch.
#
# @param from_id [Integer] first project id of the scheduled range
# @param to_id [Integer] last project id of the scheduled range
def perform(from_id, to_id)
  scheduled_ids = from_id..to_id

  scheduled_ids.each_slice(MAX_BATCH_SIZE) do |ids|
    lower_id = ids.first
    upper_id = ids.last

    process_batch(lower_id, upper_id)
  end
end
private
# Handles one slice of ids: first re-activates existing inconsistent
# service rows, then inserts the missing ones.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
def process_batch(from_id, to_id)
  id_bounds = [from_id, to_id]

  update_inconsistent(*id_bounds)
  create_missing(*id_bounds)
end
# Runs the INSERT built by #create_sql and, when the query returns a row,
# logs how many service rows were created for the id slice.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
def create_missing(from_id, to_id)
  row = ActiveRecord::Base.connection.select_one(create_sql(from_id, to_id))

  if row
    created_count = row['number_of_created_records']
    logger.info(message: "#{self.class}: created missing services for #{created_count} projects in id=#{from_id}...#{to_id}")
  end
end
# Runs the UPDATE built by #update_sql and, when the query returns a row,
# logs how many service rows were updated for the id slice.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
def update_inconsistent(from_id, to_id)
  row = ActiveRecord::Base.connection.select_one(update_sql(from_id, to_id))

  if row
    updated_count = row['number_of_updated_records']
    logger.info(message: "#{self.class}: updated inconsistent services for #{updated_count} projects in id=#{from_id}...#{to_id}")
  end
end
# there is no uniq constraint on project_id and type pair, which prevents us from using ON CONFLICT
# Builds the statement that inserts one service row per selected project and
# reports the number of inserted rows as `number_of_created_records`.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
# @return [String] SQL text
def create_sql(from_id, to_id)
  quoted_columns = DEFAULTS.keys.map { |column| %("#{column}") }.join(',')
  values_sql = select_insert_values_sql(from_id, to_id)

  <<~SQL
    WITH created_records AS (
    INSERT INTO services (project_id, #{quoted_columns}, created_at, updated_at)
    #{values_sql}
    RETURNING *
    )
    SELECT COUNT(*) as number_of_created_records
    FROM created_records
  SQL
end
# there is no uniq constraint on project_id and type pair, which prevents us from using ON CONFLICT
# Builds the statement that sets `active = TRUE` on inactive Prometheus
# service rows with empty (`'{}'`) properties in the id slice, subject to
# #group_cluster_condition, and reports the number of touched rows as
# `number_of_updated_records`.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
# @return [String] SQL text
# @raise [ArgumentError, TypeError] from Integer() when a bound is not integer-like
def update_sql(from_id, to_id)
  lower_id = Integer(from_id)
  upper_id = Integer(to_id)
  service_type = Migratable::PrometheusService.type
  cluster_condition = group_cluster_condition(from_id, to_id)

  <<~SQL
    WITH updated_records AS (
    UPDATE services SET active = TRUE
    WHERE services.project_id BETWEEN #{lower_id} AND #{upper_id} AND services.properties = '{}' AND services.type = '#{service_type}'
    AND #{cluster_condition} AND services.active = FALSE
    RETURNING *
    )
    SELECT COUNT(*) as number_of_updated_records
    FROM updated_records
  SQL
end
# Extra predicate for the UPDATE built by #update_sql.
#
# When an instance-level cluster has Prometheus, every project qualifies and
# the predicate is the constant '1 = 1'. Otherwise it is an EXISTS subquery
# over projects whose group has a cluster with Prometheus installed.
#
# The subquery is correlated with the `services` row being updated
# (`projects.id = services.project_id`). Without that correlation a single
# qualifying project anywhere in the id slice would make the predicate true
# for every service row in the slice.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
# @return [String] SQL boolean expression
def group_cluster_condition(from_id, to_id)
  return '1 = 1' if migrate_instance_cluster?

  projects_with_group_prometheus = Migratable::Project
    .select(1)
    .with_group_prometheus_installed
    .where("projects.id BETWEEN ? AND ?", Integer(from_id), Integer(to_id))
    .where("projects.id = services.project_id")

  <<~SQL
    EXISTS (
    #{projects_with_group_prometheus.to_sql}
    )
  SQL
end
# Builds the SELECT that feeds the INSERT in #create_sql: projects in the id
# slice that have no joined Prometheus service row. Unless an instance-level
# cluster has Prometheus, the selection is further narrowed to projects
# whose group has Prometheus installed.
#
# @param from_id [Integer] first project id of the slice
# @param to_id [Integer] last project id of the slice
# @return [String] SQL text
def select_insert_values_sql(from_id, to_id)
  projects_without_service = Migratable::Project
    .select_for_insert
    .with_prometheus_services(from_id, to_id)
    .where("projects.id BETWEEN ? AND ? AND services.id IS NULL", Integer(from_id), Integer(to_id))

  unless migrate_instance_cluster?
    projects_without_service = projects_without_service.with_group_prometheus_installed
  end

  projects_without_service.to_sql
end
# Logger for this migration, built on first use and reused afterwards.
def logger
  return @logger if @logger

  @logger = Gitlab::BackgroundMigration::Logger.build
end
# Whether an instance-level cluster has a Prometheus application.
#
# The answer is memoized by checking whether the instance variable is
# defined, so a `false` result is cached as well and the lookup runs once.
#
# @return [Boolean]
def migrate_instance_cluster?
  return @migrate_instance_cluster if instance_variable_defined?(:@migrate_instance_cluster)

  @migrate_instance_cluster = Migratable::Cluster.instance_type.has_prometheus_application?
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::BackgroundMigration::FixProjectsWithoutPrometheusService, :migration, schema: 2020_02_20_115023 do
# Attributes of an inactive PrometheusService row with empty properties for
# the given project; entries in +params+ override or extend the baseline.
#
# @param project_id [Integer, nil] value for the `project_id` column
# @param params [Hash] attribute overrides
# @return [Hash] attributes suitable for `services.create`
def service_params_for(project_id, params = {})
  baseline = {
    project_id: project_id,
    active: false,
    properties: '{}',
    type: 'PrometheusService',
    template: false,
    push_events: true, issues_events: true, merge_requests_events: true,
    tag_push_events: true, note_events: true,
    category: 'monitoring',
    default: false,
    wiki_page_events: true, pipeline_events: true, confidential_issues_events: true,
    commit_events: true, job_events: true, confidential_note_events: true,
    deployment_events: false
  }

  baseline.update(params)
end
# Handles for the database tables touched by the examples, obtained through
# the `table` helper.
let(:namespaces) { table(:namespaces) }
let(:projects) { table(:projects) }
let(:services) { table(:services) }
let(:clusters) { table(:clusters) }
let(:cluster_groups) { table(:cluster_groups) }
let(:clusters_applications_prometheus) { table(:clusters_applications_prometheus) }
# A namespace and a project inside it; the project is the one most examples
# run the migration against.
let(:namespace) { namespaces.create(name: 'user', path: 'user') }
let(:project) { projects.create(namespace_id: namespace.id) }
# Integer codes written to clusters_applications_prometheus.status.
let(:application_statuses) do
{
errored: -1,
installed: 3,
updated: 5
}
end
# Integer codes written to clusters.cluster_type.
let(:cluster_types) do
{
instance_type: 1,
group_type: 2,
project_type: 3
}
end
# Service columns that are compared against #service_params_for; the
# examples slice row attributes down to these names before comparing.
let(:columns) do
%w(project_id active properties type template push_events
issues_events merge_requests_events tag_push_events
note_events category default wiki_page_events pipeline_events
confidential_issues_events commit_events job_events
confidential_note_events deployment_events)
end
describe '#perform' do
# Shared expectations for every setup in which the migration is expected to
# act on `project`: services get created or activated, and nothing else is
# touched.
shared_examples 'fix services entries state' do
# The first run changes the services table, a second run over the same
# range leaves it untouched.
it 'is idempotent' do
expect { subject.perform(project.id, project.id + 1) }.to change { services.order(:id).map { |row| row.attributes } }
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
context 'non prometheus services' do
it 'does not change them' do
other_type = 'SomeOtherService'
services.create(service_params_for(project.id, active: true, type: other_type))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.where(type: other_type).order(:id).map { |row| row.attributes } }
end
end
context 'prometheus integration services do not exist' do
it 'creates missing services entries', :aggregate_failures do
expect { subject.perform(project.id, project.id + 1) }.to change { services.count }.by(1)
expect([service_params_for(project.id, active: true)]).to eq services.order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
end
context 'template is present for prometheus services' do
# The created row is expected to carry the template's properties.
it 'creates missing services entries', :aggregate_failures do
services.create(service_params_for(nil, template: true, properties: { 'from_template' => true }.to_json))
expect { subject.perform(project.id, project.id + 1) }.to change { services.count }.by(1)
updated_rows = services.where(template: false).order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
expect([service_params_for(project.id, active: true, properties: { 'from_template' => true }.to_json)]).to eq updated_rows
end
end
end
context 'prometheus integration services exist' do
context 'in active state' do
it 'does not change them' do
services.create(service_params_for(project.id, active: true))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
context 'not in active state' do
it 'sets active attribute to true' do
service = services.create(service_params_for(project.id, active: false))
expect { subject.perform(project.id, project.id + 1) }.to change { service.reload.active? }.from(false).to(true)
end
context 'prometheus services are configured manually ' do
# Non-empty properties mark the service as manually configured, so
# it must stay inactive.
it 'does not change them' do
properties = '{"api_url":"http://test.dev","manual_configuration":"1"}'
services.create(service_params_for(project.id, properties: properties, active: false))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
end
# Instance-level cluster: the cluster is not linked to the project's group
# or to the project itself in this setup.
context 'k8s cluster shared on instance level' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:instance_type]) }
context 'with installed prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it_behaves_like 'fix services entries state'
end
context 'with updated prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:updated], version: '123')
end
it_behaves_like 'fix services entries state'
end
# An errored application does not count as an available Prometheus.
context 'with errored prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:errored], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
# Group-level cluster: the cluster is linked to the project's namespace
# through a cluster_groups row.
context 'k8s cluster shared on group level' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:group_type]) }
before do
cluster_groups.create(cluster_id: cluster.id, group_id: project.namespace_id)
end
context 'with installed prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it_behaves_like 'fix services entries state'
# A second namespace/project whose group cluster has no Prometheus
# application row: only the first project may receive a service.
context 'second k8s cluster without application available' do
let(:namespace_2) { namespaces.create(name: 'namespace2', path: 'namespace2') }
let(:project_2) { projects.create(namespace_id: namespace_2.id) }
before do
cluster_2 = clusters.create(name: 'cluster2', cluster_type: cluster_types[:group_type])
cluster_groups.create(cluster_id: cluster_2.id, group_id: project_2.namespace_id)
end
it 'changed only affected services entries' do
expect { subject.perform(project.id, project_2.id + 1) }.to change { services.count }.by(1)
expect([service_params_for(project.id, active: true)]).to eq services.order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
end
end
end
context 'with updated prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:updated], version: '123')
end
it_behaves_like 'fix services entries state'
end
context 'with errored prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:errored], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
# No clusters_applications_prometheus row is created at all here.
context 'with missing prometheus application' do
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
context 'with inactive service' do
# service_params_for defaults to active: false.
it 'does not change services entries' do
services.create(service_params_for(project.id))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
# Project-level cluster: linked to the project through cluster_projects.
# Even with Prometheus installed, the migration is expected to leave the
# services table alone.
context 'k8s cluster for single project' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:project_type]) }
let(:cluster_projects) { table(:cluster_projects) }
context 'with installed prometheus application' do
before do
cluster_projects.create(cluster_id: cluster.id, project_id: project.id)
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
end
# frozen_string_literal: true
#
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20200220115023_fix_projects_without_prometheus_service.rb')
# Verifies that the post-deploy migration enqueues one delayed background
# migration job per batch of project ids.
describe FixProjectsWithoutPrometheusService, :migration do
let(:namespace) { table(:namespaces).create(name: 'gitlab', path: 'gitlab-org') }
# Three projects, created eagerly so they exist before `migrate!` runs.
let!(:projects) do
[
table(:projects).create(namespace_id: namespace.id, name: 'foo 1'),
table(:projects).create(namespace_id: namespace.id, name: 'foo 2'),
table(:projects).create(namespace_id: namespace.id, name: 'foo 3')
]
end
before do
# Shrink the batch size so the three projects span two batches.
stub_const("#{described_class.name}::BATCH_SIZE", 2)
end
# Jobs are only recorded (Sidekiq fake mode) and time is frozen so the
# expected delays can be compared.
around do |example|
Sidekiq::Testing.fake! do
Timecop.freeze do
example.call
end
end
end
it 'schedules jobs for ranges of projects' do
migrate!
# First batch: projects 1-2 after 2 minutes; second batch: project 3
# after 4 minutes.
expect(described_class::MIGRATION)
.to be_scheduled_delayed_migration(2.minutes, projects[0].id, projects[1].id)
expect(described_class::MIGRATION)
.to be_scheduled_delayed_migration(4.minutes, projects[2].id, projects[2].id)
end
it 'schedules jobs according to the configured batch size' do
expect { migrate! }.to change { BackgroundMigrationWorker.jobs.size }.by(2)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment