Commit 1c873e63 authored by Luke Duncalfe's avatar Luke Duncalfe Committed by Alex Pooley

Update webhook failure state using a lease

This ensures that the failure state writes are operating from the latest
data when there are concurrent writes happening due to many
`WebHooks::LogExecutionWorker` jobs processing for the same hook.

https://gitlab.com/gitlab-org/gitlab/-/issues/352245
parent 06394349
...@@ -2,6 +2,12 @@ ...@@ -2,6 +2,12 @@
module WebHooks module WebHooks
class LogExecutionService class LogExecutionService
include ::Gitlab::ExclusiveLeaseHelpers
LOCK_TTL = 15.seconds.freeze
LOCK_SLEEP = 0.25.seconds.freeze
LOCK_RETRY = 65
attr_reader :hook, :log_data, :response_category attr_reader :hook, :log_data, :response_category
def initialize(hook:, log_data:, response_category:) def initialize(hook:, log_data:, response_category:)
...@@ -11,7 +17,7 @@ module WebHooks ...@@ -11,7 +17,7 @@ module WebHooks
end end
def execute def execute
update_hook_executability update_hook_failure_state
log_execution log_execution
end end
...@@ -21,7 +27,12 @@ module WebHooks ...@@ -21,7 +27,12 @@ module WebHooks
WebHookLog.create!(web_hook: hook, **log_data.transform_keys(&:to_sym)) WebHookLog.create!(web_hook: hook, **log_data.transform_keys(&:to_sym))
end end
def update_hook_executability # Perform this operation within an `Gitlab::ExclusiveLease` lock to make it
# safe to be called concurrently from different workers.
def update_hook_failure_state
in_lock(lock_name, ttl: LOCK_TTL, sleep_sec: LOCK_SLEEP, retries: LOCK_RETRY) do |retried|
hook.reset # Reload within the lock so properties are guaranteed to be current.
case response_category case response_category
when :ok when :ok
hook.enable! hook.enable!
...@@ -32,4 +43,9 @@ module WebHooks ...@@ -32,4 +43,9 @@ module WebHooks
end end
end end
end end
def lock_name
"web_hooks:update_hook_failure_state:#{hook.id}"
end
end
end end
...@@ -14,10 +14,6 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state ...@@ -14,10 +14,6 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state
let(:service_instance) { described_class.new(project_hook, data, :push_hooks) } let(:service_instance) { described_class.new(project_hook, data, :push_hooks) }
around do |example|
travel_to(Time.current) { example.run }
end
describe '#initialize' do describe '#initialize' do
before do before do
stub_application_setting(setting_name => setting) stub_application_setting(setting_name => setting)
...@@ -257,14 +253,6 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state ...@@ -257,14 +253,6 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state
end end
context 'execution logging' do context 'execution logging' do
let(:hook_log) { project_hook.web_hook_logs.last }
def run_service
service_instance.execute
::WebHooks::LogExecutionWorker.drain
project_hook.reload
end
context 'with success' do context 'with success' do
before do before do
stub_full_request(project_hook.url, method: :post).to_return(status: 200, body: 'Success') stub_full_request(project_hook.url, method: :post).to_return(status: 200, body: 'Success')
...@@ -280,42 +268,38 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state ...@@ -280,42 +268,38 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state
.with(hook: project_hook, log_data: Hash, response_category: :ok) .with(hook: project_hook, log_data: Hash, response_category: :ok)
.and_return(double(execute: nil)) .and_return(double(execute: nil))
run_service service_instance.execute
end
end
it 'log successful execution' do
run_service
expect(hook_log.trigger).to eq('push_hooks')
expect(hook_log.url).to eq(project_hook.url)
expect(hook_log.request_headers).to eq(headers)
expect(hook_log.response_body).to eq('Success')
expect(hook_log.response_status).to eq('200')
expect(hook_log.execution_duration).to be > 0
expect(hook_log.internal_error_message).to be_nil
end end
it 'does not log in the service itself' do
expect { service_instance.execute }.not_to change(::WebHookLog, :count)
end end
it 'does not increment the failure count' do it 'queues LogExecutionWorker correctly' do
expect { run_service }.not_to change(project_hook, :recent_failures) expect(WebHooks::LogExecutionWorker).to receive(:perform_async)
end .with(
project_hook.id,
hash_including(
trigger: 'push_hooks',
url: project_hook.url,
request_headers: headers,
request_data: data,
response_body: 'Success',
response_headers: {},
response_status: 200,
execution_duration: be > 0,
internal_error_message: nil
),
:ok,
nil
)
it 'does not change the disabled_until attribute' do service_instance.execute
expect { run_service }.not_to change(project_hook, :disabled_until)
end end
context 'when the hook had previously failed' do it 'queues LogExecutionWorker correctly, resulting in a log record (integration-style test)', :sidekiq_inline do
before do expect { service_instance.execute }.to change(::WebHookLog, :count).by(1)
project_hook.update!(recent_failures: 2)
end end
it 'resets the failure count' do it 'does not log in the service itself' do
expect { run_service }.to change(project_hook, :recent_failures).to(0) expect { service_instance.execute }.not_to change(::WebHookLog, :count)
end
end end
end end
...@@ -324,45 +308,26 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state ...@@ -324,45 +308,26 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state
stub_full_request(project_hook.url, method: :post).to_return(status: 400, body: 'Bad request') stub_full_request(project_hook.url, method: :post).to_return(status: 400, body: 'Bad request')
end end
it 'logs failed execution' do it 'queues LogExecutionWorker correctly' do
run_service expect(WebHooks::LogExecutionWorker).to receive(:perform_async)
.with(
expect(hook_log).to have_attributes( project_hook.id,
trigger: eq('push_hooks'), hash_including(
url: eq(project_hook.url), trigger: 'push_hooks',
request_headers: eq(headers), url: project_hook.url,
response_body: eq('Bad request'), request_headers: headers,
response_status: eq('400'), request_data: data,
response_body: 'Bad request',
response_headers: {},
response_status: 400,
execution_duration: be > 0, execution_duration: be > 0,
internal_error_message: be_nil internal_error_message: nil
),
:failed,
nil
) )
end
it 'increments the failure count' do
expect { run_service }.to change(project_hook, :recent_failures).by(1)
end
it 'does not change the disabled_until attribute' do
expect { run_service }.not_to change(project_hook, :disabled_until)
end
it 'does not allow the failure count to overflow' do
project_hook.update!(recent_failures: 32767)
expect { run_service }.not_to change(project_hook, :recent_failures)
end
context 'when the web_hooks_disable_failed FF is disabled' do service_instance.execute
before do
# Hook will only be executed if the flag is disabled.
stub_feature_flags(web_hooks_disable_failed: false)
end
it 'does not allow the failure count to overflow' do
project_hook.update!(recent_failures: 32767)
expect { run_service }.not_to change(project_hook, :recent_failures)
end
end end
end end
...@@ -371,65 +336,54 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state ...@@ -371,65 +336,54 @@ RSpec.describe WebHookService, :request_store, :clean_gitlab_redis_shared_state
stub_full_request(project_hook.url, method: :post).to_raise(SocketError.new('Some HTTP Post error')) stub_full_request(project_hook.url, method: :post).to_raise(SocketError.new('Some HTTP Post error'))
end end
it 'log failed execution' do it 'queues LogExecutionWorker correctly' do
run_service expect(WebHooks::LogExecutionWorker).to receive(:perform_async)
.with(
expect(hook_log.trigger).to eq('push_hooks') project_hook.id,
expect(hook_log.url).to eq(project_hook.url) hash_including(
expect(hook_log.request_headers).to eq(headers) trigger: 'push_hooks',
expect(hook_log.response_body).to eq('') url: project_hook.url,
expect(hook_log.response_status).to eq('internal error') request_headers: headers,
expect(hook_log.execution_duration).to be > 0 request_data: data,
expect(hook_log.internal_error_message).to eq('Some HTTP Post error') response_body: '',
end response_headers: {},
response_status: 'internal error',
it 'does not increment the failure count' do execution_duration: be > 0,
expect { run_service }.not_to change(project_hook, :recent_failures) internal_error_message: 'Some HTTP Post error'
end ),
:error,
it 'backs off' do nil
expect { run_service }.to change(project_hook, :disabled_until) )
end
it 'increases the backoff count' do
expect { run_service }.to change(project_hook, :backoff_count).by(1)
end
context 'when the previous cool-off was near the maximum' do
before do
project_hook.update!(disabled_until: 5.minutes.ago, backoff_count: 8)
end
it 'sets the disabled_until attribute' do
expect { run_service }.to change(project_hook, :disabled_until).to(1.day.from_now)
end
end
context 'when we have backed-off many many times' do
before do
project_hook.update!(disabled_until: 5.minutes.ago, backoff_count: 365)
end
it 'sets the disabled_until attribute' do service_instance.execute
expect { run_service }.to change(project_hook, :disabled_until).to(1.day.from_now)
end
end end
end end
context 'with unsafe response body' do context 'with unsafe response body' do
before do before do
stub_full_request(project_hook.url, method: :post).to_return(status: 200, body: "\xBB") stub_full_request(project_hook.url, method: :post).to_return(status: 200, body: "\xBB")
run_service
end end
it 'log successful execution' do it 'queues LogExecutionWorker with sanitized response_body' do
expect(hook_log.trigger).to eq('push_hooks') expect(WebHooks::LogExecutionWorker).to receive(:perform_async)
expect(hook_log.url).to eq(project_hook.url) .with(
expect(hook_log.request_headers).to eq(headers) project_hook.id,
expect(hook_log.response_body).to eq('') hash_including(
expect(hook_log.response_status).to eq('200') trigger: 'push_hooks',
expect(hook_log.execution_duration).to be > 0 url: project_hook.url,
expect(hook_log.internal_error_message).to be_nil request_headers: headers,
request_data: data,
response_body: '',
response_headers: {},
response_status: 200,
execution_duration: be > 0,
internal_error_message: nil
),
:ok,
nil
)
service_instance.execute
end end
end end
end end
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe WebHooks::LogExecutionService do
include ExclusiveLeaseHelpers
describe '#execute' do
around do |example|
travel_to(Time.current) { example.run }
end
let_it_be_with_reload(:project_hook) { create(:project_hook) }
let(:response_category) { :ok }
let(:data) do
{
trigger: 'trigger_name',
url: 'https://example.com',
request_headers: { 'Header' => 'header value' },
request_data: { 'Request Data' => 'request data value' },
response_body: 'Response body',
response_status: '200',
execution_duration: 1.2,
internal_error_message: 'error message'
}
end
subject(:service) { described_class.new(hook: project_hook, log_data: data, response_category: response_category) }
it 'logs the data' do
expect { service.execute }.to change(::WebHookLog, :count).by(1)
expect(WebHookLog.recent.first).to have_attributes(data)
end
it 'updates failure state using a lease that ensures fresh state is written' do
service = described_class.new(hook: project_hook, log_data: data, response_category: :error)
WebHook.find(project_hook.id).update!(backoff_count: 1)
lease_key = "web_hooks:update_hook_failure_state:#{project_hook.id}"
lease = stub_exclusive_lease(lease_key, timeout: described_class::LOCK_TTL)
expect(lease).to receive(:try_obtain)
expect(lease).to receive(:cancel)
expect { service.execute }.to change { WebHook.find(project_hook.id).backoff_count }.to(2)
end
context 'when response_category is :ok' do
it 'does not increment the failure count' do
expect { service.execute }.not_to change(project_hook, :recent_failures)
end
it 'does not change the disabled_until attribute' do
expect { service.execute }.not_to change(project_hook, :disabled_until)
end
context 'when the hook had previously failed' do
before do
project_hook.update!(recent_failures: 2)
end
it 'resets the failure count' do
expect { service.execute }.to change(project_hook, :recent_failures).to(0)
end
end
end
context 'when response_category is :failed' do
let(:response_category) { :failed }
it 'increments the failure count' do
expect { service.execute }.to change(project_hook, :recent_failures).by(1)
end
it 'does not change the disabled_until attribute' do
expect { service.execute }.not_to change(project_hook, :disabled_until)
end
it 'does not allow the failure count to overflow' do
project_hook.update!(recent_failures: 32767)
expect { service.execute }.not_to change(project_hook, :recent_failures)
end
context 'when the web_hooks_disable_failed FF is disabled' do
before do
# Hook will only be executed if the flag is disabled.
stub_feature_flags(web_hooks_disable_failed: false)
end
it 'does not allow the failure count to overflow' do
project_hook.update!(recent_failures: 32767)
expect { service.execute }.not_to change(project_hook, :recent_failures)
end
end
end
context 'when response_category is :error' do
let(:response_category) { :error }
it 'does not increment the failure count' do
expect { service.execute }.not_to change(project_hook, :recent_failures)
end
it 'backs off' do
expect { service.execute }.to change(project_hook, :disabled_until)
end
it 'increases the backoff count' do
expect { service.execute }.to change(project_hook, :backoff_count).by(1)
end
context 'when the previous cool-off was near the maximum' do
before do
project_hook.update!(disabled_until: 5.minutes.ago, backoff_count: 8)
end
it 'sets the disabled_until attribute' do
expect { service.execute }.to change(project_hook, :disabled_until).to(1.day.from_now)
end
end
context 'when we have backed-off many many times' do
before do
project_hook.update!(disabled_until: 5.minutes.ago, backoff_count: 365)
end
it 'sets the disabled_until attribute' do
expect { service.execute }.to change(project_hook, :disabled_until).to(1.day.from_now)
end
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment