Add a few new MPI_Info options to the dpm - documentation to follow.
Fix a mistake in the dpm that hardcoded the update of routes to the HNP. This needs to be done by the individual routing modules so they can take whatever action is required - which will usually include updating the HNP, but might not, and might include additional steps. New routing modules are coming that violate this assumption, so the route update had to be moved back into init_routes. All current routed modules know what to do - anyone with routed modules not in the current trunk may need to adjust them (see any of the current routed modules for examples of what to do). This commit was SVN r20427.
Этот коммит содержится в:
родитель
e694c0dac6
Коммит
b100513022
@ -552,7 +552,8 @@ static int spawn(int count, char **array_of_commands,
|
||||
}
|
||||
/* record the number of procs to be generated */
|
||||
app->num_procs = array_of_maxprocs[i];
|
||||
|
||||
jdata->num_procs += app->num_procs;
|
||||
|
||||
/* copy over the argv array */
|
||||
counter = 1;
|
||||
|
||||
@ -588,8 +589,7 @@ static int spawn(int count, char **array_of_commands,
|
||||
/* Add environment variable with the contact information for the
|
||||
child processes.
|
||||
*/
|
||||
counter = 1;
|
||||
app->env = (char**)malloc((1+counter) * sizeof(char*));
|
||||
app->env = (char**)malloc(2 * sizeof(char*));
|
||||
if (NULL == app->env) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
OBJ_RELEASE(jdata);
|
||||
@ -649,9 +649,27 @@ static int spawn(int count, char **array_of_commands,
|
||||
*/
|
||||
ompi_info_get_bool(array_of_info[i], "ompi_local_slave", &local_spawn, &flag);
|
||||
if ( local_spawn ) {
|
||||
jdata->controls |= ORTE_JOB_CONTROL_LOCAL_SPAWN;
|
||||
jdata->controls |= ORTE_JOB_CONTROL_LOCAL_SLAVE;
|
||||
}
|
||||
|
||||
|
||||
/* check for 'preload_binary' */
|
||||
ompi_info_get_bool(array_of_info[i], "ompi_preload_binary", &local_spawn, &flag);
|
||||
if ( flag ) {
|
||||
app->preload_binary = true;
|
||||
}
|
||||
|
||||
/* check for 'preload_files' */
|
||||
ompi_info_get (array_of_info[i], "ompi_preload_files", valuelen, cwd, &flag);
|
||||
if ( flag ) {
|
||||
app->preload_files = strdup(cwd);
|
||||
}
|
||||
|
||||
/* check for 'preload_files_dest_dir' */
|
||||
ompi_info_get (array_of_info[i], "ompi_preload_files_dest_dir", valuelen, cwd, &flag);
|
||||
if ( flag ) {
|
||||
app->preload_files_dest_dir = strdup(cwd);
|
||||
}
|
||||
|
||||
/* see if this is a non-mpi job - if so, then set the flag so ORTE
|
||||
* knows what to do
|
||||
*/
|
||||
@ -780,101 +798,32 @@ cleanup:
|
||||
}
|
||||
|
||||
|
||||
/* HANDLE ACK MESSAGES FROM AN HNP */
|
||||
static bool ack_recvd;
|
||||
|
||||
static void release_ack(int fd, short event, void *data)
|
||||
{
|
||||
orte_message_event_t *mev = (orte_message_event_t*)data;
|
||||
ack_recvd = true;
|
||||
OBJ_RELEASE(mev);
|
||||
}
|
||||
|
||||
static void recv_ack(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
/* don't process this right away - we need to get out of the recv before
|
||||
* we process the message as it may ask us to do something that involves
|
||||
* more messaging! Instead, setup an event so that the message gets processed
|
||||
* as soon as we leave the recv.
|
||||
*
|
||||
* The macro makes a copy of the buffer, which we release above - the incoming
|
||||
* buffer, however, is NOT released here, although its payload IS transferred
|
||||
* to the message buffer for later processing
|
||||
*/
|
||||
ORTE_MESSAGE_EVENT(sender, buffer, tag, release_ack);
|
||||
}
|
||||
|
||||
static int route_to_port(char *rml_uri, orte_process_name_t *rproc)
|
||||
{
|
||||
opal_buffer_t route;
|
||||
orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
|
||||
int rc;
|
||||
|
||||
/* if this proc is part of my job family, then I need to
|
||||
* update my RML contact hash table and my routes
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(rproc->jobid) == ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm_route_to_port: same job family - updating route",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* set the contact info into the hash table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.update_route(rproc, rproc))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* We're good to go */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* the proc must be part of another job family. In this case, we
|
||||
* will route any messages to the proc through our HNP. We need
|
||||
* to update the HNP, though, so it knows how to reach the
|
||||
* HNP of the rproc's job family
|
||||
/* We need to ask the routed module to init_routes so it can do the
|
||||
* right thing. In most cases, it will route any messages to the
|
||||
* proc through our HNP - however, this is NOT the case in all
|
||||
* circumstances, so we need to let the routed module decide what
|
||||
* to do.
|
||||
*/
|
||||
/* pack a cmd so the buffer can be unpacked correctly */
|
||||
OBJ_CONSTRUCT(&route, opal_buffer_t);
|
||||
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
|
||||
|
||||
/* pack the HNP uri */
|
||||
/* pack the provided uri */
|
||||
opal_dss.pack(&route, &rml_uri, 1, OPAL_STRING);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm_route_to_port: %s in diff job family - sending update to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(rproc),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &route,
|
||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||
/* init the route */
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.init_routes(rproc->jobid, &route))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&route);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* wait right here until the HNP acks the update to ensure that
|
||||
* any subsequent messaging can succeed
|
||||
*/
|
||||
ack_recvd = false;
|
||||
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
|
||||
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
|
||||
|
||||
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
|
||||
OBJ_DESTRUCT(&route);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm_parse_port: ack recvd",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* our get_route function automatically routes all messages for
|
||||
* other job families via the HNP, so nothing more to do here
|
||||
*/
|
||||
/* nothing more to do here */
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user