Commit 80863e73 authored by Fabio Pitino's avatar Fabio Pitino Committed by Stan Hu

Introduce Gitlab::EventStore as simple pub-sub system

parent 6ef717fc
# frozen_string_literal: true
module Ci
class PipelineCreatedEvent < ::Gitlab::EventStore::Event
def schema
{
'type' => 'object',
'properties' => {
'pipeline_id' => { 'type' => 'integer' }
}
}
end
end
end
......@@ -95,7 +95,14 @@ module Ci
.build!
if pipeline.persisted?
schedule_head_pipeline_update
if Feature.enabled?(:ci_publish_pipeline_events, pipeline.project, default_enabled: :yaml)
Gitlab::EventStore.publish(
Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)
else
schedule_head_pipeline_update
end
create_namespace_onboarding_action
else
# If pipeline is not persisted, try to recover IID
......
......@@ -2420,6 +2420,15 @@
:weight: 1
:idempotent: true
:tags: []
- :name: merge_requests_update_head_pipeline
:worker_name: MergeRequests::UpdateHeadPipelineWorker
:feature_category: :code_review
:has_external_dependencies:
:urgency: :high
:resource_boundary: :cpu
:weight: 1
:idempotent: true
:tags: []
- :name: metrics_dashboard_prune_old_annotations
:worker_name: Metrics::Dashboard::PruneOldAnnotationsWorker
:feature_category: :metrics
......
# frozen_string_literal: true
module MergeRequests
class UpdateHeadPipelineWorker
include ApplicationWorker
include Gitlab::EventStore::Subscriber
feature_category :code_review
urgency :high
worker_resource_boundary :cpu
data_consistency :always
idempotent!
def handle_event(event)
Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
pipeline.all_merge_requests.opened.each do |merge_request|
UpdateHeadPipelineForMergeRequestWorker.perform_async(merge_request.id)
end
end
end
end
end
......@@ -8,8 +8,10 @@ class UpdateHeadPipelineForMergeRequestWorker
sidekiq_options retry: 3
include PipelineQueue
# NOTE: this worker belongs to :code_review since there is no CI logic.
queue_namespace :pipeline_processing
feature_category :continuous_integration
urgency :high
worker_resource_boundary :cpu
......
---
name: ci_publish_pipeline_events
introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/34042
rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/336752
milestone: '14.3'
type: development
group: group::pipeline execution
default_enabled: false
......@@ -255,6 +255,8 @@
- 1
- - merge_requests_sync_code_owner_approval_rules
- 1
- - merge_requests_update_head_pipeline
- 1
- - metrics_dashboard_prune_old_annotations
- 1
- - metrics_dashboard_sync_dashboards
......
---
stage: none
group: unassigned
info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/engineering/ux/technical-writing/#assignments
---
# GitLab EventStore
## Background
The monolithic GitLab project is becoming larger and more domains are being defined.
As a result, these domains are becoming entangled with each others due to temporal coupling.
An emblematic example is the [`PostReceive`](https://gitlab.com/gitlab-org/gitlab/blob/master/app/workers/post_receive.rb)
worker where a lot happens across multiple domains. If a new behavior reacts to
a new commit being pushed, then we add code somewhere in `PostReceive` or its sub-components
(`Git::ProcessRefChangesService`, for example).
This type of architecture:
- Is a violation of the Single Responsibility Principle.
- Increases the risk of adding code to a codebase you are not familiar with.
There may be nuances you don't know about which may introduce bugs or a performance degradation.
- Violates domain boundaries. Inside a specific namespace (for example `Git::`) we suddenly see
classes from other domains chiming in (like `Ci::` or `MergeRequests::`).
## What is EventStore?
`Gitlab:EventStore` is a basic pub-sub system built on top of the existing Sidekiq workers and observability we have today.
We use this system to apply an event-driven approach when modeling a domain while keeping coupling
to a minimum.
This essentially leaves the existing Sidekiq workers as-is to perform asynchronous work but inverts
the dependency.
### EventStore example
When a CI pipeline is created we update the head pipeline for any merge request matching the
pipeline's `ref`. The merge request can then display the status of the latest pipeline.
#### Without the EventStore
We change `Ci::CreatePipelineService` and add logic (like an `if` statement) to check if the
pipeline is created. Then we schedule a worker to run some side-effects for the `MergeRequests::` domain.
This style violates the [Open-Closed Principle](https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle)
and unnecessarily add side-effects logic from other domains, increasing coupling:
```mermaid
graph LR
subgraph ci[CI]
cp[CreatePipelineService]
end
subgraph mr[MergeRequests]
upw[UpdateHeadPipelineWorker]
end
subgraph no[Namespaces::Onboarding]
pow[PipelinesOnboardedWorker]
end
cp -- perform_async --> upw
cp -- perform_async --> pow
```
#### With the EventStore
`Ci::CreatePipelineService` publishes an event `Ci::PipelineCreatedEvent` and its responsibility stops here.
The `MergeRequests::` domain can subscribe to this event with a worker `MergeRequests::UpdateHeadPipelineWorker`, so:
- Side-effects are scheduled asynchronously and don't impact the main business transaction that
emits the domain event.
- More side-effects can be added without modifying the main business transaction.
- We can clearly see what domains are involved and their ownership.
- We can identify what events occur in the system because they are explicitly declared.
With `Gitlab::EventStore` there is still coupling between the subscriber (Sidekiq worker) and the schema of the domain event.
This level of coupling is much smaller than having the main transaction (`Ci::CreatePipelineService`) coupled to:
- multiple subscribers.
- multiple ways of invoking subscribers (including conditional invocations).
- multiple ways of passing parameters.
```mermaid
graph LR
subgraph ci[CI]
cp[CreatePipelineService]
cp -- publish --> e[PipelineCreateEvent]
end
subgraph mr[MergeRequests]
upw[UpdateHeadPipelineWorker]
end
subgraph no[Namespaces::Onboarding]
pow[PipelinesOnboardedWorker]
end
upw -. subscribe .-> e
pow -. subscribe .-> e
```
Each subscriber, being itself a Sidekiq worker, can specify any attributes that are related
to the type of work they are responsible for. For example, one subscriber could define
`urgency: high` while another one less critical could set `urgency: low`.
The EventStore is only an abstraction that allows us to have Dependency Inversion. This helps
separating a business transaction from side-effects (often executed in other domains).
When an event is published, the EventStore calls `perform_async` on each subscribed worker,
passing in the event information as arguments. This essentially schedules a Sidekiq job on each
subscriber's queue.
This means that nothing else changes with regards to how subscribers work, as they are just
Sidekiq workers. For example: if a worker (subscriber) fails to execute a job, the job is put
back into Sidekiq to be retried.
## EventStore advantages
- Subscribers (Sidekiq workers) can be set to run quicker by changing the worker weight
if the side-effect is critical.
- Automatically enforce the fact that side-effects run asynchronously.
This makes it safe for other domains to subscribe to events without affecting the performance of the
main business transaction.
## Define an event
An `Event` object represents a domain event that occurred in a bounded context.
Notify other bounded contexts about something
that happened by publishing events, so that they can react to it.
Define new event classes under `app/events/<namespace>/` with a name representing something that happened in the past:
```ruby
class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
def schema
{
'type' => 'object',
'required' => ['pipeline_id'],
'properties' => {
'pipeline_id' => { 'type' => 'integer' },
'ref' => { 'type' => 'string' }
}
}
end
end
```
The schema is validated immediately when we initialize the event object so we can ensure that
publishers follow the contract with the subscribers.
We recommend using optional properties as much as possible, which require fewer rollouts for schema changes.
However, `required` properties could be used for unique identifiers of the event's subject. For example:
- `pipeline_id` can be a required property for a `Ci::PipelineCreatedEvent`.
- `project_id` can be a required property for a `Projects::ProjectDeletedEvent`.
Publish only properties that are needed by the subscribers without tailoring the payload to specific subscribers.
The payload should fully represent the event and not contain loosely related properties. For example:
```ruby
Ci::PipelineCreatedEvent.new(data: {
pipeline_id: pipeline.id,
# unless all subscribers need merge request IDs,
# this is data that can be fetched by the subscriber.
merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})
```
Publishing events with more properties provides the subscribers with the data
they need in the first place. Otherwise subscribers have to fetch the additional data from the database.
However, this can lead to continuous changes to the schema and possibly adding properties that may not
represent the single source of truth.
It's best to use this technique as a performance optimization. For example: when an event has many
subscribers that all fetch the same data again from the database.
### Update the schema
Changes to the schema require multiple rollouts. While the new version is being deployed:
- Existing publishers can publish events using the old version.
- Existing subscribers can consume events using the old version.
- Events get persisted in the Sidekiq queue as job arguments, so we could have 2 versions of the schema during deployments.
As changing the schema ultimately impacts the Sidekiq arguments, please refer to our
[Sidekiq style guide](sidekiq_style_guide.md#changing-the-arguments-for-a-worker) with regards to multiple rollouts.
#### Add properties
1. Rollout 1:
- Add new properties as optional (not `required`).
- Update the subscriber so it can consume events with and without the new properties.
1. Rollout 2:
- Change the publisher to provide the new property
1. Rollout 3: (if the property should be `required`):
- Change the schema and the subscriber code to always expect it.
#### Remove properties
1. Rollout 1:
- If the property is `required`, make it optional.
- Update the subscriber so it does not always expect the property.
1. Rollout 2:
- Remove the property from the event publishing.
- Remove the code from the subscriber that processes the property.
#### Other changes
For other changes, like renaming a property, use the same steps:
1. Remove the old property
1. Add the new property
## Publish an event
To publish the event from the [previous example](#define-an-event):
```ruby
Gitlab::EventStore.publish(
Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)
```
## Create a subscriber
A subscriber is a Sidekiq worker that includes the `Gitlab::EventStore::Subscriber` module.
This module takes care of the `perform` method and provides a better abstraction to handle
the event safely via the `handle_event` method. For example:
```ruby
module MergeRequests
class UpdateHeadPipelineWorker
include ApplicationWorker
include Gitlab::EventStore::Subscriber
def handle_event(event)
Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
# ...
end
end
end
end
```
## Register the subscriber to the event
To subscribe the worker to a specific event in `lib/gitlab/event_store.rb`,
add a line like this to the `Gitlab::EventStore.configure!` method:
```ruby
module Gitlab
module EventStore
def self.configure!
Store.new.tap do |store|
# ...
store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent
# ...
end
end
end
end
```
Subscriptions are stored in memory when the Rails app is loaded and they are immediately frozen.
It's not possible to modify subscriptions at runtime.
### Conditional dispatch of events
A subscription can specify a condition when to accept an event:
```ruby
store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
to: ::Ci::PipelineCreatedEvent,
if: -> (event) { event.data[:merge_request_id].present? }
```
This tells the event store to dispatch `Ci::PipelineCreatedEvent`s to the subscriber if
the condition is met.
This technique can avoid scheduling Sidekiq jobs if the subscriber is interested in a
small subset of events.
WARNING:
When using conditional dispatch it must contain only cheap conditions because they are
executed synchronously every time the given event is published.
For complex conditions it's best to subscribe to all the events and then handle the logic
in the `handle_event` method of the subscriber worker.
......@@ -164,6 +164,7 @@ the [reviewer values](https://about.gitlab.com/handbook/engineering/workflow/rev
### General
- [Directory structure](directory_structure.md)
- [GitLab EventStore](event_store.md) to publish/subscribe to domain events
- [GitLab utilities](utilities.md)
- [Newlines style guide](newlines_styleguide.md)
- [Logging](logging.md)
......
# frozen_string_literal: true
# Gitlab::EventStore is a simple pub-sub mechanism that lets you publish
# domain events and use Sidekiq workers as event handlers.
#
# It can be used to decouple domains from different bounded contexts
# by publishing domain events and let any interested parties subscribe
# to them.
#
module Gitlab
module EventStore
Error = Class.new(StandardError)
InvalidEvent = Class.new(Error)
InvalidSubscriber = Class.new(Error)
def self.publish(event)
instance.publish(event)
end
def self.instance
@instance ||= configure!
end
# Define all event subscriptions using:
#
# store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent)
#
# It is possible to subscribe to a subset of events matching a condition:
#
# store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent), if: ->(event) { event.data == :some_value }
#
def self.configure!
Store.new do |store|
###
# Add subscriptions here:
store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent
end
end
private_class_method :configure!
end
end
# frozen_string_literal: true
# An Event object represents a domain event that occurred in a bounded context.
# By publishing events we notify other bounded contexts about something
# that happened, so that they can react to it.
#
# Define new event classes under `app/events/<namespace>/` with a name
# representing something that happened in the past:
#
# class Projects::ProjectCreatedEvent < Gitlab::EventStore::Event
# def schema
# {
# 'type' => 'object',
# 'properties' => {
# 'project_id' => { 'type' => 'integer' }
# }
# }
# end
# end
#
# To publish it:
#
# Gitlab::EventStore.publish(
# Projects::ProjectCreatedEvent.new(data: { project_id: project.id })
# )
#
module Gitlab
module EventStore
class Event
attr_reader :data
def initialize(data:)
validate_schema!(data)
@data = data
end
def schema
raise NotImplementedError, 'must specify schema to validate the event'
end
private
def validate_schema!(data)
unless data.is_a?(Hash)
raise Gitlab::EventStore::InvalidEvent, "Event data must be a Hash"
end
unless JSONSchemer.schema(schema).valid?(data.deep_stringify_keys)
raise Gitlab::EventStore::InvalidEvent, "Data for event #{self.class} does not match the defined schema: #{schema}"
end
end
end
end
end
# frozen_string_literal: true
module Gitlab
module EventStore
class Store
attr_reader :subscriptions
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
yield(self) if block_given?
# freeze the subscriptions as safety measure to avoid further
# subcriptions after initialization.
lock!
end
def subscribe(worker, to:, if: nil)
condition = binding.local_variable_get('if')
Array(to).each do |event|
validate_subscription!(worker, event)
subscriptions[event] << Gitlab::EventStore::Subscription.new(worker, condition)
end
end
def publish(event)
unless event.is_a?(Event)
raise InvalidEvent, "Event being published is not an instance of Gitlab::EventStore::Event: got #{event.inspect}"
end
subscriptions[event.class].each do |subscription|
subscription.consume_event(event)
end
end
private
def lock!
@subscriptions.freeze
end
def validate_subscription!(subscriber, event_class)
unless event_class < Event
raise InvalidEvent, "Event being subscribed to is not a subclass of Gitlab::EventStore::Event: got #{event_class}"
end
unless subscriber.respond_to?(:perform_async)
raise InvalidSubscriber, "Subscriber is not an ApplicationWorker: got #{subscriber}"
end
end
end
end
end
# frozen_string_literal: true
# This module should be included in order to turn an ApplicationWorker
# into a Subscriber.
# This module overrides the `perform` method and provides a better and
# safer interface for handling events via `handle_event` method.
#
# @example:
# class SomeEventSubscriber
# include ApplicationWorker
# include Gitlab::EventStore::Subscriber
#
# def handle_event(event)
# # ...
# end
# end
module Gitlab
module EventStore
module Subscriber
def perform(event_type, data)
raise InvalidEvent, event_type unless self.class.const_defined?(event_type)
event = event_type.constantize.new(
data: data.with_indifferent_access
)
handle_event(event)
end
def handle_event(event)
raise NotImplementedError, 'you must implement this methods in order to handle events'
end
end
end
end
# frozen_string_literal: true
module Gitlab
module EventStore
class Subscription
attr_reader :worker, :condition
def initialize(worker, condition)
@worker = worker
@condition = condition
end
def consume_event(event)
return unless condition_met?(event)
worker.perform_async(event.class.name, event.data)
# TODO: Log dispatching of events to subscriber
# We rescue and track any exceptions here because we don't want to
# impact other subscribers if one is faulty.
# The method `condition_met?`, since it can run a block, it might encounter
# a bug. By raising an exception here we could interrupt the publishing
# process, preventing other subscribers from consuming the event.
rescue StandardError => e
Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event.class.name, event_data: event.data)
end
private
def condition_met?(event)
return true unless condition
condition.call(event)
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::EventStore::Event do
let(:event_class) { stub_const('TestEvent', Class.new(described_class)) }
let(:event) { event_class.new(data: data) }
let(:data) { { project_id: 123, project_path: 'org/the-project' } }
context 'when schema is not defined' do
it 'raises an error on initialization' do
expect { event }.to raise_error(NotImplementedError)
end
end
context 'when schema is defined' do
before do
event_class.class_eval do
def schema
{
'required' => ['project_id'],
'type' => 'object',
'properties' => {
'project_id' => { 'type' => 'integer' },
'project_path' => { 'type' => 'string' }
}
}
end
end
end
describe 'schema validation' do
context 'when data matches the schema' do
it 'initializes the event correctly' do
expect(event.data).to eq(data)
end
end
context 'when required properties are present as well as unknown properties' do
let(:data) { { project_id: 123, unknown_key: 'unknown_value' } }
it 'initializes the event correctly' do
expect(event.data).to eq(data)
end
end
context 'when some properties are missing' do
let(:data) { { project_path: 'org/the-project' } }
it 'expects all properties to be present' do
expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, /does not match the defined schema/)
end
end
context 'when data is not a Hash' do
let(:data) { 123 }
it 'raises an error' do
expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, 'Event data must be a Hash')
end
end
end
end
end
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe Gitlab::EventStore::Store do
let(:event_klass) { stub_const('TestEvent', Class.new(Gitlab::EventStore::Event)) }
let(:event) { event_klass.new(data: data) }
let(:another_event_klass) { stub_const('TestAnotherEvent', Class.new(Gitlab::EventStore::Event)) }
let(:worker) do
stub_const('EventSubscriber', Class.new).tap do |klass|
klass.class_eval do
include ApplicationWorker
include Gitlab::EventStore::Subscriber
def handle_event(event)
event.data
end
end
end
end
let(:another_worker) do
stub_const('AnotherEventSubscriber', Class.new).tap do |klass|
klass.class_eval do
include ApplicationWorker
include Gitlab::EventStore::Subscriber
end
end
end
let(:unrelated_worker) do
stub_const('UnrelatedEventSubscriber', Class.new).tap do |klass|
klass.class_eval do
include ApplicationWorker
include Gitlab::EventStore::Subscriber
end
end
end
before do
event_klass.class_eval do
def schema
{
'required' => %w[name id],
'type' => 'object',
'properties' => {
'name' => { 'type' => 'string' },
'id' => { 'type' => 'integer' }
}
}
end
end
end
describe '#subscribe' do
it 'subscribes a worker to an event' do
store = described_class.new do |s|
s.subscribe worker, to: event_klass
end
subscriptions = store.subscriptions[event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker)
end
it 'subscribes multiple workers to an event' do
store = described_class.new do |s|
s.subscribe worker, to: event_klass
s.subscribe another_worker, to: event_klass
end
subscriptions = store.subscriptions[event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker, another_worker)
end
it 'subscribes a worker to multiple events is separate calls' do
store = described_class.new do |s|
s.subscribe worker, to: event_klass
s.subscribe worker, to: another_event_klass
end
subscriptions = store.subscriptions[event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker)
subscriptions = store.subscriptions[another_event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker)
end
it 'subscribes a worker to multiple events in a single call' do
store = described_class.new do |s|
s.subscribe worker, to: [event_klass, another_event_klass]
end
subscriptions = store.subscriptions[event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker)
subscriptions = store.subscriptions[another_event_klass]
expect(subscriptions.map(&:worker)).to contain_exactly(worker)
end
it 'subscribes a worker to an event with condition' do
store = described_class.new do |s|
s.subscribe worker, to: event_klass, if: ->(event) { event.data[:name] == 'Alice' }
end
subscriptions = store.subscriptions[event_klass]
expect(subscriptions.size).to eq(1)
subscription = subscriptions.first
expect(subscription).to be_an_instance_of(Gitlab::EventStore::Subscription)
expect(subscription.worker).to eq(worker)
expect(subscription.condition.call(double(data: { name: 'Bob' }))).to eq(false)
expect(subscription.condition.call(double(data: { name: 'Alice' }))).to eq(true)
end
it 'refuses the subscription if the target is not an Event object' do
expect do
described_class.new do |s|
s.subscribe worker, to: Integer
end
end.to raise_error(
Gitlab::EventStore::Error,
/Event being subscribed to is not a subclass of Gitlab::EventStore::Event/)
end
it 'refuses the subscription if the subscriber is not a worker' do
expect do
described_class.new do |s|
s.subscribe double, to: event_klass
end
end.to raise_error(
Gitlab::EventStore::Error,
/Subscriber is not an ApplicationWorker/)
end
end
describe '#publish' do
let(:data) { { name: 'Bob', id: 123 } }
context 'when event has a subscribed worker' do
let(:store) do
described_class.new do |store|
store.subscribe worker, to: event_klass
store.subscribe another_worker, to: another_event_klass
end
end
it 'dispatches the event to the subscribed worker' do
expect(worker).to receive(:perform_async).with('TestEvent', data)
expect(another_worker).not_to receive(:perform_async)
store.publish(event)
end
context 'when other workers subscribe to the same event' do
let(:store) do
described_class.new do |store|
store.subscribe worker, to: event_klass
store.subscribe another_worker, to: event_klass
store.subscribe unrelated_worker, to: another_event_klass
end
end
it 'dispatches the event to each subscribed worker' do
expect(worker).to receive(:perform_async).with('TestEvent', data)
expect(another_worker).to receive(:perform_async).with('TestEvent', data)
expect(unrelated_worker).not_to receive(:perform_async)
store.publish(event)
end
end
context 'when an error is raised' do
before do
allow(worker).to receive(:perform_async).and_raise(NoMethodError, 'the error message')
end
it 'is rescued and tracked' do
expect(Gitlab::ErrorTracking)
.to receive(:track_and_raise_for_dev_exception)
.with(kind_of(NoMethodError), event_class: event.class.name, event_data: event.data)
.and_call_original
expect { store.publish(event) }.to raise_error(NoMethodError, 'the error message')
end
end
it 'raises and tracks an error when event is published inside a database transaction' do
expect(Gitlab::ErrorTracking)
.to receive(:track_and_raise_for_dev_exception)
.at_least(:once)
.and_call_original
expect do
ApplicationRecord.transaction do
store.publish(event)
end
end.to raise_error(Sidekiq::Worker::EnqueueFromTransactionError)
end
it 'refuses publishing if the target is not an Event object' do
expect { store.publish(double(:event)) }
.to raise_error(
Gitlab::EventStore::Error,
/Event being published is not an instance of Gitlab::EventStore::Event/)
end
end
context 'when event has subscribed workers with condition' do
let(:store) do
described_class.new do |s|
s.subscribe worker, to: event_klass, if: -> (event) { event.data[:name] == 'Bob' }
s.subscribe another_worker, to: event_klass, if: -> (event) { event.data[:name] == 'Alice' }
end
end
let(:event) { event_klass.new(data: data) }
it 'dispatches the event to the workers satisfying the condition' do
expect(worker).to receive(:perform_async).with('TestEvent', data)
expect(another_worker).not_to receive(:perform_async)
store.publish(event)
end
end
end
describe 'subscriber' do
let(:data) { { name: 'Bob', id: 123 } }
let(:event_name) { event.class.name }
let(:worker_instance) { worker.new }
subject { worker_instance.perform(event_name, data) }
it 'handles the event' do
expect(worker_instance).to receive(:handle_event).with(instance_of(event.class))
expect_any_instance_of(event.class) do |event|
expect(event).to receive(:data).and_return(data)
end
subject
end
context 'when the event name does not exist' do
let(:event_name) { 'UnknownClass' }
it 'raises an error' do
expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent)
end
end
context 'when the worker does not define handle_event method' do
let(:worker_instance) { another_worker.new }
it 'raises an error' do
expect { subject }.to raise_error(NotImplementedError)
end
end
end
end
......@@ -146,138 +146,44 @@ RSpec.describe Ci::CreatePipelineService do
end
context 'when merge requests already exist for this source branch' do
let(:merge_request_1) do
let!(:merge_request_1) do
create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project)
end
let(:merge_request_2) do
let!(:merge_request_2) do
create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project)
end
context 'when related merge request is already merged' do
let!(:merged_merge_request) do
create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged')
end
it 'does not schedule update head pipeline job' do
expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id)
execute_service
end
end
context 'when the head pipeline sha equals merge request sha' do
it 'updates head pipeline of each merge request', :sidekiq_might_not_need_inline do
merge_request_1
merge_request_2
head_pipeline = execute_service(ref: 'feature', after: nil).payload
expect(merge_request_1.reload.head_pipeline).to eq(head_pipeline)
expect(merge_request_2.reload.head_pipeline).to eq(head_pipeline)
end
end
context 'when the head pipeline sha does not equal merge request sha' do
it 'does not update the head piepeline of MRs' do
merge_request_1
merge_request_2
allow_any_instance_of(Ci::Pipeline).to receive(:latest?).and_return(true)
expect { execute_service(after: 'ae73cb07c9eeaf35924a10f713b364d32b2dd34f') }.not_to raise_error
last_pipeline = Ci::Pipeline.last
expect(merge_request_1.reload.head_pipeline).not_to eq(last_pipeline)
expect(merge_request_2.reload.head_pipeline).not_to eq(last_pipeline)
end
end
context 'when there is no pipeline for source branch' do
it "does not update merge request head pipeline" do
merge_request = create(:merge_request, source_branch: 'feature',
target_branch: "branch_1",
source_project: project)
head_pipeline = execute_service.payload
expect(merge_request.reload.head_pipeline).not_to eq(head_pipeline)
end
end
context 'when merge request target project is different from source project' do
let!(:project) { fork_project(target_project, nil, repository: true) }
let!(:target_project) { create(:project, :repository) }
let!(:user) { create(:user) }
before do
project.add_developer(user)
end
it 'updates head pipeline for merge request', :sidekiq_might_not_need_inline do
merge_request = create(:merge_request, source_branch: 'feature',
target_branch: "master",
source_project: project,
target_project: target_project)
head_pipeline = execute_service(ref: 'feature', after: nil).payload
expect(merge_request.reload.head_pipeline).to eq(head_pipeline)
end
end
context 'when the pipeline is not the latest for the branch' do
it 'does not update merge request head pipeline' do
merge_request = create(:merge_request, source_branch: 'master',
target_branch: "branch_1",
source_project: project)
allow_any_instance_of(MergeRequest)
.to receive(:find_actual_head_pipeline) { }
execute_service
# TODO: remove after ci_publish_pipeline_events FF is removed
# https://gitlab.com/gitlab-org/gitlab/-/issues/336752
it 'does not schedule sync update for the head pipeline of the merge request' do
expect(UpdateHeadPipelineForMergeRequestWorker)
.not_to receive(:perform_async)
expect(merge_request.reload.head_pipeline).to be_nil
execute_service(ref: 'feature', after: nil)
end
end
context 'when pipeline has errors' do
context 'when feature flag ci_publish_pipeline_events is disabled' do
before do
stub_ci_pipeline_yaml_file('some invalid syntax')
stub_feature_flags(ci_publish_pipeline_events: false)
end
it 'updates merge request head pipeline reference', :sidekiq_might_not_need_inline do
merge_request = create(:merge_request, source_branch: 'master',
target_branch: 'feature',
source_project: project)
head_pipeline = execute_service.payload
expect(head_pipeline).to be_persisted
expect(head_pipeline.yaml_errors).to be_present
expect(head_pipeline.messages).to be_present
expect(merge_request.reload.head_pipeline).to eq head_pipeline
end
end
context 'when pipeline has been skipped' do
before do
allow_any_instance_of(Ci::Pipeline)
.to receive(:git_commit_message)
.and_return('some commit [ci skip]')
end
it 'updates merge request head pipeline', :sidekiq_might_not_need_inline do
merge_request = create(:merge_request, source_branch: 'master',
target_branch: 'feature',
source_project: project)
head_pipeline = execute_service.payload
it 'schedules update for the head pipeline of the merge request' do
expect(UpdateHeadPipelineForMergeRequestWorker)
.to receive(:perform_async).with(merge_request_1.id)
expect(UpdateHeadPipelineForMergeRequestWorker)
.to receive(:perform_async).with(merge_request_2.id)
expect(head_pipeline).to be_skipped
expect(head_pipeline).to be_persisted
expect(merge_request.reload.head_pipeline).to eq head_pipeline
execute_service(ref: 'feature', after: nil)
end
end
end
......@@ -1655,7 +1561,7 @@ RSpec.describe Ci::CreatePipelineService do
expect(pipeline.target_sha).to be_nil
end
it 'schedules update for the head pipeline of the merge request' do
it 'schedules update for the head pipeline of the merge request', :sidekiq_inline do
expect(UpdateHeadPipelineForMergeRequestWorker)
.to receive(:perform_async).with(merge_request.id)
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
include ProjectForksHelper
let_it_be(:project) { create(:project, :repository) }
let(:ref) { 'master' }
let(:pipeline) { create(:ci_pipeline, project: project, ref: ref) }
let(:event) { Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) }
subject { consume_event(event) }
def consume_event(event)
described_class.new.perform(event.class.name, event.data)
end
context 'when merge requests already exist for this source branch', :sidekiq_inline do
let(:merge_request_1) do
create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project)
end
let(:merge_request_2) do
create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project)
end
context 'when related merge request is already merged' do
let!(:merged_merge_request) do
create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged')
end
it 'does not schedule update head pipeline job' do
expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id)
subject
end
end
context 'when the head pipeline sha equals merge request sha' do
let(:ref) { 'feature' }
before do
pipeline.update!(sha: project.repository.commit(ref).id)
end
it 'updates head pipeline of each merge request' do
merge_request_1
merge_request_2
subject
expect(merge_request_1.reload.head_pipeline).to eq(pipeline)
expect(merge_request_2.reload.head_pipeline).to eq(pipeline)
end
end
context 'when the head pipeline sha does not equal merge request sha' do
let(:ref) { 'feature' }
it 'does not update the head piepeline of MRs' do
merge_request_1
merge_request_2
subject
expect(merge_request_1.reload.head_pipeline).not_to eq(pipeline)
expect(merge_request_2.reload.head_pipeline).not_to eq(pipeline)
end
end
context 'when there is no pipeline for source branch' do
it "does not update merge request head pipeline" do
merge_request = create(:merge_request, source_branch: 'feature',
target_branch: "branch_1",
source_project: project)
subject
expect(merge_request.reload.head_pipeline).not_to eq(pipeline)
end
end
context 'when merge request target project is different from source project' do
let(:project) { fork_project(target_project, nil, repository: true) }
let(:target_project) { create(:project, :repository) }
let(:user) { create(:user) }
let(:ref) { 'feature' }
before do
project.add_developer(user)
pipeline.update!(sha: project.repository.commit(ref).id)
end
it 'updates head pipeline for merge request' do
merge_request = create(:merge_request, source_branch: 'feature',
target_branch: "master",
source_project: project,
target_project: target_project)
subject
expect(merge_request.reload.head_pipeline).to eq(pipeline)
end
end
context 'when the pipeline is not the latest for the branch' do
it 'does not update merge request head pipeline' do
merge_request = create(:merge_request, source_branch: 'master',
target_branch: "branch_1",
source_project: project)
create(:ci_pipeline, project: pipeline.project, ref: pipeline.ref)
subject
expect(merge_request.reload.head_pipeline).to be_nil
end
end
context 'when pipeline has errors' do
before do
pipeline.update!(yaml_errors: 'some errors', status: :failed)
end
it 'updates merge request head pipeline reference' do
merge_request = create(:merge_request, source_branch: 'master',
target_branch: 'feature',
source_project: project)
subject
expect(merge_request.reload.head_pipeline).to eq(pipeline)
end
end
end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment