Commit 45cadbf9 authored by Adam Hegyi's avatar Adam Hegyi

Merge branch 'mwaw/activate_missing_prometheus_integration' into 'master'

Upsert missing or inactive services records for projects with available Prometheus application installed on shared k8s cluster

See merge request gitlab-org/gitlab!24684
parents 80b819e1 f77970c2
---
title: Migrate the database to activate projects prometheus service integration for
projects with prometheus installed on shared k8s cluster.
merge_request: 24684
author:
type: fixed
# frozen_string_literal: true
# See http://doc.gitlab.com/ce/development/migration_style_guide.html
# for more information on how to write migrations for GitLab.
class FixProjectsWithoutPrometheusService < ActiveRecord::Migration[6.0]
include Gitlab::Database::MigrationHelpers
DOWNTIME = false
BATCH_SIZE = 50_000
MIGRATION = 'FixProjectsWithoutPrometheusService'
disable_ddl_transaction!
class Project < ActiveRecord::Base
include EachBatch
end
def up
queue_background_migration_jobs_by_range_at_intervals(Project, MIGRATION, 2.minutes, batch_size: BATCH_SIZE)
end
def down
# no-op
end
end
......@@ -12747,6 +12747,7 @@ COPY "schema_migrations" (version) FROM STDIN;
20200219184219
20200219193058
20200219193117
20200220115023
20200220180944
20200221023320
20200221074028
......
# frozen_string_literal: true
module Gitlab
module BackgroundMigration
# This migration creates missing services records
# for the projects within the given range of ids
class FixProjectsWithoutPrometheusService
# There is important inconsistency between single query timeout 15s and background migration worker minimum lease 2 minutes
# to address that scheduled ids range (for minimum 2 minutes processing) should be inserted in smaller portions to fit under 15s limit.
# https://gitlab.com/gitlab-com/gl-infra/infrastructure/issues/9064#note_279857215
MAX_BATCH_SIZE = 1_000
DEFAULTS = {
'active' => true,
'properties' => "'{}'",
'type' => "'PrometheusService'",
'template' => false,
'push_events' => true,
'issues_events' => true,
'merge_requests_events' => true,
'tag_push_events' => true,
'note_events' => true,
'category' => "'monitoring'",
'default' => false,
'wiki_page_events' => true,
'pipeline_events' => true,
'confidential_issues_events' => true,
'commit_events' => true,
'job_events' => true,
'confidential_note_events' => true
}.freeze
module Migratable
module Applications
# Migration model namespace isolated from application code.
class Prometheus
def self.statuses
{
errored: -1,
installed: 3,
updated: 5
}
end
end
end
# Migration model namespace isolated from application code.
class Cluster < ActiveRecord::Base
self.table_name = 'clusters'
enum cluster_type: {
instance_type: 1,
group_type: 2
}
def self.has_prometheus_application?
joins("INNER JOIN clusters_applications_prometheus ON clusters_applications_prometheus.cluster_id = clusters.id
AND clusters_applications_prometheus.status IN (#{Applications::Prometheus.statuses[:installed]}, #{Applications::Prometheus.statuses[:updated]})").exists?
end
end
# Migration model namespace isolated from application code.
class PrometheusService < ActiveRecord::Base
self.inheritance_column = :_type_disabled
self.table_name = 'services'
default_scope { where(type: type) }
def self.type
'PrometheusService'
end
def self.template
find_by(template: true)
end
def self.values
(template&.attributes_for_insert || DEFAULTS).merge('template' => false, 'active' => true).values
end
def attributes_for_insert
slice(DEFAULTS.keys).transform_values do |v|
v.is_a?(String) ? "'#{v}'" : v
end
end
end
# Migration model namespace isolated from application code.
class Project < ActiveRecord::Base
self.table_name = 'projects'
scope :select_for_insert, -> {
select('id')
.select(PrometheusService.values.join(','))
.select("TIMEZONE('UTC', NOW()) as created_at", "TIMEZONE('UTC', NOW()) as updated_at")
}
scope :with_prometheus_services, ->(from_id, to_id) {
joins("LEFT JOIN services ON services.project_id = projects.id AND services.project_id BETWEEN #{Integer(from_id)} AND #{Integer(to_id)}
AND services.type = '#{PrometheusService.type}'")
}
scope :with_group_prometheus_installed, -> {
joins("INNER JOIN cluster_groups ON cluster_groups.group_id = projects.namespace_id")
.joins("INNER JOIN clusters_applications_prometheus ON clusters_applications_prometheus.cluster_id = cluster_groups.cluster_id
AND clusters_applications_prometheus.status IN (#{Applications::Prometheus.statuses[:installed]}, #{Applications::Prometheus.statuses[:updated]})")
}
end
end
def perform(from_id, to_id)
(from_id..to_id).each_slice(MAX_BATCH_SIZE) do |batch|
process_batch(batch.first, batch.last)
end
end
private
def process_batch(from_id, to_id)
update_inconsistent(from_id, to_id)
create_missing(from_id, to_id)
end
def create_missing(from_id, to_id)
result = ActiveRecord::Base.connection.select_one(create_sql(from_id, to_id))
return unless result
logger.info(message: "#{self.class}: created missing services for #{result['number_of_created_records']} projects in id=#{from_id}...#{to_id}")
end
def update_inconsistent(from_id, to_id)
result = ActiveRecord::Base.connection.select_one(update_sql(from_id, to_id))
return unless result
logger.info(message: "#{self.class}: updated inconsistent services for #{result['number_of_updated_records']} projects in id=#{from_id}...#{to_id}")
end
# there is no uniq constraint on project_id and type pair, which prevents us from using ON CONFLICT
def create_sql(from_id, to_id)
<<~SQL
WITH created_records AS (
INSERT INTO services (project_id, #{DEFAULTS.keys.map { |key| %("#{key}")}.join(',')}, created_at, updated_at)
#{select_insert_values_sql(from_id, to_id)}
RETURNING *
)
SELECT COUNT(*) as number_of_created_records
FROM created_records
SQL
end
# there is no uniq constraint on project_id and type pair, which prevents us from using ON CONFLICT
def update_sql(from_id, to_id)
<<~SQL
WITH updated_records AS (
UPDATE services SET active = TRUE
WHERE services.project_id BETWEEN #{Integer(from_id)} AND #{Integer(to_id)} AND services.properties = '{}' AND services.type = '#{Migratable::PrometheusService.type}'
AND #{group_cluster_condition(from_id, to_id)} AND services.active = FALSE
RETURNING *
)
SELECT COUNT(*) as number_of_updated_records
FROM updated_records
SQL
end
def group_cluster_condition(from_id, to_id)
return '1 = 1' if migrate_instance_cluster?
<<~SQL
EXISTS (
#{Migratable::Project.select(1).with_group_prometheus_installed.where("projects.id BETWEEN ? AND ?", Integer(from_id), Integer(to_id)).to_sql}
)
SQL
end
def select_insert_values_sql(from_id, to_id)
scope = Migratable::Project
.select_for_insert
.with_prometheus_services(from_id, to_id)
.where("projects.id BETWEEN ? AND ? AND services.id IS NULL", Integer(from_id), Integer(to_id))
return scope.to_sql if migrate_instance_cluster?
scope.with_group_prometheus_installed.to_sql
end
def logger
@logger ||= Gitlab::BackgroundMigration::Logger.build
end
def migrate_instance_cluster?
if instance_variable_defined?('@migrate_instance_cluster')
@migrate_instance_cluster
else
@migrate_instance_cluster = Migratable::Cluster.instance_type.has_prometheus_application?
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
describe Gitlab::BackgroundMigration::FixProjectsWithoutPrometheusService, :migration, schema: 2020_02_20_115023 do
def service_params_for(project_id, params = {})
{
project_id: project_id,
active: false,
properties: '{}',
type: 'PrometheusService',
template: false,
push_events: true,
issues_events: true,
merge_requests_events: true,
tag_push_events: true,
note_events: true,
category: 'monitoring',
default: false,
wiki_page_events: true,
pipeline_events: true,
confidential_issues_events: true,
commit_events: true,
job_events: true,
confidential_note_events: true,
deployment_events: false
}.merge(params)
end
let(:namespaces) { table(:namespaces) }
let(:projects) { table(:projects) }
let(:services) { table(:services) }
let(:clusters) { table(:clusters) }
let(:cluster_groups) { table(:cluster_groups) }
let(:clusters_applications_prometheus) { table(:clusters_applications_prometheus) }
let(:namespace) { namespaces.create(name: 'user', path: 'user') }
let(:project) { projects.create(namespace_id: namespace.id) }
let(:application_statuses) do
{
errored: -1,
installed: 3,
updated: 5
}
end
let(:cluster_types) do
{
instance_type: 1,
group_type: 2,
project_type: 3
}
end
let(:columns) do
%w(project_id active properties type template push_events
issues_events merge_requests_events tag_push_events
note_events category default wiki_page_events pipeline_events
confidential_issues_events commit_events job_events
confidential_note_events deployment_events)
end
describe '#perform' do
shared_examples 'fix services entries state' do
it 'is idempotent' do
expect { subject.perform(project.id, project.id + 1) }.to change { services.order(:id).map { |row| row.attributes } }
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
context 'non prometheus services' do
it 'does not change them' do
other_type = 'SomeOtherService'
services.create(service_params_for(project.id, active: true, type: other_type))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.where(type: other_type).order(:id).map { |row| row.attributes } }
end
end
context 'prometheus integration services do not exist' do
it 'creates missing services entries', :aggregate_failures do
expect { subject.perform(project.id, project.id + 1) }.to change { services.count }.by(1)
expect([service_params_for(project.id, active: true)]).to eq services.order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
end
context 'template is present for prometheus services' do
it 'creates missing services entries', :aggregate_failures do
services.create(service_params_for(nil, template: true, properties: { 'from_template' => true }.to_json))
expect { subject.perform(project.id, project.id + 1) }.to change { services.count }.by(1)
updated_rows = services.where(template: false).order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
expect([service_params_for(project.id, active: true, properties: { 'from_template' => true }.to_json)]).to eq updated_rows
end
end
end
context 'prometheus integration services exist' do
context 'in active state' do
it 'does not change them' do
services.create(service_params_for(project.id, active: true))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
context 'not in active state' do
it 'sets active attribute to true' do
service = services.create(service_params_for(project.id, active: false))
expect { subject.perform(project.id, project.id + 1) }.to change { service.reload.active? }.from(false).to(true)
end
context 'prometheus services are configured manually ' do
it 'does not change them' do
properties = '{"api_url":"http://test.dev","manual_configuration":"1"}'
services.create(service_params_for(project.id, properties: properties, active: false))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
end
context 'k8s cluster shared on instance level' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:instance_type]) }
context 'with installed prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it_behaves_like 'fix services entries state'
end
context 'with updated prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:updated], version: '123')
end
it_behaves_like 'fix services entries state'
end
context 'with errored prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:errored], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
context 'k8s cluster shared on group level' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:group_type]) }
before do
cluster_groups.create(cluster_id: cluster.id, group_id: project.namespace_id)
end
context 'with installed prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it_behaves_like 'fix services entries state'
context 'second k8s cluster without application available' do
let(:namespace_2) { namespaces.create(name: 'namespace2', path: 'namespace2') }
let(:project_2) { projects.create(namespace_id: namespace_2.id) }
before do
cluster_2 = clusters.create(name: 'cluster2', cluster_type: cluster_types[:group_type])
cluster_groups.create(cluster_id: cluster_2.id, group_id: project_2.namespace_id)
end
it 'changed only affected services entries' do
expect { subject.perform(project.id, project_2.id + 1) }.to change { services.count }.by(1)
expect([service_params_for(project.id, active: true)]).to eq services.order(:id).map { |row| row.attributes.slice(*columns).symbolize_keys }
end
end
end
context 'with updated prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:updated], version: '123')
end
it_behaves_like 'fix services entries state'
end
context 'with errored prometheus application' do
before do
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:errored], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
context 'with missing prometheus application' do
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
context 'with inactive service' do
it 'does not change services entries' do
services.create(service_params_for(project.id))
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
context 'k8s cluster for single project' do
let(:cluster) { clusters.create(name: 'cluster', cluster_type: cluster_types[:project_type]) }
let(:cluster_projects) { table(:cluster_projects) }
context 'with installed prometheus application' do
before do
cluster_projects.create(cluster_id: cluster.id, project_id: project.id)
clusters_applications_prometheus.create(cluster_id: cluster.id, status: application_statuses[:installed], version: '123')
end
it 'does not change services entries' do
expect { subject.perform(project.id, project.id + 1) }.not_to change { services.order(:id).map { |row| row.attributes } }
end
end
end
end
end
# frozen_string_literal: true
#
require 'spec_helper'
require Rails.root.join('db', 'post_migrate', '20200220115023_fix_projects_without_prometheus_service.rb')
describe FixProjectsWithoutPrometheusService, :migration do
let(:namespace) { table(:namespaces).create(name: 'gitlab', path: 'gitlab-org') }
let!(:projects) do
[
table(:projects).create(namespace_id: namespace.id, name: 'foo 1'),
table(:projects).create(namespace_id: namespace.id, name: 'foo 2'),
table(:projects).create(namespace_id: namespace.id, name: 'foo 3')
]
end
before do
stub_const("#{described_class.name}::BATCH_SIZE", 2)
end
around do |example|
Sidekiq::Testing.fake! do
Timecop.freeze do
example.call
end
end
end
it 'schedules jobs for ranges of projects' do
migrate!
expect(described_class::MIGRATION)
.to be_scheduled_delayed_migration(2.minutes, projects[0].id, projects[1].id)
expect(described_class::MIGRATION)
.to be_scheduled_delayed_migration(4.minutes, projects[2].id, projects[2].id)
end
it 'schedules jobs according to the configured batch size' do
expect { migrate! }.to change { BackgroundMigrationWorker.jobs.size }.by(2)
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment