dlm for 5.9

This set includes some improvements to the dlm
 networking layer: improving the ability to trace
 dlm messages for debugging, and improved handling
 of bad messages or disrupted connections.
 -----BEGIN PGP SIGNATURE-----
 
 iQIcBAABAgAGBQJfLCPxAAoJEDgbc8f8gGmqz04P/2hvv/4rXo9AOgnnstvZV1Qy
 Yo01Cy807vB1c3jhIJryM2gG61GNH22RAHc2NcfjJwy04HH/1IEr6P48Po3qYEnS
 8fZ8B9msxpsujVOrRoeBuLN8elI1HftyNVWaVjH7xtD+fLCDLu9i10kv3aeS+DiB
 T6f7yQQv7hgXS3xGvlMr2//aLwGD2ZdcRbkOEGo+k7yUjQbIDH/wdZWcPLh6y4yT
 p20i2ulYKjEZFmXDMa17diONISeGO6iaDhee24XPDwNDp8qI1iPGJsmxltMmn8Qf
 d2HPF1IDh4eM8lCwmqBtjYTnJd6rAW0v3+Ek1+wzQKVeXLFiz/MEyuOldtpsqmMO
 8Og0vr6zfTCjFo8uvyj+cF7Fcj0yIPWg1yb7EauqqxreK8V9GBA1V2ZXYVd8xwea
 thrAUaq8f+PYQ9uy1FsN3xaO3BFN1VpcvHu4/3gU3OudnZZt2Ae670RYHKC0bq8D
 2tSsqaiDnlvniHgh4xvtNIvRANkDS1ZSbkUPZhMHL7DnRJn66oDIfCr7NMbZwvCa
 AS0q6suUFyXFbAEJcY6XWxe3aQ3WuxIClT84MgzX/dAK2Qcl8ryWGGSVc0dp4Vl1
 cd8MtmpnIWsnxqNRl4jn6cfolDheaxL8nouLtJ+3/dC9VkyDyfmrtnM+8aTZKHoa
 3/xrBuVkEJAwkAAr8Pb8
 =qgti
 -----END PGP SIGNATURE-----

Merge tag 'dlm-5.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes some improvements to the dlm networking layer:
  improving the ability to trace dlm messages for debugging, and
  improved handling of bad messages or disrupted connections"

* tag 'dlm-5.9' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: implement tcp graceful shutdown
  fs: dlm: change handling of reconnects
  fs: dlm: don't close socket on invalid message
  fs: dlm: set skb mark per peer socket
  fs: dlm: set skb mark for listen socket
  net: sock: add sock_set_mark
  dlm: Fix kobject memleak
This commit is contained in:
Linus Torvalds 2020-08-06 19:44:25 -07:00
commit 86cfccb669
6 changed files with 164 additions and 28 deletions

View File

@ -73,6 +73,7 @@ struct dlm_cluster {
unsigned int cl_log_debug;
unsigned int cl_log_info;
unsigned int cl_protocol;
unsigned int cl_mark;
unsigned int cl_timewarn_cs;
unsigned int cl_waitwarn_us;
unsigned int cl_new_rsb_count;
@ -96,6 +97,7 @@ enum {
CLUSTER_ATTR_LOG_DEBUG,
CLUSTER_ATTR_LOG_INFO,
CLUSTER_ATTR_PROTOCOL,
CLUSTER_ATTR_MARK,
CLUSTER_ATTR_TIMEWARN_CS,
CLUSTER_ATTR_WAITWARN_US,
CLUSTER_ATTR_NEW_RSB_COUNT,
@ -168,6 +170,7 @@ CLUSTER_ATTR(scan_secs, 1);
CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(log_info, 0);
CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(mark, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);
CLUSTER_ATTR(new_rsb_count, 0);
@ -183,6 +186,7 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug,
[CLUSTER_ATTR_LOG_INFO] = &cluster_attr_log_info,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol,
[CLUSTER_ATTR_MARK] = &cluster_attr_mark,
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs,
[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us,
[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count,
@ -196,6 +200,7 @@ enum {
COMM_ATTR_LOCAL,
COMM_ATTR_ADDR,
COMM_ATTR_ADDR_LIST,
COMM_ATTR_MARK,
};
enum {
@ -228,6 +233,7 @@ struct dlm_comm {
int nodeid;
int local;
int addr_count;
unsigned int mark;
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
@ -465,6 +471,7 @@ static struct config_item *make_comm(struct config_group *g, const char *name)
cm->nodeid = -1;
cm->local = 0;
cm->addr_count = 0;
cm->mark = 0;
return &cm->item;
}
@ -660,8 +667,28 @@ static ssize_t comm_addr_list_show(struct config_item *item, char *buf)
return 4096 - allowance;
}
static ssize_t comm_mark_show(struct config_item *item, char *buf)
{
return sprintf(buf, "%u\n", config_item_to_comm(item)->mark);
}
/*
 * configfs "mark" store handler for a comm item.
 *
 * Parses @buf with kstrtouint() using base 0 (so the base is taken from
 * the string's prefix) and, on success, stores the value as the comm's
 * mark.  Returns @len on success or the kstrtouint() error code, in which
 * case the stored mark is left untouched.
 */
static ssize_t comm_mark_store(struct config_item *item, const char *buf,
			       size_t len)
{
	unsigned int val;
	int err;

	err = kstrtouint(buf, 0, &val);
	if (err != 0)
		return err;

	/* only touch the comm once the input is known to be valid */
	config_item_to_comm(item)->mark = val;

	return len;
}
CONFIGFS_ATTR(comm_, nodeid);
CONFIGFS_ATTR(comm_, local);
CONFIGFS_ATTR(comm_, mark);
CONFIGFS_ATTR_WO(comm_, addr);
CONFIGFS_ATTR_RO(comm_, addr_list);
@ -670,6 +697,7 @@ static struct configfs_attribute *comm_attrs[] = {
[COMM_ATTR_LOCAL] = &comm_attr_local,
[COMM_ATTR_ADDR] = &comm_attr_addr,
[COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list,
[COMM_ATTR_MARK] = &comm_attr_mark,
NULL,
};
@ -829,6 +857,20 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
return 0;
}
int dlm_comm_mark(int nodeid, unsigned int *mark)
{
struct dlm_comm *cm;
cm = get_comm(nodeid);
if (!cm)
return -ENOENT;
*mark = cm->mark;
put_comm(cm);
return 0;
}
int dlm_our_nodeid(void)
{
return local_comm ? local_comm->nodeid : 0;
@ -855,6 +897,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_LOG_DEBUG 0
#define DEFAULT_LOG_INFO 1
#define DEFAULT_PROTOCOL 0
#define DEFAULT_MARK 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
#define DEFAULT_NEW_RSB_COUNT 128
@ -871,6 +914,7 @@ struct dlm_config_info dlm_config = {
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_log_info = DEFAULT_LOG_INFO,
.ci_protocol = DEFAULT_PROTOCOL,
.ci_mark = DEFAULT_MARK,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
.ci_waitwarn_us = DEFAULT_WAITWARN_US,
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,

View File

@ -31,6 +31,7 @@ struct dlm_config_info {
int ci_log_debug;
int ci_log_info;
int ci_protocol;
int ci_mark;
int ci_timewarn_cs;
int ci_waitwarn_us;
int ci_new_rsb_count;
@ -45,6 +46,7 @@ void dlm_config_exit(void);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq);
int dlm_comm_mark(int nodeid, unsigned int *mark);
int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num);

View File

@ -622,6 +622,9 @@ static int new_lockspace(const char *name, const char *cluster,
wait_event(ls->ls_recover_lock_wait,
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
/* let kobject handle freeing of ls if there's an error */
do_unreg = 1;
ls->ls_kobj.kset = dlm_kset;
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
"%s", ls->ls_name);
@ -629,9 +632,6 @@ static int new_lockspace(const char *name, const char *cluster,
goto out_recoverd;
kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
/* let kobject handle freeing of ls if there's an error */
do_unreg = 1;
/* This uevent triggers dlm_controld in userspace to add us to the
group of nodes that are members of this lockspace (managed by the
cluster infrastructure.) Once it's done that, it tells us who the

View File

@ -63,6 +63,7 @@
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct cbuf {
unsigned int base;
@ -110,10 +111,12 @@ struct connection {
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
struct page *rx_page;
struct cbuf cb;
int retries;
@ -122,6 +125,7 @@ struct connection {
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@ -218,6 +222,7 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
spin_lock_init(&con->writequeue_lock);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
/* Setup action pointers for child sockets */
if (con->nodeid) {
@ -619,6 +624,54 @@ static void close_connection(struct connection *con, bool and_other,
clear_bit(CF_CLOSING, &con->flags);
}
/*
 * Try a graceful shutdown of one connection, falling back to a forced
 * close.
 *
 * Sequence:
 *  1. Cancel the connection's send work; if it was cancelled, log that
 *     and clear CF_WRITE_PENDING.
 *  2. Under sock_mutex: if there is no socket there is nothing to do.
 *     Otherwise set CF_SHUTDOWN and shut down the write side of the
 *     socket (SHUT_WR).
 *  3. If the shutdown call succeeded, wait up to
 *     DLM_SHUTDOWN_WAIT_TIMEOUT on shutdown_wait for CF_SHUTDOWN to be
 *     cleared (receive_from_sock() clears it and wakes the queue when it
 *     reads EOF).
 *  4. If the shutdown call failed or the wait timed out, clear
 *     CF_SHUTDOWN ourselves and force the connection closed.
 */
static void shutdown_connection(struct connection *con)
{
	int rv;

	if (cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	if (!con->sock) {
		/* nothing to shutdown */
		mutex_unlock(&con->sock_mutex);
		return;
	}

	/* flag must be set before the peer can possibly answer with EOF */
	set_bit(CF_SHUTDOWN, &con->flags);
	rv = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);

	if (rv != 0) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, rv);
	} else {
		rv = wait_event_timeout(con->shutdown_wait,
					!test_bit(CF_SHUTDOWN, &con->flags),
					DLM_SHUTDOWN_WAIT_TIMEOUT);
		/* non-zero: CF_SHUTDOWN was cleared in time, we are done */
		if (rv != 0)
			return;

		log_print("Connection %p shutdown timed out, will force close",
			  con);
	}

	/* graceful path failed: drop the flag and close the hard way */
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}
static void dlm_tcp_shutdown(struct connection *con)
{
if (con->othercon)
shutdown_connection(con->othercon);
shutdown_connection(con);
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
@ -685,14 +738,14 @@ static int receive_from_sock(struct connection *con)
page_address(con->rx_page),
con->cb.base, con->cb.len,
PAGE_SIZE);
if (ret == -EBADMSG) {
log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
page_address(con->rx_page), con->cb.base,
if (ret < 0) {
log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
ret, page_address(con->rx_page), con->cb.base,
con->cb.len, r);
cbuf_eat(&con->cb, r);
} else {
cbuf_eat(&con->cb, ret);
}
if (ret < 0)
goto out_close;
cbuf_eat(&con->cb, ret);
if (cbuf_empty(&con->cb) && !call_again_soon) {
__free_page(con->rx_page);
@ -713,13 +766,18 @@ out_resched:
out_close:
mutex_unlock(&con->sock_mutex);
if (ret != -EAGAIN) {
close_connection(con, true, true, false);
/* Reconnect when there is something to send */
close_connection(con, false, true, false);
if (ret == 0) {
log_print("connection %p got EOF from %d",
con, con->nodeid);
/* handling for tcp shutdown */
clear_bit(CF_SHUTDOWN, &con->flags);
wake_up(&con->shutdown_wait);
/* signal to breaking receive worker */
ret = -1;
}
}
/* Don't return success if we really got EOF */
if (ret == 0)
ret = -EAGAIN;
return ret;
}
@ -803,22 +861,18 @@ static int accept_from_sock(struct connection *con)
spin_lock_init(&othercon->writequeue_lock);
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
init_waitqueue_head(&othercon->shutdown_wait);
set_bit(CF_IS_OTHERCON, &othercon->flags);
} else {
/* close other sock con if we have something new */
close_connection(othercon, false, true, false);
}
mutex_lock_nested(&othercon->sock_mutex, 2);
if (!othercon->sock) {
newcon->othercon = othercon;
add_sock(newsock, othercon);
addcon = othercon;
mutex_unlock(&othercon->sock_mutex);
}
else {
printk("Extra connection from node %d attempted\n", nodeid);
result = -EAGAIN;
mutex_unlock(&othercon->sock_mutex);
mutex_unlock(&newcon->sock_mutex);
goto accept_err;
}
newcon->othercon = othercon;
add_sock(newsock, othercon);
addcon = othercon;
mutex_unlock(&othercon->sock_mutex);
}
else {
newcon->rx_action = receive_from_sock;
@ -914,6 +968,7 @@ static void sctp_connect_to_sock(struct connection *con)
int result;
int addr_len;
struct socket *sock;
unsigned int mark;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
@ -944,6 +999,13 @@ static void sctp_connect_to_sock(struct connection *con)
if (result < 0)
goto socket_err;
/* set skb mark */
result = dlm_comm_mark(con->nodeid, &mark);
if (result < 0)
goto bind_err;
sock_set_mark(sock->sk, mark);
con->rx_action = receive_from_sock;
con->connect_action = sctp_connect_to_sock;
add_sock(sock, con);
@ -1006,6 +1068,7 @@ static void tcp_connect_to_sock(struct connection *con)
struct sockaddr_storage saddr, src_addr;
int addr_len;
struct socket *sock = NULL;
unsigned int mark;
int result;
if (con->nodeid == 0) {
@ -1027,6 +1090,13 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0)
goto out_err;
/* set skb mark */
result = dlm_comm_mark(con->nodeid, &mark);
if (result < 0)
goto out_err;
sock_set_mark(sock->sk, mark);
memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
if (result < 0) {
@ -1036,6 +1106,7 @@ static void tcp_connect_to_sock(struct connection *con)
con->rx_action = receive_from_sock;
con->connect_action = tcp_connect_to_sock;
con->shutdown_action = dlm_tcp_shutdown;
add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid
@ -1111,6 +1182,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
goto create_out;
}
sock_set_mark(sock->sk, dlm_config.ci_mark);
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
@ -1185,6 +1258,7 @@ static int sctp_listen_for_all(void)
}
sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
sock_set_mark(sock->sk, dlm_config.ci_mark);
sctp_sock_set_nodelay(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
@ -1396,7 +1470,7 @@ out:
send_error:
mutex_unlock(&con->sock_mutex);
close_connection(con, true, false, true);
close_connection(con, false, false, true);
/* Requeue the send work. When the work daemon runs again, it will try
a new connection, then call this function again. */
queue_work(send_workqueue, &con->swork);
@ -1528,6 +1602,12 @@ static void stop_conn(struct connection *con)
_stop_conn(con, true);
}
/*
 * Per-connection callback: run the connection's shutdown_action hook if
 * one has been installed; connections without a hook are left alone.
 */
static void shutdown_conn(struct connection *con)
{
	if (!con->shutdown_action)
		return;

	con->shutdown_action(con);
}
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
@ -1579,6 +1659,7 @@ void dlm_lowcomms_stop(void)
mutex_lock(&connections_lock);
dlm_allow_conn = 0;
mutex_unlock(&connections_lock);
foreach_conn(shutdown_conn);
work_flush();
clean_writequeues();
foreach_conn(free_conn);

View File

@ -2696,6 +2696,7 @@ void sock_no_linger(struct sock *sk);
void sock_set_keepalive(struct sock *sk);
void sock_set_priority(struct sock *sk, u32 priority);
void sock_set_rcvbuf(struct sock *sk, int val);
void sock_set_mark(struct sock *sk, u32 val);
void sock_set_reuseaddr(struct sock *sk);
void sock_set_reuseport(struct sock *sk);
void sock_set_sndtimeo(struct sock *sk, s64 secs);

View File

@ -820,6 +820,14 @@ void sock_set_rcvbuf(struct sock *sk, int val)
}
EXPORT_SYMBOL(sock_set_rcvbuf);
/**
 * sock_set_mark - set the mark of a socket from kernel code
 * @sk: socket to update
 * @val: new value for sk->sk_mark
 *
 * Takes the socket lock, updates sk->sk_mark and releases the lock.
 *
 * When the mark actually changes, the socket's cached destination is
 * dropped with sk_dst_reset() so that a route picked under the old mark
 * is not reused; this mirrors what the SO_MARK setsockopt() path does.
 * Setting the same value again is a no-op.
 */
void sock_set_mark(struct sock *sk, u32 val)
{
	lock_sock(sk);
	if (val != sk->sk_mark) {
		sk->sk_mark = val;
		/* a cached route may have been selected using the old mark */
		sk_dst_reset(sk);
	}
	release_sock(sk);
}
EXPORT_SYMBOL(sock_set_mark);
/*
* This is meant for all protocols to use and covers goings on
* at the socket level. Everything here is generic.