Merge branch 'tipc'

Ying Xue says:

====================
tipc: align TIPC behaviours of waiting for events with other stacks

Comparing the current implementations of waiting for events in TIPC
socket layer with other stacks, TIPC's behaviour is very different
because wait_event_interruptible_timeout()/wait_event_interruptible()
are always used by TIPC to wait for events while relevant socket or
port variables are fed to them as their arguments. As socket lock has
to be released temporarily before the two routines of waiting for
events are called, their arguments associated with socket or port
structures are out of socket lock protection. This might cause
serious issues where the process of calling socket syscall such as
sendsmg(), connect(), accept(), and recvmsg(), cannot be waken up
at all even if proper event arrives or improperly be woken up
although the condition of waking up the process is not satisfied
in practice.

Therefore, aligning its behaviours with similar functions implemented
in other stacks, for instance, sk_stream_wait_connect() and
inet_csk_wait_for_connect() etc, can avoid above risks for us.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2014-01-16 19:11:22 -08:00
commit 8b88a11e44
1 changed files with 186 additions and 110 deletions

View File

@ -55,9 +55,6 @@ struct tipc_sock {
#define tipc_sk(sk) ((struct tipc_sock *)(sk))
#define tipc_sk_port(sk) (tipc_sk(sk)->p)
#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
(sock->state == SS_DISCONNECTING))
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
@ -567,6 +564,31 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
return 0;
}
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
DEFINE_WAIT(wait);
int done;
do {
int err = sock_error(sk);
if (err)
return err;
if (sock->state == SS_DISCONNECTING)
return -EPIPE;
if (!*timeo_p)
return -EAGAIN;
if (signal_pending(current))
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p, !tport->congested);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
* send_msg - send message in connectionless manner
* @iocb: if NULL, indicates that socket lock is already held
@ -588,7 +610,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk);
struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
int needs_conn;
long timeout_val;
long timeo;
int res = -EINVAL;
if (unlikely(!dest))
@ -625,8 +647,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
reject_rx_queue(sk);
}
timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
do {
if (dest->addrtype == TIPC_ADDR_NAME) {
res = dest_name_check(dest, m);
@ -660,14 +681,9 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
sock->state = SS_CONNECTING;
break;
}
if (timeout_val <= 0L) {
res = timeout_val ? timeout_val : -EWOULDBLOCK;
res = tipc_wait_for_sndmsg(sock, &timeo);
if (res)
break;
}
release_sock(sk);
timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
!tport->congested, timeout_val);
lock_sock(sk);
} while (1);
exit:
@ -676,6 +692,34 @@ exit:
return res;
}
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
DEFINE_WAIT(wait);
int done;
do {
int err = sock_error(sk);
if (err)
return err;
if (sock->state == SS_DISCONNECTING)
return -EPIPE;
else if (sock->state != SS_CONNECTED)
return -ENOTCONN;
if (!*timeo_p)
return -EAGAIN;
if (signal_pending(current))
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p,
(!tport->congested || !tport->connected));
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
* send_packet - send a connection-oriented message
* @iocb: if NULL, indicates that socket lock is already held
@ -693,8 +737,8 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
long timeout_val;
int res;
int res = -EINVAL;
long timeo;
/* Handle implied connection establishment */
if (unlikely(dest))
@ -706,30 +750,24 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
if (iocb)
lock_sock(sk);
timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_DISCONNECTING)
res = -EPIPE;
else
res = -ENOTCONN;
goto exit;
}
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
do {
if (unlikely(sock->state != SS_CONNECTED)) {
if (sock->state == SS_DISCONNECTING)
res = -EPIPE;
else
res = -ENOTCONN;
break;
}
res = tipc_send(tport->ref, m->msg_iov, total_len);
if (likely(res != -ELINKCONG))
break;
if (timeout_val <= 0L) {
res = timeout_val ? timeout_val : -EWOULDBLOCK;
res = tipc_wait_for_sndpkt(sock, &timeo);
if (res)
break;
}
release_sock(sk);
timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
(!tport->congested || !tport->connected), timeout_val);
lock_sock(sk);
} while (1);
exit:
if (iocb)
release_sock(sk);
return res;
@ -953,6 +991,37 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
{
struct sock *sk = sock->sk;
DEFINE_WAIT(wait);
int err;
for (;;) {
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
if (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
err = -ENOTCONN;
break;
}
release_sock(sk);
timeo = schedule_timeout(timeo);
lock_sock(sk);
}
err = 0;
if (!skb_queue_empty(&sk->sk_receive_queue))
break;
err = sock_intr_errno(timeo);
if (signal_pending(current))
break;
err = -EAGAIN;
if (!timeo)
break;
}
finish_wait(sk_sleep(sk), &wait);
return err;
}
/**
* recv_msg - receive packet-oriented message
* @iocb: (unused)
@ -972,7 +1041,7 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk);
struct sk_buff *buf;
struct tipc_msg *msg;
long timeout;
long timeo;
unsigned int sz;
u32 err;
int res;
@ -988,25 +1057,13 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
goto exit;
}
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
/* Look for a message in receive queue; wait if necessary */
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
res = -ENOTCONN;
goto exit;
}
if (timeout <= 0L) {
res = timeout ? timeout : -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
tipc_rx_ready(sock),
timeout);
lock_sock(sk);
}
res = tipc_wait_for_rcvmsg(sock, timeo);
if (res)
goto exit;
/* Look at first message in receive queue */
buf = skb_peek(&sk->sk_receive_queue);
@ -1078,7 +1135,7 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk);
struct sk_buff *buf;
struct tipc_msg *msg;
long timeout;
long timeo;
unsigned int sz;
int sz_to_copy, target, needed;
int sz_copied = 0;
@ -1091,31 +1148,19 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
lock_sock(sk);
if (unlikely((sock->state == SS_UNCONNECTED))) {
if (unlikely(sock->state == SS_UNCONNECTED)) {
res = -ENOTCONN;
goto exit;
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
/* Look for a message in receive queue; wait if necessary */
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
res = -ENOTCONN;
goto exit;
}
if (timeout <= 0L) {
res = timeout ? timeout : -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
tipc_rx_ready(sock),
timeout);
lock_sock(sk);
}
res = tipc_wait_for_rcvmsg(sock, timeo);
if (res)
goto exit;
/* Look at first message in receive queue */
buf = skb_peek(&sk->sk_receive_queue);
@ -1438,6 +1483,28 @@ static void wakeupdispatch(struct tipc_port *tport)
sk->sk_write_space(sk);
}
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;
DEFINE_WAIT(wait);
int done;
do {
int err = sock_error(sk);
if (err)
return err;
if (!*timeo_p)
return -ETIMEDOUT;
if (signal_pending(current))
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
* connect - establish a connection to another TIPC port
* @sock: socket structure
@ -1453,7 +1520,8 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
struct sock *sk = sock->sk;
struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
struct msghdr m = {NULL,};
unsigned int timeout;
long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
socket_state previous;
int res;
lock_sock(sk);
@ -1475,8 +1543,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
goto exit;
}
timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
previous = sock->state;
switch (sock->state) {
case SS_UNCONNECTED:
/* Send a 'SYN-' to destination */
@ -1498,41 +1565,22 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
* case is EINPROGRESS, rather than EALREADY.
*/
res = -EINPROGRESS;
break;
case SS_CONNECTING:
res = -EALREADY;
if (previous == SS_CONNECTING)
res = -EALREADY;
if (!timeout)
goto exit;
timeout = msecs_to_jiffies(timeout);
/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
res = tipc_wait_for_connect(sock, &timeout);
break;
case SS_CONNECTED:
res = -EISCONN;
break;
default:
res = -EINVAL;
goto exit;
break;
}
if (sock->state == SS_CONNECTING) {
if (!timeout)
goto exit;
/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
release_sock(sk);
res = wait_event_interruptible_timeout(*sk_sleep(sk),
sock->state != SS_CONNECTING,
timeout ? (long)msecs_to_jiffies(timeout)
: MAX_SCHEDULE_TIMEOUT);
if (res <= 0) {
if (res == 0)
res = -ETIMEDOUT;
return res;
}
lock_sock(sk);
}
if (unlikely(sock->state == SS_DISCONNECTING))
res = sock_error(sk);
else
res = 0;
exit:
release_sock(sk);
return res;
@ -1563,6 +1611,42 @@ static int listen(struct socket *sock, int len)
return res;
}
static int tipc_wait_for_accept(struct socket *sock, long timeo)
{
struct sock *sk = sock->sk;
DEFINE_WAIT(wait);
int err;
/* True wake-one mechanism for incoming connections: only
* one process gets woken up, not the 'whole herd'.
* Since we do not 'race & poll' for established sockets
* anymore, the common case will execute the loop only once.
*/
for (;;) {
prepare_to_wait_exclusive(sk_sleep(sk), &wait,
TASK_INTERRUPTIBLE);
if (skb_queue_empty(&sk->sk_receive_queue)) {
release_sock(sk);
timeo = schedule_timeout(timeo);
lock_sock(sk);
}
err = 0;
if (!skb_queue_empty(&sk->sk_receive_queue))
break;
err = -EINVAL;
if (sock->state != SS_LISTENING)
break;
err = sock_intr_errno(timeo);
if (signal_pending(current))
break;
err = -EAGAIN;
if (!timeo)
break;
}
finish_wait(sk_sleep(sk), &wait);
return err;
}
/**
* accept - wait for connection request
* @sock: listening socket
@ -1579,7 +1663,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
struct tipc_port *new_tport;
struct tipc_msg *msg;
u32 new_ref;
long timeo;
int res;
lock_sock(sk);
@ -1589,18 +1673,10 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
goto exit;
}
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (flags & O_NONBLOCK) {
res = -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
res = wait_event_interruptible(*sk_sleep(sk),
(!skb_queue_empty(&sk->sk_receive_queue)));
lock_sock(sk);
if (res)
goto exit;
}
timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
res = tipc_wait_for_accept(sock, timeo);
if (res)
goto exit;
buf = skb_peek(&sk->sk_receive_queue);