1
1

Merge pull request #2686 from ggouaillardet/topic/pmix2x_ptl_base_sendrecv

pmix2x: ptl/base: send header and message data together via writev()
Этот коммит содержится в:
Gilles Gouaillardet 2017-01-10 16:26:10 +09:00 коммит произвёл GitHub
родитель 44c1ff60f1 a01960bee5
Коммит 6d59b476de

Просмотреть файл

@ -2,7 +2,7 @@
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
* All rights reserved.
* Copyright (c) 2015-2016 Research Organization for Information Science
* Copyright (c) 2015-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved.
@ -142,46 +142,69 @@ static void lost_connection(pmix_peer_t *peer, pmix_status_t err)
PMIX_REPORT_EVENT(err, _notify_complete);
}
static pmix_status_t send_bytes(int sd, char **buf, size_t *remain)
static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
{
pmix_status_t ret = PMIX_SUCCESS;
int rc;
char *ptr = *buf;
while (0 < *remain) {
rc = write(sd, ptr, *remain);
if (rc < 0) {
if (pmix_socket_errno == EINTR) {
continue;
} else if (pmix_socket_errno == EAGAIN) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
ret = PMIX_ERR_RESOURCE_BUSY;
goto exit;
} else if (pmix_socket_errno == EWOULDBLOCK) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
ret = PMIX_ERR_WOULD_BLOCK;
goto exit;
}
struct iovec iov[2];
int iov_count;
ssize_t remain = msg->sdbytes, rc;
iov[0].iov_base = msg->sdptr;
iov[0].iov_len = msg->sdbytes;
if (!msg->hdr_sent && NULL != msg->data) {
iov[1].iov_base = msg->data->base_ptr;
iov[1].iov_len = ntohl(msg->hdr.nbytes);
remain += ntohl(msg->hdr.nbytes);
iov_count = 2;
} else {
iov_count = 1;
}
retry:
rc = writev(sd, iov, iov_count);
if (PMIX_LIKELY(rc == remain)) {
/* we successfully sent the header and the msg data if any */
msg->hdr_sent = true;
msg->sdbytes = 0;
msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
return PMIX_SUCCESS;
} else if (rc < 0) {
if (pmix_socket_errno == EINTR) {
goto retry;
} else if (pmix_socket_errno == EAGAIN) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return PMIX_ERR_RESOURCE_BUSY;
} else if (pmix_socket_errno == EWOULDBLOCK) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return PMIX_ERR_WOULD_BLOCK;
} else {
/* we hit an error and cannot progress this message */
pmix_output(0, "pmix_ptl_base_msg_send_bytes: write failed: %s (%d) [sd = %d]",
pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
strerror(pmix_socket_errno),
pmix_socket_errno, sd);
ret = PMIX_ERR_UNREACH;
goto exit;
return PMIX_ERR_UNREACH;
}
/* update location */
(*remain) -= rc;
ptr += rc;
} else {
/* short writev. This usually means the kernel buffer is full,
* so there is no point for retrying at that time.
* simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */
if ((size_t)rc < msg->sdbytes) {
/* partial write of the header or the msg data */
msg->sdptr = (char *)msg->sdptr + rc;
msg->sdbytes -= rc;
} else {
/* header was fully written, but only a part of the msg data was written */
msg->hdr_sent = true;
rc -= msg->sdbytes;
msg->sdptr = (char *)msg->data->base_ptr + rc;
msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
}
return PMIX_ERR_RESOURCE_BUSY;
}
/* we sent the full data block */
exit:
*buf = ptr;
return ret;
}
static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
@ -253,72 +276,30 @@ void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
(NULL == msg) ? "NULL" : "NON-NULL");
if (NULL != msg) {
if (!msg->hdr_sent) {
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING MSG");
if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
// message is complete
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING HEADER");
if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
/* header is completely sent */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler HEADER SENT");
msg->hdr_sent = true;
/* setup to send the data */
if (NULL == msg->data) {
/* this was a zero-byte msg - nothing more to do */
PMIX_RELEASE(msg);
peer->send_msg = NULL;
goto next;
} else {
/* send the data as a single block */
msg->sdptr = msg->data->base_ptr;
msg->sdbytes = ntohl(msg->hdr.nbytes);
}
/* fall thru and let the send progress */
} else if (PMIX_ERR_RESOURCE_BUSY == rc ||
PMIX_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler RES BUSY OR WOULD BLOCK");
return;
} else {
// report the error
event_del(&peer->send_event);
peer->send_ev_active = false;
PMIX_RELEASE(msg);
peer->send_msg = NULL;
lost_connection(peer, rc);
return;
}
"ptl:base:send_handler MSG SENT");
PMIX_RELEASE(msg);
peer->send_msg = NULL;
} else if (PMIX_ERR_RESOURCE_BUSY == rc ||
PMIX_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler RES BUSY OR WOULD BLOCK");
return;
} else {
// report the error
event_del(&peer->send_event);
peer->send_ev_active = false;
PMIX_RELEASE(msg);
peer->send_msg = NULL;
lost_connection(peer, rc);
return;
}
if (msg->hdr_sent) {
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING BODY OF MSG");
if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
// message is complete
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler BODY SENT");
PMIX_RELEASE(msg);
peer->send_msg = NULL;
} else if (PMIX_ERR_RESOURCE_BUSY == rc ||
PMIX_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler RES BUSY OR WOULD BLOCK");
return;
} else {
// report the error
pmix_output(0, "ptl:base:peer_send_handler: unable to send message ON SOCKET %d",
peer->sd);
event_del(&peer->send_event);
peer->send_ev_active = false;
PMIX_RELEASE(msg);
peer->send_msg = NULL;
lost_connection(peer, rc);
return;
}
}
next:
/* if current message completed - progress any pending sends by
* moving the next in the queue into the "on-deck" position. Note
* that this doesn't mean we send the message right now - we will