Merge pull request #1987 from hjelmn/cid
comm/cid: fix threaded CID allocation
This commit is contained in:
Commit f3e9a72f1a
@@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
    context->newcomm = newcomm;
    context->comm = comm;
    context->bridgecomm = bridgecomm;
    context->pml_tag = 0;

    /* Determine which implementation of allreduce we have to use
     * for the current mode. */
@@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
static int ompi_comm_checkcid (ompi_comm_request_t *request);
/* verify that the cid was available globally */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
/* lock the cid generator */
static int ompi_comm_cid_lock (ompi_comm_request_t *request);

static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;

int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
                          ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
@@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com

    request->context = &context->super;

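    /* schedule the CID steps instead of blocking: each step below re-queues
     * itself on the request schedule when it cannot make progress */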
    ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
    ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    ompi_comm_request_start (request);

    *req = &request->super;
@@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
    return rc;
}

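/* non-blocking lock step: try the global CID lock and, rather than blocking
 * the thread, queue the next step (or this step again) on the request
 * schedule */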
static int ompi_comm_cid_lock (ompi_comm_request_t *request)
{
    if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    }

    return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
}

static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
{
    ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
    int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
    ompi_request_t *subreq;
    bool flag;
    int ret;

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    }

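    /* only the allocation whose (cid, tag) id is currently lowest may proceed;
     * later arrivals release the lock and retry via the schedule, so
     * concurrent CID allocations make progress in a consistent order */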
    if (ompi_comm_cid_lowest_id < my_id) {
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);
        return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
    }

    ompi_comm_cid_lowest_id = my_id;

    /**
     * This is the real algorithm described in the doc
     */
    flag = false;
    context->nextlocal_cid = mca_pml.pml_max_contextid;
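    /* scan the communicator table from this context's starting index;
     * opal_pointer_array_test_and_set_item() atomically claims a free slot,
     * and the first index it succeeds on becomes the local CID candidate */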
    for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
        flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
                                                     context->comm);
        if (true == flag) {
            context->nextlocal_cid = i;
            break;
        }
@@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
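    /* agree on a CID with the other participants: a non-blocking MAX
     * allreduce over every process's local candidate; the result is verified
     * globally in ompi_comm_checkcid and retried if some process could not
     * claim it */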
    ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
                                 context, &subreq);
    if (OMPI_SUCCESS != ret) {
        ompi_comm_cid_lowest_id = INT64_MAX;
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);
        return ret;
    }
@@ -342,9 +347,11 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
            opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
        }

        ompi_comm_cid_lowest_id = INT64_MAX;
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    OPAL_THREAD_UNLOCK(&ompi_cid_lock);

    /* next we want to verify that the resulting commid is ok */
    return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
@@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
    ompi_request_t *subreq;
    int ret;

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
    }

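    /* the agreed-upon cid is usable only if it equals the slot this process
     * reserved locally; record that in context->flag for the global check */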
    context->flag = (context->nextcid == context->nextlocal_cid);

    if (!context->flag) {
@@ -372,6 +383,8 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
        ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
    }

    OPAL_THREAD_UNLOCK(&ompi_cid_lock);

    return ret;
}

@@ -379,12 +392,17 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
{
    ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;

    if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
        return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
    }

    if (1 == context->rflag) {
        /* set the corresponding values on the newcomm */
        context->newcomm->c_contextid = context->nextcid;
        opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);

        /* unlock the cid generator */
        ompi_comm_cid_lowest_id = INT64_MAX;
        OPAL_THREAD_UNLOCK(&ompi_cid_lock);

        /* done! */
@@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)

    ++context->iter;

    OPAL_THREAD_UNLOCK(&ompi_cid_lock);

    /* try again */
    return ompi_comm_allreduce_getnextcid (request);
}
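A note on the pattern: every callback above takes the CID lock with a non-blocking try and, when the lock is busy, re-queues itself on the request schedule instead of blocking the thread. The following is a minimal standalone sketch of that try-lock/reschedule idea, not taken from this commit: the one-slot scheduler, the step function, and the pthread mutex are hypothetical stand-ins for ompi_comm_request_schedule_append(), the ompi_comm_* callbacks, and OPAL_THREAD_TRYLOCK().

    /* sketch only: illustrates try-lock/reschedule under the assumptions above */
    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t cid_lock = PTHREAD_MUTEX_INITIALIZER;

    typedef int (*step_fn) (void *ctx);

    /* hypothetical one-slot scheduler standing in for the request machinery */
    static step_fn pending_step = NULL;
    static void *pending_ctx = NULL;

    static int schedule_append (step_fn fn, void *ctx)
    {
        pending_step = fn;
        pending_ctx = ctx;
        return 0;
    }

    static int get_next_cid_step (void *ctx)
    {
        /* pthread_mutex_trylock() returns non-zero when the lock is busy */
        if (pthread_mutex_trylock (&cid_lock)) {
            /* lock busy: put this step back on the schedule and return */
            return schedule_append (get_next_cid_step, ctx);
        }

        /* ... critical section: scan for a free CID, run the allreduce ... */
        printf ("lock acquired, allocating a CID\n");

        pthread_mutex_unlock (&cid_lock);
        return 0;
    }

    int main (void)
    {
        schedule_append (get_next_cid_step, NULL);

        /* drive the scheduler until no step remains queued */
        while (NULL != pending_step) {
            step_fn fn = pending_step;
            pending_step = NULL;
            fn (pending_ctx);
        }

        return 0;
    }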