1
1

SSTORE/CRCP: use ORTE_WAIT_FOR_COMPLETION with non-blocking receives

During the commits to make the C/R code compile again, the
blocking receive calls were replaced by non-blocking ones,
which broke the code. This patch uses ORTE_WAIT_FOR_COMPLETION()
to wait until the non-blocking calls have finished.

This commit was SVN r30486.
Этот коммит содержится в:
Adrian Reber 2014-01-29 20:30:35 +00:00
родитель d5c1e33900
Коммит fa1036f38c
3 изменённых файла: 62 добавления и 103 удаления

Просмотреть файл

@ -5514,6 +5514,7 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
{
int ret, exit_status = OMPI_SUCCESS;
opal_buffer_t *buffer = NULL;
orte_rml_recv_cb_t *rb = NULL;
int32_t recv_response = RECV_MATCH_RESP_ERROR;
int32_t num_resolv = -1;
int32_t p_total_found = -1;
@ -5582,46 +5583,24 @@ static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
exit_status = OMPI_ERROR;
goto cleanup;
}
if( NULL != buffer) {
OBJ_RELEASE(buffer);
buffer = NULL;
}
/*
* Check return value from peer to see if we found a match.
*/
if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
exit_status = OMPI_ERROR;
goto cleanup;
}
/*
* Recv the ACK msg
*/
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer,
OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
opal_output(mca_crcp_bkmrk_component.super.output_handle,
"crcp:bkmrk: do_send_msg_detail: %s --> %s Failed to receive ACK buffer from peer. Return %d\n",
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
OMPI_NAME_PRINT(&(peer_ref->proc_name)),
ret);
exit_status = ret;
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
orte_rml_recv_callback, NULL);
orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
UNPACK_BUFFER(buffer, recv_response, 1, OPAL_UINT32,
UNPACK_BUFFER(&rb->data, recv_response, 1, OPAL_UINT32,
"crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
UNPACK_BUFFER(buffer, num_resolv, 1, OPAL_UINT32,
UNPACK_BUFFER(&rb->data, num_resolv, 1, OPAL_UINT32,
"crcp:bkmrk: send_msg_details: Failed to unpack the num_resolv from peer buffer.");
UNPACK_BUFFER(buffer, p_total_found, 1, OPAL_UINT32,
UNPACK_BUFFER(&rb->data, p_total_found, 1, OPAL_UINT32,
"crcp:bkmrk: send_msg_details: Failed to unpack the total_found from peer buffer.");
OBJ_RELEASE(rb);
/* Mark message as matched */
msg_ref->matched += num_resolv;
*num_matches = num_resolv;
@ -5767,55 +5746,38 @@ static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
size_t *count, size_t *datatype_size,
int *p_num_sent)
{
opal_buffer_t * buffer = NULL;
orte_rml_recv_cb_t *rb = NULL;
int exit_status = OMPI_SUCCESS;
int ret;
if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
goto cleanup;
}
/*
* Recv the msg
*/
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if ( 0 > (ret = ompi_rte_recv_buffer(&peer_ref->proc_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG, 0) ) ) {
opal_output(mca_crcp_bkmrk_component.super.output_handle,
"crcp:bkmrk: do_recv_msg_detail: %s <-- %s Failed to receive buffer from peer. Return %d\n",
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
OMPI_NAME_PRINT(&(peer_ref->proc_name)),
ret);
exit_status = ret;
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, NULL);
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
/* Pull out the communicator ID */
UNPACK_BUFFER(buffer, (*comm_id), 1, OPAL_UINT32,
UNPACK_BUFFER(&rb->data, (*comm_id), 1, OPAL_UINT32,
"crcp:bkmrk: recv_msg_details: Failed to unpack the communicator ID");
UNPACK_BUFFER(buffer, (*rank), 1, OPAL_INT,
UNPACK_BUFFER(&rb->data, (*rank), 1, OPAL_INT,
"crcp:bkmrk: recv_msg_details: Failed to unpack the communicator rank ID");
/* Pull out the message details */
UNPACK_BUFFER(buffer, (*tag), 1, OPAL_INT,
UNPACK_BUFFER(&rb->data, (*tag), 1, OPAL_INT,
"crcp:bkmrk: recv_msg_details: Failed to unpack the tag");
UNPACK_BUFFER(buffer, (*count), 1, OPAL_SIZE,
UNPACK_BUFFER(&rb->data, (*count), 1, OPAL_SIZE,
"crcp:bkmrk: recv_msg_details: Failed to unpack the count");
UNPACK_BUFFER(buffer, (*datatype_size), 1, OPAL_SIZE,
UNPACK_BUFFER(&rb->data, (*datatype_size), 1, OPAL_SIZE,
"crcp:bkmrk: recv_msg_details: Failed to unpack the datatype size");
/* Pull out the counts */
UNPACK_BUFFER(buffer, (*p_num_sent), 1, OPAL_INT,
UNPACK_BUFFER(&rb->data, (*p_num_sent), 1, OPAL_INT,
"crcp:bkmrk: recv_msg_details: Failed to unpack the sent count");
cleanup:
if( NULL != buffer) {
OBJ_RELEASE(buffer);
buffer = NULL;
}
cleanup:
OBJ_RELEASE(rb);
return exit_status;
}

Просмотреть файл

@ -443,6 +443,7 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
orte_sstore_central_cmd_flag_t command;
orte_std_cntr_t count;
orte_sstore_base_handle_t loc_id;
orte_rml_recv_cb_t* rb = NULL;
buffer = OBJ_NEW(opal_buffer_t);
@ -470,40 +471,32 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
goto cleanup;
}
/* buffer should not be released here; the callback releases it */
buffer = NULL;
/*
* Receive the response
*/
buffer = OBJ_NEW(opal_buffer_t);
OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
"sstore:central:(app): pull() from %s -> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
&buffer,
ORTE_RML_TAG_SSTORE_INTERNAL,
0)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
0, orte_rml_recv_callback, NULL);
/* wait for completion */
0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -513,28 +506,28 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -553,6 +546,10 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
OBJ_RELEASE(buffer);
buffer = NULL;
}
if (NULL != rb) {
OBJ_RELEASE(rb);
buffer = NULL;
}
return exit_status;
}

Просмотреть файл

@ -432,6 +432,7 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
orte_sstore_stage_cmd_flag_t command;
orte_std_cntr_t count;
orte_sstore_base_handle_t loc_id;
orte_rml_recv_cb_t *rb = NULL;
buffer = OBJ_NEW(opal_buffer_t);
@ -459,37 +460,32 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
goto cleanup;
}
/* buffer should not be released here; the callback releases it */
buffer = NULL;
/*
* Receive the response
*/
buffer = OBJ_NEW(opal_buffer_t);
OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
"sstore:stage:(app): pull() from %s -> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));
#ifdef ENABLE_FT_FIXED
/* This is the old, now broken code */
if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer(ORTE_PROC_MY_DAEMON,
&buffer,
ORTE_RML_TAG_SSTORE_INTERNAL,
0)) ) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
rb = OBJ_NEW(orte_rml_recv_cb_t);
rb->active = true;
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
0, orte_rml_recv_callback, NULL);
0, orte_rml_recv_callback, rb);
ORTE_WAIT_FOR_COMPLETION(rb->active);
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -499,28 +495,28 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -531,6 +527,10 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
OBJ_RELEASE(buffer);
buffer = NULL;
}
if (NULL != rb) {
OBJ_RELEASE(rb);
buffer = NULL;
}
return exit_status;
}