From 5cdbc0013695d89f3bf5d52e829bc2019373583d Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Sat, 30 Aug 2014 19:33:46 +0000 Subject: [PATCH] Re-enable the usock oob component. Ensure the TCP component promotes messages for other procs to the OOB base so that other components have a chance to send the relay. Seems to be passing MTT, so let's see how it works for others. This commit was SVN r32650. --- opal/mca/pmix/native/usock_sendrecv.c | 23 +++- orte/mca/ess/base/ess_base_std_app.c | 104 +++++++-------- orte/mca/ess/base/ess_base_std_orted.c | 156 +++++++++++----------- orte/mca/ess/hnp/ess_hnp_module.c | 73 +++++----- orte/mca/oob/tcp/oob_tcp_sendrecv.c | 70 +++------- orte/mca/oob/usock/.opal_ignore | 0 orte/mca/oob/usock/oob_usock_component.c | 34 +++-- orte/mca/oob/usock/oob_usock_connection.c | 52 +++----- orte/mca/oob/usock/oob_usock_listener.c | 5 + orte/orted/pmix/pmix_server_sendrecv.c | 5 +- orte/test/system/oob_stress.c | 18 ++- 11 files changed, 274 insertions(+), 266 deletions(-) delete mode 100644 orte/mca/oob/usock/.opal_ignore diff --git a/opal/mca/pmix/native/usock_sendrecv.c b/opal/mca/pmix/native/usock_sendrecv.c index 89d5de50dd..d0bd1e606c 100644 --- a/opal/mca/pmix/native/usock_sendrecv.c +++ b/opal/mca/pmix/native/usock_sendrecv.c @@ -132,12 +132,19 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata) break; case PMIX_USOCK_CONNECTED: opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s usock:send_handler SENDING TO SERVER", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s usock:send_handler SENDING TO SERVER with %s msg", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), + (NULL == msg) ? "NULL" : "NON-NULL"); if (NULL != msg) { if (!msg->hdr_sent) { + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler SENDING HEADER", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); if (OPAL_SUCCESS == (rc = send_bytes(msg))) { /* header is completely sent */ + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler HEADER SENT", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); msg->hdr_sent = true; /* setup to send the data */ if (NULL == msg->data) { @@ -154,6 +161,9 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata) } else if (OPAL_ERR_RESOURCE_BUSY == rc || OPAL_ERR_WOULD_BLOCK == rc) { /* exit this event and let the event lib progress */ + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler RES BUSY OR WOULD BLOCK", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); return; } else { // report the error @@ -169,14 +179,23 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata) } if (msg->hdr_sent) { + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler SENDING BODY OF MSG", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); if (OPAL_SUCCESS == (rc = send_bytes(msg))) { // message is complete + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler BODY SENT", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); OBJ_RELEASE(msg); mca_pmix_native_component.send_msg = NULL; goto next; } else if (OPAL_ERR_RESOURCE_BUSY == rc || OPAL_ERR_WOULD_BLOCK == rc) { /* exit this event and let the event lib progress */ + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s usock:send_handler RES BUSY OR WOULD BLOCK", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); return; } else { // report the error diff --git a/orte/mca/ess/base/ess_base_std_app.c b/orte/mca/ess/base/ess_base_std_app.c index adc9278da7..c3d80c1200 100644 --- a/orte/mca/ess/base/ess_base_std_app.c +++ b/orte/mca/ess/base/ess_base_std_app.c @@ -133,6 +133,58 @@ int orte_ess_base_app_setup(bool db_restrict_local) goto error; } + /* setup my session directory */ + if (orte_create_session_dirs) { + OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output, + "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, + orte_process_info.nodename)); + + if (ORTE_SUCCESS != (ret = orte_session_dir(true, + orte_process_info.tmpdir_base, + orte_process_info.nodename, NULL, + ORTE_PROC_MY_NAME))) { + ORTE_ERROR_LOG(ret); + error = "orte_session_dir"; + goto error; + } + + /* Once the session directory location has been established, set + the opal_output env file location to be in the + proc-specific session directory. */ + opal_output_set_output_file_info(orte_process_info.proc_session_dir, + "output-", NULL, NULL); + + /* store the session directory location in the database */ + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_DSTORE_JOB_SDIR); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.job_session_dir); + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, + (opal_identifier_t*)ORTE_PROC_MY_NAME, + &kv))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&kv); + error = "opal dstore store"; + goto error; + } + OBJ_DESTRUCT(&kv); + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(OPAL_DSTORE_MY_SDIR); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.proc_session_dir); + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, + (opal_identifier_t*)ORTE_PROC_MY_NAME, + &kv))) { + ORTE_ERROR_LOG(ret); + OBJ_DESTRUCT(&kv); + error = "opal dstore store"; + goto error; + } + OBJ_DESTRUCT(&kv); + } + /* Setup the communication infrastructure */ /* * OOB Layer @@ -220,58 +272,6 @@ int orte_ess_base_app_setup(bool db_restrict_local) goto error; } - /* setup my session directory */ - if (orte_create_session_dirs) { - OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output, - "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, - orte_process_info.nodename)); - - if (ORTE_SUCCESS != (ret = orte_session_dir(true, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(ret); - error = "orte_session_dir"; - goto error; - } - - /* Once the session directory location has been established, set - the opal_output env file location to be in the - proc-specific session directory. */ - opal_output_set_output_file_info(orte_process_info.proc_session_dir, - "output-", NULL, NULL); - - /* store the session directory location in the database */ - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(OPAL_DSTORE_JOB_SDIR); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.job_session_dir); - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, - (opal_identifier_t*)ORTE_PROC_MY_NAME, - &kv))) { - ORTE_ERROR_LOG(ret); - OBJ_DESTRUCT(&kv); - error = "opal dstore store"; - goto error; - } - OBJ_DESTRUCT(&kv); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(OPAL_DSTORE_MY_SDIR); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.proc_session_dir); - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, - (opal_identifier_t*)ORTE_PROC_MY_NAME, - &kv))) { - ORTE_ERROR_LOG(ret); - OBJ_DESTRUCT(&kv); - error = "opal dstore store"; - goto error; - } - OBJ_DESTRUCT(&kv); - } - /* setup the routed info */ if (ORTE_SUCCESS != (ret = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, NULL))) { ORTE_ERROR_LOG(ret); diff --git a/orte/mca/ess/base/ess_base_std_orted.c b/orte/mca/ess/base/ess_base_std_orted.c index 161a446dfa..0f14f1da64 100644 --- a/orte/mca/ess/base/ess_base_std_orted.c +++ b/orte/mca/ess/base/ess_base_std_orted.c @@ -238,6 +238,84 @@ int orte_ess_base_orted_setup(char **hosts) } } + /* setup my session directory here as the OOB may need it */ + if (orte_create_session_dirs) { + OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output, + "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, + orte_process_info.nodename)); + + /* take a pass thru the session directory code to fillin the + * tmpdir names - don't create anything yet + */ + if (ORTE_SUCCESS != (ret = orte_session_dir(false, + orte_process_info.tmpdir_base, + orte_process_info.nodename, NULL, + ORTE_PROC_MY_NAME))) { + ORTE_ERROR_LOG(ret); + error = "orte_session_dir define"; + goto error; + } + /* clear the session directory just in case there are + * stale directories laying around + */ + orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); + + /* now actually create the directory tree */ + if (ORTE_SUCCESS != (ret = orte_session_dir(true, + orte_process_info.tmpdir_base, + orte_process_info.nodename, NULL, + ORTE_PROC_MY_NAME))) { + ORTE_ERROR_LOG(ret); + error = "orte_session_dir"; + goto error; + } + + /* set the opal_output env file location to be in the + * proc-specific session directory. */ + opal_output_set_output_file_info(orte_process_info.proc_session_dir, + "output-", NULL, NULL); + + /* setup stdout/stderr */ + if (orte_debug_daemons_file_flag) { + /* if we are debugging to a file, then send stdout/stderr to + * the orted log file + */ + + /* get my jobid */ + if (ORTE_SUCCESS != (ret = orte_util_convert_jobid_to_string(&jobidstring, + ORTE_PROC_MY_NAME->jobid))) { + ORTE_ERROR_LOG(ret); + error = "convert_jobid"; + goto error; + } + + /* define a log file name in the session directory */ + snprintf(log_file, PATH_MAX, "output-orted-%s-%s.log", + jobidstring, orte_process_info.nodename); + log_path = opal_os_path(false, + orte_process_info.tmpdir_base, + orte_process_info.top_session_dir, + log_file, + NULL); + + fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640); + if (fd < 0) { + /* couldn't open the file for some reason, so + * just connect everything to /dev/null + */ + fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666); + } else { + dup2(fd, STDOUT_FILENO); + dup2(fd, STDERR_FILENO); + if(fd != STDOUT_FILENO && fd != STDERR_FILENO) { + close(fd); + } + } + } + } + /* Setup the communication infrastructure */ if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_oob_base_framework, 0))) { ORTE_ERROR_LOG(ret); @@ -368,84 +446,6 @@ int orte_ess_base_orted_setup(char **hosts) } } - /* setup my session directory */ - if (orte_create_session_dirs) { - OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output, - "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, - orte_process_info.nodename)); - - /* take a pass thru the session directory code to fillin the - * tmpdir names - don't create anything yet - */ - if (ORTE_SUCCESS != (ret = orte_session_dir(false, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(ret); - error = "orte_session_dir define"; - goto error; - } - /* clear the session directory just in case there are - * stale directories laying around - */ - orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); - - /* now actually create the directory tree */ - if (ORTE_SUCCESS != (ret = orte_session_dir(true, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(ret); - error = "orte_session_dir"; - goto error; - } - /* Once the session directory location has been established, set - the opal_output env file location to be in the - proc-specific session directory. */ - opal_output_set_output_file_info(orte_process_info.proc_session_dir, - "output-", NULL, NULL); - - /* setup stdout/stderr */ - if (orte_debug_daemons_file_flag) { - /* if we are debugging to a file, then send stdout/stderr to - * the orted log file - */ - - /* get my jobid */ - if (ORTE_SUCCESS != (ret = orte_util_convert_jobid_to_string(&jobidstring, - ORTE_PROC_MY_NAME->jobid))) { - ORTE_ERROR_LOG(ret); - error = "convert_jobid"; - goto error; - } - - /* define a log file name in the session directory */ - snprintf(log_file, PATH_MAX, "output-orted-%s-%s.log", - jobidstring, orte_process_info.nodename); - log_path = opal_os_path(false, - orte_process_info.tmpdir_base, - orte_process_info.top_session_dir, - log_file, - NULL); - - fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640); - if (fd < 0) { - /* couldn't open the file for some reason, so - * just connect everything to /dev/null - */ - fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666); - } else { - dup2(fd, STDOUT_FILENO); - dup2(fd, STDERR_FILENO); - if(fd != STDOUT_FILENO && fd != STDERR_FILENO) { - close(fd); - } - } - } - } - /* setup the global job and node arrays */ orte_job_data = OBJ_NEW(opal_pointer_array_t); if (ORTE_SUCCESS != (ret = opal_pointer_array_init(orte_job_data, diff --git a/orte/mca/ess/hnp/ess_hnp_module.c b/orte/mca/ess/hnp/ess_hnp_module.c index 8e3b30795c..c5a795416d 100644 --- a/orte/mca/ess/hnp/ess_hnp_module.c +++ b/orte/mca/ess/hnp/ess_hnp_module.c @@ -314,6 +314,41 @@ static int rte_init(void) orte_process_info.super.proc_arch = opal_local_arch; opal_proc_local_set(&orte_process_info.super); + /* setup my session directory here as the OOB may need it */ + if (orte_create_session_dirs) { + OPAL_OUTPUT_VERBOSE((2, orte_debug_output, + "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, + orte_process_info.nodename)); + + /* take a pass thru the session directory code to fillin the + * tmpdir names - don't create anything yet + */ + if (ORTE_SUCCESS != (ret = orte_session_dir(false, + orte_process_info.tmpdir_base, + orte_process_info.nodename, NULL, + ORTE_PROC_MY_NAME))) { + ORTE_ERROR_LOG(ret); + error = "orte_session_dir define"; + goto error; + } + /* clear the session directory just in case there are + * stale directories laying around + */ + orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); + + /* now actually create the directory tree */ + if (ORTE_SUCCESS != (ret = orte_session_dir(true, + orte_process_info.tmpdir_base, + orte_process_info.nodename, NULL, + ORTE_PROC_MY_NAME))) { + ORTE_ERROR_LOG(ret); + error = "orte_session_dir"; + goto error; + } + } + /* Setup the communication infrastructure */ /* @@ -599,43 +634,9 @@ static int rte_init(void) orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SHOW_HELP, ORTE_RML_PERSISTENT, orte_show_help_recv, NULL); - /* setup my session directory */ if (orte_create_session_dirs) { - OPAL_OUTPUT_VERBOSE((2, orte_debug_output, - "%s setting up session dir with\n\ttmpdir: %s\n\thost %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base, - orte_process_info.nodename)); - - /* take a pass thru the session directory code to fillin the - * tmpdir names - don't create anything yet - */ - if (ORTE_SUCCESS != (ret = orte_session_dir(false, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(ret); - error = "orte_session_dir define"; - goto error; - } - /* clear the session directory just in case there are - * stale directories laying around - */ - orte_session_dir_cleanup(ORTE_JOBID_WILDCARD); - - /* now actually create the directory tree */ - if (ORTE_SUCCESS != (ret = orte_session_dir(true, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(ret); - error = "orte_session_dir"; - goto error; - } - - /* Once the session directory location has been established, set - the opal_output hnp file location to be in the - proc-specific session directory. */ + /* set the opal_output hnp file location to be in the + * proc-specific session directory. */ opal_output_set_output_file_info(orte_process_info.proc_session_dir, "output-", NULL, NULL); diff --git a/orte/mca/oob/tcp/oob_tcp_sendrecv.c b/orte/mca/oob/tcp/oob_tcp_sendrecv.c index e6106b6043..9b932bb053 100644 --- a/orte/mca/oob/tcp/oob_tcp_sendrecv.c +++ b/orte/mca/oob/tcp/oob_tcp_sendrecv.c @@ -397,9 +397,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) { mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata; int rc; - orte_process_name_t hop; - mca_oob_tcp_peer_t *relay; - uint64_t ui64; + orte_rml_send_t *snd; if (orte_abnormal_term_ordered) { return; @@ -534,52 +532,26 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata) peer->recv_msg->hdr.nbytes); OBJ_RELEASE(peer->recv_msg); } else { - /* no - find the next hop in the route */ - hop = orte_routed.get_route(&peer->recv_msg->hdr.dst); - if (hop.jobid == ORTE_JOBID_INVALID || - hop.vpid == ORTE_VPID_INVALID) { - /* no hop known - post the error to the component - * and let the OOB see if there is another way - * to get there from here - */ - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s NO ROUTE TO %s FROM HERE", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name)); - /* let the component know about the problem */ - ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_no_route); - /* cleanup */ - OBJ_RELEASE(peer->recv_msg); - return; - } else { - /* does we know how to reach the next hop? */ - memcpy(&ui64, (char*)&hop, sizeof(uint64_t)); - if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_module.peers, ui64, (void**)&relay)) { - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s ADDRESS OF NEXT HOP %s TO %s IS UNKNOWN", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&hop), - ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst)); - /* let the component know about the problem */ - ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_hop_unknown); - /* cleanup */ - OBJ_RELEASE(peer->recv_msg); - return; - } - opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s ROUTING TO %s FROM HERE", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&relay->name)); - /* if this came from a different job family, then ensure - * we know how to return - */ - if (ORTE_JOB_FAMILY(peer->recv_msg->hdr.origin.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) { - orte_routed.update_route(&(peer->recv_msg->hdr.origin), &peer->name); - } - /* post the message for retransmission */ - MCA_OOB_TCP_QUEUE_RELAY(peer->recv_msg, relay); - OBJ_RELEASE(peer->recv_msg); - } + /* promote this to the OOB as some other transport might + * be the next best hop */ + opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s TCP PROMOTING ROUTED MESSAGE FOR %s TO OOB", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst)); + snd = OBJ_NEW(orte_rml_send_t); + snd->dst = peer->recv_msg->hdr.dst; + snd->origin = peer->recv_msg->hdr.origin; + snd->tag = peer->recv_msg->hdr.tag; + snd->data = peer->recv_msg->data; + snd->count = peer->recv_msg->hdr.nbytes; + snd->cbfunc.iov = NULL; + snd->cbdata = NULL; + /* activate the OOB send state */ + ORTE_OOB_SEND(snd); + /* protect the data */ + peer->recv_msg->data = NULL; + /* cleanup */ + OBJ_RELEASE(peer->recv_msg); } peer->recv_msg = NULL; return; diff --git a/orte/mca/oob/usock/.opal_ignore b/orte/mca/oob/usock/.opal_ignore deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/orte/mca/oob/usock/oob_usock_component.c b/orte/mca/oob/usock/oob_usock_component.c index 97b1da41a9..d3dfec5778 100644 --- a/orte/mca/oob/usock/oob_usock_component.c +++ b/orte/mca/oob/usock/oob_usock_component.c @@ -192,16 +192,6 @@ static int component_startup(void) "%s USOCK STARTUP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* if the session directory has not already been setup, do so */ - if (NULL == orte_process_info.top_session_dir) { - if (ORTE_SUCCESS != (rc = orte_session_dir(true, - orte_process_info.tmpdir_base, - orte_process_info.nodename, NULL, - ORTE_PROC_MY_NAME))) { - ORTE_ERROR_LOG(rc); - return rc; - } - } /* setup the path to the daemon rendezvous point */ memset(&mca_oob_usock_component.address, 0, sizeof(struct sockaddr_un)); mca_oob_usock_component.address.sun_family = AF_UNIX; @@ -220,6 +210,14 @@ static int component_startup(void) if (ORTE_SUCCESS != (rc = orte_oob_usock_start_listening())) { ORTE_ERROR_LOG(rc); } + } else { + /* if the rendezvous point isn't there, then that's an error */ + /* if the rendezvous file doesn't exist, that's an error */ + if (0 != access(mca_oob_usock_component.address.sun_path, R_OK)) { + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "SUNPATH: %s NOT READABLE", mca_oob_usock_component.address.sun_path); + return OPAL_ERR_NOT_FOUND; + } } /* start the module */ @@ -263,7 +261,7 @@ static int component_send(orte_rml_send_t *msg) if (NULL == (proc = orte_get_proc_object(&msg->dst))) { return ORTE_ERR_TAKE_NEXT_OPTION; } - if (!proc->local_proc) { + if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { return ORTE_ERR_TAKE_NEXT_OPTION; } } @@ -308,8 +306,15 @@ static int component_set_addr(orte_process_name_t *peer, pr = OBJ_NEW(mca_oob_usock_peer_t); pr->name = *peer; opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr); - return ORTE_SUCCESS; } + if (ORTE_PROC_MY_DAEMON->jobid == peer->jobid) { + /* we have to initiate the connection because otherwise the + * daemon has no way to communicate to us via this component + * as the app doesn't have a listening port */ + pr->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(pr, mca_oob_usock_peer_try_connect); + } + return ORTE_SUCCESS; } /* if I am a daemon or HNP, I can only reach my @@ -320,7 +325,7 @@ static int component_set_addr(orte_process_name_t *peer, return ORTE_ERR_TAKE_NEXT_OPTION; } if (NULL == (proc = orte_get_proc_object(peer)) || - !proc->local_proc) { + !ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { return ORTE_ERR_TAKE_NEXT_OPTION; } /* indicate that this peer is addressable by this component */ @@ -478,7 +483,7 @@ static bool component_is_reachable(orte_process_name_t *peer) return false; } if (NULL == (proc = orte_get_proc_object(peer)) || - !proc->local_proc) { + !ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { return false; } /* indicate that this peer is reachable by this component */ @@ -526,6 +531,7 @@ static void peer_cons(mca_oob_usock_peer_t *peer) { peer->sd = -1; peer->state = MCA_OOB_USOCK_UNCONNECTED; + peer->retries = 0; OBJ_CONSTRUCT(&peer->send_queue, opal_list_t); peer->send_msg = NULL; peer->recv_msg = NULL; diff --git a/orte/mca/oob/usock/oob_usock_connection.c b/orte/mca/oob/usock/oob_usock_connection.c index eb26f497b8..e653b09bc7 100644 --- a/orte/mca/oob/usock/oob_usock_connection.c +++ b/orte/mca/oob/usock/oob_usock_connection.c @@ -51,6 +51,7 @@ #include "opal_stdint.h" #include "opal/mca/backtrace/backtrace.h" #include "opal/mca/base/mca_base_var.h" +#include "opal/mca/dstore/dstore.h" #include "opal/mca/sec/sec.h" #include "opal/util/output.h" #include "opal/util/net.h" @@ -139,7 +140,6 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata) int rc; opal_socklen_t addrlen = 0; mca_oob_usock_send_t *snd; - bool connected = false; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s orte_usock_peer_try_connect: " @@ -211,40 +211,28 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata) /* We were unsuccessful in establishing this connection, and are * not likely to suddenly become successful, */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "Connection across unix domain socket to local proc %s failed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); peer->state = MCA_OOB_USOCK_FAILED; - connected = false; + CLOSE_THE_SOCKET(peer->sd); + /* let the USOCK component know that this module failed to make + * the connection so it can try other modules, and/or fail back + * to the OOB level so another component can try. This will activate + * an event in the component event base, and so it will fire async + * from us if we are in our own progress thread + */ + ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect); + OBJ_RELEASE(op); + return; } } - } else { - /* connection succeeded */ - peer->retries = 0; - connected = true; } - if (!connected) { - /* we cannot reach this peer */ - peer->state = MCA_OOB_USOCK_FAILED; - opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, - "%s orte_usock_peer_try_connect: " - "Connection across unix domain socket to local proc %s failed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&peer->name)); - /* let the USOCK component know that this module failed to make - * the connection so it can try other modules, and/or fail back - * to the OOB level so another component can try. This will activate - * an event in the component event base, and so it will fire async - * from us if we are in our own progress thread - */ - ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect); - /* FIXME: post any messages in the send queue back to the OOB - * level for reassignment - */ - if (NULL != peer->send_msg) { - } - while (NULL != (snd = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue))) { - } - goto cleanup; - } + /* connection succeeded */ + peer->retries = 0; opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, "%s orte_usock_peer_try_connect: " @@ -271,7 +259,6 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata) ORTE_FORCED_TERMINATE(1); } - cleanup: OBJ_RELEASE(op); } @@ -295,7 +282,8 @@ static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer) hdr.tag = 0; /* get our security credential*/ - if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) { + if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential(opal_dstore_internal, + (opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) { ORTE_ERROR_LOG(rc); return rc; } diff --git a/orte/mca/oob/usock/oob_usock_listener.c b/orte/mca/oob/usock/oob_usock_listener.c index 617b5bdd81..5d4fa460b1 100644 --- a/orte/mca/oob/usock/oob_usock_listener.c +++ b/orte/mca/oob/usock/oob_usock_listener.c @@ -82,6 +82,11 @@ int orte_oob_usock_start_listening(void) opal_socklen_t addrlen; int sd = -1; + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s START USOCK LISTENING ON %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + mca_oob_usock_component.address.sun_path); + /* create a listen socket for incoming connection attempts */ sd = socket(PF_UNIX, SOCK_STREAM, 0); if (sd < 0) { diff --git a/orte/orted/pmix/pmix_server_sendrecv.c b/orte/orted/pmix/pmix_server_sendrecv.c index e599d290e9..dfdaae70c2 100644 --- a/orte/orted/pmix/pmix_server_sendrecv.c +++ b/orte/orted/pmix/pmix_server_sendrecv.c @@ -693,9 +693,10 @@ static void process_message(pmix_server_peer_t *peer) case PMIX_FENCE_CMD: case PMIX_FENCENB_CMD: opal_output_verbose(2, pmix_server_output, - "%s recvd %s", + "%s recvd %s FROM PROC %s ON TAG %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE"); + (PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE", + OPAL_NAME_PRINT(id), tag); /* setup a signature object */ sig = OBJ_NEW(orte_grpcomm_signature_t); /* get the number of procs in this fence collective */ diff --git a/orte/test/system/oob_stress.c b/orte/test/system/oob_stress.c index 3efa895328..a10b3ce06d 100644 --- a/orte/test/system/oob_stress.c +++ b/orte/test/system/oob_stress.c @@ -13,11 +13,25 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/runtime/runtime.h" +#include "orte/runtime/orte_wait.h" #define MY_TAG 12345 #define MAX_COUNT 3 static bool msg_recvd; +static volatile bool msg_active; + +static void send_callback(int status, orte_process_name_t *peer, + opal_buffer_t* buffer, orte_rml_tag_t tag, + void* cbdata) + +{ + OBJ_RELEASE(buffer); + if (ORTE_SUCCESS != status) { + exit(1); + } + msg_active = false; +} int @@ -90,7 +104,9 @@ main(int argc, char *argv[]){ buf = OBJ_NEW(opal_buffer_t); opal_dss.copy_payload(buf, &blob.data); OBJ_DESTRUCT(&blob); - orte_rml.send_buffer_nb(&peer, buf, MY_TAG, orte_rml_send_callback, NULL); + msg_active = true; + orte_rml.send_buffer_nb(&peer, buf, MY_TAG, send_callback, NULL); + ORTE_WAIT_FOR_COMPLETION(msg_active); } }