diff --git a/ompi/datatype/datatype.h b/ompi/datatype/datatype.h index e7609ba5d3..d1ef0042d7 100644 --- a/ompi/datatype/datatype.h +++ b/ompi/datatype/datatype.h @@ -137,7 +137,7 @@ static inline int32_t ompi_ddt_is_committed( const ompi_datatype_t* type ) static inline int32_t ompi_ddt_is_overlapped( const ompi_datatype_t* type ) { return ((type->flags & DT_FLAG_OVERLAP) == DT_FLAG_OVERLAP); } static inline int32_t ompi_ddt_is_acceptable_for_one_sided( const ompi_datatype_t* type ) -{ return ((type->flags & DT_FLAG_ONE_SIDED) == DT_FLAG_ONE_SIDED); } +{ return true; } static inline int32_t ompi_ddt_is_valid( const ompi_datatype_t* type ) { return !((type->flags & DT_FLAG_UNAVAILABLE) == DT_FLAG_UNAVAILABLE); } static inline int32_t ompi_ddt_is_predefined( const ompi_datatype_t* type ) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.c b/ompi/mca/osc/pt2pt/osc_pt2pt.c index 409b8bff75..217d9931d8 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.c @@ -76,7 +76,6 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win) OBJ_DESTRUCT(&(module->p2p_pending_sendreqs)); - free(module->p2p_control_buffer); OBJ_DESTRUCT(&(module->p2p_pending_control_sends)); ompi_comm_free(&(module->p2p_comm)); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index 6adf3fcb7a..2f85553265 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -29,6 +29,8 @@ extern "C" { #endif +#define CONTROL_MSG_TAG (-200) + struct ompi_osc_pt2pt_component_t { /** Extend the basic osc component interface */ ompi_osc_base_component_t super; @@ -75,9 +77,6 @@ struct ompi_osc_pt2pt_module_t { /** communicator created with this window */ ompi_communicator_t *p2p_comm; - /** eager message / control message receive buffer */ - void *p2p_control_buffer; - /** control message receive request */ struct ompi_request_t *p2p_cb_request; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h index 2d2d89ec28..4f1184fb6c 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h @@ -32,6 +32,7 @@ typedef void (*ompi_osc_pt2pt_buffer_completion_fn_t)( struct ompi_osc_pt2pt_buffer_t { opal_free_list_item_t super; ompi_request_t *request; + ompi_status_public_t status; ompi_osc_pt2pt_buffer_completion_fn_t cbfunc; void *cbdata; void *payload; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index f08d8bca21..15f7924fb8 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -35,6 +35,7 @@ #include "ompi/datatype/dt_arch.h" static int ompi_osc_pt2pt_component_open(void); +static void ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *buffer); ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = { { /* ompi_osc_base_component_t */ @@ -226,6 +227,8 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, { ompi_osc_pt2pt_module_t *module; int ret, i; + ompi_osc_pt2pt_buffer_t *buffer; + opal_free_list_item_t *item; /* create module structure */ module = (ompi_osc_pt2pt_module_t*)malloc(sizeof(ompi_osc_pt2pt_module_t)); @@ -249,16 +252,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, return ret; } - module->p2p_control_buffer = malloc(mca_osc_pt2pt_component.p2p_c_eager_size); - if (NULL == module->p2p_control_buffer) { - OBJ_DESTRUCT(&module->p2p_pending_sendreqs); - ompi_comm_free(&comm); - OBJ_DESTRUCT(&(module->p2p_acc_lock)); - OBJ_DESTRUCT(&(module->p2p_lock)); - free(module); - return OMPI_ERROR; - } - module->p2p_cb_request = NULL; OBJ_CONSTRUCT(&module->p2p_pending_control_sends, opal_list_t); @@ -402,13 +395,37 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, opal_atomic_mb(); /* start up receive for protocol headers */ - ret = MCA_PML_CALL(irecv(module->p2p_control_buffer, + OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, + item, ret); + if (NULL == item) { + free(module->p2p_sc_remote_ranks); + free(module->p2p_sc_remote_active_ranks); + free(module->p2p_fence_coll_results); + free(module->p2p_fence_coll_counts); + free(module->p2p_copy_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_long_msgs); + free(module->p2p_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return OMPI_ERROR; + } + buffer = (ompi_osc_pt2pt_buffer_t*) item; + buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb; + buffer->cbdata = (void*) module; + + ret = MCA_PML_CALL(irecv(buffer->payload, mca_osc_pt2pt_component.p2p_c_eager_size, MPI_BYTE, MPI_ANY_SOURCE, - -200, + CONTROL_MSG_TAG, module->p2p_comm, - &module->p2p_cb_request)); + &buffer->request)); + opal_list_append(&module->p2p_pending_control_sends, + &buffer->super.super); return ret; } @@ -416,12 +433,39 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, /* dispatch for callback on message completion */ static void -ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, - void *buffer, - size_t buffer_len) +ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *pt2pt_buffer) { int ret; - void *payload; + void *payload, *buffer; + size_t buffer_len; + ompi_osc_pt2pt_module_t *module; + ompi_osc_pt2pt_buffer_t *new_pt2pt_buffer; + opal_free_list_item_t *item; + + buffer = pt2pt_buffer->payload; + buffer_len = pt2pt_buffer->status._count; + module = pt2pt_buffer->cbdata; + + /* post a new receive message */ + + /* start up receive for protocol headers */ + OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, + item, ret); + assert(NULL != item); + new_pt2pt_buffer = (ompi_osc_pt2pt_buffer_t*) item; + new_pt2pt_buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb; + new_pt2pt_buffer->cbdata = (void*) module; + + ret = MCA_PML_CALL(irecv(new_pt2pt_buffer->payload, + mca_osc_pt2pt_component.p2p_c_eager_size, + MPI_BYTE, + MPI_ANY_SOURCE, + CONTROL_MSG_TAG, + module->p2p_comm, + &new_pt2pt_buffer->request)); + assert(OMPI_SUCCESS == ret); + opal_list_append(&module->p2p_pending_control_sends, + &new_pt2pt_buffer->super.super); assert(buffer_len >= sizeof(ompi_osc_pt2pt_base_header_t)); @@ -452,10 +496,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); - } else { - opal_output(0, "Invalid MPI_PUT on Window %s. Window not in exposure epoch", - module->p2p_win->w_name); - break; } } @@ -487,10 +527,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); - } else { - opal_output(0, "Invalid MPI_ACCUMULATE on Window %s. Window not in exposure epoch", - module->p2p_win->w_name); - break; } } @@ -526,10 +562,6 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH); - } else { - opal_output(0, "Invalid MPI_GET on Window %s. Window not in exposure epoch", - module->p2p_win->w_name); - break; } } @@ -662,6 +694,10 @@ ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, opal_output_verbose(5, ompi_osc_base_output, "received packet for Window with unknown type"); } + + item = &(pt2pt_buffer->super); + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + item); } @@ -692,7 +728,6 @@ int ompi_osc_pt2pt_progress(void) { int ret, done, count = 0; - ompi_status_public_t status; void *node; uint32_t key; ompi_osc_pt2pt_module_t *module; @@ -705,40 +740,20 @@ ompi_osc_pt2pt_progress(void) if (OMPI_SUCCESS != ret) return 0; do { - ret = ompi_osc_pt2pt_request_test(&module->p2p_cb_request, &done, &status); - if (OMPI_SUCCESS == ret && done) { - /* process message */ - ompi_osc_pt2pt_component_fragment_cb(module, - module->p2p_control_buffer, - status._count); - - /* repost receive */ - ret = MCA_PML_CALL(irecv(module->p2p_control_buffer, - mca_osc_pt2pt_component.p2p_c_eager_size, - MPI_BYTE, - MPI_ANY_SOURCE, - -200, - module->p2p_comm, - &module->p2p_cb_request)); - assert(OMPI_SUCCESS == ret); - count++; - } - - /* loop through sends */ + /* loop through pending requests */ for (item = opal_list_get_first(&module->p2p_pending_control_sends) ; item != opal_list_get_end(&module->p2p_pending_control_sends) ; item = opal_list_get_next(item)) { ompi_osc_pt2pt_buffer_t *buffer = (ompi_osc_pt2pt_buffer_t*) item; - ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &status); + ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &buffer->status); if (OMPI_SUCCESS == ret && done) { item = opal_list_remove_item(&module->p2p_pending_control_sends, item); buffer->cbfunc(buffer); - } + } } - } while (OMPI_SUCCESS == opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules, &key, @@ -748,5 +763,3 @@ ompi_osc_pt2pt_progress(void) return count; } - - diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index 60c8a119cd..81b67eb9f1 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -811,6 +811,7 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module, /* pack header */ header = (ompi_osc_pt2pt_control_header_t*) buffer->payload; header->hdr_base.hdr_type = type; + header->hdr_base.hdr_flags = 0; header->hdr_value[0] = value0; header->hdr_value[1] = value1; header->hdr_windx = module->p2p_comm->c_contextid; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c index 4cd4e551ce..3f5f65bda6 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c @@ -326,7 +326,7 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group, /* Set our mode to expose w/ post */ ompi_win_remove_mode(win, OMPI_WIN_FENCE); - ompi_win_set_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); + ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); /* list how many complete counters we're still waiting on */ OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs), diff --git a/ompi/mpi/c/accumulate.c b/ompi/mpi/c/accumulate.c index ac31fc76f5..9ede8258aa 100644 --- a/ompi/mpi/c/accumulate.c +++ b/ompi/mpi/c/accumulate.c @@ -57,6 +57,8 @@ int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_data rc = MPI_ERR_RANK; } else if (MPI_OP_NULL == op) { rc = MPI_ERR_OP; + } else if (!ompi_op_is_intrinsic(op)) { + rc = MPI_ERR_OP; } else if (!ompi_win_comm_allowed(win)) { rc = MPI_ERR_RMA_SYNC; } else if ( target_disp < 0 ) { @@ -64,7 +66,10 @@ int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_data } else if ( (origin_count < 0) || (target_count < 0) ) { rc = MPI_ERR_COUNT; } else { - OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count); + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count); + if (OMPI_SUCCESS == rc) { + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count); + } } OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME); diff --git a/ompi/mpi/c/bindings.h b/ompi/mpi/c/bindings.h index a30cc3e5fb..740fb1b019 100644 --- a/ompi/mpi/c/bindings.h +++ b/ompi/mpi/c/bindings.h @@ -65,7 +65,7 @@ extern "C" { if( NULL == (DDT) || MPI_DATATYPE_NULL == (DDT) ) (RC) = MPI_ERR_TYPE; \ else if( (COUNT) < 0 ) (RC) = MPI_ERR_COUNT; \ else if( !ompi_ddt_is_committed((DDT)) ) (RC) = MPI_ERR_TYPE; \ - else if( ompi_ddt_is_overerlapped((DDT)) ) (RC) = MPI_ERR_TYPE; \ + else if( ompi_ddt_is_overlapped((DDT)) ) (RC) = MPI_ERR_TYPE; \ else if( !ompi_ddt_is_acceptable_for_one_sided((DDT)) ) (RC) = MPI_ERR_TYPE; \ else if( !ompi_ddt_is_valid((DDT)) ) (RC) = MPI_ERR_TYPE; \ } while(0) diff --git a/ompi/mpi/c/get.c b/ompi/mpi/c/get.c index b833fc02b5..38887ef577 100644 --- a/ompi/mpi/c/get.c +++ b/ompi/mpi/c/get.c @@ -60,7 +60,10 @@ int MPI_Get(void *origin_addr, int origin_count, } else if ( (origin_count < 0) || (target_count < 0) ) { rc = MPI_ERR_COUNT; } else { - OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count); + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count); + if (OMPI_SUCCESS == rc) { + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count); + } } OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME); } diff --git a/ompi/mpi/c/put.c b/ompi/mpi/c/put.c index e37bb63649..6db9c67ddd 100644 --- a/ompi/mpi/c/put.c +++ b/ompi/mpi/c/put.c @@ -59,7 +59,10 @@ int MPI_Put(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, } else if ( (origin_count < 0) || (target_count < 0) ) { rc = MPI_ERR_COUNT; } else { - OMPI_CHECK_DATATYPE_FOR_SEND(rc, origin_datatype, origin_count); + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, origin_datatype, origin_count); + if (OMPI_SUCCESS == rc) { + OMPI_CHECK_DATATYPE_FOR_ONE_SIDED(rc, target_datatype, target_count); + } } OMPI_ERRHANDLER_CHECK(rc, win, rc, FUNC_NAME); } diff --git a/ompi/mpi/c/win_set_errhandler.c b/ompi/mpi/c/win_set_errhandler.c index 9b61314cae..52fbe6fc2a 100644 --- a/ompi/mpi/c/win_set_errhandler.c +++ b/ompi/mpi/c/win_set_errhandler.c @@ -37,7 +37,7 @@ int MPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler) if (MPI_PARAM_CHECK) { OMPI_ERR_INIT_FINALIZE(FUNC_NAME); - if (ompi_win_invalid(win)) { + if (ompi_win_invalid(win) && win != MPI_WIN_NULL) { return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_ARG, FUNC_NAME); } else if (NULL == errhandler || MPI_ERRHANDLER_NULL == errhandler || diff --git a/ompi/mpi/c/win_test.c b/ompi/mpi/c/win_test.c index 3230f582aa..b38ed1be38 100644 --- a/ompi/mpi/c/win_test.c +++ b/ompi/mpi/c/win_test.c @@ -41,7 +41,7 @@ int MPI_Win_test(MPI_Win win, int *flag) OMPI_ERR_INIT_FINALIZE(FUNC_NAME); if (ompi_win_invalid(win)) { - return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_WIN, FUNC_NAME); + return OMPI_ERRHANDLER_INVOKE(MPI_WIN_NULL, MPI_ERR_WIN, FUNC_NAME); } else if (0 == (ompi_win_get_mode(win) & OMPI_WIN_POSTED)) { return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_RMA_SYNC, FUNC_NAME); }