Cleanup and fix bugs in the MPI dynamics section. Modify the DPM API so it properly takes ports instead of process names (as correctly identified by Aurelien). Fix race conditions in the use of ompi-server. Fix incompatibilities between the MPI bindings and the DPM implementation that could cause segfaults due to uninitialized memory.

Fix the ompi-server -h cmd line option so it actually tells you something! Add two new test codes to the orte/test/mpi area: accept and connect.

This commit was SVN r18176.
This commit is contained in:
Parent: aa616b9530
Commit: 7b91f8baff
ompi/mca/dpm/dpm.h

@@ -38,6 +38,9 @@
 
 BEGIN_C_DECLS
 
+/* OMPI port definitions */
+/* carry over the INVALID def */
+#define OMPI_RML_TAG_INVALID    ORTE_RML_TAG_INVALID
+/* define a starting point to avoid conflicts */
+#define OMPI_RML_TAG_BASE       ORTE_RML_TAG_MAX
 
 #define OMPI_RML_TAG_UDAPL      OMPI_RML_TAG_BASE+1
@@ -65,8 +68,8 @@ typedef int (*ompi_dpm_base_module_init_fn_t)(void);
  * Connect/accept communications
  */
 typedef int (*ompi_dpm_base_module_connect_accept_fn_t)(ompi_communicator_t *comm, int root,
-                                                        orte_process_name_t *port, bool send_first,
-                                                        ompi_communicator_t **newcomm, orte_rml_tag_t tag);
+                                                        char *port, bool send_first,
+                                                        ompi_communicator_t **newcomm);
 
 /**
  * Executes internally a disconnect on all dynamic communicators
@@ -107,9 +110,11 @@ typedef int (*ompi_dpm_base_module_dyn_finalize_fn_t)(void);
 typedef void (*ompi_dpm_base_module_mark_dyncomm_fn_t)(ompi_communicator_t *comm);
 
 /*
- * Open a port to interface to a dynamically spawned job
+ * Open a port to interface to a dynamically spawned job - if the
+ * specified tag is valid, then it will be used to form the port. Otherwise,
+ * a dynamically assigned tag that is unique to this request will be provided
  */
-typedef int (*ompi_dpm_base_module_open_port_fn_t)(char *port_name);
+typedef int (*ompi_dpm_base_module_open_port_fn_t)(char *port_name, orte_rml_tag_t tag);
 
 /*
  * Parse a port name to get the contact info and tag
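Taken together, the new interface hides all name resolution behind the opaque port string: callers no longer handle an orte_process_name_t or a separate RML tag. A minimal sketch of the new calling convention (ompi_dpm and the constants come from this framework; comm and the root/send_first values are illustrative):

    char port[MPI_MAX_PORT_NAME];
    ompi_communicator_t *newcomm;

    /* OMPI_RML_TAG_INVALID asks the module to assign the next unique tag */
    ompi_dpm.open_port(port, OMPI_RML_TAG_INVALID);

    /* the port string - not a process name plus tag - drives connect/accept */
    ompi_dpm.connect_accept(comm, 0, port, false, &newcomm);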
ompi/mca/dpm/orte/dpm_orte.c

@@ -52,6 +52,40 @@
 static opal_mutex_t ompi_dpm_port_mutex;
 static orte_rml_tag_t next_tag;
 
+/* API functions */
+static int init(void);
+static int connect_accept ( ompi_communicator_t *comm, int root,
+                            char *port_string, bool send_first,
+                            ompi_communicator_t **newcomm );
+static void disconnect(ompi_communicator_t *comm);
+static int spawn(int count, char **array_of_commands,
+                 char ***array_of_argv,
+                 int *array_of_maxprocs,
+                 MPI_Info *array_of_info,
+                 char *port_name);
+static int dyn_init(void);
+static int open_port(char *port_name, orte_rml_tag_t given_tag);
+static char *parse_port (char *port_name, orte_rml_tag_t *tag);
+static int close_port(char *port_name);
+static int finalize(void);
+
+/*
+ * instantiate the module
+ */
+ompi_dpm_base_module_t ompi_dpm_orte_module = {
+    init,
+    connect_accept,
+    disconnect,
+    spawn,
+    dyn_init,
+    ompi_dpm_base_dyn_finalize,
+    ompi_dpm_base_mark_dyncomm,
+    open_port,
+    parse_port,
+    close_port,
+    finalize
+};
+
 
 /*
  * Init the module
@@ -70,8 +104,8 @@ static int get_rport (orte_process_name_t *port,
 
 
 static int connect_accept ( ompi_communicator_t *comm, int root,
-                            orte_process_name_t *port, bool send_first,
-                            ompi_communicator_t **newcomm, orte_rml_tag_t tag )
+                            char *port_string, bool send_first,
+                            ompi_communicator_t **newcomm )
 {
     int size, rsize, rank, rc;
     orte_std_cntr_t num_vals;
@@ -82,7 +116,8 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
     ompi_communicator_t *newcomp=MPI_COMM_NULL;
     ompi_proc_t **rprocs=NULL;
     ompi_group_t *group=comm->c_local_group;
-    orte_process_name_t *rport=NULL, tmp_port_name;
+    orte_process_name_t port, *rport=NULL, tmp_port_name;
+    orte_rml_tag_t tag=ORTE_RML_TAG_INVALID;
     opal_buffer_t *nbuf=NULL, *nrbuf=NULL;
     ompi_proc_t **proc_list=NULL, **new_proc_list;
     int i,j, new_proc_len;
@@ -92,12 +127,44 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
     OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_output,
                          "%s dpm:orte:connect_accept with port %s %s",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         ORTE_NAME_PRINT(port),
-                         send_first ? "sending first" : "recv first"));
+                         port_string, send_first ? "sending first" : "recv first"));
 
     /* set default error return */
     *newcomm = MPI_COMM_NULL;
 
     size = ompi_comm_size ( comm );
     rank = ompi_comm_rank ( comm );
 
+    /* extract the process name from the port string, if given, and
+     * set us up to communicate with it
+     */
+    if (NULL != port_string && 0 < strlen(port_string)) {
+        char *rml_uri;
+        /* separate the string into the RML URI and tag */
+        rml_uri = parse_port(port_string, &tag);
+        /* extract the process name from the rml_uri */
+        if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &port, NULL))) {
+            free(rml_uri);
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+        /* update the local hash table */
+        if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(rml_uri))) {
+            ORTE_ERROR_LOG(rc);
+            free(rml_uri);
+            return rc;
+        }
+        /* update the route as "direct" - the selected routed
+         * module will handle this appropriate to its methods
+         */
+        if (ORTE_SUCCESS != (rc = orte_routed.update_route(&port, &port))) {
+            ORTE_ERROR_LOG(rc);
+            free(rml_uri);
+            return rc;
+        }
+        free(rml_uri);
+    }
+
     /* tell the progress engine to tick the event library more
        often, to make sure that the OOB messages get sent */
     opal_progress_event_users_increment();
@@ -122,12 +189,12 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
 
     if ( OMPI_COMM_JOIN_TAG != tag ) {
         if(OMPI_GROUP_IS_DENSE(group)){
-            rc = get_rport(port,send_first,
+            rc = get_rport(&port,send_first,
                            group->grp_proc_pointers[rank], tag,
                            &tmp_port_name);
         }
         else {
-            rc = get_rport(port,send_first,
+            rc = get_rport(&port,send_first,
                            proc_list[rank], tag,
                            &tmp_port_name);
         }
@@ -136,7 +203,7 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
         }
         rport = &tmp_port_name;
     } else {
-        rport = port;
+        rport = &port;
     }
 
     /* Generate the message buffer containing the number of processes and the list of
@@ -424,9 +491,9 @@ static void disconnect(ompi_communicator_t *comm)
  * point.
  *
  */
-int get_rport(orte_process_name_t *port, int send_first,
-              ompi_proc_t *proc, orte_rml_tag_t tag,
-              orte_process_name_t *rport_name)
+static int get_rport(orte_process_name_t *port, int send_first,
+                     ompi_proc_t *proc, orte_rml_tag_t tag,
+                     orte_process_name_t *rport_name)
 {
     int rc;
     orte_std_cntr_t num_vals;
@@ -435,9 +502,9 @@ int get_rport(orte_process_name_t *port, int send_first,
         opal_buffer_t *sbuf;
 
         OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_output,
-                             "%s dpm:orte:get_rport sending to %s",
+                             "%s dpm:orte:get_rport sending to %s tag %d",
                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                             ORTE_NAME_PRINT(port)));
+                             ORTE_NAME_PRINT(port), (int)tag));
 
         sbuf = OBJ_NEW(opal_buffer_t);
         if (NULL == sbuf) {
@@ -461,8 +528,8 @@ int get_rport(orte_process_name_t *port, int send_first,
         opal_buffer_t *rbuf;
 
         OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_output,
-                             "%s dpm:orte:get_rport waiting to recv",
-                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+                             "%s dpm:orte:get_rport waiting to recv on tag %d",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)tag));
 
         rbuf = OBJ_NEW(opal_buffer_t);
         if (NULL == rbuf) {
@@ -720,11 +787,16 @@ static int spawn(int count, char **array_of_commands,
     return OMPI_SUCCESS;
 }
 
-static int open_port(char *port_name)
+/* optionally can provide a tag to be used - otherwise, we supply the
+ * next dynamically assigned tag
+ */
+static int open_port(char *port_name, orte_rml_tag_t given_tag)
 {
     char *rml_uri, *ptr, tag[12];
     int rc;
 
+    OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
+
     /*
      * The port_name is equal to the OOB-contact information
      * and an RML tag. The reason for adding the tag is
@@ -732,10 +804,17 @@ static int open_port(char *port_name)
      */
 
     if (NULL == (rml_uri = orte_rml.get_contact_info())) {
-        return OMPI_ERR_NOT_AVAILABLE;
+        rc = OMPI_ERR_NOT_AVAILABLE;
+        goto cleanup;
     }
 
-    sprintf(tag, "%d", (int)next_tag);
+    if (ORTE_RML_TAG_INVALID == given_tag) {
+        snprintf(tag, 12, "%d", (int)next_tag);
+        next_tag++;
+    } else {
+        /* use the given tag */
+        snprintf(tag, 12, "%d", (int)given_tag);
+    }
 
     /* if the overall port name is too long, we try to truncate the rml uri */
     rc = 0;
@@ -743,7 +822,8 @@ static int open_port(char *port_name)
         /* if we have already tried several times, punt! */
         if (4 < rc) {
             free(rml_uri);
-            return OMPI_ERROR;
+            rc = OMPI_ERROR;
+            goto cleanup;
         }
         /* find the trailing uri and truncate there */
         ptr = strrchr(rml_uri, ';');
@@ -751,19 +831,19 @@ static int open_port(char *port_name)
         ++rc;
     }
 
-    OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
-    sprintf (port_name, "%s:%s", rml_uri, tag);
-    next_tag++;
-    OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
+    snprintf (port_name, MPI_MAX_PORT_NAME, "%s:%s", rml_uri, tag);
 
     free ( rml_uri );
 
-    return OMPI_SUCCESS;
+    rc = OMPI_SUCCESS;
+
+cleanup:
+    OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
+    return rc;
 }
 
 /* takes a port_name and separates it into the RML URI
  * and the tag
  */
 static char *parse_port (char *port_name, orte_rml_tag_t *tag)
 {
     char *tmp_string, *ptr;
@@ -781,15 +861,15 @@ static char *parse_port (char *port_name, orte_rml_tag_t *tag)
     sscanf(ptr,"%d", (int*)tag);
 
     /* see if the length of the RML uri is too long - if so,
      * truncate it
      */
     if (strlen(port_name) > MPI_MAX_PORT_NAME) {
         port_name[MPI_MAX_PORT_NAME] = '\0';
     }
 
     /* copy the RML uri so we can return a malloc'd value
      * that can later be free'd
      */
     tmp_string = strdup(port_name);
 
     return tmp_string;
@@ -802,13 +882,10 @@ static int close_port(char *port_name)
 
 static int dyn_init(void)
 {
-    char *oob_port=NULL;
     char *port_name=NULL;
     int root=0, rc;
     bool send_first = true;
-    orte_rml_tag_t tag;
     ompi_communicator_t *newcomm=NULL;
-    orte_process_name_t port_proc_name;
     ompi_group_t *group = NULL;
     ompi_errhandler_t *errhandler = NULL;
 
@@ -826,42 +903,7 @@ static int dyn_init(void)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                          port_name));
 
-    /* split the content of the environment variable into
-       its pieces, which are RML-uri:tag */
-    oob_port = parse_port (port_name, &tag);
-
-    /* set the contact info into the local hash table */
-    if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(oob_port))) {
-        ORTE_ERROR_LOG(rc);
-        free(oob_port);
-        return(rc);
-    }
-
-    /* process the RML uri to get the port's process name */
-    if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(oob_port, &port_proc_name, NULL))) {
-        ORTE_ERROR_LOG(rc);
-        free(oob_port);
-        return rc;
-    }
-    free(oob_port);  /* done with this */
-
-    /* update the route to this process - in this case, we always give it
-     * as direct since we were given the contact info. We trust the
-     * selected routed component to do the Right Thing for its own mode
-     * of operation
-     */
-    if (ORTE_SUCCESS != (rc = orte_routed.update_route(&port_proc_name, &port_proc_name))) {
-        ORTE_ERROR_LOG(rc);
-        return rc;
-    }
-
-    OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_output,
-                         "%s dpm:orte:dyn_init calling connect_accept to %s",
-                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                         ORTE_NAME_PRINT(&port_proc_name)));
-
-    rc = connect_accept (MPI_COMM_WORLD, root, &port_proc_name,
-                         send_first, &newcomm, tag );
+    rc = connect_accept (MPI_COMM_WORLD, root, port_name, send_first, &newcomm);
     if (OMPI_SUCCESS != rc) {
         return rc;
     }
@@ -897,21 +939,4 @@ static int finalize(void)
     return OMPI_SUCCESS;
 }
 
-/*
- * instantiate the module
- */
-ompi_dpm_base_module_t ompi_dpm_orte_module = {
-    init,
-    connect_accept,
-    disconnect,
-    spawn,
-    dyn_init,
-    ompi_dpm_base_dyn_finalize,
-    ompi_dpm_base_mark_dyncomm,
-    open_port,
-    parse_port,
-    close_port,
-    finalize
-};
-
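As open_port and parse_port show, a port name is simply the local RML contact URI and the tag joined by a colon, truncated if needed to fit MPI_MAX_PORT_NAME. A sketch of the round trip inside the module (values hypothetical):

    char port_name[MPI_MAX_PORT_NAME];
    orte_rml_tag_t tag;
    char *rml_uri;

    open_port(port_name, ORTE_RML_TAG_INVALID);
    /* port_name now holds "<rml-contact-uri>:<tag>" */

    rml_uri = parse_port(port_name, &tag);  /* malloc'd copy of the URI; tag filled in */
    free(rml_uri);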
ompi/mca/pubsub/orte/pubsub_orte.c

@@ -64,9 +64,10 @@ static bool server_setup=false;
 
 static void setup_server(void)
 {
-    opal_buffer_t buf;
-    orte_rml_cmd_flag_t command=ORTE_RML_UPDATE_CMD;
-    int rc;
+    OPAL_OUTPUT_VERBOSE((1, ompi_pubsub_base_output,
+                         "%s pubsub:orte: setting up server at URI %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         (NULL == mca_pubsub_orte_component.server_uri) ? "NULL" : mca_pubsub_orte_component.server_uri));
 
     if (NULL == mca_pubsub_orte_component.server_uri) {
         /* if the contact info for the server is NULL, then there
@@ -76,23 +77,6 @@ static void setup_server(void)
         return;
     }
 
-    /* setup the route to the server using the
-     * selected routed component. This allows us
-     * to tell the local daemon how to reach the
-     * server, so we can still only have one connection
-     * open! To do this, we need to insert the server's
-     * uri into a buffer
-     */
-    OBJ_CONSTRUCT(&buf, opal_buffer_t);
-    opal_dss.pack(&buf, &command, 1, ORTE_RML_CMD);
-    opal_dss.pack(&buf, &mca_pubsub_orte_component.server_uri, 1, OPAL_STRING);
-    if (ORTE_SUCCESS != (rc = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, &buf))) {
-        ORTE_ERROR_LOG(rc);
-        server_setup = true;
-        return;
-    }
-    OBJ_DESTRUCT(&buf);
-
     /* extract the server's name */
     orte_rml_base_parse_uris(mca_pubsub_orte_component.server_uri, &mca_pubsub_orte_component.server, NULL);
 
@@ -101,6 +85,11 @@ static void setup_server(void)
 
     /* flag setup as completed */
     server_setup = true;
+
+    OPAL_OUTPUT_VERBOSE((1, ompi_pubsub_base_output,
+                         "%s pubsub:orte: server %s setup",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_NAME_PRINT(&mca_pubsub_orte_component.server)));
 }
 
 /*
@@ -127,6 +116,11 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
 
     ompi_info_get_bool(info, "ompi_global_scope", &global_scope, &flag);
 
+    OPAL_OUTPUT_VERBOSE((1, ompi_pubsub_base_output,
+                         "%s pubsub:orte: publishing service %s scope %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         service_name, global_scope ? "Global" : "Local"));
+
     if (!global_scope) {
         /* if the scope is not global, then store the value on the HNP */
         info_host = ORTE_PROC_MY_HNP;
@@ -266,6 +260,11 @@ static char* lookup ( char *service_name, ompi_info_t *info )
         }
     }
 
+    OPAL_OUTPUT_VERBOSE((1, ompi_pubsub_base_output,
+                         "%s pubsub:orte: lookup service %s scope %d",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         service_name, lookup[0]));
+
     /* check for error situations */
 
     if (NONE == lookup[0]) {
@@ -407,6 +406,11 @@ static int unpublish ( char *service_name, ompi_info_t *info )
 
     ompi_info_get_bool(info, "ompi_global_scope", &global_scope, &flag);
 
+    OPAL_OUTPUT_VERBOSE((1, ompi_pubsub_base_output,
+                         "%s pubsub:orte: unpublish service %s scope %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         service_name, global_scope ? "Global" : "Local"));
+
     if (!global_scope) {
         /* if the scope is not global, then unpublish the value from the HNP */
         info_host = ORTE_PROC_MY_HNP;
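The scope test above keys off the ompi_global_scope info value: without it, names go to the job's own HNP and are invisible to other mpirun instances. A sketch at the MPI level (the service name and variables are illustrative):

    char port[MPI_MAX_PORT_NAME];
    MPI_Info info;

    MPI_Open_port(MPI_INFO_NULL, port);
    MPI_Info_create(&info);
    /* request storage on the ompi-server so other jobs can look it up */
    MPI_Info_set(info, "ompi_global_scope", "true");
    MPI_Publish_name("my-service", info, port);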
ompi/mpi/c/comm_accept.c

@@ -41,8 +41,6 @@ int MPI_Comm_accept(char *port_name, MPI_Info info, int root,
     int rank, rc;
     bool send_first=false; /* we receive first */
     ompi_communicator_t *newcomp=MPI_COMM_NULL;
-    char *tmp_port=NULL;
-    orte_rml_tag_t tag = 0; /* tag is set & used in get_rport only at root; silence coverity */
 
     MEMCHECKER(
         memchecker_comm(comm);
@@ -89,15 +87,7 @@ int MPI_Comm_accept(char *port_name, MPI_Info info, int root,
      */
     OPAL_CR_ENTER_LIBRARY();
 
-    /*
-     * Our own port_name is not of interest here, so we pass in NULL.
-     * The two leaders will figure this out later. However, we need the tag.
-     */
-    if ( rank == root ) {
-        tmp_port = ompi_dpm.parse_port(port_name, &tag);
-        free (tmp_port);
-    }
-    rc = ompi_dpm.connect_accept (comm, root, NULL, send_first, &newcomp, tag);
+    rc = ompi_dpm.connect_accept (comm, root, port_name, send_first, &newcomp);
 
     *newcomm = newcomp;
     OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME );
ompi/mpi/c/comm_connect.c

@@ -24,13 +24,6 @@
 #include "ompi/mca/dpm/dpm.h"
 #include "ompi/memchecker.h"
 
-#include "orte/util/name_fns.h"
-#include "opal/dss/dss.h"
-#include "orte/runtime/orte_globals.h"
-
-#include "orte/mca/rml/rml.h"
-#include "orte/mca/routed/routed.h"
-
 #if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
 #pragma weak MPI_Comm_connect = PMPI_Comm_connect
 #endif
@@ -48,9 +41,6 @@ int MPI_Comm_connect(char *port_name, MPI_Info info, int root,
     int rank, rc;
     bool send_first=true; /* yes, we are the active part in this game */
     ompi_communicator_t *newcomp=MPI_COMM_NULL;
-    orte_process_name_t port_proc_name;
-    char *tmp_port=NULL;
-    orte_rml_tag_t tag = 0; /* tag is set & used in get_rport only at root; silence coverity */
 
     MEMCHECKER(
         memchecker_comm(comm);
@@ -99,43 +89,7 @@ int MPI_Comm_connect(char *port_name, MPI_Info info, int root,
 
     OPAL_CR_ENTER_LIBRARY();
 
-    /*
-     * translate the port_name string into the according process_name_t
-     * structure.
-     */
-    if ( rank == root ) {
-        tmp_port = ompi_dpm.parse_port (port_name, &tag);
-        if (ORTE_SUCCESS != (rc = orte_util_convert_string_to_process_name(&port_proc_name, tmp_port))) {
-            return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_PORT, FUNC_NAME);
-        }
-        if ( OPAL_EQUAL == opal_dss.compare(&port_proc_name, ORTE_NAME_INVALID, ORTE_NAME) ) {
-            *newcomm = MPI_COMM_NULL;
-            return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_PORT, FUNC_NAME);
-        }
-
-        /* Make sure we can route the connect attempt to the remote side if we
-         * don't share the same HNP
-         */
-
-        /* set the contact info into the local hash table */
-        if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(tmp_port))) {
-            free(tmp_port);
-            return OMPI_ERRHANDLER_INVOKE(comm, rc, FUNC_NAME);
-        }
-        /* update the route to this process - in this case, we always give it
-         * as direct since we were given the contact info. We trust the
-         * selected routed component to do the Right Thing for its own mode
-         * of operation
-         */
-        if (ORTE_SUCCESS != (rc = orte_routed.update_route(&port_proc_name, &port_proc_name))) {
-            return OMPI_ERRHANDLER_INVOKE(comm, rc, FUNC_NAME);
-        }
-
-        free (tmp_port);
-    }
-
-    rc = ompi_dpm.connect_accept (comm, root, &port_proc_name, send_first,
-                                  &newcomp, tag);
+    rc = ompi_dpm.connect_accept (comm, root, port_name, send_first, &newcomp);
 
     *newcomm = newcomp;
     OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
ompi/mpi/c/comm_join.c

@@ -54,15 +54,12 @@ static int ompi_socket_recv (int fd, char *buf, int len );
 int MPI_Comm_join(int fd, MPI_Comm *intercomm)
 {
     int rc;
-    orte_rml_tag_t tag=OMPI_COMM_JOIN_TAG;
-    size_t size;
     uint32_t len, rlen, llen, lrlen;
     int send_first=1;
-    char *rname, *name;
+    char *rport, *rname;
 
-    ompi_proc_t **myproc=NULL;
     ompi_communicator_t *newcomp;
-    orte_process_name_t port_proc_name;
+    char port_name[MPI_MAX_PORT_NAME];
 
     if ( MPI_PARAM_CHECK ) {
         OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
@@ -75,15 +72,16 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm)
 
     OPAL_CR_ENTER_LIBRARY();
 
-    /* sendrecv OOB-name (port-name) through the socket connection.
-       Need to determine somehow how to avoid a potential deadlock
-       here. */
-    myproc = ompi_proc_self (&size);
-    if (ORTE_SUCCESS != (rc = orte_util_convert_process_name_to_string (&name, &(myproc[0]->proc_name)))) {
+    /* open a port using the specified tag */
+    if (OMPI_SUCCESS != (rc = ompi_dpm.open_port(port_name, OMPI_COMM_JOIN_TAG))) {
         OPAL_CR_EXIT_LIBRARY();
         return rc;
     }
-    llen = (uint32_t)(strlen(name)+1);
+
+    /* sendrecv port-name through the socket connection.
+       Need to determine somehow how to avoid a potential deadlock
+       here. */
+    llen = (uint32_t)(strlen(port_name)+1);
     len = htonl(llen);
 
     ompi_socket_send( fd, (char *) &len, sizeof(uint32_t));
@@ -100,20 +98,14 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm)
     /* Assumption: socket_send should not block, even if the socket
        is not configured to be non-blocking, because the message length are
        so short. */
-    ompi_socket_send (fd, name, llen);
-    ompi_socket_recv (fd, rname, lrlen);
+    ompi_socket_send (fd, port_name, llen);
+    ompi_socket_recv (fd, rport, lrlen);
 
-    if (ORTE_SUCCESS != (rc = orte_util_convert_string_to_process_name(&port_proc_name, rname))) {
-        OPAL_CR_EXIT_LIBRARY();
-        return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_PORT, FUNC_NAME);
-    }
-    rc = ompi_dpm.connect_accept (MPI_COMM_SELF, 0, &port_proc_name,
-                                  send_first, &newcomp, tag);
+    /* use the port we received to connect/accept */
+    rc = ompi_dpm.connect_accept (MPI_COMM_SELF, 0, rport, send_first, &newcomp);
 
-    free ( name );
-    free ( rname);
-    free ( myproc );
+    free ( rport );
 
     *intercomm = newcomp;
     OMPI_ERRHANDLER_RETURN (rc, MPI_COMM_SELF, rc, FUNC_NAME);
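MPI_Comm_join is now symmetric: each side opens a port on the reserved join tag, swaps port strings over the caller-supplied socket, and hands the received string to connect_accept. A toy caller (establishing the connected socket is the application's job; connect_to_peer is hypothetical):

    int fd = connect_to_peer();      /* any connected socket to the other process */
    MPI_Comm intercomm;

    MPI_Comm_join(fd, &intercomm);   /* both peers call this on their end */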
ompi/mpi/c/comm_spawn.c

@@ -44,8 +44,6 @@ int MPI_Comm_spawn(char *command, char **argv, int maxprocs, MPI_Info info,
     bool send_first = false; /* we wait to be contacted */
     ompi_communicator_t *newcomp=NULL;
     char port_name[MPI_MAX_PORT_NAME];
-    char *tmp_port;
-    orte_rml_tag_t tag = 0; /* tag is set & used in get_rport only at root; silence coverity */
     bool non_mpi = false;
 
     MEMCHECKER(
@@ -91,36 +89,32 @@ int MPI_Comm_spawn(char *command, char **argv, int maxprocs, MPI_Info info,
         }
     }
 
+    /* initialize the port name to avoid problems */
+    memset(port_name, 0, MPI_MAX_PORT_NAME);
+
     /* See if the info key "ompi_non_mpi" was set to true */
     ompi_info_get_bool(info, "ompi_non_mpi", &non_mpi, &flag);
 
     OPAL_CR_ENTER_LIBRARY();
 
     if ( rank == root ) {
-        if (non_mpi) {
-            /* no port is required since we won't be
-             * communicating with the children
-             */
-            port_name[0] = '\0';
-        } else {
+        if (!non_mpi) {
             /* Open a port. The port_name is passed as an environment
                variable to the children. */
-            ompi_dpm.open_port (port_name);
+            if (OMPI_SUCCESS != (rc = ompi_dpm.open_port (port_name, OMPI_RML_TAG_INVALID))) {
+                goto error;
+            }
         }
         if (OMPI_SUCCESS != (rc = ompi_dpm.spawn (1, &command, &argv, &maxprocs,
                                                   &info, port_name))) {
             goto error;
         }
-        if (!non_mpi) {
-            tmp_port = ompi_dpm.parse_port (port_name, &tag);
-            free(tmp_port);
-        }
     }
 
     if (non_mpi) {
         newcomp = MPI_COMM_NULL;
     } else {
-        rc = ompi_dpm.connect_accept (comm, root, NULL, send_first, &newcomp, tag);
+        rc = ompi_dpm.connect_accept (comm, root, port_name, send_first, &newcomp);
     }
 
 error:
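The reworked root branch also pins down the ompi_non_mpi path: no port is opened and the result is MPI_COMM_NULL. A sketch (./worker is a hypothetical non-MPI executable):

    MPI_Comm child;
    MPI_Info info;

    MPI_Info_create(&info);
    MPI_Info_set(info, "ompi_non_mpi", "true");   /* children never call MPI_Init */
    MPI_Comm_spawn("./worker", MPI_ARGV_NULL, 2, info, 0,
                   MPI_COMM_SELF, &child, MPI_ERRCODES_IGNORE);
    /* child == MPI_COMM_NULL: there is no one to connect_accept with */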
ompi/mpi/c/comm_spawn_multiple.c

@@ -45,8 +45,6 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
     ompi_communicator_t *newcomp=NULL;
     bool send_first=false; /* they are contacting us first */
     char port_name[MPI_MAX_PORT_NAME];
-    char *tmp_port;
-    orte_rml_tag_t tag = 0;
     bool non_mpi, cumulative = false;
 
     MEMCHECKER(
@@ -136,32 +134,30 @@ int MPI_Comm_spawn_multiple(int count, char **array_of_commands, char ***array_o
         }
     }
 
+    /* initialize the port name to avoid problems */
+    memset(port_name, 0, MPI_MAX_PORT_NAME);
+
     OPAL_CR_ENTER_LIBRARY();
 
     if ( rank == root ) {
-        if (non_mpi) {
-            /* RHC: should this be better? */
-            port_name[0] = '\0';
-        } else {
+        if (!non_mpi) {
             /* Open a port. The port_name is passed as an environment
                variable to the children. */
-            ompi_dpm.open_port (port_name);
+            if (OMPI_SUCCESS != (rc = ompi_dpm.open_port (port_name, OMPI_RML_TAG_INVALID))) {
+                goto error;
+            }
         }
         if (OMPI_SUCCESS != (rc = ompi_dpm.spawn(count, array_of_commands,
                                                  array_of_argv, array_of_maxprocs,
                                                  array_of_info, port_name))) {
             goto error;
         }
-        if (!non_mpi) {
-            tmp_port = ompi_dpm.parse_port (port_name, &tag);
-            free(tmp_port);
-        }
     }
 
     if (non_mpi) {
         newcomp = MPI_COMM_NULL;
     } else {
-        rc = ompi_dpm.connect_accept (comm, root, NULL, send_first, &newcomp, tag);
+        rc = ompi_dpm.connect_accept (comm, root, port_name, send_first, &newcomp);
     }
 
 error:
ompi/mpi/c/open_port.c

@@ -62,7 +62,7 @@ int MPI_Open_port(MPI_Info info, char *port_name)
          */
     }
 
-    rc = ompi_dpm.open_port(port_name);
+    rc = ompi_dpm.open_port(port_name, OMPI_RML_TAG_INVALID);
 
     OPAL_CR_ENTER_LIBRARY();
 
orte/tools/ompi-server/help-ompi-server.txt

@@ -19,5 +19,9 @@
 # This is the US/English general help file for Open MPI's data server.
 #
 [ompiserver:usage]
-Usage: %s [OPTION]...
-Start an Open MPI data server.
+Start an Open MPI data server
+
+Usage: %s [OPTIONS]
+
+%s
+
orte/tools/ompi-server/ompi-server.c

@@ -89,9 +89,9 @@ opal_cmd_line_init_t ompi_server_cmd_line_opts[] = {
       &no_daemonize, OPAL_CMD_LINE_TYPE_BOOL,
       "Don't daemonize into the background" },
 
-    { NULL, NULL, NULL, '\0', NULL, "report-uri", 1,
+    { NULL, NULL, NULL, 'r', NULL, "report-uri", 1,
       &report_uri, OPAL_CMD_LINE_TYPE_STRING,
-      "Report the server's uri on stdout"},
+      "Report the server's uri on stdout [-], stderr [+], or a file [anything else]"},
 
     /* End of list */
     { NULL, NULL, NULL, '\0', NULL, NULL, 0,
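With the short option wired up, the reporting modes named in the help string can be exercised directly (invocations illustrative):

    ompi-server -r -          # report the server's URI on stdout
    ompi-server -r +          # report it on stderr
    ompi-server -r uri.txt    # write it to the named file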
orte/mca/rml/base/rml_base_receive.c

@@ -33,6 +33,7 @@
 #include "orte/mca/errmgr/errmgr.h"
 #include "orte/runtime/orte_globals.h"
 #include "orte/runtime/orte_wait.h"
+#include "orte/util/name_fns.h"
 
 #include "orte/mca/rml/rml.h"
 #include "orte/mca/rml/base/base.h"
@@ -87,6 +88,11 @@ static void process_message(int fd, short event, void *data)
     orte_std_cntr_t count;
     int rc;
 
+    OPAL_OUTPUT_VERBOSE((5, orte_rml_base_output,
+                         "%s rml:base:recv: processing message from %s",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                         ORTE_NAME_PRINT(&mev->sender)));
+
     count = 1;
     if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_RML_CMD))) {
         ORTE_ERROR_LOG(rc);
orte/mca/rml/oob/rml_oob_send.c

@@ -91,6 +91,12 @@ orte_rml_oob_send(orte_process_name_t* peer,
     int i;
     int bytes = 0;
 
+    if (ORTE_RML_TAG_INVALID == tag) {
+        /* cannot send to an invalid tag */
+        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
+        return ORTE_ERR_BAD_PARAM;
+    }
+
     msg->msg_type = ORTE_RML_BLOCKING_SEND;
     flags |= ORTE_RML_FLAG_RECURSIVE_CALLBACK;
 
@@ -172,6 +178,12 @@ orte_rml_oob_send_nb(orte_process_name_t* peer,
     int i;
     int bytes = 0;
 
+    if (ORTE_RML_TAG_INVALID == tag) {
+        /* cannot send to an invalid tag */
+        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
+        return ORTE_ERR_BAD_PARAM;
+    }
+
     msg->msg_type = ORTE_RML_NONBLOCKING_IOV_SEND;
     msg->msg_cbfunc.iov = cbfunc;
     msg->msg_cbdata = cbdata;
@@ -240,6 +252,12 @@ orte_rml_oob_send_buffer(orte_process_name_t* peer,
     orte_std_cntr_t datalen;
     struct iovec iov[1];
 
+    if (ORTE_RML_TAG_INVALID == tag) {
+        /* cannot send to an invalid tag */
+        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
+        return ORTE_ERR_BAD_PARAM;
+    }
+
     /* first build iovec from buffer information */
     ret = opal_dss.unload(buffer, &dataptr, &datalen);
     if (ret != ORTE_SUCCESS) return ret;
@@ -268,6 +286,12 @@ orte_rml_oob_send_buffer_nb(orte_process_name_t* peer,
     orte_process_name_t next;
     int bytes = 0;
 
+    if (ORTE_RML_TAG_INVALID == tag) {
+        /* cannot send to an invalid tag */
+        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
+        return ORTE_ERR_BAD_PARAM;
+    }
+
     /* first build iovec from buffer information */
     ret = opal_dss.unload(buffer, &dataptr, &datalen);
     if (ORTE_SUCCESS != ret) {
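All four RML send entry points now reject the reserved tag up front, so a tag that was never set (for example, from an unparsed port) fails fast instead of producing an unmatched message. A sketch against the module interface (peer and buffer setup assumed; the send_buffer signature follows the calls above):

    orte_process_name_t peer;              /* assumed: a valid peer name */
    opal_buffer_t *buf = OBJ_NEW(opal_buffer_t);
    int rc;

    rc = orte_rml.send_buffer(&peer, buf, ORTE_RML_TAG_INVALID, 0);
    /* rc == ORTE_ERR_BAD_PARAM and the error is logged */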
orte/mca/rml/rml_types.h

@@ -50,6 +50,7 @@ BEGIN_C_DECLS
 
 #define ORTE_RML_TAG_T    OPAL_UINT32
 
+#define ORTE_RML_TAG_INVALID     0
 #define ORTE_RML_TAG_DAEMON      1
 #define ORTE_RML_TAG_IOF_SVC     2
 #define ORTE_RML_TAG_IOF_CLNT    3
orte/test/mpi/Makefile

@@ -1,4 +1,4 @@
-PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv
+PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect
 
 all: $(PROGS)
 
orte/test/mpi/accept.c (new file)

@@ -0,0 +1,45 @@
+/* -*- C -*-
+ *
+ * $HEADER$
+ *
+ * Test of connect/accept - the accept (server) side
+ */
+
+#include <stdio.h>
+#include "mpi.h"
+
+int main(int argc, char* argv[])
+{
+    int rank, size;
+    MPI_Info info;
+    char port[MPI_MAX_PORT_NAME];
+    MPI_Comm client;
+
+    MPI_Init(&argc, &argv);
+
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+    printf("Hello, World, I am %d of %d\n", rank, size);
+
+    MPI_Info_create(&info);
+    MPI_Info_set(info, "ompi_global_scope", "true");
+
+    if (0 == rank) {
+        MPI_Open_port(MPI_INFO_NULL, port);
+        MPI_Publish_name("test-pub", info, port);
+    }
+
+    /* accept is collective over MPI_COMM_WORLD; the port
+     * string is only significant at the root */
+    MPI_Comm_accept(port, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &client);
+
+    MPI_Barrier(client);
+
+    if (0 == rank) {
+        MPI_Unpublish_name("test-pub", info, port);
+        MPI_Close_port(port);
+    }
+
+    MPI_Finalize();
+    return 0;
+}
orte/test/mpi/connect.c (new file)

@@ -0,0 +1,35 @@
+/* -*- C -*-
+ *
+ * $HEADER$
+ *
+ * Test of connect/accept - the connect (client) side
+ */
+
+#include <stdio.h>
+#include "mpi.h"
+
+int main(int argc, char* argv[])
+{
+    int rank, size;
+    MPI_Comm server;
+    MPI_Info info;
+    char port[MPI_MAX_PORT_NAME];
+
+    MPI_Init(&argc, &argv);
+
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+
+    printf("Hello, World, I am %d of %d\n", rank, size);
+
+    MPI_Info_create(&info);
+    MPI_Info_set(info, "ompi_global_scope", "true");
+
+    MPI_Lookup_name("test-pub", info, port);
+    MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &server);
+
+    MPI_Barrier(server);
+
+    MPI_Finalize();
+    return 0;
+}
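A plausible way to run the new pair across two jobs, assuming orterun takes the server URI through its --ompi-server option (the routing side of which is set up in the orterun.c hunk below):

    ompi-server -r uri.txt
    mpirun -np 2 --ompi-server file:uri.txt ./accept
    mpirun -np 2 --ompi-server file:uri.txt ./connect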
orte/tools/orterun/orterun.c

@@ -77,6 +77,7 @@
 #include "orte/mca/plm/plm.h"
 #include "orte/mca/rmaps/rmaps_types.h"
 #include "orte/mca/rml/rml.h"
+#include "orte/mca/rml/base/rml_contact.h"
 #include "orte/mca/errmgr/errmgr.h"
 
 #include "orte/runtime/runtime.h"
@@ -491,6 +492,16 @@ int orterun(int argc, char *argv[])
         return rc;
     }
 
+    /* if an uri for the ompi-server was provided, set the route */
+    if (NULL != ompi_server) {
+        opal_buffer_t buf;
+        /* setup our route to the server */
+        OBJ_CONSTRUCT(&buf, opal_buffer_t);
+        opal_dss.pack(&buf, &ompi_server, 1, OPAL_STRING);
+        orte_rml_base_update_contact_info(&buf);
+        OBJ_DESTRUCT(&buf);
+    }
+
     /** setup callbacks for abort signals */
     opal_signal_set(&term_handler, SIGTERM,
                     abort_signal_callback, &term_handler);