Contributors: 7
Author Tokens Token Proportion Commits Commit Proportion
Kuniyuki Iwashima 2331 93.84% 1 10.00%
Stanislav Fomichev 69 2.78% 3 30.00%
Andrey Ignatov 65 2.62% 1 10.00%
Andrii Nakryiko 11 0.44% 2 20.00%
Martin KaFai Lau 6 0.24% 1 10.00%
Taichi Nishimura 1 0.04% 1 10.00%
Yucong Sun 1 0.04% 1 10.00%
Total 2484 10


// SPDX-License-Identifier: GPL-2.0
/*
 * Check if we can migrate child sockets.
 *
 *   1. call listen() for 4 server sockets.
 *   2. call connect() for 25 client sockets.
 *   3. call listen() for 1 server socket. (migration target)
 *   4. update a map to migrate all child sockets
 *        to the last server socket (migrate_map[cookie] = 4)
 *   5. call shutdown() for first 4 server sockets
 *        and migrate the requests in the accept queue
 *        to the last server socket.
 *   6. call listen() for the second server socket.
 *   7. call shutdown() for the last server
 *        and migrate the requests in the accept queue
 *        to the second server socket.
 *   8. call listen() for the last server.
 *   9. call shutdown() for the second server
 *        and migrate the requests in the accept queue
 *        to the last server socket.
 *  10. call accept() for the last server socket.
 *
 * Author: Kuniyuki Iwashima <kuniyu@amazon.co.jp>
 */

#include <bpf/bpf.h>
#include <bpf/libbpf.h>

#include "test_progs.h"
#include "test_migrate_reuseport.skel.h"
#include "network_helpers.h"

#ifndef TCP_FASTOPEN_CONNECT
#define TCP_FASTOPEN_CONNECT 30
#endif

#define IFINDEX_LO 1

#define NR_SERVERS 5
#define NR_CLIENTS (NR_SERVERS * 5)
#define MIGRATED_TO (NR_SERVERS - 1)

/* fastopenq->max_qlen and sk->sk_max_ack_backlog */
#define QLEN (NR_CLIENTS * 5)

#define MSG "Hello World\0"
#define MSGLEN 12

/* Per-subtest configuration and runtime state.
 *
 * The table below only spells out the knobs that are turned on; every
 * field that is not named in an initializer is zero-initialized, i.e.
 * false for the bool flags and NULL for the link.
 */
struct migrate_reuseport_test_case {
	/* Subtest name handed to test__start_subtest() */
	const char *name;
	/* Listener fds; reset to -1 by init_fds()/close_fds() */
	__s64 servers[NR_SERVERS];
	/* Client fds; reset to -1 by init_fds()/close_fds() */
	__s64 clients[NR_CLIENTS];
	/* Address shared by all listeners, filled in by start_servers() */
	struct sockaddr_storage addr;
	socklen_t addrlen;
	/* AF_INET or AF_INET6 */
	int family;
	/* Request state under test; picks the BPF counter that
	 * count_requests() verifies.
	 */
	int state;
	/* Attach the XDP program that drops the final ACK of the 3WHS */
	bool drop_ack;
	/* Sleep so that the SYN+ACK timer fires before the ACK is let through */
	bool expire_synack_timer;
	/* Use TCP Fast Open on both listeners and clients */
	bool fastopen;
	/* XDP link created by drop_ack(), NULL while nothing is attached */
	struct bpf_link *link;
};

static struct migrate_reuseport_test_case test_cases[] = {
	/* IPv4 */
	{
		.name = "IPv4 TCP_ESTABLISHED  inet_csk_listen_stop",
		.family = AF_INET,
		.state = BPF_TCP_ESTABLISHED,
	},
	{
		.name = "IPv4 TCP_SYN_RECV     inet_csk_listen_stop",
		.family = AF_INET,
		.state = BPF_TCP_SYN_RECV,
		.drop_ack = true,
		.fastopen = true,
	},
	{
		.name = "IPv4 TCP_NEW_SYN_RECV reqsk_timer_handler",
		.family = AF_INET,
		.state = BPF_TCP_NEW_SYN_RECV,
		.drop_ack = true,
		.expire_synack_timer = true,
	},
	{
		.name = "IPv4 TCP_NEW_SYN_RECV inet_csk_complete_hashdance",
		.family = AF_INET,
		.state = BPF_TCP_NEW_SYN_RECV,
		.drop_ack = true,
	},
	/* IPv6 */
	{
		.name = "IPv6 TCP_ESTABLISHED  inet_csk_listen_stop",
		.family = AF_INET6,
		.state = BPF_TCP_ESTABLISHED,
	},
	{
		.name = "IPv6 TCP_SYN_RECV     inet_csk_listen_stop",
		.family = AF_INET6,
		.state = BPF_TCP_SYN_RECV,
		.drop_ack = true,
		.fastopen = true,
	},
	{
		.name = "IPv6 TCP_NEW_SYN_RECV reqsk_timer_handler",
		.family = AF_INET6,
		.state = BPF_TCP_NEW_SYN_RECV,
		.drop_ack = true,
		.expire_synack_timer = true,
	},
	{
		.name = "IPv6 TCP_NEW_SYN_RECV inet_csk_complete_hashdance",
		.family = AF_INET6,
		.state = BPF_TCP_NEW_SYN_RECV,
		.drop_ack = true,
	},
};

/* Mark the first @len slots of @fds as "no descriptor" (-1) so that
 * close_fds() can later be called safely on a partially filled array.
 */
static void init_fds(__s64 fds[], int len)
{
	int remaining;

	for (remaining = len; remaining > 0; remaining--)
		*fds++ = -1;
}

/* Close every valid descriptor among the first @len slots of @fds and
 * reset the slot to -1.  Slots that already hold -1 are left untouched,
 * which makes the helper safe to call repeatedly.
 */
static void close_fds(__s64 fds[], int len)
{
	int remaining;

	for (remaining = len; remaining > 0; remaining--, fds++) {
		if (*fds == -1)
			continue;

		close(*fds);
		*fds = -1;
	}
}

/* Save-and-override or restore /proc/sys/net/ipv4/tcp_fastopen.
 *
 * @buf:       storage for the original sysctl contents
 * @size:      capacity of @buf (only used when saving)
 * @saved_len: number of bytes saved in @buf; written when saving,
 *             read when restoring
 * @restore:   false - remember the current value in @buf and write "519";
 *             true  - write the remembered value back
 *
 * Returns 0 on success and -1 on failure.
 */
static int setup_fastopen(char *buf, int size, int *saved_len, bool restore)
{
	int sysctl_fd, written, ret = 0;

	sysctl_fd = open("/proc/sys/net/ipv4/tcp_fastopen", O_RDWR);
	if (!ASSERT_NEQ(sysctl_fd, -1, "open"))
		return -1;

	if (restore) {
		written = write(sysctl_fd, buf, *saved_len);
		if (!ASSERT_EQ(written, *saved_len, "write - restore"))
			ret = -1;

		goto out;
	}

	*saved_len = read(sysctl_fd, buf, size);
	if (!ASSERT_GE(*saved_len, 1, "read")) {
		ret = -1;
		goto out;
	}

	/* Rewind so that the new value is written from offset 0 */
	ret = lseek(sysctl_fd, 0, SEEK_SET);
	if (!ASSERT_OK(ret, "lseek"))
		goto out;

	/* 519 == (TFO_CLIENT_ENABLE | TFO_SERVER_ENABLE |
	 *         TFO_CLIENT_NO_COOKIE | TFO_SERVER_COOKIE_NOT_REQD)
	 */
	written = write(sysctl_fd, "519", 3);
	if (!ASSERT_EQ(written, 3, "write - setup"))
		ret = -1;

out:
	close(sysctl_fd);

	return ret;
}

/* Publish the listeners' port to the BPF side and attach the drop_ack
 * XDP program to the loopback interface.  The resulting link is kept in
 * test_case->link so that pass_ack() can detach it later.
 *
 * Returns 0 on success and -1 if the program could not be attached.
 */
static int drop_ack(struct migrate_reuseport_test_case *test_case,
		    struct test_migrate_reuseport *skel)
{
	struct sockaddr_storage *ss = &test_case->addr;

	if (test_case->family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)ss;

		skel->bss->server_port = sin->sin_port;
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ss;

		skel->bss->server_port = sin6->sin6_port;
	}

	test_case->link = bpf_program__attach_xdp(skel->progs.drop_ack,
						  IFINDEX_LO);

	return ASSERT_OK_PTR(test_case->link, "bpf_program__attach_xdp") ?
	       0 : -1;
}

/* Detach the XDP program installed by drop_ack() so that ACKs reach the
 * listeners again.
 *
 * Returns 0 on success and -1 if destroying the link reported an error.
 * In both cases test_case->link is NULL on return.
 */
static int pass_ack(struct migrate_reuseport_test_case *test_case)
{
	int err;

	err = bpf_link__destroy(test_case->link);

	/* libbpf's bpf_link__destroy() releases the link object regardless
	 * of whether the detach step succeeded, so the stored pointer is
	 * stale from here on.  Clear it unconditionally; otherwise the
	 * clean-up path in run_test() would see a non-NULL link and destroy
	 * the already freed object a second time.
	 */
	test_case->link = NULL;

	if (!ASSERT_OK(err, "bpf_link__destroy"))
		return -1;

	return 0;
}

/* Create NR_SERVERS SO_REUSEPORT sockets bound to the same loopback
 * address and put all of them except the migration target
 * (servers[MIGRATED_TO]) into the listening state.
 *
 * The migrate_reuseport BPF program is attached to the reuseport group
 * through the first socket.  The address is requested with port 0; after
 * the first bind() the actual address is read back with getsockname()
 * into test_case->addr, which the remaining sockets and the clients use.
 *
 * Descriptors are stored in test_case->servers[] as soon as they are
 * created, so the caller can release them with close_fds() on failure.
 *
 * Returns 0 on success and -1 on the first failing step.
 */
static int start_servers(struct migrate_reuseport_test_case *test_case,
			 struct test_migrate_reuseport *skel)
{
	int i, err, prog_fd, reuseport = 1, qlen = QLEN;

	prog_fd = bpf_program__fd(skel->progs.migrate_reuseport);

	/* Do not go on with a half-initialized address if this fails */
	err = make_sockaddr(test_case->family,
			    test_case->family == AF_INET ? "127.0.0.1" : "::1",
			    0, &test_case->addr, &test_case->addrlen);
	if (!ASSERT_OK(err, "make_sockaddr"))
		return -1;

	for (i = 0; i < NR_SERVERS; i++) {
		test_case->servers[i] = socket(test_case->family, SOCK_STREAM,
					       IPPROTO_TCP);
		if (!ASSERT_NEQ(test_case->servers[i], -1, "socket"))
			return -1;

		err = setsockopt(test_case->servers[i], SOL_SOCKET,
				 SO_REUSEPORT, &reuseport, sizeof(reuseport));
		if (!ASSERT_OK(err, "setsockopt - SO_REUSEPORT"))
			return -1;

		err = bind(test_case->servers[i],
			   (struct sockaddr *)&test_case->addr,
			   test_case->addrlen);
		if (!ASSERT_OK(err, "bind"))
			return -1;

		if (i == 0) {
			err = setsockopt(test_case->servers[i], SOL_SOCKET,
					 SO_ATTACH_REUSEPORT_EBPF,
					 &prog_fd, sizeof(prog_fd));
			if (!ASSERT_OK(err,
				       "setsockopt - SO_ATTACH_REUSEPORT_EBPF"))
				return -1;

			/* Learn the address the first socket was bound to so
			 * that the other sockets join the same group.
			 */
			err = getsockname(test_case->servers[i],
					  (struct sockaddr *)&test_case->addr,
					  &test_case->addrlen);
			if (!ASSERT_OK(err, "getsockname"))
				return -1;
		}

		if (test_case->fastopen) {
			/* Same option level as the client side uses */
			err = setsockopt(test_case->servers[i],
					 IPPROTO_TCP, TCP_FASTOPEN,
					 &qlen, sizeof(qlen));
			if (!ASSERT_OK(err, "setsockopt - TCP_FASTOPEN"))
				return -1;
		}

		/* All requests will be tied to the first four listeners */
		if (i != MIGRATED_TO) {
			err = listen(test_case->servers[i], qlen);
			if (!ASSERT_OK(err, "listen"))
				return -1;
		}
	}

	return 0;
}

/* Open NR_CLIENTS TCP connections to the listeners' address and send
 * MSG over each of them.
 *
 * Descriptors are stored in test_case->clients[] as soon as they are
 * created, so the caller can release them with close_fds() on failure.
 *
 * Returns 0 on success and -1 on the first failing step.
 */
static int start_clients(struct migrate_reuseport_test_case *test_case)
{
	char payload[MSGLEN] = MSG;
	int tfo_connect = 1;
	int idx, ret;

	for (idx = 0; idx < NR_CLIENTS; idx++) {
		__s64 *client = &test_case->clients[idx];

		*client = socket(test_case->family, SOCK_STREAM, IPPROTO_TCP);
		if (!ASSERT_NEQ(*client, -1, "socket"))
			return -1;

		/* Only the final ACK is dropped by the attached XDP
		 * program, so from the client's point of view the
		 * connection becomes TCP_ESTABLISHED right away.
		 */
		ret = settimeo(*client, 100);
		if (!ASSERT_OK(ret, "settimeo"))
			return -1;

		if (test_case->fastopen) {
			ret = setsockopt(*client, IPPROTO_TCP,
					 TCP_FASTOPEN_CONNECT, &tfo_connect,
					 sizeof(tfo_connect));
			if (!ASSERT_OK(ret,
				       "setsockopt - TCP_FASTOPEN_CONNECT"))
				return -1;
		}

		ret = connect(*client, (struct sockaddr *)&test_case->addr,
			      test_case->addrlen);
		if (!ASSERT_OK(ret, "connect"))
			return -1;

		ret = write(*client, payload, MSGLEN);
		if (!ASSERT_EQ(ret, MSGLEN, "write"))
			return -1;
	}

	return 0;
}

/* Register every listener in reuseport_map and point its migrate_map
 * entry at the migration target.
 *
 * For each index the socket fd is inserted into reuseport_map, the entry
 * is read back, and the value returned by that lookup (the socket cookie
 * according to the file header) is used as the migrate_map key whose
 * value is MIGRATED_TO.
 *
 * Returns 0 on success and -1 on the first failing map operation.
 */
static int update_maps(struct migrate_reuseport_test_case *test_case,
		       struct test_migrate_reuseport *skel)
{
	int sockarray_fd = bpf_map__fd(skel->maps.reuseport_map);
	int target_map_fd = bpf_map__fd(skel->maps.migrate_map);
	int target = MIGRATED_TO;
	int idx, ret;
	__u64 val;

	for (idx = 0; idx < NR_SERVERS; idx++) {
		/* On update the value is the fd ... */
		val = (__u64)test_case->servers[idx];
		ret = bpf_map_update_elem(sockarray_fd, &idx, &val,
					  BPF_NOEXIST);
		if (!ASSERT_OK(ret, "bpf_map_update_elem - reuseport_map"))
			return -1;

		/* ... and the lookup overwrites it with the migrate_map key */
		ret = bpf_map_lookup_elem(sockarray_fd, &idx, &val);
		if (!ASSERT_OK(ret, "bpf_map_lookup_elem - reuseport_map"))
			return -1;

		ret = bpf_map_update_elem(target_map_fd, &val, &target,
					  BPF_NOEXIST);
		if (!ASSERT_OK(ret, "bpf_map_update_elem - migrate_map"))
			return -1;
	}

	return 0;
}

/* Shut the listeners down in the order that forces the queued requests
 * to move: first four -> last (via eBPF), last -> second, and finally
 * second -> last again.
 *
 * For TCP_NEW_SYN_RECV only the first step is performed.
 *
 * Returns 0 on success and -1 on the first failing syscall.
 */
static int migrate_dance(struct migrate_reuseport_test_case *test_case)
{
	__s64 *servers = test_case->servers;
	int idx, ret;

	/* Step 1: eBPF steers TCP_ESTABLISHED and TCP_SYN_RECV requests
	 * to the last listener when the first four go away.
	 */
	for (idx = 0; idx < MIGRATED_TO; idx++) {
		ret = shutdown(servers[idx], SHUT_RDWR);
		if (!ASSERT_OK(ret, "shutdown"))
			return -1;
	}

	/* TCP_NEW_SYN_RECV requests are migrated based on eBPF elsewhere,
	 * there is no dance for them.
	 */
	if (test_case->state == BPF_TCP_NEW_SYN_RECV)
		return 0;

	/* Step 2: bring the second listener back.
	 *
	 * The second listener is used on purpose instead of the first one.
	 *
	 * The first listener is bind()ed with port 0 and
	 * SOCK_BINDPORT_LOCK is not set in sk_userlocks, so calling
	 * listen() again would bind() it to a new ephemeral port and
	 * detach it from the existing reuseport group.
	 * (See: __inet_bind(), tcp_set_state())
	 *
	 * The second listener, OTOH, is bind()ed with a specific port and
	 * has SOCK_BINDPORT_LOCK set, so re-listen() resurrects it in the
	 * existing reuseport group.
	 */
	ret = listen(servers[1], QLEN);
	if (!ASSERT_OK(ret, "listen"))
		return -1;

	/* Step 3: move the requests from the last listener to the second.
	 *
	 * Every listener has been detached from the reuseport_map by now,
	 * so from here on the kernel picks the target at random.
	 */
	ret = shutdown(servers[MIGRATED_TO], SHUT_RDWR);
	if (!ASSERT_OK(ret, "shutdown"))
		return -1;

	/* Step 4: put the last listener back into the reuseport group */
	ret = listen(servers[MIGRATED_TO], QLEN);
	if (!ASSERT_OK(ret, "listen"))
		return -1;

	/* Step 5: move the requests from the second listener back to the
	 * last one.
	 */
	ret = shutdown(servers[1], SHUT_RDWR);
	if (!ASSERT_OK(ret, "shutdown"))
		return -1;

	return 0;
}

/* Verify that every client connection ended up on the migration target.
 *
 * Two counts are checked against NR_CLIENTS:
 *   - userspace: the number of connections that can be accept()ed from
 *     servers[MIGRATED_TO] and that deliver exactly MSG;
 *   - BPF prog:  the skel->bss counter that corresponds to the request
 *     state (and, for TCP_NEW_SYN_RECV, to the migration trigger) of
 *     this test case.
 *
 * Failures are reported through the ASSERT_*() macros; nothing is
 * returned.
 */
static void count_requests(struct migrate_reuseport_test_case *test_case,
			   struct test_migrate_reuseport *skel)
{
	struct sockaddr_storage addr;
	socklen_t len;
	int err, cnt = 0, client, nread;
	/* One spare byte keeps buf NUL-terminated whatever the peer sent */
	char buf[MSGLEN + 1];

	err = settimeo(test_case->servers[MIGRATED_TO], 4000);
	if (!ASSERT_OK(err, "settimeo"))
		goto out;

	for (; cnt < NR_CLIENTS; cnt++) {
		/* addrlen is a value-result argument: accept() overwrites
		 * it, so it has to be reset before every call.
		 */
		len = sizeof(addr);
		client = accept(test_case->servers[MIGRATED_TO],
				(struct sockaddr *)&addr, &len);
		if (!ASSERT_NEQ(client, -1, "accept"))
			goto out;

		memset(buf, 0, sizeof(buf));
		nread = read(client, buf, MSGLEN);
		close(client);

		/* Report a failed or short read as such instead of as a
		 * confusing string mismatch.
		 */
		if (!ASSERT_EQ(nread, MSGLEN, "read - length"))
			goto out;

		if (!ASSERT_STREQ(buf, MSG, "read"))
			goto out;
	}

out:
	ASSERT_EQ(cnt, NR_CLIENTS, "count in userspace");

	switch (test_case->state) {
	case BPF_TCP_ESTABLISHED:
		cnt = skel->bss->migrated_at_close;
		break;
	case BPF_TCP_SYN_RECV:
		cnt = skel->bss->migrated_at_close_fastopen;
		break;
	case BPF_TCP_NEW_SYN_RECV:
		if (test_case->expire_synack_timer)
			cnt = skel->bss->migrated_at_send_synack;
		else
			cnt = skel->bss->migrated_at_recv_ack;
		break;
	default:
		cnt = 0;
	}

	ASSERT_EQ(cnt, NR_CLIENTS, "count in BPF prog");
}

/* Run one migration scenario described by @test_case.
 *
 * Sequence:
 *   1. reset the BPF-side counters and the fd arrays
 *   2. (fastopen only) save and override the tcp_fastopen sysctl
 *   3. start the listeners, optionally start dropping the final ACK
 *   4. connect the clients, then let the migration target listen
 *   5. fill the BPF maps and shut listeners down to trigger migration
 *   6. optionally wait for the SYN+ACK timer, then let ACKs through
 *   7. count the migrated requests
 *
 * Clean-up runs in reverse order of acquisition: clients, XDP link,
 * servers, and finally the tcp_fastopen sysctl.
 */
static void run_test(struct migrate_reuseport_test_case *test_case,
		     struct test_migrate_reuseport *skel)
{
	/* saved_len is only meaningful when fastopen is set; start from a
	 * defined value since its address is handed to setup_fastopen().
	 */
	int err, saved_len = 0;
	char buf[16];

	skel->bss->migrated_at_close = 0;
	skel->bss->migrated_at_close_fastopen = 0;
	skel->bss->migrated_at_send_synack = 0;
	skel->bss->migrated_at_recv_ack = 0;

	init_fds(test_case->servers, NR_SERVERS);
	init_fds(test_case->clients, NR_CLIENTS);

	if (test_case->fastopen) {
		memset(buf, 0, sizeof(buf));

		err = setup_fastopen(buf, sizeof(buf), &saved_len, false);
		if (!ASSERT_OK(err, "setup_fastopen - setup"))
			return;
	}

	err = start_servers(test_case, skel);
	if (!ASSERT_OK(err, "start_servers"))
		goto close_servers;

	if (test_case->drop_ack) {
		/* Drop the final ACK of the 3-way handshake and stick the
		 * in-flight requests on TCP_SYN_RECV or TCP_NEW_SYN_RECV.
		 */
		err = drop_ack(test_case, skel);
		if (!ASSERT_OK(err, "drop_ack"))
			goto close_servers;
	}

	/* Tie requests to the first four listeners */
	err = start_clients(test_case);
	if (!ASSERT_OK(err, "start_clients"))
		goto close_clients;

	/* Only now does the migration target join the listening set */
	err = listen(test_case->servers[MIGRATED_TO], QLEN);
	if (!ASSERT_OK(err, "listen"))
		goto close_clients;

	err = update_maps(test_case, skel);
	if (!ASSERT_OK(err, "update_maps"))
		goto close_clients;

	/* Migrate the requests in the accept queue only.
	 * TCP_NEW_SYN_RECV requests are not migrated at this point.
	 */
	err = migrate_dance(test_case);
	if (!ASSERT_OK(err, "migrate_dance"))
		goto close_clients;

	if (test_case->expire_synack_timer) {
		/* Wait for SYN+ACK timers to expire so that
		 * reqsk_timer_handler() migrates TCP_NEW_SYN_RECV requests.
		 */
		sleep(1);
	}

	if (test_case->link) {
		/* Resume 3WHS and migrate TCP_NEW_SYN_RECV requests */
		err = pass_ack(test_case);
		if (!ASSERT_OK(err, "pass_ack"))
			goto close_clients;
	}

	count_requests(test_case, skel);

close_clients:
	close_fds(test_case->clients, NR_CLIENTS);

	/* Still attached if a step between drop_ack() and pass_ack() failed */
	if (test_case->link) {
		err = pass_ack(test_case);
		ASSERT_OK(err, "pass_ack - clean up");
	}

close_servers:
	close_fds(test_case->servers, NR_SERVERS);

	if (test_case->fastopen) {
		err = setup_fastopen(buf, sizeof(buf), &saved_len, true);
		ASSERT_OK(err, "setup_fastopen - restore");
	}
}

/* Test entry point: load the BPF skeleton once and run every entry of
 * test_cases[] as its own subtest against it.
 */
void serial_test_migrate_reuseport(void)
{
	struct test_migrate_reuseport *skel;
	int i;

	skel = test_migrate_reuseport__open_and_load();
	if (!ASSERT_OK_PTR(skel, "open_and_load"))
		return;

	for (i = 0; i < ARRAY_SIZE(test_cases); i++) {
		/* test__start_subtest() reports whether the subtest is
		 * selected; honour it so that subtest filters and
		 * deny-lists can actually skip individual cases.
		 */
		if (!test__start_subtest(test_cases[i].name))
			continue;

		run_test(&test_cases[i], skel);
	}

	test_migrate_reuseport__destroy(skel);
}