1
1

Merge pull request #2762 from rhc54/topic/oobfast

Speed-up the OOB/TCP communications by using writev instead of writing the header, and then separately write the body
Этот коммит содержится в:
Ralph Castain 2017-01-19 15:39:06 -08:00 коммит произвёл GitHub
родитель 63caeba84d e5f687f896
Коммит ca50b31de1

Просмотреть файл

@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -75,47 +75,89 @@
#include "orte/mca/oob/tcp/oob_tcp_common.h"
#include "orte/mca/oob/tcp/oob_tcp_connection.h"
static int send_bytes(mca_oob_tcp_peer_t* peer)
static int send_msg(mca_oob_tcp_peer_t* peer, mca_oob_tcp_send_t* msg)
{
mca_oob_tcp_send_t* msg = peer->send_msg;
int rc;
struct iovec iov[2];
int iov_count;
ssize_t remain = msg->sdbytes, rc;
OPAL_TIMING_EVENT((&tm_oob, "to %s %d bytes",
ORTE_NAME_PRINT(&(peer->name)), msg->sdbytes));
while (0 < msg->sdbytes) {
rc = write(peer->sd, msg->sdptr, msg->sdbytes);
if (rc < 0) {
if (opal_socket_errno == EINTR) {
continue;
} else if (opal_socket_errno == EAGAIN) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return ORTE_ERR_RESOURCE_BUSY;
} else if (opal_socket_errno == EWOULDBLOCK) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return ORTE_ERR_WOULD_BLOCK;
}
/* we hit an error and cannot progress this message */
opal_output(0, "%s->%s mca_oob_tcp_msg_send_bytes: write failed: %s (%d) [sd = %d]",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
strerror(opal_socket_errno),
opal_socket_errno,
peer->sd);
return ORTE_ERR_COMM_FAILURE;
iov[0].iov_base = msg->sdptr;
iov[0].iov_len = msg->sdbytes;
if (!msg->hdr_sent) {
if (NULL != msg->data) {
/* relay message - just send that data */
iov[1].iov_base = msg->data;
} else if (NULL != msg->msg->buffer) {
/* buffer send */
iov[1].iov_base = msg->msg->buffer->base_ptr;
} else {
iov[1].iov_base = msg->msg->data;
}
/* update location */
msg->sdbytes -= rc;
msg->sdptr += rc;
iov[1].iov_len = ntohl(msg->hdr.nbytes);
remain += ntohl(msg->hdr.nbytes);
iov_count = 2;
} else {
iov_count = 1;
}
retry:
rc = writev(peer->sd, iov, iov_count);
if (OPAL_LIKELY(rc == remain)) {
/* we successfully sent the header and the msg data if any */
msg->hdr_sent = true;
msg->sdbytes = 0;
msg->sdptr = (char *)iov[iov_count-1].iov_base + iov[iov_count-1].iov_len;
return ORTE_SUCCESS;
} else if (rc < 0) {
if (opal_socket_errno == EINTR) {
goto retry;
} else if (opal_socket_errno == EAGAIN) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return ORTE_ERR_RESOURCE_BUSY;
} else if (opal_socket_errno == EWOULDBLOCK) {
/* tell the caller to keep this message on active,
* but let the event lib cycle so other messages
* can progress while this socket is busy
*/
return ORTE_ERR_WOULD_BLOCK;
} else {
/* we hit an error and cannot progress this message */
opal_output(0, "oob:tcp: send_msg: write failed: %s (%d) [sd = %d]",
strerror(opal_socket_errno),
opal_socket_errno, peer->sd);
return ORTE_ERR_UNREACH;
}
} else {
/* short writev. This usually means the kernel buffer is full,
* so there is no point for retrying at that time.
* simply update the msg and return with PMIX_ERR_RESOURCE_BUSY */
if ((size_t)rc < msg->sdbytes) {
/* partial write of the header or the msg data */
msg->sdptr = (char *)msg->sdptr + rc;
msg->sdbytes -= rc;
} else {
/* header was fully written, but only a part of the msg data was written */
msg->hdr_sent = true;
rc -= msg->sdbytes;
if (NULL != msg->data) {
/* technically, this should never happen as iov_count
* would be 1 for a zero-byte message, and so we cannot
* have a case where we write the header and part of the
* msg. However, code checkers don't know that and are
* fooled by our earlier check for NULL, and so
* we silence their warnings by using this check */
msg->sdptr = (char *)msg->data + rc;
}
msg->sdbytes = ntohl(msg->hdr.nbytes) - rc;
}
return ORTE_ERR_RESOURCE_BUSY;
}
/* we sent the full data block */
return ORTE_SUCCESS;
}
/*
@ -155,68 +197,56 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name));
if (NULL != msg) {
/* if the header hasn't been completely sent, send it */
if (!msg->hdr_sent) {
if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
/* header is completely sent */
msg->hdr_sent = true;
/* setup to send the data */
if (NULL != msg->data) {
/* relay msg - send that data */
msg->sdptr = msg->data;
msg->sdbytes = (int)ntohl(msg->hdr.nbytes);
} else if (NULL == msg->msg) {
/* this was a zero-byte relay - nothing more to do */
OBJ_RELEASE(msg);
peer->send_msg = NULL;
goto next;
} else if (NULL != msg->msg->buffer) {
/* send the buffer data as a single block */
msg->sdptr = msg->msg->buffer->base_ptr;
msg->sdbytes = msg->msg->buffer->bytes_used;
} else if (NULL != msg->msg->iov) {
/* start with the first iovec */
msg->sdptr = msg->msg->iov[0].iov_base;
msg->sdbytes = msg->msg->iov[0].iov_len;
msg->iovnum = 0;
} else {
/* just send the data */
msg->sdptr = msg->msg->data;
msg->sdbytes = msg->msg->count;
}
/* fall thru and let the send progress */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
ORTE_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
return;
} else {
// report the error
opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send header",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)));
opal_event_del(&peer->send_event);
msg->msg->status = rc;
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"oob:tcp:send_handler SENDING MSG");
if (ORTE_SUCCESS == (rc = send_msg(peer, msg))) {
/* this msg is complete */
if (NULL != msg->data || NULL == msg->msg) {
/* the relay is complete - release the data */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else if (NULL != msg->msg->buffer) {
/* we are done - notify the RML */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
msg->msg->status = ORTE_SUCCESS;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
goto next;
}
}
/* progress the data transmission */
if (msg->hdr_sent) {
if (ORTE_SUCCESS == (rc = send_bytes(peer))) {
/* this block is complete */
if (NULL != msg->data || NULL == msg->msg) {
/* the relay is complete - release the data */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else if (NULL != msg->msg->buffer) {
/* we are done - notify the RML */
} else if (NULL != msg->msg->data) {
/* this was a relay we have now completed - no need to
* notify the RML as the local proc didn't initiate
* the send
*/
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
msg->msg->status = ORTE_SUCCESS;
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else {
/* rotate to the next iovec */
msg->iovnum++;
if (msg->iovnum < msg->msg->count) {
msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
/* exit this event to give the event lib
* a chance to progress any other pending
* actions
*/
return;
} else {
/* this message is complete - notify the RML */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -226,64 +256,27 @@ void mca_oob_tcp_send_handler(int sd, short flags, void *cbdata)
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else if (NULL != msg->msg->data) {
/* this was a relay we have now completed - no need to
* notify the RML as the local proc didn't initiate
* the send
*/
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE RELAY COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
msg->msg->status = ORTE_SUCCESS;
OBJ_RELEASE(msg);
peer->send_msg = NULL;
} else {
/* rotate to the next iovec */
msg->iovnum++;
if (msg->iovnum < msg->msg->count) {
msg->sdptr = msg->msg->iov[msg->iovnum].iov_base;
msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len;
/* exit this event to give the event lib
* a chance to progress any other pending
* actions
*/
return;
} else {
/* this message is complete - notify the RML */
opal_output_verbose(2, orte_oob_base_framework.framework_output,
"%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)),
(int)ntohl(msg->hdr.nbytes), peer->sd);
msg->msg->status = ORTE_SUCCESS;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
}
}
/* fall thru to queue the next message */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
ORTE_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
return;
} else {
// report the error
opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)), peer->sd);
opal_event_del(&peer->send_event);
msg->msg->status = rc;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
ORTE_FORCED_TERMINATE(1);
return;
}
/* fall thru to queue the next message */
} else if (ORTE_ERR_RESOURCE_BUSY == rc ||
ORTE_ERR_WOULD_BLOCK == rc) {
/* exit this event and let the event lib progress */
return;
} else {
// report the error
opal_output(0, "%s-%s mca_oob_tcp_peer_send_handler: unable to send message ON SOCKET %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&(peer->name)), peer->sd);
opal_event_del(&peer->send_event);
msg->msg->status = rc;
ORTE_RML_SEND_COMPLETE(msg->msg);
OBJ_RELEASE(msg);
peer->send_msg = NULL;
ORTE_FORCED_TERMINATE(1);
return;
}
next:
/* if current message completed - progress any pending sends by
* moving the next in the queue into the "on-deck" position. Note
* that this doesn't mean we send the message right now - we will
@ -657,4 +650,3 @@ static void err_cons(mca_oob_tcp_msg_error_t *ptr)
OBJ_CLASS_INSTANCE(mca_oob_tcp_msg_error_t,
opal_object_t,
err_cons, NULL);