* Lock around tag management, so that it's thread safe
* Only register the progress function on the first call to a non-blocking collective operation, to try to reduce overall performance impact
* Fix tag management in the roll-over case

This commit was SVN r27498.
Этот коммит содержится в:
родитель
32db48d30e
Коммит
8b40c0de9b
@ -72,6 +72,8 @@ OMPI_MODULE_DECLSPEC extern ompi_coll_libnbc_component_t mca_coll_libnbc_compone
|
||||
|
||||
struct ompi_coll_libnbc_module_t {
|
||||
mca_coll_base_module_t super;
|
||||
opal_mutex_t mutex;
|
||||
bool comm_registered;
|
||||
int tag;
|
||||
#ifdef NBC_CACHE_SCHEDULE
|
||||
void *NBC_Dict[NBC_NUM_COLL]; /* this should point to a struct
|
||||
@ -129,6 +131,8 @@ typedef ompi_coll_libnbc_request_t NBC_Handle;
|
||||
(ompi_free_list_item_t*) req); \
|
||||
} while (0)
|
||||
|
||||
int ompi_coll_libnbc_progress(void);
|
||||
|
||||
int NBC_Init_comm(MPI_Comm comm, ompi_coll_libnbc_module_t *module);
|
||||
int NBC_Progress(NBC_Handle *handle);
|
||||
|
||||
|
@ -41,7 +41,7 @@ static int libnbc_register(void);
|
||||
static int libnbc_init_query(bool, bool);
|
||||
static mca_coll_base_module_t *libnbc_comm_query(struct ompi_communicator_t *, int *);
|
||||
static int libnbc_module_enable(mca_coll_base_module_t *, struct ompi_communicator_t *);
|
||||
static int libnbc_progress(void);
|
||||
|
||||
|
||||
/*
|
||||
* Instantiate the public struct with all of our public information
|
||||
@ -95,6 +95,8 @@ libnbc_open(void)
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
|
||||
OBJ_CONSTRUCT(&mca_coll_libnbc_component.active_requests, opal_list_t);
|
||||
/* note: active comms is the number of communicators who have had
|
||||
a non-blocking collective started */
|
||||
mca_coll_libnbc_component.active_comms = 0;
|
||||
|
||||
opal_atomic_init(&mca_coll_libnbc_component.progress_lock, OPAL_ATOMIC_UNLOCKED);
|
||||
@ -106,7 +108,7 @@ static int
|
||||
libnbc_close(void)
|
||||
{
|
||||
if (0 != mca_coll_libnbc_component.active_comms) {
|
||||
opal_progress_unregister(libnbc_progress);
|
||||
opal_progress_unregister(ompi_coll_libnbc_progress);
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&mca_coll_libnbc_component.requests);
|
||||
@ -219,15 +221,12 @@ libnbc_module_enable(mca_coll_base_module_t *module,
|
||||
struct ompi_communicator_t *comm)
|
||||
{
|
||||
/* All done */
|
||||
if (0 == mca_coll_libnbc_component.active_comms++) {
|
||||
opal_progress_register(libnbc_progress);
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
libnbc_progress(void)
|
||||
int
|
||||
ompi_coll_libnbc_progress(void)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
|
||||
@ -259,14 +258,23 @@ libnbc_progress(void)
|
||||
static void
|
||||
libnbc_module_construct(ompi_coll_libnbc_module_t *module)
|
||||
{
|
||||
OBJ_CONSTRUCT(&module->mutex, opal_mutex_t);
|
||||
module->comm_registered = false;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
libnbc_module_destruct(ompi_coll_libnbc_module_t *module)
|
||||
{
|
||||
if (0 == --mca_coll_libnbc_component.active_comms) {
|
||||
opal_progress_unregister(libnbc_progress);
|
||||
OBJ_DESTRUCT(&module->mutex);
|
||||
|
||||
/* if we ever were used for a collective op, do the progress cleanup. */
|
||||
if (true == module->comm_registered) {
|
||||
uint64_t tmp =
|
||||
OPAL_THREAD_ADD32(&mca_coll_libnbc_component.active_comms, -1);
|
||||
if (0 == tmp) {
|
||||
opal_progress_unregister(ompi_coll_libnbc_progress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -521,7 +521,8 @@ error:
|
||||
|
||||
int NBC_Init_handle(struct ompi_communicator_t *comm, ompi_coll_libnbc_request_t **request, ompi_coll_libnbc_module_t *comminfo)
|
||||
{
|
||||
int res;
|
||||
int res, tmp_tag;
|
||||
bool need_register = false;
|
||||
ompi_coll_libnbc_request_t *handle;
|
||||
|
||||
OMPI_COLL_LIBNBC_REQUEST_ALLOC(comm, handle, res);
|
||||
@ -538,18 +539,32 @@ int NBC_Init_handle(struct ompi_communicator_t *comm, ompi_coll_libnbc_request_t
|
||||
|
||||
/******************** Do the tag and shadow comm administration ... ***************/
|
||||
|
||||
/* we found it */
|
||||
comminfo->tag--;
|
||||
OPAL_THREAD_LOCK(&comminfo->mutex);
|
||||
tmp_tag = comminfo->tag--;
|
||||
if (tmp_tag == (-1 * mca_pml.pml_max_tag)) {
|
||||
tmp_tag = comminfo->tag = MCA_COLL_BASE_TAG_NONBLOCKING_BASE;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
|
||||
if (true != comminfo->comm_registered) {
|
||||
comminfo->comm_registered = true;
|
||||
need_register = true;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&comminfo->mutex);
|
||||
|
||||
handle->tag=comminfo->tag;
|
||||
|
||||
/* register progress */
|
||||
if (need_register) {
|
||||
uint32_t tmp =
|
||||
OPAL_THREAD_ADD32(&mca_coll_libnbc_component.active_comms, 1);
|
||||
if (tmp == 1) {
|
||||
opal_progress_register(ompi_coll_libnbc_progress);
|
||||
}
|
||||
}
|
||||
|
||||
handle->comm=comm;
|
||||
/*printf("got comminfo: %lu tag: %i\n", comminfo, comminfo->tag);*/
|
||||
|
||||
/* reset counter ... */
|
||||
if(handle->tag == -32767) {
|
||||
handle->tag=1;
|
||||
comminfo->tag=1;
|
||||
NBC_DEBUG(2,"resetting tags ...\n");
|
||||
}
|
||||
|
||||
/******************** end of tag and shadow comm administration ... ***************/
|
||||
handle->comminfo = comminfo;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user