Contributors: 2
Author Tokens Token Proportion Commits Commit Proportion
Matthew Sakai 936 98.11% 1 14.29%
Mike Snitzer 18 1.89% 6 85.71%
Total 954 7


// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-requestqueue.h"

#include <linux/atomic.h>
#include <linux/compiler.h>
#include <linux/wait.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "thread-utils.h"

/*
 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
 * immediately to each new request. The wait time between batches is dynamically adjusted up or
 * down to try to balance responsiveness against wasted thread run time.
 *
 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
 * wakeup of the worker thread.
 *
 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
 * kick the worker thread out of dormant mode and back into timer-based mode.
 *
 * Unbatched requests are used to communicate between different zone threads and will also cause
 * the queue to awaken immediately.
 */

/* Tuning constants for the worker thread. All times are expressed in nanoseconds. */
enum {
	/* Time unit multipliers */
	NANOSECOND = 1,
	MICROSECOND = NANOSECOND * 1000,
	MILLISECOND = MICROSECOND * 1000,
	/* Initial wait between batches; also restored when the worker leaves dormancy */
	DEFAULT_WAIT_TIME = MICROSECOND * 20,
	/* The wait time is never decreased below this value */
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	/* When the wait time grows to this value, the worker becomes dormant */
	MAXIMUM_WAIT_TIME = MILLISECOND,
	/* A batch smaller than this causes the wait time to be increased */
	MINIMUM_BATCH = 32,
	/* A batch larger than this causes the wait time to be decreased */
	MAXIMUM_BATCH = 64,
};

/* The state of one request queue and the worker thread that services it. */
struct uds_request_queue {
	/* Wait queue the worker sleeps on; enqueuers and shutdown wake it via wake_up_worker() */
	struct wait_queue_head wait_head;
	/* Function the worker calls to process each dequeued request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests; polled only after the retry queue is empty */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry (those with "requeued" set); polled first */
	struct funnel_queue *retry_queue;
	/* The worker thread, as returned by vdo_create_thread() */
	struct thread *thread;
	/* True if the worker was started, so shutdown knows there is a thread to wake and join */
	bool started;
	/* True from creation until uds_request_queue_finish(); the worker exits once it is false */
	bool running;
	/* A boolean flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};

/*
 * Fetch the next request to process, if any. The retry queue always takes priority; the main queue
 * is consulted only when the retry queue yields nothing. Returns NULL if neither queue produced an
 * entry.
 */
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
{
	struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->retry_queue);

	if (link == NULL)
		link = vdo_funnel_queue_poll(queue->main_queue);

	if (link == NULL)
		return NULL;

	return container_of(link, struct uds_request, queue_link);
}

/* Report whether both the retry queue and the main queue are idle. */
static inline bool are_queues_idle(struct uds_request_queue *queue)
{
	if (!vdo_is_funnel_queue_idle(queue->retry_queue))
		return false;

	return vdo_is_funnel_queue_idle(queue->main_queue);
}

/*
 * Try to obtain the next request; this is the wake condition used by the wait_event() macros.
 *
 * *request_ptr is always written: it receives the dequeued request, or NULL if there was none.
 * The return value is true when the worker should stop waiting, either because a request was found
 * or because the queue is no longer running. When false is returned, the worker may sleep, and
 * *waited_ptr is set to true to record that it had to wait. This function never clears
 * *waited_ptr.
 */
static inline bool dequeue_request(struct uds_request_queue *queue,
				   struct uds_request **request_ptr, bool *waited_ptr)
{
	struct uds_request *next = poll_queues(queue);

	*request_ptr = next;
	if (next != NULL)
		return true;

	/* Nothing to process, but a stopped queue must still release the worker so it can exit. */
	if (!READ_ONCE(queue->running))
		return true;

	*waited_ptr = true;
	return false;
}

/*
 * Sleep on the queue's wait queue until dequeue_request() says to stop waiting. The outcome is
 * reported through "request" and "waited", which are passed straight to dequeue_request(); the
 * return values of the wait macros themselves are ignored.
 */
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
			     unsigned long timeout, struct uds_request **request,
			     bool *waited)
{
	if (!dormant) {
		/* Timer-based mode: wait no longer than "timeout" nanoseconds. */
		wait_event_interruptible_hrtimeout(queue->wait_head,
						   dequeue_request(queue, request, waited),
						   ns_to_ktime(timeout));
		return;
	}

	/*
	 * Dormant mode: wait with no timeout. A funnel queue that is no longer idle also ends the
	 * wait, even if its new entry could not be dequeued yet.
	 */
	wait_event_interruptible(queue->wait_head,
				 (dequeue_request(queue, request, waited) ||
				  !are_queues_idle(queue)));
}

/*
 * The main loop of the worker thread; "arg" is the struct uds_request_queue to service.
 *
 * Each iteration waits for a request and passes it to the queue's processor. Requests that arrive
 * without the worker having to wait are counted as part of the current batch. Whenever the worker
 * did have to wait, the size of the batch just completed is used to lengthen or shorten the wait
 * time, and a new batch begins. If the wait time reaches MAXIMUM_WAIT_TIME, the worker becomes
 * dormant and waits without a timeout until it is roused.
 *
 * The loop ends when no request is available and the queue is no longer running. Any requests
 * still queued at that point are processed before the function returns.
 */
static void request_queue_worker(void *arg)
{
	struct uds_request_queue *queue = arg;
	struct uds_request *request = NULL;
	unsigned long time_batch = DEFAULT_WAIT_TIME;
	bool dormant = atomic_read(&queue->dormant);
	bool waited;
	long current_batch = 0;

	for (;;) {
		/*
		 * dequeue_request() only ever sets this flag, so it must be cleared before each
		 * wait. If it were left set, every request after the first wait would be treated
		 * as the start of a new batch, the batch size would never exceed one, and the
		 * wait time would simply grow until the queue went dormant.
		 */
		waited = false;
		wait_for_request(queue, dormant, time_batch, &request, &waited);
		if (likely(request != NULL)) {
			current_batch++;
			queue->processor(request);
		} else if (!READ_ONCE(queue->running)) {
			break;
		}

		if (dormant) {
			/*
			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
			 * stop broadcasting. No fence is needed for this transition.
			 */
			atomic_set(&queue->dormant, false);
			dormant = false;
			time_batch = DEFAULT_WAIT_TIME;
		} else if (waited) {
			/*
			 * We waited for this request to show up (or the wait ended without one).
			 * Adjust the wait time to smooth out the batch size.
			 */
			if (current_batch < MINIMUM_BATCH) {
				/*
				 * If the last batch of requests was too small, increase the wait
				 * time. Once the wait time reaches the maximum, go dormant and
				 * advertise it so that enqueuers know to wake this thread.
				 */
				time_batch += time_batch / 4;
				if (time_batch >= MAXIMUM_WAIT_TIME) {
					atomic_set(&queue->dormant, true);
					dormant = true;
				}
			} else if (current_batch > MAXIMUM_BATCH) {
				/*
				 * If the last batch of requests was too large, decrease the wait
				 * time.
				 */
				time_batch -= time_batch / 4;
				if (time_batch < MINIMUM_WAIT_TIME)
					time_batch = MINIMUM_WAIT_TIME;
			}
			current_batch = 0;
		}
	}

	/*
	 * Ensure that we process any remaining requests that were enqueued before trying to shut
	 * down. The corresponding write barrier is in uds_request_queue_finish().
	 */
	smp_rmb();
	while ((request = poll_queues(queue)) != NULL)
		queue->processor(request);
}

/*
 * Create a request queue and start its worker thread.
 *
 * queue_name is passed to vdo_create_thread() for the worker, processor is the function the worker
 * calls for each request, and on success the new queue is stored in *queue_ptr and UDS_SUCCESS is
 * returned. On failure, the error code from the failing step is returned, *queue_ptr is not
 * written, and any partially constructed queue is released with uds_request_queue_finish().
 *
 * NOTE(review): the failure paths rely on "started" being false and the unset funnel queue
 * pointers being NULL, i.e. on vdo_allocate() returning zeroed memory -- confirm.
 */
int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	struct uds_request_queue *new_queue;
	int result;

	result = vdo_allocate(1, struct uds_request_queue, __func__, &new_queue);
	if (result != VDO_SUCCESS)
		return result;

	new_queue->processor = processor;
	new_queue->running = true;
	atomic_set(&new_queue->dormant, false);
	init_waitqueue_head(&new_queue->wait_head);

	result = vdo_make_funnel_queue(&new_queue->main_queue);
	if (result != VDO_SUCCESS)
		goto fail;

	result = vdo_make_funnel_queue(&new_queue->retry_queue);
	if (result != VDO_SUCCESS)
		goto fail;

	result = vdo_create_thread(request_queue_worker, new_queue, queue_name,
				   &new_queue->thread);
	if (result != VDO_SUCCESS)
		goto fail;

	/* Only a queue whose thread was created needs to be woken and joined at shutdown. */
	new_queue->started = true;
	*queue_ptr = new_queue;
	return UDS_SUCCESS;

fail:
	uds_request_queue_finish(new_queue);
	return result;
}

/* Wake the worker thread, but only if something is actually sleeping on the wait queue. */
static inline void wake_up_worker(struct uds_request_queue *queue)
{
	if (!wq_has_sleeper(&queue->wait_head))
		return;

	wake_up(&queue->wait_head);
}

/*
 * Add a request to the queue. Requests marked "requeued" go on the retry queue and all others go
 * on the main queue. The worker is woken if it is dormant or if the request is marked "unbatched".
 */
void uds_request_queue_enqueue(struct uds_request_queue *queue,
			       struct uds_request *request)
{
	/* Captured before the put; the request is not touched again once it has been queued. */
	bool wake_now = request->unbatched;
	struct funnel_queue *target;

	if (request->requeued)
		target = queue->retry_queue;
	else
		target = queue->main_queue;

	vdo_funnel_queue_put(target, &request->queue_link);

	/*
	 * A dormant worker has no timeout and must be woken explicitly. No read fence is needed
	 * before checking the flag because the queue operation above already acts as one.
	 */
	if (atomic_read(&queue->dormant) || wake_now)
		wake_up_worker(queue);
}

/*
 * Shut down and free a request queue. A NULL queue is ignored.
 *
 * The queue is marked as no longer running; if its worker thread was started, the worker is woken
 * and waited for with vdo_join_threads(). Both funnel queues and the queue itself are then freed.
 * uds_make_request_queue() also calls this to release a partially constructed queue.
 */
void uds_request_queue_finish(struct uds_request_queue *queue)
{
	if (queue == NULL)
		return;

	/*
	 * This memory barrier ensures that any requests we queued will be seen. The point is that
	 * when dequeue_request() sees the following update to the running flag, it will also be
	 * able to see any change we made to a next field in the funnel queue entry. The
	 * corresponding read barrier is in request_queue_worker().
	 */
	smp_wmb();
	WRITE_ONCE(queue->running, false);

	/* There is only a thread to wake and join if its creation succeeded. */
	if (queue->started) {
		wake_up_worker(queue);
		vdo_join_threads(queue->thread);
	}

	/* The worker, if any, has exited by now, so nothing else is using the funnel queues. */
	vdo_free_funnel_queue(queue->main_queue);
	vdo_free_funnel_queue(queue->retry_queue);
	vdo_free(queue);
}