tipc: fix unlimited bundling of small messages

We have identified a problem with the "oversubscription" policy in the
link transmission code.

When small messages are transmitted, and the sending link has reached
the transmit window limit, those messages will be bundled and put into
the link backlog queue. However, bundles of data messages are counted
at the 'CRITICAL' level, so it is the counter for that level, rather
than the counter for the bundled message's actual level, that gets
increased.
Subsequent to-be-bundled data messages at non-CRITICAL levels continue
to be tested against the unchanged counters for their own levels, while
contributing to an unrestrained increase of the CRITICAL backlog level.
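
To make the gap concrete, here is a small stand-alone model. It is not
the kernel code: the importance order and the limit values are made up,
but the accounting mirrors the pre-patch logic shown in the diff below
(bundles stamped CRITICAL, senders tested against their own level):

  /* Toy user-space model (not the kernel code): a LOW-importance sender
   * is always tested against the LOW counter, but every bundle it
   * creates is counted at the CRITICAL level.  Importance order and
   * the limit values are illustrative only.
   */
  #include <stdio.h>

  enum { LOW, MEDIUM, HIGH, CRITICAL, SYSTEM, LEVELS };

  int main(void)
  {
          int len[LEVELS] = { 0 };
          int limit[LEVELS] = { 50, 100, 150, 200, 250 };
          int blocked = 0;

          for (int i = 0; i < 1000; i++) {
                  if (len[LOW] >= limit[LOW]) {   /* congestion test: own level */
                          blocked++;
                          continue;
                  }
                  len[CRITICAL]++;                /* bundle counted as CRITICAL */
          }
          printf("LOW: %d, CRITICAL: %d, blocked: %d\n",
                 len[LOW], len[CRITICAL], blocked);
          return 0;
  }

With these numbers the LOW counter never moves and no sender is ever
blocked, while the CRITICAL backlog climbs to 1000, i.e. the
unrestrained growth described above.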

This leaves a gap in the congestion control algorithm for small
messages, which can result in starvation of other users or of a "real"
CRITICAL user, and can eventually lead to buffer exhaustion and link
reset.

We fix this by keeping a 'target_bskb' buffer pointer for each level
and, when bundling, only bundling messages of the same importance
level. This way, we know exactly how many slots a certain level
occupies in the queue, so we can manage per-level congestion
accurately.
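
The following stand-alone sketch shows the shape of this bookkeeping.
Again, it is not the kernel code, and names such as 'backlog_entry' and
'enqueue_small_msg' are ours: each level owns its counter and its open
target bundle, so a bundle only ever contains messages of one level:

  /* Stand-alone sketch (not the kernel code) of per-level bundling:
   * each importance level keeps its own length counter and its own
   * open target bundle, so bundles never mix levels.
   */
  #include <stdio.h>

  enum { LOW, MEDIUM, HIGH, CRITICAL, SYSTEM, LEVELS };

  struct bundle {
          int imp;                /* level of the messages inside */
          int msgs;               /* payload messages bundled so far */
  };

  struct backlog_entry {
          unsigned int len;       /* buffers queued at this level */
          struct bundle *target;  /* open bundle for this level, or NULL */
  };

  static struct backlog_entry backlog[LEVELS];

  static void enqueue_small_msg(int imp)
  {
          static struct bundle pool[64];
          static int used;

          if (!backlog[imp].target) {
                  backlog[imp].target = &pool[used++];
                  backlog[imp].target->imp = imp;
                  backlog[imp].len++;     /* the bundle occupies one slot */
          }
          backlog[imp].target->msgs++;
  }

  int main(void)
  {
          enqueue_small_msg(CRITICAL);    /* opens a CRITICAL bundle */
          enqueue_small_msg(LOW);         /* opens a separate LOW bundle */
          enqueue_small_msg(CRITICAL);    /* joins the CRITICAL bundle */

          printf("CRITICAL: len=%u msgs=%d, LOW: len=%u msgs=%d\n",
                 backlog[CRITICAL].len, backlog[CRITICAL].target->msgs,
                 backlog[LOW].len, backlog[LOW].target->msgs);
          return 0;
  }

Here the interleaved LOW message opens its own bundle instead of
closing the CRITICAL one, which is also where the efficiency gain
described next comes from.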

Bundling messages of the same level also brings further benefits.
Consider this:
- One socket sends 64-byte messages at the 'CRITICAL' level;
- Another sends 4096-byte messages at the 'LOW' level;

When a 64-byte message arrives and is bundled for the first time, we
pay the bundling overhead (an extra 40-byte header, data copy, etc.) in
the expectation of later reuse, but the next message may be a 4096-byte
one that cannot be added to that bundle. The bundle then carries only a
single payload message, which is inefficient for the sender and for the
receiver alike. Later, another 64-byte message arrives, a new bundle is
created, and the same story repeats...

With the new bundling algorithm this no longer happens: the 64-byte
messages are bundled together even when 4096-byte messages arrive in
between. However, if the 4096-byte messages are sent at the same level,
i.e. 'CRITICAL', the bundling algorithm again incurs the same overhead.

The same also happens with only one socket sending small messages at a
rate close to the link transmit rate: as soon as a message is bundled,
the bundle is transmitted shortly afterwards. Then another message
arrives, a new bundle is created, and so on...

We will solve this issue more thoroughly in another patch.

Fixes: 365ad353c2 ("tipc: reduce risk of user starvation during link congestion")
Reported-by: Hoang Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>

diff --git a/net/tipc/link.c b/net/tipc/link.c
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -160,6 +160,7 @@ struct tipc_link {
         struct {
                 u16 len;
                 u16 limit;
+                struct sk_buff *target_bskb;
         } backlog[5];
         u16 snd_nxt;
         u16 window;
@@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l)
 void tipc_link_reset(struct tipc_link *l)
 {
         struct sk_buff_head list;
+        u32 imp;
 
         __skb_queue_head_init(&list);
@@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l)
         __skb_queue_purge(&l->deferdq);
         __skb_queue_purge(&l->backlogq);
         __skb_queue_purge(&l->failover_deferdq);
-        l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
-        l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
-        l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
-        l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
-        l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+        for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) {
+                l->backlog[imp].len = 0;
+                l->backlog[imp].target_bskb = NULL;
+        }
         kfree_skb(l->reasm_buf);
         kfree_skb(l->reasm_tnlmsg);
         kfree_skb(l->failover_reasm_skb);
@@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
         u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
         struct sk_buff_head *transmq = &l->transmq;
         struct sk_buff_head *backlogq = &l->backlogq;
-        struct sk_buff *skb, *_skb, *bskb;
+        struct sk_buff *skb, *_skb, **tskb;
         int pkt_cnt = skb_queue_len(list);
         int rc = 0;
@@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                         seqno++;
                         continue;
                 }
-                if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+                tskb = &l->backlog[imp].target_bskb;
+                if (tipc_msg_bundle(*tskb, hdr, mtu)) {
                         kfree_skb(__skb_dequeue(list));
                         l->stats.sent_bundled++;
                         continue;
                 }
-                if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+                if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
                         kfree_skb(__skb_dequeue(list));
-                        __skb_queue_tail(backlogq, bskb);
-                        l->backlog[msg_importance(buf_msg(bskb))].len++;
+                        __skb_queue_tail(backlogq, *tskb);
+                        l->backlog[imp].len++;
                         l->stats.sent_bundled++;
                         l->stats.sent_bundles++;
                         continue;
                 }
+                l->backlog[imp].target_bskb = NULL;
                 l->backlog[imp].len += skb_queue_len(list);
                 skb_queue_splice_tail_init(list, backlogq);
         }
@@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
         u16 seqno = l->snd_nxt;
         u16 ack = l->rcv_nxt - 1;
         u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+        u32 imp;
 
         while (skb_queue_len(&l->transmq) < l->window) {
                 skb = skb_peek(&l->backlogq);
@@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
                         break;
                 __skb_dequeue(&l->backlogq);
                 hdr = buf_msg(skb);
-                l->backlog[msg_importance(hdr)].len--;
+                imp = msg_importance(hdr);
+                l->backlog[imp].len--;
+                if (unlikely(skb == l->backlog[imp].target_bskb))
+                        l->backlog[imp].target_bskb = NULL;
                 __skb_queue_tail(&l->transmq, skb);
                 /* next retransmit attempt */
                 if (link_is_bc_sndlink(l))

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
         bmsg = buf_msg(_skb);
         tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
                       INT_H_SIZE, dnode);
-        if (msg_isdata(msg))
-                msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE);
-        else
-                msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE);
+        msg_set_importance(bmsg, msg_importance(msg));
         msg_set_seqno(bmsg, msg_seqno(msg));
         msg_set_ack(bmsg, msg_ack(msg));
         msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));