A bit of cleanup for async case... Still one bug in there.
This commit was SVN r14197.
Этот коммит содержится в:
родитель
120cf76ad8
Коммит
71937c3eaf
@ -1554,9 +1554,6 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
|
||||
* receive multiple request complete calls for the
|
||||
* same request.
|
||||
*/
|
||||
#if JJH_DEBUG == 1
|
||||
printf("JJH - Special Case Send file %s line %d\n", __FILE__, __LINE__);
|
||||
#endif
|
||||
exit_status = OMPI_SUCCESS;
|
||||
goto DONE;
|
||||
}
|
||||
@ -1608,9 +1605,6 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
|
||||
* receive multiple request complete calls for the
|
||||
* same request.
|
||||
*/
|
||||
#if JJH_DEBUG == 1
|
||||
printf("JJH - Special Case Send file %s line %d\n", __FILE__, __LINE__);
|
||||
#endif
|
||||
exit_status = OMPI_SUCCESS;
|
||||
goto DONE;
|
||||
}
|
||||
@ -1685,9 +1679,6 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
|
||||
* receive multiple request complete calls for the
|
||||
* same request.
|
||||
*/
|
||||
#if JJH_DEBUG == 1
|
||||
printf("JJH - Special Case Send file %s line %d\n", __FILE__, __LINE__);
|
||||
#endif
|
||||
exit_status = OMPI_SUCCESS;
|
||||
goto DONE;
|
||||
}
|
||||
@ -1736,9 +1727,6 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
|
||||
* receive multiple request complete calls for the
|
||||
* same request.
|
||||
*/
|
||||
#if JJH_DEBUG == 1
|
||||
printf("JJH - Special Case Send file %s line %d\n", __FILE__, __LINE__);
|
||||
#endif
|
||||
exit_status = OMPI_SUCCESS;
|
||||
goto DONE;
|
||||
}
|
||||
@ -2400,68 +2388,138 @@ static int ft_event_check_bookmarks(void) {
|
||||
ompi_crcp_coord_pml_bookmark_proc_t *peer_ref;
|
||||
peer_ref = (ompi_crcp_coord_pml_bookmark_proc_t*)item;
|
||||
|
||||
/* Check P_n --> P_m
|
||||
* Has the peer received all the messages that I have put on the wire?
|
||||
*/
|
||||
p_n_to_p_m = (peer_ref->total_send_msgs +
|
||||
peer_ref->total_isend_msgs +
|
||||
peer_ref->total_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->matched_recv_msgs +
|
||||
peer_ref->matched_irecv_msgs +
|
||||
peer_ref->matched_recv_init_msgs );
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] --> [%lu,%lu,%lu] "
|
||||
"Sent Msgs (%4d) = Received Msgs (%4d). Peer needs %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Tell the peer what the outstanding messages looked like.
|
||||
* Since we can't tell which ones they are, we need to send the
|
||||
* information for all of the messages since the last checkpoint
|
||||
if( 0 == (peer_ref->proc_name.vpid) % 2) {
|
||||
/* Check P_n --> P_m
|
||||
* Has the peer received all the messages that I have put on the wire?
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to send message details to peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
p_n_to_p_m = (peer_ref->total_send_msgs +
|
||||
peer_ref->total_isend_msgs +
|
||||
peer_ref->total_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->matched_recv_msgs +
|
||||
peer_ref->matched_irecv_msgs +
|
||||
peer_ref->matched_recv_init_msgs );
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] --> [%lu,%lu,%lu] "
|
||||
"Sent Msgs (%4d) = Received Msgs (%4d). Peer needs %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Tell the peer what the outstanding messages looked like.
|
||||
* Since we can't tell which ones they are, we need to send the
|
||||
* information for all of the messages since the last checkpoint
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to send message details to peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check P_n <-- P_m
|
||||
* Have I received all the messages that my peer has put on the wire?
|
||||
*/
|
||||
p_n_to_p_m = (peer_ref->matched_send_msgs +
|
||||
peer_ref->matched_isend_msgs +
|
||||
peer_ref->matched_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->total_recv_msgs +
|
||||
peer_ref->total_irecv_msgs +
|
||||
peer_ref->total_recv_init_msgs );
|
||||
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] <-- [%lu,%lu,%lu] "
|
||||
"Received Msgs (%4d) = Sent Msgs (%4d). I need %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Receive from peer the datatypes of the outstanding sends
|
||||
* As we figure out what they are post Irecv's for them into a drained buffer list.
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to recv message details from peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Check P_n <-- P_m
|
||||
* Have I received all the messages that my peer has put on the wire?
|
||||
*/
|
||||
p_n_to_p_m = (peer_ref->matched_send_msgs +
|
||||
peer_ref->matched_isend_msgs +
|
||||
peer_ref->matched_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->total_recv_msgs +
|
||||
peer_ref->total_irecv_msgs +
|
||||
peer_ref->total_recv_init_msgs );
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] <-- [%lu,%lu,%lu] "
|
||||
"Received Msgs (%4d) = Sent Msgs (%4d). I need %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Receive from peer the datatypes of the outstanding sends
|
||||
* As we figure out what they are post Irecv's for them into a drained buffer list.
|
||||
else { /* Odd */
|
||||
/* Check P_n <-- P_m
|
||||
* Have I received all the messages that my peer has put on the wire?
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to recv message details from peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
p_n_to_p_m = (peer_ref->matched_send_msgs +
|
||||
peer_ref->matched_isend_msgs +
|
||||
peer_ref->matched_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->total_recv_msgs +
|
||||
peer_ref->total_irecv_msgs +
|
||||
peer_ref->total_recv_init_msgs );
|
||||
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] <-- [%lu,%lu,%lu] "
|
||||
"Received Msgs (%4d) = Sent Msgs (%4d). I need %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Receive from peer the datatypes of the outstanding sends
|
||||
* As we figure out what they are post Irecv's for them into a drained buffer list.
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to recv message details from peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check P_n --> P_m
|
||||
* Has the peer received all the messages that I have put on the wire?
|
||||
*/
|
||||
p_n_to_p_m = (peer_ref->total_send_msgs +
|
||||
peer_ref->total_isend_msgs +
|
||||
peer_ref->total_send_init_msgs );
|
||||
p_n_from_p_m = (peer_ref->matched_recv_msgs +
|
||||
peer_ref->matched_irecv_msgs +
|
||||
peer_ref->matched_recv_init_msgs );
|
||||
if( p_n_to_p_m > p_n_from_p_m) {
|
||||
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: [%lu,%lu,%lu] --> [%lu,%lu,%lu] "
|
||||
"Sent Msgs (%4d) = Received Msgs (%4d). Peer needs %4d.\n",
|
||||
ORTE_NAME_ARGS(orte_process_info.my_name),
|
||||
ORTE_NAME_ARGS(&(peer_ref->proc_name)),
|
||||
p_n_to_p_m,
|
||||
p_n_from_p_m,
|
||||
(p_n_to_p_m - p_n_from_p_m)
|
||||
);
|
||||
/*
|
||||
* Tell the peer what the outstanding messages looked like.
|
||||
* Since we can't tell which ones they are, we need to send the
|
||||
* information for all of the messages since the last checkpoint
|
||||
*/
|
||||
if( OMPI_SUCCESS != (ret = ft_event_send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
|
||||
opal_output(mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: check_bookmarks: Unable to send message details to peer [%lu,%lu,%lu]: Return %d\n",
|
||||
ORTE_NAME_ARGS(&peer_ref->proc_name),
|
||||
ret);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2533,7 +2591,7 @@ static int ft_event_send_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
ompi_crcp_coord_pml_drain_msg_ack_ref_t * d_msg_ack = NULL;
|
||||
orte_buffer_t *buffer = NULL;
|
||||
int need;
|
||||
int32_t tmp_int = -1;
|
||||
int32_t req_more = -1;
|
||||
int comm_my_rank;
|
||||
int ctr = 0;
|
||||
opal_list_t *search_list;
|
||||
@ -2637,13 +2695,13 @@ static int ft_event_send_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
/*
|
||||
* Pull out the communicator ID
|
||||
*/
|
||||
UNPACK_BUFFER(buffer, tmp_int, 1, ORTE_UINT32,
|
||||
UNPACK_BUFFER(buffer, req_more, 1, ORTE_UINT32,
|
||||
"crcp:coord: send_msg_details: Failed to unpack the ACK from peer buffer.");
|
||||
|
||||
/* Debug marker */
|
||||
msg_ref->matched = true;
|
||||
|
||||
if(tmp_int == 0) {
|
||||
if(req_more == 0) {
|
||||
/* Need to send more results ... */
|
||||
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
|
||||
"crcp:coord: send_msg_details: [%lu,%lu,%lu] --> [%lu,%lu,%lu] Need to send more results to peer..."
|
||||
@ -2689,7 +2747,7 @@ static int ft_event_send_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
}
|
||||
|
||||
ALL_SENT:
|
||||
assert(tmp_int != 0);
|
||||
assert(req_more != 0);
|
||||
|
||||
/* Prepare to post a Recv for the ACK All Clear signal from the peer
|
||||
* which is sent when they have finished receiving all of the
|
||||
@ -2724,7 +2782,7 @@ static int ft_event_recv_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
int ret, exit_status = OMPI_SUCCESS;
|
||||
int need;
|
||||
int found = 0;
|
||||
int32_t tmp = 0;
|
||||
int32_t req_more = 0;
|
||||
orte_buffer_t * buffer = NULL;
|
||||
ompi_crcp_coord_pml_message_ref_t *posted_msg_ref = NULL;
|
||||
opal_list_t *found_on_this_list = NULL;
|
||||
@ -2845,15 +2903,9 @@ static int ft_event_recv_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
peer_ref->proc_name.jobid,
|
||||
peer_ref->proc_name.vpid);
|
||||
|
||||
#if 0 /* JJH Verify this */
|
||||
d_msg->count = p_msg_count;
|
||||
d_msg->datatype = ompi_ddt_create(p_msg_buffer_size);
|
||||
d_msg->ddt_size = (d_msg->datatype)->size;
|
||||
#else
|
||||
d_msg->count = p_msg_count * p_msg_buffer_size;
|
||||
ompi_ddt_duplicate(&ompi_mpi_packed, &(d_msg->datatype));
|
||||
d_msg->ddt_size = (d_msg->datatype)->size;
|
||||
#endif
|
||||
|
||||
d_msg->matched = true;
|
||||
d_msg->done = p_msg_done;
|
||||
@ -2963,11 +3015,11 @@ static int ft_event_recv_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
*/
|
||||
if( need > found) {
|
||||
/* Ask for more */
|
||||
tmp = 0;
|
||||
req_more = 0;
|
||||
}
|
||||
else {
|
||||
/* Tell them we are all done */
|
||||
tmp = 1;
|
||||
req_more = 1;
|
||||
}
|
||||
|
||||
if( NULL != buffer) {
|
||||
@ -2980,7 +3032,7 @@ static int ft_event_recv_msg_details(ompi_crcp_coord_pml_bookmark_proc_t *peer_r
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
PACK_BUFFER(buffer, tmp, 1, ORTE_UINT32,
|
||||
PACK_BUFFER(buffer, req_more, 1, ORTE_UINT32,
|
||||
"crcp:coord: recv_msg_details: Unable to ask peer for more messages");
|
||||
|
||||
if ( 0 > ( ret = orte_rml.send_buffer(&peer_ref->proc_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG, 0)) ) {
|
||||
@ -3112,10 +3164,6 @@ static bool ft_event_have_received_msg(uint32_t comm_id, int tag, size_t count,
|
||||
|
||||
if( NULL != *found_msg_ref) {
|
||||
*found_on_this_list = &(unknown_recv_from_list);
|
||||
#if 0 /* JJH Check this */
|
||||
opal_list_remove_item(&(unknown_recv_from_list), &((*found_msg_ref)->super));
|
||||
opal_list_append( &(peer_ref->irecv_list), &((*found_msg_ref)->super));
|
||||
#endif
|
||||
goto FOUND;
|
||||
}
|
||||
|
||||
@ -3137,10 +3185,6 @@ static bool ft_event_have_received_msg(uint32_t comm_id, int tag, size_t count,
|
||||
|
||||
if( NULL != *found_msg_ref) {
|
||||
*found_on_this_list = &(unknown_persist_recv_list);
|
||||
#if 0 /* JJH Check this */
|
||||
opal_list_remove_item(&(unknown_persist_recv_list), &((*found_msg_ref)->super));
|
||||
opal_list_append( &(peer_ref->recv_init_list), &((*found_msg_ref)->super));
|
||||
#endif
|
||||
goto FOUND;
|
||||
}
|
||||
|
||||
@ -3157,10 +3201,6 @@ static bool ft_event_have_received_msg(uint32_t comm_id, int tag, size_t count,
|
||||
|
||||
if( NULL != *found_msg_ref) {
|
||||
*found_on_this_list = &(unknown_persist_recv_list);
|
||||
#if 0 /* JJH Check this */
|
||||
opal_list_remove_item(&(unknown_persist_recv_list), &((*found_msg_ref)->super));
|
||||
opal_list_append( &(peer_ref->recv_init_list), &((*found_msg_ref)->super));
|
||||
#endif
|
||||
goto FOUND;
|
||||
}
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user