1
1

Add functions to access the opaque port_string and to add routes to a remote port. This is usefull for FT, but could also turn usefull when considering MPI3 extentions to the MPI2 dynamics.

This commit was SVN r19653.
Этот коммит содержится в:
Aurelien Bouteiller 2008-09-27 13:22:32 +00:00
родитель 8b786cac04
Коммит 89a2eea37a
5 изменённых файлов: 117 добавлений и 70 удалений

Просмотреть файл

@ -92,6 +92,9 @@ int ompi_dpm_base_null_dyn_init(void);
int ompi_dpm_base_null_dyn_finalize (void); int ompi_dpm_base_null_dyn_finalize (void);
void ompi_dpm_base_null_mark_dyncomm (ompi_communicator_t *comm); void ompi_dpm_base_null_mark_dyncomm (ompi_communicator_t *comm);
int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t given_tag); int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t given_tag);
int ompi_dpm_base_null_parse_port(char *port_name,
orte_process_name_t *rproc, orte_rml_tag_t *tag);
int ompi_dpm_base_null_route_to_port(char *rml_uri, orte_process_name_t *rproc);
int ompi_dpm_base_null_close_port(char *port_name); int ompi_dpm_base_null_close_port(char *port_name);
/* useful globals */ /* useful globals */

Просмотреть файл

@ -36,6 +36,7 @@ int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root,
{ {
return OMPI_ERR_NOT_SUPPORTED; return OMPI_ERR_NOT_SUPPORTED;
} }
void ompi_dpm_base_null_disconnect(ompi_communicator_t *comm) void ompi_dpm_base_null_disconnect(ompi_communicator_t *comm)
{ {
return; return;
@ -70,6 +71,17 @@ int ompi_dpm_base_null_open_port(char *port_name, orte_rml_tag_t given_tag)
return OMPI_ERR_NOT_SUPPORTED; return OMPI_ERR_NOT_SUPPORTED;
} }
int ompi_dpm_base_null_parse_port(char *port_name,
orte_process_name_t *rproc, orte_rml_tag_t *tag)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_route_to_port(char *rml_uri, orte_process_name_t *rproc)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_close_port(char *port_name) int ompi_dpm_base_null_close_port(char *port_name)
{ {
return OMPI_ERR_NOT_SUPPORTED; return OMPI_ERR_NOT_SUPPORTED;

Просмотреть файл

@ -42,6 +42,8 @@ OMPI_DECLSPEC ompi_dpm_base_module_t ompi_dpm = {
ompi_dpm_base_null_dyn_finalize, ompi_dpm_base_null_dyn_finalize,
ompi_dpm_base_null_mark_dyncomm, ompi_dpm_base_null_mark_dyncomm,
ompi_dpm_base_null_open_port, ompi_dpm_base_null_open_port,
ompi_dpm_base_null_parse_port,
ompi_dpm_base_null_route_to_port,
ompi_dpm_base_null_close_port, ompi_dpm_base_null_close_port,
NULL NULL
}; };

Просмотреть файл

@ -58,6 +58,8 @@ BEGIN_C_DECLS
#define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200 #define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200
/* /*
* Initialize a module * Initialize a module
*/ */
@ -115,6 +117,20 @@ typedef void (*ompi_dpm_base_module_mark_dyncomm_fn_t)(ompi_communicator_t *comm
*/ */
typedef int (*ompi_dpm_base_module_open_port_fn_t)(char *port_name, orte_rml_tag_t tag); typedef int (*ompi_dpm_base_module_open_port_fn_t)(char *port_name, orte_rml_tag_t tag);
/*
* Converts an opaque port string to a RML process nane and tag.
*/
typedef int (*ompi_dpm_base_module_parse_port_name_t)(char *port_name,
orte_process_name_t *rproc,
orte_rml_tag_t *tag);
/*
* Update the routed component to make sure that the RML can send messages to
* the remote port
*/
typedef int (*ompi_dpm_base_module_route_to_port_t)(char *rml_uri, orte_process_name_t *rproc);
/* /*
* Close a port * Close a port
*/ */
@ -145,6 +161,10 @@ struct ompi_dpm_base_module_1_0_0_t {
ompi_dpm_base_module_mark_dyncomm_fn_t mark_dyncomm; ompi_dpm_base_module_mark_dyncomm_fn_t mark_dyncomm;
/* open port */ /* open port */
ompi_dpm_base_module_open_port_fn_t open_port; ompi_dpm_base_module_open_port_fn_t open_port;
/* parse port string */
ompi_dpm_base_module_parse_port_name_t parse_port;
/* update route to a port */
ompi_dpm_base_module_route_to_port_t route_to_port;
/* close port */ /* close port */
ompi_dpm_base_module_close_port_fn_t close_port; ompi_dpm_base_module_close_port_fn_t close_port;
/* finalize */ /* finalize */

Просмотреть файл

@ -61,9 +61,6 @@ static void recv_cb(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata); orte_rml_tag_t tag, void *cbdata);
static void process_cb(int fd, short event, void *data); static void process_cb(int fd, short event, void *data);
static int parse_port_name(char *port_name,
orte_process_name_t *rproc,
orte_rml_tag_t *tag);
/* API functions */ /* API functions */
static int init(void); static int init(void);
@ -78,6 +75,9 @@ static int spawn(int count, char **array_of_commands,
char *port_name); char *port_name);
static int dyn_init(void); static int dyn_init(void);
static int open_port(char *port_name, orte_rml_tag_t given_tag); static int open_port(char *port_name, orte_rml_tag_t given_tag);
static int parse_port_name(char *port_name, orte_process_name_t *rproc,
orte_rml_tag_t *tag);
static int route_to_port(char *rml_uri, orte_process_name_t *rproc);
static int close_port(char *port_name); static int close_port(char *port_name);
static int finalize(void); static int finalize(void);
@ -93,6 +93,8 @@ ompi_dpm_base_module_t ompi_dpm_orte_module = {
ompi_dpm_base_dyn_finalize, ompi_dpm_base_dyn_finalize,
ompi_dpm_base_mark_dyncomm, ompi_dpm_base_mark_dyncomm,
open_port, open_port,
parse_port_name,
route_to_port,
close_port, close_port,
finalize finalize
}; };
@ -145,14 +147,17 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
* set us up to communicate with it * set us up to communicate with it
*/ */
if (NULL != port_string && 0 < strlen(port_string)) { if (NULL != port_string && 0 < strlen(port_string)) {
/* separate the string into the RML URI and tag - this function performs /* separate the string into the RML URI and tag */
* whatever route initialization is required by the selected routed module
*/
if (ORTE_SUCCESS != (rc = parse_port_name(port_string, &port, &tag))) { if (ORTE_SUCCESS != (rc = parse_port_name(port_string, &port, &tag))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
} }
} }
/* make sure we can route rml messages to the destination */
if (ORTE_SUCCESS != (rc = route_to_port(port_string, &port))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* tell the progress engine to tick the event library more /* tell the progress engine to tick the event library more
often, to make sure that the OOB messages get sent */ often, to make sure that the OOB messages get sent */
@ -783,15 +788,82 @@ static void recv_ack(int status, orte_process_name_t* sender,
ORTE_MESSAGE_EVENT(sender, buffer, tag, release_ack); ORTE_MESSAGE_EVENT(sender, buffer, tag, release_ack);
} }
static int route_to_port(char *uri, orte_process_name_t *rproc)
{
opal_buffer_t route;
orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
int rc;
/* if this proc is part of my job family, then I need to
* update my RML contact hash table and my routes
*/
if (ORTE_JOB_FAMILY(rproc->jobid) == ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: same job family - updating route",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(uri))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_routed.update_route(rproc, rproc))) {
ORTE_ERROR_LOG(rc);
}
return ORTE_SUCCESS;
}
/* the proc must be part of another job family. In this case, we
* will route any messages to the proc through our HNP. We need
* to update the HNP, though, so it knows how to reach the
* HNP of the rproc's job family
*/
/* pack a cmd so the buffer can be unpacked correctly */
OBJ_CONSTRUCT(&route, opal_buffer_t);
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
/* pack the HNP uri */
opal_dss.pack(&route, &uri, 1, OPAL_STRING);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: %s in diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(rproc),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &route,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&route);
return rc;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OBJ_DESTRUCT(&route);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
return rc;
}
static int parse_port_name(char *port_name, static int parse_port_name(char *port_name,
orte_process_name_t *rproc, orte_process_name_t *rproc,
orte_rml_tag_t *tag) orte_rml_tag_t *tag)
{ {
char *tmpstring=NULL, *ptr, *rml_uri=NULL; char *tmpstring=NULL, *ptr, *rml_uri=NULL;
orte_rml_cmd_flag_t cmd = ORTE_RML_UPDATE_CMD;
int rc; int rc;
opal_buffer_t route;
/* don't mangle the port name */ /* don't mangle the port name */
tmpstring = strdup(port_name); tmpstring = strdup(port_name);
@ -825,68 +897,6 @@ static int parse_port_name(char *port_name,
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto cleanup; goto cleanup;
} }
/* if this proc is part of my job family, then I need to
* update my RML contact hash table and my routes
*/
if (ORTE_JOB_FAMILY(rproc->jobid) == ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: same job family - updating route",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set the contact info into the hash table */
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_routed.update_route(rproc, rproc))) {
ORTE_ERROR_LOG(rc);
}
goto cleanup;
}
/* the proc must be part of another job family. In this case, we
* will route any messages to the proc through our HNP. We need
* to update the HNP, though, so it knows how to reach the
* HNP of the rproc's job family
*/
/* pack a cmd so the buffer can be unpacked correctly */
OBJ_CONSTRUCT(&route, opal_buffer_t);
opal_dss.pack(&route, &cmd, 1, ORTE_RML_CMD);
/* pack the HNP uri */
opal_dss.pack(&route, &tmpstring, 1, OPAL_STRING);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: %s in diff job family - sending update to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(rproc),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &route,
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&route);
goto cleanup;
}
/* wait right here until the HNP acks the update to ensure that
* any subsequent messaging can succeed
*/
ack_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_UPDATE_ROUTE_ACK,
ORTE_RML_NON_PERSISTENT, recv_ack, NULL);
ORTE_PROGRESSED_WAIT(ack_recvd, 0, 1);
OBJ_DESTRUCT(&route);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_output,
"%s dpm_parse_port: ack recvd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* our get_route function automatically routes all messages for
* other job families via the HNP, so nothing more to do here
*/
rc = ORTE_SUCCESS; rc = ORTE_SUCCESS;
cleanup: cleanup: