Commit 189f707a authored by Sean McGivern's avatar Sean McGivern

Fix and document --queue-query-syntax

This ensures that --queue-query-syntax works correctly when invoked from
the command line, without the rest of the Rails environment, as well as
adding documentation and unit-level tests.
parent 24e727ca
---
title: Add experimental --queue-query-syntax option to sidekiq-cluster
merge_request: 18877
author:
type: changed
......@@ -82,6 +82,81 @@ you list:
sudo gitlab-ctl reconfigure
```
## Queue query syntax (experimental)
> [Introduced](https://gitlab.com/gitlab-com/gl-infra/scalability/issues/45) in [GitLab Starter](https://about.gitlab.com/pricing/) 12.8.
In addition to selecting queues by name, as above, the `queue_query_syntax`
option allows queue groups to be selected in a more general way using
the following components:
- Attributes that can be selected.
- Operators used to construct a query.
### Available attributes
From the [list of all available
attributes](https://gitlab.com/gitlab-org/gitlab/-/blob/master/app/workers/all_queues.yml),
`queue_query_syntax` allows selecting of queues by the following attributes:
- `feature_category` - the [GitLab feature
category](https://about.gitlab.com/direction/maturity/#category-maturity) the
queue belongs to. For example, the `merge` queue belongs to the
`source_code_management` category.
- `has_external_dependencies` - whether or not the queue connects to external
services. For example, all importers have this set to `true`.
- `latency_sensitive` - whether or not the queue is particularly sensitive to
latency, which also means that its jobs should run quickly. For example, the
`authorized_projects` queue is used to refresh user permissions, and is
latency sensitive.
- `resource_boundary` - if the worker is bound by `cpu`, `memory`, or
`unknown`. For example, the `project_export` queue is memory bound as it has
to load data in memory before saving it for export.
Both `has_external_dependencies` and `latency_sensitive` are boolean attributes:
only the exact string `true` is considered true, and everything else is
considered false.
### Available operators
`queue_query_syntax` supports the following operators, listed from highest to
lowest precedence:
- <code> </code>&nbsp;(space) - the logical OR operator. For example, `query_a
query_b` (where `query_a` and `query_b` are queries made up of the other
operators here) will include queues that match either query.
- `,` - the logical AND operator. For example, `query_a,query_b` (where
`query_a` and `query_b` are queries made up of the other operators here) will
only include queues that match both queries.
- `!=` - the not equal to operator. For example,
`feature_category!=issue_tracking` excludes all queues from the
`issue_tracking` feature category.
- `=` - the equal to operator. For example, `resource_boundary=cpu` includes all
queues that are CPU bound.
- `|` - the concatenate set operator. For example,
`feature_category=continuous_integration|pages` includes all queues from
either the `continuous_integration` category or the `pages` category. This
example is also possible using the OR operator, but allows greater brevity, as
well as being lower precedence.
The operator precedence for this syntax is fixed: it's not possible to make AND
have higher precedence than OR.
### Example queries
In `/etc/gitlab/gitlab.rb`:
```ruby
sidekiq_cluster['enable'] = true
sidekiq_cluster['queue_query_syntax'] = true
sidekiq_cluster['queue_groups'] = [
# Run all non-CPU-bound, queues that are latency sensitive
'resource_boundary!=cpu,latency_sensitive=true',
# Run all continuous integration and pages queues that are not latency sensitive
'feature_category=continuous_integration|pages,latency_sensitive=false'
]
```
## Ignore all GitHub import queues
When [importing from GitHub](../../user/project/import/github.md), Sidekiq might
......
......@@ -41,21 +41,25 @@ module Gitlab
option_parser.parse!(argv)
queue_groups = SidekiqCluster.parse_queues(argv)
all_queues = SidekiqConfig::CliMethods.all_queues(@rails_path)
queue_names = SidekiqConfig::CliMethods.worker_queues(@rails_path)
all_queues = SidekiqConfig::CliMethods.worker_queues(@rails_path)
# When using the experimental queue query syntax, we treat each queue
# group as a worker attribute query, and resolve the queues for the
# queue group using this query.
queue_groups =
if @queue_query_syntax
queue_groups = argv.map { |queues| SidekiqConfig.query_workers(queues).map(&:queue) }
# When using the experimental queue query syntax, we treat
# each queue group as a worker attribute query, and resolve
# the queues for the queue group using this query.
argv.map do |queues|
SidekiqConfig::CliMethods.query_workers(queues, all_queues)
end
else
queue_groups.map! { |queues| SidekiqConfig::CliMethods.expand_queues(queues, all_queues) }
SidekiqCluster.parse_queues(argv).map do |queues|
SidekiqConfig::CliMethods.expand_queues(queues, queue_names)
end
end
if @negate_queues
queue_groups.map! { |queues| all_queues - queues }
queue_groups.map! { |queues| queue_names - queues }
end
@logger.info("Starting cluster with #{queue_groups.length} processes")
......
......@@ -22,9 +22,9 @@ describe Gitlab::SidekiqCluster::CLI do
context 'with arguments' do
before do
expect(cli).to receive(:write_pid)
expect(cli).to receive(:trap_signals)
expect(cli).to receive(:start_loop)
allow(cli).to receive(:write_pid)
allow(cli).to receive(:trap_signals)
allow(cli).to receive(:start_loop)
end
it 'starts the Sidekiq workers' do
......@@ -140,6 +140,22 @@ describe Gitlab::SidekiqCluster::CLI do
cli.run(%W(--negate --queue-query-syntax #{query}))
end
end
it 'expands multiple queue groups correctly' do
expect(Gitlab::SidekiqCluster)
.to receive(:start)
.with([['chat_notification'], ['project_export']], default_options)
.and_return([])
cli.run(%w(--queue-query-syntax feature_category=chatops,latency_sensitive=true resource_boundary=memory,feature_category=source_code_management))
end
it 'errors on an invalid query multiple queue groups correctly' do
expect(Gitlab::SidekiqCluster).not_to receive(:start)
expect { cli.run(%w(--queue-query-syntax unknown_field=chatops)) }
.to raise_error(Gitlab::SidekiqConfig::CliMethods::QueryError)
end
end
end
end
......
......@@ -104,59 +104,5 @@ module Gitlab
ns.camelize.constantize
end
end
def self.query_workers(query_string)
predicate = query_string_to_lambda(query_string)
workers.filter(&predicate)
end
def self.query_string_to_lambda(query_string)
or_clauses = query_string.split(%r{\s+}).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(',').map do |term|
match = term.match(%r{^(\w+)(!?=)([\w|]+)})
raise "invalid term #{term}" unless match
lhs = match[1]
op = match[2]
rhs = match[3]
predicate_for_op(op, predicate_factory(lhs, rhs.split('|')))
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def self.predicate_for_op(op, predicate)
case op
when "="
predicate
when "!="
lambda { |worker| !predicate.call(worker) }
else
raise "unknown op #{op}"
end
end
def self.predicate_factory(lhs, values)
case lhs
when "resource_boundary"
values_sym = values.map(&:to_sym)
lambda { |worker| values_sym.include? worker.get_worker_resource_boundary }
when "latency_sensitive"
values_bool = values.map { |v| v.casecmp("true").zero? }
lambda { |worker| values_bool.include? worker.latency_sensitive_worker? }
when "feature_category"
values_sym = values.map(&:to_sym)
lambda { |worker| values_sym.include? worker.get_feature_category }
else
raise "unknown predicate #{lhs}"
end
end
end
end
......@@ -18,16 +18,30 @@ module Gitlab
result
end.freeze
def worker_queues(rails_path = Rails.root.to_s)
QUERY_OR_OPERATOR = %r{\s+}.freeze
QUERY_AND_OPERATOR = ','
QUERY_CONCATENATE_OPERATOR = '|'
QUERY_TERM_REGEX = %r{^(\w+)(!?=)([\w|]+)}.freeze
QueryError = Class.new(StandardError)
InvalidTerm = Class.new(QueryError)
UnknownOperator = Class.new(QueryError)
UnknownPredicate = Class.new(QueryError)
def all_queues(rails_path = Rails.root.to_s)
@worker_queues ||= {}
@worker_queues[rails_path] ||= QUEUE_CONFIG_PATHS.flat_map do |path|
full_path = File.join(rails_path, path)
queues = File.exist?(full_path) ? YAML.load_file(full_path) : []
# https://gitlab.com/gitlab-org/gitlab/issues/199230
queues.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
File.exist?(full_path) ? YAML.load_file(full_path) : []
end
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
def worker_queues(rails_path = Rails.root.to_s)
# https://gitlab.com/gitlab-org/gitlab/issues/199230
worker_names(all_queues(rails_path))
end
def expand_queues(queues, all_queues = self.worker_queues)
......@@ -40,12 +54,73 @@ module Gitlab
end
end
def query_workers(query_string, queues)
worker_names(queues.select(&query_string_to_lambda(query_string)))
end
def clear_memoization!
if instance_variable_defined?('@worker_queues')
remove_instance_variable('@worker_queues')
end
end
# rubocop:enable Gitlab/ModuleWithInstanceVariables
private
def worker_names(workers)
workers.map { |queue| queue.is_a?(Hash) ? queue[:name] : queue }
end
def query_string_to_lambda(query_string)
or_clauses = query_string.split(QUERY_OR_OPERATOR).map do |and_clauses_string|
and_clauses_predicates = and_clauses_string.split(QUERY_AND_OPERATOR).map do |term|
match = term.match(QUERY_TERM_REGEX)
raise InvalidTerm.new("Invalid term: #{term}") unless match
_, lhs, op, rhs = *match
predicate_for_op(op, predicate_factory(lhs, rhs.split(QUERY_CONCATENATE_OPERATOR)))
end
lambda { |worker| and_clauses_predicates.all? { |predicate| predicate.call(worker) } }
end
lambda { |worker| or_clauses.any? { |predicate| predicate.call(worker) } }
end
def predicate_for_op(op, predicate)
case op
when '='
predicate
when '!='
lambda { |worker| !predicate.call(worker) }
else
# This is unreachable because InvalidTerm will be raised instead, but
# keeping it allows to guard against that changing in future.
raise UnknownOperator.new("Unknown operator: #{op}")
end
end
def predicate_factory(lhs, values)
to_bool = lambda { |value| value == 'true' }
case lhs
when 'feature_category'
lambda { |worker| values.map(&:to_sym).include?(worker[:feature_category]) }
when 'has_external_dependencies'
lambda { |worker| values.map(&to_bool).include?(worker[:has_external_dependencies]) }
when 'latency_sensitive'
lambda { |worker| values.map(&to_bool).include?(worker[:latency_sensitive]) }
when 'resource_boundary'
lambda { |worker| values.map(&:to_sym).include?(worker[:resource_boundary]) }
else
raise UnknownPredicate.new("Unknown predicate: #{lhs}")
end
end
end
end
end
# frozen_string_literal: true
require 'fast_spec_helper'
require 'rspec-parameterized'
describe Gitlab::SidekiqConfig::CliMethods do
let(:dummy_root) { '/tmp/' }
......@@ -82,7 +83,7 @@ describe Gitlab::SidekiqConfig::CliMethods do
end
describe '.expand_queues' do
let(:all_queues) do
let(:worker_queues) do
['cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs', 'post_receive']
end
......@@ -92,25 +93,119 @@ describe Gitlab::SidekiqConfig::CliMethods do
expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob')
allow(described_class).to receive(:worker_queues).and_return(all_queues)
allow(described_class).to receive(:worker_queues).and_return(worker_queues)
expect(described_class.expand_queues(['cronjob']))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'expands queue namespaces to concrete queue names' do
expect(described_class.expand_queues(['cronjob'], all_queues))
expect(described_class.expand_queues(['cronjob'], worker_queues))
.to contain_exactly('cronjob', 'cronjob:stuck_import_jobs', 'cronjob:stuck_merge_jobs')
end
it 'lets concrete queue names pass through' do
expect(described_class.expand_queues(['post_receive'], all_queues))
expect(described_class.expand_queues(['post_receive'], worker_queues))
.to contain_exactly('post_receive')
end
it 'lets unknown queues pass through' do
expect(described_class.expand_queues(['unknown'], all_queues))
expect(described_class.expand_queues(['unknown'], worker_queues))
.to contain_exactly('unknown')
end
end
describe '.query_workers' do
using RSpec::Parameterized::TableSyntax
let(:queues) do
[
{
name: 'a',
feature_category: :category_a,
has_external_dependencies: false,
latency_sensitive: false,
resource_boundary: :cpu
},
{
name: 'a_2',
feature_category: :category_a,
has_external_dependencies: false,
latency_sensitive: true,
resource_boundary: :none
},
{
name: 'b',
feature_category: :category_b,
has_external_dependencies: true,
latency_sensitive: true,
resource_boundary: :memory
},
{
name: 'c',
feature_category: :category_c,
has_external_dependencies: false,
latency_sensitive: false,
resource_boundary: :memory
}
]
end
context 'with valid input' do
where(:query, :selected_queues) do
# feature_category
'feature_category=category_a' | %w(a a_2)
'feature_category=category_a|category_c' | %w(a a_2 c)
'feature_category=category_a feature_category=category_c' | %w(a a_2 c)
'feature_category!=category_a' | %w(b c)
# has_external_dependencies
'has_external_dependencies=true' | %w(b)
'has_external_dependencies=false' | %w(a a_2 c)
'has_external_dependencies=true|false' | %w(a a_2 b c)
'has_external_dependencies=true has_external_dependencies=false' | %w(a a_2 b c)
'has_external_dependencies!=true' | %w(a a_2 c)
# latency_sensitive
'latency_sensitive=true' | %w(a_2 b)
'latency_sensitive=false' | %w(a c)
'latency_sensitive=true|false' | %w(a a_2 b c)
'latency_sensitive=true latency_sensitive=false' | %w(a a_2 b c)
'latency_sensitive!=true' | %w(a c)
# resource_boundary
'resource_boundary=memory' | %w(b c)
'resource_boundary=memory|cpu' | %w(a b c)
'resource_boundary=memory resource_boundary=cpu' | %w(a b c)
'resource_boundary!=memory|cpu' | %w(a_2)
# combinations
'feature_category=category_a,latency_sensitive=true' | %w(a_2)
'feature_category=category_a,latency_sensitive=true feature_category=category_c' | %w(a_2 c)
end
with_them do
it do
expect(described_class.query_workers(query, queues))
.to match_array(selected_queues)
end
end
end
context 'with invalid input' do
where(:query, :error) do
'feature_category="category_a"' | described_class::InvalidTerm
'feature_category=' | described_class::InvalidTerm
'feature_category~category_a' | described_class::InvalidTerm
'name=a' | described_class::UnknownPredicate
end
with_them do
it do
expect { described_class.query_workers(query, queues) }
.to raise_error(error)
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment