1
1

Allow the user to set the send/recv buf size for udp. Don't declare existing nb recvs to be an error.

This commit was SVN r23210.
Этот коммит содержится в:
Ralph Castain 2010-05-26 14:29:36 +00:00
родитель a1bc589f23
Коммит 4ce07ace61
5 изменённых файлов: 55 добавлений и 17 удалений

Просмотреть файл

@ -569,7 +569,10 @@ static int spread_recv_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
cbfunc, NULL, cbdata, false))) {
ORTE_ERROR_LOG(ret);
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
@ -644,8 +647,10 @@ static int spread_recv_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
ORTE_ERROR_LOG(ret);
return ret;
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;

Просмотреть файл

@ -571,7 +571,11 @@ static int tcp_recv_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
cbfunc, NULL, cbdata, false))) {
ORTE_ERROR_LOG(ret);
if (ORTE_EXISTS == ret) {
ret = ORTE_SUCCESS;
} else {
ORTE_ERROR_LOG(ret);
}
}
return ret;
@ -645,8 +649,10 @@ static int tcp_recv_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
ORTE_ERROR_LOG(ret);
return ret;
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;

Просмотреть файл

@ -465,7 +465,10 @@ static int udp_recv_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
cbfunc, NULL, cbdata, false))) {
ORTE_ERROR_LOG(ret);
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
@ -539,8 +542,10 @@ static int udp_recv_buffer_nb(orte_rmcast_channel_t channel,
if (ORTE_SUCCESS != (ret = orte_rmcast_base_queue_recv(NULL, chan, tag, flags,
NULL, cbfunc, cbdata, false))) {
ORTE_ERROR_LOG(ret);
return ret;
if (ORTE_EXISTS != ret) {
ORTE_ERROR_LOG(ret);
return ret;
}
}
return ORTE_SUCCESS;
@ -677,8 +682,8 @@ static void recv_handler(int sd, short flags, void* cbdata)
opal_buffer_t *buf;
/* read the data */
data = (uint8_t*)malloc(ORTE_RMCAST_UDP_MTU * sizeof(uint8_t));
sz = read(sd, data, ORTE_RMCAST_UDP_MTU);
data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size * sizeof(uint8_t));
sz = read(sd, data, orte_rmcast_udp_sndbuf_size);
if (sz <= 0) {
/* this shouldn't happen - report the errno */
@ -733,7 +738,7 @@ static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction)
return rc;
}
chan->xmit = xmitsd;
chan->send_data = (uint8_t*)malloc(ORTE_RMCAST_UDP_MTU);
chan->send_data = (uint8_t*)malloc(orte_rmcast_udp_sndbuf_size);
/* setup the event to xmit messages, but don't activate it */
opal_event_set(&chan->send_ev, chan->xmit, OPAL_EV_WRITE, xmit_data, chan);
}
@ -834,8 +839,8 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket)
return ORTE_ERROR;
}
/* set the recvbuf size */
flags = 10*ORTE_RMCAST_UDP_MTU;
if ((setsockopt(target_sd, SOL_SOCKET, SO_RCVBUF, &flags, sizeof(flags))) < 0) {
if ((setsockopt(target_sd, SOL_SOCKET, SO_RCVBUF,
&orte_rmcast_udp_rcvbuf_size, sizeof(orte_rmcast_udp_rcvbuf_size))) < 0) {
opal_output(0, "%s rmcast:init: setsockopt() failed on SO_RCVBUF\n"
"\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -865,8 +870,8 @@ static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket)
return ORTE_ERROR;
}
/* set the sendbuf size */
flags = ORTE_RMCAST_UDP_MTU;
if ((setsockopt(target_sd, SOL_SOCKET, SO_SNDBUF, &flags, sizeof(flags))) < 0) {
if ((setsockopt(target_sd, SOL_SOCKET, SO_SNDBUF,
&orte_rmcast_udp_sndbuf_size, sizeof(orte_rmcast_udp_sndbuf_size))) < 0) {
opal_output(0, "%s rmcast:init: setsockopt() failed on SO_SNDBUF\n"
"\tfor multicast network %03d.%03d.%03d.%03d interface %03d.%03d.%03d.%03d\n\tError: %s (%d)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),

Просмотреть файл

@ -24,11 +24,13 @@
BEGIN_C_DECLS
#define ORTE_RMCAST_UDP_MTU 65536
#define ORTE_RMCAST_UDP_DEFAULT_SNDBUF_SIZE 65536
ORTE_MODULE_DECLSPEC extern orte_rmcast_base_component_t mca_rmcast_udp_component;
extern orte_rmcast_module_t orte_rmcast_udp_module;
ORTE_MODULE_DECLSPEC extern int orte_rmcast_udp_sndbuf_size;
ORTE_MODULE_DECLSPEC extern int orte_rmcast_udp_rcvbuf_size;
END_C_DECLS
#endif

Просмотреть файл

@ -34,6 +34,8 @@ static int orte_rmcast_udp_query(mca_base_module_t **module, int *priority);
* Local variables
*/
static bool initialized = false;
int orte_rmcast_udp_sndbuf_size;
int orte_rmcast_udp_rcvbuf_size;
/*
* Public string showing the rmcast udp component version number
@ -66,6 +68,24 @@ orte_rmcast_base_component_t mca_rmcast_udp_component = {
*/
static int orte_rmcast_udp_open(void)
{
mca_base_component_t *c = &mca_rmcast_udp_component.version;
int value;
mca_base_param_reg_int(c, "sndbuf_size",
"Size of send buffer in Kbytes (must be > 0)",
false, false,
ORTE_RMCAST_UDP_DEFAULT_SNDBUF_SIZE, &value);
orte_rmcast_udp_sndbuf_size = 1024*value;
orte_rmcast_udp_rcvbuf_size = 16 * orte_rmcast_udp_sndbuf_size;
mca_base_param_reg_int(c, "rcvbuf_size",
"Size of recv buffer in Kbytes (default: 16xsndbuf)",
false, false,
orte_rmcast_udp_rcvbuf_size, &value);
if (value != orte_rmcast_udp_rcvbuf_size) {
orte_rmcast_udp_rcvbuf_size = 1024 * value;
}
return ORTE_SUCCESS;
}