Commit 88e627cf authored by Yorick Peterse's avatar Yorick Peterse

Fix race conditions for AuthorizedProjectsWorker

There were two cases that could be problematic:

1. Because sometimes AuthorizedProjectsWorker would be scheduled in a
   transaction it was possible for a job to run/complete before a
   COMMIT; resulting in it either producing an error, or producing no
   new data.

2. When scheduling jobs the code would not wait until completion. This
   could lead to a user creating a project and then immediately trying
   to push to it. Usually this will work fine, but given enough load it
   might take a few seconds before a user has access.

The first one is problematic, the second one is mostly just annoying
(but annoying enough to warrant a solution).

This commit changes two things to deal with this:

1. Sidekiq scheduling now takes places after a COMMIT, this is ensured
   by scheduling using Rails' after_commit hook instead of doing so in
   an arbitrary method.

2. When scheduling jobs the calling thread now waits for all jobs to
   complete.

Solution 2 requires tracking of job completions. Sidekiq provides a way
to find a job by its ID, but this involves scanning over the entire
queue; something that is very in-efficient for large queues. As such a
more efficient solution is necessary. There are two main Gems that can
do this in a more efficient manner:

* sidekiq-status
* sidekiq_status

No, this is not a joke. Both Gems do a similar thing (but slightly
different), and the only difference in their name is a dash vs an
underscore. Both Gems however provide far more than just checking if a
job has been completed, and both have their problems. sidekiq-status
does not appear to be actively maintained, with the last release being
in 2015. It also has some issues during testing as API calls are not
stubbed in any way. sidekiq_status on the other hand does not appear to
be very popular, and introduces a similar amount of code.

Because of this I opted to write a simple home grown solution. After
all, all we need is storing a job ID somewhere so we can efficiently
look it up; we don't need extra web UIs (as provided by sidekiq-status)
or complex APIs to update progress, etc.

This is where Gitlab::SidekiqStatus comes in handy. This namespace
contains some code used for tracking, removing, and looking up job IDs;
all without having to scan over an entire queue. Data is removed
explicitly, but also expires automatically just in case.

Using this API we can now schedule jobs in a fork-join like manner: we
schedule the jobs in Sidekiq, process them in parallel, then wait for
completion. By using Sidekiq we can leverage all the benefits such as
being able to scale across multiple cores and hosts, retrying failed
jobs, etc.

The one downside is that we need to make sure we can deal with
unexpected increases in job processing timings. To deal with this the
class Gitlab::JobWaiter (used for waiting for jobs to complete) will
only wait a number of seconds (30 by default). Once this timeout is
reached it will simply return.

For GitLab.com almost all AuthorizedProjectWorker jobs complete in
seconds, only very rarely do we spike to job timings of around a minute.
These in turn seem to be the result of external factors (e.g. deploys),
in which case a user is most likely not able to use the system anyway.

In short, this new solution should ensure that jobs are processed
properly and that in almost all cases a user has access to their
resources whenever they need to have access.
parent 3a54128d
...@@ -322,7 +322,7 @@ group :test do ...@@ -322,7 +322,7 @@ group :test do
gem 'email_spec', '~> 1.6.0' gem 'email_spec', '~> 1.6.0'
gem 'json-schema', '~> 2.6.2' gem 'json-schema', '~> 2.6.2'
gem 'webmock', '~> 1.21.0' gem 'webmock', '~> 1.21.0'
gem 'test_after_commit', '~> 0.4.2' gem 'test_after_commit', '~> 1.1'
gem 'sham_rack', '~> 1.3.6' gem 'sham_rack', '~> 1.3.6'
gem 'timecop', '~> 0.8.0' gem 'timecop', '~> 0.8.0'
end end
......
...@@ -760,7 +760,7 @@ GEM ...@@ -760,7 +760,7 @@ GEM
teaspoon-jasmine (2.2.0) teaspoon-jasmine (2.2.0)
teaspoon (>= 1.0.0) teaspoon (>= 1.0.0)
temple (0.7.7) temple (0.7.7)
test_after_commit (0.4.2) test_after_commit (1.1.0)
activerecord (>= 3.2) activerecord (>= 3.2)
thin (1.7.0) thin (1.7.0)
daemons (~> 1.0, >= 1.0.9) daemons (~> 1.0, >= 1.0.9)
...@@ -997,7 +997,7 @@ DEPENDENCIES ...@@ -997,7 +997,7 @@ DEPENDENCIES
sys-filesystem (~> 1.1.6) sys-filesystem (~> 1.1.6)
teaspoon (~> 1.1.0) teaspoon (~> 1.1.0)
teaspoon-jasmine (~> 2.2.0) teaspoon-jasmine (~> 2.2.0)
test_after_commit (~> 0.4.2) test_after_commit (~> 1.1)
thin (~> 1.7.0) thin (~> 1.7.0)
timecop (~> 0.8.0) timecop (~> 0.8.0)
truncato (~> 0.7.8) truncato (~> 0.7.8)
......
...@@ -68,9 +68,9 @@ class Member < ActiveRecord::Base ...@@ -68,9 +68,9 @@ class Member < ActiveRecord::Base
after_create :send_request, if: :request?, unless: :importing? after_create :send_request, if: :request?, unless: :importing?
after_create :create_notification_setting, unless: [:pending?, :importing?] after_create :create_notification_setting, unless: [:pending?, :importing?]
after_create :post_create_hook, unless: [:pending?, :importing?] after_create :post_create_hook, unless: [:pending?, :importing?]
after_create :refresh_member_authorized_projects, if: :importing?
after_update :post_update_hook, unless: [:pending?, :importing?] after_update :post_update_hook, unless: [:pending?, :importing?]
after_destroy :post_destroy_hook, unless: :pending? after_destroy :post_destroy_hook, unless: :pending?
after_commit :refresh_member_authorized_projects
delegate :name, :username, :email, to: :user, prefix: true delegate :name, :username, :email, to: :user, prefix: true
...@@ -147,8 +147,6 @@ class Member < ActiveRecord::Base ...@@ -147,8 +147,6 @@ class Member < ActiveRecord::Base
member.save member.save
end end
UserProjectAccessChangedService.new(user.id).execute if user.is_a?(User)
member member
end end
...@@ -275,23 +273,27 @@ class Member < ActiveRecord::Base ...@@ -275,23 +273,27 @@ class Member < ActiveRecord::Base
end end
def post_create_hook def post_create_hook
UserProjectAccessChangedService.new(user.id).execute
system_hook_service.execute_hooks_for(self, :create) system_hook_service.execute_hooks_for(self, :create)
end end
def post_update_hook def post_update_hook
UserProjectAccessChangedService.new(user.id).execute if access_level_changed? # override in sub class
end end
def post_destroy_hook def post_destroy_hook
refresh_member_authorized_projects
system_hook_service.execute_hooks_for(self, :destroy) system_hook_service.execute_hooks_for(self, :destroy)
end end
# Refreshes authorizations of the current member.
#
# This method schedules a job using Sidekiq and as such **must not** be called
# in a transaction. Doing so can lead to the job running before the
# transaction has been committed, resulting in the job either throwing an
# error or not doing any meaningful work.
def refresh_member_authorized_projects def refresh_member_authorized_projects
# If user/source is being destroyed, project access are gonna be destroyed eventually # If user/source is being destroyed, project access are going to be
# because of DB foreign keys, so we shouldn't bother with refreshing after each # destroyed eventually because of DB foreign keys, so we shouldn't bother
# member is destroyed through association # with refreshing after each member is destroyed through association
return if destroyed_by_association.present? return if destroyed_by_association.present?
UserProjectAccessChangedService.new(user_id).execute UserProjectAccessChangedService.new(user_id).execute
......
...@@ -16,8 +16,7 @@ class ProjectGroupLink < ActiveRecord::Base ...@@ -16,8 +16,7 @@ class ProjectGroupLink < ActiveRecord::Base
validates :group_access, inclusion: { in: Gitlab::Access.values }, presence: true validates :group_access, inclusion: { in: Gitlab::Access.values }, presence: true
validate :different_group validate :different_group
after_create :refresh_group_members_authorized_projects after_commit :refresh_group_members_authorized_projects
after_destroy :refresh_group_members_authorized_projects
def self.access_options def self.access_options
Gitlab::Access.options Gitlab::Access.options
......
...@@ -4,6 +4,6 @@ class UserProjectAccessChangedService ...@@ -4,6 +4,6 @@ class UserProjectAccessChangedService
end end
def execute def execute
AuthorizedProjectsWorker.bulk_perform_async(@user_ids.map { |id| [id] }) AuthorizedProjectsWorker.bulk_perform_and_wait(@user_ids.map { |id| [id] })
end end
end end
...@@ -2,6 +2,13 @@ class AuthorizedProjectsWorker ...@@ -2,6 +2,13 @@ class AuthorizedProjectsWorker
include Sidekiq::Worker include Sidekiq::Worker
include DedicatedSidekiqQueue include DedicatedSidekiqQueue
# Schedules multiple jobs and waits for them to be completed.
def self.bulk_perform_and_wait(args_list)
job_ids = bulk_perform_async(args_list)
Gitlab::JobWaiter.new(job_ids).wait
end
def self.bulk_perform_async(args_list) def self.bulk_perform_async(args_list)
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
end end
......
---
title: Fix race conditions for AuthorizedProjectsWorker
merge_request:
author:
...@@ -12,6 +12,11 @@ Sidekiq.configure_server do |config| ...@@ -12,6 +12,11 @@ Sidekiq.configure_server do |config|
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS']
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0' chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0'
chain.add Gitlab::SidekiqStatus::ServerMiddleware
end
config.client_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end end
# Sidekiq-cron: load recurring jobs from gitlab.yml # Sidekiq-cron: load recurring jobs from gitlab.yml
...@@ -46,6 +51,10 @@ end ...@@ -46,6 +51,10 @@ end
Sidekiq.configure_client do |config| Sidekiq.configure_client do |config|
config.redis = redis_config_hash config.redis = redis_config_hash
config.client_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ClientMiddleware
end
end end
# The Sidekiq client API always adds the queue to the Sidekiq queue # The Sidekiq client API always adds the queue to the Sidekiq queue
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
User.seed do |s| User.seed do |s|
s.id = 1 s.id = 1
......
require 'sidekiq/testing' require './spec/support/sidekiq'
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
20.times do |i| 20.times do |i|
begin begin
......
require 'sidekiq/testing' require './spec/support/sidekiq'
Sidekiq::Testing.inline! do Sidekiq::Testing.inline! do
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
Project.all.each do |project| Project.all.each do |project|
5.times do |i| 5.times do |i|
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
Project.all.each do |project| Project.all.each do |project|
10.times do 10.times do
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
# Limit the number of merge requests per project to avoid long seeds # Limit the number of merge requests per project to avoid long seeds
MAX_NUM_MERGE_REQUESTS = 10 MAX_NUM_MERGE_REQUESTS = 10
......
Gitlab::Seeder.quiet do require './spec/support/sidekiq'
# Creating keys runs a gitlab-shell worker. Since we may not have the right
# gitlab-shell path set (yet) we need to disable this for these fixtures.
Sidekiq::Testing.disable! do
Gitlab::Seeder.quiet do
User.first(10).each do |user| User.first(10).each do |user|
key = "ssh-rsa AAAAB3NzaC1yc2EAAAABJQAAAIEAiPWx6WM4lhHNedGfBpPJNPpZ7yKu+dnn1SJejgt#{user.id + 100}6k6YjzGGphH2TUxwKzxcKDKKezwkpfnxPkSMkuEspGRt/aZZ9wa++Oi7Qkr8prgHc4soW6NUlfDzpvZK2H5E7eQaSeP3SAwGmQKUFHCddNaP0L+hM7zhFNzjFvpaMgJw0=" key = "ssh-rsa AAAAB3NzaC1yc2EAAAABJQAAAIEAiPWx6WM4lhHNedGfBpPJNPpZ7yKu+dnn1SJejgt#{user.id + 100}6k6YjzGGphH2TUxwKzxcKDKKezwkpfnxPkSMkuEspGRt/aZZ9wa++Oi7Qkr8prgHc4soW6NUlfDzpvZK2H5E7eQaSeP3SAwGmQKUFHCddNaP0L+hM7zhFNzjFvpaMgJw0="
...@@ -9,4 +14,5 @@ Gitlab::Seeder.quiet do ...@@ -9,4 +14,5 @@ Gitlab::Seeder.quiet do
print '.' print '.'
end end
end
end end
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
content =<<eos content =<<eos
class Member < ActiveRecord::Base class Member < ActiveRecord::Base
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
Issue.all.each do |issue| Issue.all.each do |issue|
project = issue.project project = issue.project
......
require './spec/support/sidekiq'
class Gitlab::Seeder::Pipelines class Gitlab::Seeder::Pipelines
STAGES = %w[build test deploy notify] STAGES = %w[build test deploy notify]
BUILDS = [ BUILDS = [
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
emoji = Gitlab::AwardEmoji.emojis.keys emoji = Gitlab::AwardEmoji.emojis.keys
......
require './spec/support/sidekiq'
Gitlab::Seeder.quiet do Gitlab::Seeder.quiet do
admin_user = User.find(1) admin_user = User.find(1)
......
require 'sidekiq/testing' require './spec/support/sidekiq'
require './spec/support/test_env' require './spec/support/test_env'
class Gitlab::Seeder::CycleAnalytics class Gitlab::Seeder::CycleAnalytics
......
...@@ -4,7 +4,6 @@ SimpleCovEnv.start! ...@@ -4,7 +4,6 @@ SimpleCovEnv.start!
ENV['RAILS_ENV'] = 'test' ENV['RAILS_ENV'] = 'test'
require './config/environment' require './config/environment'
require 'rspec/expectations' require 'rspec/expectations'
require 'sidekiq/testing/inline'
require_relative 'capybara' require_relative 'capybara'
require_relative 'db_cleaner' require_relative 'db_cleaner'
...@@ -15,7 +14,7 @@ if ENV['CI'] ...@@ -15,7 +14,7 @@ if ENV['CI']
Knapsack::Adapters::SpinachAdapter.bind Knapsack::Adapters::SpinachAdapter.bind
end end
%w(select2_helper test_env repo_helpers wait_for_ajax).each do |f| %w(select2_helper test_env repo_helpers wait_for_ajax sidekiq).each do |f|
require Rails.root.join('spec', 'support', f) require Rails.root.join('spec', 'support', f)
end end
......
module Gitlab
# JobWaiter can be used to wait for a number of Sidekiq jobs to complete.
class JobWaiter
# The sleep interval between checking keys, in seconds.
INTERVAL = 0.1
# jobs - The job IDs to wait for.
def initialize(jobs)
@jobs = jobs
end
# Waits for all the jobs to be completed.
#
# timeout - The maximum amount of seconds to block the caller for. This
# ensures we don't indefinitely block a caller in case a job takes
# long to process, or is never processed.
def wait(timeout = 60)
start = Time.current
while (Time.current - start) <= timeout
break if SidekiqStatus.all_completed?(@jobs)
sleep(INTERVAL) # to not overload Redis too much.
end
end
end
end
module Gitlab
# The SidekiqStatus module and its child classes can be used for checking if a
# Sidekiq job has been processed or not.
#
# To check if a job has been completed, simply pass the job ID to the
# `completed?` method:
#
# job_id = SomeWorker.perform_async(...)
#
# if Gitlab::SidekiqStatus.completed?(job_id)
# ...
# end
#
# For each job ID registered a separate key is stored in Redis, making lookups
# much faster than using Sidekiq's built-in job finding/status API. These keys
# expire after a certain period of time to prevent storing too many keys in
# Redis.
module SidekiqStatus
STATUS_KEY = 'gitlab-sidekiq-status:%s'.freeze
# The default time (in seconds) after which a status key is expired
# automatically. The default of 30 minutes should be more than sufficient
# for most jobs.
DEFAULT_EXPIRATION = 30.minutes.to_i
# Starts tracking of the given job.
#
# jid - The Sidekiq job ID
# expire - The expiration time of the Redis key.
def self.set(jid, expire = DEFAULT_EXPIRATION)
Sidekiq.redis do |redis|
redis.set(key_for(jid), 1, ex: expire)
end
end
# Stops the tracking of the given job.
#
# jid - The Sidekiq job ID to remove.
def self.unset(jid)
Sidekiq.redis do |redis|
redis.del(key_for(jid))
end
end
# Returns true if all the given job have been completed.
#
# jids - The Sidekiq job IDs to check.
#
# Returns true or false.
def self.all_completed?(jids)
keys = jids.map { |jid| key_for(jid) }
responses = Sidekiq.redis do |redis|
redis.pipelined do
keys.each { |key| redis.exists(key) }
end
end
responses.all? { |value| !value }
end
def self.key_for(jid)
STATUS_KEY % jid
end
end
end
module Gitlab
module SidekiqStatus
class ClientMiddleware
def call(_, job, _, _)
SidekiqStatus.set(job['jid'])
yield
end
end
end
end
module Gitlab
module SidekiqStatus
class ServerMiddleware
def call(worker, job, queue)
ret = yield
SidekiqStatus.unset(job['jid'])
ret
end
end
end
end
require 'spec_helper'
describe Gitlab::JobWaiter do
describe '#wait' do
let(:waiter) { described_class.new(%w(a)) }
it 'returns when all jobs have been completed' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).with(%w(a)).
and_return(true)
expect(waiter).not_to receive(:sleep)
waiter.wait
end
it 'sleeps between checking the job statuses' do
expect(Gitlab::SidekiqStatus).to receive(:all_completed?).
with(%w(a)).
and_return(false, true)
expect(waiter).to receive(:sleep).with(described_class::INTERVAL)
waiter.wait
end
it 'returns when timing out' do
expect(waiter).not_to receive(:sleep)
waiter.wait(0)
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus::ClientMiddleware do
describe '#call' do
it 'tracks the job in Redis' do
expect(Gitlab::SidekiqStatus).to receive(:set).with('123')
described_class.new.
call('Foo', { 'jid' => '123' }, double(:queue), double(:pool)) { nil }
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus::ServerMiddleware do
describe '#call' do
it 'stops tracking of a job upon completion' do
expect(Gitlab::SidekiqStatus).to receive(:unset).with('123')
ret = described_class.new.
call(double(:worker), { 'jid' => '123' }, double(:queue)) { 10 }
expect(ret).to eq(10)
end
end
end
require 'spec_helper'
describe Gitlab::SidekiqStatus do
describe '.set', :redis do
it 'stores the job ID' do
described_class.set('123')
key = described_class.key_for('123')
Sidekiq.redis do |redis|
expect(redis.exists(key)).to eq(true)
expect(redis.ttl(key) > 0).to eq(true)
end
end
end
describe '.unset', :redis do
it 'removes the job ID' do
described_class.set('123')
described_class.unset('123')
key = described_class.key_for('123')
Sidekiq.redis do |redis|
expect(redis.exists(key)).to eq(false)
end
end
end
describe '.all_completed?', :redis do
it 'returns true if all jobs have been completed' do
expect(described_class.all_completed?(%w(123))).to eq(true)
end
it 'returns false if a job has not yet been completed' do
described_class.set('123')
expect(described_class.all_completed?(%w(123 456))).to eq(false)
end
end
describe '.key_for' do
it 'returns the key for a job ID' do
key = described_class.key_for('123')
expect(key).to be_an_instance_of(String)
expect(key).to include('123')
end
end
end
require 'spec_helper'
describe UserProjectAccessChangedService do
describe '#execute' do
it 'schedules the user IDs' do
expect(AuthorizedProjectsWorker).to receive(:bulk_perform_and_wait).
with([[1], [2]])
described_class.new([1, 2]).execute
end
end
end
...@@ -6,7 +6,6 @@ ENV["RAILS_ENV"] ||= 'test' ...@@ -6,7 +6,6 @@ ENV["RAILS_ENV"] ||= 'test'
require File.expand_path("../../config/environment", __FILE__) require File.expand_path("../../config/environment", __FILE__)
require 'rspec/rails' require 'rspec/rails'
require 'shoulda/matchers' require 'shoulda/matchers'
require 'sidekiq/testing/inline'
require 'rspec/retry' require 'rspec/retry'
if ENV['CI'] && !ENV['NO_KNAPSACK'] if ENV['CI'] && !ENV['NO_KNAPSACK']
......
require 'sidekiq/testing/inline'
Sidekiq::Testing.server_middleware do |chain|
chain.add Gitlab::SidekiqStatus::ServerMiddleware
end
...@@ -3,6 +3,18 @@ require 'spec_helper' ...@@ -3,6 +3,18 @@ require 'spec_helper'
describe AuthorizedProjectsWorker do describe AuthorizedProjectsWorker do
let(:worker) { described_class.new } let(:worker) { described_class.new }
describe '.bulk_perform_and_wait' do
it 'schedules the ids and waits for the jobs to complete' do
project = create(:project)
project.owner.project_authorizations.delete_all
described_class.bulk_perform_and_wait([[project.owner.id]])
expect(project.owner.project_authorizations.count).to eq(1)
end
end
describe '#perform' do describe '#perform' do
it "refreshes user's authorized projects" do it "refreshes user's authorized projects" do
user = create(:user) user = create(:user)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment