libceph: resend lingering requests with a new tid

Both not yet registered (r_linger && list_empty(&r_linger_item)) and
registered linger requests should use the new tid on resend to avoid
the dup op detection logic on the OSDs, yet we were doing this only for
"registered" case.  Factor out and simplify the "registered" logic and
use the new helper for "not registered" case as well.

Fixes: http://tracker.ceph.com/issues/8806

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: Alex Elder <elder@linaro.org>
This commit is contained in:
Ilya Dryomov 2014-09-03 14:41:45 +04:00 committed by Ilya Dryomov
parent f671b581f1
commit 2cc6128ab2
1 changed files with 54 additions and 19 deletions

View File

@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __unregister_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
@ -892,6 +895,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
return NULL;
}
static void __kick_linger_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd = req->r_osd;
/*
* Linger requests need to be resent with a new tid to avoid
* the dup op detection logic on the OSDs. Achieve this with
* a re-register dance instead of open-coding.
*/
ceph_osdc_get_request(req);
if (!list_empty(&req->r_linger_item))
__unregister_linger_request(osdc, req);
else
__unregister_request(osdc, req);
__register_request(osdc, req);
ceph_osdc_put_request(req);
/*
* Unless request has been registered as both normal and
* lingering, __unregister{,_linger}_request clears r_osd.
* However, here we need to preserve r_osd to make sure we
* requeue on the same OSD.
*/
WARN_ON(req->r_osd || !osd);
req->r_osd = osd;
dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
__enqueue_request(req);
}
/*
* Resubmit requests pending on the given osd.
*/
@ -900,12 +934,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
{
struct ceph_osd_request *req, *nreq;
LIST_HEAD(resend);
LIST_HEAD(resend_linger);
int err;
dout("__kick_osd_requests osd%d\n", osd->o_osd);
dout("%s osd%d\n", __func__, osd->o_osd);
err = __reset_osd(osdc, osd);
if (err)
return;
/*
* Build up a list of requests to resend by traversing the
* osd's list of requests. Requests for a given object are
@ -926,33 +962,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
if (!req->r_sent)
break;
list_move_tail(&req->r_req_lru_item, &resend);
dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
if (!req->r_linger)
if (!req->r_linger) {
dout("%s requeueing %p tid %llu\n", __func__, req,
req->r_tid);
list_move_tail(&req->r_req_lru_item, &resend);
req->r_flags |= CEPH_OSD_FLAG_RETRY;
} else {
list_move_tail(&req->r_req_lru_item, &resend_linger);
}
}
list_splice(&resend, &osdc->req_unsent);
/*
* Linger requests are re-registered before sending, which
* sets up a new tid for each. We add them to the unsent
* list at the end to keep things in tid order.
* Both registered and not yet registered linger requests are
* enqueued with a new tid on the same OSD. We add/move them
* to req_unsent/o_requests at the end to keep things in tid
* order.
*/
list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
r_linger_osd_item) {
/*
* reregister request prior to unregistering linger so
* that r_osd is preserved.
*/
BUG_ON(!list_empty(&req->r_req_lru_item));
__register_request(osdc, req);
list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
__unregister_linger_request(osdc, req);
dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
WARN_ON(!list_empty(&req->r_req_lru_item));
__kick_linger_request(req);
}
list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
__kick_linger_request(req);
}
/*