Re-enable the usock oob component. Ensure the TCP component promotes messages for other procs to the OOB base so that other components have a chance to send the relay. Seems to be passing MTT, so let's see how it works for others.
This commit was SVN r32650.
Этот коммит содержится в:
родитель
a2085a5916
Коммит
5cdbc00136
@ -132,12 +132,19 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
|
||||
break;
|
||||
case PMIX_USOCK_CONNECTED:
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler SENDING TO SERVER",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
"%s usock:send_handler SENDING TO SERVER with %s msg",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
|
||||
(NULL == msg) ? "NULL" : "NON-NULL");
|
||||
if (NULL != msg) {
|
||||
if (!msg->hdr_sent) {
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler SENDING HEADER",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
if (OPAL_SUCCESS == (rc = send_bytes(msg))) {
|
||||
/* header is completely sent */
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler HEADER SENT",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
msg->hdr_sent = true;
|
||||
/* setup to send the data */
|
||||
if (NULL == msg->data) {
|
||||
@ -154,6 +161,9 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
|
||||
} else if (OPAL_ERR_RESOURCE_BUSY == rc ||
|
||||
OPAL_ERR_WOULD_BLOCK == rc) {
|
||||
/* exit this event and let the event lib progress */
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler RES BUSY OR WOULD BLOCK",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
return;
|
||||
} else {
|
||||
// report the error
|
||||
@ -169,14 +179,23 @@ void pmix_usock_send_handler(int sd, short flags, void *cbdata)
|
||||
}
|
||||
|
||||
if (msg->hdr_sent) {
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler SENDING BODY OF MSG",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
if (OPAL_SUCCESS == (rc = send_bytes(msg))) {
|
||||
// message is complete
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler BODY SENT",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
OBJ_RELEASE(msg);
|
||||
mca_pmix_native_component.send_msg = NULL;
|
||||
goto next;
|
||||
} else if (OPAL_ERR_RESOURCE_BUSY == rc ||
|
||||
OPAL_ERR_WOULD_BLOCK == rc) {
|
||||
/* exit this event and let the event lib progress */
|
||||
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
|
||||
"%s usock:send_handler RES BUSY OR WOULD BLOCK",
|
||||
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
|
||||
return;
|
||||
} else {
|
||||
// report the error
|
||||
|
@ -133,6 +133,58 @@ int orte_ess_base_app_setup(bool db_restrict_local)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* setup my session directory */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Once the session directory location has been established, set
|
||||
the opal_output env file location to be in the
|
||||
proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
/* store the session directory location in the database */
|
||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
||||
kv.key = strdup(OPAL_DSTORE_JOB_SDIR);
|
||||
kv.type = OPAL_STRING;
|
||||
kv.data.string = strdup(orte_process_info.job_session_dir);
|
||||
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal,
|
||||
(opal_identifier_t*)ORTE_PROC_MY_NAME,
|
||||
&kv))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&kv);
|
||||
error = "opal dstore store";
|
||||
goto error;
|
||||
}
|
||||
OBJ_DESTRUCT(&kv);
|
||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
||||
kv.key = strdup(OPAL_DSTORE_MY_SDIR);
|
||||
kv.type = OPAL_STRING;
|
||||
kv.data.string = strdup(orte_process_info.proc_session_dir);
|
||||
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal,
|
||||
(opal_identifier_t*)ORTE_PROC_MY_NAME,
|
||||
&kv))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&kv);
|
||||
error = "opal dstore store";
|
||||
goto error;
|
||||
}
|
||||
OBJ_DESTRUCT(&kv);
|
||||
}
|
||||
|
||||
/* Setup the communication infrastructure */
|
||||
/*
|
||||
* OOB Layer
|
||||
@ -220,58 +272,6 @@ int orte_ess_base_app_setup(bool db_restrict_local)
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* setup my session directory */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Once the session directory location has been established, set
|
||||
the opal_output env file location to be in the
|
||||
proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
/* store the session directory location in the database */
|
||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
||||
kv.key = strdup(OPAL_DSTORE_JOB_SDIR);
|
||||
kv.type = OPAL_STRING;
|
||||
kv.data.string = strdup(orte_process_info.job_session_dir);
|
||||
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal,
|
||||
(opal_identifier_t*)ORTE_PROC_MY_NAME,
|
||||
&kv))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&kv);
|
||||
error = "opal dstore store";
|
||||
goto error;
|
||||
}
|
||||
OBJ_DESTRUCT(&kv);
|
||||
OBJ_CONSTRUCT(&kv, opal_value_t);
|
||||
kv.key = strdup(OPAL_DSTORE_MY_SDIR);
|
||||
kv.type = OPAL_STRING;
|
||||
kv.data.string = strdup(orte_process_info.proc_session_dir);
|
||||
if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal,
|
||||
(opal_identifier_t*)ORTE_PROC_MY_NAME,
|
||||
&kv))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
OBJ_DESTRUCT(&kv);
|
||||
error = "opal dstore store";
|
||||
goto error;
|
||||
}
|
||||
OBJ_DESTRUCT(&kv);
|
||||
}
|
||||
|
||||
/* setup the routed info */
|
||||
if (ORTE_SUCCESS != (ret = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
|
@ -238,6 +238,84 @@ int orte_ess_base_orted_setup(char **hosts)
|
||||
}
|
||||
}
|
||||
|
||||
/* setup my session directory here as the OOB may need it */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
/* take a pass thru the session directory code to fillin the
|
||||
* tmpdir names - don't create anything yet
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir define";
|
||||
goto error;
|
||||
}
|
||||
/* clear the session directory just in case there are
|
||||
* stale directories laying around
|
||||
*/
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
/* now actually create the directory tree */
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* set the opal_output env file location to be in the
|
||||
* proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
/* setup stdout/stderr */
|
||||
if (orte_debug_daemons_file_flag) {
|
||||
/* if we are debugging to a file, then send stdout/stderr to
|
||||
* the orted log file
|
||||
*/
|
||||
|
||||
/* get my jobid */
|
||||
if (ORTE_SUCCESS != (ret = orte_util_convert_jobid_to_string(&jobidstring,
|
||||
ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "convert_jobid";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* define a log file name in the session directory */
|
||||
snprintf(log_file, PATH_MAX, "output-orted-%s-%s.log",
|
||||
jobidstring, orte_process_info.nodename);
|
||||
log_path = opal_os_path(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.top_session_dir,
|
||||
log_file,
|
||||
NULL);
|
||||
|
||||
fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640);
|
||||
if (fd < 0) {
|
||||
/* couldn't open the file for some reason, so
|
||||
* just connect everything to /dev/null
|
||||
*/
|
||||
fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666);
|
||||
} else {
|
||||
dup2(fd, STDOUT_FILENO);
|
||||
dup2(fd, STDERR_FILENO);
|
||||
if(fd != STDOUT_FILENO && fd != STDERR_FILENO) {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Setup the communication infrastructure */
|
||||
if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_oob_base_framework, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -368,84 +446,6 @@ int orte_ess_base_orted_setup(char **hosts)
|
||||
}
|
||||
}
|
||||
|
||||
/* setup my session directory */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_framework.framework_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
/* take a pass thru the session directory code to fillin the
|
||||
* tmpdir names - don't create anything yet
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir define";
|
||||
goto error;
|
||||
}
|
||||
/* clear the session directory just in case there are
|
||||
* stale directories laying around
|
||||
*/
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
/* now actually create the directory tree */
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
/* Once the session directory location has been established, set
|
||||
the opal_output env file location to be in the
|
||||
proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
/* setup stdout/stderr */
|
||||
if (orte_debug_daemons_file_flag) {
|
||||
/* if we are debugging to a file, then send stdout/stderr to
|
||||
* the orted log file
|
||||
*/
|
||||
|
||||
/* get my jobid */
|
||||
if (ORTE_SUCCESS != (ret = orte_util_convert_jobid_to_string(&jobidstring,
|
||||
ORTE_PROC_MY_NAME->jobid))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "convert_jobid";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* define a log file name in the session directory */
|
||||
snprintf(log_file, PATH_MAX, "output-orted-%s-%s.log",
|
||||
jobidstring, orte_process_info.nodename);
|
||||
log_path = opal_os_path(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.top_session_dir,
|
||||
log_file,
|
||||
NULL);
|
||||
|
||||
fd = open(log_path, O_RDWR|O_CREAT|O_TRUNC, 0640);
|
||||
if (fd < 0) {
|
||||
/* couldn't open the file for some reason, so
|
||||
* just connect everything to /dev/null
|
||||
*/
|
||||
fd = open("/dev/null", O_RDWR|O_CREAT|O_TRUNC, 0666);
|
||||
} else {
|
||||
dup2(fd, STDOUT_FILENO);
|
||||
dup2(fd, STDERR_FILENO);
|
||||
if(fd != STDOUT_FILENO && fd != STDERR_FILENO) {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* setup the global job and node arrays */
|
||||
orte_job_data = OBJ_NEW(opal_pointer_array_t);
|
||||
if (ORTE_SUCCESS != (ret = opal_pointer_array_init(orte_job_data,
|
||||
|
@ -314,6 +314,41 @@ static int rte_init(void)
|
||||
orte_process_info.super.proc_arch = opal_local_arch;
|
||||
opal_proc_local_set(&orte_process_info.super);
|
||||
|
||||
/* setup my session directory here as the OOB may need it */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_debug_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
/* take a pass thru the session directory code to fillin the
|
||||
* tmpdir names - don't create anything yet
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir define";
|
||||
goto error;
|
||||
}
|
||||
/* clear the session directory just in case there are
|
||||
* stale directories laying around
|
||||
*/
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
/* now actually create the directory tree */
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
/* Setup the communication infrastructure */
|
||||
|
||||
/*
|
||||
@ -599,43 +634,9 @@ static int rte_init(void)
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SHOW_HELP,
|
||||
ORTE_RML_PERSISTENT, orte_show_help_recv, NULL);
|
||||
|
||||
/* setup my session directory */
|
||||
if (orte_create_session_dirs) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_debug_output,
|
||||
"%s setting up session dir with\n\ttmpdir: %s\n\thost %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(NULL == orte_process_info.tmpdir_base) ? "UNDEF" : orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename));
|
||||
|
||||
/* take a pass thru the session directory code to fillin the
|
||||
* tmpdir names - don't create anything yet
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(false,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir define";
|
||||
goto error;
|
||||
}
|
||||
/* clear the session directory just in case there are
|
||||
* stale directories laying around
|
||||
*/
|
||||
orte_session_dir_cleanup(ORTE_JOBID_WILDCARD);
|
||||
|
||||
/* now actually create the directory tree */
|
||||
if (ORTE_SUCCESS != (ret = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
error = "orte_session_dir";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Once the session directory location has been established, set
|
||||
the opal_output hnp file location to be in the
|
||||
proc-specific session directory. */
|
||||
/* set the opal_output hnp file location to be in the
|
||||
* proc-specific session directory. */
|
||||
opal_output_set_output_file_info(orte_process_info.proc_session_dir,
|
||||
"output-", NULL, NULL);
|
||||
|
||||
|
@ -397,9 +397,7 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
{
|
||||
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)cbdata;
|
||||
int rc;
|
||||
orte_process_name_t hop;
|
||||
mca_oob_tcp_peer_t *relay;
|
||||
uint64_t ui64;
|
||||
orte_rml_send_t *snd;
|
||||
|
||||
if (orte_abnormal_term_ordered) {
|
||||
return;
|
||||
@ -534,52 +532,26 @@ void mca_oob_tcp_recv_handler(int sd, short flags, void *cbdata)
|
||||
peer->recv_msg->hdr.nbytes);
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
} else {
|
||||
/* no - find the next hop in the route */
|
||||
hop = orte_routed.get_route(&peer->recv_msg->hdr.dst);
|
||||
if (hop.jobid == ORTE_JOBID_INVALID ||
|
||||
hop.vpid == ORTE_VPID_INVALID) {
|
||||
/* no hop known - post the error to the component
|
||||
* and let the OOB see if there is another way
|
||||
* to get there from here
|
||||
*/
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s NO ROUTE TO %s FROM HERE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
/* let the component know about the problem */
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_no_route);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
return;
|
||||
} else {
|
||||
/* does we know how to reach the next hop? */
|
||||
memcpy(&ui64, (char*)&hop, sizeof(uint64_t));
|
||||
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_tcp_module.peers, ui64, (void**)&relay)) {
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s ADDRESS OF NEXT HOP %s TO %s IS UNKNOWN",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&hop),
|
||||
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
|
||||
/* let the component know about the problem */
|
||||
ORTE_ACTIVATE_TCP_MSG_ERROR(NULL, peer->recv_msg, &hop, mca_oob_tcp_component_hop_unknown);
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
return;
|
||||
}
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s ROUTING TO %s FROM HERE",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&relay->name));
|
||||
/* if this came from a different job family, then ensure
|
||||
* we know how to return
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(peer->recv_msg->hdr.origin.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||
orte_routed.update_route(&(peer->recv_msg->hdr.origin), &peer->name);
|
||||
}
|
||||
/* post the message for retransmission */
|
||||
MCA_OOB_TCP_QUEUE_RELAY(peer->recv_msg, relay);
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
}
|
||||
/* promote this to the OOB as some other transport might
|
||||
* be the next best hop */
|
||||
opal_output_verbose(OOB_TCP_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s TCP PROMOTING ROUTED MESSAGE FOR %s TO OOB",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst));
|
||||
snd = OBJ_NEW(orte_rml_send_t);
|
||||
snd->dst = peer->recv_msg->hdr.dst;
|
||||
snd->origin = peer->recv_msg->hdr.origin;
|
||||
snd->tag = peer->recv_msg->hdr.tag;
|
||||
snd->data = peer->recv_msg->data;
|
||||
snd->count = peer->recv_msg->hdr.nbytes;
|
||||
snd->cbfunc.iov = NULL;
|
||||
snd->cbdata = NULL;
|
||||
/* activate the OOB send state */
|
||||
ORTE_OOB_SEND(snd);
|
||||
/* protect the data */
|
||||
peer->recv_msg->data = NULL;
|
||||
/* cleanup */
|
||||
OBJ_RELEASE(peer->recv_msg);
|
||||
}
|
||||
peer->recv_msg = NULL;
|
||||
return;
|
||||
|
@ -192,16 +192,6 @@ static int component_startup(void)
|
||||
"%s USOCK STARTUP",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
|
||||
/* if the session directory has not already been setup, do so */
|
||||
if (NULL == orte_process_info.top_session_dir) {
|
||||
if (ORTE_SUCCESS != (rc = orte_session_dir(true,
|
||||
orte_process_info.tmpdir_base,
|
||||
orte_process_info.nodename, NULL,
|
||||
ORTE_PROC_MY_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
/* setup the path to the daemon rendezvous point */
|
||||
memset(&mca_oob_usock_component.address, 0, sizeof(struct sockaddr_un));
|
||||
mca_oob_usock_component.address.sun_family = AF_UNIX;
|
||||
@ -220,6 +210,14 @@ static int component_startup(void)
|
||||
if (ORTE_SUCCESS != (rc = orte_oob_usock_start_listening())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
} else {
|
||||
/* if the rendezvous point isn't there, then that's an error */
|
||||
/* if the rendezvous file doesn't exist, that's an error */
|
||||
if (0 != access(mca_oob_usock_component.address.sun_path, R_OK)) {
|
||||
opal_output_verbose(2, orte_oob_base_framework.framework_output,
|
||||
"SUNPATH: %s NOT READABLE", mca_oob_usock_component.address.sun_path);
|
||||
return OPAL_ERR_NOT_FOUND;
|
||||
}
|
||||
}
|
||||
|
||||
/* start the module */
|
||||
@ -263,7 +261,7 @@ static int component_send(orte_rml_send_t *msg)
|
||||
if (NULL == (proc = orte_get_proc_object(&msg->dst))) {
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
if (!proc->local_proc) {
|
||||
if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
}
|
||||
@ -308,8 +306,15 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
pr = OBJ_NEW(mca_oob_usock_peer_t);
|
||||
pr->name = *peer;
|
||||
opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
if (ORTE_PROC_MY_DAEMON->jobid == peer->jobid) {
|
||||
/* we have to initiate the connection because otherwise the
|
||||
* daemon has no way to communicate to us via this component
|
||||
* as the app doesn't have a listening port */
|
||||
pr->state = MCA_OOB_USOCK_CONNECTING;
|
||||
ORTE_ACTIVATE_USOCK_CONN_STATE(pr, mca_oob_usock_peer_try_connect);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* if I am a daemon or HNP, I can only reach my
|
||||
@ -320,7 +325,7 @@ static int component_set_addr(orte_process_name_t *peer,
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
if (NULL == (proc = orte_get_proc_object(peer)) ||
|
||||
!proc->local_proc) {
|
||||
!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
|
||||
return ORTE_ERR_TAKE_NEXT_OPTION;
|
||||
}
|
||||
/* indicate that this peer is addressable by this component */
|
||||
@ -478,7 +483,7 @@ static bool component_is_reachable(orte_process_name_t *peer)
|
||||
return false;
|
||||
}
|
||||
if (NULL == (proc = orte_get_proc_object(peer)) ||
|
||||
!proc->local_proc) {
|
||||
!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) {
|
||||
return false;
|
||||
}
|
||||
/* indicate that this peer is reachable by this component */
|
||||
@ -526,6 +531,7 @@ static void peer_cons(mca_oob_usock_peer_t *peer)
|
||||
{
|
||||
peer->sd = -1;
|
||||
peer->state = MCA_OOB_USOCK_UNCONNECTED;
|
||||
peer->retries = 0;
|
||||
OBJ_CONSTRUCT(&peer->send_queue, opal_list_t);
|
||||
peer->send_msg = NULL;
|
||||
peer->recv_msg = NULL;
|
||||
|
@ -51,6 +51,7 @@
|
||||
#include "opal_stdint.h"
|
||||
#include "opal/mca/backtrace/backtrace.h"
|
||||
#include "opal/mca/base/mca_base_var.h"
|
||||
#include "opal/mca/dstore/dstore.h"
|
||||
#include "opal/mca/sec/sec.h"
|
||||
#include "opal/util/output.h"
|
||||
#include "opal/util/net.h"
|
||||
@ -139,7 +140,6 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata)
|
||||
int rc;
|
||||
opal_socklen_t addrlen = 0;
|
||||
mca_oob_usock_send_t *snd;
|
||||
bool connected = false;
|
||||
|
||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_usock_peer_try_connect: "
|
||||
@ -211,40 +211,28 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata)
|
||||
/* We were unsuccessful in establishing this connection, and are
|
||||
* not likely to suddenly become successful,
|
||||
*/
|
||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_usock_peer_try_connect: "
|
||||
"Connection across unix domain socket to local proc %s failed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
peer->state = MCA_OOB_USOCK_FAILED;
|
||||
connected = false;
|
||||
CLOSE_THE_SOCKET(peer->sd);
|
||||
/* let the USOCK component know that this module failed to make
|
||||
* the connection so it can try other modules, and/or fail back
|
||||
* to the OOB level so another component can try. This will activate
|
||||
* an event in the component event base, and so it will fire async
|
||||
* from us if we are in our own progress thread
|
||||
*/
|
||||
ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect);
|
||||
OBJ_RELEASE(op);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/* connection succeeded */
|
||||
peer->retries = 0;
|
||||
connected = true;
|
||||
}
|
||||
|
||||
if (!connected) {
|
||||
/* we cannot reach this peer */
|
||||
peer->state = MCA_OOB_USOCK_FAILED;
|
||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_usock_peer_try_connect: "
|
||||
"Connection across unix domain socket to local proc %s failed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&peer->name));
|
||||
/* let the USOCK component know that this module failed to make
|
||||
* the connection so it can try other modules, and/or fail back
|
||||
* to the OOB level so another component can try. This will activate
|
||||
* an event in the component event base, and so it will fire async
|
||||
* from us if we are in our own progress thread
|
||||
*/
|
||||
ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect);
|
||||
/* FIXME: post any messages in the send queue back to the OOB
|
||||
* level for reassignment
|
||||
*/
|
||||
if (NULL != peer->send_msg) {
|
||||
}
|
||||
while (NULL != (snd = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue))) {
|
||||
}
|
||||
goto cleanup;
|
||||
}
|
||||
/* connection succeeded */
|
||||
peer->retries = 0;
|
||||
|
||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s orte_usock_peer_try_connect: "
|
||||
@ -271,7 +259,6 @@ void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata)
|
||||
ORTE_FORCED_TERMINATE(1);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_RELEASE(op);
|
||||
}
|
||||
|
||||
@ -295,7 +282,8 @@ static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer)
|
||||
hdr.tag = 0;
|
||||
|
||||
/* get our security credential*/
|
||||
if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential((opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) {
|
||||
if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential(opal_dstore_internal,
|
||||
(opal_identifier_t*)ORTE_PROC_MY_NAME, &cred))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
@ -82,6 +82,11 @@ int orte_oob_usock_start_listening(void)
|
||||
opal_socklen_t addrlen;
|
||||
int sd = -1;
|
||||
|
||||
opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output,
|
||||
"%s START USOCK LISTENING ON %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
mca_oob_usock_component.address.sun_path);
|
||||
|
||||
/* create a listen socket for incoming connection attempts */
|
||||
sd = socket(PF_UNIX, SOCK_STREAM, 0);
|
||||
if (sd < 0) {
|
||||
|
@ -693,9 +693,10 @@ static void process_message(pmix_server_peer_t *peer)
|
||||
case PMIX_FENCE_CMD:
|
||||
case PMIX_FENCENB_CMD:
|
||||
opal_output_verbose(2, pmix_server_output,
|
||||
"%s recvd %s",
|
||||
"%s recvd %s FROM PROC %s ON TAG %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE");
|
||||
(PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE",
|
||||
OPAL_NAME_PRINT(id), tag);
|
||||
/* setup a signature object */
|
||||
sig = OBJ_NEW(orte_grpcomm_signature_t);
|
||||
/* get the number of procs in this fence collective */
|
||||
|
@ -13,11 +13,25 @@
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#define MY_TAG 12345
|
||||
#define MAX_COUNT 3
|
||||
|
||||
static bool msg_recvd;
|
||||
static volatile bool msg_active;
|
||||
|
||||
static void send_callback(int status, orte_process_name_t *peer,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
|
||||
{
|
||||
OBJ_RELEASE(buffer);
|
||||
if (ORTE_SUCCESS != status) {
|
||||
exit(1);
|
||||
}
|
||||
msg_active = false;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
@ -90,7 +104,9 @@ main(int argc, char *argv[]){
|
||||
buf = OBJ_NEW(opal_buffer_t);
|
||||
opal_dss.copy_payload(buf, &blob.data);
|
||||
OBJ_DESTRUCT(&blob);
|
||||
orte_rml.send_buffer_nb(&peer, buf, MY_TAG, orte_rml_send_callback, NULL);
|
||||
msg_active = true;
|
||||
orte_rml.send_buffer_nb(&peer, buf, MY_TAG, send_callback, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(msg_active);
|
||||
}
|
||||
}
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user