diff --git a/orte/mca/snapc/full/snapc_full_app.c b/orte/mca/snapc/full/snapc_full_app.c index db8ad0243c..8a31f41035 100644 --- a/orte/mca/snapc/full/snapc_full_app.c +++ b/orte/mca/snapc/full/snapc_full_app.c @@ -99,12 +99,6 @@ static int current_cr_state = OPAL_CRS_NONE; static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID; static opal_crs_base_ckpt_options_t *current_options = NULL; -static void snapc_full_app_callback_recv(int status, - orte_process_name_t* sender, - opal_buffer_t* buffer, - orte_rml_tag_t tag, - void* cbdata); - /************************ * Function Definitions ************************/ @@ -224,6 +218,7 @@ int app_coord_finalize() opal_buffer_t *buffer = NULL; orte_std_cntr_t count; orte_grpcomm_collective_t *coll; + orte_rml_recv_cb_t *rb = NULL; /* * All processes must sync here, so the Global coordinator can know that @@ -281,6 +276,9 @@ int app_coord_finalize() goto cleanup; } + /* buffer should not be released here; the callback releases it */ + buffer = NULL; + OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle, "app) Shutdown Barrier: Waiting on FIN_ACK...!")); @@ -288,28 +286,20 @@ int app_coord_finalize() * We could have been checkpointing just as we entered finalize, so we * need to wait until the checkpoint is finished before finishing. */ - buffer = OBJ_NEW(opal_buffer_t); -#ifdef ENABLE_FT_FIXED - /* This is the old, now broken code */ - if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) { - ORTE_ERROR_LOG(ret); - exit_status = ret; - OBJ_DESTRUCT(&buffer); - goto cleanup; - } -#endif /* ENABLE_FT_FIXED */ - orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer); + rb = OBJ_NEW(orte_rml_recv_cb_t); + rb->active = true; + orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb); + ORTE_WAIT_FOR_COMPLETION(rb->active); - /* wait for completion */ count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; } count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; @@ -337,6 +327,10 @@ int app_coord_finalize() OBJ_RELEASE(buffer); buffer = NULL; } + if (NULL != rb) { + OBJ_RELEASE(rb); + rb = NULL; + } OBJ_RELEASE(coll); @@ -1301,6 +1295,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum) orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD; opal_buffer_t *buffer = NULL; orte_std_cntr_t count; + orte_rml_recv_cb_t *rb = NULL; int op_event, op_state; char *seq_str = NULL, *tmp_str = NULL; int cr_state = OPAL_CRS_CONTINUE; @@ -1535,39 +1530,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum) goto cleanup; } - buffer = OBJ_NEW(opal_buffer_t); - /* * Wait for a response regarding completion */ -#ifdef ENABLE_FT_FIXED - /* This is the old, now broken code */ - if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) { - ORTE_ERROR_LOG(ret); - exit_status = ret; - OBJ_DESTRUCT(&buffer); - goto cleanup; - } -#endif /* ENABLE_FT_FIXED */ - orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer); - /* wait for completion */ + rb = OBJ_NEW(orte_rml_recv_cb_t); + rb->active = true; + orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb); + ORTE_WAIT_FOR_COMPLETION(rb->active); count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; } count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; } count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; @@ -1608,39 +1594,31 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum) "App) Request_op: Leader waiting for Migrate release (%3d)...", datum->event)); - buffer = OBJ_NEW(opal_buffer_t); /* * Wait for a response regarding completion */ -#ifdef ENABLE_FT_FIXED - /* This is the old, now broken code */ - if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) { - ORTE_ERROR_LOG(ret); - exit_status = ret; - OBJ_DESTRUCT(&buffer); - goto cleanup; - } -#endif /* ENABLE_FT_FIXED */ - orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer); - /* wait for completion */ + rb = OBJ_NEW(orte_rml_recv_cb_t); + rb->active = true; + orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb); + ORTE_WAIT_FOR_COMPLETION(rb->active); count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; } count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; } count = 1; - if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) { + if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) { ORTE_ERROR_LOG(ret); exit_status = ret; goto cleanup; @@ -1683,6 +1661,10 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum) OBJ_RELEASE(buffer); buffer = NULL; } + if (NULL != rb) { + OBJ_RELEASE(rb); + rb = NULL; + } if( NULL != seq_str ) { free(seq_str); @@ -1696,12 +1678,3 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum) return exit_status; } - -/* dummy implementation of a callback function to get it to compile again */ -static void snapc_full_app_callback_recv(int status, - orte_process_name_t* sender, - opal_buffer_t* buffer, - orte_rml_tag_t tag, - void* cbdata) -{ -}