diff --git a/orte/mca/rml/base/rml_base_contact.c b/orte/mca/rml/base/rml_base_contact.c index 6bc41fe228..6ee2f2c2c8 100644 --- a/orte/mca/rml/base/rml_base_contact.c +++ b/orte/mca/rml/base/rml_base_contact.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -72,6 +72,7 @@ int orte_rml_base_get_contact_info(orte_jobid_t job, opal_buffer_t *data) int orte_rml_base_update_contact_info(opal_buffer_t* data) { orte_std_cntr_t cnt; + orte_process_name_t peer; orte_vpid_t num_procs; char *rml_uri; int rc; @@ -89,11 +90,18 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data) if (NULL != rml_uri) { /* set the contact info into the hash table */ orte_rml.set_contact_info(rml_uri); + /* if this was an update to my own job, then + * track how many procs were in the message */ + if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &peer, NULL))) { + ORTE_ERROR_LOG(rc); + free(rml_uri); + return rc; + } + if (peer.jobid == ORTE_PROC_MY_NAME->jobid) { + ++num_procs; + } free(rml_uri); } - - /* track how many procs were in the message */ - ++num_procs; } if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); diff --git a/orte/orted/help-orted.txt b/orte/orted/help-orted.txt index c89d4e1015..cec46c2d15 100644 --- a/orte/orted/help-orted.txt +++ b/orte/orted/help-orted.txt @@ -80,3 +80,10 @@ This is usually caused by a large job that encounters significant delays across the cluster when starting the application processes. Your job may terminate as a result of this problem. You may want to adjust the MCA parameter pmix_server_max_reqs and try again. +# +[noserver] +A publish/lookup server was provided, but we were unable to connect +to it - please check the connection info and ensure the server +is alive: + + Connection: %s diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index 63b4dbfdd3..0ed02ce6b7 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -296,94 +296,6 @@ int pmix_server_init(void) } OPAL_LIST_DESTRUCT(&info); - /* if the universal server wasn't specified, then we use - * our own HNP for that purpose */ - if (NULL == orte_pmix_server_globals.server_uri) { - orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP; - } else { - char *server; - opal_buffer_t buf; - if (0 == strncmp(orte_pmix_server_globals.server_uri, "file", strlen("file")) || - 0 == strncmp(orte_pmix_server_globals.server_uri, "FILE", strlen("FILE"))) { - char input[1024], *filename; - FILE *fp; - - /* it is a file - get the filename */ - filename = strchr(orte_pmix_server_globals.server_uri, ':'); - if (NULL == filename) { - /* filename is not correctly formatted */ - orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true, - orte_basename, orte_pmix_server_globals.server_uri); - return ORTE_ERR_BAD_PARAM; - } - ++filename; /* space past the : */ - - if (0 >= strlen(filename)) { - /* they forgot to give us the name! */ - orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true, - orte_basename, orte_pmix_server_globals.server_uri); - return ORTE_ERR_BAD_PARAM; - } - - /* open the file and extract the uri */ - fp = fopen(filename, "r"); - if (NULL == fp) { /* can't find or read file! */ - orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true, - orte_basename, orte_pmix_server_globals.server_uri); - return ORTE_ERR_BAD_PARAM; - } - if (NULL == fgets(input, 1024, fp)) { - /* something malformed about file */ - fclose(fp); - orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true, - orte_basename, orte_pmix_server_globals.server_uri, - orte_basename); - return ORTE_ERR_BAD_PARAM; - } - fclose(fp); - input[strlen(input)-1] = '\0'; /* remove newline */ - server = strdup(input); - } else { - server = strdup(orte_pmix_server_globals.server_uri); - } - /* setup our route to the server */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - opal_dss.pack(&buf, &server, 1, OPAL_STRING); - if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) { - ORTE_ERROR_LOG(rc); - ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); - return rc; - } - OBJ_DESTRUCT(&buf); - /* parse the URI to get the server's name */ - if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* check if we are to wait for the server to start - resolves - * a race condition that can occur when the server is run - * as a background job - e.g., in scripts - */ - if (orte_pmix_server_globals.wait_for_server) { - /* ping the server */ - struct timeval timeout; - timeout.tv_sec = orte_pmix_server_globals.timeout; - timeout.tv_usec = 0; - if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) { - /* try it one more time */ - if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) { - /* okay give up */ - orte_show_help("help-orterun.txt", "orterun:server-not-found", true, - orte_basename, server, - (long)orte_pmix_server_globals.timeout, - ORTE_ERROR_NAME(rc)); - ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); - return rc; - } - } - } - } - return rc; } @@ -716,8 +628,9 @@ OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t, static void rqcon(pmix_server_req_t *p) { p->operation = NULL; - p->target = *ORTE_NAME_INVALID; + p->range = OPAL_PMIX_RANGE_SESSION; p->proxy = *ORTE_NAME_INVALID; + p->target = *ORTE_NAME_INVALID; p->timeout = orte_pmix_server_globals.timeout; p->jdata = NULL; OBJ_CONSTRUCT(&p->msg, opal_buffer_t); diff --git a/orte/orted/pmix/pmix_server_internal.h b/orte/orted/pmix/pmix_server_internal.h index 5712529b5c..5246027188 100644 --- a/orte/orted/pmix/pmix_server_internal.h +++ b/orte/orted/pmix/pmix_server_internal.h @@ -67,8 +67,9 @@ int timeout; int room_num; int remote_room_num; + opal_pmix_data_range_t range; orte_process_name_t proxy; - opal_process_name_t target; + orte_process_name_t target; orte_job_t *jdata; opal_buffer_t msg; opal_pmix_op_cbfunc_t opcbfunc; @@ -255,6 +256,7 @@ typedef struct { bool wait_for_server; orte_process_name_t server; opal_list_t notifications; + bool pubsub_init; } pmix_server_globals_t; extern pmix_server_globals_t orte_pmix_server_globals; diff --git a/orte/orted/pmix/pmix_server_pub.c b/orte/orted/pmix/pmix_server_pub.c index 6b3e5bde78..0f009d1a9f 100644 --- a/orte/orted/pmix/pmix_server_pub.c +++ b/orte/orted/pmix/pmix_server_pub.c @@ -42,14 +42,126 @@ #include "orte/runtime/orte_data_server.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/rml/rml.h" +#include "orte/mca/rml/base/rml_contact.h" #include "pmix_server_internal.h" +static int init_server(void) +{ + char *server; + opal_buffer_t buf; + char input[1024], *filename; + FILE *fp; + int rc; + + /* only do this once */ + orte_pmix_server_globals.pubsub_init = true; + + /* if the universal server wasn't specified, then we use + * our own HNP for that purpose */ + if (NULL == orte_pmix_server_globals.server_uri) { + orte_pmix_server_globals.server = *ORTE_PROC_MY_HNP; + } else { + if (0 == strncmp(orte_pmix_server_globals.server_uri, "file", strlen("file")) || + 0 == strncmp(orte_pmix_server_globals.server_uri, "FILE", strlen("FILE"))) { + /* it is a file - get the filename */ + filename = strchr(orte_pmix_server_globals.server_uri, ':'); + if (NULL == filename) { + /* filename is not correctly formatted */ + orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true, + orte_basename, orte_pmix_server_globals.server_uri); + return ORTE_ERR_BAD_PARAM; + } + ++filename; /* space past the : */ + + if (0 >= strlen(filename)) { + /* they forgot to give us the name! */ + orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true, + orte_basename, orte_pmix_server_globals.server_uri); + return ORTE_ERR_BAD_PARAM; + } + + /* open the file and extract the uri */ + fp = fopen(filename, "r"); + if (NULL == fp) { /* can't find or read file! */ + orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true, + orte_basename, orte_pmix_server_globals.server_uri); + return ORTE_ERR_BAD_PARAM; + } + if (NULL == fgets(input, 1024, fp)) { + /* something malformed about file */ + fclose(fp); + orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true, + orte_basename, orte_pmix_server_globals.server_uri, + orte_basename); + return ORTE_ERR_BAD_PARAM; + } + fclose(fp); + input[strlen(input)-1] = '\0'; /* remove newline */ + server = strdup(input); + } else { + server = strdup(orte_pmix_server_globals.server_uri); + } + /* setup our route to the server */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &server, 1, OPAL_STRING); + if (ORTE_SUCCESS != (rc = orte_rml_base_update_contact_info(&buf))) { + ORTE_ERROR_LOG(rc); + ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); + return rc; + } + OBJ_DESTRUCT(&buf); + /* parse the URI to get the server's name */ + if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(server, &orte_pmix_server_globals.server, NULL))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* check if we are to wait for the server to start - resolves + * a race condition that can occur when the server is run + * as a background job - e.g., in scripts + */ + if (orte_pmix_server_globals.wait_for_server) { + opal_output(0, "WAIT"); + /* ping the server */ + struct timeval timeout; + timeout.tv_sec = orte_pmix_server_globals.timeout; + timeout.tv_usec = 0; + if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) { + /* try it one more time */ + if (ORTE_SUCCESS != (rc = orte_rml.ping(orte_mgmt_conduit, server, &timeout))) { + /* okay give up */ + orte_show_help("help-orterun.txt", "orterun:server-not-found", true, + orte_basename, server, + (long)orte_pmix_server_globals.timeout, + ORTE_ERROR_NAME(rc)); + ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); + return rc; + } + } + } + } + + opal_output(0, "SERVER READY"); + + return ORTE_SUCCESS; +} + static void execute(int sd, short args, void *cbdata) { pmix_server_req_t *req = (pmix_server_req_t*)cbdata; int rc; opal_buffer_t *xfer; + orte_process_name_t *target; + + if (!orte_pmix_server_globals.pubsub_init) { + /* we need to initialize our connection to the server */ + if (ORTE_SUCCESS != (rc = init_server())) { + orte_show_help("help-orted.txt", "noserver", true, + (NULL == orte_pmix_server_globals.server_uri) ? + "NULL" : orte_pmix_server_globals.server_uri); + goto callback; + } + } /* add this request to our tracker hotel */ if (OPAL_SUCCESS != (rc = opal_hotel_checkin(&orte_pmix_server_globals.reqs, req, &req->room_num))) { @@ -67,9 +179,16 @@ static void execute(int sd, short args, void *cbdata) } opal_dss.copy_payload(xfer, &req->msg); + /* if the range is SESSION, then set the target to the global server */ + if (OPAL_PMIX_RANGE_SESSION == req->range) { + target = &orte_pmix_server_globals.server; + } else { + target = ORTE_PROC_MY_HNP; + } + /* send the request to the target */ rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, - &req->target, xfer, + target, xfer, ORTE_RML_TAG_DATA_SERVER, orte_rml_send_callback, NULL); if (ORTE_SUCCESS == rc) { @@ -95,7 +214,6 @@ int pmix_server_publish_fn(opal_process_name_t *proc, int rc; uint8_t cmd = ORTE_PMIX_PUBLISH_CMD; opal_value_t *iptr; - opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION; opal_pmix_persistence_t persist = OPAL_PMIX_PERSIST_APP; bool rset, pset; @@ -128,7 +246,7 @@ int pmix_server_publish_fn(opal_process_name_t *proc, pset = false; OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) { - range = (opal_pmix_data_range_t)iptr->data.uint; + req->range = (opal_pmix_data_range_t)iptr->data.uint; if (pset) { break; } @@ -143,19 +261,12 @@ int pmix_server_publish_fn(opal_process_name_t *proc, } /* pack the range */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_PMIX_DATA_RANGE))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); return rc; } - /* if the range is SESSION, then set the target to the global server */ - if (OPAL_PMIX_RANGE_SESSION == range) { - req->target = orte_pmix_server_globals.server; - } else { - req->target = *ORTE_PROC_MY_HNP; - } - /* pack the persistence */ if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &persist, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); @@ -205,7 +316,6 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys, uint8_t cmd = ORTE_PMIX_LOOKUP_CMD; int32_t nkeys, i; opal_value_t *iptr; - opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION; /* the list of info objects are directives for us - they include * things like timeout constraints, so there is no reason to @@ -234,25 +344,18 @@ int pmix_server_lookup_fn(opal_process_name_t *proc, char **keys, /* no help for it - need to search for range */ OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) { - range = (opal_pmix_data_range_t)iptr->data.uint; + req->range = (opal_pmix_data_range_t)iptr->data.uint; break; } } /* pack the range */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_PMIX_DATA_RANGE))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_PMIX_DATA_RANGE))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); return rc; } - /* if the range is SESSION, then set the target to the global server */ - if (OPAL_PMIX_RANGE_SESSION == range) { - req->target = orte_pmix_server_globals.server; - } else { - req->target = *ORTE_PROC_MY_HNP; - } - /* pack the number of keys */ nkeys = opal_argv_count(keys); if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) { @@ -309,7 +412,6 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys, uint8_t cmd = ORTE_PMIX_UNPUBLISH_CMD; uint32_t nkeys, n; opal_value_t *iptr; - opal_pmix_data_range_t range = OPAL_PMIX_RANGE_SESSION; /* create the caddy */ req = OBJ_NEW(pmix_server_req_t); @@ -334,25 +436,18 @@ int pmix_server_unpublish_fn(opal_process_name_t *proc, char **keys, /* no help for it - need to search for range */ OPAL_LIST_FOREACH(iptr, info, opal_value_t) { if (0 == strcmp(iptr->key, OPAL_PMIX_RANGE)) { - range = (opal_pmix_data_range_t)iptr->data.integer; + req->range = (opal_pmix_data_range_t)iptr->data.integer; break; } } /* pack the range */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &range, 1, OPAL_INT))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &req->range, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(req); return rc; } - /* if the range is SESSION, then set the target to the global server */ - if (OPAL_PMIX_RANGE_SESSION == range) { - req->target = orte_pmix_server_globals.server; - } else { - req->target = *ORTE_PROC_MY_HNP; - } - /* pack the number of keys */ nkeys = opal_argv_count(keys); if (OPAL_SUCCESS != (rc = opal_dss.pack(&req->msg, &nkeys, 1, OPAL_UINT32))) {