1
1

SNAPC: use ORTE_WAIT_FOR_COMPLETION with non-blocking receives

During the commits to make the C/R code compile again the
blocking receive calls in snapc_full_app.c were
replaced by non-blocking receive calls.
This commit adds ORTE_WAIT_FOR_COMPLETION()
after each non-blocking receive to wait for the data.

This commit was SVN r30487.
Этот коммит содержится в:
Adrian Reber 2014-01-29 20:46:14 +00:00
родитель fa1036f38c
Коммит 5f95db3902

Просмотреть файл

@ -99,12 +99,6 @@ static int current_cr_state = OPAL_CRS_NONE;
static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
static opal_crs_base_ckpt_options_t *current_options = NULL;
static void snapc_full_app_callback_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata);
/************************
* Function Definitions
************************/
@ -224,6 +218,7 @@ int app_coord_finalize()
opal_buffer_t *buffer = NULL;
orte_std_cntr_t count;
orte_grpcomm_collective_t *coll;
orte_rml_recv_cb_t *rb = NULL;
/*
* All processes must sync here, so the Global coordinator can know that
@ -281,6 +276,9 @@ int app_coord_finalize()
goto cleanup;
}
/* buffer should not be released here; the callback releases it */
buffer = NULL;
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"app) Shutdown Barrier: Waiting on FIN_ACK...!"));
@ -288,28 +286,20 @@ int app_coord_finalize()
* We could have been checkpointing just as we entered finalize, so we
* need to wait until the checkpoint is finished before finishing.
*/
buffer = OBJ_NEW(opal_buffer_t);
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
/* wait for completion */
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -337,6 +327,10 @@ int app_coord_finalize()
OBJ_RELEASE(buffer);
buffer = NULL;
}
if (NULL != rb) {
OBJ_RELEASE(rb);
rb = NULL;
}
OBJ_RELEASE(coll);
@ -1301,6 +1295,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
opal_buffer_t *buffer = NULL;
orte_std_cntr_t count;
orte_rml_recv_cb_t *rb = NULL;
int op_event, op_state;
char *seq_str = NULL, *tmp_str = NULL;
int cr_state = OPAL_CRS_CONTINUE;
@ -1535,39 +1530,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
goto cleanup;
}
buffer = OBJ_NEW(opal_buffer_t);
/*
* Wait for a response regarding completion
*/
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
/* wait for completion */
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1608,39 +1594,31 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
"App) Request_op: Leader waiting for Migrate release (%3d)...",
datum->event));
buffer = OBJ_NEW(opal_buffer_t);
/*
* Wait for a response regarding completion
*/
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if (0 > (ret = orte_rml.recv_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
/* wait for completion */
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1683,6 +1661,10 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
OBJ_RELEASE(buffer);
buffer = NULL;
}
if (NULL != rb) {
OBJ_RELEASE(rb);
rb = NULL;
}
if( NULL != seq_str ) {
free(seq_str);
@ -1696,12 +1678,3 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
return exit_status;
}
/* dummy implementation of a callback function to get it to compile again */
static void snapc_full_app_callback_recv(int status,
orte_process_name_t* sender,
opal_buffer_t* buffer,
orte_rml_tag_t tag,
void* cbdata)
{
}