Fix ompi-server operations
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
родитель
4bfb0fcddd
Коммит
8c2a06477c
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
||||
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
@ -72,6 +72,7 @@ int orte_rml_base_get_contact_info(orte_jobid_t job, opal_buffer_t *data)
|
||||
int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
||||
{
|
||||
orte_std_cntr_t cnt;
|
||||
orte_process_name_t peer;
|
||||
orte_vpid_t num_procs;
|
||||
char *rml_uri;
|
||||
int rc;
|
||||
@ -89,11 +90,18 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
||||
if (NULL != rml_uri) {
|
||||
/* set the contact info into the hash table */
|
||||
orte_rml.set_contact_info(rml_uri);
|
||||
/* if this was an update to my own job, then
|
||||
* track how many procs were in the message */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &peer, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
free(rml_uri);
|
||||
return rc;
|
||||
}
|
||||
if (peer.jobid == ORTE_PROC_MY_NAME->jobid) {
|
||||
++num_procs;
|
||||
}
|
||||
free(rml_uri);
|
||||
}
|
||||
|
||||
/* track how many procs were in the message */
|
||||
++num_procs;
|
||||
}
|
||||
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
|
@ -80,3 +80,10 @@ This is usually caused by a large job that encounters significant
|
||||
delays across the cluster when starting the application processes.
|
||||
Your job may terminate as a result of this problem. You may want to
|
||||
adjust the MCA parameter pmix_server_max_reqs and try again.
|
||||
#
|
||||
[noserver]
|
||||
A publish/lookup server was provided, but we were unable to connect
|
||||
to it - please check the connection info and ensure the server
|
||||
is alive:
|
||||
|
||||
Connection: %s
|
||||
|
@ -296,94 +296,6 @@ int pmix_server_init(void)
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&info);
|
||||
|
||||
/* if the universal server wasn't specified, then we use
|
||||
* our own HNP for that purpose */
|
||||
if (NULL == orte_pmix_server_globals.server_uri) {
|
||||
orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP;
|
||||
} else {
|
||||
char *server;
|
||||
opal_buffer_t buf;
|
||||
if (0 == strncmp(orte_pmix_server_globals.server_uri, "file", strlen("file")) ||
|
||||
0 == strncmp(orte_pmix_server_globals.server_uri, "FILE", strlen("FILE"))) {
|
||||
char input[1024], *filename;
|
||||
FILE *fp;
|
||||
|
||||
/* it is a file - get the filename */
|
||||
filename = strchr(orte_pmix_server_globals.server_uri, ':');
|
||||
if (NULL == filename) {
|
||||
/* filename is not correctly formatted */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
++filename; /* space past the : */
|
||||
|
||||
if (0 >= strlen(filename)) {
|
||||
/* they forgot to give us the name! */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* open the file and extract the uri */
|
||||
fp = fopen(filename, "r");
|
||||
if (NULL == fp) { /* can't find or read file! */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
if (NULL == fgets(input, 1024, fp)) {
|
||||
/* something malformed about file */
|
||||
fclose(fp);
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri,
|
||||
orte_basename);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
fclose(fp);
|
||||
input[strlen(input)-1] = '\0'; /* remove newline */
|
||||
server = strdup(input);
|
||||
} else {
|
||||
server = strdup(orte_pmix_server_globals.server_uri);
|
||||
}
|
||||
/* setup our route to the server */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
opal_dss.pack(&buf, &server, 1, OPAL_STRING);
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
/* parse the URI to get the server's name */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* check if we are to wait for the server to start - resolves
|
||||
* a race condition that can occur when the server is run
|
||||
* as a background job - e.g., in scripts
|
||||
*/
|
||||
if (orte_pmix_server_globals.wait_for_server) {
|
||||
/* ping the server */
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = orte_pmix_server_globals.timeout;
|
||||
timeout.tv_usec = 0;
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) {
|
||||
/* try it one more time */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) {
|
||||
/* okay give up */
|
||||
orte_show_help("help-orterun.txt", "orterun:server-not-found", true,
|
||||
orte_basename, server,
|
||||
(long)orte_pmix_server_globals.timeout,
|
||||
ORTE_ERROR_NAME(rc));
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -716,8 +628,9 @@ OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
|
||||
static void rqcon(pmix_server_req_t *p)
|
||||
{
|
||||
p->operation = NULL;
|
||||
p->target = *ORTE_NAME_INVALID;
|
||||
p->range = OPAL_PMIX_RANGE_SESSION;
|
||||
p->proxy = *ORTE_NAME_INVALID;
|
||||
p->target = *ORTE_NAME_INVALID;
|
||||
p->timeout = orte_pmix_server_globals.timeout;
|
||||
p->jdata = NULL;
|
||||
OBJ_CONSTRUCT(&p->msg, opal_buffer_t);
|
||||
|
@ -67,8 +67,9 @@
|
||||
int timeout;
|
||||
int room_num;
|
||||
int remote_room_num;
|
||||
opal_pmix_data_range_t range;
|
||||
orte_process_name_t proxy;
|
||||
opal_process_name_t target;
|
||||
orte_process_name_t target;
|
||||
orte_job_t *jdata;
|
||||
opal_buffer_t msg;
|
||||
opal_pmix_op_cbfunc_t opcbfunc;
|
||||
@ -255,6 +256,7 @@ typedef struct {
|
||||
bool wait_for_server;
|
||||
orte_process_name_t server;
|
||||
opal_list_t notifications;
|
||||
bool pubsub_init;
|
||||
} pmix_server_globals_t;
|
||||
|
||||
extern pmix_server_globals_t orte_pmix_server_globals;
|
||||
|
@ -42,14 +42,126 @@
|
||||
#include "orte/runtime/orte_data_server.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
|
||||
#include "pmix_server_internal.h"
|
||||
|
||||
static int init_server(void)
|
||||
{
|
||||
char *server;
|
||||
opal_buffer_t buf;
|
||||
char input[1024], *filename;
|
||||
FILE *fp;
|
||||
int rc;
|
||||
|
||||
/* only do this once */
|
||||
orte_pmix_server_globals.pubsub_init = true;
|
||||
|
||||
/* if the universal server wasn't specified, then we use
|
||||
* our own HNP for that purpose */
|
||||
if (NULL == orte_pmix_server_globals.server_uri) {
|
||||
orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP;
|
||||
} else {
|
||||
if (0 == strncmp(orte_pmix_server_globals.server_uri, "file", strlen("file")) ||
|
||||
0 == strncmp(orte_pmix_server_globals.server_uri, "FILE", strlen("FILE"))) {
|
||||
/* it is a file - get the filename */
|
||||
filename = strchr(orte_pmix_server_globals.server_uri, ':');
|
||||
if (NULL == filename) {
|
||||
/* filename is not correctly formatted */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
++filename; /* space past the : */
|
||||
|
||||
if (0 >= strlen(filename)) {
|
||||
/* they forgot to give us the name! */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
|
||||
/* open the file and extract the uri */
|
||||
fp = fopen(filename, "r");
|
||||
if (NULL == fp) { /* can't find or read file! */
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
if (NULL == fgets(input, 1024, fp)) {
|
||||
/* something malformed about file */
|
||||
fclose(fp);
|
||||
orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
|
||||
orte_basename, orte_pmix_server_globals.server_uri,
|
||||
orte_basename);
|
||||
return ORTE_ERR_BAD_PARAM;
|
||||
}
|
||||
fclose(fp);
|
||||
input[strlen(input)-1] = '\0'; /* remove newline */
|
||||
server = strdup(input);
|
||||
} else {
|
||||
server = strdup(orte_pmix_server_globals.server_uri);
|
||||
}
|
||||
/* setup our route to the server */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
opal_dss.pack(&buf, &server, 1, OPAL_STRING);
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
/* parse the URI to get the server's name */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
/* check if we are to wait for the server to start - resolves
|
||||
* a race condition that can occur when the server is run
|
||||
* as a background job - e.g., in scripts
|
||||
*/
|
||||
if (orte_pmix_server_globals.wait_for_server) {
|
||||
opal_output(0, "WAIT");
|
||||
/* ping the server */
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = orte_pmix_server_globals.timeout;
|
||||
timeout.tv_usec = 0;
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) {
|
||||
/* try it one more time */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) {
|
||||
/* okay give up */
|
||||
orte_show_help("help-orterun.txt", "orterun:server-not-found", true,
|
||||
orte_basename, server,
|
||||
(long)orte_pmix_server_globals.timeout,
|
||||
ORTE_ERROR_NAME(rc));
|
||||
ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opal_output(0, "SERVER READY");
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void execute(int sd, short args, void *cbdata)
|
||||
{
|
||||
pmix_server_req_t *req = (pmix_server_req_t*)cbdata;
|
||||
int rc;
|
||||
opal_buffer_t *xfer;
|
||||
orte_process_name_t *target;
|
||||
|
||||
if (!orte_pmix_server_globals.pubsub_init) {
|
||||
/* we need to initialize our connection to the server */
|
||||
if (ORTE_SUCCESS != (rc = init_server())) {
|
||||
orte_show_help("help-orted.txt", "noserver", true,
|
||||
(NULL == orte_pmix_server_globals.server_uri) ?
|
||||
"NULL" : orte_pmix_server_globals.server_uri);
|
||||
goto callback;
|
||||
}
|
||||
}
|
||||
|
||||
/* add this request to our tracker hotel */
|
||||
if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) {
|
||||
@ -67,9 +179,16 @@ static void execute(int sd, short args, void *cbdata)
|
||||
}
|
||||
opal_dss.copy_payload(xfer, &req->msg);
|
||||
|
||||
/* if the range is SESSION, then set the target to the global server */
|
||||
if (OPAL_PMIX_RANGE_SESSION == req->range) {
|
||||
target = &orte_pmix_server_globals.server;
|
||||
} else {
|
||||
target = ORTE_PROC_MY_HNP;
|
||||
}
|
||||
|
||||
/* send the request to the target */
|
||||
rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
|
||||
&req->target, xfer,
|
||||
target, xfer,
|
||||
ORTE_RML_TAG_DATA_SERVER,
|
||||
orte_rml_send_callback, NULL);
|
||||
if (ORTE_SUCCESS == rc) {
|
||||
@ -95,7 +214,6 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
|
||||
int rc;
|
||||
uint8_t cmd = ORTE_PMIX_PUBLISH_CMD;
|
||||
opal_value_t *iptr;
|
||||
opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION;
|
||||
opal_pmix_persistence_t persist = OPAL_PMIX_PERSIST_APP;
|
||||
bool rset, pset;
|
||||
|
||||
@ -128,7 +246,7 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
|
||||
pset = false;
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
|
||||
range = (opal_pmix_data_range_t)iptr->data.uint;
|
||||
req->range = (opal_pmix_data_range_t)iptr->data.uint;
|
||||
if (pset) {
|
||||
break;
|
||||
}
|
||||
@ -143,19 +261,12 @@ int pmix_server_publish_fn(opal_process_name_t *proc,
|
||||
}
|
||||
|
||||
/* pack the range */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_PMIX_DATA_RANGE))) {
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(req);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the range is SESSION, then set the target to the global server */
|
||||
if (OPAL_PMIX_RANGE_SESSION == range) {
|
||||
req->target = orte_pmix_server_globals.server;
|
||||
} else {
|
||||
req->target = *ORTE_PROC_MY_HNP;
|
||||
}
|
||||
|
||||
/* pack the persistence */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &persist, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
@ -205,7 +316,6 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
|
||||
uint8_t cmd = ORTE_PMIX_LOOKUP_CMD;
|
||||
int32_t nkeys, i;
|
||||
opal_value_t *iptr;
|
||||
opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION;
|
||||
|
||||
/* the list of info objects are directives for us - they include
|
||||
* things like timeout constraints, so there is no reason to
|
||||
@ -234,25 +344,18 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys,
|
||||
/* no help for it - need to search for range */
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
|
||||
range = (opal_pmix_data_range_t)iptr->data.uint;
|
||||
req->range = (opal_pmix_data_range_t)iptr->data.uint;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* pack the range */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_PMIX_DATA_RANGE))) {
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(req);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the range is SESSION, then set the target to the global server */
|
||||
if (OPAL_PMIX_RANGE_SESSION == range) {
|
||||
req->target = orte_pmix_server_globals.server;
|
||||
} else {
|
||||
req->target = *ORTE_PROC_MY_HNP;
|
||||
}
|
||||
|
||||
/* pack the number of keys */
|
||||
nkeys = opal_argv_count(keys);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
|
||||
@ -309,7 +412,6 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys,
|
||||
uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD;
|
||||
uint32_t nkeys, n;
|
||||
opal_value_t *iptr;
|
||||
opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION;
|
||||
|
||||
/* create the caddy */
|
||||
req = OBJ_NEW(pmix_server_req_t);
|
||||
@ -334,25 +436,18 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys,
|
||||
/* no help for it - need to search for range */
|
||||
OPAL_LIST_FOREACH(iptr, info, opal_value_t) {
|
||||
if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) {
|
||||
range = (opal_pmix_data_range_t)iptr->data.integer;
|
||||
req->range = (opal_pmix_data_range_t)iptr->data.integer;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* pack the range */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_INT))) {
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(req);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* if the range is SESSION, then set the target to the global server */
|
||||
if (OPAL_PMIX_RANGE_SESSION == range) {
|
||||
req->target = orte_pmix_server_globals.server;
|
||||
} else {
|
||||
req->target = *ORTE_PROC_MY_HNP;
|
||||
}
|
||||
|
||||
/* pack the number of keys */
|
||||
nkeys = opal_argv_count(keys);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user