
Extend the dpm framework API to support persistent accept/connect operations:

* paccept - establish a persistent listening port for async connect requests

* pconnect - async connect to a remote process that has posted a paccept port. Provides a timeout mechanism and allows the underlying implementation to retry until the timeout expires

* pclose - shut down a prior paccept posting

Includes example programs paccept.c and pconnect.c in orte/test/mpi. New MPI extension interfaces coming...

This commit was SVN r29063.
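
For orientation, a minimal sketch of how the new internal calls fit together, mirroring the flow of the included test programs (the example_* and done_cb names are illustrative only, not part of this commit):

#include "ompi_config.h"
#include "ompi/mca/dpm/dpm.h"

static volatile bool waiting = true;

/* completion callback: delivers the new communicator and the remote proc,
 * or a NULL communicator if a pconnect attempt timed out/failed */
static void done_cb(ompi_communicator_t *newcomm,
                    ompi_proc_t *remote_proc, void *cbdata)
{
    *(volatile bool*)cbdata = false;
}

static void example_acceptor(char *port)
{
    /* port was obtained from ompi_dpm.open_port and published for peers */
    ompi_dpm.paccept(port, done_cb, (void*)&waiting);
    /* ... connections arrive asynchronously via done_cb ... */
    ompi_dpm.pclose(port);   /* stop accepting on this port */
}

static void example_connector(char *port)
{
    /* NULL timeout: keep retrying until the connection succeeds */
    ompi_dpm.pconnect(port, NULL, done_cb, (void*)&waiting);
}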
This commit is contained in:
Ralph Castain 2013-08-23 18:02:50 +00:00
parent 96457df9bc
commit 6d24b34940
12 changed files with 929 additions and 33 deletions


@ -9,6 +9,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -21,6 +22,13 @@
#include "ompi_config.h"
#include "ompi/constants.h"
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "ompi/mca/dpm/dpm.h"
/*
@ -72,6 +80,14 @@ int ompi_dpm_base_null_parse_port(char *port_name,
char **hnp_uri, char **rml_uri, ompi_rml_tag_t *tag);
int ompi_dpm_base_null_route_to_port(char *rml_uri, ompi_process_name_t *rproc);
int ompi_dpm_base_null_close_port(char *port_name);
int ompi_dpm_base_null_pconnect(char *port,
struct timeval *timeout,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
int ompi_dpm_base_null_paccept(char *port,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
void ompi_dpm_base_null_pclose(char *port);
/* useful globals */
OMPI_DECLSPEC extern ompi_dpm_base_component_t ompi_dpm_base_selected_component;


@ -7,6 +7,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -40,7 +41,10 @@ OMPI_DECLSPEC ompi_dpm_base_module_t ompi_dpm = {
ompi_dpm_base_null_parse_port,
ompi_dpm_base_null_route_to_port,
ompi_dpm_base_null_close_port,
NULL
NULL,
ompi_dpm_base_null_pconnect,
ompi_dpm_base_null_paccept,
ompi_dpm_base_null_pclose
};
ompi_dpm_base_component_t ompi_dpm_base_selected_component;


@ -14,6 +14,7 @@
* Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
@ -25,6 +26,12 @@
#include "ompi_config.h"
#include <string.h>
#include <stdio.h>
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "ompi/mca/dpm/dpm.h"
#include "ompi/mca/dpm/base/base.h"
@ -86,3 +93,23 @@ int ompi_dpm_base_null_close_port(char *port_name)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_pconnect(char *port,
struct timeval *timeout,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_paccept(char *port,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata)
{
return OMPI_ERR_NOT_SUPPORTED;
}
void ompi_dpm_base_null_pclose(char *port)
{
return;
}


@ -9,7 +9,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
*
@ -27,6 +28,13 @@
#include "ompi_config.h"
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
@ -47,6 +55,51 @@ typedef int (*ompi_dpm_base_module_connect_accept_fn_t)(ompi_communicator_t *com
char *port, bool send_first,
ompi_communicator_t **newcomm);
/* define a callback function for use by non-blocking persistent connect/accept operations */
typedef void (*ompi_dpm_base_paccept_connect_callback_fn_t)(ompi_communicator_t *newcomm,
ompi_proc_t *remote_proc,
void *cbdata);
/*
* Create a persistent connection point for accepting non-blocking connection requests.
* The accept is persistent and will remain open until explicitly closed, or during
* dpm_framework_close. Any incoming connection request will be used to create a new
* communicator which will be returned via callback, along with the process name.
*
* In both cases, the callback function will return the new communicator plus the
* user's original cbdata.
*
* paccept requires a port (typically obtained by a prior call to MPI_Open_port).
* This must be published so it can be found by processes wanting to
* connect to this process, and is passed by those processes as the "port" argument for
* pconnect.
*
* Calls to pconnect are also non-blocking, with callback upon completion. Periodic
* attempts to complete the connection may be made at the discretion of the implementation.
* Failure to connect will be indicated by a callback returning a NULL communicator. Callers
* should use the cbdata to track the corresponding pconnect request. A timeout
* is provided to avoid hanging should the other process not have an active paccept
* on the specified port (e.g., the process may have closed it). A NULL value for
* the timeout argument indicates that the pconnect operation should not timeout,
* and will regularly retry the connection forever.
*
* Processes may create and publish as many ports, and call paccept as many times, as
* they like. When a process no longer wishes to accept connect requests, it can "close"
* a paccept request by passing in the port used when calling paccept. A call to "close"
* with a NULL argument will close *all* currently registered paccept channels.
*/
typedef int (*ompi_dpm_base_module_paccept_fn_t)(char *port,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
typedef int (*ompi_dpm_base_module_pconnect_fn_t)(char *port,
struct timeval *timeout,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
typedef void (*ompi_dpm_base_module_pclose_fn_t)(char *port);
/**
* Executes internally a disconnect on all dynamic communicators
* in case the user did not disconnect them.
@ -144,6 +197,10 @@ struct ompi_dpm_base_module_1_0_0_t {
ompi_dpm_base_module_close_port_fn_t close_port;
/* finalize */
ompi_dpm_base_module_finalize_fn_t finalize;
/* pconnect/accept */
ompi_dpm_base_module_pconnect_fn_t pconnect;
ompi_dpm_base_module_paccept_fn_t paccept;
ompi_dpm_base_module_pclose_fn_t pclose;
};
typedef struct ompi_dpm_base_module_1_0_0_t ompi_dpm_base_module_1_0_0_t;
typedef struct ompi_dpm_base_module_1_0_0_t ompi_dpm_base_module_t;
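
To make the timeout semantics described in the comment block above concrete, a connector-side sketch (assumptions: the port string was obtained out-of-band from a process that already posted a paccept; try_connect and connect_done are illustrative names):

#include "ompi_config.h"
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "ompi/mca/dpm/dpm.h"

static volatile bool pending = true;
static ompi_communicator_t *result;

/* a NULL communicator here means the request timed out or otherwise failed */
static void connect_done(ompi_communicator_t *newcomm,
                         ompi_proc_t *remote_proc, void *cbdata)
{
    result = newcomm;
    *(volatile bool*)cbdata = false;
}

static int try_connect(char *port)
{
    struct timeval tv = {30, 0};   /* give up after roughly 30 seconds */

    /* non-blocking: returns immediately, completion arrives via connect_done */
    return ompi_dpm.pconnect(port, &tv, connect_done, (void*)&pending);
}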


@ -14,6 +14,7 @@
* Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -27,13 +28,20 @@
#include <string.h>
#include <stdio.h>
#include <ctype.h>
#if HAVE_TIME_H
#include <time.h>
#endif
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include "opal/util/argv.h"
#include "opal/util/opal_getcwd.h"
#include "opal/dss/dss.h"
#include "opal/mca/db/db.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
@ -43,6 +51,7 @@
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
@ -59,12 +68,14 @@
/* Local static variables */
static opal_mutex_t ompi_dpm_port_mutex;
static orte_rml_tag_t next_tag;
static opal_list_t orte_dpm_acceptors, orte_dpm_connectors;
static uint32_t next_preq=0;
/* API functions */
static int init(void);
static int connect_accept ( ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm );
static int connect_accept (ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm);
static int disconnect(ompi_communicator_t *comm);
static int spawn(int count, char **array_of_commands,
char ***array_of_argv,
@ -78,6 +89,14 @@ static int parse_port_name(char *port_name, char **hnp_uri, char **rml_uri,
static int route_to_port(char *rml_uri, orte_process_name_t *rproc);
static int close_port(char *port_name);
static int finalize(void);
static int dpm_pconnect(char *port,
struct timeval *timeout,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
static int dpm_paccept(char *port,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata);
static void dpm_pclose(char *port);
/*
* instantiate the module
@ -94,9 +113,16 @@ ompi_dpm_base_module_t ompi_dpm_orte_module = {
parse_port_name,
route_to_port,
close_port,
finalize
finalize,
dpm_pconnect,
dpm_paccept,
dpm_pclose
};
static void connect_complete(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
/*
* Init the module
*/
@ -104,13 +130,22 @@ static int init(void)
{
OBJ_CONSTRUCT(&ompi_dpm_port_mutex, opal_mutex_t);
next_tag = OMPI_RML_TAG_DYNAMIC;
OBJ_CONSTRUCT(&orte_dpm_acceptors, opal_list_t);
OBJ_CONSTRUCT(&orte_dpm_connectors, opal_list_t);
/* post a receive for pconnect request responses */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
OMPI_RML_PCONNECT_TAG,
ORTE_RML_PERSISTENT,
connect_complete, NULL);
return OMPI_SUCCESS;
}
static int connect_accept ( ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm )
static int connect_accept(ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm)
{
int size, rsize, rank, rc;
orte_std_cntr_t num_vals;
@ -172,10 +207,6 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
free(hnp_uri); free(rml_uri);
}
/* tell the progress engine to tick the event library more
often, to make sure that the OOB messages get sent */
opal_progress_event_users_increment();
if ( rank == root ) {
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
if (send_first) {
@ -604,10 +635,6 @@ static int connect_accept ( ompi_communicator_t *comm, int root,
*/
exit:
/* done with OOB and such - slow our tick rate again */
opal_progress();
opal_progress_event_users_decrement();
if ( NULL != rprocs ) {
free ( rprocs );
}
@ -682,9 +709,6 @@ static int spawn(int count, char **array_of_commands,
- "soft": see page 92 of MPI-2.
*/
/* make sure the progress engine properly trips the event library */
opal_progress_event_users_increment();
/* setup the job object */
jdata = OBJ_NEW(orte_job_t);
@ -1366,9 +1390,6 @@ static int spawn(int count, char **array_of_commands,
return MPI_ERR_SPAWN;
}
/* clean up */
opal_progress_event_users_decrement();
return OMPI_SUCCESS;
}
@ -1400,8 +1421,6 @@ static int open_port(char *port_name, orte_rml_tag_t given_tag)
int rc, len;
char tag[12];
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
if (NULL == orte_process_info.my_hnp_uri) {
rc = OMPI_ERR_NOT_AVAILABLE;
ORTE_ERROR_LOG(rc);
@ -1415,8 +1434,10 @@ static int open_port(char *port_name, orte_rml_tag_t given_tag)
}
if (ORTE_RML_TAG_INVALID == given_tag) {
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
snprintf(tag, 12, "%d", next_tag);
next_tag++;
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
} else {
snprintf(tag, 12, "%d", given_tag);
}
@ -1439,7 +1460,6 @@ cleanup:
free(rml_uri);
}
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
return rc;
}
@ -1575,5 +1595,559 @@ static int dyn_init(void)
static int finalize(void)
{
OBJ_DESTRUCT(&ompi_dpm_port_mutex);
OPAL_LIST_DESTRUCT(&orte_dpm_acceptors);
OPAL_LIST_DESTRUCT(&orte_dpm_connectors);
return OMPI_SUCCESS;
}
typedef struct {
opal_list_item_t super;
opal_event_t ev;
bool event_active;
uint32_t id;
orte_rml_tag_t tag;
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc;
void *cbdata;
} orte_dpm_prequest_t;
OBJ_CLASS_INSTANCE(orte_dpm_prequest_t,
opal_list_item_t,
NULL, NULL);
static void timeout_cb(int fd, short args, void *cbdata)
{
orte_dpm_prequest_t *req = (orte_dpm_prequest_t*)cbdata;
/* remove the request from the list */
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
opal_list_remove_item(&orte_dpm_connectors, &req->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
/* this connection request failed - notify the caller */
req->cbfunc(MPI_COMM_NULL, NULL, req->cbdata);
/* cleanup */
OBJ_RELEASE(req);
}
static int pack_request(opal_buffer_t *buf, ompi_group_t *group)
{
int rc;
/* pack the MPI info */
ompi_proc_pack(group->grp_proc_pointers, 1, buf);
/* pack our hostname */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack our node rank */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.my_node_rank, 1, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack our local rank */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.my_local_rank, 1, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
#if OPAL_HAVE_HWLOC
/* pack our binding info so other procs can determine our locality */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.cpuset, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
#endif
/* pack the modex entries we have received */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
static void process_request(orte_process_name_t* sender,
opal_buffer_t *buffer,
bool connector,
ompi_communicator_t **newcomm,
ompi_proc_t **proct)
{
ompi_communicator_t *newcomp=MPI_COMM_NULL;
ompi_group_t *group=MPI_COMM_SELF->c_local_group;
ompi_group_t *new_group_pointer;
ompi_proc_t **rprocs=NULL;
ompi_proc_t **new_proc_list;
int new_proc_len;
opal_buffer_t *xfer;
int cnt, rc;
uint32_t id;
char *hostname;
orte_node_rank_t node_rank;
orte_local_rank_t local_rank;
opal_hwloc_locality_t locality;
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: PROCESS REQUEST: %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
connector ? "connector" : "acceptor"));
/* if we are the acceptor, unpack the remote peer's request id */
if (!connector) {
cnt=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &id, &cnt, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: PROCESS REQUEST ID: %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id));
}
/* unpack the proc info */
if (OMPI_SUCCESS != (rc = ompi_proc_unpack(buffer, 1, &rprocs, &new_proc_len, &new_proc_list))) {
ORTE_ERROR_LOG(rc);
return;
}
/* If we added new procs, we need to unpack the modex info
* and then call PML add_procs
*/
if (0 < new_proc_len) {
/* unpack the peer's hostname and store it */
cnt=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &hostname, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* unpack the node rank */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &node_rank, &cnt, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* unpack the local rank */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &local_rank, &cnt, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* compute the locality and store in the database */
#if OPAL_HAVE_HWLOC
{
char *cpuset;
/* unpack and store the cpuset - could be NULL */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cpuset, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_CPUSET, cpuset, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s cpuset %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), cpuset));
if (0 != strcmp(hostname, orte_process_info.nodename)) {
/* this is on a different node, then mark as non-local */
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale NONLOCAL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
locality = OPAL_PROC_NON_LOCAL;
} else if (NULL == cpuset || NULL == orte_process_info.cpuset) {
/* one or both of us is not bound, so all we can say is we are on the
* same node
*/
locality = OPAL_PROC_ON_NODE;
} else {
/* determine relative location on our node */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
orte_process_info.cpuset,
cpuset);
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender),
opal_hwloc_base_print_locality(locality)));
}
}
#else
if (0 != strcmp(hostname, orte_process_info.nodename)) {
/* this is on a different node, then mark as non-local */
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale NONLOCAL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
locality = OPAL_PROC_NON_LOCAL;
} else {
/* must be on our node */
locality = OPAL_PROC_ON_NODE;
}
#endif
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: adding modex entry for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* process the modex info */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(sender, buffer))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: adding procs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (OMPI_SUCCESS != (rc = MCA_PML_CALL(add_procs(new_proc_list, new_proc_len)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:pconnect new procs added",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
/* if we are the acceptor, we now have to send the requestor our
* info so we can collaborate on setup of the communicator - we
* must wait until this point so the route can be initiated, if
* required
*/
if (!connector) {
xfer = OBJ_NEW(opal_buffer_t);
/* pack the request id */
if (ORTE_SUCCESS != (rc = opal_dss.pack(xfer, &id, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
goto cleanup;
}
/* pack the remaining info */
if (ORTE_SUCCESS != pack_request(xfer, group)) {
OBJ_RELEASE(xfer);
goto cleanup;
}
/* send to requestor */
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(sender, xfer, OMPI_RML_PCONNECT_TAG,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(xfer);
goto cleanup;
}
}
/* allocate a new group */
new_group_pointer=ompi_group_allocate(1);
if( NULL == new_group_pointer ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* put group element in the list */
new_group_pointer->grp_proc_pointers[0] = rprocs[0];
/* increment proc reference counter */
ompi_group_increment_proc_count(new_group_pointer);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess setting up communicator",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* set up communicator structure */
rc = ompi_comm_set(&newcomp, /* new comm */
MPI_COMM_SELF, /* old comm */
1, /* local_size */
NULL, /* local_procs */
1, /* remote_size */
NULL, /* remote_procs */
NULL, /* attrs */
MPI_COMM_SELF->error_handler, /* error handler */
NULL, /* topo component */
group, /* local group */
new_group_pointer /* remote group */
);
if (NULL == newcomp) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
ompi_group_decrement_proc_count (new_group_pointer);
OBJ_RELEASE(new_group_pointer);
new_group_pointer = MPI_GROUP_NULL;
/* return the communicator */
*newcomm = newcomp;
*proct = rprocs[0];
rc = OMPI_SUCCESS;
cleanup:
if (NULL != rprocs) {
free(rprocs);
}
if (OMPI_SUCCESS != rc && MPI_COMM_NULL == newcomp) {
OBJ_RELEASE(newcomp);
}
}
static void connect_complete(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
ompi_communicator_t *newcomm=MPI_COMM_NULL;
ompi_proc_t *proct=NULL;
orte_dpm_prequest_t *req=NULL, *rptr;
int rc, cnt;
uint32_t id;
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconnect: starting",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* unpack the request id */
cnt=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &id, &cnt, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* find this request on the list */
req = NULL;
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
OPAL_LIST_FOREACH(rptr, &orte_dpm_connectors, orte_dpm_prequest_t) {
if (id == rptr->id) {
req = rptr;
break;
}
}
if (NULL == req) {
/* unknown request */
opal_output(0, "%s dpm:pconnect: received unknown id %u from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id,
ORTE_NAME_PRINT(sender));
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
return;
}
/* remove the request from the list */
opal_list_remove_item(&orte_dpm_connectors, &req->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconnect: found request %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id));
if (req->event_active) {
/* release the timeout */
opal_event_del(&req->ev);
}
/* process the request - as the initiator, we will send first
* for communicator creation
*/
process_request(sender, buffer, true, &newcomm, &proct);
/* notify the MPI layer */
req->cbfunc(newcomm, proct, req->cbdata);
cleanup:
if (NULL != req) {
OBJ_RELEASE(req);
}
}
static int dpm_pconnect(char *port,
struct timeval *timeout,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata)
{
char *hnp_uri, *rml_uri;
orte_rml_tag_t tag;
int rc;
orte_dpm_prequest_t *connector;
orte_process_name_t peer;
ompi_group_t *group=MPI_COMM_SELF->c_local_group;
opal_buffer_t *buf;
/* separate the string into the HNP and RML URI and tag */
if (ORTE_SUCCESS != (rc = parse_port_name(port, &hnp_uri, &rml_uri, &tag))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* extract the originating proc's name */
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &peer, NULL))) {
ORTE_ERROR_LOG(rc);
free(hnp_uri); free(rml_uri);
return rc;
}
/* make sure we can route rml messages to the destination job */
if (ORTE_SUCCESS != (rc = route_to_port(hnp_uri, &peer))) {
ORTE_ERROR_LOG(rc);
free(hnp_uri); free(rml_uri);
return rc;
}
opal_output(0, "dpm:pconnect requesting connect to %s on tag %d",
ORTE_NAME_PRINT(&peer), tag);
free(hnp_uri); free(rml_uri);
/* create a message to the remote peer */
buf = OBJ_NEW(opal_buffer_t);
/* track the connection request */
connector = OBJ_NEW(orte_dpm_prequest_t);
connector->tag = tag;
connector->cbfunc = cbfunc;
connector->cbdata = cbdata;
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
connector->id = next_preq++;
opal_list_append(&orte_dpm_connectors, &connector->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
/* pack my request id */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &connector->id, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
opal_list_remove_item(&orte_dpm_connectors, &connector->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
OBJ_RELEASE(connector);
return rc;
}
/* pack the request info */
if (ORTE_SUCCESS != pack_request(buf, group)) {
OBJ_RELEASE(buf);
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
opal_list_remove_item(&orte_dpm_connectors, &connector->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
OBJ_RELEASE(connector);
return rc;
}
/* setup the timeout, if requested */
if (NULL != timeout) {
opal_output(0, "dpm:pconnect setting timeout");
opal_event_evtimer_set(orte_event_base,
&connector->ev, timeout_cb, connector);
opal_event_set_priority(&connector->ev, ORTE_ERROR_PRI);
opal_event_evtimer_add(&connector->ev, timeout);
connector->event_active = true;
} else {
connector->event_active = false;
}
/* send it to our new friend */
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconnect sending connect to %s on tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer), tag));
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&peer, buf, tag,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
return rc;
}
static void paccept_recv(int status,
struct orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
orte_dpm_prequest_t *acceptor = (orte_dpm_prequest_t*)cbdata;
ompi_communicator_t *newcomm=MPI_COMM_NULL;
ompi_proc_t *proct=NULL;
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:paccept recvd request from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer)));
/* process the request - as the acceptor, we will recv first
* on communicator formation
*/
process_request(peer, buffer, false, &newcomm, &proct);
/* if we succeeded, notify the MPI layer */
if (MPI_COMM_NULL != newcomm) {
acceptor->cbfunc(newcomm, proct, acceptor->cbdata);
}
}
static int dpm_paccept(char *port,
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc,
void *cbdata)
{
orte_rml_tag_t tag;
int rc;
orte_dpm_prequest_t *acceptor;
/* extract the RML tag from the port name - it's the only part we need */
if (OMPI_SUCCESS != (rc = parse_port_name(port, NULL, NULL, &tag))) {
return rc;
}
/* track the accept request */
acceptor = OBJ_NEW(orte_dpm_prequest_t);
acceptor->tag = tag;
acceptor->cbfunc = cbfunc;
acceptor->cbdata = cbdata;
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
opal_list_append(&orte_dpm_acceptors, &acceptor->super);
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
/* register a recv for this tag */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_PERSISTENT,
paccept_recv, acceptor);
return OMPI_SUCCESS;
}
static void dpm_pclose(char *port)
{
orte_rml_tag_t tag;
orte_dpm_prequest_t *rptr;
/* extract the RML tag from the port name - it's the only part we need */
if (OMPI_SUCCESS != parse_port_name(port, NULL, NULL, &tag)) {
return;
}
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
OPAL_LIST_FOREACH(rptr, &orte_dpm_acceptors, orte_dpm_prequest_t) {
if (tag == rptr->tag) {
/* found it */
opal_list_remove_item(&orte_dpm_acceptors, &rptr->super);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, tag);
OBJ_RELEASE(rptr);
break;
}
}
OPAL_THREAD_UNLOCK(&ompi_dpm_port_mutex);
}


@ -208,6 +208,8 @@ BEGIN_C_DECLS
#define OMPI_RML_TAG_OFACM OMPI_RML_TAG_BASE+11
#define OMPI_RML_TAG_XOFACM OMPI_RML_TAG_BASE+12
#define OMPI_RML_PCONNECT_TAG OMPI_RML_TAG_BASE+13
#define OMPI_RML_TAG_DYNAMIC OMPI_RML_TAG_BASE+200
/*


@ -384,6 +384,9 @@ void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata)
}
cleanup:
if (NULL == cbdata) {
return;
}
/* cleanup the list, but don't release the
* collective object as it was passed into us
*/


@ -540,11 +540,17 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
OBJ_RELEASE(peer->recv_msg);
return;
}
/* yes - post the message for retransmission */
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
"%s ROUTING TO %s FROM HERE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&relay->name));
/* if this came from a different job family, then ensure
* we know how to return
*/
if (ORTE_JOB_FAMILY(peer->recv_msg->hdr.origin.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
orte_routed.update_route(&(peer->recv_msg->hdr.origin), &peer->name);
}
/* post the message for retransmission */
MCA_OOB_TCP_QUEUE_RELAY(peer->recv_msg, relay);
OBJ_RELEASE(peer->recv_msg);
}


@ -81,10 +81,17 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_recv_t);
opal_list_append(&(p)->send_queue, &(s)->super); \
} \
if ((f)) { \
/* ensure the send event is active */ \
if (!(p)->send_ev_active) { \
opal_event_add(&(p)->send_event, 0); \
(p)->send_ev_active = true; \
/* if we aren't connected, then start connecting */ \
if (MCA_OOB_TCP_CONNECTED != (p)->state) { \
(p)->state = MCA_OOB_TCP_CONNECTING; \
ORTE_ACTIVATE_TCP_CONN_STATE((p)->mod, (p), \
mca_oob_tcp_peer_try_connect); \
} else { \
/* ensure the send event is active */ \
if (!(p)->send_ev_active) { \
opal_event_add(&(p)->send_event, 0); \
(p)->send_ev_active = true; \
} \
} \
} \
}while(0);


@ -1,4 +1,4 @@
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client
PROGS = mpi_no_op mpi_barrier hello hello_nodename abort multi_abort simple_spawn concurrent_spawn spawn_multiple mpi_spin delayed_abort loop_spawn loop_child bad_exit pubsub hello_barrier segv accept connect hello_output hello_show_help crisscross read_write ziatest slave reduce-hang ziaprobe ziatest bcast_loop parallel_w8 parallel_w64 parallel_r8 parallel_r64 sio sendrecv_blaster early_abort debugger singleton_client_server intercomm_create spawn_tree init-exit77 mpi_info info_spawn server client paccept pconnect
all: $(PROGS)

orte/test/mpi/paccept.c (new file, 102 lines)

@ -0,0 +1,102 @@
/* -*- C -*-
*
* $HEADER$
*
*/
#include "ompi_config.h"
#include <stdio.h>
#include "mpi.h"
#include "ompi/mca/dpm/dpm.h"
static ompi_communicator_t *newcomp;
static ompi_proc_t *sender;
static void xnt(ompi_communicator_t *newcomm,
ompi_proc_t *remote_proc,
void *cbdata)
{
bool *lock = (bool*)cbdata;
newcomp = newcomm;
sender = remote_proc;
*lock = false;
}
int main(int argc, char* argv[])
{
char port[1024];
bool lock;
FILE *fp;
int rank, rc;
int msg=0;
/* program requires an argument specifying the file where
* the connection info is to be stored
*/
if (2 != argc) {
fprintf(stderr, "Usage: paccept <filename>\n");
exit(1);
}
MPI_Init(&argc, &argv);
/* get a port */
if (OMPI_SUCCESS != ompi_dpm.open_port(port, ORTE_RML_TAG_INVALID)) {
fprintf(stderr, "Failed to open port\n");
goto cleanup;
}
/* put it in the file */
fp = fopen(argv[1], "w");
fprintf(fp, "%s\n", port);
fclose(fp);
/* register the accept */
lock = true;
if (OMPI_SUCCESS != ompi_dpm.paccept(port, xnt, &lock)) {
fprintf(stderr, "Failed to setup accept\n");
goto cleanup;
}
/* wait for completion */
OMPI_WAIT_FOR_COMPLETION(lock);
/* allocate comm_cid */
rank = ompi_comm_rank(MPI_COMM_SELF);
rc = ompi_comm_nextcid(newcomp, /* new communicator */
MPI_COMM_SELF, /* old communicator */
NULL, /* bridge comm */
&rank, /* local leader */
&sender->proc_name, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
false); /* send or recv first */
if ( OMPI_SUCCESS != rc ) {
fprintf(stderr, "Failed to negotiate cid\n");
goto cleanup;
}
/* activate comm and init coll-component */
rc = ompi_comm_activate(&newcomp, /* new communicator */
MPI_COMM_SELF, /* old communicator */
NULL, /* bridge comm */
&rank, /* local leader */
&sender->proc_name, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
false); /* send or recv first */
if ( OMPI_SUCCESS != rc ) {
fprintf(stderr, "Failed to activate communicator\n");
goto cleanup;
}
fprintf(stderr, "HANDSHAKE COMPLETE\n");
MPI_Recv(&msg, 1, MPI_INT, 0, 1, newcomp, MPI_STATUS_IGNORE);
MPI_Comm_disconnect(&newcomp);
fprintf(stderr, "MESSAGE RECVD: %d\n", msg);
ompi_dpm.pclose(port);
cleanup:
MPI_Finalize();
return 0;
}

orte/test/mpi/pconnect.c (new file, 98 lines)

@ -0,0 +1,98 @@
/* -*- C -*-
*
* $HEADER$
*
*/
#include "ompi_config.h"
#include <stdio.h>
#include "mpi.h"
#include "ompi/mca/dpm/dpm.h"
static ompi_communicator_t *newcomp;
static ompi_proc_t *sender;
static void xnt(ompi_communicator_t *newcomm,
ompi_proc_t *remote_proc,
void *cbdata)
{
bool *lock = (bool*)cbdata;
newcomp = newcomm;
sender = remote_proc;
*lock = false;
}
int main(int argc, char* argv[])
{
char port[1024];
bool lock;
FILE *fp;
int rank, rc;
int msg;
/* program requires an argument specifying the file where
* the connection info is to be found
*/
if (2 != argc) {
fprintf(stderr, "Usage: pconnect <filename>\n");
exit(1);
}
MPI_Init(&argc, &argv);
/* read the file */
fp = fopen(argv[1], "r");
fgets(port, 1024, fp);
port[strlen(port)-1] = '\0'; /* remove newline */
fclose(fp);
/* start the connect */
lock = true;
if (OMPI_SUCCESS != ompi_dpm.pconnect(port, NULL, xnt, &lock)) {
fprintf(stderr, "Failed to start connect\n");
goto cleanup;
}
/* wait for completion */
OMPI_WAIT_FOR_COMPLETION(lock);
/* allocate comm_cid */
rank = ompi_comm_rank(MPI_COMM_SELF);
rc = ompi_comm_nextcid(newcomp, /* new communicator */
MPI_COMM_SELF, /* old communicator */
NULL, /* bridge comm */
&rank, /* local leader */
&sender->proc_name, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
true); /* send or recv first */
if ( OMPI_SUCCESS != rc ) {
fprintf(stderr, "Failed to negotiate cid\n");
goto cleanup;
}
/* activate comm and init coll-component */
rc = ompi_comm_activate(&newcomp, /* new communicator */
MPI_COMM_SELF, /* old communicator */
NULL, /* bridge comm */
&rank, /* local leader */
&sender->proc_name, /* remote leader */
OMPI_COMM_CID_INTRA_OOB, /* mode */
true); /* send or recv first */
if ( OMPI_SUCCESS != rc ) {
fprintf(stderr, "Failed to activate communicator\n");
goto cleanup;
}
fprintf(stderr, "HANDSHAKE COMPLETE\n");
msg = 38;
MPI_Send(&msg, 1, MPI_INT, 0, 1, newcomp);
MPI_Comm_disconnect(&newcomp);
fprintf(stderr, "MESSAGE SENT\n");
cleanup:
MPI_Finalize();
return 0;
}