diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c
index 572160057f..063909da28 100644
--- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c
+++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c
@@ -2,7 +2,7 @@
  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
  * Copyright (c) 2014 Artem Y. Polyakov .
  * All rights reserved.
- * Copyright (c) 2015-2016 Research Organization for Information Science
+ * Copyright (c) 2015-2017 Research Organization for Information Science
  * and Technology (RIST). All rights reserved.
  * Copyright (c) 2016 Mellanox Technologies, Inc.
  * All rights reserved.
@@ -142,46 +142,69 @@ static void lost_connection(pmix_peer_t *peer, pmix_status_t err)
     PMIX_REPORT_EVENT(err, _notify_complete);
 }
 
-static pmix_status_t send_bytes(int sd, char **buf, size_t *remain)
+static pmix_status_t send_msg(int sd, pmix_ptl_send_t *msg)
 {
-    pmix_status_t ret = PMIX_SUCCESS;
-    int rc;
-    char *ptr = *buf;
-    while (0 < *remain) {
-        rc = write(sd, ptr, *remain);
-        if (rc < 0) {
-            if (pmix_socket_errno == EINTR) {
-                continue;
-            } else if (pmix_socket_errno == EAGAIN) {
-                /* tell the caller to keep this message on active,
-                 * but let the event lib cycle so other messages
-                 * can progress while this socket is busy
-                 */
-                ret = PMIX_ERR_RESOURCE_BUSY;
-                goto exit;
-            } else if (pmix_socket_errno == EWOULDBLOCK) {
-                /* tell the caller to keep this message on active,
-                 * but let the event lib cycle so other messages
-                 * can progress while this socket is busy
-                 */
-                ret = PMIX_ERR_WOULD_BLOCK;
-                goto exit;
-            }
+    struct iovec iov[2];
+    int iov_count;
+    ssize_t remain = msg->sdbytes, rc;
+    /* iov[0]: pending bytes at sdptr; iov[1]: msg data, added only while the header is unsent */
+    iov[0].iov_base = msg->sdptr;
+    iov[0].iov_len = msg->sdbytes;
+    if (!msg->hdr_sent && NULL != msg->data) {
+        iov[1].iov_base = msg->data->base_ptr;
+        iov[1].iov_len = ntohl(msg->hdr.nbytes);
+        remain += ntohl(msg->hdr.nbytes);
+        iov_count = 2;
+    } else {
+        iov_count = 1;
+    }
+retry:
+    rc = writev(sd, iov, iov_count);
+    if (PMIX_LIKELY(rc == remain)) {
+        /* we successfully sent the header and the msg data if any */
+        msg->hdr_sent = true;
+        msg->sdbytes = 0;
+        msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
+        return PMIX_SUCCESS;
+    } else if (rc < 0) {
+        if (pmix_socket_errno == EINTR) {
+            goto retry;
+        } else if (pmix_socket_errno == EAGAIN) {
+            /* tell the caller to keep this message on active,
+             * but let the event lib cycle so other messages
+             * can progress while this socket is busy
+             */
+            return PMIX_ERR_RESOURCE_BUSY;
+        } else if (pmix_socket_errno == EWOULDBLOCK) {
+            /* tell the caller to keep this message on active,
+             * but let the event lib cycle so other messages
+             * can progress while this socket is busy
+             */
+            return PMIX_ERR_WOULD_BLOCK;
+        } else {
             /* we hit an error and cannot progress this message */
-            pmix_output(0, "pmix_ptl_base_msg_send_bytes: write failed: %s (%d) [sd = %d]",
+            pmix_output(0, "pmix_ptl_base: send_msg: write failed: %s (%d) [sd = %d]",
                         strerror(pmix_socket_errno),
                         pmix_socket_errno, sd);
-            ret = PMIX_ERR_UNREACH;
-            goto exit;
+            return PMIX_ERR_UNREACH;
         }
-        /* update location */
-        (*remain) -= rc;
-        ptr += rc;
+    } else {
+        /* short writev. This usually means the kernel buffer is full,
+         * so there is no point in retrying right now.
+         * Simply update the msg and return PMIX_ERR_RESOURCE_BUSY */
+        if ((size_t)rc < msg->sdbytes) {
+            /* partial write of the header or the msg data */
+            msg->sdptr = (char *)msg->sdptr + rc;
+            msg->sdbytes -= rc;
+        } else {
+            /* header was fully written, but only a part of the msg data was written */
+            msg->hdr_sent = true;
+            rc -= msg->sdbytes;
+            msg->sdptr = (char *)msg->data->base_ptr + rc;
+            msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
+        }
+        return PMIX_ERR_RESOURCE_BUSY;
     }
-    /* we sent the full data block */
-exit:
-    *buf = ptr;
-    return ret;
 }
 
 static pmix_status_t read_bytes(int sd, char **buf, size_t *remain)
@@ -253,72 +276,30 @@ void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
                         (NULL == msg) ? "NULL" : "NON-NULL");
 
     if (NULL != msg) {
-        if (!msg->hdr_sent) {
+        pmix_output_verbose(2, pmix_globals.debug_output,
+                            "ptl:base:send_handler SENDING MSG");
+        if (PMIX_SUCCESS == (rc = send_msg(peer->sd, msg))) {
+            // message is complete
             pmix_output_verbose(2, pmix_globals.debug_output,
-                                "ptl:base:send_handler SENDING HEADER");
-            if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
-                /* header is completely sent */
-                pmix_output_verbose(2, pmix_globals.debug_output,
-                                    "ptl:base:send_handler HEADER SENT");
-                msg->hdr_sent = true;
-                /* setup to send the data */
-                if (NULL == msg->data) {
-                    /* this was a zero-byte msg - nothing more to do */
-                    PMIX_RELEASE(msg);
-                    peer->send_msg = NULL;
-                    goto next;
-                } else {
-                    /* send the data as a single block */
-                    msg->sdptr = msg->data->base_ptr;
-                    msg->sdbytes = ntohl(msg->hdr.nbytes);
-                }
-                /* fall thru and let the send progress */
-            } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
-                       PMIX_ERR_WOULD_BLOCK == rc) {
-                /* exit this event and let the event lib progress */
-                pmix_output_verbose(2, pmix_globals.debug_output,
-                                    "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
-                return;
-            } else {
-                // report the error
-                event_del(&peer->send_event);
-                peer->send_ev_active = false;
-                PMIX_RELEASE(msg);
-                peer->send_msg = NULL;
-                lost_connection(peer, rc);
-                return;
-            }
+                                "ptl:base:send_handler MSG SENT");
+            PMIX_RELEASE(msg);
+            peer->send_msg = NULL;
+        } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
+                   PMIX_ERR_WOULD_BLOCK == rc) {
+            /* exit this event and let the event lib progress */
+            pmix_output_verbose(2, pmix_globals.debug_output,
+                                "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
+            return;
+        } else {
+            // report the error
+            event_del(&peer->send_event);
+            peer->send_ev_active = false;
+            PMIX_RELEASE(msg);
+            peer->send_msg = NULL;
+            lost_connection(peer, rc);
+            return;
         }
 
-        if (msg->hdr_sent) {
-            pmix_output_verbose(2, pmix_globals.debug_output,
-                                "ptl:base:send_handler SENDING BODY OF MSG");
-            if (PMIX_SUCCESS == (rc = send_bytes(peer->sd, &msg->sdptr, &msg->sdbytes))) {
-                // message is complete
-                pmix_output_verbose(2, pmix_globals.debug_output,
-                                    "ptl:base:send_handler BODY SENT");
-                PMIX_RELEASE(msg);
-                peer->send_msg = NULL;
-            } else if (PMIX_ERR_RESOURCE_BUSY == rc ||
-                       PMIX_ERR_WOULD_BLOCK == rc) {
-                /* exit this event and let the event lib progress */
-                pmix_output_verbose(2, pmix_globals.debug_output,
-                                    "ptl:base:send_handler RES BUSY OR WOULD BLOCK");
-                return;
-            } else {
-                // report the error
-                pmix_output(0, "ptl:base:peer_send_handler: unable to send message ON SOCKET %d",
-                            peer->sd);
-                event_del(&peer->send_event);
-                peer->send_ev_active = false;
-                PMIX_RELEASE(msg);
-                peer->send_msg = NULL;
-                lost_connection(peer, rc);
-                return;
-            }
-        }
-
-    next:
         /* if current message completed - progress any pending sends by
          * moving the next in the queue into the "on-deck" position. Note
          * that this doesn't mean we send the message right now - we will