Commit d9e894d9 authored by Matthew Sakai, committed by Mike Snitzer

dm vdo: add specialized request queueing functionality

This patch adds funnel_queue, a mostly lock-free multi-producer,
single-consumer queue. It also adds the request queue used by the dm-vdo
deduplication index, and the work_queue used by the dm-vdo data store. Both
of these are built on top of funnel queue and are intended to support the
dispatching of many short-running tasks. The work_queue also supports
priorities. Finally, this patch adds vdo_completion, the structure which is
enqueued on work_queues.
Co-developed-by: J. corwin Coburn <corwin@hurlbutnet.net>
Signed-off-by: J. corwin Coburn <corwin@hurlbutnet.net>
Co-developed-by: Michael Sclafani <dm-devel@lists.linux.dev>
Signed-off-by: Michael Sclafani <dm-devel@lists.linux.dev>
Co-developed-by: Sweet Tea Dorminy <sweettea-kernel@dorminy.me>
Signed-off-by: Sweet Tea Dorminy <sweettea-kernel@dorminy.me>
Co-developed-by: Ken Raeburn <raeburn@redhat.com>
Signed-off-by: Ken Raeburn <raeburn@redhat.com>
Signed-off-by: Matthew Sakai <msakai@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@kernel.org>
parent 89f9b701
// SPDX-License-Identifier: GPL-2.0-only
/*
* Copyright 2023 Red Hat
*/
#include "completion.h"
#include <linux/kernel.h>
#include "logger.h"
#include "permassert.h"
#include "status-codes.h"
#include "types.h"
#include "vio.h"
#include "vdo.h"
/**
* DOC: vdo completions.
*
* Most of vdo's data structures are lock free, each either belonging to a single "zone," or
* divided into a number of zones whose accesses to the structure do not overlap. During normal
* operation, at most one thread will be operating in any given zone. Each zone has a
* vdo_work_queue which holds vdo_completions that are to be run in that zone. A completion may
* only be enqueued on one queue or operating in a single zone at a time.
*
* At each step of a multi-threaded operation, the completion performing the operation is given a
* callback, error handler, and thread id for the next step. A completion is "run" when it is
* operating on the correct thread (as specified by its callback_thread_id). If the value of its
* "result" field is an error (i.e. not VDO_SUCCESS), the function in its "error_handler" will be
* invoked. If the error_handler is NULL, or there is no error, the function set as its "callback"
* will be invoked. Generally, a completion will not be run directly, but rather will be
* "launched." In this case, it will check whether it is operating on the correct thread. If it is,
it will run immediately. Otherwise, it will be enqueued on the vdo_work_queue associated with the
* completion's "callback_thread_id". When it is dequeued, it will be on the correct thread, and
* will get run. In some cases, the completion should get queued instead of running immediately,
* even if it is being launched from the correct thread. This is usually in cases where there is a
* long chain of callbacks, all on the same thread, which could overflow the stack. In such cases,
* the completion's "requeue" field should be set to true. Doing so will skip the current thread
* check and simply enqueue the completion.
*
* A completion may be "finished," in which case its "complete" field will be set to true before it
* is next run. It is a bug to attempt to set the result or re-finish a finished completion.
* Because a completion's fields are not safe to examine from any thread other than the one on
* which the completion is currently operating, this field is used only to aid in detecting
programming errors. It cannot be used for cross-thread checking on the status of an operation.
* A completion must be "reset" before it can be reused after it has been finished. Resetting will
* also clear any error from the result field.
**/
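/*
 * Example (an illustrative sketch, not part of this patch): kicking off the
 * next step of a multi-threaded operation. All names in this sketch
 * (kick_off_step_two, step_two, handle_error, next_thread) are hypothetical.
 *
 *   static void kick_off_step_two(struct vdo_completion *completion,
 *                                 thread_id_t next_thread)
 *   {
 *           vdo_prepare_completion(completion, step_two, handle_error,
 *                                  next_thread, completion->parent);
 *           vdo_launch_completion(completion);
 *   }
 *
 * If this runs on next_thread and requeue is false, step_two is invoked
 * immediately; otherwise the completion is enqueued on that thread's
 * vdo_work_queue and step_two runs when it is dequeued.
 */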
void vdo_initialize_completion(struct vdo_completion *completion,
struct vdo *vdo,
enum vdo_completion_type type)
{
memset(completion, 0, sizeof(*completion));
completion->vdo = vdo;
completion->type = type;
vdo_reset_completion(completion);
}
static inline void assert_incomplete(struct vdo_completion *completion)
{
ASSERT_LOG_ONLY(!completion->complete, "completion is not complete");
}
/**
* vdo_set_completion_result() - Set the result of a completion.
*
* Older errors will not be masked.
*/
void vdo_set_completion_result(struct vdo_completion *completion, int result)
{
assert_incomplete(completion);
if (completion->result == VDO_SUCCESS)
completion->result = result;
}
/**
* vdo_launch_completion_with_priority() - Run or enqueue a completion.
* @priority: The priority at which to enqueue the completion.
*
* If called on the correct thread (i.e. the one specified in the completion's callback_thread_id
* field) and not marked for requeue, the completion will be run immediately. Otherwise, the
* completion will be enqueued on the specified thread.
*/
void vdo_launch_completion_with_priority(struct vdo_completion *completion,
enum vdo_completion_priority priority)
{
thread_id_t callback_thread = completion->callback_thread_id;
if (completion->requeue || (callback_thread != vdo_get_callback_thread_id())) {
vdo_enqueue_completion(completion, priority);
return;
}
vdo_run_completion(completion);
}
/** vdo_finish_completion() - Mark a completion as complete and then launch it. */
void vdo_finish_completion(struct vdo_completion *completion)
{
assert_incomplete(completion);
completion->complete = true;
if (completion->callback != NULL)
vdo_launch_completion(completion);
}
void vdo_enqueue_completion(struct vdo_completion *completion,
enum vdo_completion_priority priority)
{
struct vdo *vdo = completion->vdo;
thread_id_t thread_id = completion->callback_thread_id;
if (ASSERT(thread_id < vdo->thread_config.thread_count,
"thread_id %u (completion type %d) is less than thread count %u",
thread_id, completion->type,
vdo->thread_config.thread_count) != UDS_SUCCESS)
BUG();
completion->requeue = false;
completion->priority = priority;
completion->my_queue = NULL;
vdo_enqueue_work_queue(vdo->threads[thread_id].queue, completion);
}
/**
* vdo_requeue_completion_if_needed() - Requeue a completion if not called on the specified thread.
*
* Return: True if the completion was requeued; callers may not access the completion in this case.
*/
bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
thread_id_t callback_thread_id)
{
if (vdo_get_callback_thread_id() == callback_thread_id)
return false;
completion->callback_thread_id = callback_thread_id;
vdo_enqueue_completion(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
return true;
}
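/*
 * Example (an illustrative sketch, not part of this patch): a callback that
 * must run in a particular zone can begin by hopping to that zone's thread.
 * The names do_zone_work and zone_thread are hypothetical.
 *
 *   static void do_zone_work(struct vdo_completion *completion)
 *   {
 *           if (vdo_requeue_completion_if_needed(completion, zone_thread))
 *                   return;
 *
 *           (now running on zone_thread; safe to touch that zone's state)
 *   }
 *
 * Once vdo_requeue_completion_if_needed() returns true, the completion
 * belongs to the work queue and must not be touched by the caller.
 */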
/* SPDX-License-Identifier: GPL-2.0-only */
/*
* Copyright 2023 Red Hat
*/
#ifndef VDO_COMPLETION_H
#define VDO_COMPLETION_H
#include "permassert.h"
#include "status-codes.h"
#include "types.h"
/**
* vdo_run_completion() - Run a completion's callback or error handler on the current thread.
*
* Context: This function must be called from the correct callback thread.
*/
static inline void vdo_run_completion(struct vdo_completion *completion)
{
if ((completion->result != VDO_SUCCESS) && (completion->error_handler != NULL)) {
completion->error_handler(completion);
return;
}
completion->callback(completion);
}
void vdo_set_completion_result(struct vdo_completion *completion, int result);
void vdo_initialize_completion(struct vdo_completion *completion, struct vdo *vdo,
enum vdo_completion_type type);
/**
* vdo_reset_completion() - Reset a completion to a clean state, while keeping the type, vdo and
* parent information.
*/
static inline void vdo_reset_completion(struct vdo_completion *completion)
{
completion->result = VDO_SUCCESS;
completion->complete = false;
}
void vdo_launch_completion_with_priority(struct vdo_completion *completion,
enum vdo_completion_priority priority);
/**
* vdo_launch_completion() - Launch a completion with default priority.
*/
static inline void vdo_launch_completion(struct vdo_completion *completion)
{
vdo_launch_completion_with_priority(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
}
/**
* vdo_continue_completion() - Continue processing a completion.
* @result: The current result (will not mask older errors).
*
* Continue processing a completion by setting the current result and calling
* vdo_launch_completion().
*/
static inline void vdo_continue_completion(struct vdo_completion *completion, int result)
{
vdo_set_completion_result(completion, result);
vdo_launch_completion(completion);
}
void vdo_finish_completion(struct vdo_completion *completion);
/**
* vdo_fail_completion() - Set the result of a completion if it does not already have an error,
* then finish it.
*/
static inline void vdo_fail_completion(struct vdo_completion *completion, int result)
{
vdo_set_completion_result(completion, result);
vdo_finish_completion(completion);
}
/**
* vdo_assert_completion_type() - Assert that a completion is of the correct type.
*
* Return: VDO_SUCCESS or an error
*/
static inline int vdo_assert_completion_type(struct vdo_completion *completion,
enum vdo_completion_type expected)
{
return ASSERT(expected == completion->type,
"completion type should be %u, not %u", expected,
completion->type);
}
static inline void vdo_set_completion_callback(struct vdo_completion *completion,
vdo_action_fn callback,
thread_id_t callback_thread_id)
{
completion->callback = callback;
completion->callback_thread_id = callback_thread_id;
}
/**
* vdo_launch_completion_callback() - Set the callback for a completion and launch it immediately.
*/
static inline void vdo_launch_completion_callback(struct vdo_completion *completion,
vdo_action_fn callback,
thread_id_t callback_thread_id)
{
vdo_set_completion_callback(completion, callback, callback_thread_id);
vdo_launch_completion(completion);
}
/**
* vdo_prepare_completion() - Prepare a completion for launch.
*
* Resets the completion, and then sets its callback, error handler, callback thread, and parent.
*/
static inline void vdo_prepare_completion(struct vdo_completion *completion,
vdo_action_fn callback,
vdo_action_fn error_handler,
thread_id_t callback_thread_id, void *parent)
{
vdo_reset_completion(completion);
vdo_set_completion_callback(completion, callback, callback_thread_id);
completion->error_handler = error_handler;
completion->parent = parent;
}
/**
* vdo_prepare_completion_for_requeue() - Prepare a completion for launch ensuring that it will
* always be requeued.
*
* Resets the completion, and then sets its callback, error handler, callback thread, and parent.
*/
static inline void vdo_prepare_completion_for_requeue(struct vdo_completion *completion,
vdo_action_fn callback,
vdo_action_fn error_handler,
thread_id_t callback_thread_id,
void *parent)
{
vdo_prepare_completion(completion, callback, error_handler,
callback_thread_id, parent);
completion->requeue = true;
}
void vdo_enqueue_completion(struct vdo_completion *completion,
enum vdo_completion_priority priority);
bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
thread_id_t callback_thread_id);
#endif /* VDO_COMPLETION_H */
/* SPDX-License-Identifier: GPL-2.0-only */
/*
* Copyright 2023 Red Hat
*/
#ifndef UDS_CPU_H
#define UDS_CPU_H
#include <linux/cache.h>
/**
* uds_prefetch_address() - Minimize cache-miss latency by attempting to move data into a CPU cache
* before it is accessed.
*
* @address: the address to fetch (may be invalid)
* @for_write: must be constant at compile time--false if for reading, true if for writing
*/
static inline void uds_prefetch_address(const void *address, bool for_write)
{
/*
* for_write won't be a constant if we are compiled with optimization turned off, in which
* case prefetching really doesn't matter. clang can't figure out that if for_write is a
* constant, it can be passed as the second, mandatorily constant argument to prefetch(),
* at least currently on llvm 12.
*/
if (__builtin_constant_p(for_write)) {
if (for_write)
__builtin_prefetch(address, true);
else
__builtin_prefetch(address, false);
}
}
/**
* uds_prefetch_range() - Minimize cache-miss latency by attempting to move a range of addresses
* into a CPU cache before they are accessed.
*
* @start: the starting address to fetch (may be invalid)
* @size: the number of bytes in the address range
* @for_write: must be constant at compile time--false if for reading, true if for writing
*/
static inline void uds_prefetch_range(const void *start, unsigned int size,
bool for_write)
{
/*
* Count the number of cache lines to fetch, allowing for the address range to span an
* extra cache line boundary due to address alignment.
*/
const char *address = (const char *) start;
unsigned int offset = ((uintptr_t) address % L1_CACHE_BYTES);
unsigned int cache_lines = (1 + ((size + offset) / L1_CACHE_BYTES));
while (cache_lines-- > 0) {
uds_prefetch_address(address, for_write);
address += L1_CACHE_BYTES;
}
}
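/*
 * Example (an illustrative sketch, not part of this file): prefetching the
 * next record of an array before processing the current one. The names
 * records, count, and process() are hypothetical.
 *
 *   for (i = 0; i < count; i++) {
 *           if (i + 1 < count)
 *                   uds_prefetch_range(&records[i + 1],
 *                                      sizeof(records[i + 1]), false);
 *           process(&records[i]);
 *   }
 */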
#endif /* UDS_CPU_H */
// SPDX-License-Identifier: GPL-2.0-only
/*
* Copyright 2023 Red Hat
*/
#include "funnel-queue.h"
#include "cpu.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "uds.h"
int uds_make_funnel_queue(struct funnel_queue **queue_ptr)
{
int result;
struct funnel_queue *queue;
result = uds_allocate(1, struct funnel_queue, "funnel queue", &queue);
if (result != UDS_SUCCESS)
return result;
/*
* Initialize the stub entry and put it in the queue, establishing the invariant that
* queue->newest and queue->oldest are never null.
*/
queue->stub.next = NULL;
queue->newest = &queue->stub;
queue->oldest = &queue->stub;
*queue_ptr = queue;
return UDS_SUCCESS;
}
void uds_free_funnel_queue(struct funnel_queue *queue)
{
uds_free(queue);
}
static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
{
/*
* Barrier requirements: We need a read barrier between reading a "next" field pointer
* value and reading anything it points to. There's an accompanying barrier in
* uds_funnel_queue_put() between its caller setting up the entry and making it visible.
*/
struct funnel_queue_entry *oldest = queue->oldest;
struct funnel_queue_entry *next = READ_ONCE(oldest->next);
if (oldest == &queue->stub) {
/*
* When the oldest entry is the stub and it has no successor, the queue is
* logically empty.
*/
if (next == NULL)
return NULL;
/*
* The stub entry has a successor, so the stub can be dequeued and ignored without
* breaking the queue invariants.
*/
oldest = next;
queue->oldest = oldest;
next = READ_ONCE(oldest->next);
}
/*
* We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
* stub entry back on the queue first.
*/
if (next == NULL) {
struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
if (oldest != newest) {
/*
* Another thread has already swung queue->newest atomically, but not yet
* assigned previous->next. The queue is really still empty.
*/
return NULL;
}
/*
* Put the stub entry back on the queue, ensuring a successor will eventually be
* seen.
*/
uds_funnel_queue_put(queue, &queue->stub);
/* Check again for a successor. */
next = READ_ONCE(oldest->next);
if (next == NULL) {
/*
* We lost a race with a producer who swapped queue->newest before we did,
* but who hasn't yet updated previous->next. Try again later.
*/
return NULL;
}
}
return oldest;
}
/*
* Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
* called from a single consumer thread.
*/
struct funnel_queue_entry *uds_funnel_queue_poll(struct funnel_queue *queue)
{
struct funnel_queue_entry *oldest = get_oldest(queue);
if (oldest == NULL)
return oldest;
/*
* Dequeue the oldest entry and return it. Only one consumer thread may call this function,
* so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
* consumer and oldest->next is never used by a producer thread after it is swung from NULL
* to non-NULL.
*/
queue->oldest = READ_ONCE(oldest->next);
/*
* Make sure the caller sees the proper stored data for this entry. Since we've already
* fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
* to the next call we'll properly see the dependent data.
*/
smp_rmb();
/*
* If "oldest" is a very light-weight work item, we'll be looking for the next one very
* soon, so prefetch it now.
*/
uds_prefetch_address(queue->oldest, true);
WRITE_ONCE(oldest->next, NULL);
return oldest;
}
/*
* Check whether the funnel queue is empty or not. If the queue is in a transition state with one
* or more entries being added such that the list view is incomplete, this function will report the
* queue as empty.
*/
bool uds_is_funnel_queue_empty(struct funnel_queue *queue)
{
return get_oldest(queue) == NULL;
}
/*
* Check whether the funnel queue is idle or not. If the queue has entries available to be
* retrieved, it is not idle. If the queue is in a transition state with one or more entries being
* added such that the list view is incomplete, it may not be possible to retrieve an entry with
* the uds_funnel_queue_poll() function, but the queue will not be considered idle.
*/
bool uds_is_funnel_queue_idle(struct funnel_queue *queue)
{
/*
* Oldest is not the stub, so there's another entry, though if next is NULL we can't
* retrieve it yet.
*/
if (queue->oldest != &queue->stub)
return false;
/*
* Oldest is the stub, but newest has been updated by _put(); either there's another,
* retrievable entry in the list, or the list is officially empty but in the intermediate
* state of having an entry added.
*
* Whether anything is retrievable depends on whether stub.next has been updated and become
* visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
* update to newest would be visible to us at the same time or sooner.
*/
if (READ_ONCE(queue->newest) != &queue->stub)
return false;
return true;
}
/* SPDX-License-Identifier: GPL-2.0-only */
/*
* Copyright 2023 Red Hat
*/
#ifndef UDS_FUNNEL_QUEUE_H
#define UDS_FUNNEL_QUEUE_H
#include <linux/atomic.h>
#include <linux/cache.h>
/*
* A funnel queue is a simple (almost) lock-free queue that accepts entries from multiple threads
* (multi-producer) and delivers them to a single thread (single-consumer). "Funnel" is an attempt
* to evoke the image of requests from more than one producer being "funneled down" to a single
* consumer.
*
* This is an unsynchronized but thread-safe data structure when used as intended. There is no
* mechanism to ensure that only one thread is consuming from the queue. If more than one thread
* attempts to consume from the queue, the resulting behavior is undefined. Clients must not
* directly access or manipulate the internals of the queue, which are only exposed for the purpose
* of allowing the very simple enqueue operation to be inlined.
*
* The implementation requires that a funnel_queue_entry structure (a link pointer) is embedded in
* the queue entries, and pointers to those structures are used exclusively by the queue. No macros
* are defined to template the queue, so the offset of the funnel_queue_entry in the records placed
* in the queue must all be the same so the client can derive their structure pointer from the
* entry pointer returned by uds_funnel_queue_poll().
*
* Callers are wholly responsible for allocating and freeing the entries. Entries may be freed as
* soon as they are returned since this queue is not susceptible to the "ABA problem" present in
* many lock-free data structures. The queue is dynamically allocated to ensure cache-line
* alignment, but no other dynamic allocation is used.
*
* The algorithm is not actually 100% lock-free. There is a single point in uds_funnel_queue_put()
* at which a preempted producer will prevent the consumers from seeing items added to the queue by
* later producers, and only if the queue is short enough or the consumer fast enough for it to
* reach what was the end of the queue at the time of the preemption.
*
* The consumer function, uds_funnel_queue_poll(), will return NULL when the queue is empty. To
* wait for data to consume, spin (if safe) or combine the queue with a struct event_count to
* signal the presence of new entries.
*/
/* This queue link structure must be embedded in client entries. */
struct funnel_queue_entry {
/* The next (newer) entry in the queue. */
struct funnel_queue_entry *next;
};
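/*
 * Example (an illustrative sketch, not part of this patch): embedding the
 * link in a client structure and recovering the client pointer after a poll,
 * in the same way struct uds_request embeds its queue_link. The names
 * my_item and payload are hypothetical.
 *
 *   struct my_item {
 *           struct funnel_queue_entry queue_link;
 *           int payload;
 *   };
 *
 *   Producer:  uds_funnel_queue_put(queue, &item->queue_link);
 *
 *   Consumer:  entry = uds_funnel_queue_poll(queue);
 *              if (entry != NULL)
 *                      item = container_of(entry, struct my_item, queue_link);
 */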
/*
* The dynamically allocated queue structure, which is allocated on a cache line boundary so the
* producer and consumer fields in the structure will land on separate cache lines. This should be
* considered opaque, but it is exposed here so uds_funnel_queue_put() can be inlined.
*/
struct __aligned(L1_CACHE_BYTES) funnel_queue {
/*
* The producers' end of the queue, an atomically exchanged pointer that will never be
* NULL.
*/
struct funnel_queue_entry *newest;
/* The consumer's end of the queue, which is owned by the consumer and never NULL. */
struct funnel_queue_entry *oldest __aligned(L1_CACHE_BYTES);
/* A dummy entry used to provide the non-NULL invariants above. */
struct funnel_queue_entry stub;
};
int __must_check uds_make_funnel_queue(struct funnel_queue **queue_ptr);
void uds_free_funnel_queue(struct funnel_queue *queue);
/*
* Put an entry on the end of the queue.
*
* The entry pointer must be to the struct funnel_queue_entry embedded in the caller's data
* structure. The caller must be able to derive the address of the start of their data structure
* from the pointer that passed in here, so every entry in the queue must have the struct
* funnel_queue_entry at the same offset within the client's structure.
*/
static inline void uds_funnel_queue_put(struct funnel_queue *queue,
struct funnel_queue_entry *entry)
{
struct funnel_queue_entry *previous;
/*
* Barrier requirements: All stores relating to the entry ("next" pointer, containing data
* structure fields) must happen before the previous->next store making it visible to the
* consumer. Also, the entry's "next" field initialization to NULL must happen before any
* other producer threads can see the entry (the xchg) and try to update the "next" field.
*
* xchg implements a full barrier.
*/
WRITE_ONCE(entry->next, NULL);
previous = xchg(&queue->newest, entry);
/*
* Preemptions between these two statements hide the rest of the queue from the consumer,
* preventing consumption until the following assignment runs.
*/
WRITE_ONCE(previous->next, entry);
}
struct funnel_queue_entry *__must_check uds_funnel_queue_poll(struct funnel_queue *queue);
bool __must_check uds_is_funnel_queue_empty(struct funnel_queue *queue);
bool __must_check uds_is_funnel_queue_idle(struct funnel_queue *queue);
#endif /* UDS_FUNNEL_QUEUE_H */
// SPDX-License-Identifier: GPL-2.0-only
/*
* Copyright 2023 Red Hat
*/
#include "funnel-requestqueue.h"
#include <linux/atomic.h>
#include <linux/compiler.h>
#include <linux/wait.h>
#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "uds-threads.h"
/*
* This queue will attempt to handle requests in reasonably sized batches instead of reacting
* immediately to each new request. The wait time between batches is dynamically adjusted up or
* down to try to balance responsiveness against wasted thread run time.
*
* If the wait time becomes long enough, the queue will become dormant and must be explicitly
* awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
* queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
* wakeup of the worker thread.
*
* When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
* decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
* going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
* more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
* queue's "next" field update isn't visible yet to make the entry accessible, its existence will
* kick the worker thread out of dormant mode and back into timer-based mode.
*
* Unbatched requests are used to communicate between different zone threads and will also cause
* the queue to awaken immediately.
*/
enum {
NANOSECOND = 1,
MICROSECOND = 1000 * NANOSECOND,
MILLISECOND = 1000 * MICROSECOND,
DEFAULT_WAIT_TIME = 20 * MICROSECOND,
MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
MAXIMUM_WAIT_TIME = MILLISECOND,
MINIMUM_BATCH = 32,
MAXIMUM_BATCH = 64,
};
struct uds_request_queue {
/* Wait queue for synchronizing producers and consumer */
struct wait_queue_head wait_head;
/* Function to process a request */
uds_request_queue_processor_fn processor;
/* Queue of new incoming requests */
struct funnel_queue *main_queue;
/* Queue of old requests to retry */
struct funnel_queue *retry_queue;
/* The thread id of the worker thread */
struct thread *thread;
/* True if the worker was started */
bool started;
/* When true, requests can be enqueued */
bool running;
/* A flag set when the worker is waiting without a timeout */
atomic_t dormant;
};
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
{
struct funnel_queue_entry *entry;
entry = uds_funnel_queue_poll(queue->retry_queue);
if (entry != NULL)
return container_of(entry, struct uds_request, queue_link);
entry = uds_funnel_queue_poll(queue->main_queue);
if (entry != NULL)
return container_of(entry, struct uds_request, queue_link);
return NULL;
}
static inline bool are_queues_idle(struct uds_request_queue *queue)
{
return uds_is_funnel_queue_idle(queue->retry_queue) &&
uds_is_funnel_queue_idle(queue->main_queue);
}
/*
* Determine if there is a next request to process, and return it if there is. Also return flags
* indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
* the thread did sleep before returning a new request.
*/
static inline bool dequeue_request(struct uds_request_queue *queue,
struct uds_request **request_ptr, bool *waited_ptr)
{
struct uds_request *request = poll_queues(queue);
if (request != NULL) {
*request_ptr = request;
return true;
}
if (!READ_ONCE(queue->running)) {
/* Wake the worker thread so it can exit. */
*request_ptr = NULL;
return true;
}
*request_ptr = NULL;
*waited_ptr = true;
return false;
}
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
unsigned long timeout, struct uds_request **request,
bool *waited)
{
if (dormant) {
wait_event_interruptible(queue->wait_head,
(dequeue_request(queue, request, waited) ||
!are_queues_idle(queue)));
return;
}
wait_event_interruptible_hrtimeout(queue->wait_head,
dequeue_request(queue, request, waited),
ns_to_ktime(timeout));
}
static void request_queue_worker(void *arg)
{
struct uds_request_queue *queue = arg;
struct uds_request *request = NULL;
unsigned long time_batch = DEFAULT_WAIT_TIME;
bool dormant = atomic_read(&queue->dormant);
bool waited = false;
long current_batch = 0;
for (;;) {
wait_for_request(queue, dormant, time_batch, &request, &waited);
if (likely(request != NULL)) {
current_batch++;
queue->processor(request);
} else if (!READ_ONCE(queue->running)) {
break;
}
if (dormant) {
/*
* The queue has been roused from dormancy. Clear the flag so enqueuers can
* stop broadcasting. No fence is needed for this transition.
*/
atomic_set(&queue->dormant, false);
dormant = false;
time_batch = DEFAULT_WAIT_TIME;
} else if (waited) {
/*
* We waited for this request to show up. Adjust the wait time to smooth
* out the batch size.
*/
if (current_batch < MINIMUM_BATCH) {
/*
* If the last batch of requests was too small, increase the wait
* time.
*/
time_batch += time_batch / 4;
if (time_batch >= MAXIMUM_WAIT_TIME) {
atomic_set(&queue->dormant, true);
dormant = true;
}
} else if (current_batch > MAXIMUM_BATCH) {
/*
* If the last batch of requests was too large, decrease the wait
* time.
*/
time_batch -= time_batch / 4;
if (time_batch < MINIMUM_WAIT_TIME)
time_batch = MINIMUM_WAIT_TIME;
}
current_batch = 0;
}
}
/*
* Ensure that we process any remaining requests that were enqueued before trying to shut
* down. The corresponding write barrier is in uds_request_queue_finish().
*/
smp_rmb();
while ((request = poll_queues(queue)) != NULL)
queue->processor(request);
}
int uds_make_request_queue(const char *queue_name,
uds_request_queue_processor_fn processor,
struct uds_request_queue **queue_ptr)
{
int result;
struct uds_request_queue *queue;
result = uds_allocate(1, struct uds_request_queue, __func__, &queue);
if (result != UDS_SUCCESS)
return result;
queue->processor = processor;
queue->running = true;
atomic_set(&queue->dormant, false);
init_waitqueue_head(&queue->wait_head);
result = uds_make_funnel_queue(&queue->main_queue);
if (result != UDS_SUCCESS) {
uds_request_queue_finish(queue);
return result;
}
result = uds_make_funnel_queue(&queue->retry_queue);
if (result != UDS_SUCCESS) {
uds_request_queue_finish(queue);
return result;
}
result = uds_create_thread(request_queue_worker, queue, queue_name,
&queue->thread);
if (result != UDS_SUCCESS) {
uds_request_queue_finish(queue);
return result;
}
queue->started = true;
*queue_ptr = queue;
return UDS_SUCCESS;
}
static inline void wake_up_worker(struct uds_request_queue *queue)
{
if (wq_has_sleeper(&queue->wait_head))
wake_up(&queue->wait_head);
}
void uds_request_queue_enqueue(struct uds_request_queue *queue,
struct uds_request *request)
{
struct funnel_queue *sub_queue;
bool unbatched = request->unbatched;
sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
uds_funnel_queue_put(sub_queue, &request->queue_link);
/*
* We must wake the worker thread when it is dormant. A read fence isn't needed here since
* we know the queue operation acts as one.
*/
if (atomic_read(&queue->dormant) || unbatched)
wake_up_worker(queue);
}
void uds_request_queue_finish(struct uds_request_queue *queue)
{
int result;
if (queue == NULL)
return;
/*
* This memory barrier ensures that any requests we queued will be seen. The point is that
* when dequeue_request() sees the following update to the running flag, it will also be
* able to see any change we made to a next field in the funnel queue entry. The
* corresponding read barrier is in request_queue_worker().
*/
smp_wmb();
WRITE_ONCE(queue->running, false);
if (queue->started) {
wake_up_worker(queue);
result = uds_join_threads(queue->thread);
if (result != UDS_SUCCESS)
uds_log_warning_strerror(result, "Failed to join worker thread");
}
uds_free_funnel_queue(queue->main_queue);
uds_free_funnel_queue(queue->retry_queue);
uds_free(queue);
}
/* SPDX-License-Identifier: GPL-2.0-only */
/*
* Copyright 2023 Red Hat
*/
#ifndef UDS_REQUEST_QUEUE_H
#define UDS_REQUEST_QUEUE_H
#include "uds.h"
/*
* A simple request queue which will handle new requests in the order in which they are received,
* and will attempt to handle requeued requests before new ones. However, the nature of the
* implementation means that it cannot guarantee this ordering; the prioritization is merely a
* hint.
*/
struct uds_request_queue;
typedef void (*uds_request_queue_processor_fn)(struct uds_request *);
int __must_check uds_make_request_queue(const char *queue_name,
uds_request_queue_processor_fn processor,
struct uds_request_queue **queue_ptr);
void uds_request_queue_enqueue(struct uds_request_queue *queue,
struct uds_request *request);
void uds_request_queue_finish(struct uds_request_queue *queue);
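/*
 * Example (an illustrative sketch, not part of this patch): creating a
 * request queue, submitting a request, and shutting the queue down. The
 * names handle_request and request are hypothetical; error handling is
 * omitted.
 *
 *   struct uds_request_queue *queue;
 *   int result;
 *
 *   result = uds_make_request_queue("example", handle_request, &queue);
 *   ...
 *   uds_request_queue_enqueue(queue, request);
 *   ...
 *   uds_request_queue_finish(queue);
 */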
#endif /* UDS_REQUEST_QUEUE_H */
/* SPDX-License-Identifier: GPL-2.0-only */
/*
* Copyright 2023 Red Hat
*/
#ifndef VDO_WORK_QUEUE_H
#define VDO_WORK_QUEUE_H
#include <linux/sched.h> /* for TASK_COMM_LEN */
#include "types.h"
enum {
MAX_VDO_WORK_QUEUE_NAME_LEN = TASK_COMM_LEN,
};
struct vdo_work_queue_type {
void (*start)(void *context);
void (*finish)(void *context);
enum vdo_completion_priority max_priority;
enum vdo_completion_priority default_priority;
};
struct vdo_completion;
struct vdo_thread;
struct vdo_work_queue;
int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
struct vdo_thread *owner, const struct vdo_work_queue_type *type,
unsigned int thread_count, void *thread_privates[],
struct vdo_work_queue **queue_ptr);
void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion);
void vdo_finish_work_queue(struct vdo_work_queue *queue);
void vdo_free_work_queue(struct vdo_work_queue *queue);
void vdo_dump_work_queue(struct vdo_work_queue *queue);
void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
size_t length);
void *vdo_get_work_queue_private_data(void);
struct vdo_work_queue *vdo_get_current_work_queue(void);
struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue);
bool __must_check vdo_work_queue_type_is(struct vdo_work_queue *queue,
const struct vdo_work_queue_type *type);
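/*
 * Example (an illustrative sketch, not part of this patch): describing a
 * queue type and creating a single-threaded work queue for it. The names
 * example_type, owner, and queue are hypothetical, and the priority choices
 * are arbitrary; error handling is omitted.
 *
 *   static const struct vdo_work_queue_type example_type = {
 *           .start = NULL,
 *           .finish = NULL,
 *           .max_priority = VDO_WORK_Q_DEFAULT_PRIORITY,
 *           .default_priority = VDO_WORK_Q_DEFAULT_PRIORITY,
 *   };
 *
 *   result = vdo_make_work_queue("vdo", "example", owner, &example_type,
 *                                1, NULL, &queue);
 *   ...
 *   vdo_enqueue_work_queue(queue, completion);
 *   ...
 *   vdo_finish_work_queue(queue);
 *   vdo_free_work_queue(queue);
 */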
#endif /* VDO_WORK_QUEUE_H */