Commit 31ad7992 authored by Jan Provaznik's avatar Jan Provaznik

Merge branch 'an-sidekiq-query' into 'master'

Allow config of sidekiq-cluster with query

Closes gitlab-com/gl-infra/scalability#45

See merge request gitlab-org/gitlab!18877
parents 0a8d441b 3a426da1
---
title: Add experimental --queue-selector option to sidekiq-cluster
merge_request: 18877
author:
type: changed
...@@ -82,6 +82,93 @@ you list: ...@@ -82,6 +82,93 @@ you list:
sudo gitlab-ctl reconfigure sudo gitlab-ctl reconfigure
``` ```
## Queue selector (experimental)
> [Introduced](https://gitlab.com/gitlab-com/gl-infra/scalability/issues/45) in [GitLab Starter](https://about.gitlab.com/pricing/) 12.8.
CAUTION: **Caution:**
As this is marked as **experimental**, it is subject to change at any
time, including **breaking backwards compatibility**. This is so that we
can react to changes we need for our GitLab.com deployment. We have a
tracking issue open to [remove the experimental
designation](https://gitlab.com/gitlab-com/gl-infra/scalability/issues/147)
from this feature; please comment there if you are interested in using
this in your own deployment.
In addition to selecting queues by name, as above, the
`experimental_queue_selector` option allows queue groups to be selected
in a more general way using the following components:
- Attributes that can be selected.
- Operators used to construct a query.
### Available attributes
From the [list of all available
attributes](https://gitlab.com/gitlab-org/gitlab/-/blob/master/app/workers/all_queues.yml),
`experimental_queue_selector` allows selecting of queues by the
following attributes:
- `feature_category` - the [GitLab feature
category](https://about.gitlab.com/direction/maturity/#category-maturity) the
queue belongs to. For example, the `merge` queue belongs to the
`source_code_management` category.
- `has_external_dependencies` - whether or not the queue connects to external
services. For example, all importers have this set to `true`.
- `latency_sensitive` - whether or not the queue is particularly sensitive to
latency, which also means that its jobs should run quickly. For example, the
`authorized_projects` queue is used to refresh user permissions, and is
latency sensitive.
- `name` - the queue name. The other attributes are typically more useful as
they are more general, but this is available in case a particular queue needs
to be selected.
- `resource_boundary` - if the worker is bound by `cpu`, `memory`, or
`unknown`. For example, the `project_export` queue is memory bound as it has
to load data in memory before saving it for export.
Both `has_external_dependencies` and `latency_sensitive` are boolean attributes:
only the exact string `true` is considered true, and everything else is
considered false.
### Available operators
`experimental_queue_selector` supports the following operators, listed
from highest to lowest precedence:
- `|` - the logical OR operator. For example, `query_a|query_b` (where `query_a`
and `query_b` are queries made up of the other operators here) will include
queues that match either query.
- `&` - the logical AND operator. For example, `query_a&query_b` (where
`query_a` and `query_b` are queries made up of the other operators here) will
only include queues that match both queries.
- `!=` - the NOT IN operator. For example, `feature_category!=issue_tracking`
excludes all queues from the `issue_tracking` feature category.
- `=` - the IN operator. For example, `resource_boundary=cpu` includes all
queues that are CPU bound.
- `,` - the concatenate set operator. For example,
`feature_category=continuous_integration,pages` includes all queues from
either the `continuous_integration` category or the `pages` category. This
example is also possible using the OR operator, but allows greater brevity, as
well as being lower precedence.
The operator precedence for this syntax is fixed: it's not possible to make AND
have higher precedence than OR.
### Example queries
In `/etc/gitlab/gitlab.rb`:
```ruby
sidekiq_cluster['enable'] = true
sidekiq_cluster['experimental_queue_selector'] = true
sidekiq_cluster['queue_groups'] = [
# Run all non-CPU-bound queues that are latency sensitive
'resource_boundary!=cpu&latency_sensitive=true',
# Run all continuous integration and pages queues that are not latency sensitive
'feature_category=continuous_integration,pages&latency_sensitive=false'
]
```
## Ignore all GitHub import queues ## Ignore all GitHub import queues
When [importing from GitHub](../../user/project/import/github.md), Sidekiq might When [importing from GitHub](../../user/project/import/github.md), Sidekiq might
......
...@@ -41,16 +41,25 @@ module Gitlab ...@@ -41,16 +41,25 @@ module Gitlab
option_parser.parse!(argv) option_parser.parse!(argv)
queue_groups = SidekiqCluster.parse_queues(argv) all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path)
queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path)
all_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
queue_groups =
queue_groups.map! do |queues| if @experimental_queue_selector
SidekiqConfig::CliMethods.expand_queues(queues, all_queues) # When using the experimental queue query syntax, we treat
end # each queue group as a worker attribute query, and resolve
# the queues for the queue group using this query.
argv.map do |queues|
SidekiqConfig::CliMethods.query_workers(queues, all_queues)
end
else
SidekiqCluster.parse_queues(argv).map do |queues|
SidekiqConfig::CliMethods.expand_queues(queues, queue_names)
end
end
if @negate_queues if @negate_queues
queue_groups.map! { |queues| all_queues - queues } queue_groups.map! { |queues| queue_names - queues }
end end
@logger.info("Starting cluster with #{queue_groups.length} processes") @logger.info("Starting cluster with #{queue_groups.length} processes")
...@@ -151,6 +160,10 @@ module Gitlab ...@@ -151,6 +160,10 @@ module Gitlab
@rails_path = path @rails_path = path
end end
opt.on('--experimental-queue-selector', 'EXPERIMENTAL: Run workers based on the provided selector') do |experimental_queue_selector|
@experimental_queue_selector = experimental_queue_selector
end
opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do opt.on('-n', '--negate', 'Run workers for all queues in sidekiq_queues.yml except the given ones') do
@negate_queues = true @negate_queues = true
end end
......
...@@ -3,14 +3,23 @@ ...@@ -3,14 +3,23 @@
require 'spec_helper' require 'spec_helper'
describe 'ee/bin/sidekiq-cluster' do describe 'ee/bin/sidekiq-cluster' do
it 'runs successfully', :aggregate_failures do using RSpec::Parameterized::TableSyntax
cmd = %w[ee/bin/sidekiq-cluster --dryrun --negate cronjob]
output, status = Gitlab::Popen.popen(cmd, Rails.root.to_s) where(:args, :included, :excluded) do
%w[--negate cronjob] | '-qdefault,1' | '-qcronjob,1'
%w[--experimental-queue-selector resource_boundary=cpu] | '-qupdate_merge_requests,1' | '-qdefault,1'
end
with_them do
it 'runs successfully', :aggregate_failures do
cmd = %w[ee/bin/sidekiq-cluster --dryrun] + args
output, status = Gitlab::Popen.popen(cmd, Rails.root.to_s)
expect(status).to be(0) expect(status).to be(0)
expect(output).to include('"bundle", "exec", "sidekiq"') expect(output).to include('"bundle", "exec", "sidekiq"')
expect(output).not_to include('-qcronjob,1') expect(output).to include(included)
expect(output).to include('-qdefault,1') expect(output).not_to include(excluded)
end
end end
end end
# frozen_string_literal: true # frozen_string_literal: true
require 'fast_spec_helper' require 'fast_spec_helper'
require 'rspec-parameterized'
describe Gitlab::SidekiqCluster::CLI do describe Gitlab::SidekiqCluster::CLI do
let(:cli) { described_class.new('/dev/null') } let(:cli) { described_class.new('/dev/null') }
...@@ -21,9 +22,9 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -21,9 +22,9 @@ describe Gitlab::SidekiqCluster::CLI do
context 'with arguments' do context 'with arguments' do
before do before do
expect(cli).to receive(:write_pid) allow(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals) allow(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop) allow(cli).to receive(:start_loop)
end end
it 'starts the Sidekiq workers' do it 'starts the Sidekiq workers' do
...@@ -77,6 +78,85 @@ describe Gitlab::SidekiqCluster::CLI do ...@@ -77,6 +78,85 @@ describe Gitlab::SidekiqCluster::CLI do
cli.run(%w(cronjob)) cli.run(%w(cronjob))
end end
end end
context 'with --experimental-queue-selector' do
where do
{
'memory-bound queues' => {
query: 'resource_boundary=memory',
included_queues: %w(project_export),
excluded_queues: %w(merge)
},
'memory- or CPU-bound queues' => {
query: 'resource_boundary=memory,cpu',
included_queues: %w(auto_merge:auto_merge_process project_export),
excluded_queues: %w(merge)
},
'latency-sensitive CI queues' => {
query: 'feature_category=continuous_integration&latency_sensitive=true',
included_queues: %w(pipeline_cache:expire_job_cache pipeline_cache:expire_pipeline_cache),
excluded_queues: %w(merge)
},
'CPU-bound latency-sensitive CI queues' => {
query: 'feature_category=continuous_integration&latency_sensitive=true&resource_boundary=cpu',
included_queues: %w(pipeline_cache:expire_pipeline_cache),
excluded_queues: %w(pipeline_cache:expire_job_cache merge)
},
'CPU-bound latency-sensitive non-CI queues' => {
query: 'feature_category!=continuous_integration&latency_sensitive=true&resource_boundary=cpu',
included_queues: %w(new_issue),
excluded_queues: %w(pipeline_cache:expire_pipeline_cache)
},
'CI and SCM queues' => {
query: 'feature_category=continuous_integration|feature_category=source_code_management',
included_queues: %w(pipeline_cache:expire_job_cache merge),
excluded_queues: %w(mailers)
}
}
end
with_them do
it 'expands queues by attributes' do
expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts|
expect(opts).to eq(default_options)
expect(queues.first).to include(*included_queues)
expect(queues.first).not_to include(*excluded_queues)
[]
end
cli.run(%W(--experimental-queue-selector #{query}))
end
it 'works when negated' do
expect(Gitlab::SidekiqCluster).to receive(:start) do |queues, opts|
expect(opts).to eq(default_options)
expect(queues.first).not_to include(*included_queues)
expect(queues.first).to include(*excluded_queues)
[]
end
cli.run(%W(--negate --experimental-queue-selector #{query}))
end
end
it 'expands multiple queue groups correctly' do
expect(Gitlab::SidekiqCluster)
.to receive(:start)
.with([['chat_notification'], ['project_export']], default_options)
.and_return([])
cli.run(%w(--experimental-queue-selector feature_category=chatops&latency_sensitive=true resource_boundary=memory&feature_category=importers))
end
it 'errors on an invalid query multiple queue groups correctly' do
expect(Gitlab::SidekiqCluster).not_to receive(:start)
expect { cli.run(%w(--experimental-queue-selector unknown_field=chatops)) }
.to raise_error(Gitlab::SidekiqConfig::CliMethods::QueryError)
end
end
end end
end end
......
...@@ -18,17 +18,39 @@ module Gitlab ...@@ -18,17 +18,39 @@ module Gitlab
result result
end.freeze end.freeze
def worker_queues(rails_path = Rails.root.to_s) QUERY_OR_OPERATOR = '|'
QUERY_AND_OPERATOR = '&'
QUERY_CONCATENATE_OPERATOR = ','
QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w#{QUERY_CONCATENATE_OPERATOR}]+)}.freeze
QUERY_PREDICATES = {
feature_category: :to_sym,
has_external_dependencies: lambda { |value| value == 'true' },
latency_sensitive: lambda { |value| value == 'true' },
name: :to_s,
resource_boundary: :to_sym
}.freeze
QueryError = Class.new(StandardError)
InvalidTerm = Class.new(QueryError)
UnknownOperator = Class.new(QueryError)
UnknownPredicate = Class.new(QueryError)
def all_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {} @worker_queues ||= {}
@worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path| @worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
full_path = File.join(rails_path, path) full_path = File.join(rails_path, path)
queues = File.exist?(full_path) ? YAML.load_file(full_path) : []
# https://gitlab.com/gitlab-org/gitlab/issues/199230 File.exist?(full_path) ? YAML.load_file(full_path) : []
queues.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
end end
end end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
def worker_queues(rails_path = Rails.root.to_s)
# https://gitlab.com/gitlab-org/gitlab/issues/199230
worker_names(all_queues(rails_path))
end
def expand_queues(queues, all_queues = self.worker_queues) def expand_queues(queues, all_queues = self.worker_queues)
return [] if queues.empty? return [] if queues.empty?
...@@ -40,12 +62,64 @@ module Gitlab ...@@ -40,12 +62,64 @@ module Gitlab
end end
end end
def query_workers(query_string, queues)
worker_names(queues.select(&query_string_to_lambda(query_string)))
end
def clear_memoization! def clear_memoization!
if instance_variable_defined?('@worker_queues') if instance_variable_defined?('@worker_queues')
remove_instance_variable('@worker_queues') remove_instance_variable('@worker_queues')
end end
end end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
private
def worker_names(workers)
workers.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
end
def query_string_to_lambda(query_string)
or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
predicate_for_term(term)
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def predicate_for_term(term)
match = term.match(QUERY_TERM_REGEX)
raise InvalidTerm.new("Invalid term: #{term}") unless match
_, lhs, op, rhs = *match
predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
end
def predicate_for_op(op, predicate)
case op
when '='
predicate
when '!='
lambda { |worker| !predicate.call(worker) }
else
# This is unreachable because InvalidTerm will be raised instead, but
# keeping it allows to guard against that changing in future.
raise UnknownOperator.new("Unknown operator: #{op}")
end
end
def predicate_factory(lhs, values)
values_block = QUERY_PREDICATES[lhs.to_sym]
raise UnknownPredicate.new("Unknown predicate: #{lhs}") unless values_block
lambda { |queue| values.map(&values_block).include?(queue[lhs.to_sym]) }
end
end end
end end
end end
# frozen_string_literal: true # frozen_string_literal: true
require 'fast_spec_helper' require 'fast_spec_helper'
require 'rspec-parameterized'
describe Gitlab::SidekiqConfig::CliMethods do describe Gitlab::SidekiqConfig::CliMethods do
let(:dummy_root) { '/tmp/' } let(:dummy_root) { '/tmp/' }
...@@ -82,7 +83,7 @@ describe Gitlab::SidekiqConfig::CliMethods do ...@@ -82,7 +83,7 @@ describe Gitlab::SidekiqConfig::CliMethods do
end end
describe '.expand_queues' do describe '.expand_queues' do
let(:all_queues) do let(:worker_queues) do
['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive'] ['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive']
end end
...@@ -92,25 +93,125 @@ describe Gitlab::SidekiqConfig::CliMethods do ...@@ -92,25 +93,125 @@ describe Gitlab::SidekiqConfig::CliMethods do
expect(described_class.expand_queues(['cronjob'])) expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob') .to contain_exactly('cronjob')
allow(described_class).to receive(:worker_queues).and_return(all_queues) allow(described_class).to receive(:worker_queues).and_return(worker_queues)
expect(described_class.expand_queues(['cronjob'])) expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs') .to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
end end
it 'expands queue namespaces to concrete queue names' do it 'expands queue namespaces to concrete queue names' do
expect(described_class.expand_queues(['cronjob'], all_queues)) expect(described_class.expand_queues(['cronjob'], worker_queues))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs') .to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
end end
it 'lets concrete queue names pass through' do it 'lets concrete queue names pass through' do
expect(described_class.expand_queues(['post_receive'], all_queues)) expect(described_class.expand_queues(['post_receive'], worker_queues))
.to contain_exactly('post_receive') .to contain_exactly('post_receive')
end end
it 'lets unknown queues pass through' do it 'lets unknown queues pass through' do
expect(described_class.expand_queues(['unknown'], all_queues)) expect(described_class.expand_queues(['unknown'], worker_queues))
.to contain_exactly('unknown') .to contain_exactly('unknown')
end end
end end
describe '.query_workers' do
using RSpec::Parameterized::TableSyntax
let(:queues) do
[
{
name: 'a',
feature_category: :category_a,
has_external_dependencies: false,
latency_sensitive: false,
resource_boundary: :cpu
},
{
name: 'a_2',
feature_category: :category_a,
has_external_dependencies: false,
latency_sensitive: true,
resource_boundary: :none
},
{
name: 'b',
feature_category: :category_b,
has_external_dependencies: true,
latency_sensitive: true,
resource_boundary: :memory
},
{
name: 'c',
feature_category: :category_c,
has_external_dependencies: false,
latency_sensitive: false,
resource_boundary: :memory
}
]
end
context 'with valid input' do
where(:query, :selected_queues) do
# feature_category
'feature_category=category_a' | %w(a a_2)
'feature_category=category_a,category_c' | %w(a a_2 c)
'feature_category=category_a|feature_category=category_c' | %w(a a_2 c)
'feature_category!=category_a' | %w(b c)
# has_external_dependencies
'has_external_dependencies=true' | %w(b)
'has_external_dependencies=false' | %w(a a_2 c)
'has_external_dependencies=true,false' | %w(a a_2 b c)
'has_external_dependencies=true|has_external_dependencies=false' | %w(a a_2 b c)
'has_external_dependencies!=true' | %w(a a_2 c)
# latency_sensitive
'latency_sensitive=true' | %w(a_2 b)
'latency_sensitive=false' | %w(a c)
'latency_sensitive=true,false' | %w(a a_2 b c)
'latency_sensitive=true|latency_sensitive=false' | %w(a a_2 b c)
'latency_sensitive!=true' | %w(a c)
# name
'name=a' | %w(a)
'name=a,b' | %w(a b)
'name=a,a_2|name=b' | %w(a a_2 b)
'name!=a,a_2' | %w(b c)
# resource_boundary
'resource_boundary=memory' | %w(b c)
'resource_boundary=memory,cpu' | %w(a b c)
'resource_boundary=memory|resource_boundary=cpu' | %w(a b c)
'resource_boundary!=memory,cpu' | %w(a_2)
# combinations
'feature_category=category_a&latency_sensitive=true' | %w(a_2)
'feature_category=category_a&latency_sensitive=true|feature_category=category_c' | %w(a_2 c)
end
with_them do
it do
expect(described_class.query_workers(query, queues))
.to match_array(selected_queues)
end
end
end
context 'with invalid input' do
where(:query, :error) do
'feature_category="category_a"' | described_class::InvalidTerm
'feature_category=' | described_class::InvalidTerm
'feature_category~category_a' | described_class::InvalidTerm
'worker_name=a' | described_class::UnknownPredicate
end
with_them do
it do
expect { described_class.query_workers(query, queues) }
.to raise_error(error)
end
end
end
end
end end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment