Number of one-sided fixes:
* use one-sided datatype check instead of send/receive and check both the origin and target datatypes * allow error handler to be set on MPI_WIN_NULL, per standard * Allow recursive calls into the pt2pt osc component's progress function * Fix an uninitialized variable problem in the unlock header This commit was SVN r12667.
Этот коммит содержится в:
родитель
9bc25f0bec
Коммит
63e5668e29
@ -137,7 +137,7 @@ static inline int32_t ompi_ddt_is_committed( const ompi_datatype_t* type )
|
||||
static inline int32_t ompi_ddt_is_overlapped( const ompi_datatype_t* type )
|
||||
{ return ((type->flags & DT_FLAG_OVERLAP) == DT_FLAG_OVERLAP); }
|
||||
static inline int32_t ompi_ddt_is_acceptable_for_one_sided( const ompi_datatype_t* type )
|
||||
{ return ((type->flags & DT_FLAG_ONE_SIDED) == DT_FLAG_ONE_SIDED); }
|
||||
{ return true; }
|
||||
static inline int32_t ompi_ddt_is_valid( const ompi_datatype_t* type )
|
||||
{ return !((type->flags & DT_FLAG_UNAVAILABLE) == DT_FLAG_UNAVAILABLE); }
|
||||
static inline int32_t ompi_ddt_is_predefined( const ompi_datatype_t* type )
|
||||
|
@ -76,7 +76,6 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
|
||||
|
||||
OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
|
||||
|
||||
free(module->p2p_control_buffer);
|
||||
OBJ_DESTRUCT(&(module->p2p_pending_control_sends));
|
||||
|
||||
ompi_comm_free(&(module->p2p_comm));
|
||||
|
@ -29,6 +29,8 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define CONTROL_MSG_TAG (-200)
|
||||
|
||||
struct ompi_osc_pt2pt_component_t {
|
||||
/** Extend the basic osc component interface */
|
||||
ompi_osc_base_component_t super;
|
||||
@ -75,9 +77,6 @@ struct ompi_osc_pt2pt_module_t {
|
||||
/** communicator created with this window */
|
||||
ompi_communicator_t *p2p_comm;
|
||||
|
||||
/** eager message / control message receive buffer */
|
||||
void *p2p_control_buffer;
|
||||
|
||||
/** control message receive request */
|
||||
struct ompi_request_t *p2p_cb_request;
|
||||
|
||||
|
@ -32,6 +32,7 @@ typedef void (*ompi_osc_pt2pt_buffer_completion_fn_t)(
|
||||
struct ompi_osc_pt2pt_buffer_t {
|
||||
opal_free_list_item_t super;
|
||||
ompi_request_t *request;
|
||||
ompi_status_public_t status;
|
||||
ompi_osc_pt2pt_buffer_completion_fn_t cbfunc;
|
||||
void *cbdata;
|
||||
void *payload;
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "ompi/datatype/dt_arch.h"
|
||||
|
||||
static int ompi_osc_pt2pt_component_open(void);
|
||||
static void ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *buffer);
|
||||
|
||||
ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
|
||||
{ /* ompi_osc_base_component_t */
|
||||
@ -226,6 +227,8 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
|
||||
{
|
||||
ompi_osc_pt2pt_module_t *module;
|
||||
int ret, i;
|
||||
ompi_osc_pt2pt_buffer_t *buffer;
|
||||
opal_free_list_item_t *item;
|
||||
|
||||
/* create module structure */
|
||||
module = (ompi_osc_pt2pt_module_t*)malloc(sizeof(ompi_osc_pt2pt_module_t));
|
||||
@ -249,16 +252,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
|
||||
return ret;
|
||||
}
|
||||
|
||||
module->p2p_control_buffer = malloc(mca_osc_pt2pt_component.p2p_c_eager_size);
|
||||
if (NULL == module->p2p_control_buffer) {
|
||||
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
|
||||
ompi_comm_free(&comm);
|
||||
OBJ_DESTRUCT(&(module->p2p_acc_lock));
|
||||
OBJ_DESTRUCT(&(module->p2p_lock));
|
||||
free(module);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
module->p2p_cb_request = NULL;
|
||||
|
||||
OBJ_CONSTRUCT(&module->p2p_pending_control_sends, opal_list_t);
|
||||
@ -402,13 +395,37 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
|
||||
opal_atomic_mb();
|
||||
|
||||
/* start up receive for protocol headers */
|
||||
ret = MCA_PML_CALL(irecv(module->p2p_control_buffer,
|
||||
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
|
||||
item, ret);
|
||||
if (NULL == item) {
|
||||
free(module->p2p_sc_remote_ranks);
|
||||
free(module->p2p_sc_remote_active_ranks);
|
||||
free(module->p2p_fence_coll_results);
|
||||
free(module->p2p_fence_coll_counts);
|
||||
free(module->p2p_copy_num_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->p2p_long_msgs);
|
||||
free(module->p2p_num_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
|
||||
ompi_comm_free(&comm);
|
||||
OBJ_DESTRUCT(&(module->p2p_acc_lock));
|
||||
OBJ_DESTRUCT(&(module->p2p_lock));
|
||||
free(module);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
buffer = (ompi_osc_pt2pt_buffer_t*) item;
|
||||
buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;
|
||||
buffer->cbdata = (void*) module;
|
||||
|
||||
ret = MCA_PML_CALL(irecv(buffer->payload,
|
||||
mca_osc_pt2pt_component.p2p_c_eager_size,
|
||||
MPI_BYTE,
|
||||
MPI_ANY_SOURCE,
|
||||
-200,
|
||||
CONTROL_MSG_TAG,
|
||||
module->p2p_comm,
|
||||
&module->p2p_cb_request));
|
||||
&buffer->request));
|
||||
opal_list_append(&module->p2p_pending_control_sends,
|
||||
&buffer->super.super);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -416,12 +433,39 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
|
||||
|
||||
/* dispatch for callback on message completion */
|
||||
static void
|
||||
ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
|
||||
void *buffer,
|
||||
size_t buffer_len)
|
||||
ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *pt2pt_buffer)
|
||||
{
|
||||
int ret;
|
||||
void *payload;
|
||||
void *payload, *buffer;
|
||||
size_t buffer_len;
|
||||
ompi_osc_pt2pt_module_t *module;
|
||||
ompi_osc_pt2pt_buffer_t *new_pt2pt_buffer;
|
||||
opal_free_list_item_t *item;
|
||||
|
||||
buffer = pt2pt_buffer->payload;
|
||||
buffer_len = pt2pt_buffer->status._count;
|
||||
module = pt2pt_buffer->cbdata;
|
||||
|
||||
/* post a new receive message */
|
||||
|
||||
/* start up receive for protocol headers */
|
||||
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
|
||||
item, ret);
|
||||
assert(NULL != item);
|
||||
new_pt2pt_buffer = (ompi_osc_pt2pt_buffer_t*) item;
|
||||
new_pt2pt_buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;
|
||||
new_pt2pt_buffer->cbdata = (void*) module;
|
||||
|
||||
ret = MCA_PML_CALL(irecv(new_pt2pt_buffer->payload,
|
||||
mca_osc_pt2pt_component.p2p_c_eager_size,
|
||||
MPI_BYTE,
|
||||
MPI_ANY_SOURCE,
|
||||
CONTROL_MSG_TAG,
|
||||
module->p2p_comm,
|
||||
&new_pt2pt_buffer->request));
|
||||
assert(OMPI_SUCCESS == ret);
|
||||
opal_list_append(&module->p2p_pending_control_sends,
|
||||
&new_pt2pt_buffer->super.super);
|
||||
|
||||
assert(buffer_len >=
|
||||
sizeof(ompi_osc_pt2pt_base_header_t));
|
||||
@ -452,10 +496,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
|
||||
OMPI_WIN_FENCE |
|
||||
OMPI_WIN_ACCESS_EPOCH |
|
||||
OMPI_WIN_EXPOSE_EPOCH);
|
||||
} else {
|
||||
opal_output(0, "Invalid MPI_PUT on Window %s. Window not in exposure epoch",
|
||||
module->p2p_win->w_name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -487,10 +527,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
|
||||
OMPI_WIN_FENCE |
|
||||
OMPI_WIN_ACCESS_EPOCH |
|
||||
OMPI_WIN_EXPOSE_EPOCH);
|
||||
} else {
|
||||
opal_output(0, "Invalid MPI_ACCUMULATE on Window %s. Window not in exposure epoch",
|
||||
module->p2p_win->w_name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,10 +562,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
|
||||
OMPI_WIN_FENCE |
|
||||
OMPI_WIN_ACCESS_EPOCH |
|
||||
OMPI_WIN_EXPOSE_EPOCH);
|
||||
} else {
|
||||
opal_output(0, "Invalid MPI_GET on Window %s. Window not in exposure epoch",
|
||||
module->p2p_win->w_name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -662,6 +694,10 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
|
||||
opal_output_verbose(5, ompi_osc_base_output,
|
||||
"received packet for Window with unknown type");
|
||||
}
|
||||
|
||||
item = &(pt2pt_buffer->super);
|
||||
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
|
||||
item);
|
||||
}
|
||||
|
||||
|
||||
@ -692,7 +728,6 @@ int
|
||||
ompi_osc_pt2pt_progress(void)
|
||||
{
|
||||
int ret, done, count = 0;
|
||||
ompi_status_public_t status;
|
||||
void *node;
|
||||
uint32_t key;
|
||||
ompi_osc_pt2pt_module_t *module;
|
||||
@ -705,40 +740,20 @@ ompi_osc_pt2pt_progress(void)
|
||||
if (OMPI_SUCCESS != ret) return 0;
|
||||
|
||||
do {
|
||||
ret = ompi_osc_pt2pt_request_test(&module->p2p_cb_request, &done, &status);
|
||||
if (OMPI_SUCCESS == ret && done) {
|
||||
/* process message */
|
||||
ompi_osc_pt2pt_component_fragment_cb(module,
|
||||
module->p2p_control_buffer,
|
||||
status._count);
|
||||
|
||||
/* repost receive */
|
||||
ret = MCA_PML_CALL(irecv(module->p2p_control_buffer,
|
||||
mca_osc_pt2pt_component.p2p_c_eager_size,
|
||||
MPI_BYTE,
|
||||
MPI_ANY_SOURCE,
|
||||
-200,
|
||||
module->p2p_comm,
|
||||
&module->p2p_cb_request));
|
||||
assert(OMPI_SUCCESS == ret);
|
||||
count++;
|
||||
}
|
||||
|
||||
/* loop through sends */
|
||||
/* loop through pending requests */
|
||||
for (item = opal_list_get_first(&module->p2p_pending_control_sends) ;
|
||||
item != opal_list_get_end(&module->p2p_pending_control_sends) ;
|
||||
item = opal_list_get_next(item)) {
|
||||
ompi_osc_pt2pt_buffer_t *buffer =
|
||||
(ompi_osc_pt2pt_buffer_t*) item;
|
||||
|
||||
ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &status);
|
||||
ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &buffer->status);
|
||||
if (OMPI_SUCCESS == ret && done) {
|
||||
item = opal_list_remove_item(&module->p2p_pending_control_sends,
|
||||
item);
|
||||
buffer->cbfunc(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} while (OMPI_SUCCESS ==
|
||||
opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
|
||||
&key,
|
||||
@ -748,5 +763,3 @@ ompi_osc_pt2pt_progress(void)
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
|
@ -811,6 +811,7 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
|
||||
/* pack header */
|
||||
header = (ompi_osc_pt2pt_control_header_t*) buffer->payload;
|
||||
header->hdr_base.hdr_type = type;
|
||||
header->hdr_base.hdr_flags = 0;
|
||||
header->hdr_value[0] = value0;
|
||||
header->hdr_value[1] = value1;
|
||||
header->hdr_windx = module->p2p_comm->c_contextid;
|
||||
|
@ -326,7 +326,7 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group,
|
||||
|
||||
/* Set our mode to expose w/ post */
|
||||
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
|
||||
ompi_win_set_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
|
||||
ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
|
||||
|
||||
/* list how many complete counters we're still waiting on */
|
||||
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs),
|
||||
|
@ -57,6 +57,8 @@ int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_data
|
||||
rc = MPI_ERR_RANK;
|
||||
} else if (MPI_OP_NULL == op) {
|
||||
rc = MPI_ERR_OP;
|
||||
} else if (!ompi_op_is_intrinsic(op)) {
|
||||
rc = MPI_ERR_OP;
|
||||
} else if (!ompi_win_comm_allowed(win)) {
|
||||
rc = MPI_ERR_RMA_SYNC;
|
||||
} else if ( target_disp < 0 ) {
|
||||
@ -64,7 +66,10 @@ int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_data
|
||||
} else if ( (origin_count < 0) || (target_count < 0) ) {
|
||||
rc = MPI_ERR_COUNT;
|
||||
} else {
|
||||
OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count);
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count);
|
||||
if (OMPI_SUCCESS == rc) {
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count);
|
||||
}
|
||||
}
|
||||
OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME);
|
||||
|
||||
|
@ -65,7 +65,7 @@ extern "C" {
|
||||
if( NULL == (DDT) || MPI_DATATYPE_NULL == (DDT) ) (RC) = MPI_ERR_TYPE; \
|
||||
else if( (COUNT) < 0 ) (RC) = MPI_ERR_COUNT; \
|
||||
else if( !ompi_ddt_is_committed((DDT)) ) (RC) = MPI_ERR_TYPE; \
|
||||
else if( ompi_ddt_is_overerlapped((DDT)) ) (RC) = MPI_ERR_TYPE; \
|
||||
else if( ompi_ddt_is_overlapped((DDT)) ) (RC) = MPI_ERR_TYPE; \
|
||||
else if( !ompi_ddt_is_acceptable_for_one_sided((DDT)) ) (RC) = MPI_ERR_TYPE; \
|
||||
else if( !ompi_ddt_is_valid((DDT)) ) (RC) = MPI_ERR_TYPE; \
|
||||
} while(0)
|
||||
|
@ -60,7 +60,10 @@ int MPI_Get(void *origin_addr, int origin_count,
|
||||
} else if ( (origin_count < 0) || (target_count < 0) ) {
|
||||
rc = MPI_ERR_COUNT;
|
||||
} else {
|
||||
OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count);
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count);
|
||||
if (OMPI_SUCCESS == rc) {
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count);
|
||||
}
|
||||
}
|
||||
OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME);
|
||||
}
|
||||
|
@ -59,7 +59,10 @@ int MPI_Put(void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
|
||||
} else if ( (origin_count < 0) || (target_count < 0) ) {
|
||||
rc = MPI_ERR_COUNT;
|
||||
} else {
|
||||
OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count);
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count);
|
||||
if (OMPI_SUCCESS == rc) {
|
||||
OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count);
|
||||
}
|
||||
}
|
||||
OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME);
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ int MPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler)
|
||||
if (MPI_PARAM_CHECK) {
|
||||
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
|
||||
|
||||
if (ompi_win_invalid(win)) {
|
||||
if (ompi_win_invalid(win) && win != MPI_WIN_NULL) {
|
||||
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_ARG, FUNC_NAME);
|
||||
} else if (NULL == errhandler ||
|
||||
MPI_ERRHANDLER_NULL == errhandler ||
|
||||
|
@ -41,7 +41,7 @@ int MPI_Win_test(MPI_Win win, int *flag)
|
||||
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
|
||||
|
||||
if (ompi_win_invalid(win)) {
|
||||
return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_WIN, FUNC_NAME);
|
||||
return OMPI_ERRHANDLER_INVOKE(MPI_WIN_NULL, MPI_ERR_WIN, FUNC_NAME);
|
||||
} else if (0 == (ompi_win_get_mode(win) & OMPI_WIN_POSTED)) {
|
||||
return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_RMA_SYNC, FUNC_NAME);
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user