
Implement the MPI_Iprobe and MPI_Probe wrappers.

Remove some old, unused code.

This commit was SVN r16178.
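For context, the pattern both wrappers follow is: the coord component's pre-hook first checks the list of messages drained at the last checkpoint; on a hit it copies the saved status back to the caller and sets the state to OMPI_CRCP_PML_DONE so the crcpw wrapper skips the underlying PML call, otherwise it leaves the state alone and the real pml_iprobe/pml_probe runs. Below is a minimal, self-contained sketch of that flow; the types and helper names (status_t, drained_msg_t, drained_list_find, coord_iprobe_pre) are simplified stand-ins for illustration, not the actual Open MPI API.

#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for ompi_status_public_t */
typedef struct { int MPI_SOURCE; int MPI_TAG; int MPI_ERROR; } status_t;

/* Simplified stand-in for a drained-message list entry */
typedef struct drained_msg {
    int tag, src;
    status_t status;
    struct drained_msg *next;
} drained_msg_t;

typedef enum { CRCP_PRE, CRCP_DONE, CRCP_POST } crcp_state_t;

/* Probes do not know the message size or count, so the lookup matches on
 * (tag, src) only -- the same idea as PROBE_ANY_SIZE/PROBE_ANY_COUNT. */
static drained_msg_t *drained_list_find(drained_msg_t *head, int tag, int src)
{
    drained_msg_t *m;
    for (m = head; NULL != m; m = m->next) {
        if (m->tag == tag && m->src == src) {
            return m;
        }
    }
    return NULL;
}

/* Pre-hook pattern used by the iprobe wrapper: satisfy the probe from the
 * drained list if possible, otherwise leave the state untouched so the
 * wrapper falls through to the real PML's iprobe. */
static crcp_state_t coord_iprobe_pre(drained_msg_t *drained, int src, int tag,
                                     int *matched, status_t *status)
{
    drained_msg_t *hit = drained_list_find(drained, tag, src);
    if (NULL != hit) {
        if (NULL != status) {
            memcpy(status, &hit->status, sizeof(*status));
        }
        *matched = 1;
        return CRCP_DONE;   /* wrapper skips the underlying PML call */
    }
    *matched = 0;
    return CRCP_PRE;        /* wrapper calls the real pml_iprobe */
}

int main(void)
{
    /* One message drained from rank 3 with tag 42 */
    status_t saved = { 3, 42, 0 };
    drained_msg_t msg = { 42, 3, saved, NULL };
    int matched = 0;
    status_t st;

    crcp_state_t s = coord_iprobe_pre(&msg, 3, 42, &matched, &st);
    printf("matched=%d source=%d done=%d\n", matched, st.MPI_SOURCE, s == CRCP_DONE);
    return 0;
}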
This commit is contained in:
Josh Hursey 2007-09-21 16:28:46 +00:00
parent 8bdd14ba40
commit 3e51d7bb25
2 changed files with 128 additions and 228 deletions

View file

@@ -185,6 +185,9 @@
/************************************
* Locally Global vars
************************************/
#define PROBE_ANY_SIZE ((size_t) 0)
#define PROBE_ANY_COUNT ((size_t) 0)
/* Pointers to the 'real' PML */
static mca_pml_base_component_t *wrapped_pml_component = NULL;
static mca_pml_base_module_t *wrapped_pml_module = NULL;
@@ -872,9 +875,66 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_iprobe(
ompi_status_public_t* status,
ompi_crcp_base_pml_state_t* pml_state )
{
ompi_crcp_coord_pml_message_ref_t *drain_msg_ref = NULL;
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(%d, %d)", dst, tag);
/*
* Before PML Call
* - Determine if this can be satisfied from the drained list
* - Otherwise let the PML handle it
*/
if( OMPI_CRCP_PML_PRE == pml_state->state) {
/*
* Check to see if this message is in the drained message list
*/
if( OMPI_SUCCESS != (ret = find_drained_msg(PROBE_ANY_SIZE, PROBE_ANY_COUNT,
tag, dst,
&drain_msg_ref) ) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Failed trying to find a drained message."
" This should never happen. (%d)",
ret);
exit_status = ret;
goto DONE;
}
/*
* If the message is a drained message
* - Copy the status structure to pass back to the user
* - Mark the 'matched' flag as true
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message...");
/* Copy the status information */
if( MPI_STATUS_IGNORE != status ) {
memcpy(status, &drain_msg_ref->status, sizeof(ompi_status_public_t));
}
/* Mark as complete */
*matched = 1;
/* This will identify to the wrapper that this message is complete */
pml_state->state = OMPI_CRCP_PML_DONE;
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
/*
* Otherwise the message is not drained (common case), so let the PML deal with it
*/
else {
/* Mark as not complete */
*matched = 0;
}
}
DONE:
pml_state->error_code = exit_status;
return pml_state;
}
@@ -884,9 +944,55 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_probe(
ompi_status_public_t* status,
ompi_crcp_base_pml_state_t* pml_state )
{
ompi_crcp_coord_pml_message_ref_t *drain_msg_ref = NULL;
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_probe(%d, %d)", dst, tag);
/*
* Before PML Call
* - Determine if this can be satisfied from the drained list
* - Otherwise let the PML handle it
*/
if( OMPI_CRCP_PML_PRE == pml_state->state) {
/*
* Check to see if this message is in the drained message list
*/
if( OMPI_SUCCESS != (ret = find_drained_msg(PROBE_ANY_SIZE, PROBE_ANY_COUNT,
tag, dst,
&drain_msg_ref) ) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_probe(): Failed trying to find a drained message."
" This should never happen. (%d)",
ret);
exit_status = ret;
goto DONE;
}
/*
* If the message is a drained message
* - Copy the status structure to pass back to the user
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_iprobe(): Matched a drained message...");
/* Copy the status information */
if( MPI_STATUS_IGNORE != status ) {
memcpy(status, &drain_msg_ref->status, sizeof(ompi_status_public_t));
}
/* This will identify to the wrapper that this message is complete */
pml_state->state = OMPI_CRCP_PML_DONE;
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
}
DONE:
pml_state->error_code = exit_status;
return pml_state;
}
@@ -1592,224 +1698,6 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_irecv(
return pml_state;
}
#if 0
/* JJH Alternative - RECV -> IRECV/WAIT -- Testing... */
ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv(
void *buf, size_t count,
ompi_datatype_t *datatype,
int src, int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status,
ompi_crcp_base_pml_state_t* pml_state)
{
ompi_crcp_coord_pml_peer_ref_t *peer_ref = NULL;
ompi_crcp_coord_pml_message_ref_t *msg_ref = NULL;
ompi_crcp_coord_pml_message_ref_t *drain_msg_ref = NULL;
int exit_status = OMPI_SUCCESS;
int ret;
opal_output_verbose(20, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv()");
if( OMPI_CRCP_PML_PRE != pml_state->state) {
pml_state->error_code = OMPI_SUCCESS;
return pml_state;
}
/*
* - Determine if this can be satisfied from the drained list
* - Otherwise create a new reference to it so we can track it
*/
/*
* Check to see if this message is in the drained message list
*/
if( OMPI_SUCCESS != (ret = find_drained_msg(datatype->size,
count, tag, src,
&drain_msg_ref) ) ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv(): Failed trying to find a drained message."
" This should never happen. (%d)",
ret);
exit_status = ret;
goto DONE;
}
/*
* If the message is a drained message
* - Complete it right now
* - We do not need to increment any counters here since we already have
* when we originally drained the message.
*/
if( NULL != drain_msg_ref ) {
opal_output_verbose(10, mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv(): Matched a drained message...");
/* Copy the drained message */
src = drain_msg_ref->rank;
tag = drain_msg_ref->tag;
if( 0 != ompi_ddt_copy_content_same_ddt(datatype, count,
buf, drain_msg_ref->buffer) ) {
opal_output( mca_crcp_coord_component.super.output_handle,
"crcp:coord: pml_recv(): Datatype copy failed (%d)",
ret);
}
if( MPI_STATUS_IGNORE != status ) {
memcpy(status, &drain_msg_ref->status, sizeof(ompi_status_public_t));
}
/* Remove the message from the list */
opal_list_remove_item(&drained_msg_list, &(drain_msg_ref->super));
drain_msg_ref->request = NULL;
OBJ_RELEASE(drain_msg_ref);
/*
* Find the peer for this source
*/
if( NULL == peer_ref ) {
if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to find peer_ref\n");
exit_status = ret;
goto DONE;
}
if( NULL == peer_ref ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to find peer_ref - peer_ref is NULL\n");
exit_status = ret;
goto DONE;
}
}
peer_ref->total_drained_msgs -= 1;
/* Do *not* increment:
* peer_ref->total_recv_msgs += 1;
* Because we accounted for this message during the last checkpoint.
*/
}
/*
* Otherwise the message is not drained (common case)
* Magically convert RECV to IRECV+WAIT
*/
else {
ompi_request_t *request;
request = (ompi_request_t *)malloc(sizeof(ompi_request_t));
/*
* Post the IRECV directly to the PML
*/
if( OMPI_SUCCESS != (ret = wrapped_pml_module->pml_irecv(buf, count,
datatype,
src, tag,
comm,
&request))) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to post PML Irecv\n");
exit_status = ret;
goto DONE;
}
/*
* Create a new Message Object
*/
CREATE_NEW_MSG(msg_ref, COORD_MSG_TYPE_B_RECV,
buf,
count, datatype, tag, src,
comm, request,
ORTE_JOBID_INVALID,
ORTE_VPID_INVALID);
msg_ref->matched = false;
msg_ref->done = false;
msg_ref->active = true;
msg_ref->already_posted = true;
current_msg_id = msg_ref->msg_id;
current_msg_type = COORD_MSG_TYPE_B_RECV;
/*
* Find the Peer
*/
if( MPI_ANY_SOURCE == src || src < 0) {
opal_list_append(&(unknown_recv_from_list), &(msg_ref->super));
}
else {
if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to find peer_ref\n");
exit_status = ret;
goto DONE;
}
if( NULL == peer_ref ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to find peer_ref - peer_ref is NULL\n");
exit_status = ret;
goto DONE;
}
msg_ref->proc_name.jobid = peer_ref->proc_name.jobid;
msg_ref->proc_name.vpid = peer_ref->proc_name.vpid;
opal_list_append(&(peer_ref->recv_list), &(msg_ref->super));
}
/*
* Wait for the irecv to complete
*/
coord_request_wait(request, status);
/*
* If MPI_ANY_SOURCE, then move the message from the unknown list
* to the list associated with the resolved process.
*/
if( NULL == peer_ref ) {
src = status->MPI_SOURCE;
if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to resolve peer_ref (rank %d)\n",
src);
exit_status = ret;
goto DONE;
}
if( NULL == peer_ref ) {
opal_output(mca_crcp_coord_component.super.output_handle,
"crcp:coord: recv: Failed to resolve peer_ref (rank %d) - peer_ref is NULL\n",
src);
exit_status = ret;
goto DONE;
}
msg_ref->proc_name.jobid = peer_ref->proc_name.jobid;
msg_ref->proc_name.vpid = peer_ref->proc_name.vpid;
opal_list_remove_item(&(unknown_recv_from_list), &(msg_ref->super));
opal_list_append(&(peer_ref->recv_list), &(msg_ref->super));
}
/*
* Do the update
* - some Booleans already set...
* msg_ref->matched = false;
* msg_ref->already_posted = true;
*/
msg_ref->done = true;
msg_ref->active = false;
peer_ref->total_recv_msgs += 1;
current_msg_id = 0;
current_msg_type = COORD_MSG_TYPE_UNKNOWN;
}
/* This will identify to the wrapper that this message is complete */
pml_state->state = OMPI_CRCP_PML_DONE;
DONE:
pml_state->error_code = exit_status;
return pml_state;
}
#else
/* JJH Alternative - RECV -> IRECV/WAIT */
ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv(
void *buf, size_t count,
ompi_datatype_t *datatype,
@@ -2027,8 +1915,6 @@ ompi_crcp_base_pml_state_t* ompi_crcp_coord_pml_recv(
pml_state->error_code = exit_status;
return pml_state;
}
#endif
/* JJH End Alternative - RECV -> IRECV/WAIT */
/**************** Start *****************/
@@ -3009,10 +2895,14 @@ static int find_drained_msg(size_t ddt_size,
}
}
/* Check the datatype size and count only if specified (probes pass wildcards) */
if( ddt_size != PROBE_ANY_SIZE &&
count != PROBE_ANY_COUNT) {
/* Check the datatype size and count to make sure it matches */
if((drain_msg->count ) != count ||
(drain_msg->ddt_size) != ddt_size) {
continue;
}
}
/* If we get here then the message matches */

View file

@@ -278,6 +278,10 @@ int mca_pml_crcpw_iprobe(int dst, int tag, struct ompi_communicator_t* comm, int
return ret;
}
if( OMPI_CRCP_PML_DONE == pml_state->state) {
goto CLEANUP;
}
if( OMPI_CRCP_PML_SKIP != pml_state->state) {
if( OMPI_SUCCESS != (ret = mca_pml_crcpw_module.wrapped_pml_module.pml_iprobe(dst, tag, comm, matched, status) ) ) {
PML_CRCP_STATE_RETURN(pml_state);
@@ -293,6 +297,7 @@ int mca_pml_crcpw_iprobe(int dst, int tag, struct ompi_communicator_t* comm, int
return ret;
}
CLEANUP:
PML_CRCP_STATE_RETURN(pml_state);
return OMPI_SUCCESS;
@@ -316,6 +321,10 @@ int mca_pml_crcpw_probe( int dst, int tag, struct ompi_communicator_t* comm, omp
return ret;
}
if( OMPI_CRCP_PML_DONE == pml_state->state) {
goto CLEANUP;
}
if( OMPI_CRCP_PML_SKIP != pml_state->state) {
if( OMPI_SUCCESS != (ret = mca_pml_crcpw_module.wrapped_pml_module.pml_probe(dst, tag, comm, status) ) ) {
PML_CRCP_STATE_RETURN(pml_state);
@@ -331,6 +340,7 @@ int mca_pml_crcpw_probe( int dst, int tag, struct ompi_communicator_t* comm, omp
return ret;
}
CLEANUP:
PML_CRCP_STATE_RETURN(pml_state);
return OMPI_SUCCESS;
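
For reference, a minimal MPI program that exercises the probe entry points these wrappers intercept. It is ordinary application code, not part of the commit; when Open MPI runs with the C/R coordination framework enabled, its MPI_Iprobe call is routed through mca_pml_crcpw_iprobe and the crcp coord pre-hook shown above, and a message drained at a checkpoint is reported from the drained list instead of the underlying PML. Run with at least two processes.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (0 == rank) {
        int payload = 42;
        MPI_Send(&payload, 1, MPI_INT, 1, 7, MPI_COMM_WORLD);
    } else if (1 == rank) {
        int flag = 0, payload = 0;
        MPI_Status status;
        /* MPI_Iprobe (and MPI_Probe) are the user-facing calls that the
         * crcpw wrapper intercepts. */
        while (!flag) {
            MPI_Iprobe(0, 7, MPI_COMM_WORLD, &flag, &status);
        }
        MPI_Recv(&payload, 1, MPI_INT, status.MPI_SOURCE, status.MPI_TAG,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("rank 1 received %d\n", payload);
    }

    MPI_Finalize();
    return 0;
}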