Trying to get the C/R code to compile again. (recv_*_nb)
This patch changes all recv/recv_buffer occurrences in the C/R code to recv_nb/recv_buffer_nb. The old code is still there but disabled using ifdefs (ENABLE_FT_FIXED). The new code compiles but does not work. Changes from V1: * #ifdef out the code (so it is preserved for later re-design) * marked the broken C/R code with ENABLE_FT_FIXED Changes from V2: * only #ifdef out the code where the behaviour is changed (used to be blocking; now non-blocking) This commit was SVN r30035.
Этот коммит содержится в:
родитель
695d854cd8
Коммит
a3813d37c7
@ -4717,7 +4717,6 @@ static int ft_event_post_drain_acks(void)
|
||||
ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack = NULL;
|
||||
opal_list_item_t* item = NULL;
|
||||
size_t req_size;
|
||||
int ret;
|
||||
|
||||
req_size = opal_list_get_size(&drained_msg_ack_list);
|
||||
if(req_size <= 0) {
|
||||
@ -4739,17 +4738,8 @@ static int ft_event_post_drain_acks(void)
|
||||
drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
|
||||
|
||||
/* Post the receive */
|
||||
if( OMPI_SUCCESS != (ret = ompi_rte_recv_buffer_nb( &drain_msg_ack->peer,
|
||||
OMPI_CRCP_COORD_BOOKMARK_TAG,
|
||||
0,
|
||||
drain_message_ack_cbfunc,
|
||||
NULL) ) ) {
|
||||
opal_output(mca_crcp_bkmrk_component.super.output_handle,
|
||||
"crcp:bkmrk: %s <-- %s: Failed to post a RML receive to the peer\n",
|
||||
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
|
||||
OMPI_NAME_PRINT(&(drain_msg_ack->peer)));
|
||||
return ret;
|
||||
}
|
||||
ompi_rte_recv_buffer_nb(&drain_msg_ack->peer, OMPI_CRCP_COORD_BOOKMARK_TAG,
|
||||
0, drain_message_ack_cbfunc, NULL);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -5322,26 +5312,14 @@ static int send_bookmarks(int peer_idx)
|
||||
static int recv_bookmarks(int peer_idx)
|
||||
{
|
||||
ompi_process_name_t peer_name;
|
||||
int exit_status = OMPI_SUCCESS;
|
||||
int ret;
|
||||
|
||||
START_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
|
||||
|
||||
peer_name.jobid = OMPI_PROC_MY_NAME->jobid;
|
||||
peer_name.vpid = peer_idx;
|
||||
|
||||
if ( 0 > (ret = ompi_rte_recv_buffer_nb(&peer_name,
|
||||
OMPI_CRCP_COORD_BOOKMARK_TAG,
|
||||
0,
|
||||
recv_bookmarks_cbfunc,
|
||||
NULL) ) ) {
|
||||
opal_output(mca_crcp_bkmrk_component.super.output_handle,
|
||||
"crcp:bkmrk: recv_bookmarks: Failed to post receive bookmark from peer %s: Return %d\n",
|
||||
OMPI_NAME_PRINT(&peer_name),
|
||||
ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
ompi_rte_recv_buffer_nb(&peer_name, OMPI_CRCP_COORD_BOOKMARK_TAG,
|
||||
0, recv_bookmarks_cbfunc, NULL);
|
||||
|
||||
++total_recv_bookmarks;
|
||||
|
||||
@ -5350,7 +5328,7 @@ static int recv_bookmarks(int peer_idx)
|
||||
/* JJH Doesn't make much sense to print this. The real bottleneck is always the send_bookmarks() */
|
||||
/*DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_EX_PEER_R, peer_idx, 1);*/
|
||||
|
||||
return exit_status;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static void recv_bookmarks_cbfunc(int status,
|
||||
@ -5616,6 +5594,8 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
|
||||
/*
|
||||
* Recv the ACK msg
|
||||
*/
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer,
|
||||
OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
|
||||
opal_output(mca_crcp_bkmrk_component.super.output_handle,
|
||||
@ -5626,6 +5606,9 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
|
||||
orte_rml_recv_callback, NULL);
|
||||
|
||||
UNPACK_BUFFER(buffer, recv_response, 1, OPAL_UINT32,
|
||||
"crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
|
||||
@ -5790,6 +5773,8 @@ static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
|
||||
/*
|
||||
* Recv the msg
|
||||
*/
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
|
||||
opal_output(mca_crcp_bkmrk_component.super.output_handle,
|
||||
"crcp:bkmrk: do_recv_msg_detail: %s <-- %s Failed to receive buffer from peer. Return %d\n",
|
||||
@ -5799,6 +5784,8 @@ static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, NULL);
|
||||
|
||||
/* Pull out the communicator ID */
|
||||
UNPACK_BUFFER(buffer, (*comm_id), 1, OPAL_UINT32,
|
||||
|
@ -248,8 +248,6 @@ int orte_errmgr_base_migrate_update(int status)
|
||||
********************/
|
||||
static int errmgr_base_tool_start_cmdline_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (errmgr_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -262,20 +260,12 @@ static int errmgr_base_tool_start_cmdline_listener(void)
|
||||
*/
|
||||
errmgr_cmdline_sender.jobid = ORTE_JOBID_INVALID;
|
||||
errmgr_cmdline_sender.vpid = ORTE_VPID_INVALID;
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MIGRATE,
|
||||
0,
|
||||
errmgr_base_tool_cmdline_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MIGRATE,
|
||||
0, errmgr_base_tool_cmdline_recv, NULL);
|
||||
|
||||
errmgr_cmdline_recv_issued = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
@ -108,40 +108,21 @@ BEGIN_C_DECLS
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
/*
|
||||
* Recv
|
||||
*/
|
||||
int orte_rml_ftrm_recv(orte_process_name_t* peer,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
int flags);
|
||||
|
||||
/*
|
||||
* Recv Non-blocking
|
||||
*/
|
||||
int orte_rml_ftrm_recv_nb(orte_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
void orte_rml_ftrm_recv_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
int flags,
|
||||
bool persistent,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
/*
|
||||
* Recv Buffer
|
||||
*/
|
||||
int orte_rml_ftrm_recv_buffer(orte_process_name_t* peer,
|
||||
opal_buffer_t *buf,
|
||||
orte_rml_tag_t tag,
|
||||
int flags);
|
||||
|
||||
/*
|
||||
* Recv Buffer Non-blocking
|
||||
*/
|
||||
int orte_rml_ftrm_recv_buffer_nb(orte_process_name_t* peer,
|
||||
void orte_rml_ftrm_recv_buffer_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
int flags,
|
||||
bool persistent,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
|
@ -73,9 +73,7 @@ orte_rml_module_t orte_rml_ftrm_module = {
|
||||
orte_rml_ftrm_send_buffer,
|
||||
orte_rml_ftrm_send_buffer_nb,
|
||||
|
||||
orte_rml_ftrm_recv,
|
||||
orte_rml_ftrm_recv_nb,
|
||||
orte_rml_ftrm_recv_buffer,
|
||||
orte_rml_ftrm_recv_buffer_nb,
|
||||
orte_rml_ftrm_recv_cancel,
|
||||
|
||||
|
@ -224,101 +224,41 @@ int orte_rml_ftrm_send_buffer_nb(orte_process_name_t* peer,
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Recv
|
||||
*/
|
||||
int orte_rml_ftrm_recv(orte_process_name_t* peer,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
int flags)
|
||||
{
|
||||
int ret;
|
||||
|
||||
opal_output_verbose(20, rml_ftrm_output_handle,
|
||||
"orte_rml_ftrm: recv(%s, %d, %d, %d )",
|
||||
ORTE_NAME_PRINT(peer), count, tag, flags);
|
||||
|
||||
if( NULL != orte_rml_ftrm_wrapped_module.recv ) {
|
||||
if( ORTE_SUCCESS != (ret = orte_rml_ftrm_wrapped_module.recv(peer, msg, count, tag, flags) ) ) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Recv Non-blocking
|
||||
*/
|
||||
int orte_rml_ftrm_recv_nb(orte_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
void orte_rml_ftrm_recv_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
int flags,
|
||||
bool persistent,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int ret;
|
||||
|
||||
opal_output_verbose(20, rml_ftrm_output_handle,
|
||||
"orte_rml_ftrm: recv_nb(%s, %d, %d, %d )",
|
||||
ORTE_NAME_PRINT(peer), count, tag, flags);
|
||||
"orte_rml_ftrm: recv_nb(%s, %d, %d )",
|
||||
ORTE_NAME_PRINT(peer), tag, persistent);
|
||||
|
||||
if( NULL != orte_rml_ftrm_wrapped_module.recv_nb ) {
|
||||
if( ORTE_SUCCESS != (ret = orte_rml_ftrm_wrapped_module.recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata) ) ) {
|
||||
return ret;
|
||||
}
|
||||
orte_rml_ftrm_wrapped_module.recv_nb(peer, tag, persistent, cbfunc, cbdata);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Recv Buffer
|
||||
*/
|
||||
int orte_rml_ftrm_recv_buffer(orte_process_name_t* peer,
|
||||
opal_buffer_t *buf,
|
||||
orte_rml_tag_t tag,
|
||||
int flags)
|
||||
{
|
||||
int ret;
|
||||
|
||||
opal_output_verbose(20, rml_ftrm_output_handle,
|
||||
"orte_rml_ftrm: recv_buffer(%s, %d )",
|
||||
ORTE_NAME_PRINT(peer), tag);
|
||||
|
||||
if( NULL != orte_rml_ftrm_wrapped_module.recv_buffer ) {
|
||||
if( ORTE_SUCCESS != (ret = orte_rml_ftrm_wrapped_module.recv_buffer(peer, buf, tag, flags) ) ) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Recv Buffer Non-blocking
|
||||
*/
|
||||
int orte_rml_ftrm_recv_buffer_nb(orte_process_name_t* peer,
|
||||
void orte_rml_ftrm_recv_buffer_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
int flags,
|
||||
bool persistent,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int ret;
|
||||
|
||||
opal_output_verbose(20, rml_ftrm_output_handle,
|
||||
"orte_rml_ftrm: recv_buffer_nb(%s, %d, %d)",
|
||||
ORTE_NAME_PRINT(peer), tag, flags);
|
||||
ORTE_NAME_PRINT(peer), tag, persistent);
|
||||
|
||||
if( NULL != orte_rml_ftrm_wrapped_module.recv_buffer_nb ) {
|
||||
if( ORTE_SUCCESS != (ret = orte_rml_ftrm_wrapped_module.recv_buffer_nb(peer, tag, flags, cbfunc, cbdata) ) ) {
|
||||
return ret;
|
||||
}
|
||||
orte_rml_ftrm_wrapped_module.recv_buffer_nb(peer, tag, persistent, cbfunc, cbdata);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -289,12 +289,16 @@ int app_coord_finalize()
|
||||
* need to wait until the checkpoint is finished before finishing.
|
||||
*/
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
|
||||
@ -1534,12 +1538,16 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
|
||||
/*
|
||||
* Wait for a response regarding completion
|
||||
*/
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
|
||||
@ -1604,12 +1612,16 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
|
||||
/*
|
||||
* Wait for a response regarding completion
|
||||
*/
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
|
||||
|
@ -875,32 +875,22 @@ static int global_refresh_job_structs(void)
|
||||
*****************/
|
||||
static int snapc_full_global_start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (snapc_orted_recv_issued && ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
|
||||
"Global) Startup Coordinator Channel"));
|
||||
|
||||
/*
|
||||
* Coordinator command listener
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SNAPC_FULL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
snapc_full_global_orted_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
|
||||
ORTE_RML_PERSISTENT, snapc_full_global_orted_recv, NULL);
|
||||
|
||||
snapc_orted_recv_issued = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int snapc_full_global_stop_listener(void)
|
||||
@ -920,8 +910,6 @@ static int snapc_full_global_stop_listener(void)
|
||||
|
||||
static int snapc_full_global_start_cmdline_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (snapc_cmdline_recv_issued && ORTE_PROC_IS_HNP) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -932,20 +920,11 @@ static int snapc_full_global_start_cmdline_listener(void)
|
||||
/*
|
||||
* Coordinator command listener
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_CKPT,
|
||||
0,
|
||||
snapc_full_global_cmdline_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT, 0,
|
||||
snapc_full_global_cmdline_recv, NULL);
|
||||
|
||||
snapc_cmdline_recv_issued = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int snapc_full_global_stop_cmdline_listener(void)
|
||||
|
@ -341,8 +341,6 @@ int local_coord_release_job(orte_jobid_t jobid)
|
||||
******************/
|
||||
static int snapc_full_local_start_hnp_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
/*
|
||||
* Global Coordinator: Do not register a Local listener
|
||||
*/
|
||||
@ -360,20 +358,12 @@ static int snapc_full_local_start_hnp_listener(void)
|
||||
/*
|
||||
* Coordinator command listener
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SNAPC_FULL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
snapc_full_local_hnp_cmd_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC_FULL,
|
||||
ORTE_RML_PERSISTENT, snapc_full_local_hnp_cmd_recv, NULL);
|
||||
|
||||
snapc_local_hnp_recv_issued = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int snapc_full_local_stop_hnp_listener(void)
|
||||
@ -400,8 +390,6 @@ static int snapc_full_local_stop_hnp_listener(void)
|
||||
|
||||
static int snapc_full_local_start_app_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (snapc_local_app_recv_issued) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
@ -412,20 +400,12 @@ static int snapc_full_local_start_app_listener(void)
|
||||
/*
|
||||
* Coordinator command listener
|
||||
*/
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SNAPC,
|
||||
ORTE_RML_PERSISTENT,
|
||||
snapc_full_local_app_cmd_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SNAPC,
|
||||
ORTE_RML_PERSISTENT, snapc_full_local_app_cmd_recv,
|
||||
NULL);
|
||||
|
||||
snapc_local_app_recv_issued = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int snapc_full_local_stop_app_listener(void)
|
||||
|
@ -477,6 +477,8 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
"sstore:central:(app): pull() from %s -> %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
|
||||
&buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
@ -485,6 +487,10 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
0, orte_rml_recv_callback, NULL);
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
|
@ -800,26 +800,15 @@ static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(cha
|
||||
|
||||
static int sstore_central_global_start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if( is_global_listener_active ) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
sstore_central_global_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT, sstore_central_global_recv, NULL);
|
||||
|
||||
is_global_listener_active = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sstore_central_global_stop_listener(void)
|
||||
|
@ -619,26 +619,15 @@ static orte_sstore_central_local_app_snapshot_info_t *find_app_handle_info(orte_
|
||||
|
||||
static int sstore_central_local_start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if( is_global_listener_active ) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
orte_sstore_central_local_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT, orte_sstore_central_local_recv, NULL);
|
||||
|
||||
is_global_listener_active = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sstore_central_local_stop_listener(void)
|
||||
|
@ -466,6 +466,8 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
"sstore:stage:(app): pull() from %s -> %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
|
||||
#ifdef ENABLE_FT_FIXED
|
||||
/* This is the old, now broken code */
|
||||
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
|
||||
&buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
@ -475,6 +477,9 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
#endif /* ENABLE_FT_FIXED */
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
0, orte_rml_recv_callback, NULL);
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
|
@ -992,26 +992,15 @@ static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_ba
|
||||
|
||||
static int sstore_stage_global_start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if( is_global_listener_active ) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
sstore_stage_global_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT, sstore_stage_global_recv, NULL);
|
||||
|
||||
is_global_listener_active = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sstore_stage_global_stop_listener(void)
|
||||
|
@ -1043,26 +1043,15 @@ static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_ss
|
||||
|
||||
static int sstore_stage_local_start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if( is_global_listener_active ) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT,
|
||||
sstore_stage_local_recv,
|
||||
NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
|
||||
|
||||
is_global_listener_active = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int sstore_stage_local_stop_listener(void)
|
||||
|
@ -652,21 +652,11 @@ static int ckpt_finalize(void) {
|
||||
|
||||
static int start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_CKPT,
|
||||
ORTE_RML_PERSISTENT,
|
||||
hnp_receiver,
|
||||
NULL))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_CKPT,
|
||||
ORTE_RML_PERSISTENT, hnp_receiver, NULL);
|
||||
|
||||
listener_started = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int stop_listener(void)
|
||||
|
@ -513,21 +513,11 @@ static int tool_finalize(void) {
|
||||
|
||||
static int start_listener(void)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||
ORTE_RML_TAG_MIGRATE,
|
||||
ORTE_RML_PERSISTENT,
|
||||
hnp_receiver,
|
||||
NULL))) {
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MIGRATE,
|
||||
ORTE_RML_PERSISTENT, hnp_receiver, NULL);
|
||||
|
||||
listener_started = true;
|
||||
|
||||
cleanup:
|
||||
return exit_status;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static int stop_listener(void)
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user