Contributors: 2
Author Tokens Token Proportion Commits Commit Proportion
Matthew Sakai 3105 96.76% 3 25.00%
Mike Snitzer 104 3.24% 9 75.00%
Total 3209 12


// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "index-session.h"

#include <linux/atomic.h>

#include "logger.h"
#include "memory-alloc.h"
#include "time-utils.h"

#include "funnel-requestqueue.h"
#include "index.h"
#include "index-layout.h"

/*
 * The index session contains a lock (the request_mutex) which ensures that only one thread can
 * change the state of its index at a time. The state field indicates the current state of the
 * index through a set of descriptive flags. The request_mutex must be notified whenever a
 * non-transient state flag is cleared. The request_mutex is also used to count the number of
 * requests currently in progress so that they can be drained when suspending or closing the index.
 *
 * If the index session is suspended shortly after opening an index, it may have to suspend during
 * a rebuild. Depending on the size of the index, a rebuild may take a significant amount of time,
 * so UDS allows the rebuild to be paused in order to suspend the session in a timely manner. When
 * the index session is resumed, the rebuild can continue from where it left off. If the index
 * session is shut down with a suspended rebuild, the rebuild progress is abandoned and the rebuild
 * will start from the beginning the next time the index is loaded. The mutex and status fields in
 * the index_load_context are used to record the state of any interrupted rebuild.
 */

enum index_session_flag_bit {
	IS_FLAG_BIT_START = 8,
	/* The session has started loading an index but not completed it. */
	IS_FLAG_BIT_LOADING = IS_FLAG_BIT_START,
	/* The session has loaded an index, which can handle requests. */
	IS_FLAG_BIT_LOADED,
	/* The session's index has been permanently disabled. */
	IS_FLAG_BIT_DISABLED,
	/* The session's index is suspended. */
	IS_FLAG_BIT_SUSPENDED,
	/* The session is handling some index state change. */
	IS_FLAG_BIT_WAITING,
	/* The session's index is closing and draining requests. */
	IS_FLAG_BIT_CLOSING,
	/* The session is being destroyed and is draining requests. */
	IS_FLAG_BIT_DESTROYING,
};

enum index_session_flag {
	IS_FLAG_LOADED = (1 << IS_FLAG_BIT_LOADED),
	IS_FLAG_LOADING = (1 << IS_FLAG_BIT_LOADING),
	IS_FLAG_DISABLED = (1 << IS_FLAG_BIT_DISABLED),
	IS_FLAG_SUSPENDED = (1 << IS_FLAG_BIT_SUSPENDED),
	IS_FLAG_WAITING = (1 << IS_FLAG_BIT_WAITING),
	IS_FLAG_CLOSING = (1 << IS_FLAG_BIT_CLOSING),
	IS_FLAG_DESTROYING = (1 << IS_FLAG_BIT_DESTROYING),
};

/* Release a reference to an index session. */
static void release_index_session(struct uds_index_session *index_session)
{
	mutex_lock(&index_session->request_mutex);
	if (--index_session->request_count == 0)
		uds_broadcast_cond(&index_session->request_cond);
	mutex_unlock(&index_session->request_mutex);
}

/*
 * Acquire a reference to the index session for an asynchronous index request. The reference must
 * eventually be released with a corresponding call to release_index_session().
 */
static int get_index_session(struct uds_index_session *index_session)
{
	unsigned int state;
	int result = UDS_SUCCESS;

	mutex_lock(&index_session->request_mutex);
	index_session->request_count++;
	state = index_session->state;
	mutex_unlock(&index_session->request_mutex);

	if (state == IS_FLAG_LOADED) {
		return UDS_SUCCESS;
	} else if (state & IS_FLAG_DISABLED) {
		result = UDS_DISABLED;
	} else if ((state & IS_FLAG_LOADING) ||
		   (state & IS_FLAG_SUSPENDED) ||
		   (state & IS_FLAG_WAITING)) {
		result = -EBUSY;
	} else {
		result = UDS_NO_INDEX;
	}

	release_index_session(index_session);
	return result;
}

int uds_launch_request(struct uds_request *request)
{
	size_t internal_size;
	int result;

	if (request->callback == NULL) {
		vdo_log_error("missing required callback");
		return -EINVAL;
	}

	switch (request->type) {
	case UDS_DELETE:
	case UDS_POST:
	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
	case UDS_UPDATE:
		break;
	default:
		vdo_log_error("received invalid callback type");
		return -EINVAL;
	}

	/* Reset all internal fields before processing. */
	internal_size =
		sizeof(struct uds_request) - offsetof(struct uds_request, zone_number);
	// FIXME should be using struct_group for this instead
	memset((char *) request + sizeof(*request) - internal_size, 0, internal_size);

	result = get_index_session(request->session);
	if (result != UDS_SUCCESS)
		return result;

	request->found = false;
	request->unbatched = false;
	request->index = request->session->index;

	uds_enqueue_request(request, STAGE_TRIAGE);
	return UDS_SUCCESS;
}

static void enter_callback_stage(struct uds_request *request)
{
	if (request->status != UDS_SUCCESS) {
		/* All request errors are considered unrecoverable */
		mutex_lock(&request->session->request_mutex);
		request->session->state |= IS_FLAG_DISABLED;
		mutex_unlock(&request->session->request_mutex);
	}

	uds_request_queue_enqueue(request->session->callback_queue, request);
}

static inline void count_once(u64 *count_ptr)
{
	WRITE_ONCE(*count_ptr, READ_ONCE(*count_ptr) + 1);
}

static void update_session_stats(struct uds_request *request)
{
	struct session_stats *session_stats = &request->session->stats;

	count_once(&session_stats->requests);

	switch (request->type) {
	case UDS_POST:
		if (request->found)
			count_once(&session_stats->posts_found);
		else
			count_once(&session_stats->posts_not_found);

		if (request->location == UDS_LOCATION_IN_OPEN_CHAPTER)
			count_once(&session_stats->posts_found_open_chapter);
		else if (request->location == UDS_LOCATION_IN_DENSE)
			count_once(&session_stats->posts_found_dense);
		else if (request->location == UDS_LOCATION_IN_SPARSE)
			count_once(&session_stats->posts_found_sparse);
		break;

	case UDS_UPDATE:
		if (request->found)
			count_once(&session_stats->updates_found);
		else
			count_once(&session_stats->updates_not_found);
		break;

	case UDS_DELETE:
		if (request->found)
			count_once(&session_stats->deletions_found);
		else
			count_once(&session_stats->deletions_not_found);
		break;

	case UDS_QUERY:
	case UDS_QUERY_NO_UPDATE:
		if (request->found)
			count_once(&session_stats->queries_found);
		else
			count_once(&session_stats->queries_not_found);
		break;

	default:
		request->status = VDO_ASSERT(false, "unknown request type: %d",
					     request->type);
	}
}

static void handle_callbacks(struct uds_request *request)
{
	struct uds_index_session *index_session = request->session;

	if (request->status == UDS_SUCCESS)
		update_session_stats(request);

	request->status = uds_status_to_errno(request->status);
	request->callback(request);
	release_index_session(index_session);
}

static int __must_check make_empty_index_session(struct uds_index_session **index_session_ptr)
{
	int result;
	struct uds_index_session *session;

	result = vdo_allocate(1, struct uds_index_session, __func__, &session);
	if (result != VDO_SUCCESS)
		return result;

	mutex_init(&session->request_mutex);
	uds_init_cond(&session->request_cond);
	mutex_init(&session->load_context.mutex);
	uds_init_cond(&session->load_context.cond);

	result = uds_make_request_queue("callbackW", &handle_callbacks,
					&session->callback_queue);
	if (result != UDS_SUCCESS) {
		vdo_free(session);
		return result;
	}

	*index_session_ptr = session;
	return UDS_SUCCESS;
}

int uds_create_index_session(struct uds_index_session **session)
{
	if (session == NULL) {
		vdo_log_error("missing session pointer");
		return -EINVAL;
	}

	return uds_status_to_errno(make_empty_index_session(session));
}

static int __must_check start_loading_index_session(struct uds_index_session *index_session)
{
	int result;

	mutex_lock(&index_session->request_mutex);
	if (index_session->state & IS_FLAG_SUSPENDED) {
		vdo_log_info("Index session is suspended");
		result = -EBUSY;
	} else if (index_session->state != 0) {
		vdo_log_info("Index is already loaded");
		result = -EBUSY;
	} else {
		index_session->state |= IS_FLAG_LOADING;
		result = UDS_SUCCESS;
	}
	mutex_unlock(&index_session->request_mutex);
	return result;
}

static void finish_loading_index_session(struct uds_index_session *index_session,
					 int result)
{
	mutex_lock(&index_session->request_mutex);
	index_session->state &= ~IS_FLAG_LOADING;
	if (result == UDS_SUCCESS)
		index_session->state |= IS_FLAG_LOADED;

	uds_broadcast_cond(&index_session->request_cond);
	mutex_unlock(&index_session->request_mutex);
}

static int initialize_index_session(struct uds_index_session *index_session,
				    enum uds_open_index_type open_type)
{
	int result;
	struct uds_configuration *config;

	result = uds_make_configuration(&index_session->parameters, &config);
	if (result != UDS_SUCCESS) {
		vdo_log_error_strerror(result, "Failed to allocate config");
		return result;
	}

	memset(&index_session->stats, 0, sizeof(index_session->stats));
	result = uds_make_index(config, open_type, &index_session->load_context,
				enter_callback_stage, &index_session->index);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Failed to make index");
	else
		uds_log_configuration(config);

	uds_free_configuration(config);
	return result;
}

static const char *get_open_type_string(enum uds_open_index_type open_type)
{
	switch (open_type) {
	case UDS_CREATE:
		return "creating index";
	case UDS_LOAD:
		return "loading or rebuilding index";
	case UDS_NO_REBUILD:
		return "loading index";
	default:
		return "unknown open method";
	}
}

/*
 * Open an index under the given session. This operation will fail if the
 * index session is suspended, or if there is already an open index.
 */
int uds_open_index(enum uds_open_index_type open_type,
		   const struct uds_parameters *parameters,
		   struct uds_index_session *session)
{
	int result;
	char name[BDEVNAME_SIZE];

	if (parameters == NULL) {
		vdo_log_error("missing required parameters");
		return -EINVAL;
	}
	if (parameters->bdev == NULL) {
		vdo_log_error("missing required block device");
		return -EINVAL;
	}
	if (session == NULL) {
		vdo_log_error("missing required session pointer");
		return -EINVAL;
	}

	result = start_loading_index_session(session);
	if (result != UDS_SUCCESS)
		return uds_status_to_errno(result);

	session->parameters = *parameters;
	format_dev_t(name, parameters->bdev->bd_dev);
	vdo_log_info("%s: %s", get_open_type_string(open_type), name);

	result = initialize_index_session(session, open_type);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Failed %s",
				       get_open_type_string(open_type));

	finish_loading_index_session(session, result);
	return uds_status_to_errno(result);
}

static void wait_for_no_requests_in_progress(struct uds_index_session *index_session)
{
	mutex_lock(&index_session->request_mutex);
	while (index_session->request_count > 0) {
		uds_wait_cond(&index_session->request_cond,
			      &index_session->request_mutex);
	}
	mutex_unlock(&index_session->request_mutex);
}

static int __must_check save_index(struct uds_index_session *index_session)
{
	wait_for_no_requests_in_progress(index_session);
	return uds_save_index(index_session->index);
}

static void suspend_rebuild(struct uds_index_session *session)
{
	mutex_lock(&session->load_context.mutex);
	switch (session->load_context.status) {
	case INDEX_OPENING:
		session->load_context.status = INDEX_SUSPENDING;

		/* Wait until the index indicates that it is not replaying. */
		while ((session->load_context.status != INDEX_SUSPENDED) &&
		       (session->load_context.status != INDEX_READY)) {
			uds_wait_cond(&session->load_context.cond,
				      &session->load_context.mutex);
		}

		break;

	case INDEX_READY:
		/* Index load does not need to be suspended. */
		break;

	case INDEX_SUSPENDED:
	case INDEX_SUSPENDING:
	case INDEX_FREEING:
	default:
		/* These cases should not happen. */
		VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u",
				    session->load_context.status);
		break;
	}
	mutex_unlock(&session->load_context.mutex);
}

/*
 * Suspend index operation, draining all current index requests and preventing new index requests
 * from starting. Optionally saves all index data before returning.
 */
int uds_suspend_index_session(struct uds_index_session *session, bool save)
{
	int result = UDS_SUCCESS;
	bool no_work = false;
	bool rebuilding = false;

	/* Wait for any current index state change to complete. */
	mutex_lock(&session->request_mutex);
	while (session->state & IS_FLAG_CLOSING)
		uds_wait_cond(&session->request_cond, &session->request_mutex);

	if ((session->state & IS_FLAG_WAITING) || (session->state & IS_FLAG_DESTROYING)) {
		no_work = true;
		vdo_log_info("Index session is already changing state");
		result = -EBUSY;
	} else if (session->state & IS_FLAG_SUSPENDED) {
		no_work = true;
	} else if (session->state & IS_FLAG_LOADING) {
		session->state |= IS_FLAG_WAITING;
		rebuilding = true;
	} else if (session->state & IS_FLAG_LOADED) {
		session->state |= IS_FLAG_WAITING;
	} else {
		no_work = true;
		session->state |= IS_FLAG_SUSPENDED;
		uds_broadcast_cond(&session->request_cond);
	}
	mutex_unlock(&session->request_mutex);

	if (no_work)
		return uds_status_to_errno(result);

	if (rebuilding)
		suspend_rebuild(session);
	else if (save)
		result = save_index(session);
	else
		result = uds_flush_index_session(session);

	mutex_lock(&session->request_mutex);
	session->state &= ~IS_FLAG_WAITING;
	session->state |= IS_FLAG_SUSPENDED;
	uds_broadcast_cond(&session->request_cond);
	mutex_unlock(&session->request_mutex);
	return uds_status_to_errno(result);
}

static int replace_device(struct uds_index_session *session, struct block_device *bdev)
{
	int result;

	result = uds_replace_index_storage(session->index, bdev);
	if (result != UDS_SUCCESS)
		return result;

	session->parameters.bdev = bdev;
	return UDS_SUCCESS;
}

/*
 * Resume index operation after being suspended. If the index is suspended and the supplied block
 * device differs from the current backing store, the index will start using the new backing store.
 */
int uds_resume_index_session(struct uds_index_session *session,
			     struct block_device *bdev)
{
	int result = UDS_SUCCESS;
	bool no_work = false;
	bool resume_replay = false;

	mutex_lock(&session->request_mutex);
	if (session->state & IS_FLAG_WAITING) {
		vdo_log_info("Index session is already changing state");
		no_work = true;
		result = -EBUSY;
	} else if (!(session->state & IS_FLAG_SUSPENDED)) {
		/* If not suspended, just succeed. */
		no_work = true;
		result = UDS_SUCCESS;
	} else {
		session->state |= IS_FLAG_WAITING;
		if (session->state & IS_FLAG_LOADING)
			resume_replay = true;
	}
	mutex_unlock(&session->request_mutex);

	if (no_work)
		return result;

	if ((session->index != NULL) && (bdev != session->parameters.bdev)) {
		result = replace_device(session, bdev);
		if (result != UDS_SUCCESS) {
			mutex_lock(&session->request_mutex);
			session->state &= ~IS_FLAG_WAITING;
			uds_broadcast_cond(&session->request_cond);
			mutex_unlock(&session->request_mutex);
			return uds_status_to_errno(result);
		}
	}

	if (resume_replay) {
		mutex_lock(&session->load_context.mutex);
		switch (session->load_context.status) {
		case INDEX_SUSPENDED:
			session->load_context.status = INDEX_OPENING;
			/* Notify the index to start replaying again. */
			uds_broadcast_cond(&session->load_context.cond);
			break;

		case INDEX_READY:
			/* There is no index rebuild to resume. */
			break;

		case INDEX_OPENING:
		case INDEX_SUSPENDING:
		case INDEX_FREEING:
		default:
			/* These cases should not happen; do nothing. */
			VDO_ASSERT_LOG_ONLY(false, "Bad load context state %u",
					    session->load_context.status);
			break;
		}
		mutex_unlock(&session->load_context.mutex);
	}

	mutex_lock(&session->request_mutex);
	session->state &= ~IS_FLAG_WAITING;
	session->state &= ~IS_FLAG_SUSPENDED;
	uds_broadcast_cond(&session->request_cond);
	mutex_unlock(&session->request_mutex);
	return UDS_SUCCESS;
}

static int save_and_free_index(struct uds_index_session *index_session)
{
	int result = UDS_SUCCESS;
	bool suspended;
	struct uds_index *index = index_session->index;

	if (index == NULL)
		return UDS_SUCCESS;

	mutex_lock(&index_session->request_mutex);
	suspended = (index_session->state & IS_FLAG_SUSPENDED);
	mutex_unlock(&index_session->request_mutex);

	if (!suspended) {
		result = uds_save_index(index);
		if (result != UDS_SUCCESS)
			vdo_log_warning_strerror(result,
						 "ignoring error from save_index");
	}
	uds_free_index(index);
	index_session->index = NULL;

	/*
	 * Reset all index state that happens to be in the index
	 * session, so it doesn't affect any future index.
	 */
	mutex_lock(&index_session->load_context.mutex);
	index_session->load_context.status = INDEX_OPENING;
	mutex_unlock(&index_session->load_context.mutex);

	mutex_lock(&index_session->request_mutex);
	/* Only the suspend bit will remain relevant. */
	index_session->state &= IS_FLAG_SUSPENDED;
	mutex_unlock(&index_session->request_mutex);

	return result;
}

/* Save and close the current index. */
int uds_close_index(struct uds_index_session *index_session)
{
	int result = UDS_SUCCESS;

	/* Wait for any current index state change to complete. */
	mutex_lock(&index_session->request_mutex);
	while ((index_session->state & IS_FLAG_WAITING) ||
	       (index_session->state & IS_FLAG_CLOSING)) {
		uds_wait_cond(&index_session->request_cond,
			      &index_session->request_mutex);
	}

	if (index_session->state & IS_FLAG_SUSPENDED) {
		vdo_log_info("Index session is suspended");
		result = -EBUSY;
	} else if ((index_session->state & IS_FLAG_DESTROYING) ||
		   !(index_session->state & IS_FLAG_LOADED)) {
		/* The index doesn't exist, hasn't finished loading, or is being destroyed. */
		result = UDS_NO_INDEX;
	} else {
		index_session->state |= IS_FLAG_CLOSING;
	}
	mutex_unlock(&index_session->request_mutex);
	if (result != UDS_SUCCESS)
		return uds_status_to_errno(result);

	vdo_log_debug("Closing index");
	wait_for_no_requests_in_progress(index_session);
	result = save_and_free_index(index_session);
	vdo_log_debug("Closed index");

	mutex_lock(&index_session->request_mutex);
	index_session->state &= ~IS_FLAG_CLOSING;
	uds_broadcast_cond(&index_session->request_cond);
	mutex_unlock(&index_session->request_mutex);
	return uds_status_to_errno(result);
}

/* This will save and close an open index before destroying the session. */
int uds_destroy_index_session(struct uds_index_session *index_session)
{
	int result;
	bool load_pending = false;

	vdo_log_debug("Destroying index session");

	/* Wait for any current index state change to complete. */
	mutex_lock(&index_session->request_mutex);
	while ((index_session->state & IS_FLAG_WAITING) ||
	       (index_session->state & IS_FLAG_CLOSING)) {
		uds_wait_cond(&index_session->request_cond,
			      &index_session->request_mutex);
	}

	if (index_session->state & IS_FLAG_DESTROYING) {
		mutex_unlock(&index_session->request_mutex);
		vdo_log_info("Index session is already closing");
		return -EBUSY;
	}

	index_session->state |= IS_FLAG_DESTROYING;
	load_pending = ((index_session->state & IS_FLAG_LOADING) &&
			(index_session->state & IS_FLAG_SUSPENDED));
	mutex_unlock(&index_session->request_mutex);

	if (load_pending) {
		/* Tell the index to terminate the rebuild. */
		mutex_lock(&index_session->load_context.mutex);
		if (index_session->load_context.status == INDEX_SUSPENDED) {
			index_session->load_context.status = INDEX_FREEING;
			uds_broadcast_cond(&index_session->load_context.cond);
		}
		mutex_unlock(&index_session->load_context.mutex);

		/* Wait until the load exits before proceeding. */
		mutex_lock(&index_session->request_mutex);
		while (index_session->state & IS_FLAG_LOADING) {
			uds_wait_cond(&index_session->request_cond,
				      &index_session->request_mutex);
		}
		mutex_unlock(&index_session->request_mutex);
	}

	wait_for_no_requests_in_progress(index_session);
	result = save_and_free_index(index_session);
	uds_request_queue_finish(index_session->callback_queue);
	index_session->callback_queue = NULL;
	vdo_log_debug("Destroyed index session");
	vdo_free(index_session);
	return uds_status_to_errno(result);
}

/* Wait until all callbacks for index operations are complete. */
int uds_flush_index_session(struct uds_index_session *index_session)
{
	wait_for_no_requests_in_progress(index_session);
	uds_wait_for_idle_index(index_session->index);
	return UDS_SUCCESS;
}

/* Statistics collection is intended to be thread-safe. */
static void collect_stats(const struct uds_index_session *index_session,
			  struct uds_index_stats *stats)
{
	const struct session_stats *session_stats = &index_session->stats;

	stats->current_time = ktime_to_seconds(current_time_ns(CLOCK_REALTIME));
	stats->posts_found = READ_ONCE(session_stats->posts_found);
	stats->in_memory_posts_found = READ_ONCE(session_stats->posts_found_open_chapter);
	stats->dense_posts_found = READ_ONCE(session_stats->posts_found_dense);
	stats->sparse_posts_found = READ_ONCE(session_stats->posts_found_sparse);
	stats->posts_not_found = READ_ONCE(session_stats->posts_not_found);
	stats->updates_found = READ_ONCE(session_stats->updates_found);
	stats->updates_not_found = READ_ONCE(session_stats->updates_not_found);
	stats->deletions_found = READ_ONCE(session_stats->deletions_found);
	stats->deletions_not_found = READ_ONCE(session_stats->deletions_not_found);
	stats->queries_found = READ_ONCE(session_stats->queries_found);
	stats->queries_not_found = READ_ONCE(session_stats->queries_not_found);
	stats->requests = READ_ONCE(session_stats->requests);
}

int uds_get_index_session_stats(struct uds_index_session *index_session,
				struct uds_index_stats *stats)
{
	if (stats == NULL) {
		vdo_log_error("received a NULL index stats pointer");
		return -EINVAL;
	}

	collect_stats(index_session, stats);
	if (index_session->index != NULL) {
		uds_get_index_stats(index_session->index, stats);
	} else {
		stats->entries_indexed = 0;
		stats->memory_used = 0;
		stats->collisions = 0;
		stats->entries_discarded = 0;
	}

	return UDS_SUCCESS;
}

void uds_wait_cond(struct cond_var *cv, struct mutex *mutex)
{
	DEFINE_WAIT(__wait);

	prepare_to_wait(&cv->wait_queue, &__wait, TASK_IDLE);
	mutex_unlock(mutex);
	schedule();
	finish_wait(&cv->wait_queue, &__wait);
	mutex_lock(mutex);
}