1
1

pmix2x: ptl/base: send header and message data together via writev()

On Linux, sending the header and then the message data separately severely
impacts the performance of ptl/tcp:
on the receiver, reading the data can often result in a PMIX_ERR_RESOURCE_BUSY
or PMIX_ERR_WOULD_BLOCK, which ends up degrading performance.
This commit sends both the header and the message data at the same time via
writev(), which makes ptl/tcp virtually as efficient as ptl/usock.

A short writev() generally occurs when the kernel buffer is full, so there is
no point in retrying in this case.

FWIW, no such degradation was observed on OS X.

Refs open-mpi/ompi#2657

Signed-off-by: Gilles Gouaillardet <gilles@rist.or.jp>
Этот коммит содержится в:
Gilles Gouaillardet 2017-01-09 21:40:09 +09:00
родитель b320882932
Коммит a01960bee5

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>. * Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
* All rights reserved. * All rights reserved.
* Copyright (c) 2015-2016 Research Organization for Information Science * Copyright (c) 2015-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved. * and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies, Inc. * Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved. * All rights reserved.
@ -142,46 +142,69 @@ static void lost_connection(pmix_peer_t *peer, pmix_status_t err)
PMIX_REPORT_EVENT(err, _notify_complete); PMIX_REPORT_EVENT(err, _notify_complete);
} }
static pmix_status_t send_bytes(int sd, char **buf, size_t *remain) static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
{ {
pmix_status_t ret = PMIX_SUCCESS; pmix_status_t ret = PMIX_SUCCESS;
int rc; struct iovec iov[2];
char *ptr = *buf; int iov_count;
while (0 < *remain) { ssize_t remain = msg->sdbytes, rc;
rc = write(sd, ptr, *remain); iov[0].iov_base = msg->sdptr;
if (rc < 0) { iov[0].iov_len = msg->sdbytes;
if (pmix_socket_errno == EINTR) { if (!msg->hdr_sent && NULL != msg->data) {
continue; iov[1].iov_base = msg->data->base_ptr;
} else if (pmix_socket_errno == EAGAIN) { iov[1].iov_len = ntohl(msg->hdr.nbytes);
/* tell the caller to keep this message on active, remain += ntohl(msg->hdr.nbytes);
* but let the event lib cycle so other messages iov_count = 2;
* can progress while this socket is busy } else {
*/ iov_count = 1;
ret = PMIX_ERR_RESOURCE_BUSY; }
goto exit; retry:
} else if (pmix_socket_errno == EWOULDBLOCK) { rc = writev(sd, iov, iov_count);
/* tell the caller to keep this message on active, if (PMIX_LIKELY(rc == remain)) {
* but let the event lib cycle so other messages /* we successfully sent the header and the msg data if any */
* can progress while this socket is busy msg->hdr_sent = true;
*/ msg->sdbytes = 0;
ret = PMIX_ERR_WOULD_BLOCK; msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
goto exit; return PMIX_SUCCESS;
} } else if (rc < 0) {
if (pmix_socket_errno == EINTR) {
goto retry;
} else if (pmix_socket_errno == EAGAIN) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return PMIX_ERR_RESOURCE_BUSY;
} else if (pmix_socket_errno == EWOULDBLOCK) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return PMIX_ERR_WOULD_BLOCK;
} else {
/* we hit an error and cannot progress this message */ /* we hit an error and cannot progress this message */
pmix_output(0, "pmix_ptl_base_msg_send_bytes: write failed: %s (%d) [sd = %d]", pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
strerror(pmix_socket_errno), strerror(pmix_socket_errno),
pmix_socket_errno, sd); pmix_socket_errno, sd);
ret = PMIX_ERR_UNREACH; return PMIX_ERR_UNREACH;
goto exit;
} }
/* update location */ } else {
(*remain) -= rc; /* short writev. This usually means the kernel buffer is full,
ptr += rc; * so there is no point for retrying at that time.
* simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */
if ((size_t)rc < msg->sdbytes) {
/* partial write of the header or the msg data */
msg->sdptr = (char *)msg->sdptr + rc;
msg->sdbytes -= rc;
} else {
/* header was fully written, but only a part of the msg data was written */
msg->hdr_sent = true;
rc -= msg->sdbytes;
msg->sdptr = (char *)msg->data->base_ptr + rc;
msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
}
return PMIX_ERR_RESOURCE_BUSY;
} }
/* we sent the full data block */
exit:
*buf = ptr;
return ret;
} }
static pmix_status_t read_bytes(int sd, char **buf, size_t *remain) static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
@ -253,72 +276,30 @@ void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
(NULL == msg) ? "NULL" : "NON-NULL"); (NULL == msg) ? "NULL" : "NON-NULL");
if (NULL != msg) { if (NULL != msg) {
if (!msg->hdr_sent) { pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING MSG");
if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
// message is complete
pmix_output_verbose(2, pmix_globals.debug_output, pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING HEADER"); "ptl:base:send_handler MSG SENT");
if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) { PMIX_RELEASE(msg);
/* header is completely sent */ peer->send_msg = NULL;
pmix_output_verbose(2, pmix_globals.debug_output, } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
"ptl:base:send_handler HEADER SENT"); PMIX_ERR_WOULD_BLOCK == rc) {
msg->hdr_sent = true; /* exit this event and let the event lib progress */
/* setup to send the data */ pmix_output_verbose(2, pmix_globals.debug_output,
if (NULL == msg->data) { "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
/* this was a zero-byte msg - nothing more to do */ return;
PMIX_RELEASE(msg); } else {
peer->send_msg = NULL; // report the error
goto next; event_del(&peer->send_event);
} else { peer->send_ev_active = false;
/* send the data as a single block */ PMIX_RELEASE(msg);
msg->sdptr = msg->data->base_ptr; peer->send_msg = NULL;
msg->sdbytes = ntohl(msg->hdr.nbytes); lost_connection(peer, rc);
} return;
/* fall thru and let the send progress */
} else if (PMIX_ERR_RESOURCE_BUSY == rc ||
PMIX_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler RES BUSY OR WOULD BLOCK");
return;
} else {
// report the error
event_del(&peer->send_event);
peer->send_ev_active = false;
PMIX_RELEASE(msg);
peer->send_msg = NULL;
lost_connection(peer, rc);
return;
}
} }
if (msg->hdr_sent) {
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING BODY OF MSG");
if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
// message is complete
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler BODY SENT");
PMIX_RELEASE(msg);
peer->send_msg = NULL;
} else if (PMIX_ERR_RESOURCE_BUSY == rc ||
PMIX_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler RES BUSY OR WOULD BLOCK");
return;
} else {
// report the error
pmix_output(0, "ptl:base:peer_send_handler: unable to send message ON SOCKET %d",
peer->sd);
event_del(&peer->send_event);
peer->send_ev_active = false;
PMIX_RELEASE(msg);
peer->send_msg = NULL;
lost_connection(peer, rc);
return;
}
}
next:
/* if current message completed - progress any pending sends by /* if current message completed - progress any pending sends by
* moving the next in the queue into the "on-deck" position. Note * moving the next in the queue into the "on-deck" position. Note
* that this doesn't mean we send the message right now - we will * that this doesn't mean we send the message right now - we will