pml/ob1: Add support for dynamically calling add_procs
This commit contains the following changes:

- pml/ob1: use the bml accessor function when requesting a bml
  endpoint. This will ensure that bml endpoints are only created when
  needed. For example, a bml endpoint is not requested and not
  allocated when receiving an eager message from a peer.

- pml/ob1: change the pml_procs array in the ob1 communicator to a
  proc pointer array. At the cost of a single level of extra
  redirection this will allow us to allocate pml procs on demand.

- pml/ob1: add an accessor function to access the pml proc structure
  for a given peer. This function will allocate the proc if it doesn't
  already exist.

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
This commit is contained in:
parent 6fa6513003
commit b4a0d40915
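In outline: the per-peer mca_pml_ob1_comm_proc_t array becomes an array of pointers that starts out all-NULL, and an accessor allocates a peer's entry the first time that peer is actually touched. A minimal standalone sketch of that pattern, using hypothetical names (peer_t, peer_table_t) rather than the OMPI types:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the per-peer state and its container. */
typedef struct { int send_sequence; } peer_t;
typedef struct { peer_t **peers; size_t num_peers; } peer_table_t;

/* calloc() leaves every slot NULL: no peer is allocated up front. */
static int peer_table_init(peer_table_t *t, size_t n)
{
    t->peers = (peer_t **) calloc(n, sizeof(peer_t *));
    if (NULL == t->peers) return -1;
    t->num_peers = n;
    return 0;
}

/* Allocate a peer's entry on first lookup -- the same shape as the
 * mca_pml_ob1_peer_lookup() accessor added in the diff below. */
static peer_t *peer_lookup(peer_table_t *t, size_t rank)
{
    if (NULL == t->peers[rank]) {
        t->peers[rank] = (peer_t *) calloc(1, sizeof(peer_t));
    }
    return t->peers[rank];
}

int main(void)
{
    peer_table_t t;
    if (peer_table_init(&t, 4) != 0) return 1;
    peer_t *p = peer_lookup(&t, 2);        /* slot 2 allocated here */
    printf("peer 2 = %p, peer 0 = %p\n", (void *) p, (void *) t.peers[0]);
    for (size_t i = 0; i < t.num_peers; ++i) free(t.peers[i]);
    free(t.peers);
    return 0;
}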
@@ -191,11 +191,9 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
 {
     /* allocate pml specific comm data */
     mca_pml_ob1_comm_t* pml_comm = OBJ_NEW(mca_pml_ob1_comm_t);
-    opal_list_item_t *item, *next_item;
-    mca_pml_ob1_recv_frag_t* frag;
+    mca_pml_ob1_recv_frag_t *frag, *next_frag;
     mca_pml_ob1_comm_proc_t* pml_proc;
     mca_pml_ob1_match_hdr_t* hdr;
-    int i;
 
     if (NULL == pml_comm) {
         return OMPI_ERR_OUT_OF_RESOURCE;
@@ -210,16 +208,8 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
     mca_pml_ob1_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count);
     comm->c_pml_comm = pml_comm;
 
-    for( i = 0; i < comm->c_remote_group->grp_proc_count; i++ ) {
-        pml_comm->procs[i].ompi_proc = ompi_group_peer_lookup(comm->c_remote_group,i);
-        OBJ_RETAIN(pml_comm->procs[i].ompi_proc);
-    }
     /* Grab all related messages from the non_existing_communicator pending queue */
-    for( item = opal_list_get_first(&mca_pml_ob1.non_existing_communicator_pending);
-         item != opal_list_get_end(&mca_pml_ob1.non_existing_communicator_pending);
-         item = next_item ) {
-        frag = (mca_pml_ob1_recv_frag_t*)item;
-        next_item = opal_list_get_next(item);
+    OPAL_LIST_FOREACH_SAFE(frag, next_frag, &mca_pml_ob1.non_existing_communicator_pending, mca_pml_ob1_recv_frag_t) {
         hdr = &frag->hdr.hdr_match;
 
         /* Is this fragment for the current communicator ? */
@@ -229,8 +219,8 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
         /* As we now know we work on a fragment for this communicator
          * we should remove it from the
          * non_existing_communicator_pending list. */
-        opal_list_remove_item( &mca_pml_ob1.non_existing_communicator_pending,
-                               item );
+        opal_list_remove_item (&mca_pml_ob1.non_existing_communicator_pending,
+                               (opal_list_item_t *) frag);
 
  add_fragment_to_unexpected:
 
@@ -249,7 +239,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
          * We just have to push the fragment into the unexpected list of the corresponding
          * proc, or into the out-of-order (cant_match) list.
          */
-        pml_proc = &(pml_comm->procs[hdr->hdr_src]);
+        pml_proc = mca_pml_ob1_peer_lookup(comm, hdr->hdr_src);
 
         if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
             /* We're now expecting the next sequence number. */
@@ -283,12 +273,6 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
 
 int mca_pml_ob1_del_comm(ompi_communicator_t* comm)
 {
-    mca_pml_ob1_comm_t* pml_comm = comm->c_pml_comm;
-    int i;
-
-    for( i = 0; i < comm->c_remote_group->grp_proc_count; i++ ) {
-        OBJ_RELEASE(pml_comm->procs[i].ompi_proc);
-    }
     OBJ_RELEASE(comm->c_pml_comm);
     comm->c_pml_comm = NULL;
     return OMPI_SUCCESS;
@@ -303,9 +287,9 @@ int mca_pml_ob1_del_comm(ompi_communicator_t* comm)
 
 int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
 {
+    mca_btl_base_selected_module_t *sm;
     opal_bitmap_t reachable;
     int rc;
-    opal_list_item_t *item;
 
     if(nprocs == 0)
         return OMPI_SUCCESS;
@@ -347,11 +331,7 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
        BTLs requires iterating over the procs, as the BML does not
       expose all currently in use btls. */
 
-    for (item = opal_list_get_first(&mca_btl_base_modules_initialized) ;
-         item != opal_list_get_end(&mca_btl_base_modules_initialized) ;
-         item = opal_list_get_next(item)) {
-        mca_btl_base_selected_module_t *sm =
-            (mca_btl_base_selected_module_t*) item;
+    OPAL_LIST_FOREACH(sm, &mca_btl_base_modules_initialized, mca_btl_base_selected_module_t) {
         if (sm->btl_module->btl_eager_limit < sizeof(mca_pml_ob1_hdr_t)) {
             opal_show_help("help-mpi-pml-ob1.txt", "eager_limit_too_small",
                            true,
@@ -589,13 +569,19 @@ int mca_pml_ob1_dump(struct ompi_communicator_t* comm, int verbose)
 
     /* iterate through all procs on communicator */
     for( i = 0; i < (int)pml_comm->num_procs; i++ ) {
-        mca_pml_ob1_comm_proc_t* proc = &pml_comm->procs[i];
+        mca_pml_ob1_comm_proc_t* proc = pml_comm->procs[i];
+
+        if (NULL == proc) {
+            continue;
+        }
+
         mca_bml_base_endpoint_t* ep = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
         size_t n;
 
         opal_output(0, "[Rank %d] expected_seq %d ompi_proc %p send_seq %d\n",
                     i, proc->expected_sequence, (void*) proc->ompi_proc,
                     proc->send_sequence);
 
         /* dump all receive queues */
         if( opal_list_get_size(&proc->specific_receives) ) {
             opal_output(0, "expected specific receives\n");
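Side note on the two loop rewrites above: the fragment loop may remove the current element from the list mid-iteration, which is why it uses OPAL_LIST_FOREACH_SAFE (the macro caches the next element before the body runs); the read-only BTL loop uses plain OPAL_LIST_FOREACH. A small self-contained illustration of the same save-the-next-pointer idiom on a hypothetical singly linked list (node_t is not an OPAL type):

#include <stdio.h>
#include <stdlib.h>

typedef struct node { int value; struct node *next; } node_t;

int main(void)
{
    /* Build a small list: 0 -> 1 -> 2 -> 3. */
    node_t *head = NULL, **tail = &head;
    for (int i = 0; i < 4; ++i) {
        node_t *n = malloc(sizeof(*n));
        n->value = i; n->next = NULL;
        *tail = n; tail = &n->next;
    }

    /* Safe iteration: save 'next' before possibly freeing 'cur' --
     * the same reason the diff uses OPAL_LIST_FOREACH_SAFE where it
     * may call opal_list_remove_item() on the current fragment. */
    node_t **link = &head;
    for (node_t *cur = head, *next; cur != NULL; cur = next) {
        next = cur->next;
        if (cur->value % 2 == 0) {   /* unlink and free even nodes */
            *link = next;
            free(cur);
        } else {
            link = &cur->next;
        }
    }

    for (node_t *cur = head; cur; cur = cur->next) printf("%d\n", cur->value);
    while (head) { node_t *n = head; head = head->next; free(n); }
    return 0;
}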
@@ -40,14 +40,15 @@ static void mca_pml_ob1_comm_proc_destruct(mca_pml_ob1_comm_proc_t* proc)
     OBJ_DESTRUCT(&proc->frags_cant_match);
     OBJ_DESTRUCT(&proc->specific_receives);
     OBJ_DESTRUCT(&proc->unexpected_frags);
+    if (proc->ompi_proc) {
+        OBJ_RELEASE(proc->ompi_proc);
+    }
 }
 
 
-static OBJ_CLASS_INSTANCE(
-    mca_pml_ob1_comm_proc_t,
-    opal_object_t,
-    mca_pml_ob1_comm_proc_construct,
-    mca_pml_ob1_comm_proc_destruct);
+OBJ_CLASS_INSTANCE(mca_pml_ob1_comm_proc_t, opal_object_t,
+                   mca_pml_ob1_comm_proc_construct,
+                   mca_pml_ob1_comm_proc_destruct);
 
 
 static void mca_pml_ob1_comm_construct(mca_pml_ob1_comm_t* comm)
@@ -63,11 +64,16 @@ static void mca_pml_ob1_comm_construct(mca_pml_ob1_comm_t* comm)
 
 static void mca_pml_ob1_comm_destruct(mca_pml_ob1_comm_t* comm)
 {
-    size_t i;
-    for(i=0; i<comm->num_procs; i++)
-        OBJ_DESTRUCT((&comm->procs[i]));
-    if(NULL != comm->procs)
+    if (NULL != comm->procs) {
+        for (size_t i = 0; i < comm->num_procs; ++i) {
+            if (comm->procs[i]) {
+                OBJ_RELEASE(comm->procs[i]);
+            }
+        }
+
         free(comm->procs);
+    }
 
     OBJ_DESTRUCT(&comm->wild_receives);
     OBJ_DESTRUCT(&comm->matching_lock);
 }
@@ -80,18 +86,13 @@ OBJ_CLASS_INSTANCE(
     mca_pml_ob1_comm_destruct);
 
 
-int mca_pml_ob1_comm_init_size(mca_pml_ob1_comm_t* comm, size_t size)
+int mca_pml_ob1_comm_init_size (mca_pml_ob1_comm_t* comm, size_t size)
 {
-    size_t i;
-
     /* send message sequence-number support - sender side */
-    comm->procs = (mca_pml_ob1_comm_proc_t*)malloc(sizeof(mca_pml_ob1_comm_proc_t)*size);
+    comm->procs = (mca_pml_ob1_comm_proc_t **) calloc(size, sizeof (mca_pml_ob1_comm_proc_t *));
    if(NULL == comm->procs) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
-    for(i=0; i<size; i++) {
-        OBJ_CONSTRUCT(comm->procs+i, mca_pml_ob1_comm_proc_t);
-    }
    comm->num_procs = size;
    return OMPI_SUCCESS;
 }
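Worth noting in mca_pml_ob1_comm_init_size: replacing malloc plus per-element OBJ_CONSTRUCT with a single calloc is what makes the lazy scheme work, since a zero-filled pointer array leaves every slot NULL, exactly the "no proc allocated yet" state the new accessor tests for. A tiny sketch of that invariant (assuming, as on the platforms Open MPI targets, that all-bits-zero is a NULL pointer):

#include <assert.h>
#include <stdlib.h>

int main(void)
{
    /* calloc guarantees zero-filled memory; for an array of pointers
     * that means every slot compares equal to NULL, so "slot is NULL"
     * can serve as the "peer not allocated yet" marker. */
    void **slots = calloc(16, sizeof(void *));
    assert(slots != NULL);
    for (size_t i = 0; i < 16; ++i) {
        assert(NULL == slots[i]);
    }
    free(slots);
    return 0;
}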
@@ -24,6 +24,7 @@
 #include "opal/threads/mutex.h"
 #include "opal/class/opal_list.h"
 #include "ompi/proc/proc.h"
+#include "ompi/communicator/communicator.h"
 
 BEGIN_C_DECLS
 
@@ -42,6 +43,7 @@ struct mca_pml_ob1_comm_proc_t {
 };
 typedef struct mca_pml_ob1_comm_proc_t mca_pml_ob1_comm_proc_t;
 
+OBJ_CLASS_DECLARATION(mca_pml_ob1_comm_proc_t);
 
 /**
  * Cached on ompi_communicator_t to hold queues/state
@@ -56,7 +58,7 @@ struct mca_pml_comm_t {
 #endif
     opal_mutex_t matching_lock; /**< matching lock */
     opal_list_t wild_receives; /**< queue of unmatched wild (source process not specified) receives */
-    mca_pml_ob1_comm_proc_t* procs;
+    mca_pml_ob1_comm_proc_t **procs;
     size_t num_procs;
     size_t last_probed;
 };
@@ -64,6 +66,18 @@ typedef struct mca_pml_comm_t mca_pml_ob1_comm_t;
 
 OBJ_CLASS_DECLARATION(mca_pml_ob1_comm_t);
 
+static inline mca_pml_ob1_comm_proc_t *mca_pml_ob1_peer_lookup (struct ompi_communicator_t *comm, int rank)
+{
+    mca_pml_ob1_comm_t *pml_comm = (mca_pml_ob1_comm_t *)comm->c_pml_comm;
+
+    if (OPAL_UNLIKELY(NULL == pml_comm->procs[rank])) {
+        pml_comm->procs[rank] = OBJ_NEW(mca_pml_ob1_comm_proc_t);
+        pml_comm->procs[rank]->ompi_proc = ompi_comm_peer_lookup (comm, rank);
+        OBJ_RETAIN(pml_comm->procs[rank]->ompi_proc);
+    }
+
+    return pml_comm->procs[rank];
+}
+
 /**
  * Initialize an instance of mca_pml_ob1_comm_t based on the communicator size.
@@ -144,9 +144,12 @@ static int mca_pml_ob1_get_unex_msgq_size (const struct mca_base_pvar_t *pvar, v
     int i;
 
     for (i = 0 ; i < comm_size ; ++i) {
-        pml_proc = pml_comm->procs + i;
-
-        values[i] = opal_list_get_size (&pml_proc->unexpected_frags);
+        pml_proc = pml_comm->procs[i];
+        if (pml_proc) {
+            values[i] = opal_list_get_size (&pml_proc->unexpected_frags);
+        } else {
+            values[i] = 0;
+        }
     }
 
     return OMPI_SUCCESS;
@@ -162,9 +165,13 @@ static int mca_pml_ob1_get_posted_recvq_size (const struct mca_base_pvar_t *pvar
     int i;
 
     for (i = 0 ; i < comm_size ; ++i) {
-        pml_proc = pml_comm->procs + i;
-
-        values[i] = opal_list_get_size (&pml_proc->specific_receives);
+        pml_proc = pml_comm->procs[i];
+
+        if (pml_proc) {
+            values[i] = opal_list_get_size (&pml_proc->specific_receives);
+        } else {
+            values[i] = 0;
+        }
     }
 
     return OMPI_SUCCESS;
@@ -148,7 +148,6 @@ mca_pml_ob1_imrecv( void *buf,
     int src, tag;
     ompi_communicator_t *comm;
     mca_pml_ob1_comm_proc_t* proc;
-    mca_pml_ob1_comm_t* ob1_comm;
     uint64_t seq;
 
     /* get the request from the message and the frag from the request
@@ -158,7 +157,6 @@ mca_pml_ob1_imrecv( void *buf,
     src = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
     tag = recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG;
     comm = (*message)->comm;
-    ob1_comm = recvreq->req_recv.req_base.req_comm->c_pml_comm;
     seq = recvreq->req_recv.req_base.req_sequence;
 
     /* make the request a recv request again */
@@ -196,7 +194,7 @@ mca_pml_ob1_imrecv( void *buf,
     /* Note - sequence number already assigned */
     recvreq->req_recv.req_base.req_sequence = seq;
 
-    proc = &ob1_comm->procs[recvreq->req_recv.req_base.req_peer];
+    proc = mca_pml_ob1_peer_lookup (comm, recvreq->req_recv.req_base.req_peer);
     recvreq->req_recv.req_base.req_proc = proc->ompi_proc;
     prepare_recv_req_converter(recvreq);
 
@@ -243,7 +241,6 @@ mca_pml_ob1_mrecv( void *buf,
     int src, tag, rc;
     ompi_communicator_t *comm;
     mca_pml_ob1_comm_proc_t* proc;
-    mca_pml_ob1_comm_t* ob1_comm;
     uint64_t seq;
 
     /* get the request from the message and the frag from the request
@@ -254,7 +251,6 @@ mca_pml_ob1_mrecv( void *buf,
     src = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;
     tag = recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG;
     seq = recvreq->req_recv.req_base.req_sequence;
-    ob1_comm = recvreq->req_recv.req_base.req_comm->c_pml_comm;
 
     /* make the request a recv request again */
     /* The old request kept pointers to comm and the char datatype.
@@ -290,7 +286,7 @@ mca_pml_ob1_mrecv( void *buf,
     /* Note - sequence number already assigned */
     recvreq->req_recv.req_base.req_sequence = seq;
 
-    proc = &ob1_comm->procs[recvreq->req_recv.req_base.req_peer];
+    proc = mca_pml_ob1_peer_lookup (comm, recvreq->req_recv.req_base.req_peer);
     recvreq->req_recv.req_base.req_proc = proc->ompi_proc;
     prepare_recv_req_converter(recvreq);
 
@@ -126,15 +126,14 @@ int mca_pml_ob1_isend(const void *buf,
                       ompi_communicator_t * comm,
                       ompi_request_t ** request)
 {
-    mca_pml_ob1_comm_t* ob1_comm = comm->c_pml_comm;
+    mca_pml_ob1_comm_proc_t *ob1_proc = mca_pml_ob1_peer_lookup (comm, dst);
     mca_pml_ob1_send_request_t *sendreq = NULL;
-    ompi_proc_t *dst_proc = ompi_comm_peer_lookup (comm, dst);
-    mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)
-        dst_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
+    ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
+    mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
     int16_t seqn;
     int rc;
 
-    seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_comm->procs[dst].send_sequence, 1);
+    seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
 
     if (MCA_PML_BASE_SEND_SYNCHRONOUS != sendmode) {
         rc = mca_pml_ob1_send_inline (buf, count, datatype, dst, tag, seqn, dst_proc,
@@ -176,10 +175,9 @@ int mca_pml_ob1_send(const void *buf,
                      mca_pml_base_send_mode_t sendmode,
                      ompi_communicator_t * comm)
 {
-    mca_pml_ob1_comm_t* ob1_comm = comm->c_pml_comm;
-    ompi_proc_t *dst_proc = ompi_comm_peer_lookup (comm, dst);
-    mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)
-        dst_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
+    mca_pml_ob1_comm_proc_t *ob1_proc = mca_pml_ob1_peer_lookup (comm, dst);
+    ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
+    mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
     mca_pml_ob1_send_request_t *sendreq = NULL;
     int16_t seqn;
     int rc;
@@ -202,7 +200,7 @@ int mca_pml_ob1_send(const void *buf,
         return OMPI_ERR_UNREACH;
     }
 
-    seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_comm->procs[dst].send_sequence, 1);
+    seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
 
     /**
      * The immediate send will not have a request, so they are
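The send-side sequence number now comes from a counter embedded in the lazily allocated per-peer structure; OPAL_THREAD_ADD32 atomically increments it and returns the new value, which the cast then truncates to 16 bits. A hedged analogy in portable C11 (this is an illustration, not the OPAL macro itself):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for the per-peer structure; 'send_sequence' mirrors the
 * counter in mca_pml_ob1_comm_proc_t. */
typedef struct { atomic_int send_sequence; } peer_seq_t;

/* atomic_fetch_add returns the old value, so add 1 to match
 * OPAL_THREAD_ADD32, which returns the new value; the cast gives the
 * 16-bit wrap-around the matching logic expects. */
static uint16_t next_seqn(peer_seq_t *peer)
{
    return (uint16_t) (atomic_fetch_add(&peer->send_sequence, 1) + 1);
}

int main(void)
{
    peer_seq_t peer = { 0 };
    for (int i = 0; i < 3; ++i) {
        printf("seqn %u\n", (unsigned) next_seqn(&peer));  /* 1, 2, 3 */
    }
    return 0;
}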
@@ -143,7 +143,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
     comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
 
     /* source sequence number */
-    proc = &comm->procs[hdr->hdr_src];
+    proc = mca_pml_ob1_peer_lookup (comm_ptr, hdr->hdr_src);
 
     /* We generate the MSG_ARRIVED event as soon as the PML is aware
      * of a matching fragment arrival. Independing if it is received
@@ -650,7 +650,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
 
     /* source sequence number */
     frag_msg_seq = hdr->hdr_seq;
-    proc = &comm->procs[hdr->hdr_src];
+    proc = mca_pml_ob1_peer_lookup (comm_ptr, hdr->hdr_src);
 
     /**
      * We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
@@ -100,7 +100,8 @@ static int mca_pml_ob1_recv_request_free(struct ompi_request_t** request)
 static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request, int complete)
 {
     mca_pml_ob1_recv_request_t* request = (mca_pml_ob1_recv_request_t*)ompi_request;
-    mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm;
+    ompi_communicator_t *comm = request->req_recv.req_base.req_comm;
+    mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
 
     if( true == request->req_match_received ) { /* way to late to cancel this one */
         assert( OMPI_ANY_TAG != ompi_request->req_status.MPI_TAG ); /* not matched isn't it */
@@ -108,11 +109,11 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
     }
 
     /* The rest should be protected behind the match logic lock */
-    OPAL_THREAD_LOCK(&comm->matching_lock);
+    OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
     if( request->req_recv.req_base.req_peer == OMPI_ANY_SOURCE ) {
-        opal_list_remove_item( &comm->wild_receives, (opal_list_item_t*)request );
+        opal_list_remove_item( &ob1_comm->wild_receives, (opal_list_item_t*)request );
     } else {
-        mca_pml_ob1_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
+        mca_pml_ob1_comm_proc_t* proc = mca_pml_ob1_peer_lookup (comm, request->req_recv.req_base.req_peer);
         opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request);
     }
     PERUSE_TRACE_COMM_EVENT( PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
@@ -122,7 +123,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
      * to true. Otherwise, the request will never be freed.
      */
     request->req_recv.req_base.req_pml_complete = true;
-    OPAL_THREAD_UNLOCK(&comm->matching_lock);
+    OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
 
     OPAL_THREAD_LOCK(&ompi_request_lock);
     ompi_request->req_status._cancelled = true;
@@ -260,7 +261,7 @@ static int mca_pml_ob1_recv_request_ack(
     ompi_proc_t* proc = (ompi_proc_t*)recvreq->req_recv.req_base.req_proc;
     mca_bml_base_endpoint_t* bml_endpoint = NULL;
 
-    bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
+    bml_endpoint = mca_bml_base_get_endpoint (proc);
 
     /* by default copy everything */
     recvreq->req_send_offset = bytes_received;
@@ -654,7 +655,7 @@ void mca_pml_ob1_recv_request_progress_rget( mca_pml_ob1_recv_request_t* recvreq
     }
 
     /* lookup bml datastructures */
-    bml_endpoint = (mca_bml_base_endpoint_t*)recvreq->req_recv.req_base.req_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
+    bml_endpoint = mca_bml_base_get_endpoint (recvreq->req_recv.req_base.req_proc);
     rdma_bml = mca_bml_base_btl_array_find(&bml_endpoint->btl_rdma, btl);
 
 #if OPAL_CUDA_SUPPORT
@@ -1079,8 +1080,11 @@ static mca_pml_ob1_recv_frag_t*
 recv_req_match_specific_proc( const mca_pml_ob1_recv_request_t *req,
                               mca_pml_ob1_comm_proc_t *proc )
 {
+    if (NULL == proc) {
+        return NULL;
+    }
+
     opal_list_t* unexpected_frags = &proc->unexpected_frags;
-    opal_list_item_t *i;
     mca_pml_ob1_recv_frag_t* frag;
     int tag = req->req_recv.req_base.req_tag;
 
@@ -1088,20 +1092,12 @@ recv_req_match_specific_proc( const mca_pml_ob1_recv_request_t *req,
         return NULL;
 
     if( OMPI_ANY_TAG == tag ) {
-        for (i = opal_list_get_first(unexpected_frags);
-             i != opal_list_get_end(unexpected_frags);
-             i = opal_list_get_next(i)) {
-            frag = (mca_pml_ob1_recv_frag_t*)i;
-
+        OPAL_LIST_FOREACH(frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
             if( frag->hdr.hdr_match.hdr_tag >= 0 )
                 return frag;
         }
     } else {
-        for (i = opal_list_get_first(unexpected_frags);
-             i != opal_list_get_end(unexpected_frags);
-             i = opal_list_get_next(i)) {
-            frag = (mca_pml_ob1_recv_frag_t*)i;
-
+        OPAL_LIST_FOREACH(frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
            if( frag->hdr.hdr_match.hdr_tag == tag )
                return frag;
        }
@@ -1118,7 +1114,7 @@ recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
                      mca_pml_ob1_comm_proc_t **p)
 {
     mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
-    mca_pml_ob1_comm_proc_t* proc = comm->procs;
+    mca_pml_ob1_comm_proc_t **procp = comm->procs;
     size_t i;
 
     /*
@@ -1133,10 +1129,10 @@ recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
         mca_pml_ob1_recv_frag_t* frag;
 
         /* loop over messages from the current proc */
-        if((frag = recv_req_match_specific_proc(req, &proc[i]))) {
-            *p = &proc[i];
+        if((frag = recv_req_match_specific_proc(req, procp[i]))) {
+            *p = procp[i];
             comm->last_probed = i;
-            req->req_recv.req_base.req_proc = proc[i].ompi_proc;
+            req->req_recv.req_base.req_proc = procp[i]->ompi_proc;
             prepare_recv_req_converter(req);
             return frag; /* match found */
         }
@@ -1145,10 +1141,10 @@ recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
         mca_pml_ob1_recv_frag_t* frag;
 
         /* loop over messages from the current proc */
-        if((frag = recv_req_match_specific_proc(req, &proc[i]))) {
-            *p = &proc[i];
+        if((frag = recv_req_match_specific_proc(req, procp[i]))) {
+            *p = procp[i];
             comm->last_probed = i;
-            req->req_recv.req_base.req_proc = proc[i].ompi_proc;
+            req->req_recv.req_base.req_proc = procp[i]->ompi_proc;
             prepare_recv_req_converter(req);
             return frag; /* match found */
         }
@@ -1161,7 +1157,8 @@ recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
 
 void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
 {
-    mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
+    ompi_communicator_t *comm = req->req_recv.req_base.req_comm;
+    mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
     mca_pml_ob1_comm_proc_t* proc;
     mca_pml_ob1_recv_frag_t* frag;
     opal_list_t *queue;
@@ -1179,7 +1176,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
 
     MCA_PML_BASE_RECV_START(&req->req_recv.req_base);
 
-    OPAL_THREAD_LOCK(&comm->matching_lock);
+    OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
     /**
      * The laps of time between the ACTIVATE event and the SEARCH_UNEX one include
      * the cost of the request lock.
@@ -1188,12 +1185,12 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
                             &(req->req_recv.req_base), PERUSE_RECV);
 
     /* assign sequence number */
-    req->req_recv.req_base.req_sequence = comm->recv_sequence++;
+    req->req_recv.req_base.req_sequence = ob1_comm->recv_sequence++;
 
     /* attempt to match posted recv */
     if(req->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) {
         frag = recv_req_match_wild(req, &proc);
-        queue = &comm->wild_receives;
+        queue = &ob1_comm->wild_receives;
 #if !OPAL_ENABLE_HETEROGENEOUS_SUPPORT
         /* As we are in a homogeneous environment we know that all remote
          * architectures are exactly the same as the local one. Therefore,
@@ -1206,7 +1203,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
         }
 #endif  /* !OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
     } else {
-        proc = &comm->procs[req->req_recv.req_base.req_peer];
+        proc = mca_pml_ob1_peer_lookup (comm, req->req_recv.req_base.req_peer);
         req->req_recv.req_base.req_proc = proc->ompi_proc;
         frag = recv_req_match_specific_proc(req, proc);
         queue = &proc->specific_receives;
@@ -1221,7 +1218,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
            it when the message comes in. */
         append_recv_req_to_queue(queue, req);
         req->req_match_received = false;
-        OPAL_THREAD_UNLOCK(&comm->matching_lock);
+        OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
     } else {
         if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
             PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
@@ -1239,7 +1236,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
 
             opal_list_remove_item(&proc->unexpected_frags,
                                   (opal_list_item_t*)frag);
-            OPAL_THREAD_UNLOCK(&comm->matching_lock);
+            OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
 
             switch(hdr->hdr_common.hdr_type) {
             case MCA_PML_OB1_HDR_TYPE_MATCH:
@@ -1269,14 +1266,14 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
                restarted with this request during mrecv */
             opal_list_remove_item(&proc->unexpected_frags,
                                   (opal_list_item_t*)frag);
-            OPAL_THREAD_UNLOCK(&comm->matching_lock);
+            OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
 
             req->req_recv.req_base.req_addr = frag;
             mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
                                                    frag->segments, frag->num_segments);
 
         } else {
-            OPAL_THREAD_UNLOCK(&comm->matching_lock);
+            OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
             mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
                                                    frag->segments, frag->num_segments);
         }
@@ -433,8 +433,7 @@ static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc,
 {
     size_t i;
     mca_bml_base_btl_t* bml_btl;
-    mca_bml_base_endpoint_t* endpoint =
-        (mca_bml_base_endpoint_t*)proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
+    mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (proc);
 
     for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) {
         bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
@@ -480,16 +480,16 @@ mca_pml_ob1_send_request_start_seq (mca_pml_ob1_send_request_t* sendreq, mca_bml
 static inline int
 mca_pml_ob1_send_request_start( mca_pml_ob1_send_request_t* sendreq )
 {
-    mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)
-        sendreq->req_send.req_base.req_proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_BML];
-    mca_pml_ob1_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm;
+    mca_bml_base_endpoint_t *endpoint = mca_bml_base_get_endpoint (sendreq->req_send.req_base.req_proc);
+    ompi_communicator_t *comm = sendreq->req_send.req_base.req_comm;
+    mca_pml_ob1_comm_proc_t *ob1_proc = mca_pml_ob1_peer_lookup (comm, sendreq->req_send.req_base.req_peer);
     int32_t seqn;
 
     if (OPAL_UNLIKELY(NULL == endpoint)) {
         return OMPI_ERR_UNREACH;
     }
 
-    seqn = OPAL_THREAD_ADD32(&comm->procs[sendreq->req_send.req_base.req_peer].send_sequence, 1);
+    seqn = OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
 
     return mca_pml_ob1_send_request_start_seq (sendreq, endpoint, seqn);
 }