Commit 6e65f367 authored by Kamil Trzciński's avatar Kamil Trzciński Committed by Yorick Peterse

Fix sticking of Runner a new job is scheduled

parent 64ea9921
...@@ -2,7 +2,7 @@ module EE ...@@ -2,7 +2,7 @@ module EE
module Ci module Ci
module Runner module Runner
def tick_runner_queue def tick_runner_queue
::Gitlab::Database::LoadBalancing::Sticking.stick(:runner, token) ::Gitlab::Database::LoadBalancing::Sticking.stick(:runner, id)
super super
end end
......
...@@ -7,6 +7,22 @@ module EE ...@@ -7,6 +7,22 @@ module EE
module RegisterJobService module RegisterJobService
extend ActiveSupport::Concern extend ActiveSupport::Concern
def execute
db_all_caught_up = ::Gitlab::Database::LoadBalancing::Sticking.all_caught_up?(:runner, runner.id)
super.tap do |result|
# Since we execute this query against replica it might lead to false-positive
# We might receive the positive response: "hi, we don't have any more builds for you".
# This might not be true. If our DB replica is not up-to date with when runner event was generated
# we might still have some CI builds to be picked. Instead we should say to runner:
# "Hi, we don't have any more builds now, but not everything is right anyway, so try again".
# Runner will retry, but again, against replica, and again will check if replication lag did catch-up.
if !db_all_caught_up && !result.build
return ::Ci::RegisterJobService::Result.new(nil, false) # rubocop:disable Cop/AvoidReturnFromBlocks
end
end
end
def builds_for_shared_runner def builds_for_shared_runner
return super unless shared_runner_build_limits_feature_enabled? return super unless shared_runner_build_limits_feature_enabled?
......
---
title: Fix sticking of runner to primary if new job is scheduled
merge_request:
author:
type: fixed
...@@ -25,20 +25,23 @@ module Gitlab ...@@ -25,20 +25,23 @@ module Gitlab
stick(namespace, id) if Session.current.performed_write? stick(namespace, id) if Session.current.performed_write?
end end
# Sticks to the primary if necessary, otherwise unsticks an object (if # Checks if we were able to caught-up with all the work
# it was previously stuck to the primary). def self.all_caught_up?(namespace, id)
def self.unstick_or_continue_sticking(namespace, id)
location = last_write_location_for(namespace, id) location = last_write_location_for(namespace, id)
return unless location return true unless location
if load_balancer.all_caught_up?(location) load_balancer.all_caught_up?(location).tap do |caught_up|
unstick(namespace, id) unstick(namespace, id) if caught_up
else
Session.current.use_primary!
end end
end end
# Sticks to the primary if necessary, otherwise unsticks an object (if
# it was previously stuck to the primary).
def self.unstick_or_continue_sticking(namespace, id)
Session.current.use_primary! unless all_caught_up?(namespace, id)
end
# Starts sticking to the primary for the given namespace and id, using # Starts sticking to the primary for the given namespace and id, using
# the latest WAL pointer from the primary. # the latest WAL pointer from the primary.
def self.stick(namespace, id) def self.stick(namespace, id)
......
...@@ -42,6 +42,46 @@ describe Gitlab::Database::LoadBalancing::Sticking, :redis do ...@@ -42,6 +42,46 @@ describe Gitlab::Database::LoadBalancing::Sticking, :redis do
end end
end end
describe '.all_caught_up?' do
let(:lb) { double(:lb) }
before do
allow(described_class).to receive(:load_balancer).and_return(lb)
end
it 'returns true if no write location could be found' do
allow(described_class).to receive(:last_write_location_for)
.with(:user, 42)
.and_return(nil)
expect(lb).not_to receive(:all_caught_up?)
expect(described_class.all_caught_up?(:user, 42)).to eq(true)
end
it 'returns true, and unsticks if all secondaries have caught up' do
allow(described_class).to receive(:last_write_location_for)
.with(:user, 42)
.and_return('foo')
allow(lb).to receive(:all_caught_up?).with('foo').and_return(true)
expect(described_class).to receive(:unstick).with(:user, 42)
expect(described_class.all_caught_up?(:user, 42)).to eq(true)
end
it 'return false if the secondaries have not yet caught up' do
allow(described_class).to receive(:last_write_location_for)
.with(:user, 42)
.and_return('foo')
allow(lb).to receive(:all_caught_up?).with('foo').and_return(false)
expect(described_class.all_caught_up?(:user, 42)).to eq(false)
end
end
describe '.unstick_or_continue_sticking' do describe '.unstick_or_continue_sticking' do
let(:lb) { double(:lb) } let(:lb) { double(:lb) }
......
...@@ -9,7 +9,7 @@ describe EE::Ci::Runner do ...@@ -9,7 +9,7 @@ describe EE::Ci::Runner do
.and_return(true) .and_return(true)
expect(Gitlab::Database::LoadBalancing::Sticking).to receive(:stick) expect(Gitlab::Database::LoadBalancing::Sticking).to receive(:stick)
.with(:runner, runner.token) .with(:runner, runner.id)
expect(Gitlab::Workhorse).to receive(:set_key_and_notify) expect(Gitlab::Workhorse).to receive(:set_key_and_notify)
......
require 'spec_helper' require 'spec_helper'
describe Ci::RegisterJobService do describe Ci::RegisterJobService do
set(:shared_runner) { create(:ci_runner, :instance) }
let!(:project) { create :project, shared_runners_enabled: false } let!(:project) { create :project, shared_runners_enabled: false }
let!(:pipeline) { create :ci_empty_pipeline, project: project } let!(:pipeline) { create :ci_empty_pipeline, project: project }
let!(:pending_build) { create :ci_build, pipeline: pipeline } let!(:pending_build) { create :ci_build, pipeline: pipeline }
let(:shared_runner) { create(:ci_runner, :instance) }
describe '#execute' do describe '#execute' do
context 'checks database loadbalancing stickiness' do
subject { described_class.new(shared_runner).execute }
it 'result is valid if replica did caught-up' do
allow(Gitlab::Database::LoadBalancing).to receive(:enable?)
.and_return(true)
expect(Gitlab::Database::LoadBalancing::Sticking).to receive(:all_caught_up?)
.with(:runner, shared_runner.id) { true }
expect(subject).to be_valid
end
it 'result is invalid if replica did not caught-up' do
allow(Gitlab::Database::LoadBalancing).to receive(:enable?)
.and_return(true)
expect(Gitlab::Database::LoadBalancing::Sticking).to receive(:all_caught_up?)
.with(:runner, shared_runner.id) { false }
expect(subject).not_to be_valid
end
end
context 'for project with shared runners when global minutes limit is set' do context 'for project with shared runners when global minutes limit is set' do
before do before do
project.update(shared_runners_enabled: true) project.update(shared_runners_enabled: true)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment