Fix a race condition in the orted recv/process procedure.
Thx to Tim P for spotting it This commit was SVN r17666.
Этот коммит содержится в:
родитель
824c298abf
Коммит
a1eef0dd50
@ -81,10 +81,6 @@
|
||||
/*
|
||||
* Globals
|
||||
*/
|
||||
static bool send_relay_buffer;
|
||||
static opal_buffer_t *relay_buf;
|
||||
static orte_grpcomm_mode_t relay_mode;
|
||||
|
||||
static int process_commands(orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag);
|
||||
@ -99,12 +95,27 @@ static void send_callback(int status, orte_process_name_t *peer,
|
||||
OBJ_RELEASE(buf);
|
||||
}
|
||||
|
||||
static void send_relay(void)
|
||||
static void send_relay(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
opal_buffer_t *buffer = mev->buffer;
|
||||
orte_rml_tag_t tag = mev->tag;
|
||||
orte_grpcomm_mode_t relay_mode;
|
||||
opal_list_t recips;
|
||||
opal_list_item_t *item;
|
||||
int ret;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* we pass the relay_mode in the mev "tag" field. This is a bit
|
||||
* of a hack as the two sizes may not exactly match. However, since
|
||||
* the rml_tag is an int32, it is doubtful we will ever see a
|
||||
* truncation error
|
||||
*/
|
||||
relay_mode = (orte_grpcomm_mode_t)tag;
|
||||
|
||||
/* setup a list of next recipients */
|
||||
OBJ_CONSTRUCT(&recips, opal_list_t);
|
||||
/* ask the active grpcomm module for the next recipients */
|
||||
@ -117,7 +128,13 @@ static void send_relay(void)
|
||||
*/
|
||||
while (NULL != (item = opal_list_remove_first(&recips))) {
|
||||
orte_namelist_t *target = (orte_namelist_t*)item;
|
||||
if (0 > (ret = orte_rml.send_buffer(&target->name, relay_buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:send_relay sending relay msg to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&target->name)));
|
||||
|
||||
if (0 > (ret = orte_rml.send_buffer(&target->name, buffer, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
@ -125,13 +142,9 @@ static void send_relay(void)
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
/* ensure that we don't try to do this again */
|
||||
send_relay_buffer = false;
|
||||
/* cleanup */
|
||||
OBJ_DESTRUCT(&recips);
|
||||
OBJ_RELEASE(relay_buf);
|
||||
/* make sure these messages have a chance to get out */
|
||||
opal_progress();
|
||||
OBJ_RELEASE(mev);
|
||||
}
|
||||
|
||||
void orte_daemon_recv(int status, orte_process_name_t* sender,
|
||||
@ -197,41 +210,42 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
|
||||
|
||||
/* see if this is a "process-and-relay" command */
|
||||
if (ORTE_DAEMON_PROCESS_AND_RELAY_CMD == command) {
|
||||
/* set flag so we can relay the buffer - after we process it! */
|
||||
send_relay_buffer = true;
|
||||
opal_buffer_t relay_buf;
|
||||
orte_grpcomm_mode_t relay_mode;
|
||||
orte_rml_tag_t relay_tag;
|
||||
|
||||
/* unpack the routing mode in use */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &relay_mode, &n, ORTE_GRPCOMM_MODE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* we are going to use the message event's tag field to pass the relay mode
|
||||
* see comment in send_relay
|
||||
*/
|
||||
relay_tag = (orte_rml_tag_t)relay_mode;
|
||||
/* initialize the relay buffer */
|
||||
relay_buf = OBJ_NEW(opal_buffer_t);
|
||||
if (NULL == relay_buf) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
goto PROCESS;
|
||||
}
|
||||
OBJ_CONSTRUCT(&relay_buf, opal_buffer_t);
|
||||
/* tell the downstream daemons to process-and-relay */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(relay_buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&relay_buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto PROCESS;
|
||||
}
|
||||
/* tell the downstream daemons the routing algo in use */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(relay_buf, &relay_mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&relay_buf, &relay_mode, 1, ORTE_GRPCOMM_MODE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* copy the message payload to the relay buffer - this is non-destructive */
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(relay_buf, buffer))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.copy_payload(&relay_buf, buffer))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
goto PROCESS;
|
||||
}
|
||||
/* the remainder of the buffer contains the message to be processed */
|
||||
/* setup an event to actually perform the relay */
|
||||
ORTE_MESSAGE_EVENT(sender, &relay_buf, relay_tag, send_relay);
|
||||
} else {
|
||||
/* rewind the buffer so we can process it correctly */
|
||||
buffer->unpack_ptr = unpack_ptr;
|
||||
/* no relay */
|
||||
send_relay_buffer = false;
|
||||
}
|
||||
|
||||
PROCESS:
|
||||
@ -245,16 +259,7 @@ PROCESS:
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:cmd:processor: processing commands completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if we need to relay, do so now */
|
||||
if (send_relay_buffer) {
|
||||
send_relay();
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
|
||||
"%s orte:daemon:cmd:processor: relay completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
|
||||
CLEANUP:
|
||||
OBJ_RELEASE(mev);
|
||||
return;
|
||||
@ -432,13 +437,10 @@ static int process_commands(orte_process_name_t* sender,
|
||||
opal_output(0, "%s orted_cmd: received exit",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
}
|
||||
/* if we need to relay a buffer, we need to do it
|
||||
* now as the wakeup trigger will cause us to exit
|
||||
/* trigger our appropriate exit procedure
|
||||
* NOTE: this event will fire -after- any zero-time events
|
||||
* so any pending relays -do- get sent first
|
||||
*/
|
||||
if (send_relay_buffer) {
|
||||
send_relay();
|
||||
}
|
||||
/* trigger our appropriate exit procedure */
|
||||
orte_wakeup(0);
|
||||
return ORTE_SUCCESS;
|
||||
break;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user