
- Remove an unnecessary barrier

- verbose -> VERBOSE just for the fun of it

This commit was SVN r16811.
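For context on the "verbose -> VERBOSE" bullet: opal_output_verbose() is an ordinary function call that is always compiled in, while OPAL_OUTPUT_VERBOSE() is a macro whose entire call site (including argument evaluation) can be compiled away in non-debug builds, which is why the converted calls in the diff below gain a second set of parentheses. A minimal, self-contained sketch of the pattern follows; the STRIP_VERBOSE_OUTPUT guard and the stand-in function body are illustrative assumptions, the real macro lives in opal/util/output.h and is guarded by Open MPI's debug flag.

/* Sketch of the opal_output_verbose -> OPAL_OUTPUT_VERBOSE conversion. */
#include <stdarg.h>
#include <stdio.h>

/* Stand-in for opal_output_verbose(verbosity, output_handle, format, ...). */
static void opal_output_verbose(int verbosity, int handle, const char *fmt, ...)
{
    va_list ap;
    (void)handle;
    if (verbosity > 10) {   /* pretend the stream's current verbose level is 10 */
        return;
    }
    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    fputc('\n', stderr);
    va_end(ap);
}

/* The macro takes ONE argument -- the fully parenthesized argument list -- so it
 * can forward it without C99 variadic macros.  When verbose output is stripped,
 * the whole call disappears at compile time. */
#ifdef STRIP_VERBOSE_OUTPUT
#define OPAL_OUTPUT_VERBOSE(args) do { } while (0)
#else
#define OPAL_OUTPUT_VERBOSE(args) opal_output_verbose args
#endif

int main(void)
{
    /* Note the double parentheses at the call sites, exactly as in the diff below. */
    OPAL_OUTPUT_VERBOSE((5, 0, "crcp:coord: pml_enable() (verbosity %d)", 5));
    OPAL_OUTPUT_VERBOSE((20, 0, "filtered: verbosity 20 exceeds the current level"));
    return 0;
}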
This commit is contained in:
Josh Hursey 2007-11-30 22:26:18 +00:00
parent ff6bc1364b
Commit 5fb83a4f10


@ -264,14 +264,6 @@ static int ft_event_finalize_exchange(void);
*/
static int ft_event_exchange_bookmarks(void);
/*
* A basic barrier -- JJH Improve this
*/
static int coord_basic_barrier(void);
static int coord_basic_barrier_send(int idx);
static int coord_basic_barrier_recv(int idx);
/*
* Send Bookmarks to peer
*/
@ -428,12 +420,11 @@ static int wait_quiesce_drain_ack(void);
#define CRCP_TIMER_CKPT_CK_B 2
#define CRCP_TIMER_CKPT_POST 3
#define CRCP_TIMER_CKPT_WAIT 4
#define CRCP_TIMER_CKPT_BARR 5
#define CRCP_TIMER_CONT 6
#define CRCP_TIMER_CKPT_PEER_S 7
#define CRCP_TIMER_CKPT_PEER_R 8
#define CRCP_TIMER_CKPT_WAIT_B 9
#define CRCP_TIMER_MAX 10
#define CRCP_TIMER_CONT 5
#define CRCP_TIMER_CKPT_PEER_S 6
#define CRCP_TIMER_CKPT_PEER_R 7
#define CRCP_TIMER_CKPT_WAIT_B 8
#define CRCP_TIMER_MAX 9
static double get_time(void);
static void start_time(int idx);
@ -818,7 +809,6 @@ int ompi_crcp_coord_pml_init(void) {
timer_label[CRCP_TIMER_CKPT_CK_B] = strdup("Ckpt Check");
timer_label[CRCP_TIMER_CKPT_POST] = strdup("Ckpt Post");
timer_label[CRCP_TIMER_CKPT_WAIT] = strdup("Ckpt Wait");
timer_label[CRCP_TIMER_CKPT_BARR] = strdup("Ckpt Barrier");
timer_label[CRCP_TIMER_CONT] = strdup("Continue");
timer_label[CRCP_TIMER_CKPT_PEER_S] = strdup("Peer Send");
timer_label[CRCP_TIMER_CKPT_PEER_R] = strdup("Peer Recv");
@ -852,8 +842,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_enable(
bool enable,
ompi_crcp_base_pml_state_t* pml_state )
{
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_enable()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_enable()"));
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
@ -862,8 +852,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_enable(
ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_progress(
ompi_crcp_base_pml_state_t* pml_state)
{
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_progress()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_progress()"));
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
@ -880,8 +870,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_iprobe(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(%d, %d)", dst, tag);
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(%d, %d)", dst, tag));
/*
* Before PML Call
@ -910,8 +900,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_iprobe(
* - Mark the 'matched' flag as true
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message...");
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message..."));
/* Copy the status information */
if( MPI_STATUS_IGNORE != status ) {
@ -950,8 +940,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_probe(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_probe(%d, %d)", dst, tag);
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_probe(%d, %d)", dst, tag));
/*
* Before PML Call
@ -979,8 +969,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_probe(
* - Copy of the status structure to pass back to the user
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message...");
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message..."));
/* Copy the status information */
if( MPI_STATUS_IGNORE != status ) {
@ -1005,8 +995,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_dump(
int verbose,
ompi_crcp_base_pml_state_t* pml_state )
{
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_dump()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_dump()"));
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
@ -1017,8 +1007,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_add_comm(
struct ompi_communicator_t* comm,
ompi_crcp_base_pml_state_t* pml_state )
{
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_add_comm()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_add_comm()"));
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
@ -1027,8 +1017,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_del_comm(
struct ompi_communicator_t* comm,
ompi_crcp_base_pml_state_t* pml_state )
{
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_del_comm()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_del_comm()"));
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
@ -1046,8 +1036,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_add_procs(
goto DONE;
}
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_add_procs()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_add_procs()"));
/*
* Save pointers to the wrapped PML
@ -1086,8 +1076,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_del_procs(
goto DONE;
}
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_del_procs()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_del_procs()"));
for( i = 0; i < nprocs; ++i) {
item = (opal_list_item_t*)find_peer(procs[i]->proc_name);
@ -1126,8 +1116,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_isend_init(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_isend_init()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_isend_init()"));
/*
* Before the PML gets the message:
@ -1220,8 +1210,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_isend(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_isend()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_isend()"));
/*
* Before the PML gets the message:
@ -1315,8 +1305,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_send(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_send()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_send()"));
/*
* Before the PML gets the message:
@ -1414,8 +1404,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_irecv_init(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_irecv_init()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_irecv_init()"));
/*
* Before PML Call
@ -1528,8 +1518,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_irecv(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_irecv()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_irecv()"));
/*
* Before PML Call
@ -1559,11 +1549,11 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_irecv(
* when we originally drained the message.
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_irecv(): Matched a drained message. "
"[%3d, %3d] vs [%3d, %3d]",
(int)datatype->size, (int)count,
(int)drain_msg_ref->ddt_size, (int)drain_msg_ref->count);
(int)drain_msg_ref->ddt_size, (int)drain_msg_ref->count));
/* Copy the drained message */
src = drain_msg_ref->rank;
@ -1717,8 +1707,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv()"));
/*
* Before PML Call
@ -1748,8 +1738,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv(
* when we originally drained the message.
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv(): Matched a drained message...");
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv(): Matched a drained message..."));
/* Copy the drained message */
src = drain_msg_ref->rank;
@ -1939,8 +1929,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start(
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_start()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_start()"));
/*
* Check from drained recv message before PML gets a chance to do anything
@ -2070,9 +2060,9 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_start(
}
}
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_start: Matched a Recv_init: total = %d",
peer_ref->total_recv_init_msgs);
peer_ref->total_recv_init_msgs));
/* Copy the drained message */
@ -2364,8 +2354,8 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
int src;
int tag;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_request_complete()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_request_complete()"));
/*
* Extract the PML version of the request
@ -2441,9 +2431,9 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
goto DONE;
}
if( NULL != msg_ref ) {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: req_complete: Matched an iSend: total = %d",
peer_ref->total_isend_msgs);
peer_ref->total_isend_msgs));
goto FOUND;
}
else {
@ -2535,9 +2525,9 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
peer_ref->total_irecv_msgs += 1;
}
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: req_complete: Matched an iRecv: total = %d",
peer_ref->total_irecv_msgs);
peer_ref->total_irecv_msgs));
goto FOUND;
}
@ -2573,9 +2563,9 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
peer_ref->total_irecv_msgs += 1;
}
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: req_complete: Matched an iRecv: total = %d",
peer_ref->total_irecv_msgs);
peer_ref->total_irecv_msgs));
goto FOUND;
}
else {
@ -2687,9 +2677,9 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
msg_ref->done = true;
msg_ref->active = false;
opal_output_verbose(25, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((25, mca_crcp_coord_component.super.output_handle,
"crcp:coord: req_complete: Marked Message... ( %d, %d )\n",
peer_ref->total_isend_msgs, peer_ref->total_irecv_msgs);
peer_ref->total_isend_msgs, peer_ref->total_irecv_msgs));
}
else {
/*
@ -2697,10 +2687,10 @@ int ompi_crcp_coord_request_complete(struct ompi_request_t *request)
* so this case can occur during normal operation.
* This is caused by us checking for completeness twice in ompi_request_wait_all.
*/
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
peer_ref->total_isend_msgs, peer_ref->total_irecv_msgs,
breq->req_peer, src, breq->req_comm->c_contextid);
breq->req_peer, src, breq->req_comm->c_contextid));
}
DONE:
@ -2722,8 +2712,8 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_ft_event(
goto STEP_1;
}
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_ft_event()");
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_ft_event()"));
/**************************
* Prepare for a Checkpoint
@ -2750,9 +2740,9 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_ft_event(
}
if( stall_for_completion ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_ft_event: STALLING PID %d\n",
getpid());
getpid()));
stall_for_completion = false;
opal_cr_stall_check = true;
@ -2887,11 +2877,11 @@ static int find_drained_msg(size_t ddt_size,
drain_msg = (ompi_crcp_coord_pml_message_ref_t*)item;
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: find_drain_msg(): Compare [%d, %d, %d, %d] to %c-[%d, %d, %d, %d]",
(int)ddt_size, (int)count, tag, peer,
(NULL == drain_msg->buffer ? 'T' : 'F'),
(int)drain_msg->ddt_size, (int)drain_msg->count, (int)drain_msg->tag, (int)drain_msg->rank);
(int)drain_msg->ddt_size, (int)drain_msg->count, (int)drain_msg->tag, (int)drain_msg->rank));
/* If the buffer is invalid then this is not a valid message or
* has not been completed draining just yet */
@ -3001,9 +2991,9 @@ static int ft_event_coordinate_peers(void)
* Check if we need to stall for completion of tasks
*/
if( stall_for_completion ) {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: ft_event_coordinate_peers: %s **** STALLING ***",
ORTE_NAME_PRINT(orte_process_info.my_name));
ORTE_NAME_PRINT(orte_process_info.my_name)));
step_to_return_to = 1;
exit_status = OMPI_SUCCESS;
goto DONE;
@ -3027,25 +3017,9 @@ static int ft_event_coordinate_peers(void)
}
END_TIMER(CRCP_TIMER_CKPT_WAIT);
/*
* Exchange Bookmarks with peers -- JJH -- Improve this
* - This serves as finish barrier
* We need a barrier here to keep peers from sending to us before we have taken
* our checkpoint. Ideally this would not happen, but needs some futher investigation.
*/
START_TIMER(CRCP_TIMER_CKPT_BARR);
if( OMPI_SUCCESS != (ret = coord_basic_barrier() ) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: ft_event_coordinate_peers: Basic Barrier Failed %d",
ret);
exit_status = ret;
goto DONE;
}
END_TIMER(CRCP_TIMER_CKPT_BARR);
opal_output_verbose(5, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, mca_crcp_coord_component.super.output_handle,
"crcp:coord: ft_event_coordinate_peers: %s Coordination Finished...\n",
ORTE_NAME_PRINT(orte_process_info.my_name) );
ORTE_NAME_PRINT(orte_process_info.my_name) ));
/*
* Now that all our peer channels are marked as drained
@ -3171,13 +3145,13 @@ static int ft_event_check_bookmarks(void)
if( 10 <= mca_crcp_coord_component.super.verbose ) {
sleep(orte_process_info.my_name->vpid);
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"Process %s Match Table",
ORTE_NAME_PRINT(orte_process_info.my_name));
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
ORTE_NAME_PRINT(orte_process_info.my_name)));
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"%s %5s | %7s | %7s | %7s | %7s |",
ORTE_NAME_PRINT(orte_process_info.my_name),
"Vpid", "T_Send", "M_Recv", "M_Send", "T_Recv");
"Vpid", "T_Send", "M_Recv", "M_Send", "T_Recv"));
for(item = opal_list_get_first(&ompi_crcp_coord_pml_peer_refs);
item != opal_list_get_end(&ompi_crcp_coord_pml_peer_refs);
@ -3200,11 +3174,11 @@ static int ft_event_check_bookmarks(void)
peer_ref->matched_irecv_msgs +
peer_ref->matched_recv_init_msgs );
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"%s %5d | %7d | %7d | %7d | %7d |",
ORTE_NAME_PRINT(orte_process_info.my_name),
peer_ref->proc_name.vpid,
t_send, m_recv, m_send, t_recv);
t_send, m_recv, m_send, t_recv));
}
}
@ -3255,7 +3229,7 @@ static int ft_event_check_bookmarks(void)
/* I've send more than my peer has received,
* so need to coordinate with peer. */
if( p_n_to_p_m > p_n_from_p_m) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: check_bookmarks: %s --> %s "
"Sent Msgs (%4d) = Received Msgs (%4d). Peer needs %4d.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
@ -3263,7 +3237,7 @@ static int ft_event_check_bookmarks(void)
p_n_to_p_m,
p_n_from_p_m,
(p_n_to_p_m - p_n_from_p_m)
);
));
/*
* Tell the peer what the outstanding messages looked like.
* Since we can't tell which ones they are, we need to send the
@ -3306,7 +3280,7 @@ static int ft_event_check_bookmarks(void)
/* I've recv'ed less than my peer has sent,
* so need to coordinate with peer. */
if( p_n_to_p_m > p_n_from_p_m) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: check_bookmarks: %s <-- %s "
"Received Msgs (%4d) = Sent Msgs (%4d). I need %4d.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
@ -3314,7 +3288,7 @@ static int ft_event_check_bookmarks(void)
p_n_to_p_m,
p_n_from_p_m,
(p_n_to_p_m - p_n_from_p_m)
);
));
/*
* Receive from peer the datatypes of the outstanding sends
* As we figure out what they are post Irecv's for them into a drained buffer list.
@ -3358,7 +3332,7 @@ static int ft_event_check_bookmarks(void)
/* I've recv'ed less than my peer has sent,
* so need to coordinate with peer. */
if( p_n_to_p_m > p_n_from_p_m) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: check_bookmarks: %s <-- %s "
"Received Msgs (%4d) = Sent Msgs (%4d). I need %4d.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
@ -3366,7 +3340,7 @@ static int ft_event_check_bookmarks(void)
p_n_to_p_m,
p_n_from_p_m,
(p_n_to_p_m - p_n_from_p_m)
);
));
/*
* Receive from peer the datatypes of the outstanding sends
* As we figure out what they are post Irecv's for them into a drained buffer list.
@ -3408,7 +3382,7 @@ static int ft_event_check_bookmarks(void)
/* I've send more than my peer has received,
* so need to coordinate with peer. */
if( p_n_to_p_m > p_n_from_p_m) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: check_bookmarks: %s --> %s "
"Sent Msgs (%4d) = Received Msgs (%4d). Peer needs %4d.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
@ -3416,7 +3390,7 @@ static int ft_event_check_bookmarks(void)
p_n_to_p_m,
p_n_from_p_m,
(p_n_to_p_m - p_n_from_p_m)
);
));
/*
* Tell the peer what the outstanding messages looked like.
* Since we can't tell which ones they are, we need to send the
@ -3457,10 +3431,10 @@ static int ft_event_post_drain_acks(void)
return OMPI_SUCCESS;
}
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: post_drain_ack: %s Wait on %d Drain ACK Messages.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
(int)req_size);
(int)req_size));
/*
* We have loaded our peer with the message information
@ -3518,10 +3492,10 @@ static void drain_message_ack_cbfunc(int status,
drain_msg_ack->peer.vpid == sender->vpid ) {
/* We found it! */
drain_msg_ack->complete = true;
opal_output_verbose(5, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, mca_crcp_coord_component.super.output_handle,
"crcp:coord: drain_message_ack_cbfunc: %s --> %s Received ACK of FLUSH from peer\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(sender) );
ORTE_NAME_PRINT(sender) ));
return;
}
}
@ -3547,10 +3521,10 @@ static int ft_event_post_drained(void)
return OMPI_SUCCESS;
}
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: post_drained: %s Draining %d Messages.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
(int)req_size);
(int)req_size));
/*
* For each message in the drained list,
@ -3568,19 +3542,19 @@ static int ft_event_post_drained(void)
* we have requests for
*/
if( drain_msg->already_posted ) {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: post_drained: %s Found a message that we don't need to post.\n",
ORTE_NAME_PRINT(orte_process_info.my_name));
ORTE_NAME_PRINT(orte_process_info.my_name)));
continue;
}
/*
* Post a receive to drain this message
*/
else {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: post_drained: %s Posting a message to be drained from %d.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
drain_msg->rank);
drain_msg->rank));
if( OMPI_SUCCESS != (ret = wrapped_pml_module->pml_irecv(drain_msg->buffer,
(drain_msg->count * drain_msg->ddt_size),
drain_msg->datatype,
@ -3652,10 +3626,10 @@ static int wait_quiesce_drained(void)
return OMPI_SUCCESS;
}
opal_output_verbose(5, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce_drained: %s Waiting on %d messages to drain\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
(int)req_size);
(int)req_size));
/*
* Create an array of requests to wait upon, and associated statuses
@ -3707,17 +3681,17 @@ static int wait_quiesce_drained(void)
* Create the array of requests to wait on
*/
if( drain_msg->already_posted && NULL == drain_msg->request) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce_drained: %s - %s Already posted this msg.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&(drain_msg->proc_name)) );
ORTE_NAME_PRINT(&(drain_msg->proc_name)) ));
}
else {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce_drained: %s - %s Waiting on message. (index = %d)\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&(drain_msg->proc_name)),
(int)wait_any_count);
(int)wait_any_count));
wait_any_requests[wait_any_count] = drain_msg->request;
wait_any_status[wait_any_count] = &drain_msg->status;
@ -3737,11 +3711,11 @@ static int wait_quiesce_drained(void)
}
}
if( !found ) {
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce: %s - %s Add process to response list [idx %d]\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&(drain_msg->proc_name)),
(int)last_proc_idx);
(int)last_proc_idx));
proc_names[last_proc_idx].jobid = drain_msg->proc_name.jobid;
proc_names[last_proc_idx].vpid = drain_msg->proc_name.vpid;
@ -3765,9 +3739,9 @@ static int wait_quiesce_drained(void)
/*
* Send ACKs to all peers
*/
opal_output_verbose(5, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((5, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce: %s Send ACKs to all Peers\n",
ORTE_NAME_PRINT(orte_process_info.my_name));
ORTE_NAME_PRINT(orte_process_info.my_name)));
for(i = 0; i < last_proc_idx; ++i) {
orte_buffer_t *buffer = NULL;
@ -3845,10 +3819,10 @@ static int coord_request_wait_all( size_t count,
coord_request_wait(req, status);
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: request_wait_all: %s Done with idx %d of %d\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
(int)i, (int)count);
(int)i, (int)count));
}
return exit_status;
@ -3882,10 +3856,10 @@ static int wait_quiesce_drain_ack(void)
return OMPI_SUCCESS;
}
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: wait_quiesce_drain_ack: %s Waiting on %d Drain ACK messages\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
num_outstanding);
num_outstanding));
while(0 < num_outstanding) {
for(item = opal_list_get_first(&drained_msg_ack_list);
@ -3937,7 +3911,7 @@ static int send_bookmarks(int peer_idx)
goto cleanup;
}
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: send_bookmarks: %s -> %s Sending bookmark S[%4d,%4d,%4d] R[%4d,%4d,%4d]\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&peer_name),
@ -3946,7 +3920,7 @@ static int send_bookmarks(int peer_idx)
peer_ref->total_send_init_msgs,
peer_ref->total_recv_msgs,
peer_ref->total_irecv_msgs,
peer_ref->total_recv_init_msgs);
peer_ref->total_recv_init_msgs));
/*
* Send the bookmarks to peer
@ -4088,7 +4062,7 @@ static int recv_bookmarks(int peer_idx)
"crcp:coord: recv_bookmarks: Unable to unpack total_recv_init_msgs");
peer_ref->matched_recv_init_msgs = tmp_int;
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_bookmarks: %s <- %s Received bookmark S[%4d,%4d,%4d] R[%4d,%4d,%4d]\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&peer_name),
@ -4097,7 +4071,7 @@ static int recv_bookmarks(int peer_idx)
peer_ref->matched_send_init_msgs,
peer_ref->matched_recv_msgs,
peer_ref->matched_irecv_msgs,
peer_ref->matched_recv_init_msgs);
peer_ref->matched_recv_init_msgs));
cleanup:
if(NULL != buffer) {
@ -4157,7 +4131,7 @@ static void recv_bookmarks_cbfunc(int status,
"crcp:coord: recv_bookmarks: Unable to unpack total_recv_init_msgs");
peer_ref->matched_recv_init_msgs = tmp_int;
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_bookmarks: %s <- %s Received bookmark S[%4d,%4d,%4d] R[%4d,%4d,%4d]\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(sender),
@ -4166,7 +4140,7 @@ static void recv_bookmarks_cbfunc(int status,
peer_ref->matched_send_init_msgs,
peer_ref->matched_recv_msgs,
peer_ref->matched_irecv_msgs,
peer_ref->matched_recv_init_msgs);
peer_ref->matched_recv_init_msgs));
cleanup:
END_TIMER(CRCP_TIMER_CKPT_PEER_R);
@ -4249,9 +4223,9 @@ static int send_msg_details(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
ALL_SENT:
if( need > found ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: send_msg_details: ERROR: ****** Need (%d) vs Found (%d)",
need, found);
need, found));
}
assert(need <= found);
@ -4264,10 +4238,10 @@ static int send_msg_details(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
d_msg_ack->peer.vpid = peer_ref->proc_name.vpid;
d_msg_ack->complete = false;
opal_list_append(&drained_msg_ack_list, &(d_msg_ack->super));
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: send_msg_details: %s <--> %s Will wait on ACK from this peer.\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
ORTE_NAME_PRINT(&(peer_ref->proc_name)));
ORTE_NAME_PRINT(&(peer_ref->proc_name))));
/*
* If we know that we are in the middle of a blocking send/recv then we
@ -4585,7 +4559,7 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
goto cleanup;
}
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_msg_detail_check: %s -- %s"
" found %s, complete %s, posted %s, peer_rank=[%d vs %d]\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
@ -4594,7 +4568,7 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
(true == msg_complete ? "True " : "False"),
(true == msg_already_posted ? "True " : "False"),
(NULL == posted_msg_ref ? -1 : posted_msg_ref->rank),
peer_ref->proc_name.vpid);
peer_ref->proc_name.vpid));
/*
* The message was not found
@ -4604,9 +4578,9 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
if( !msg_found ) {
ompi_crcp_coord_pml_message_ref_t *d_msg = NULL;
opal_output_verbose(15, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((15, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_msg_detail_check: %s Found a message that needs to be drained\n",
ORTE_NAME_PRINT(orte_process_info.my_name) );
ORTE_NAME_PRINT(orte_process_info.my_name) ));
/*
* Construct a message for draining
@ -4662,10 +4636,10 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
else if( msg_already_posted ) {
ompi_crcp_coord_pml_message_ref_t *d_msg = NULL;
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_msg_detail_check: %s "
"Found a message already posted! Prepare to drain.\n",
ORTE_NAME_PRINT(orte_process_info.my_name));
ORTE_NAME_PRINT(orte_process_info.my_name)));
/*
* If this is the current blocking recv,
@ -4673,10 +4647,10 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
*/
if( current_msg_id == posted_msg_ref->msg_id &&
COORD_MSG_TYPE_B_RECV == posted_msg_ref->msg_type) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_msg_detail_check: %s "
"Found a message already posted! Prepare to STALL.\n",
ORTE_NAME_PRINT(orte_process_info.my_name));
ORTE_NAME_PRINT(orte_process_info.my_name)));
stall_for_completion = true;
}
/*
@ -4684,14 +4658,14 @@ static int do_recv_msg_detail_check(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
* current blocking recv
*/
else {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_msg_detail_check: %s "
"Found a message already posted! No stall required [%3d, %3d, %3d, %3d].\n",
ORTE_NAME_PRINT(orte_process_info.my_name),
(int)current_msg_id,
(int)current_msg_type,
(int)posted_msg_ref->msg_id,
(int)posted_msg_ref->msg_type);
(int)posted_msg_ref->msg_type));
; /* JJH CHECK - peer_ref->total_drained_msgs += 1; */
}
@ -5000,9 +4974,9 @@ static int find_message_named(opal_list_t * search_list,
(peer == INVALID_INT || msg_ref->rank == peer) &&
msg_ref->ddt_size == ddt_size) {
opal_output_verbose(30, mca_crcp_coord_component.super.output_handle,
OPAL_OUTPUT_VERBOSE((30, mca_crcp_coord_component.super.output_handle,
"crcp:coord: find_message_named: Found Message -- Comm list (%d, %d)\n",
tag, peer);
tag, peer));
*found_msg_ref = msg_ref;
return OMPI_SUCCESS;
@ -5045,124 +5019,6 @@ static int do_recv_msg_detail_resp(ompi_crcp_coord_pml_peer_ref_t *peer_ref,
return exit_status;
}
/* Staggered alltoall */
static int coord_basic_barrier(void)
{
int peer_idx = 0;
int my_idx = orte_process_info.my_name->vpid;
int iter = 0;
int num_peers = 0;
num_peers = opal_list_get_size(&ompi_crcp_coord_pml_peer_refs);
for( peer_idx = (num_peers - my_idx - 1), iter = 0;
iter < num_peers;
peer_idx = (peer_idx + 1) % num_peers, ++iter)
{
if(my_idx > peer_idx) {
/* Send our bookmark status */
coord_basic_barrier_send(peer_idx);
/* Recv peer bookmark status */
coord_basic_barrier_recv(peer_idx);
}
else if(my_idx < peer_idx) {
/* Recv peer bookmark status */
coord_basic_barrier_recv(peer_idx);
/* Send our bookmark status */
coord_basic_barrier_send(peer_idx);
}
}
return OMPI_SUCCESS;
}
/* Paired with coord_basic_barrier_recv */
static int coord_basic_barrier_send(int peer_idx)
{
orte_process_name_t peer_name;
orte_buffer_t *buffer = NULL;
int value = 1234;
int exit_status = OMPI_SUCCESS;
int ret;
/*
* Find the peer structure for this peer
*/
peer_name.jobid = orte_process_info.my_name->jobid;
peer_name.vpid = peer_idx;
/*
* Send the bookmarks to peer
*/
if (NULL == (buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = OMPI_ERROR;
goto cleanup;
}
PACK_BUFFER(buffer, (value), 1, ORTE_UINT32,
"crcp:coord: coord_basic_barrier_send: Unable to pack ACK");
/* JJH -- Really Establish TAG in rml_types.h */
if ( 0 > ( ret = orte_rml.send_buffer(&peer_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG+1, 0)) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: coord_basic_barrier_send: Failed to send ACK to peer %s: Return %d\n",
ORTE_NAME_PRINT(&peer_name),
ret);
exit_status = ret;
goto cleanup;
}
cleanup:
if(NULL != buffer) {
OBJ_RELEASE(buffer);
}
return exit_status;
}
/* Paired with coord_basic_barrier_send */
static int coord_basic_barrier_recv(int peer_idx)
{
orte_process_name_t peer_name;
orte_buffer_t * buffer = NULL;
int value = 0;
int exit_status = OMPI_SUCCESS;
int ret;
/*
* Find the peer structure for this peer
*/
peer_name.jobid = orte_process_info.my_name->jobid;
peer_name.vpid = peer_idx;
/*
* Receive the bookmark from peer
*/
if (NULL == (buffer = OBJ_NEW(orte_buffer_t))) {
exit_status = ORTE_ERROR;
goto cleanup;
}
if ( 0 > (ret = orte_rml.recv_buffer(&peer_name, buffer, OMPI_CRCP_COORD_BOOKMARK_TAG+1, 0) ) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv_bookmarks: Failed to receive bookmark from peer %s: Return %d\n",
ORTE_NAME_PRINT(&peer_name),
ret);
exit_status = ret;
goto cleanup;
}
UNPACK_BUFFER(buffer, value, 1, ORTE_UINT32,
"crcp:coord: recv_bookmarks: Unable to unpack total_send_msgs");
cleanup:
if(NULL != buffer) {
OBJ_RELEASE(buffer);
}
return exit_status;
}
/**************** Timing functionality ********************/
static void start_time(int idx) {
if(idx < CRCP_TIMER_MAX ) {