Commit 79c526e7 authored by Eugenia Grieff's avatar Eugenia Grieff

Add worker to refresh cached issues count

- Use this worker in BulkUpdateService when the state of issuables that
are issues is updated
parent 2e2bdeb7
......@@ -19,9 +19,19 @@ module Groups
cached_count = Rails.cache.read(cache_key)
return cached_count unless cached_count.blank?
refreshed_count = uncached_count
update_cache_for_key(cache_key) { refreshed_count } if refreshed_count > CACHED_COUNT_THRESHOLD
refreshed_count
refresh_cache_over_threshold
end
# Reconciles the cache with the current count: counts above
# CACHED_COUNT_THRESHOLD are written to the cache, smaller counts evict any
# previously cached entry so they keep being computed fresh on every call.
#
# @param new_count [Integer, nil] count to reconcile; defaults to #uncached_count
# @return [Integer] the up-to-date count
def refresh_cache_over_threshold(new_count = nil)
  count = new_count || uncached_count

  if count <= CACHED_COUNT_THRESHOLD
    delete_cache
  else
    update_cache_for_key(cache_key) { count }
  end

  count
end
def cache_key
......
......@@ -5,48 +5,6 @@ module Groups
class OpenIssuesCountService < Groups::CountService
PUBLIC_COUNT_KEY = 'group_public_open_issues_count'
TOTAL_COUNT_KEY = 'group_total_open_issues_count'
CACHED_COUNT_THRESHOLD = 1000
EXPIRATION_TIME = 24.hours
attr_reader :group, :user
def initialize(group, user = nil)
@group = group
@user = user
end
# Reads count value from cache and return it if present.
# If empty or expired, #uncached_count will calculate the issues count for the group and
# compare it with the threshold. If it is greater, it will be written to the cache and returned.
# If below, it will be returned without being cached.
# This results in only caching large counts and calculating the rest with every call to maintain
# accuracy.
def count
cached_count = Rails.cache.read(cache_key)
return cached_count unless cached_count.blank?
refresh_cache_over_threshold
end
def cache_key(key = nil)
['groups', 'open_issues_count_service', VERSION, group.id, cache_key_name]
end
def refresh_cache_over_threshold(new_count = nil)
new_count ||= uncached_count
if new_count > CACHED_COUNT_THRESHOLD
update_cache_for_key(cache_key) { new_count }
else
delete_cache
end
new_count
end
def cached_count
Rails.cache.read(cache_key)
end
private
......
......@@ -15,13 +15,13 @@ module Issuable
def execute(type)
ids = params.delete(:issuable_ids).split(",")
set_update_params(type)
updated_issues_count = update_issuables(type, ids)
updated_issuables = update_issuables(type, ids)
if updated_issues_count > 0 && requires_issues_count_cache_refresh?(type)
update_group_cached_counts(updated_issues_count)
if !updated_issuables.empty? && requires_count_cache_refresh?(type)
schedule_group_count_refresh(type, updated_issuables)
end
response_success(payload: { count: updated_issues_count })
response_success(payload: { count: updated_issuables.size })
rescue ArgumentError => e
response_error(e.message, 422)
end
......@@ -60,7 +60,7 @@ module Issuable
update_class.new(issuable.issuing_parent, current_user, params).execute(issuable)
end
items.size
items
end
def find_issuables(parent, model_class, ids)
......@@ -86,26 +86,15 @@ module Issuable
ServiceResponse.error(message: message, http_status: http_status)
end
def requires_issues_count_cache_refresh?(type)
type == 'issue' && params.include?(:state_event) && group.present?
def requires_count_cache_refresh?(type)
type.to_sym == :issue && params.include?(:state_event)
end
def update_group_cached_counts(updated_issues_count)
count_service = Groups::OpenIssuesCountService.new(group, current_user)
cached_count = count_service.cached_count
return if cached_count.blank?
def schedule_group_count_refresh(issuable_type, updated_issuables)
group_ids = updated_issuables.map(&:project).map { |project| project.group&.id }.uniq
return unless group_ids.any?
new_count = compute_new_cached_count(cached_count, updated_issues_count)
count_service.refresh_cache_over_threshold(new_count)
end
def group
parent.is_a?(Group) ? parent : parent&.group
end
def compute_new_cached_count(cached_count, updated_issues_count)
operation = params[:state_event] == 'closed' ? :- : :+
[cached_count.to_i, updated_issues_count.to_i].inject(operation)
Issuables::RefreshGroupsCounterWorker.perform_async(issuable_type, current_user.id, group_ids)
end
end
end
......
......@@ -2110,6 +2110,14 @@
:weight: 1
:idempotent:
:tags: []
- :name: issuables_refresh_groups_counter
:feature_category: :issue_tracking
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
- :name: issue_placement
:worker_name: IssuePlacementWorker
:feature_category: :issue_tracking
......
# frozen_string_literal: true

module Issuables
  # Recalculates the cached open-issues count for a set of groups and all of
  # their ancestor groups. Scheduled after bulk issue updates so that the
  # cached group counters do not go stale.
  class RefreshGroupsCounterWorker
    include ApplicationWorker

    idempotent!
    urgency :low
    feature_category :issue_tracking

    # rubocop: disable CodeReuse/ActiveRecord
    # @param type [String, Symbol] issuable type; only 'issue' is acted upon
    # @param current_user_id [Integer] id of the user the counts are scoped to
    # @param group_ids [Array<Integer>] ids of the groups whose counters changed
    def perform(type, current_user_id, group_ids = [])
      return if group_ids.none? || !issue_type?(type)

      user = User.find(current_user_id)
      groups = Gitlab::ObjectHierarchy.new(Group.where(id: group_ids)).base_and_ancestors

      refresh_cached_count(type, user, groups)
    rescue ActiveRecord::RecordNotFound => e
      # The user may have been removed before the job ran; log and move on.
      Gitlab::ErrorTracking.log_exception(e, user_id: current_user_id)
    end
    # rubocop: enable CodeReuse/ActiveRecord

    private

    # Refreshes the counter of every group whose count is currently cached.
    def refresh_cached_count(type, user, groups)
      service_class = count_service_class(type)
      return unless service_class

      groups.each do |group|
        service = service_class.new(group, user)
        service.refresh_cache_over_threshold if service.count_stored?
      end
    end

    # Maps an issuable type to the service maintaining its cached group count.
    def count_service_class(type)
      Groups::OpenIssuesCountService if issue_type?(type)
    end

    def issue_type?(type)
      type.to_sym == :issue
    end
  end
end
......@@ -190,6 +190,8 @@
- 1
- - issuable_export_csv
- 1
- - issuables_refresh_groups_counter
- 1
- - issue_placement
- 2
- - issue_rebalancing
......
......@@ -281,72 +281,69 @@ RSpec.describe Issuable::BulkUpdateService, :clean_gitlab_redis_cache do
end
end
context 'when project belongs to a group' do
let_it_be(:group) { create(:group) }
let_it_be(:project2) { create(:project, :repository, group: group) }
let_it_be(:issues) { create_list(:issue, 2, :opened, project: project2) }
let(:parent) { project2 }
let(:count_service_class) { Groups::OpenIssuesCountService }
let(:count_service) { count_service_class.new(group, user) }
describe 'updating issuables cached count' do
shared_examples 'scheduling cached group count refresh' do
it 'schedules worker' do
expect(Issuables::RefreshGroupsCounterWorker).to receive(:perform_async)
before do
project2.add_reporter(user)
bulk_update(issuables, params)
end
end
shared_examples 'refreshing cached open issues count with state updates' do |state|
let(:public_count_key) { count_service.cache_key(count_service_class::PUBLIC_COUNT_KEY) }
let(:existing_cache) { 5 }
shared_examples 'not scheduling cached group count refresh' do
it 'does not schedule worker' do
expect(Issuables::RefreshGroupsCounterWorker).not_to receive(:perform_async)
context 'when cache is empty' do
before do
stub_const("#{count_service_class}::CACHED_COUNT_THRESHOLD", 1)
Rails.cache.delete(public_count_key)
end
bulk_update(issuables, params)
end
end
it 'does not change the counts cache' do
bulk_update(issues, state_event: state)
expect(Rails.cache.read(public_count_key)).to be_nil
end
context 'when project belongs to a group' do
let_it_be(:group_project) { create(:project, :repository, group: create(:group)) }
let_it_be(:issues) { create_list(:issue, 2, :opened, project: group_project) }
let_it_be(:milestone) { create(:milestone, project: project) }
let(:parent) { group_project }
let(:issuables) { issues }
before do
group_project.add_reporter(user)
end
context 'when new issues count is over cache threshold' do
before do
stub_const("#{count_service_class}::CACHED_COUNT_THRESHOLD", 1)
context 'when updating issues state' do
it_behaves_like 'scheduling cached group count refresh' do
let(:params) { { state_event: 'closed' } }
end
it 'updates existing issues counts cache' do
Rails.cache.write(public_count_key, existing_cache)
bulk_update(issues, state_event: state)
expect(Rails.cache.read(public_count_key)).to eq(adjusted_count)
it_behaves_like 'scheduling cached group count refresh' do
let(:params) { { state_event: 'reopened' } }
end
end
context 'when new issues count is under cache threshold' do
before do
stub_const("#{count_service_class}::CACHED_COUNT_THRESHOLD", 10)
context 'when state is not updated' do
it_behaves_like 'not scheduling cached group count refresh' do
let(:params) { { milestone_id: milestone.id } }
end
end
it 'deletes existing cache' do
Rails.cache.write(public_count_key, existing_cache)
bulk_update(issues, state_event: state)
expect(Rails.cache.read(public_count_key)).to be_nil
context 'when issuable type is not :issue' do
it_behaves_like 'not scheduling cached group count refresh' do
let(:params) { { state_event: 'closed' } }
let(:issuables) { [create(:merge_request, source_project: project, source_branch: 'branch-1')] }
end
end
end
context 'when closing issues', :use_clean_rails_memory_store_caching do
it_behaves_like 'refreshing cached open issues count with state updates', 'closed' do
let(:adjusted_count) { existing_cache - issues.size }
end
end
context 'when project belongs to a user namespace' do
let(:issuables) { create_list(:issue, 2, :opened, project: project) }
context 'when reopening issues', :use_clean_rails_memory_store_caching do
before do
issues.map { |issue| issue.update!(state: 'closed') }
end
context 'when updating issues state' do
it_behaves_like 'not scheduling cached group count refresh' do
let(:params) { { state_event: 'closed' } }
end
it_behaves_like 'refreshing cached open issues count with state updates', 'reopened' do
let(:adjusted_count) { existing_cache + issues.size }
it_behaves_like 'not scheduling cached group count refresh' do
let(:params) { { state_event: 'reopened' } }
end
end
end
end
......
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Issuables::RefreshGroupsCounterWorker do
  describe '#perform' do
    let_it_be(:user) { create(:user) }
    # Three-level hierarchy: parent_group <- root_group <- subgroup.
    # The worker receives root_group and must also refresh its ancestor
    # (parent_group) but never its descendant (subgroup).
    # NOTE(review): 'root_group' has a parent, so the name is misleading —
    # it is the group passed to the worker, not the hierarchy root.
    let_it_be(:parent_group) { create(:group) }
    let_it_be(:root_group) { create(:group, parent: parent_group) }
    let_it_be(:subgroup) { create(:group, parent: root_group) }

    let(:count_service) { Groups::OpenIssuesCountService }
    let(:type) { 'issue' }
    let(:group_ids) { [root_group.id] }

    # The worker must neither touch the count service nor raise.
    shared_examples 'a worker that takes no action' do
      it 'does not call count service or rise error' do
        expect(count_service).not_to receive(:new)
        expect(Gitlab::ErrorTracking).not_to receive(:log_exception)

        described_class.new.perform(type, user.id, group_ids)
      end
    end

    # An unknown user id raises ActiveRecord::RecordNotFound inside the
    # worker, which is logged (with the offending user id) instead of re-raised.
    it 'anticipates the inability to find the issue' do
      expect(Gitlab::ErrorTracking).to receive(:log_exception)
        .with(ActiveRecord::RecordNotFound, include(user_id: -1))
      expect(count_service).not_to receive(:new)

      described_class.new.perform(type, -1, group_ids)
    end

    context 'when group_ids is empty' do
      let(:group_ids) { [] }

      it_behaves_like 'a worker that takes no action'
    end

    context 'when type is not issue' do
      let(:type) { 'merge_request' }

      it_behaves_like 'a worker that takes no action'
    end

    context 'when updating cache' do
      # One service double per refreshed group (given group + its ancestor).
      let(:instance1) { instance_double(count_service) }
      let(:instance2) { instance_double(count_service) }

      context 'with existing cached value' do
        before do
          allow(instance1).to receive(:count_stored?).and_return(true)
          allow(instance2).to receive(:count_stored?).and_return(true)
        end

        it_behaves_like 'an idempotent worker' do
          let(:job_args) { [type, user.id, group_ids] }
          let(:exec_times) { IdempotentWorkerHelper::WORKER_EXEC_TIMES }

          it 'refreshes the issue count in given groups and ancestors' do
            # Refresh runs for the given group and its ancestor, once per
            # (idempotency-checked) execution, but never for the descendant.
            expect(count_service).to receive(:new)
              .exactly(exec_times).times.with(root_group, user).and_return(instance1)
            expect(count_service).to receive(:new)
              .exactly(exec_times).times.with(parent_group, user).and_return(instance2)
            expect(count_service).not_to receive(:new).with(subgroup, user)

            [instance1, instance2].all? do |instance|
              expect(instance).to receive(:refresh_cache_over_threshold).exactly(exec_times).times
            end

            subject
          end
        end
      end

      context 'with no cached value' do
        before do
          allow(instance1).to receive(:count_stored?).and_return(false)
          allow(instance2).to receive(:count_stored?).and_return(false)
        end

        it 'refreshes the issue count in given groups and ancestors' do
          # Services are still instantiated, but no refresh happens when
          # count_stored? is false for the group.
          expect(count_service).to receive(:new).with(root_group, user).and_return(instance1)
          expect(count_service).to receive(:new).with(parent_group, user).and_return(instance2)

          [instance1, instance2].all? {|i| expect(i).not_to receive(:refresh_cache_over_threshold) }

          described_class.new.perform(type, user.id, group_ids)
        end
      end
    end
  end
end
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment