/* cregit-Linux: how code gets into the kernel
 * Release 4.10  fs/dlm/lowcomms.c
 * Directory: fs/dlm
 */
/******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP address or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)

#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */


/*
 * Circular-buffer bookkeeping for the per-connection receive page.
 * base/len describe the region of valid data; mask is size-1, so the
 * buffer size must be a power of two for the wrapping in cbuf_data()
 * and cbuf_eat() to be correct.
 * NOTE(review): the closing "};" was missing in the extracted text and
 * has been restored here - the following function definitions require
 * a complete type.
 */
struct cbuf {
	unsigned int base;	/* start offset of valid data */
	unsigned int len;	/* number of valid bytes */
	unsigned int mask;	/* buffer size - 1, wraps offsets */
};

static void cbuf_add(struct cbuf *cb, int n) { cb->len += n; }


patrick caulfieldpatrick caulfield1365.00%150.00%
david teiglanddavid teigland735.00%150.00%

static int cbuf_data(struct cbuf *cb) { return ((cb->base + cb->len) & cb->mask); }


patrick caulfieldpatrick caulfield2589.29%150.00%
david teiglanddavid teigland310.71%150.00%

static void cbuf_init(struct cbuf *cb, int size) { cb->base = cb->len = 0; cb->mask = size-1; }


patrick caulfieldpatrick caulfield2784.38%150.00%
david teiglanddavid teigland515.62%150.00%

static void cbuf_eat(struct cbuf *cb, int n) { cb->len -= n; cb->base += n; cb->base &= cb->mask; }


patrick caulfieldpatrick caulfield2985.29%150.00%
david teiglanddavid teigland514.71%150.00%

static bool cbuf_empty(struct cbuf *cb) { return cb->len == 0; }


patrick caulfieldpatrick caulfield1583.33%150.00%
david teiglanddavid teigland316.67%150.00%

/*
 * Per-nodeid connection state.  One struct connection exists for each
 * peer (nodeid 0 is the listening connection); 'othercon' links a
 * second, accepted socket from the same peer when both ends connect
 * at once.
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;		/* circular-buffer state within rx_page */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork;	/* Receive workqueue */
	struct work_struct swork;	/* Send workqueue */
	/* original socket callbacks, saved by save_callbacks() so they
	   can be reinstated when the socket is torn down */
	void (*orig_error_report)(struct sock *);
	void (*orig_data_ready)(struct sock *);
	void (*orig_state_change)(struct sock *);
	void (*orig_write_space)(struct sock *);
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

/* Addresses configured for one cluster node (several when SCTP
   multi-homing is in use). */
struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;	/* round-robin cursor, see nodeid_to_addr() */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a
   connection struct in the array */
static inline int nodeid_hash(int nodeid) { return nodeid & (CONN_HASH_SIZE-1); }


christine caulfieldchristine caulfield19100.00%1100.00%

static struct connection *__find_con(int nodeid) { int r; struct connection *con; r = nodeid_hash(nodeid); hlist_for_each_entry(con, &connection_hash[r], list) { if (con->nodeid == nodeid) return con; } return NULL; }


christine caulfieldchristine caulfield50100.00%1100.00%

/*
 * Find the connection for a nodeid, creating and initialising a new
 * one unless 'alloc' is zero.  Caller must hold connections_lock.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con;
	int bucket;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	bucket = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[bucket]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets: they inherit from the
	   listening (nodeid 0) connection. */
	if (con->nodeid) {
		struct connection *base = __find_con(0);

		con->connect_action = base->connect_action;
		if (!con->rx_action)
			con->rx_action = base->rx_action;
	}

	return con;
}


patrick caulfieldpatrick caulfield10459.77%360.00%
david teiglanddavid teigland5833.33%120.00%
christine caulfieldchristine caulfield126.90%120.00%

/* Invoke conn_func on every connection in the hash table. */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	struct connection *con;
	struct hlist_node *tmp;
	int bucket;

	for (bucket = 0; bucket < CONN_HASH_SIZE; bucket++)
		hlist_for_each_entry_safe(con, tmp,
					  &connection_hash[bucket], list)
			conn_func(con);
}


christine caulfieldchristine caulfield61100.00%1100.00%

static struct connection *nodeid2con(int nodeid, gfp_t allocation) { struct connection *con; mutex_lock(&connections_lock); con = __nodeid2con(nodeid, allocation); mutex_unlock(&connections_lock); return con; }


patrick caulfieldpatrick caulfield3479.07%133.33%
david teiglanddavid teigland716.28%133.33%
matthias kaehlckematthias kaehlcke24.65%133.33%

static struct dlm_node_addr *find_node_addr(int nodeid) { struct dlm_node_addr *na; list_for_each_entry(na, &dlm_node_addrs, list) { if (na->nodeid == nodeid) return na; } return NULL; }


david teiglanddavid teigland40100.00%1100.00%

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) { switch (x->ss_family) { case AF_INET: { struct sockaddr_in *sinx = (struct sockaddr_in *)x; struct sockaddr_in *siny = (struct sockaddr_in *)y; if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) return 0; if (sinx->sin_port != siny->sin_port) return 0; break; } case AF_INET6: { struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) return 0; if (sinx->sin6_port != siny->sin6_port) return 0; break; } default: return 0; } return 1; }


david teiglanddavid teigland153100.00%1100.00%

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, struct sockaddr *sa_out, bool try_new_addr) { struct sockaddr_storage sas; struct dlm_node_addr *na; if (!dlm_local_count) return -1; spin_lock(&dlm_node_addrs_spin); na = find_node_addr(nodeid); if (na && na->addr_count) { memcpy(&sas, na->addr[na->curr_addr_index], sizeof(struct sockaddr_storage)); if (try_new_addr) { na->curr_addr_index++; if (na->curr_addr_index == na->addr_count) na->curr_addr_index = 0; } } spin_unlock(&dlm_node_addrs_spin); if (!na) return -EEXIST; if (!na->addr_count) return -ENOENT; if (sas_out) memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); if (!sa_out) return 0; if (dlm_local_addr[0]->ss_family == AF_INET) { struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; ret4->sin_addr.s_addr = in4->sin_addr.s_addr; } else { struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; ret6->sin6_addr = in6->sin6_addr; } return 0; }


david teiglanddavid teigland15561.51%233.33%
patrick caulfieldpatrick caulfield4216.67%116.67%
mike christiemike christie3212.70%116.67%
marcelo ricardo leitnermarcelo ricardo leitner218.33%116.67%
steven whitehousesteven whitehouse20.79%116.67%

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) { struct dlm_node_addr *na; int rv = -EEXIST; int addr_i; spin_lock(&dlm_node_addrs_spin); list_for_each_entry(na, &dlm_node_addrs, list) { if (!na->addr_count) continue; for (addr_i = 0; addr_i < na->addr_count; addr_i++) { if (addr_compare(na->addr[addr_i], addr)) { *nodeid = na->nodeid; rv = 0; goto unlock; } } } unlock: spin_unlock(&dlm_node_addrs_spin); return rv; }


david teiglanddavid teigland8374.77%150.00%
mike christiemike christie2825.23%150.00%

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) { struct sockaddr_storage *new_addr; struct dlm_node_addr *new_node, *na; new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); if (!new_node) return -ENOMEM; new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); if (!new_addr) { kfree(new_node); return -ENOMEM; } memcpy(new_addr, addr, len); spin_lock(&dlm_node_addrs_spin); na = find_node_addr(nodeid); if (!na) { new_node->nodeid = nodeid; new_node->addr[0] = new_addr; new_node->addr_count = 1; list_add(&new_node->list, &dlm_node_addrs); spin_unlock(&dlm_node_addrs_spin); return 0; } if (na->addr_count >= DLM_MAX_ADDR_COUNT) { spin_unlock(&dlm_node_addrs_spin); kfree(new_addr); kfree(new_node); return -ENOSPC; } na->addr[na->addr_count++] = new_addr; spin_unlock(&dlm_node_addrs_spin); kfree(new_node); return 0; }


david teiglanddavid teigland20097.09%250.00%
patrick caulfieldpatrick caulfield52.43%125.00%
alexey dobriyanalexey dobriyan10.49%125.00%

/* Data available on socket or listen socket received a connect:
   queue the receive worker unless one is already pending. */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}


patrick caulfieldpatrick caulfield3882.61%266.67%
david teiglanddavid teigland817.39%133.33%

static void lowcomms_write_space(struct sock *sk) { struct connection *con = sock2con(sk); if (!con) return; clear_bit(SOCK_NOSPACE, &con->sock->flags); if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { con->sock->sk->sk_write_pending--; clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); } if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) queue_work(send_workqueue, &con->swork); }


david s. millerdavid s. miller5152.58%120.00%
patrick caulfieldpatrick caulfield3232.99%240.00%
david teiglanddavid teigland1313.40%120.00%
eric dumazeteric dumazet11.03%120.00%

static inline void lowcomms_connect_sock(struct connection *con) { if (test_bit(CF_CLOSE, &con->flags)) return; if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) queue_work(send_workqueue, &con->swork); }


patrick caulfieldpatrick caulfield2347.92%133.33%
lars marowsky-breelars marowsky-bree1327.08%133.33%
david teiglanddavid teigland1225.00%133.33%

static void lowcomms_state_change(struct sock *sk) { /* SCTP layer is not calling sk_data_ready when the connection * is done, so we catch the signal through here. Also, it * doesn't switch socket state when entering shutdown, so we * skip the write in that case. */ if (sk->sk_shutdown) { if (sk->sk_shutdown == RCV_SHUTDOWN) lowcomms_data_ready(sk); } else if (sk->sk_state == TCP_ESTABLISHED) { lowcomms_write_space(sk); } }


marcelo ricardo leitnermarcelo ricardo leitner2551.02%133.33%
patrick caulfieldpatrick caulfield1734.69%133.33%
david teiglanddavid teigland714.29%133.33%

int dlm_lowcomms_connect_node(int nodeid) { struct connection *con; if (nodeid == dlm_our_nodeid()) return 0; con = nodeid2con(nodeid, GFP_NOFS); if (!con) return -ENOMEM; lowcomms_connect_sock(con); return 0; }


christine caulfieldchristine caulfield49100.00%1100.00%

static void lowcomms_error_report(struct sock *sk) { struct connection *con; struct sockaddr_storage saddr; int buflen; void (*orig_report)(struct sock *) = NULL; read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); if (con == NULL) goto out; orig_report = con->orig_error_report; if (con->sock == NULL || kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) { printk_ratelimited(KERN_ERR "dlm: node %d: socket error " "sending to node %d, port %d, " "sk_err=%d/%d\n", dlm_our_nodeid(), con->nodeid, dlm_config.ci_tcp_port, sk->sk_err, sk->sk_err_soft); } else if (saddr.ss_family == AF_INET) { struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; printk_ratelimited(KERN_ERR "dlm: node %d: socket error " "sending to node %d at %pI4, port %d, " "sk_err=%d/%d\n", dlm_our_nodeid(), con->nodeid, &sin4->sin_addr.s_addr, dlm_config.ci_tcp_port, sk->sk_err, sk->sk_err_soft); } else { struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; printk_ratelimited(KERN_ERR "dlm: node %d: socket error " "sending to node %d at %u.%u.%u.%u, " "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), con->nodeid, sin6->sin6_addr.s6_addr32[0], sin6->sin6_addr.s6_addr32[1], sin6->sin6_addr.s6_addr32[2], sin6->sin6_addr.s6_addr32[3], dlm_config.ci_tcp_port, sk->sk_err, sk->sk_err_soft); } out: read_unlock_bh(&sk->sk_callback_lock); if (orig_report) orig_report(sk); }


robert s. petersonrobert s. peterson277100.00%3100.00%

/*
 * Remember the socket's original callbacks so restore_callbacks() can
 * reinstate them at teardown.
 * Note: sk_callback_lock must be locked before calling this function.
 */
static void save_callbacks(struct connection *con, struct sock *sk)
{
	con->orig_data_ready = sk->sk_data_ready;
	con->orig_state_change = sk->sk_state_change;
	con->orig_write_space = sk->sk_write_space;
	con->orig_error_report = sk->sk_error_report;
}


robert s. petersonrobert s. peterson2143.75%125.00%
patrick caulfieldpatrick caulfield1633.33%125.00%
david teiglanddavid teigland1020.83%125.00%
ying xueying xue12.08%125.00%

static void restore_callbacks(struct connection *con, struct sock *sk) { write_lock_bh(&sk->sk_callback_lock); sk->sk_user_data = NULL; sk->sk_data_ready = con->orig_data_ready; sk->sk_state_change = con->orig_state_change; sk->sk_write_space = con->orig_write_space; sk->sk_error_report = con->orig_error_report; write_unlock_bh(&sk->sk_callback_lock); }


robert s. petersonrobert s. peterson5477.14%125.00%
patrick caulfieldpatrick caulfield912.86%125.00%
david teiglanddavid teigland57.14%125.00%
steven whitehousesteven whitehouse22.86%125.00%

/* Make a socket active: bind it to the connection and install our
   lowcomms callbacks, optionally saving the originals first. */
static void add_sock(struct socket *sock, struct connection *con,
		     bool save_cb)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	con->sock = sock;
	sk->sk_user_data = con;
	if (save_cb)
		save_callbacks(con, sk);

	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}


robert s. petersonrobert s. peterson9293.88%360.00%
steven whitehousesteven whitehouse55.10%120.00%
patrick caulfieldpatrick caulfield11.02%120.00%

/* Add the port number to an IPv6 or 4 sockaddr, zero any padding, and
   return the address length for the family in *addr_len. */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *)saddr;

		in4->sin_port = cpu_to_be16(port);
		memset(&in4->sin_zero, 0, sizeof(in4->sin_zero));
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)saddr;

		in6->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* zero the tail of the storage beyond the family-specific part */
	memset((char *)saddr + *addr_len, 0,
	       sizeof(struct sockaddr_storage) - *addr_len);
}


patrick caulfieldpatrick caulfield11278.87%266.67%
david teiglanddavid teigland3021.13%133.33%

/*
 * Close a remote connection and tidy up.
 * tx/rx select which pending work items to cancel; and_other also
 * closes the peer's accepted 'othercon' socket (recurses exactly once).
 */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	/* clear the pending bits before cancelling the work so a
	   concurrent wakeup cannot requeue us mid-close */
	clear_bit(CF_CONNECT_PENDING, &con->flags);
	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (tx && cancel_work_sync(&con->swork))
		log_print("canceled swork for node %d", con->nodeid);
	if (rx && cancel_work_sync(&con->rwork))
		log_print("canceled rwork for node %d", con->nodeid);

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		/* othercon sockets never had their callbacks saved, so
		   only restore them on the primary connection */
		if (!test_bit(CF_IS_OTHERCON, &con->flags))
			restore_callbacks(con, con->sock->sk);
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}


marcelo ricardo leitnermarcelo ricardo leitner7237.11%114.29%
patrick caulfieldpatrick caulfield6634.02%457.14%
david teiglanddavid teigland3216.49%114.29%
robert s. petersonrobert s. peterson2412.37%114.29%

/* Data received from remote end */
static int receive_from_sock(struct connection *con) { int ret = 0; struct msghdr msg = {}; struct kvec iov[2]; unsigned len; int r; int call_again_soon = 0; int nvec; mutex_lock(&con->sock_mutex); if (con->sock == NULL) { ret = -EAGAIN; goto out_close; } if (con->nodeid == 0) { ret = -EINVAL; goto out_close; } if (con->rx_page == NULL) { /* * This doesn't need to be atomic, but I think it should * improve performance if it is. */ con->rx_page = alloc_page(GFP_ATOMIC); if (con->rx_page == NULL) goto out_resched; cbuf_init(&