* properly setup communication infrastructure for libnbc.
* Prevent infinite recursion in progress loop. Should fix improper barrier eugene was seeing. This commit was SVN r26758.
Этот коммит содержится в:
родитель
e0ceabd486
Коммит
58413fa1e4
@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
#include "ompi/mca/coll/coll.h"
|
#include "ompi/mca/coll/coll.h"
|
||||||
#include "ompi/request/request.h"
|
#include "ompi/request/request.h"
|
||||||
|
#include "opal/sys/atomic.h"
|
||||||
|
|
||||||
BEGIN_C_DECLS
|
BEGIN_C_DECLS
|
||||||
|
|
||||||
@ -62,6 +63,7 @@ struct ompi_coll_libnbc_component_t {
|
|||||||
ompi_free_list_t requests;
|
ompi_free_list_t requests;
|
||||||
opal_list_t active_requests;
|
opal_list_t active_requests;
|
||||||
uint32_t active_comms;
|
uint32_t active_comms;
|
||||||
|
opal_atomic_lock_t progress_lock;
|
||||||
};
|
};
|
||||||
typedef struct ompi_coll_libnbc_component_t ompi_coll_libnbc_component_t;
|
typedef struct ompi_coll_libnbc_component_t ompi_coll_libnbc_component_t;
|
||||||
|
|
||||||
@ -95,8 +97,7 @@ struct ompi_coll_libnbc_request_t {
|
|||||||
long row_offset;
|
long row_offset;
|
||||||
int tag;
|
int tag;
|
||||||
volatile int req_count;
|
volatile int req_count;
|
||||||
/*ompi_request_t **req_array;*/
|
ompi_request_t **req_array;
|
||||||
MPI_Request *req_array;
|
|
||||||
NBC_Comminfo *comminfo;
|
NBC_Comminfo *comminfo;
|
||||||
volatile NBC_Schedule *schedule;
|
volatile NBC_Schedule *schedule;
|
||||||
void *tmpbuf; /* temporary buffer e.g. used for Reduce */
|
void *tmpbuf; /* temporary buffer e.g. used for Reduce */
|
||||||
|
@ -97,6 +97,8 @@ libnbc_open(void)
|
|||||||
OBJ_CONSTRUCT(&mca_coll_libnbc_component.active_requests, opal_list_t);
|
OBJ_CONSTRUCT(&mca_coll_libnbc_component.active_requests, opal_list_t);
|
||||||
mca_coll_libnbc_component.active_comms = 0;
|
mca_coll_libnbc_component.active_comms = 0;
|
||||||
|
|
||||||
|
opal_atomic_init(&mca_coll_libnbc_component.progress_lock, OPAL_ATOMIC_UNLOCKED);
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,18 +231,27 @@ libnbc_progress(void)
|
|||||||
{
|
{
|
||||||
opal_list_item_t *item;
|
opal_list_item_t *item;
|
||||||
|
|
||||||
|
if (opal_atomic_trylock(&mca_coll_libnbc_component.progress_lock)) return 0;
|
||||||
|
|
||||||
for (item = opal_list_get_first(&mca_coll_libnbc_component.active_requests) ;
|
for (item = opal_list_get_first(&mca_coll_libnbc_component.active_requests) ;
|
||||||
item != opal_list_get_end(&mca_coll_libnbc_component.active_requests) ;
|
item != opal_list_get_end(&mca_coll_libnbc_component.active_requests) ;
|
||||||
item = opal_list_get_next(item)) {
|
item = opal_list_get_next(item)) {
|
||||||
ompi_coll_libnbc_request_t* request = (ompi_coll_libnbc_request_t*) item;
|
ompi_coll_libnbc_request_t* request = (ompi_coll_libnbc_request_t*) item;
|
||||||
if (NBC_OK == NBC_Progress(request)) {
|
if (NBC_OK == NBC_Progress(request)) {
|
||||||
/* done, remove */
|
/* done, remove and complete */
|
||||||
item = opal_list_remove_item(&mca_coll_libnbc_component.active_requests,
|
item = opal_list_remove_item(&mca_coll_libnbc_component.active_requests,
|
||||||
&request->super.super.super);
|
&request->super.super.super);
|
||||||
|
|
||||||
|
request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||||
|
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||||
|
ompi_request_complete(&request->super, true);
|
||||||
|
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||||
}
|
}
|
||||||
item = opal_list_get_next(item);
|
item = opal_list_get_next(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opal_atomic_unlock(&mca_coll_libnbc_component.progress_lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
#include "nbc_internal.h"
|
#include "nbc_internal.h"
|
||||||
#include "ompi/mca/coll/base/coll_tags.h"
|
#include "ompi/mca/coll/base/coll_tags.h"
|
||||||
#include "ompi/op/op.h"
|
#include "ompi/op/op.h"
|
||||||
|
#include "ompi/mca/pml/pml.h"
|
||||||
|
|
||||||
/* only used in this file */
|
/* only used in this file */
|
||||||
static inline int NBC_Start_round(NBC_Handle *handle);
|
static inline int NBC_Start_round(NBC_Handle *handle);
|
||||||
@ -274,11 +275,6 @@ static inline int NBC_Free(NBC_Handle* handle) {
|
|||||||
handle->tmpbuf = NULL;
|
handle->tmpbuf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
handle->super.req_status.MPI_ERROR = OMPI_SUCCESS;
|
|
||||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
|
||||||
ompi_request_complete(&handle->super, true);
|
|
||||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
|
||||||
|
|
||||||
return NBC_OK;
|
return NBC_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,11 +294,8 @@ int NBC_Progress(NBC_Handle *handle) {
|
|||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Test_time -= MPI_Wtime();
|
Test_time -= MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
#ifdef HAVE_OMPI
|
|
||||||
res = ompi_request_test_all(handle->req_count, handle->req_array, &flag, MPI_STATUSES_IGNORE);
|
res = ompi_request_test_all(handle->req_count, handle->req_array, &flag, MPI_STATUSES_IGNORE);
|
||||||
/* res = MPI_Testall(handle->req_count, handle->req_array, &flag, MPI_STATUSES_IGNORE); */
|
|
||||||
if(res != OMPI_SUCCESS) { printf("MPI Error in MPI_Testall() (%i)\n", res); ret=res; goto error; }
|
if(res != OMPI_SUCCESS) { printf("MPI Error in MPI_Testall() (%i)\n", res); ret=res; goto error; }
|
||||||
#endif
|
|
||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Test_time += MPI_Wtime();
|
Test_time += MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
@ -393,14 +386,10 @@ static inline int NBC_Start_round(NBC_Handle *handle) {
|
|||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Isend_time -= MPI_Wtime();
|
Isend_time -= MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
#ifdef HAVE_OMPI
|
|
||||||
handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request));
|
handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request));
|
||||||
NBC_CHECK_NULL(handle->req_array);
|
NBC_CHECK_NULL(handle->req_array);
|
||||||
res = MCA_PML_CALL(isend_init(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, MCA_PML_BASE_SEND_STANDARD, handle->comm, handle->req_array+handle->req_count-1));
|
res = MCA_PML_CALL(isend(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, MCA_PML_BASE_SEND_STANDARD, handle->comm, handle->req_array+handle->req_count-1));
|
||||||
/*printf("MPI_Isend(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, sendargs->count, (unsigned long)sendargs->datatype, sendargs->dest, handle->tag, (unsigned long)handle->comm, res);*/
|
|
||||||
/* res = MPI_Isend(buf1, sendargs->count, sendargs->datatype, sendargs->dest, handle->tag, handle->comm, handle->req_array+handle->req_count-1); */
|
|
||||||
if(OMPI_SUCCESS != res) { printf("Error in MPI_Isend(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, sendargs->count, (unsigned long)sendargs->datatype, sendargs->dest, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; }
|
if(OMPI_SUCCESS != res) { printf("Error in MPI_Isend(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, sendargs->count, (unsigned long)sendargs->datatype, sendargs->dest, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; }
|
||||||
#endif
|
|
||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Isend_time += MPI_Wtime();
|
Isend_time += MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
@ -421,14 +410,10 @@ static inline int NBC_Start_round(NBC_Handle *handle) {
|
|||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Irecv_time -= MPI_Wtime();
|
Irecv_time -= MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
#ifdef HAVE_OMPI
|
|
||||||
handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request));
|
handle->req_array = (MPI_Request*)realloc((void*)handle->req_array, (handle->req_count)*sizeof(MPI_Request));
|
||||||
NBC_CHECK_NULL(handle->req_array);
|
NBC_CHECK_NULL(handle->req_array);
|
||||||
res = MCA_PML_CALL(irecv(buf1, recvargs->count, recvargs->datatype, recvargs->source, handle->tag, handle->comm, handle->req_array+handle->req_count-1));
|
res = MCA_PML_CALL(irecv(buf1, recvargs->count, recvargs->datatype, recvargs->source, handle->tag, handle->comm, handle->req_array+handle->req_count-1));
|
||||||
/*printf("MPI_Irecv(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, recvargs->count, (unsigned long)recvargs->datatype, recvargs->source, handle->tag, (unsigned long)handle->comm, res); */
|
|
||||||
/*res = MPI_Irecv(buf1, recvargs->count, recvargs->datatype, recvargs->source, handle->tag, handle->comm, handle->req_array+handle->req_count-1); */
|
|
||||||
if(OMPI_SUCCESS != res) { printf("Error in MPI_Irecv(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, recvargs->count, (unsigned long)recvargs->datatype, recvargs->source, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; }
|
if(OMPI_SUCCESS != res) { printf("Error in MPI_Irecv(%lu, %i, %lu, %i, %i, %lu) (%i)\n", (unsigned long)buf1, recvargs->count, (unsigned long)recvargs->datatype, recvargs->source, handle->tag, (unsigned long)handle->comm, res); ret=res; goto error; }
|
||||||
#endif
|
|
||||||
#ifdef NBC_TIMING
|
#ifdef NBC_TIMING
|
||||||
Irecv_time += MPI_Wtime();
|
Irecv_time += MPI_Wtime();
|
||||||
#endif
|
#endif
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user