Take into account Ralph's comments. There is no duplicate functionnality with the rml base. Also fix a stupid typo bug.
This commit was SVN r19667.
Этот коммит содержится в:
родитель
1995fb23cb
Коммит
73edda92db
@ -93,7 +93,7 @@ int ompi_dpm_base_null_dyn_finalize (void);
|
||||
void ompi_dpm_base_null_mark_dyncomm (ompi_communicator_t *comm);
|
||||
int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t given_tag);
|
||||
int ompi_dpm_base_null_parse_port(char *port_name,
|
||||
orte_process_name_t *rproc, orte_rml_tag_t *tag);
|
||||
char **hnp_uri, char **rml_uri, orte_rml_tag_t *tag);
|
||||
int ompi_dpm_base_null_route_to_port(char *rml_uri, orte_process_name_t *rproc);
|
||||
int ompi_dpm_base_null_close_port(char *port_name);
|
||||
|
||||
|
@ -72,7 +72,7 @@ int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t given_tag)
|
||||
}
|
||||
|
||||
int ompi_dpm_base_null_parse_port(char *port_name,
|
||||
orte_process_name_t *rproc, orte_rml_tag_t *tag)
|
||||
char **hnp_uri, char **rml_uri, orte_rml_tag_t *tag)
|
||||
{
|
||||
return OMPI_ERR_NOT_SUPPORTED;
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ typedef int (*ompi_dpm_base_module_open_port_fn_t)(char *port_name, orte_rml_tag
|
||||
* Converts an opaque port string to a RML process nane and tag.
|
||||
*/
|
||||
typedef int (*ompi_dpm_base_module_parse_port_name_t)(char *port_name,
|
||||
orte_process_name_t *rproc,
|
||||
char **hnp_uri, char **rml_uri,
|
||||
orte_rml_tag_t *tag);
|
||||
|
||||
/*
|
||||
|
@ -75,7 +75,7 @@ static int spawn(int count, char **array_of_commands,
|
||||
char *port_name);
|
||||
static int dyn_init(void);
|
||||
static int open_port(char *port_name, orte_rml_tag_t given_tag);
|
||||
static int parse_port_name(char *port_name, orte_process_name_t *rproc,
|
||||
static int parse_port_name(char *port_name, char **hnp_uri, char **rml_uri,
|
||||
orte_rml_tag_t *tag);
|
||||
static int route_to_port(char *rml_uri, orte_process_name_t *rproc);
|
||||
static int close_port(char *port_name);
|
||||
@ -147,17 +147,27 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
|
||||
* set us up to communicate with it
|
||||
*/
|
||||
if (NULL != port_string && 0 < strlen(port_string)) {
|
||||
/* separate the string into the RML URI and tag */
|
||||
if (ORTE_SUCCESS != (rc = parse_port_name(port_string, &port, &tag))) {
|
||||
char *hnp_uri, *rml_uri;
|
||||
|
||||
/* separate the string into the HNP and RML URI and tag */
|
||||
if (ORTE_SUCCESS != (rc = parse_port_name(port_string, &hnp_uri, &rml_uri, &tag))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
/* make sure we can route rml messages to the destination */
|
||||
if (ORTE_SUCCESS != (rc = route_to_port(port_string, &port))) {
|
||||
/* extract the originating proc's name */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &port, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(hnp_uri); free(rml_uri);
|
||||
return rc;
|
||||
}
|
||||
/* make sure we can route rml messages to the destination job */
|
||||
if (ORTE_SUCCESS != (rc = route_to_port(hnp_uri, &port))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(hnp_uri); free(rml_uri);
|
||||
return rc;
|
||||
}
|
||||
free(hnp_uri); free(rml_uri);
|
||||
}
|
||||
|
||||
/* tell the progress engine to tick the event library more
|
||||
often, to make sure that the OOB messages get sent */
|
||||
@ -788,7 +798,7 @@ static void recv_ack(int status, orte_process_name_t* sender,
|
||||
ORTE_MESSAGE_EVENT(sender, buffer, tag, release_ack);
|
||||
}
|
||||
|
||||
static int route_to_port(char *uri, orte_process_name_t *rproc)
|
||||
static int route_to_port(char *rml_uri, orte_process_name_t *rproc)
|
||||
{
|
||||
opal_buffer_t route;
|
||||
orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
|
||||
@ -799,18 +809,20 @@ static int route_to_port(char *uri, orte_process_name_t *rproc)
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(rproc->jobid) == ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm_parse_port: same job family - updating route",
|
||||
"%s dpm_route_to_port: same job family - updating route",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* set the contact info into the hash table */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(uri))) {
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.update_route(rproc, rproc))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
/* We're good to go */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* the proc must be part of another job family. In this case, we
|
||||
@ -823,10 +835,10 @@ static int route_to_port(char *uri, orte_process_name_t *rproc)
|
||||
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
|
||||
|
||||
/* pack the HNP uri */
|
||||
opal_dss.pack(&route, &uri, 1, OPAL_STRING);
|
||||
opal_dss.pack(&route, &rml_uri, 1, OPAL_STRING);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
|
||||
"%s dpm_parse_port: %s in diff job family - sending update to %s",
|
||||
"%s dpm_route_to_port: %s in diff job family - sending update to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(rproc),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
|
||||
@ -859,10 +871,12 @@ static int route_to_port(char *uri, orte_process_name_t *rproc)
|
||||
}
|
||||
|
||||
static int parse_port_name(char *port_name,
|
||||
orte_process_name_t *rproc,
|
||||
orte_rml_tag_t *tag)
|
||||
char **hnp_uri,
|
||||
char **rml_uri,
|
||||
orte_rml_tag_t *ptag)
|
||||
{
|
||||
char *tmpstring=NULL, *ptr, *rml_uri=NULL;
|
||||
char *tmpstring=NULL, *ptr;
|
||||
int tag;
|
||||
int rc;
|
||||
|
||||
/* don't mangle the port name */
|
||||
@ -879,7 +893,7 @@ static int parse_port_name(char *port_name,
|
||||
ptr++;
|
||||
|
||||
/* convert the RML tag */
|
||||
sscanf(ptr,"%d", (int*)tag);
|
||||
sscanf(ptr,"%d", &tag);
|
||||
|
||||
/* now split out the second field - the uri of the remote proc */
|
||||
if (NULL == (ptr = strchr(tmpstring, '+'))) {
|
||||
@ -890,23 +904,18 @@ static int parse_port_name(char *port_name,
|
||||
ptr++;
|
||||
|
||||
/* save that info */
|
||||
rml_uri = strdup(ptr);
|
||||
if(NULL != hnp_uri) *hnp_uri = tmpstring;
|
||||
else free(tmpstring);
|
||||
if(NULL != rml_uri) *rml_uri = strdup(ptr);
|
||||
if(NULL != ptag) *ptag = tag;
|
||||
|
||||
/* extract the originating proc's name */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(ptr, rproc, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
return ORTE_SUCCESS;
|
||||
|
||||
cleanup:
|
||||
/* release the tmp storage */
|
||||
if (NULL != tmpstring) {
|
||||
free(tmpstring);
|
||||
}
|
||||
if (NULL != rml_uri) {
|
||||
free(rml_uri);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user