Fix buffer posting and FI_EAGAIN (#26617)
When a receive buffer is flagged as full via FI_MULTI_RECV, re-post it
immediately while libfabric fills the other buffer.

Also, when fi_recvmsg returns FI_EAGAIN, try again after calling
fi_cq_read to progress the endpoint.

[Reviewed by @jabraham17, thank you.]
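For context, here is a minimal sketch of the pattern this change adopts, using generic names (drain_cq, ep, cq, reqs, and cur are illustrative, not the runtime's identifiers): read the completion queue, re-post a buffer when a completion carries FI_MULTI_RECV, and if fi_recvmsg returns -FI_EAGAIN, defer the re-post so the next fi_cq_read can progress the endpoint first.

```c
#include <stdbool.h>
#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

// Sketch: drain a completion queue that feeds two alternating FI_MULTI_RECV
// buffers.  `ep`, `cq`, the two fi_msg descriptors in `reqs`, and the index
// `*cur` of the buffer currently being filled are assumed to be set up
// elsewhere.
static void drain_cq(struct fid_ep *ep, struct fid_cq *cq,
                     struct fi_msg reqs[2], int *cur) {
  bool repostPending = false;
  do {
    struct fi_cq_data_entry cqes[8];
    ssize_t n = fi_cq_read(cq, cqes, 8);     // also progresses the endpoint
    if (n == -FI_EAGAIN) n = 0;              // no completions right now
    if (n < 0) return;                       // real code: read the error entry

    // Retry a re-post that failed with -FI_EAGAIN on an earlier pass, now
    // that fi_cq_read has had a chance to free provider resources.
    if (repostPending
        && fi_recvmsg(ep, &reqs[1 - *cur], FI_MULTI_RECV) == 0) {
      repostPending = false;
    }

    for (ssize_t i = 0; i < n; i++) {
      if (cqes[i].flags & FI_RECV) {
        // Handle the message at cqes[i].buf, length cqes[i].len.
      }
      if (cqes[i].flags & FI_MULTI_RECV) {
        // This buffer is (nearly) full and libfabric has moved on to the
        // other one; re-post it right away.  On -FI_EAGAIN, defer the
        // re-post to the next pass of the outer loop.
        if (fi_recvmsg(ep, &reqs[*cur], FI_MULTI_RECV) == -FI_EAGAIN) {
          repostPending = true;
        }
        *cur = 1 - *cur;
      }
    }
  } while (repostPending);
}
```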
jhh67 authored Feb 12, 2025
2 parents 6fdb5db + 37dc452 commit 8bd732c
136 changes: 99 additions & 37 deletions runtime/src/comm/ofi/comm-ofi.c
@@ -3165,6 +3165,25 @@ void init_ofiForAms(void) {
// set up two of these and swap back and forth between them, to hedge
// against receiving "buffer filled and released" events out of order
// with respect to the messages stored within them.

// There are two receive buffers and we alternate between them. If there
// were only one buffer, there might be a window during which there is
// no available buffer space because we are processing the last message in
// the buffer while new messages are still being sent. Instead, we
// double-buffer. When the current buffer has been consumed up to a
// threshold (defined by FI_OPT_MIN_MULTI_RECV above), libfabric will tell
// us the buffer is (almost) full by setting the FI_MULTI_RECV flag in a
// completion event and will switch to the other buffer. In response, we
// re-post the buffer.
//
// One issue is knowing when there are no lingering dependencies on a buffer
// so we can repost it, since doing so will cause it to be filled with new
// messages. There are two types of active messages in the buffer; some are
// handled synchronously by the active message handler itself, and some are
// handled asynchronously by calling chpl_task_startMovedTask to create a
// new task to execute the active message. chpl_task_startMovedTask copies
// its arguments, so in either case there are no lingering dependencies on
// the message buffer.
//
CHPL_CALLOC_SZ(amLZs[0], 1, amLZSize);
CHPL_CALLOC_SZ(amLZs[1], 1, amLZSize);
@@ -3190,16 +3209,16 @@ void init_ofiForAms(void) {
ofi_rxBuffer = ofi_msg_reqs[0].msg_iov->iov_base;
ofi_rxEnd = (void *) ((char *) ofi_rxBuffer +
ofi_msg_reqs[0].msg_iov->iov_len);

for (int i = 0; i < 2; i++) {
memset(ofi_msg_reqs[i].msg_iov->iov_base, '\0',
ofi_msg_reqs[i].msg_iov->iov_len);
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"pre-post fi_recvmsg(AMLZs %p, len %#zx)",
"post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
}

init_amHandling();
}

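The comment in init_ofiForAms above mentions the FI_OPT_MIN_MULTI_RECV threshold, which is set elsewhere in comm-ofi.c and not shown in this diff. As a rough, hedged sketch (generic names, memory registration and most error handling elided; this is not the runtime's initialization code), configuring that threshold and pre-posting two alternating multi-receive buffers looks something like this:

```c
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>

// Sketch: set the multi-receive threshold and pre-post two buffers on an
// already-created receive endpoint `ep`.  `iovs` and `reqs` must outlive
// the posted receives.
static int setup_multi_recv(struct fid_ep *ep, size_t bufSize, size_t minFree,
                            struct iovec iovs[2], struct fi_msg reqs[2]) {
  // Ask the provider to report FI_MULTI_RECV (buffer released) once fewer
  // than `minFree` bytes remain available in a posted buffer.
  int rc = fi_setopt(&ep->fid, FI_OPT_ENDPOINT, FI_OPT_MIN_MULTI_RECV,
                     &minFree, sizeof(minFree));
  if (rc != 0) return rc;

  for (int i = 0; i < 2; i++) {
    iovs[i].iov_base = calloc(1, bufSize);
    iovs[i].iov_len = bufSize;
    memset(&reqs[i], 0, sizeof(reqs[i]));
    reqs[i].msg_iov = &iovs[i];
    reqs[i].iov_count = 1;
    reqs[i].addr = FI_ADDR_UNSPEC;       // accept messages from any peer
    ssize_t prc = fi_recvmsg(ep, &reqs[i], FI_MULTI_RECV);
    if (prc != 0) return (int) prc;
  }
  return 0;
}
```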
@@ -5117,6 +5136,30 @@ void processRxAmReqCntr(void) {
ofi_rxCount += todo;
}

//
// Post a receive buffer.
//
static
chpl_bool postBuffer(int i) {
chpl_bool posted = true;
int rc;
OFI_CHK_2(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV), rc,
-FI_EAGAIN);
if (rc == -FI_EAGAIN) {
DBG_PRINTF(DBG_AM_BUF,
"(re)post fi_recvmsg(AMLZs %p, len %#zx) returned EAGAIN",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
posted = false;
} else {
DBG_PRINTF(DBG_AM_BUF,
"(re)post fi_recvmsg(AMLZs %p, len %#zx) succeeded",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
}
return posted;
}

static
void processRxAmReqCQ(void) {
//
@@ -5125,44 +5168,63 @@ void processRxAmReqCQ(void) {
struct fi_cq_data_entry cqes[5];
const size_t maxEvents = sizeof(cqes) / sizeof(cqes[0]);
ssize_t ret;
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}

const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req));
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
chpl_bool post = false;
do {
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; post the other one.
//
ofi_msg_i = 1 - ofi_msg_i;
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[ofi_msg_i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"re-post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_base,
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_len);

//
// Post the other buffer if we were unable to do it when we received
// FI_MULTI_RECV below.
//
if (post) {
DBG_PRINTF(DBG_AM_BUF, "post pending\n");
if (postBuffer(1-ofi_msg_i) == true) {
post = false;
}
}

CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req),
(cqes[i].flags & FI_MULTI_RECV) ? "FI_MULTI_RECV" : "");
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; libfabric has switched to the other
// buffer. Repost this one.
//

if (postBuffer(ofi_msg_i) == false) {
//
// Buffer was not posted due to FI_EAGAIN. Go around the outer loop
// again, which will call fi_cq_read to progress the endpoint, and
// then try reposting the buffer.
//
post = true;
}
ofi_msg_i = 1-ofi_msg_i;
}
CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
} while(post);
}
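Finally, the loop above hands off to reportCQError when fi_cq_read returns -FI_EAVAIL. In libfabric this typically means an error completion is pending and must be drained with fi_cq_readerr; the sketch below shows that generic pattern (it is an assumption about what reportCQError does, not a quote of it):

```c
#include <stdio.h>
#include <string.h>
#include <rdma/fabric.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

// Sketch: when fi_cq_read returns -FI_EAVAIL, an error completion is
// pending; read it with fi_cq_readerr and report it so the queue can make
// progress again.
static void report_cq_error(struct fid_cq *cq) {
  struct fi_cq_err_entry err;
  memset(&err, 0, sizeof(err));
  if (fi_cq_readerr(cq, &err, 0) > 0) {
    char buf[128];
    fprintf(stderr, "CQ error: %s (provider: %s)\n",
            fi_strerror(err.err),
            fi_cq_strerror(cq, err.prov_errno, err.err_data,
                           buf, sizeof(buf)));
  }
}
```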

