/* Release 4.11 net/ceph/messenger.c */
#include <linux/ceph/ceph_debug.h>
#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif /* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
* delivery. We tolerate TCP disconnects by reconnecting (with
* exponential backoff) in the case of a fault (disconnection, bad
* crc, protocol error). Acks allow sent messages to be discarded by
* the sender.
*/
/*
* We track the state of the socket on a given connection using
* values defined below. The transition to a new socket state is
* handled by a function which verifies we aren't coming from an
* unexpected state.
*
* --------
* | NEW* | transient initial state
* --------
* | con_sock_state_init()
* v
* ----------
* | CLOSED | initialized, but no socket (and no
* ---------- TCP connection)
* ^ \
* | \ con_sock_state_connecting()
* | ----------------------
* | \
* + con_sock_state_closed() \
* |+--------------------------- \
* | \ \ \
* | ----------- \ \
* | | CLOSING | socket event; \ \
* | ----------- await close \ \
* | ^ \ |
* | | \ |
* | + con_sock_state_closing() \ |
* | / \ | |
* | / --------------- | |
* | / \ v v
* | / --------------
* | / -----------------| CONNECTING | socket created, TCP
* | | / -------------- connect initiated
* | | | con_sock_state_connected()
* | | v
* -------------
* | CONNECTED | TCP connection established
* -------------
*
* State values for ceph_connection->sock_state; NEW is assumed to be 0.
*/
#define CON_SOCK_STATE_NEW 0
/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED 1
/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING 2
/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED 3
/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING 4
/* -> CLOSED */
/*
* connection states
*/
#define CON_STATE_CLOSED 1
/* -> PREOPEN */
#define CON_STATE_PREOPEN 2
/* -> CONNECTING, CLOSED */
#define CON_STATE_CONNECTING 3
/* -> NEGOTIATING, CLOSED */
#define CON_STATE_NEGOTIATING 4
/* -> OPEN, CLOSED */
#define CON_STATE_OPEN 5
/* -> STANDBY, CLOSED */
#define CON_STATE_STANDBY 6
/* -> PREOPEN, CLOSED */
/*
* ceph_connection flag bits
*/
#define CON_FLAG_LOSSYTX 0
/* we can close channel or drop
* messages on errors */
#define CON_FLAG_KEEPALIVE_PENDING 1
/* we need to send a keepalive */
#define CON_FLAG_WRITE_PENDING 2
/* we have data ready to send */
#define CON_FLAG_SOCK_CLOSED 3
/* socket state changed to closed */
#define CON_FLAG_BACKOFF 4
/* need to retry queuing delayed work */
/*
 * A con_flag value is valid iff it names one of the five defined
 * ceph_connection flag bits; anything else indicates a caller bug.
 */
static bool con_flag_valid(unsigned long con_flag)
{
	return con_flag == CON_FLAG_LOSSYTX ||
	       con_flag == CON_FLAG_KEEPALIVE_PENDING ||
	       con_flag == CON_FLAG_WRITE_PENDING ||
	       con_flag == CON_FLAG_SOCK_CLOSED ||
	       con_flag == CON_FLAG_BACKOFF;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 38 | 100.00% | 1 | 100.00% |
Total | 38 | 100.00% | 1 | 100.00% |
/* Atomically clear @bit in con->flags, validating it is a known flag. */
static void con_flag_clear(struct ceph_connection *con, unsigned long bit)
{
	BUG_ON(!con_flag_valid(bit));

	clear_bit(bit, &con->flags);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 34 | 100.00% | 1 | 100.00% |
Total | 34 | 100.00% | 1 | 100.00% |
/* Atomically set @bit in con->flags, validating it is a known flag. */
static void con_flag_set(struct ceph_connection *con, unsigned long bit)
{
	BUG_ON(!con_flag_valid(bit));

	set_bit(bit, &con->flags);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 34 | 100.00% | 1 | 100.00% |
Total | 34 | 100.00% | 1 | 100.00% |
/* Return whether @bit is set in con->flags, validating it is a known flag. */
static bool con_flag_test(struct ceph_connection *con, unsigned long bit)
{
	BUG_ON(!con_flag_valid(bit));

	return test_bit(bit, &con->flags);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 35 | 100.00% | 1 | 100.00% |
Total | 35 | 100.00% | 1 | 100.00% |
/*
 * Atomically clear @bit in con->flags and return its previous value,
 * validating it is a known flag.
 */
static bool con_flag_test_and_clear(struct ceph_connection *con,
				    unsigned long bit)
{
	BUG_ON(!con_flag_valid(bit));

	return test_and_clear_bit(bit, &con->flags);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 35 | 100.00% | 1 | 100.00% |
Total | 35 | 100.00% | 1 | 100.00% |
/*
 * Atomically set @bit in con->flags and return its previous value,
 * validating it is a known flag.
 */
static bool con_flag_test_and_set(struct ceph_connection *con,
				  unsigned long bit)
{
	BUG_ON(!con_flag_valid(bit));

	return test_and_set_bit(bit, &con->flags);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 35 | 100.00% | 1 | 100.00% |
Total | 35 | 100.00% | 1 | 100.00% |
/* Slab caches for frequently-allocated structures */
static struct kmem_cache *ceph_msg_cache;
static struct kmem_cache *ceph_msg_data_cache;
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif
/*
* When skipping (ignoring) a block of input we read it into a "skip
* buffer," which is this many bytes in size.
*/
#define SKIP_BUF_SIZE 1024
static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);
/*
* Nicely render a sockaddr as a string. An array of formatted
* strings is used, to approximate reentrancy.
*/
#define ADDR_STR_COUNT_LOG 5
/* log2(# address strings in array) */
#define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN 64
/* 54 is enough */
static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);
static struct page *zero_page;
/* used in certain error cases */
/*
 * Render *ss as a printable "addr:port" string.
 *
 * Formats into one slot of a small static ring of buffers (indexed via
 * addr_str_seq) to approximate reentrancy without allocation.  The
 * returned pointer is therefore only valid until ADDR_STR_COUNT further
 * calls have been made; callers must consume it promptly (e.g. inside a
 * single printk/dout).
 */
const char *ceph_pr_addr(const struct sockaddr_storage *ss)
{
	int i;
	char *s;
	struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;

	/* claim the next buffer in the ring */
	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss->ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss->ss_family);
	}

	return s;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 124 | 91.18% | 3 | 37.50% |
Alex Elder | 11 | 8.09% | 4 | 50.00% |
Yehuda Sadeh Weinraub | 1 | 0.74% | 1 | 12.50% |
Total | 136 | 100.00% | 8 | 100.00% |
EXPORT_SYMBOL(ceph_pr_addr);
/*
 * Snapshot the messenger's own address (msgr->inst.addr) into
 * my_enc_addr and convert it to its on-the-wire (encoded) form.
 */
static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
	       sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 41 | 100.00% | 1 | 100.00% |
Total | 41 | 100.00% | 1 | 100.00% |
/*
* work queue for all reading and writing to/from the socket.
*/
static struct workqueue_struct *ceph_msgr_wq;
/*
 * Create the slab caches for ceph_msg and ceph_msg_data.
 * Returns 0 on success or -ENOMEM; on failure nothing is left
 * allocated (the first cache is destroyed if the second fails).
 */
static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
	if (!ceph_msg_cache)
		return -ENOMEM;

	BUG_ON(ceph_msg_data_cache);
	ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
	if (!ceph_msg_data_cache) {
		/* roll back the first cache so we leave no residue */
		kmem_cache_destroy(ceph_msg_cache);
		ceph_msg_cache = NULL;
		return -ENOMEM;
	}

	return 0;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 63 | 96.92% | 2 | 66.67% |
Geliang Tang | 2 | 3.08% | 1 | 33.33% |
Total | 65 | 100.00% | 3 | 100.00% |
/*
 * Destroy the slab caches created by ceph_msgr_slab_init().
 * Both caches must exist when this is called (BUG otherwise).
 */
static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_data_cache);
	kmem_cache_destroy(ceph_msg_data_cache);
	ceph_msg_data_cache = NULL;

	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 38 | 100.00% | 2 | 100.00% |
Total | 38 | 100.00% | 2 | 100.00% |
/*
 * Common teardown for messenger global state: workqueue (if created),
 * then the zero page reference, then the slab caches.  Shared by
 * ceph_msgr_exit() and the ceph_msgr_init() failure path, so the
 * workqueue may legitimately be NULL here.
 */
static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	BUG_ON(zero_page == NULL);
	put_page(zero_page);
	zero_page = NULL;

	ceph_msgr_slab_exit();
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 38 | 90.48% | 3 | 60.00% |
Benoît Canet | 3 | 7.14% | 1 | 20.00% |
Kirill A. Shutemov | 1 | 2.38% | 1 | 20.00% |
Total | 42 | 100.00% | 5 | 100.00% |
/*
 * Initialize messenger global state: slab caches, a reference on the
 * shared zero page, and the messenger workqueue.  Returns 0 on
 * success or -ENOMEM; on failure everything set up so far is torn
 * down again via _ceph_msgr_exit().
 */
int ceph_msgr_init(void)
{
	if (ceph_msgr_slab_init())
		return -ENOMEM;

	BUG_ON(zero_page != NULL);
	zero_page = ZERO_PAGE(0);
	get_page(zero_page);

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
	if (!ceph_msgr_wq) {
		pr_err("msgr_init failed to create workqueue\n");
		_ceph_msgr_exit();
		return -ENOMEM;
	}

	return 0;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 26 | 39.39% | 2 | 25.00% |
Alex Elder | 24 | 36.36% | 2 | 25.00% |
Benoît Canet | 9 | 13.64% | 1 | 12.50% |
Tejun Heo | 4 | 6.06% | 1 | 12.50% |
Ilya Dryomov | 2 | 3.03% | 1 | 12.50% |
Kirill A. Shutemov | 1 | 1.52% | 1 | 12.50% |
Total | 66 | 100.00% | 8 | 100.00% |
EXPORT_SYMBOL(ceph_msgr_init);
/*
 * Public messenger teardown.  Requires a successful ceph_msgr_init()
 * beforehand (the workqueue must exist; BUG otherwise).
 */
void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 10 | 58.82% | 2 | 66.67% |
Sage Weil | 7 | 41.18% | 1 | 33.33% |
Total | 17 | 100.00% | 3 | 100.00% |
EXPORT_SYMBOL(ceph_msgr_exit);
/* Wait for all queued messenger work items to finish executing. */
void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 9 | 75.00% | 1 | 50.00% |
Yehuda Sadeh Weinraub | 3 | 25.00% | 1 | 50.00% |
Total | 12 | 100.00% | 2 | 100.00% |
EXPORT_SYMBOL(ceph_msgr_flush);
/* Connection socket state transition functions */
/*
 * Socket state transition NEW -> CLOSED: the connection is initialized
 * but has no socket yet.  Warns if the previous state was anything else.
 */
static void con_sock_state_init(struct ceph_connection *con)
{
	int prev;

	prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(prev != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, prev);
	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSED);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 32 | 56.14% | 2 | 66.67% |
Alex Elder | 25 | 43.86% | 1 | 33.33% |
Total | 57 | 100.00% | 3 | 100.00% |
/*
 * Socket state transition CLOSED -> CONNECTING: a socket has been
 * created and a TCP connect initiated.  Warns on any other old state.
 */
static void con_sock_state_connecting(struct ceph_connection *con)
{
	int prev;

	prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(prev != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, prev);
	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CONNECTING);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 30 | 52.63% | 2 | 50.00% |
Alex Elder | 25 | 43.86% | 1 | 25.00% |
Jim Schutt | 2 | 3.51% | 1 | 25.00% |
Total | 57 | 100.00% | 4 | 100.00% |
/*
 * Socket state transition CONNECTING -> CONNECTED: the TCP connection
 * has been established.  Warns on any other old state.
 */
static void con_sock_state_connected(struct ceph_connection *con)
{
	int prev;

	prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(prev != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, prev);
	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CONNECTED);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 35 | 61.40% | 1 | 25.00% |
Sage Weil | 12 | 21.05% | 2 | 50.00% |
Jim Schutt | 10 | 17.54% | 1 | 25.00% |
Total | 57 | 100.00% | 4 | 100.00% |
/*
 * Socket state transition -> CLOSING: a socket event told us the peer
 * side is going away; awaiting close.  Valid old states are
 * CONNECTING, CONNECTED and CLOSING itself; warns otherwise.
 */
static void con_sock_state_closing(struct ceph_connection *con)
{
	int prev;

	prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(prev != CON_SOCK_STATE_CONNECTING &&
		    prev != CON_SOCK_STATE_CONNECTED &&
		    prev != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, prev);
	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSING);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 40 | 61.54% | 1 | 33.33% |
Sage Weil | 25 | 38.46% | 2 | 66.67% |
Total | 65 | 100.00% | 3 | 100.00% |
/*
 * Socket state transition -> CLOSED: the socket is gone.  Valid old
 * states are CONNECTED, CLOSING, CONNECTING and CLOSED itself; warns
 * otherwise.
 */
static void con_sock_state_closed(struct ceph_connection *con)
{
	int prev;

	prev = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(prev != CON_SOCK_STATE_CONNECTED &&
		    prev != CON_SOCK_STATE_CLOSING &&
		    prev != CON_SOCK_STATE_CONNECTING &&
		    prev != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, prev);
	dout("%s con %p sock %d -> %d\n", __func__, con, prev,
	     CON_SOCK_STATE_CLOSED);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 43 | 62.32% | 1 | 25.00% |
Sage Weil | 26 | 37.68% | 3 | 75.00% |
Total | 69 | 100.00% | 4 | 100.00% |
/*
* socket callback functions
*/
/*
 * Socket callback: data is available to read (or a listen socket
 * received a connect).  Ignored while the messenger is stopping;
 * otherwise queue connection work unless the socket is already in
 * TCP_CLOSE_WAIT (the close path handles that).
 */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	if (atomic_read(&con->msgr->stopping))
		return;

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s on %p state = %lu, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 51 | 80.95% | 1 | 33.33% |
Guanjun He | 8 | 12.70% | 1 | 33.33% |
Alex Elder | 4 | 6.35% | 1 | 33.33% |
Total | 63 | 100.00% | 3 | 100.00% |
/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* nothing pending?  then there is nothing to queue */
	if (!con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
		dout("%s %p nothing to write\n", __func__, con);
		return;
	}

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (sk_stream_is_writeable(sk)) {
		dout("%s %p queueing write work\n", __func__, con);
		clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		queue_con(con);
	}
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 62 | 78.48% | 1 | 20.00% |
Alex Elder | 11 | 13.92% | 2 | 40.00% |
Jim Schutt | 5 | 6.33% | 1 | 20.00% |
Eric Dumazet | 1 | 1.27% | 1 | 20.00% |
Total | 79 | 100.00% | 5 | 100.00% |
/*
 * Socket callback: the TCP state of the underlying socket changed.
 * CLOSE/CLOSE_WAIT mark the connection's socket as closing and flag
 * SOCK_CLOSED so the worker notices; ESTABLISHED records the
 * connected transition.  Both cases queue connection work.
 */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %lu sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		/* fallthrough -- both close states are handled identically */
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		con_flag_set(con, CON_FLAG_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 77 | 71.96% | 1 | 16.67% |
Alex Elder | 30 | 28.04% | 5 | 83.33% |
Total | 107 | 100.00% | 6 | 100.00% |
/*
* set up socket callbacks
*/
/*
 * Install the messenger's socket callbacks on @sock and attach @con as
 * sk_user_data.  sk_user_data is assigned first because each callback
 * installed below reads it to recover the ceph_connection.
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;

	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 46 | 93.88% | 1 | 50.00% |
Alex Elder | 3 | 6.12% | 1 | 50.00% |
Total | 49 | 100.00% | 2 | 100.00% |
/*
* socket helpers
*/
/*
 * Initiate a non-blocking TCP connection to the connection's peer
 * address.  On success the new socket is stored in con->sock and 0 is
 * returned; -EINPROGRESS from connect() is the expected non-blocking
 * outcome and is treated as success.  Any other connect error releases
 * the socket and is returned to the caller.
 */
static int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
	struct socket *sock;
	unsigned int noio_flag;
	int ret;

	BUG_ON(con->sock);

	/* sock_create_kern() allocates with GFP_KERNEL */
	noio_flag = memalloc_noio_save();
	ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	memalloc_noio_restore(noio_flag);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));

	/* record the transition before connect() so callbacks see it */
	con_sock_state_connecting(con);
	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		/* normal for a non-blocking connect; completion arrives
		 * via ceph_sock_state_change() */
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr.in_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr.in_addr), ret);
		sock_release(sock);
		return ret;
	}

	/* TCP_NODELAY is best-effort: a setsockopt failure is logged
	 * but does not fail the connect */
	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
		int optval = 1;

		ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
					(char *)&optval, sizeof(optval));
		if (ret)
			pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
			       ret);
	}

	con->sock = sock;
	return 0;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 200 | 68.26% | 4 | 33.33% |
Chaitanya Huilgol | 47 | 16.04% | 1 | 8.33% |
Ilya Dryomov | 32 | 10.92% | 3 | 25.00% |
Alex Elder | 10 | 3.41% | 2 | 16.67% |
Yehuda Sadeh Weinraub | 3 | 1.02% | 1 | 8.33% |
Eric W. Biedermann | 1 | 0.34% | 1 | 8.33% |
Total | 293 | 100.00% | 12 | 100.00% |
/*
 * Non-blocking receive of up to @len bytes into @buf.
 * Returns the number of bytes read, 0 if the read would block
 * (-EAGAIN is mapped to 0), or a negative error code.
 */
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
	struct kvec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len);
	r = sock_recvmsg(sock, &msg, msg.msg_flags);
	return r == -EAGAIN ? 0 : r;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 74 | 81.32% | 2 | 66.67% |
Al Viro | 17 | 18.68% | 1 | 33.33% |
Total | 91 | 100.00% | 3 | 100.00% |
/*
 * Non-blocking receive of @length bytes directly into @page at
 * @page_offset.  The range must fit within the page.  Returns bytes
 * read, 0 if the read would block (-EAGAIN maps to 0), or a negative
 * error code.
 */
static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
			     int page_offset, size_t length)
{
	struct bio_vec bvec = {
		.bv_page = page,
		.bv_offset = page_offset,
		.bv_len = length
	};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	BUG_ON(page_offset + length > PAGE_SIZE);
	iov_iter_bvec(&msg.msg_iter, READ | ITER_BVEC, &bvec, 1, length);
	r = sock_recvmsg(sock, &msg, msg.msg_flags);
	return r == -EAGAIN ? 0 : r;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Al Viro | 68 | 59.13% | 1 | 50.00% |
Alex Elder | 47 | 40.87% | 1 | 50.00% |
Total | 115 | 100.00% | 2 | 100.00% |
/*
 * Non-blocking send of the kvec array @iov (@kvlen entries, @len bytes
 * total).  @more is true if the caller will be sending more data
 * shortly; it selects MSG_MORE vs MSG_EOR.  Returns bytes sent, 0 if
 * the send would block (-EAGAIN maps to 0), or a negative error code.
 */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
			    size_t kvlen, size_t len, int more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	/* MSG_EOR is superfluous on a stream socket, but harmless */
	msg.msg_flags |= more ? MSG_MORE : MSG_EOR;

	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
	return r == -EAGAIN ? 0 : r;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 89 | 100.00% | 2 | 100.00% |
Total | 89 | 100.00% | 2 | 100.00% |
/*
 * Zero-copy send of @size bytes of @page starting at @offset via
 * kernel_sendpage().  Returns bytes sent, 0 if the send would block
 * (-EAGAIN maps to 0), or a negative error code.
 */
static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
			       int offset, size_t size, bool more)
{
	int flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	int ret;

	if (more)
		flags |= MSG_MORE;
	else
		flags |= MSG_EOR;

	ret = kernel_sendpage(sock, page, offset, size, flags);
	return ret == -EAGAIN ? 0 : ret;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Alex Elder | 71 | 98.61% | 2 | 66.67% |
Chunwei Chen | 1 | 1.39% | 1 | 33.33% |
Total | 72 | 100.00% | 3 | 100.00% |
/*
 * Send @size bytes of @page starting at @offset, preferring the
 * zero-copy sendpage path and falling back to a bvec sendmsg when
 * sendpage cannot be used.  Returns bytes sent, 0 if the send would
 * block (-EAGAIN maps to 0), or a negative error code.
 */
static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
			     int offset, size_t size, bool more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	struct bio_vec bvec;
	int ret;

	/* sendpage cannot properly handle pages with page_count == 0,
	 * we need to fallback to sendmsg if that's the case */
	if (page_count(page) >= 1)
		return __ceph_tcp_sendpage(sock, page, offset, size, more);

	bvec.bv_page = page;
	bvec.bv_offset = offset;
	bvec.bv_len = size;

	if (more)
		msg.msg_flags |= MSG_MORE;
	else
		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */

	iov_iter_bvec(&msg.msg_iter, WRITE | ITER_BVEC, &bvec, 1, size);
	ret = sock_sendmsg(sock, &msg);
	if (ret == -EAGAIN)
		ret = 0;  /* would-block is reported as "nothing sent" */

	return ret;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Chunwei Chen | 76 | 51.35% | 1 | 50.00% |
Al Viro | 72 | 48.65% | 1 | 50.00% |
Total | 148 | 100.00% | 2 | 100.00% |
/*
 * Shutdown/close the socket for the given connection (if one exists)
 * and move the sock state to CLOSED.  Returns the shutdown() result,
 * or 0 if there was no socket.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	con_flag_clear(con, CON_FLAG_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 75 | 92.59% | 3 | 60.00% |
Alex Elder | 6 | 7.41% | 2 | 40.00% |
Total | 81 | 100.00% | 5 | 100.00% |
/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
/* Unlink @msg from whatever list it is on and drop the list's reference. */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);

	ceph_msg_put(msg);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 24 | 100.00% | 1 | 100.00% |
Total | 24 | 100.00% | 1 | 100.00% |
/* Drain @head, removing and releasing every message queued on it. */
static void ceph_msg_remove_list(struct list_head *head)
{
	struct ceph_msg *msg;

	while (!list_empty(head)) {
		msg = list_first_entry(head, struct ceph_msg, list_head);
		ceph_msg_remove(msg);
	}
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 41 | 100.00% | 1 | 100.00% |
Total | 41 | 100.00% | 1 | 100.00% |
/*
 * Reset @con to a pristine message state: drop every queued and sent
 * message (plus any partially received/sent message) and zero the
 * connect/in/out sequence counters.  Caller is expected to hold the
 * connection mutex (as in ceph_con_close()).
 */
static void reset_connection(struct ceph_connection *con)
{
	/* reset connection, out_queue, msg_ and connect_seq */
	/* discard existing out_queue and msg_seq */
	dout("reset_connection %p\n", con);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);

	if (con->in_msg) {
		BUG_ON(con->in_msg->con != con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	con->connect_seq = 0;
	con->out_seq = 0;
	if (con->out_msg) {
		BUG_ON(con->out_msg->con != con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	con->in_seq = 0;
	con->in_seq_acked = 0;

	con->out_skip = 0;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 102 | 78.46% | 5 | 62.50% |
Ilya Dryomov | 17 | 13.08% | 2 | 25.00% |
Alex Elder | 11 | 8.46% | 1 | 12.50% |
Total | 130 | 100.00% | 8 | 100.00% |
/*
 * Mark a peer down.  Drop any open connection: under the connection
 * mutex, move to CON_STATE_CLOSED, clear the pending-work flags,
 * discard all queued messages, cancel delayed work and close the
 * socket.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con,
	     ceph_pr_addr(&con->peer_addr.in_addr));
	con->state = CON_STATE_CLOSED;

	con_flag_clear(con, CON_FLAG_LOSSYTX);	/* so we retry next connect */
	con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	con_flag_clear(con, CON_FLAG_BACKOFF);

	reset_connection(con);
	con->peer_global_seq = 0;
	cancel_con(con);
	con_close_socket(con);
	mutex_unlock(&con->mutex);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 86 | 86.87% | 8 | 72.73% |
Alex Elder | 11 | 11.11% | 1 | 9.09% |
Ilya Dryomov | 1 | 1.01% | 1 | 9.09% |
Yehuda Sadeh Weinraub | 1 | 1.01% | 1 | 9.09% |
Total | 99 | 100.00% | 11 | 100.00% |
EXPORT_SYMBOL(ceph_con_close);
/*
 * Reopen a closed connection, with a new peer address.
 *
 * Records the peer's entity type/number and address on @con, moves the
 * state from CLOSED to PREOPEN (warns if not currently CLOSED), resets
 * the backoff delay and queues connection work to start connecting.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));

	WARN_ON(con->state != CON_STATE_CLOSED);
	con->state = CON_STATE_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;		/* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 115 | 98.29% | 5 | 71.43% |
Yehuda Sadeh Weinraub | 1 | 0.85% | 1 | 14.29% |
Alex Elder | 1 | 0.85% | 1 | 14.29% |
Total | 117 | 100.00% | 7 | 100.00% |
EXPORT_SYMBOL(ceph_con_open);
/*
 * return true if this connection ever successfully opened
 * (connect_seq only advances past 0 once a connect has succeeded)
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return con->connect_seq > 0;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 17 | 100.00% | 1 | 100.00% |
Total | 17 | 100.00% | 1 | 100.00% |
/*
 * Initialize a new connection: zero the whole structure first, then
 * attach the caller's private data, ops and owning messenger, set up
 * the sock-state tracker, mutex, message lists and delayed work, and
 * start in CON_STATE_CLOSED.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
		   const struct ceph_connection_operations *ops,
		   struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));	/* must come before any field setup */
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);

	con->state = CON_STATE_CLOSED;
}
Contributors
Person | Tokens | Prop | Commits | CommitProp |
Sage Weil | 81 | 75.00% | 3 | 42.86% |
Alex Elder | 26 | 24.07% | 3 | 42.86% |
Ilya Dryomov | 1 | 0.93% | 1 | 14.29% |
Total | 108 | 100.00% | 7 | 100.00% |
EXPORT_SYMBOL(ceph_con_init);
/*
* We maintain a global counter to order connection attempts. Get
* a unique seq greater than @gt.
*/
static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
u32 ret;
spin_lock(&msgr->global_seq_lock);
if (msgr->global_seq < gt)