Do some code cleanup in the connect/accept code. Ensure that the OMPI
layer has access to the PMIx identifier for the process. Add macros for
converting PMIx names to/from strings. Clean up a few of the simple test
programs. Add a little more info to a btl/tcp error message.

Signed-off-by: Ralph Castain <rhc@pmix.org>
This commit is contained in:
Ralph Castain 2020-04-08 08:37:25 -07:00
parent 2c0b9bd1e4
commit a210f8046f
No known key found for this signature
GPG key ID: B63B630167D26BB5
9 changed files with 143 additions and 163 deletions
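
For orientation: the new to/from-string macros added below encode a pmix_proc_t as "<nspace>.<rank>", where the rank part is rendered as a literal '*' for PMIX_RANK_WILDCARD and '$' for PMIX_RANK_INVALID. A minimal standalone sketch of the parsing rule (not Open MPI code; the nspace value here is made up):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void)
    {
        /* hypothetical name string; real nspaces come from the PMIx server */
        char str[] = "prterun-node1-2481@1.*";

        /* split at the LAST '.' so dots inside the nspace survive,
         * exactly as the new OPAL_PMIX_CONVERT_STRING_TO_PROCT does */
        char *ptr = strrchr(str, '.');
        *ptr = '\0';
        ++ptr;
        if ('*' == *ptr) {
            printf("nspace %s, rank WILDCARD\n", str);
        } else if ('$' == *ptr) {
            printf("nspace %s, rank INVALID\n", str);
        } else {
            printf("nspace %s, rank %lu\n", str, strtoul(ptr, NULL, 10));
        }
        return 0;
    }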

View file

@@ -106,8 +106,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     pmix_proc_t *procs, pxproc;
     size_t nprocs, n;
     pmix_status_t pret;
-    opal_namelist_t *nm;
-    opal_jobid_t jobid;
+    opal_proclist_t *plt;
     ompi_communicator_t *newcomp=MPI_COMM_NULL;
     ompi_proc_t *proc;
@@ -131,24 +130,14 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
      * procs is used to complete construction of the intercommunicator. */
     /* everyone constructs the list of members from their communicator */
+    pname.jobid = OMPI_PROC_MY_NAME->jobid;
+    pname.vpid = OPAL_VPID_WILDCARD;
+
     if (MPI_COMM_WORLD == comm) {
-        pname.jobid = OMPI_PROC_MY_NAME->jobid;
-        pname.vpid = OPAL_VPID_WILDCARD;
-        rc = opal_convert_process_name_to_string(&nstring, &pname);
-        if (OPAL_SUCCESS != rc) {
-            return OMPI_ERROR;
-        }
+        PMIX_LOAD_PROCID(&pxproc, ompi_process_info.myprocid.nspace, PMIX_RANK_WILDCARD);
+        OPAL_PMIX_CONVERT_PROCT_TO_STRING(&nstring, &pxproc);
         opal_argv_append_nosize(&members, nstring);
         free(nstring);
-        /* have to add the number of procs in the job so the remote side
-         * can correctly add the procs by computing their names, and our nspace
-         * so they can update their records */
-        nstring = opal_jobid_print(pname.jobid);
-        if (NULL == nstring) {
-            opal_argv_free(members);
-            return OMPI_ERROR;
-        }
-        opal_argv_append_nosize(&members, nstring);
         /* add the number of procs in this job */
         (void)opal_asprintf(&nstring, "%d", size);
         opal_argv_append_nosize(&members, nstring);
         free(nstring);
@@ -176,22 +165,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
         } else {
             proc_name = proc_list[i]->super.proc_name;
         }
-        rc = opal_convert_process_name_to_string(&nstring, &proc_name);
-        if (OPAL_SUCCESS != rc) {
-            if (!dense) {
-                free(proc_list);
-                proc_list = NULL;
-            }
-            return OMPI_ERROR;
-        }
+        OPAL_PMIX_CONVERT_NAME(&pxproc, &proc_name);
+        OPAL_PMIX_CONVERT_PROCT_TO_STRING(&nstring, &pxproc);
         opal_argv_append_nosize(&members, nstring);
         free(nstring);
-        nstring = opal_jobid_print(pname.jobid);
-        if (OPAL_SUCCESS != rc) {
-            opal_argv_free(members);
-            return OMPI_ERROR;
-        }
-        opal_argv_append_nosize(&members, nstring);
     }
     if (!dense) {
         free(proc_list);
@@ -260,64 +237,18 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
      * starting with our own members */
     OBJ_CONSTRUCT(&mlist, opal_list_t);
     for (i=0; NULL != members[i]; i++) {
-        nm = OBJ_NEW(opal_namelist_t);
-        if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
-            OMPI_ERROR_LOG(rc);
-            opal_argv_free(members);
-            free(rport);
-            OPAL_LIST_DESTRUCT(&mlist);
-            goto exit;
-        }
-        /* step over the nspace */
-        ++i;
-        if (NULL == members[i]) {
-            /* this shouldn't happen and is an error */
-            OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
-            OPAL_LIST_DESTRUCT(&mlist);
-            opal_argv_free(members);
-            free(rport);
-            rc = OMPI_ERR_BAD_PARAM;
-            goto exit;
-        }
-        /* if the rank is wildcard, then we need to add all procs
-         * in that job to the list */
-        if (OPAL_VPID_WILDCARD == nm->name.vpid) {
-            jobid = nm->name.jobid;
-            OBJ_RELEASE(nm);
-            for (k=0; k < size; k++) {
-                nm = OBJ_NEW(opal_namelist_t);
-                nm->name.jobid = jobid;
-                nm->name.vpid = k;
-                opal_list_append(&mlist, &nm->super);
-            }
-            /* now step over the size */
-            if (NULL == members[i+1]) {
-                /* this shouldn't happen and is an error */
-                OMPI_ERROR_LOG(OMPI_ERR_BAD_PARAM);
-                OPAL_LIST_DESTRUCT(&mlist);
-                opal_argv_free(members);
-                free(rport);
-                rc = OMPI_ERR_BAD_PARAM;
-                goto exit;
-            }
+        OPAL_PMIX_CONVERT_STRING_TO_PROCT(&pxproc, members[i]);
+        plt = OBJ_NEW(opal_proclist_t);
+        memcpy(&plt->procid, &pxproc, sizeof(pmix_proc_t));
+        opal_list_append(&mlist, &plt->super);
+        /* if the rank is wildcard, then we need to skip
+         * the next position */
+        if (PMIX_RANK_WILDCARD == pxproc.rank) {
             ++i;
-        } else {
-            opal_list_append(&mlist, &nm->super);
         }
     }
     opal_argv_free(members);
     members = NULL;
-    /* convert the list of members to a pmix_proc_t array */
-    nprocs = opal_list_get_size(&mlist);
-    PMIX_PROC_CREATE(procs, nprocs);
-    n = 0;
-    OPAL_LIST_FOREACH(nm, &mlist, opal_namelist_t) {
-        OPAL_PMIX_CONVERT_NAME(&procs[n], &nm->name);
-        ++n;
-    }
-    OPAL_LIST_DESTRUCT(&mlist);
     /* rport contains a colon-delimited list
      * of process names for the remote procs - convert it
      * into an argv array */
@@ -330,29 +261,13 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     OBJ_CONSTRUCT(&rlist, opal_list_t);
     for (i=0; NULL != members[i]; i++) {
-        nm = OBJ_NEW(opal_namelist_t);
-        if (OPAL_SUCCESS != (rc = opal_convert_string_to_process_name(&nm->name, members[i]))) {
-            OMPI_ERROR_LOG(rc);
-            opal_argv_free(members);
-            OPAL_LIST_DESTRUCT(&ilist);
-            OPAL_LIST_DESTRUCT(&rlist);
-            PMIX_PROC_FREE(procs, nprocs);
-            goto exit;
-        }
-        /* next entry is the nspace - register it */
-        ++i;
-        if (NULL == members[i]) {
-            OMPI_ERROR_LOG(OMPI_ERR_NOT_SUPPORTED);
-            opal_argv_free(members);
-            OPAL_LIST_DESTRUCT(&ilist);
-            OPAL_LIST_DESTRUCT(&rlist);
-            PMIX_PROC_FREE(procs, nprocs);
-            goto exit;
-        }
-        if (OPAL_VPID_WILDCARD == nm->name.vpid) {
-            jobid = nm->name.jobid;
-            OBJ_RELEASE(nm);
-            /* if the vpid is wildcard, then we are including all ranks
+        OPAL_PMIX_CONVERT_STRING_TO_PROCT(&pxproc, members[i]);
+        plt = OBJ_NEW(opal_proclist_t);
+        memcpy(&plt->procid, &pxproc, sizeof(pmix_proc_t));
+        opal_list_append(&mlist, &plt->super);
+        if (PMIX_RANK_WILDCARD == pxproc.rank) {
+            /* if the rank is wildcard, then we are including all ranks
              * of that job, and the next entry in members should be the
              * number of procs in the job */
             if (NULL == members[i+1]) {
@@ -361,19 +276,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
                 opal_argv_free(members);
                 OPAL_LIST_DESTRUCT(&ilist);
                 OPAL_LIST_DESTRUCT(&rlist);
+                OPAL_LIST_DESTRUCT(&mlist);
                 rc = OMPI_ERR_BAD_PARAM;
-                PMIX_PROC_FREE(procs, nprocs);
                 goto exit;
             }
             rsize = strtoul(members[i+1], NULL, 10);
             ++i;
             for (k=0; k < rsize; k++) {
-                nm = OBJ_NEW(opal_namelist_t);
-                nm->name.jobid = jobid;
-                nm->name.vpid = k;
-                opal_list_append(&mlist, &nm->super);
+                pxproc.rank = k;
+                OPAL_PMIX_CONVERT_PROCT(rc, &pname, &pxproc);
+                if (OPAL_SUCCESS != rc) {
+                    OMPI_ERROR_LOG(rc);
+                    opal_argv_free(members);
+                    OPAL_LIST_DESTRUCT(&ilist);
+                    OPAL_LIST_DESTRUCT(&rlist);
+                    OPAL_LIST_DESTRUCT(&mlist);
+                    goto exit;
+                }
                 /* see if this needs to be added to our ompi_proc_t array */
-                proc = ompi_proc_find_and_add(&nm->name, &isnew);
+                proc = ompi_proc_find_and_add(&pname, &isnew);
                 if (isnew) {
                     cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
                     cd->p = proc;
@@ -385,9 +306,17 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
                 opal_list_append(&rlist, &cd->super);
             }
         } else {
-            opal_list_append(&mlist, &nm->super);
+            OPAL_PMIX_CONVERT_PROCT(rc, &pname, &pxproc);
+            if (OPAL_SUCCESS != rc) {
+                OMPI_ERROR_LOG(rc);
+                opal_argv_free(members);
+                OPAL_LIST_DESTRUCT(&ilist);
+                OPAL_LIST_DESTRUCT(&rlist);
+                OPAL_LIST_DESTRUCT(&mlist);
+                goto exit;
+            }
             /* see if this needs to be added to our ompi_proc_t array */
-            proc = ompi_proc_find_and_add(&nm->name, &isnew);
+            proc = ompi_proc_find_and_add(&pname, &isnew);
             if (isnew) {
                 cd = OBJ_NEW(ompi_dpm_proct_caddy_t);
                 cd->p = proc;
@@ -401,6 +330,16 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
     }
     opal_argv_free(members);
 
+    /* convert the list of members to a pmix_proc_t array */
+    nprocs = opal_list_get_size(&mlist);
+    PMIX_PROC_CREATE(procs, nprocs);
+    n = 0;
+    OPAL_LIST_FOREACH(plt, &mlist, opal_proclist_t) {
+        memcpy(&procs[n], &plt->procid, sizeof(pmix_proc_t));
+        ++n;
+    }
+    OPAL_LIST_DESTRUCT(&mlist);
+
     /* tell the host RTE to connect us - this will download
      * all known data for the nspace's of participating procs
      * so that add_procs will not result in a slew of lookups */
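
The members argv assembled above follows a small convention that both sides rely on: an entry whose rank is the wildcard '*' is immediately followed by the job size, while an explicit "<nspace>.<rank>" entry stands alone. A toy, self-contained expansion of that convention (illustrative only; the nspace names are invented):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    int main(void)
    {
        /* hypothetical exchanged list: one wildcard job of 4 procs, one single proc */
        const char *members[] = {"jobA.*", "4", "jobB.2", NULL};

        for (int i = 0; NULL != members[i]; i++) {
            char buf[64];
            strncpy(buf, members[i], sizeof(buf) - 1);
            buf[sizeof(buf) - 1] = '\0';
            char *ptr = strrchr(buf, '.');
            *ptr++ = '\0';
            if ('*' == *ptr) {
                /* wildcard: the next entry holds the number of procs */
                unsigned long rsize = strtoul(members[i + 1], NULL, 10);
                for (unsigned long k = 0; k < rsize; k++) {
                    printf("member: nspace %s rank %lu\n", buf, k);
                }
                ++i;   /* step over the size entry, as the real loop does */
            } else {
                printf("member: nspace %s rank %lu\n", buf, strtoul(ptr, NULL, 10));
            }
        }
        return 0;
    }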

View file

@@ -62,6 +62,7 @@ opal_process_name_t pmix_name_invalid = {UINT32_MAX, UINT32_MAX};
 hwloc_cpuset_t ompi_proc_applied_binding = NULL;
 pmix_process_info_t pmix_process_info = {
     .my_name = {OPAL_JOBID_INVALID, OPAL_VPID_INVALID},
+    .myprocid = {{0}, PMIX_RANK_INVALID},
     .nodename = NULL,
     .pid = 0,
     .top_session_dir = NULL,
@@ -84,8 +85,6 @@ pmix_process_info_t pmix_process_info = {
 bool pmix_proc_is_bound = false;
 bool ompi_singleton = false;
 
-static pmix_proc_t myprocid;
-
 static int _setup_top_session_dir(char **sdir);
 static int _setup_job_session_dir(char **sdir);
 static int _setup_proc_session_dir(char **sdir);
@@ -550,7 +549,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
     opal_pmix_setup_nspace_tracker();
 
     /* initialize the selected module */
-    if (!PMIx_Initialized() && (PMIX_SUCCESS != (ret = PMIx_Init(&myprocid, NULL, 0)))) {
+    if (!PMIx_Initialized() && (PMIX_SUCCESS != (ret = PMIx_Init(&pmix_process_info.myprocid, NULL, 0)))) {
         /* if we get PMIX_ERR_UNREACH indicating that we cannot reach the
          * server, then we assume we are operating as a singleton */
         if (PMIX_ERR_UNREACH == ret) {
@@ -565,7 +564,7 @@ int ompi_rte_init(int *pargc, char ***pargv)
     }
 
     /* setup the process name fields - also registers the new nspace */
-    OPAL_PMIX_CONVERT_PROCT(rc, &pname, &myprocid);
+    OPAL_PMIX_CONVERT_PROCT(rc, &pname, &pmix_process_info.myprocid);
     if (OPAL_SUCCESS != rc) {
         return rc;
     }
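
With myprocid promoted into the shared pmix_process_info structure, OMPI-layer code can read the native PMIx identity directly rather than going through a file-static copy inside the RTE. A hedged sketch, assuming ompi_rte_init() has already run (ompi_process_info is the OMPI-layer alias for this struct, as used in the dpm.c hunks above):

    /* the field is filled in by PMIx_Init() during ompi_rte_init() */
    printf("my nspace %s, my rank %u\n",
           ompi_process_info.myprocid.nspace,
           ompi_process_info.myprocid.rank);

This is the same access pattern the connect/accept code now uses via PMIX_LOAD_PROCID(&pxproc, ompi_process_info.myprocid.nspace, PMIX_RANK_WILDCARD).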

View file

@@ -245,6 +245,7 @@ typedef uint16_t ompi_local_rank_t;
 
 typedef struct {
     opal_process_name_t my_name;
+    pmix_proc_t myprocid;
     char *nodename;
     pid_t pid;
     char *top_session_dir;
View file

@@ -398,19 +398,19 @@ mca_btl_tcp_endpoint_send_blocking(mca_btl_base_endpoint_t* btl_endpoint,
  * Send the globally unique identifier for this process to a endpoint on
  * a newly connected socket.
  */
-static int 
+static int
 mca_btl_tcp_endpoint_send_connect_ack(mca_btl_base_endpoint_t* btl_endpoint)
 {
     opal_process_name_t guid = opal_proc_local_get()->proc_name;
     OPAL_PROCESS_NAME_HTON(guid);
 
     mca_btl_tcp_endpoint_hs_msg_t hs_msg;
     opal_string_copy(hs_msg.magic_id, mca_btl_tcp_magic_id_string,
                      sizeof(hs_msg.magic_id));
     hs_msg.guid = guid;
 
-    if(sizeof(hs_msg) != 
-       mca_btl_tcp_endpoint_send_blocking(btl_endpoint, 
+    if(sizeof(hs_msg) !=
+       mca_btl_tcp_endpoint_send_blocking(btl_endpoint,
                                           &hs_msg, sizeof(hs_msg))) {
         opal_show_help("help-mpi-btl-tcp.txt", "client handshake fail",
                        true, opal_process_info.nodename,
@@ -649,8 +649,8 @@ static int mca_btl_tcp_endpoint_recv_connect_ack(mca_btl_base_endpoint_t* btl_en
      * to be able to exchange the opal_process_name_t over the network.
      */
     if (0 != opal_compare_proc(btl_proc->proc_opal->proc_name, guid)) {
-        BTL_ERROR(("received unexpected process identifier %s",
-                   OPAL_NAME_PRINT(guid)));
+        BTL_ERROR(("received unexpected process identifier: got %s expected %s",
+                   OPAL_NAME_PRINT(guid), OPAL_NAME_PRINT(btl_proc->proc_opal->proc_name)));
         btl_endpoint->endpoint_state = MCA_BTL_TCP_FAILED;
         mca_btl_tcp_endpoint_close(btl_endpoint);
         return OPAL_ERR_UNREACH;
@@ -758,9 +758,9 @@ static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpo
     mca_btl_tcp_proc_tosocks(btl_endpoint->endpoint_addr, &endpoint_addr);
 
     /* Bind the socket to one of the addresses associated with
-     * this btl module. This sets the source IP to one of the 
-     * addresses shared in modex, so that the destination rank 
-     * can properly pair btl modules, even in cases where Linux 
+     * this btl module. This sets the source IP to one of the
+     * addresses shared in modex, so that the destination rank
+     * can properly pair btl modules, even in cases where Linux
      * might do something unexpected with routing */
     if (endpoint_addr.ss_family == AF_INET) {
         assert(NULL != &btl_endpoint->endpoint_btl->tcp_ifaddr);
@@ -965,7 +965,7 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void* user)
            the magic string ID failed). recv_connect_ack already cleaned
            up the socket. */
         /* If we get OPAL_ERROR, the other end closed the connection
-         * because it has initiated a symetrical connexion on its end. 
+         * because it has initiated a symetrical connexion on its end.
          * recv_connect_ack already cleaned up the socket. */
     }
     else {

View file

@@ -137,21 +137,28 @@ int opal_pmix_convert_nspace(opal_jobid_t *jobid, pmix_nspace_t nspace)
         return OPAL_SUCCESS;
     }
     if (NULL != strstr(nspace, "JOBID_WILDCARD")) {
-        *jobid = OPAL_JOBID_WILDCARD;
+        if (NULL != jobid) {
+            *jobid = OPAL_JOBID_WILDCARD;
+        }
         return OPAL_SUCCESS;
     }
     if (NULL != strstr(nspace, "JOBID_INVALID")) {
-        *jobid = OPAL_JOBID_INVALID;
+        if (NULL != jobid) {
+            *jobid = OPAL_JOBID_INVALID;
+        }
         return OPAL_SUCCESS;
     }
 
-    /* cycle across our list of known jobids */
+    /* cycle across our list of known nspace's */
     OPAL_LIST_FOREACH(nptr, &localnspaces, opal_nptr_t) {
         if (PMIX_CHECK_NSPACE(nspace, nptr->nspace)) {
-            *jobid = nptr->jobid;
+            if (NULL != jobid) {
+                *jobid = nptr->jobid;
+            }
             return OPAL_SUCCESS;
         }
     }
 
     /* if we get here, we don't know this nspace */
     /* find the "." at the end that indicates the child job */
     if (NULL != (p = strrchr(nspace, '.'))) {
@@ -167,7 +174,9 @@ int opal_pmix_convert_nspace(opal_jobid_t *jobid, pmix_nspace_t nspace)
         /* now compress to 16-bits */
         jobfam = (uint16_t)(((0x0000ffff & (0xffff0000 & hash32) >> 16)) ^ (0x0000ffff & hash32));
         jid = (0xffff0000 & ((uint32_t)jobfam << 16)) | (0x0000ffff & localjob);
-        *jobid = jid;
+        if (NULL != jobid) {
+            *jobid = jid;
+        }
         /* save this jobid/nspace pair */
         nptr = OBJ_NEW(opal_nptr_t);
         nptr->jobid = jid;
@@ -956,3 +965,7 @@ static void infoitdecon(opal_info_item_t *p)
 OBJ_CLASS_INSTANCE(opal_info_item_t,
                    opal_list_item_t,
                    infoitmcon, infoitdecon);
+
+OBJ_CLASS_INSTANCE(opal_proclist_t,
+                   opal_list_item_t,
+                   NULL, NULL);
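
The NULL guards added above let a caller invoke opal_pmix_convert_nspace() purely for its side effect of registering a previously unknown nspace in the local tracker, without having to supply a throwaway jobid output variable. A sketch of such a call (the nspace string is hypothetical):

    pmix_nspace_t nspace;
    PMIX_LOAD_NSPACE(nspace, "prterun-node1-2481@1");  /* made-up nspace */
    (void)opal_pmix_convert_nspace(NULL, nspace);      /* register only */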

View file

@@ -64,6 +64,14 @@ typedef struct {
 } opal_info_item_t;
 OBJ_CLASS_DECLARATION(opal_info_item_t);
 
+/* define the equivalent to opal_namelist_t for pmix_proc_t */
+typedef struct {
+    opal_list_item_t super;
+    pmix_proc_t procid;
+} opal_proclist_t;
+
+OBJ_CLASS_DECLARATION(opal_proclist_t);
+
 typedef opal_cond_t opal_pmix_condition_t;
 
 typedef struct {
@@ -599,18 +607,26 @@ OPAL_DECLSPEC int opal_pmix_convert_nspace(opal_jobid_t *jobid, pmix_nspace_t ns
 OPAL_DECLSPEC void opal_pmix_setup_nspace_tracker(void);
 OPAL_DECLSPEC void opal_pmix_finalize_nspace_tracker(void);
 
+#define OPAL_SCHEMA_DELIMITER_CHAR      '.'
+#define OPAL_SCHEMA_WILDCARD_CHAR       '*'
+#define OPAL_SCHEMA_WILDCARD_STRING     "*"
+#define OPAL_SCHEMA_INVALID_CHAR        '$'
+#define OPAL_SCHEMA_INVALID_STRING      "$"
+
 /* convert jobid to nspace */
 #define OPAL_PMIX_CONVERT_JOBID(n, j) \
     opal_pmix_convert_jobid((n), (j))
 
 /* convert vpid to rank */
-#define OPAL_PMIX_CONVERT_VPID(r, v) \
-    do { \
-        if (OPAL_VPID_WILDCARD == (v)) { \
-            (r) = PMIX_RANK_WILDCARD; \
-        } else { \
-            (r) = (v); \
-        } \
+#define OPAL_PMIX_CONVERT_VPID(r, v)               \
+    do {                                           \
+        if (OPAL_VPID_WILDCARD == (v)) {           \
+            (r) = PMIX_RANK_WILDCARD;              \
+        } else if (OPAL_VPID_INVALID == (v)) {     \
+            (r) = PMIX_RANK_INVALID;               \
+        } else {                                   \
+            (r) = (v);                             \
+        }                                          \
     } while(0)
 
 /* convert opal_process_name_t to pmix_proc_t */
@@ -646,6 +662,33 @@ OPAL_DECLSPEC void opal_pmix_finalize_nspace_tracker(void);
         } \
     } while(0)
 
+#define OPAL_PMIX_CONVERT_PROCT_TO_STRING(s, p)                        \
+    do {                                                               \
+        if (PMIX_RANK_WILDCARD == (p)->rank) {                         \
+            (void)opal_asprintf((s), "%s.*", (p)->nspace);             \
+        } else if (PMIX_RANK_INVALID == (p)->rank) {                   \
+            (void)opal_asprintf((s), "%s.$", (p)->nspace);             \
+        } else {                                                       \
+            (void)opal_asprintf((s), "%s.%u", (p)->nspace, (p)->rank); \
+        }                                                              \
+    } while(0)
+
+#define OPAL_PMIX_CONVERT_STRING_TO_PROCT(p, s)     \
+    do {                                            \
+        char *_ptr;                                 \
+        _ptr = strrchr((s), '.');                   \
+        *_ptr = '\0';                               \
+        _ptr++;                                     \
+        PMIX_LOAD_NSPACE((p)->nspace, (s));         \
+        if ('*' == *_ptr) {                         \
+            (p)->rank = PMIX_RANK_WILDCARD;         \
+        } else if ('$' == *_ptr) {                  \
+            (p)->rank = PMIX_RANK_INVALID;          \
+        } else {                                    \
+            (p)->rank = strtoul(_ptr, NULL, 10);    \
+        }                                           \
+    } while(0)
+
 OPAL_DECLSPEC void opal_pmix_value_load(pmix_value_t *v,
                                         opal_value_t *kv);
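
Taken together, the two macros above give a round trip between pmix_proc_t and the string form that preserves the sentinel ranks. Two caveats worth noting: the TO_STRING output comes from opal_asprintf() and must be freed by the caller, and STRING_TO_PROCT parses destructively by writing a NUL over the last '.' in its input. A usage sketch (the "jobA" nspace is invented):

    pmix_proc_t in, out;
    char *s = NULL;

    PMIX_LOAD_PROCID(&in, "jobA", PMIX_RANK_WILDCARD);
    OPAL_PMIX_CONVERT_PROCT_TO_STRING(&s, &in);   /* s is now "jobA.*" */
    OPAL_PMIX_CONVERT_STRING_TO_PROCT(&out, s);   /* out = {"jobA", PMIX_RANK_WILDCARD} */
    free(s);                                      /* s was truncated to "jobA" in place */

The revised OPAL_PMIX_CONVERT_VPID in the previous hunk completes the picture: OPAL_VPID_INVALID now maps to PMIX_RANK_INVALID (the '$' case) instead of being copied through as an ordinary rank value.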

View file

@@ -6,34 +6,18 @@
  */
 
 #include <stdio.h>
-#include "opal/mca/hwloc/base/base.h"
 #include "mpi.h"
-#include "orte/util/proc_info.h"
 
 int main(int argc, char* argv[])
 {
-    int rank, size, rc;
-    hwloc_cpuset_t cpus;
-    char *bindings = NULL;
-    pid_t pid;
+    int rank, size;
 
     MPI_Init(&argc, &argv);
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
 
-    pid = getpid();
-    printf("[%lu] Rank %d: getting topology\n", (unsigned long)pid, rank);
-    fflush(stdout);
-    if (OPAL_SUCCESS == opal_hwloc_base_get_topology()) {
-        cpus = hwloc_bitmap_alloc();
-        rc = hwloc_get_cpubind(opal_hwloc_topology, cpus, HWLOC_CPUBIND_PROCESS);
-        hwloc_bitmap_list_asprintf(&bindings, cpus);
-    }
-    printf("Hello, World, I am %d of %d [%d local peers]: get_cpubind: %d bitmap %s\n",
-           rank, size, orte_process_info.num_local_peers, rc,
-           (NULL == bindings) ? "NULL" : bindings);
+    printf("Hello, World, I am %d of %d\n", rank, size);
 
     MPI_Finalize();
     return 0;

View file

@@ -51,6 +51,8 @@ main(int argc, char *argv[])
     MPI_Comm_size(MPI_COMM_WORLD, &size);
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
+    printf("Intercomm: Rank %d of %d\n", rank, size);
+
     /* Check to see if we *were* spawned -- because this is a test, we
        can only assume the existence of this one executable. Hence, we
        both mpirun it and spawn it. */

View file

@@ -4,7 +4,6 @@
 
 #include <unistd.h>
 #include <sys/param.h>
-#include "opal/runtime/opal.h"
 
 #include <mpi.h>
@@ -13,14 +12,14 @@ int main(int argc, char* argv[])
     int msg, rc;
     MPI_Comm parent, child;
     int rank, size;
-    const char *hostname;
+    char hostname[1024];
     pid_t pid;
     char *env_rank,*env_nspace;
 
     env_rank = getenv("PMIX_RANK");
     env_nspace = getenv("PMIX_NAMESPACE");
     pid = getpid();
-    hostname = opal_gethostname();
+    gethostname(hostname, 1024);
 
     printf("[%s:%s pid %ld] starting up on node %s!\n", env_nspace, env_rank, (long)pid, hostname);