diff --git a/ompi/mca/bml/base/base.h b/ompi/mca/bml/base/base.h index 847419815c..87caa0099b 100644 --- a/ompi/mca/bml/base/base.h +++ b/ompi/mca/bml/base/base.h @@ -52,6 +52,8 @@ OMPI_DECLSPEC int mca_bml_base_open(void); OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, bool enable_mpi_threads); OMPI_DECLSPEC int mca_bml_base_close(void); +OMPI_DECLSPEC bool mca_bml_base_inited(void); + /* diff --git a/ompi/mca/bml/base/bml_base_init.c b/ompi/mca/bml/base/bml_base_init.c index d199dc2823..3e33058001 100644 --- a/ompi/mca/bml/base/bml_base_init.c +++ b/ompi/mca/bml/base/bml_base_init.c @@ -37,6 +37,13 @@ OMPI_DECLSPEC mca_bml_base_module_t mca_bml = { }; mca_bml_base_component_t mca_bml_component; +static bool init_called = false; + +bool +mca_bml_base_inited(void) +{ + return init_called; +} int mca_bml_base_init( bool enable_progress_threads, bool enable_mpi_threads) { @@ -46,6 +53,8 @@ int mca_bml_base_init( bool enable_progress_threads, int priority = 0, best_priority = -1; mca_base_component_list_item_t *cli = NULL; + init_called = true; + for (item = opal_list_get_first(&mca_bml_base_components_available); opal_list_get_end(&mca_bml_base_components_available) != item; item = opal_list_get_next(item)) { diff --git a/ompi/mca/btl/btl.h b/ompi/mca/btl/btl.h index 0ec7d346e0..0173bec097 100644 --- a/ompi/mca/btl/btl.h +++ b/ompi/mca/btl/btl.h @@ -131,7 +131,7 @@ typedef uint8_t mca_btl_base_tag_t; /* reserved tag values */ #define MCA_BTL_TAG_BTL 0 #define MCA_BTL_TAG_PML 1 -#define MCA_BTL_TAG_OSC_PT2PT 2 +#define MCA_BTL_TAG_OSC_RDMA 2 #define MCA_BTL_TAG_USR 3 #define MCA_BTL_TAG_MAX 255 /* 1 + highest allowed tag num */ diff --git a/ompi/mca/osc/pt2pt/Makefile.am b/ompi/mca/osc/pt2pt/Makefile.am index fd35553ee5..ba12a7897b 100644 --- a/ompi/mca/osc/pt2pt/Makefile.am +++ b/ompi/mca/osc/pt2pt/Makefile.am @@ -21,6 +21,8 @@ include $(top_ompi_srcdir)/config/Makefile.options pt2pt_sources = \ osc_pt2pt.h \ osc_pt2pt.c \ + osc_pt2pt_buffer.h \ + osc_pt2pt_buffer.c \ osc_pt2pt_comm.c \ osc_pt2pt_component.c \ osc_pt2pt_data_move.h \ diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.c b/ompi/mca/osc/pt2pt/osc_pt2pt.c index c8a97e11af..41637fb568 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.c @@ -22,7 +22,6 @@ #include "opal/threads/mutex.h" #include "ompi/win/win.h" #include "ompi/communicator/communicator.h" -#include "ompi/mca/btl/btl.h" #include "mpi.h" @@ -47,6 +46,12 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win) module->p2p_comm->c_contextid); /* only take the output of hast_table_remove if there wasn't already an error */ ret = (ret != OMPI_SUCCESS) ? 
ret : tmp; + + if (0 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) { + /* stop progress thread */ + opal_progress_unregister(ompi_osc_pt2pt_progress); + } + OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock); OBJ_DESTRUCT(&(module->p2p_locks_pending)); @@ -64,6 +69,9 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win) OBJ_DESTRUCT(&(module->p2p_pending_sendreqs)); + free(module->p2p_control_buffer); + OBJ_DESTRUCT(&(module->p2p_pending_control_sends)); + ompi_comm_free(&(module->p2p_comm)); module->p2p_comm = NULL; diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index c61fd8c90d..efac7b2351 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -22,7 +22,6 @@ #include "opal/class/opal_hash_table.h" #include "ompi/mca/osc/osc.h" -#include "ompi/mca/btl/btl.h" #include "ompi/win/win.h" #include "ompi/communicator/communicator.h" @@ -33,13 +32,16 @@ struct ompi_osc_pt2pt_component_t { /** store the state of progress threads for this instance of OMPI */ bool p2p_c_have_progress_threads; - /** lock access to datastructures in the component structure */ - opal_mutex_t p2p_c_lock; + /** lock access to datastructures in the component structure */ + opal_mutex_t p2p_c_lock; - /** List of ompi_osc_pt2pt_module_ts currently in existance. - Needed so that received fragments can be dispatched to the - correct module */ - opal_hash_table_t p2p_c_modules; + /** List of ompi_osc_pt2pt_module_ts currently in existance. + Needed so that received fragments can be dispatched to the + correct module */ + opal_hash_table_t p2p_c_modules; + + /** max size of eager message */ + size_t p2p_c_eager_size; /** free list of ompi_osc_pt2pt_sendreq_t structures */ opal_free_list_t p2p_c_sendreqs; @@ -47,13 +49,15 @@ struct ompi_osc_pt2pt_component_t { opal_free_list_t p2p_c_replyreqs; /** free list of ompi_osc_pt2pt_longreq_t structures */ opal_free_list_t p2p_c_longreqs; + /** free list for eager / control meessages */ + opal_free_list_t p2p_c_buffers; }; typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t; struct ompi_osc_pt2pt_module_t { /** Extend the basic osc module interface */ - ompi_osc_base_module_t super; + ompi_osc_base_module_t super; /** lock access to data structures in the current module */ opal_mutex_t p2p_lock; @@ -67,6 +71,14 @@ struct ompi_osc_pt2pt_module_t { /** communicator created with this window */ ompi_communicator_t *p2p_comm; + /** eager message / control message receive buffer */ + void *p2p_control_buffer; + + /** control message receive request */ + struct ompi_request_t *p2p_cb_request; + + opal_list_t p2p_pending_control_sends; + /** list of ompi_osc_pt2pt_sendreq_t structures, and includes all requests for this access epoch that have not already been started. p2p_lock must be held when modifying this field. */ @@ -93,7 +105,7 @@ struct ompi_osc_pt2pt_module_t { atomic counter operations. */ volatile int32_t p2p_num_pending_in; - /** cyclic counter for a unique tage for long messages. Not + /** cyclic counter for a unique tag for long messages. Not protected by the p2p_lock - must use create_send_tag() to create a send tag */ volatile int32_t p2p_tag_counter; @@ -105,8 +117,6 @@ struct ompi_osc_pt2pt_module_t { opal_list_t p2p_copy_pending_sendreqs; short *p2p_copy_num_pending_sendreqs; - bool p2p_eager_send; - /* ********************* FENCE data ************************ */ /* an array of ints, each containing the value 1. 
*/ @@ -115,8 +125,6 @@ struct ompi_osc_pt2pt_module_t { with different synchronization costs */ short *p2p_fence_coll_results; - enum { OSC_SYNC_REDUCE_SCATTER, OSC_SYNC_ALLREDUCE, OSC_SYNC_ALLTOALL } p2p_fence_sync_type; - /* ********************* PWSC data ************************ */ struct ompi_group_t *p2p_pw_group; @@ -184,6 +192,7 @@ int ompi_osc_pt2pt_component_select(struct ompi_win_t *win, struct ompi_info_t *info, struct ompi_communicator_t *comm); +int ompi_osc_pt2pt_progress(void); /* * Module interface function types @@ -253,6 +262,7 @@ int ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module, int32_t origin, int32_t count); + #if defined(c_plusplus) || defined(__cplusplus) } #endif diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.c b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.c new file mode 100644 index 0000000000..b744d43131 --- /dev/null +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.c @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_pt2pt.h" +#include "osc_pt2pt_buffer.h" + +#include "opal/class/opal_free_list.h" + +static void ompi_osc_pt2pt_buffer_construct(ompi_osc_pt2pt_buffer_t *buf) +{ + buf->payload = buf + 1; +} + + +static void ompi_osc_pt2pt_buffer_destruct(ompi_osc_pt2pt_buffer_t *buf) +{ + buf->payload = NULL; +} + + +OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, opal_free_list_item_t, + ompi_osc_pt2pt_buffer_construct, + ompi_osc_pt2pt_buffer_destruct); + diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h new file mode 100644 index 0000000000..a9f605eb16 --- /dev/null +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_OSC_PT2PT_BUFFER_H +#define OMPI_OSC_PT2PT_BUFFER_H + +#include "opal/class/opal_free_list.h" +#include "ompi/request/request.h" + +struct ompi_osc_pt2pt_buffer_t; + +typedef void (*ompi_osc_pt2pt_buffer_completion_fn_t)( + struct ompi_osc_pt2pt_buffer_t *buffer); + +struct ompi_osc_pt2pt_buffer_t { + opal_free_list_item_t super; + ompi_request_t *request; + ompi_osc_pt2pt_buffer_completion_fn_t cbfunc; + void *cbdata; + void *payload; + size_t len; +}; +typedef struct ompi_osc_pt2pt_buffer_t ompi_osc_pt2pt_buffer_t; +OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_buffer_t); + +#endif diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index dc5916457b..c2d26c559b 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -127,26 +127,8 @@ ompi_osc_pt2pt_module_get(void *origin_addr, &sendreq); if (OMPI_SUCCESS != ret) return ret; - /* if we're doing fence synchronization, try to actively send - right now */ - if (P2P_MODULE(win)->p2p_eager_send && - (OMPI_WIN_FENCE & ompi_win_get_mode(win))) { - OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1); - - ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), sendreq); - - if (OMPI_SUCCESS == ret) { - OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); - P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++; - OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); - } else { - OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); - ret = enqueue_sendreq(P2P_MODULE(win), sendreq); - } - } else { - /* enqueue sendreq */ - ret = enqueue_sendreq(P2P_MODULE(win), sendreq); - } + /* enqueue sendreq */ + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); return ret; } @@ -185,26 +167,8 @@ ompi_osc_pt2pt_module_put(void *origin_addr, int origin_count, &sendreq); if (OMPI_SUCCESS != ret) return ret; - /* if we're doing fence synchronization, try to actively send - right now */ - if (P2P_MODULE(win)->p2p_eager_send && - (OMPI_WIN_FENCE & ompi_win_get_mode(win))) { - OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1); - - ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), sendreq); - - if (OMPI_SUCCESS == ret) { - OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); - P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++; - OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); - } else { - OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); - ret = enqueue_sendreq(P2P_MODULE(win), sendreq); - } - } else { - /* enqueue sendreq */ - ret = enqueue_sendreq(P2P_MODULE(win), sendreq); - } + /* enqueue sendreq */ + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); return ret; } diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c index 7c67fd3d9c..20b2035efd 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c @@ -24,14 +24,13 @@ #include "osc_pt2pt_header.h" #include "osc_pt2pt_obj_convert.h" #include "osc_pt2pt_data_move.h" +#include "osc_pt2pt_buffer.h" #include "opal/threads/mutex.h" #include "ompi/info/info.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/osc/osc.h" -#include "ompi/mca/btl/btl.h" -#include "ompi/mca/bml/bml.h" -#include "ompi/mca/bml/base/base.h" +#include "ompi/mca/pml/pml.h" #include "ompi/datatype/dt_arch.h" static int ompi_osc_pt2pt_component_open(void); @@ -41,9 +40,9 @@ ompi_osc_pt2pt_component_t
mca_osc_pt2pt_component = { { /* ompi_base_component_t */ OMPI_OSC_BASE_VERSION_1_0_0, "pt2pt", - 1, - 0, - 0, + OMPI_MAJOR_VERSION, /* MCA component major version */ + OMPI_MINOR_VERSION, /* MCA component minor version */ + OMPI_RELEASE_VERSION, /* MCA component release version */ ompi_osc_pt2pt_component_open, NULL }, @@ -79,12 +78,6 @@ ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = { } }; - -void ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, - mca_btl_base_tag_t tag, - mca_btl_base_descriptor_t *descriptor, - void *cbdata); - /* look up parameters for configuring this window. The code first looks in the info structure passed by the user, then through mca parameters. */ @@ -127,30 +120,23 @@ check_config_value_bool(char *key, ompi_info_t *info) } -static int fence_sync_index; - - static int ompi_osc_pt2pt_component_open(void) { - fence_sync_index = - mca_base_param_reg_string(&mca_osc_pt2pt_component.super.osc_version, - "fence_sync_method", - "How to synchronize fence: reduce_scatter, allreduce, alltoall", - false, false, "reduce_scatter", NULL); + int tmp; - mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version, - "eager_send", - "Attempt to start data movement during communication call, " - "instead of at synchrnoization time. " - "Info key of same name overrides this value, " - "if info key given.", - false, false, 0, NULL); + mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version, + "no_locks", + "Enable optimizations available only if MPI_LOCK is not used.", + false, false, 0, NULL); - mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version, - "no_locks", - "Enable optimizations available only if MPI_LOCK is not used.", - false, false, 0, NULL); + mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version, + "eager_limit", + "Max size of eagerly sent data", + false, false, 16 * 1024, + &tmp); + + mca_osc_pt2pt_component.p2p_c_eager_size = tmp; return OMPI_SUCCESS; } @@ -189,6 +175,13 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads, OBJ_CLASS(ompi_osc_pt2pt_longreq_t), 1, -1, 1); + OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers, opal_free_list_t); + opal_free_list_init(&mca_osc_pt2pt_component.p2p_c_buffers, + mca_osc_pt2pt_component.p2p_c_eager_size + + sizeof(ompi_osc_pt2pt_buffer_t), + OBJ_CLASS(ompi_osc_pt2pt_buffer_t), + 1, -1, 1); + return OMPI_SUCCESS; } @@ -198,16 +191,13 @@ ompi_osc_pt2pt_component_finalize(void) { size_t num_modules; - if (0 != + if (0 != (num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules))) { - opal_output(0, "WARNING: There were %d Windows created but not freed.", - num_modules); + opal_output(0, "WARNING: There were %d Windows created but not freed.", + num_modules); + opal_progress_unregister(ompi_osc_pt2pt_progress); } -#if 0 - mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL); -#endif - + OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers); OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_longreqs); OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_replyreqs); OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs); @@ -224,7 +214,7 @@ ompi_osc_pt2pt_component_query(ompi_win_t *win, ompi_communicator_t *comm) { /* we can always run - return a low priority */ - return 10; + return 5; } @@ -235,7 +225,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, { ompi_osc_pt2pt_module_t *module; int ret, i; - char *sync_string; /* create module structure */ module = malloc(sizeof(ompi_osc_pt2pt_module_t)); @@ -259,6 +248,20 @@
ompi_osc_pt2pt_component_select(ompi_win_t *win, return ret; } + module->p2p_control_buffer = malloc(mca_osc_pt2pt_component.p2p_c_eager_size); + if (NULL == module->p2p_control_buffer) { + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return OMPI_ERROR; + } + + module->p2p_cb_request = NULL; + + OBJ_CONSTRUCT(&module->p2p_pending_control_sends, opal_list_t); + OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t); module->p2p_num_pending_sendreqs = malloc(sizeof(short) * ompi_comm_size(module->p2p_comm)); @@ -296,8 +299,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, memset(module->p2p_num_pending_sendreqs, 0, sizeof(short) * ompi_comm_size(module->p2p_comm)); - module->p2p_eager_send = check_config_value_bool("eager_send", info); - /* fence data */ module->p2p_fence_coll_counts = malloc(sizeof(int) * ompi_comm_size(module->p2p_comm)); @@ -333,19 +334,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, return ret; } - /* figure out what sync method to use */ - mca_base_param_lookup_string(fence_sync_index, &sync_string); - if (0 == strcmp(sync_string, "reduce_scatter")) { - module->p2p_fence_sync_type = OSC_SYNC_REDUCE_SCATTER; - } else if (0 == strcmp(sync_string, "allreduce")) { - module->p2p_fence_sync_type = OSC_SYNC_ALLREDUCE; - } else if (0 == strcmp(sync_string, "alltoall")) { - module->p2p_fence_sync_type = OSC_SYNC_ALLTOALL; - } else { - opal_output(0, "invalid value for fence_sync_method parameter: %s\n", sync_string); - return OMPI_ERROR; - } - /* pwsc data */ module->p2p_pw_group = NULL; module->p2p_sc_group = NULL; @@ -361,6 +349,11 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules, module->p2p_comm->c_contextid, module); + + if (1 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) { + /* start progress thread */ + opal_progress_register(ompi_osc_pt2pt_progress); + } OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock); /* fill in window information */ @@ -372,47 +365,40 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win, /* sync memory - make sure all initialization completed */ opal_atomic_mb(); - /* register to receive fragment callbacks */ - ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, - ompi_osc_pt2pt_component_fragment_cb, - NULL); - - - if (module->p2p_eager_send) { - /* need to barrier if eager sending or we can receive before the - other side has been fully setup, causing much gnashing of - teeth. 
*/ - module->p2p_comm->c_coll.coll_barrier(module->p2p_comm); - } + /* start up receive for protocol headers */ + ret = MCA_PML_CALL(irecv(module->p2p_control_buffer, + mca_osc_pt2pt_component.p2p_c_eager_size, + MPI_BYTE, + MPI_ANY_SOURCE, + -200, + module->p2p_comm, + &module->p2p_cb_request)); return ret; } - /* dispatch for callback on message completion */ -void -ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, - mca_btl_base_tag_t tag, - mca_btl_base_descriptor_t *descriptor, - void *cbdata) +static void +ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module, + void *buffer, + size_t buffer_len) { int ret; - ompi_osc_pt2pt_module_t *module; void *payload; - assert(descriptor->des_dst[0].seg_len >= + assert(buffer_len >= sizeof(ompi_osc_pt2pt_base_header_t)); /* handle message */ - switch (((ompi_osc_pt2pt_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) { + switch (((ompi_osc_pt2pt_base_header_t*) buffer)->hdr_type) { case OMPI_OSC_PT2PT_HDR_PUT: { ompi_osc_pt2pt_send_header_t *header; /* get our header and payload */ header = (ompi_osc_pt2pt_send_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; payload = (void*) (header + 1); #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -421,9 +407,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); ret = ompi_osc_pt2pt_sendreq_recv_put(module, header, payload); } @@ -435,7 +419,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, /* get our header and payload */ header = (ompi_osc_pt2pt_send_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; payload = (void*) (header + 1); #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -444,9 +428,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); /* receive into temporary buffer */ ret = ompi_osc_pt2pt_sendreq_recv_accum(module, header, payload); @@ -462,7 +444,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, /* get our header and payload */ header = (ompi_osc_pt2pt_send_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; payload = (void*) (header + 1); #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -471,9 +453,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); /* create or get a pointer to our datatype */ proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi; @@ -503,7 +483,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, /* get our header and payload */ header = (ompi_osc_pt2pt_reply_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; payload = (void*) (header + 1); #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -524,7 +504,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, { ompi_osc_pt2pt_control_header_t *header = 
(ompi_osc_pt2pt_control_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { @@ -532,9 +512,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1); } @@ -543,7 +521,7 @@ { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { @@ -551,9 +529,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); /* we've heard from one more place, and have value reqs to process */ @@ -566,7 +542,7 @@ { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { @@ -574,9 +550,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); if (header->hdr_value[1] > 0) { ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0], @@ -591,7 +565,7 @@ { ompi_osc_pt2pt_control_header_t *header = (ompi_osc_pt2pt_control_header_t*) - descriptor->des_dst[0].seg_addr.pval; + buffer; #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { @@ -599,9 +573,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, } #endif - /* get our module pointer */ - module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx); - if (NULL == module) return; + assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx)); ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0], header->hdr_value[1]); @@ -614,3 +586,90 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl, abort(); } } + + + +static int +ompi_osc_pt2pt_request_test(ompi_request_t ** rptr, + int *completed, + ompi_status_public_t * status ) +{ + ompi_request_t *request = *rptr; + int ret = OMPI_SUCCESS; + +#if OMPI_ENABLE_PROGRESS_THREADS == 0 + if (request->req_state == OMPI_REQUEST_INACTIVE || + request->req_complete) { + ret = ompi_request_test(rptr, completed, status); + } else { + *completed = 0; + } +#else + ret = ompi_request_test(rptr, completed, status); +#endif + + return ret; +} + +int +ompi_osc_pt2pt_progress(void) +{ + int ret, done, count = 0; + ompi_status_public_t status; + void *node; + uint32_t key; + ompi_osc_pt2pt_module_t *module; + opal_list_item_t *item; + + ret =
opal_hash_table_get_first_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules, + &key, + (void**) &module, + &node); + if (OMPI_SUCCESS != ret) return 0; + + do { + ret = ompi_osc_pt2pt_request_test(&module->p2p_cb_request, &done, &status); + if (OMPI_SUCCESS == ret && done) { + /* process message */ + ompi_osc_pt2pt_component_fragment_cb(module, + module->p2p_control_buffer, + status._count); + + /* repost receive */ + ret = MCA_PML_CALL(irecv(module->p2p_control_buffer, + mca_osc_pt2pt_component.p2p_c_eager_size, + MPI_BYTE, + MPI_ANY_SOURCE, + -200, + module->p2p_comm, + &module->p2p_cb_request)); + assert(OMPI_SUCCESS == ret); + count++; + } + + /* loop through sends */ + for (item = opal_list_get_first(&module->p2p_pending_control_sends) ; + item != opal_list_get_end(&module->p2p_pending_control_sends) ; + item = opal_list_get_next(item)) { + ompi_osc_pt2pt_buffer_t *buffer = + (ompi_osc_pt2pt_buffer_t*) item; + + ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &status); + if (OMPI_SUCCESS == ret && done) { + item = opal_list_remove_item(&module->p2p_pending_control_sends, + item); + buffer->cbfunc(buffer); + } + } + + } while (OMPI_SUCCESS == + opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules, + &key, + (void**) &module, + node, + &node)); + + return count; +} + + diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index ea90533222..c616e84580 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -21,15 +21,15 @@ #include "osc_pt2pt_header.h" #include "osc_pt2pt_data_move.h" #include "osc_pt2pt_obj_convert.h" +#include "osc_pt2pt_buffer.h" #include "opal/util/output.h" #include "opal/sys/atomic.h" -#include "ompi/mca/bml/bml.h" -#include "ompi/mca/bml/base/base.h" -#include "ompi/mca/btl/btl.h" +#include "ompi/mca/pml/pml.h" #include "ompi/datatype/datatype.h" #include "ompi/datatype/dt_arch.h" + static inline int32_t create_send_tag(ompi_osc_pt2pt_module_t *module) { @@ -81,22 +81,12 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq) static void -ompi_osc_pt2pt_sendreq_send_cb(struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t *endpoint, - struct mca_btl_base_descriptor_t* descriptor, - int status) +ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer) { ompi_osc_pt2pt_sendreq_t *sendreq = - (ompi_osc_pt2pt_sendreq_t*) descriptor->des_cbdata; + (ompi_osc_pt2pt_sendreq_t*) buffer->cbdata; ompi_osc_pt2pt_send_header_t *header = - (ompi_osc_pt2pt_send_header_t*) descriptor->des_src[0].seg_addr.pval; - - if (OMPI_SUCCESS != status) { - /* requeue and return */ - /* BWB - FIX ME - figure out where to put this bad boy */ - abort(); - return; - } + (ompi_osc_pt2pt_send_header_t*) buffer->payload; /* have to look at header, and not the sendreq because in the case of get, it's possible that the sendreq has been freed already @@ -144,11 +134,9 @@ ompi_osc_pt2pt_sendreq_send_cb(struct mca_btl_base_module_t* btl, } } - /* release the descriptor and sendreq */ - btl->btl_free(btl, descriptor); - - /* any other sendreqs to restart? 
*/ - /* BWB - FIX ME - implement sending the next sendreq here */ + /* release the buffer */ + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + &buffer->super); } @@ -159,10 +147,9 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_sendreq_t *sendreq) { int ret = OMPI_SUCCESS; - mca_bml_base_endpoint_t *endpoint = NULL; - mca_bml_base_btl_t *bml_btl = NULL; - mca_btl_base_descriptor_t *descriptor = NULL; + opal_free_list_item_t *item; ompi_osc_pt2pt_send_header_t *header = NULL; + ompi_osc_pt2pt_buffer_t *buffer = NULL; size_t written_data = 0; size_t needed_len = sizeof(ompi_osc_pt2pt_send_header_t); const void *packed_ddt; @@ -174,30 +161,27 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, needed_len += sendreq->req_origin_bytes_packed; } - /* Get a BTL so we have the eager limit */ - endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml; - bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); - descriptor = bml_btl->btl_alloc(bml_btl->btl, - needed_len < bml_btl->btl_eager_limit ? needed_len : - bml_btl->btl_eager_limit); - if (NULL == descriptor) { + /* Get a buffer */ + OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, + item, ret); + if (NULL == item) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; goto cleanup; } + buffer = (ompi_osc_pt2pt_buffer_t*) item; /* verify at least enough space for header */ - if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_send_header_t)) { + if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_send_header_t)) { ret = OMPI_ERR_OUT_OF_RESOURCE; goto cleanup; } - /* setup descriptor */ - descriptor->des_cbfunc = ompi_osc_pt2pt_sendreq_send_cb; - descriptor->des_cbdata = (void*) sendreq; - descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + /* setup buffer */ + buffer->cbfunc = ompi_osc_pt2pt_sendreq_send_cb; + buffer->cbdata = (void*) sendreq; /* pack header */ - header = (ompi_osc_pt2pt_send_header_t*) descriptor->des_src[0].seg_addr.pval; + header = (ompi_osc_pt2pt_send_header_t*) buffer->payload; written_data += sizeof(ompi_osc_pt2pt_send_header_t); header->hdr_base.hdr_flags = 0; header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid; @@ -231,13 +215,13 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, /* Set datatype id and / or pack datatype */ ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt); if (OMPI_SUCCESS != ret) goto cleanup; - memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data, + memcpy((unsigned char*) buffer->payload + written_data, packed_ddt, packed_ddt_len); written_data += packed_ddt_len; if (OMPI_OSC_PT2PT_GET != sendreq->req_type) { /* if sending data and it fits, pack payload */ - if (descriptor->des_src[0].seg_len >= + if (mca_osc_pt2pt_component.p2p_c_eager_size >= written_data + sendreq->req_origin_bytes_packed) { struct iovec iov; uint32_t iov_count = 1; @@ -245,7 +229,7 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, size_t max_data = sendreq->req_origin_bytes_packed; iov.iov_len = max_data; - iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data); + iov.iov_base = (void*) ((unsigned char*) buffer->payload + written_data); ret = ompi_convertor_pack(&sendreq->req_origin_convertor, &iov, &iov_count, &max_data, &free_after); @@ -256,7 +240,6 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, assert(max_data == sendreq->req_origin_bytes_packed); written_data += max_data; - 
descriptor->des_src[0].seg_len = written_data; header->hdr_msg_length = sendreq->req_origin_bytes_packed; } else { @@ -264,10 +247,11 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, header->hdr_origin_tag = create_send_tag(module); } } else { - descriptor->des_src[0].seg_len = written_data; header->hdr_msg_length = 0; } + buffer->len = written_data; + #ifdef WORDS_BIGENDIAN header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -281,13 +265,23 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module, opal_output(-1, "%d sending sendreq to %d", sendreq->req_module->p2p_comm->c_my_rank, sendreq->req_target_rank); - - ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + + ret = MCA_PML_CALL(isend(buffer->payload, + buffer->len, + MPI_BYTE, + sendreq->req_target_rank, + -200, + MCA_PML_BASE_SEND_STANDARD, + module->p2p_comm, + &buffer->request)); + opal_list_append(&module->p2p_pending_control_sends, + &buffer->super.super); goto done; cleanup: - if (descriptor != NULL) { - mca_bml_base_free(bml_btl, descriptor); + if (item != NULL) { + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + item); } done: @@ -317,22 +311,12 @@ ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq) static void -ompi_osc_pt2pt_replyreq_send_cb(struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t *endpoint, - struct mca_btl_base_descriptor_t* descriptor, - int status) +ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer) { ompi_osc_pt2pt_replyreq_t *replyreq = - (ompi_osc_pt2pt_replyreq_t*) descriptor->des_cbdata; + (ompi_osc_pt2pt_replyreq_t*) buffer->cbdata; ompi_osc_pt2pt_reply_header_t *header = - (ompi_osc_pt2pt_reply_header_t*) descriptor->des_src[0].seg_addr.pval; - - if (OMPI_SUCCESS != status) { - /* requeue and return */ - /* BWB - FIX ME - figure out where to put this bad boy */ - abort(); - return; - } + (ompi_osc_pt2pt_reply_header_t*) buffer->payload; #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { @@ -369,9 +353,8 @@ ompi_osc_pt2pt_replyreq_send_cb(struct mca_btl_base_module_t* btl, } /* release the descriptor and replyreq */ - btl->btl_free(btl, descriptor); - - /* any other replyreqs to restart? 
*/ + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + &buffer->super); } @@ -380,35 +363,32 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_replyreq_t *replyreq) { int ret = OMPI_SUCCESS; - mca_bml_base_endpoint_t *endpoint = NULL; - mca_bml_base_btl_t *bml_btl = NULL; - mca_btl_base_descriptor_t *descriptor = NULL; + opal_free_list_item_t *item; + ompi_osc_pt2pt_buffer_t *buffer = NULL; ompi_osc_pt2pt_reply_header_t *header = NULL; size_t written_data = 0; - - /* Get a BTL and a fragment to go with it */ - endpoint = (mca_bml_base_endpoint_t*) replyreq->rep_origin_proc->proc_bml; - bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); - descriptor = bml_btl->btl_alloc(bml_btl->btl, - bml_btl->btl_eager_limit); - if (NULL == descriptor) { + + /* Get a buffer */ + OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, + item, ret); + if (NULL == item) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; goto cleanup; } + buffer = (ompi_osc_pt2pt_buffer_t*) item; /* verify at least enough space for header */ - if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_reply_header_t)) { + if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_reply_header_t)) { ret = OMPI_ERR_OUT_OF_RESOURCE; goto cleanup; } - /* setup descriptor */ - descriptor->des_cbfunc = ompi_osc_pt2pt_replyreq_send_cb; - descriptor->des_cbdata = (void*) replyreq; - descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + /* setup buffer */ + buffer->cbfunc = ompi_osc_pt2pt_replyreq_send_cb; + buffer->cbdata = (void*) replyreq; /* pack header */ - header = (ompi_osc_pt2pt_reply_header_t*) descriptor->des_src[0].seg_addr.pval; + header = (ompi_osc_pt2pt_reply_header_t*) buffer->payload; written_data += sizeof(ompi_osc_pt2pt_reply_header_t); header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY; header->hdr_base.hdr_flags = 0; @@ -416,7 +396,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, header->hdr_target_tag = 0; /* if sending data fits, pack payload */ - if (descriptor->des_src[0].seg_len >= + if (mca_osc_pt2pt_component.p2p_c_eager_size >= written_data + replyreq->rep_target_bytes_packed) { struct iovec iov; uint32_t iov_count = 1; @@ -424,7 +404,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, size_t max_data = replyreq->rep_target_bytes_packed; iov.iov_len = max_data; - iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data); + iov.iov_base = (void*) ((unsigned char*) buffer->payload + written_data); ret = ompi_convertor_pack(&replyreq->rep_target_convertor, &iov, &iov_count, &max_data, &free_after); @@ -435,7 +415,6 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, assert(max_data == replyreq->rep_target_bytes_packed); written_data += max_data; - descriptor->des_src[0].seg_len = written_data; header->hdr_msg_length = replyreq->rep_target_bytes_packed; } else { @@ -443,6 +422,8 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, header->hdr_target_tag = create_send_tag(module); } + buffer->len = written_data; + #ifdef WORDS_BIGENDIAN header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT @@ -453,12 +434,23 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module, #endif /* send fragment */ - ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + ret = MCA_PML_CALL(isend(buffer->payload, + buffer->len, + MPI_BYTE, + replyreq->rep_origin_rank, + -200, + MCA_PML_BASE_SEND_STANDARD, + 
module->p2p_comm, + &buffer->request)); + opal_list_append(&module->p2p_pending_control_sends, + &buffer->super.super); + goto done; cleanup: - if (descriptor != NULL) { - mca_bml_base_free(bml_btl, descriptor); + if (item != NULL) { + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + item); } done: @@ -769,13 +761,11 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module, * **********************************************************************/ static void -ompi_osc_pt2pt_control_send_cb(struct mca_btl_base_module_t* btl, - struct mca_btl_base_endpoint_t *endpoint, - struct mca_btl_base_descriptor_t* descriptor, - int status) +ompi_osc_pt2pt_control_send_cb(ompi_osc_pt2pt_buffer_t *buffer) { /* release the descriptor and sendreq */ - btl->btl_free(btl, descriptor); + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + &buffer->super); } @@ -785,35 +775,40 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module, uint8_t type, int32_t value0, int32_t value1) { int ret = OMPI_SUCCESS; - mca_bml_base_endpoint_t *endpoint = NULL; - mca_bml_base_btl_t *bml_btl = NULL; - mca_btl_base_descriptor_t *descriptor = NULL; + opal_free_list_item_t *item; + ompi_osc_pt2pt_buffer_t *buffer = NULL; ompi_osc_pt2pt_control_header_t *header = NULL; - - /* Get a BTL and a fragment to go with it */ - endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml; - bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); - descriptor = bml_btl->btl_alloc(bml_btl->btl, - sizeof(ompi_osc_pt2pt_control_header_t)); - if (NULL == descriptor) { + int rank, i; + + /* find the rank */ + for (i = 0 ; i < module->p2p_comm->c_remote_group->grp_proc_count ; ++i) { + if (proc == module->p2p_comm->c_remote_group->grp_proc_pointers[i]) { + rank = i; + } + } + + /* Get a buffer */ + OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers, + item, ret); + if (NULL == item) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; goto cleanup; } + buffer = (ompi_osc_pt2pt_buffer_t*) item; /* verify at least enough space for header */ - if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_control_header_t)) { + if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_control_header_t)) { ret = OMPI_ERR_OUT_OF_RESOURCE; goto cleanup; } - /* setup descriptor */ - descriptor->des_cbfunc = ompi_osc_pt2pt_control_send_cb; - descriptor->des_cbdata = NULL; - descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; - descriptor->des_src[0].seg_len = sizeof(ompi_osc_pt2pt_control_header_t); + /* setup buffer */ + buffer->cbfunc = ompi_osc_pt2pt_control_send_cb; + buffer->cbdata = NULL; + buffer->len = sizeof(ompi_osc_pt2pt_control_header_t); /* pack header */ - header = (ompi_osc_pt2pt_control_header_t*) descriptor->des_src[0].seg_addr.pval; + header = (ompi_osc_pt2pt_control_header_t*) buffer->payload; header->hdr_base.hdr_type = type; header->hdr_value[0] = value0; header->hdr_value[1] = value1; @@ -829,12 +824,22 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module, #endif /* send fragment */ - ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + ret = MCA_PML_CALL(isend(buffer->payload, + buffer->len, + MPI_BYTE, + rank, + -200, + MCA_PML_BASE_SEND_STANDARD, + module->p2p_comm, + &buffer->request)); + opal_list_append(&module->p2p_pending_control_sends, + &buffer->super.super); goto done; cleanup: - if (descriptor != NULL) { - mca_bml_base_free(bml_btl, descriptor); + if (item != NULL) { + OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, + item); } done: 
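For context on the hunks above: the pt2pt component no longer pushes BTL descriptors through the BML; instead it packs headers into free-list buffers, sends them with PML isend on the window's private communicator, and retires them from a polled progress function. Below is a minimal standalone sketch of that polling pattern, written against the public MPI API rather than the internal MCA_PML_CALL()/opal_free_list machinery; CONTROL_TAG, EAGER_SIZE, handle_fragment and progress_once are illustrative names only (the component itself uses tag -200, ompi_osc_pt2pt_buffer_t items on p2p_pending_control_sends, and opal_progress_register).

/* Illustrative sketch (not part of the patch): one pass of a polled
 * progress loop that mirrors ompi_osc_pt2pt_progress(). */
#include <mpi.h>

#define CONTROL_TAG 42        /* stand-in for the component's internal tag   */
#define EAGER_SIZE  (16*1024) /* mirrors the new osc_pt2pt eager_limit param */

static char control_buffer[EAGER_SIZE];
static MPI_Request control_request = MPI_REQUEST_NULL;

static void handle_fragment(const char *buf, int len)
{
    /* dispatch on the header type, as ompi_osc_pt2pt_component_fragment_cb does */
    (void) buf; (void) len;
}

/* Test the posted control receive; if a fragment arrived, dispatch it and
 * repost the receive.  Returns the number of fragments handled. */
static int progress_once(MPI_Comm comm)
{
    int done = 0, count = 0;
    MPI_Status status;

    MPI_Test(&control_request, &done, &status);
    if (done) {
        int len;
        MPI_Get_count(&status, MPI_BYTE, &len);
        handle_fragment(control_buffer, len);
        MPI_Irecv(control_buffer, EAGER_SIZE, MPI_BYTE,
                  MPI_ANY_SOURCE, CONTROL_TAG, comm, &control_request);
        count++;
    }
    /* the real progress function also walks the pending control/eager sends,
     * testing each buffered isend and invoking its completion callback */
    return count;
}

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    MPI_Irecv(control_buffer, EAGER_SIZE, MPI_BYTE,
              MPI_ANY_SOURCE, CONTROL_TAG, MPI_COMM_WORLD, &control_request);
    for (int i = 0; i < 10; ++i) {
        progress_once(MPI_COMM_WORLD);
    }
    MPI_Cancel(&control_request);
    MPI_Request_free(&control_request);
    MPI_Finalize();
    return 0;
}

The same pattern explains why module_free (earlier in this patch) unregisters the progress callback once the last window is destroyed: with no posted control receive left, there is nothing for the callback to test.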
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c index 492f4d1e40..d7b9ed1608 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.c @@ -30,7 +30,7 @@ /* should have p2p_lock before calling */ static inline void -ompi_osc_pt2pt_progress(ompi_osc_pt2pt_module_t *module) +ompi_osc_pt2pt_progress_long(ompi_osc_pt2pt_module_t *module) { if (0 != opal_list_get_size(&(module->p2p_long_msgs))) { opal_list_item_t *item, *next; @@ -109,50 +109,16 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win)); - switch (P2P_MODULE(win)->p2p_fence_sync_type) { - /* find out how much data everyone is going to send us. Need to have the lock during this period so that we have a sane view of the number of sendreqs */ - case OSC_SYNC_REDUCE_SCATTER: - ret = P2P_MODULE(win)->p2p_comm-> - c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, - &incoming_reqs, - P2P_MODULE(win)->p2p_fence_coll_counts, - MPI_SHORT, - MPI_SUM, - P2P_MODULE(win)->p2p_comm); - break; - - case OSC_SYNC_ALLREDUCE: - ret = P2P_MODULE(win)->p2p_comm-> - c_coll.coll_allreduce(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, - P2P_MODULE(win)->p2p_fence_coll_results, - ompi_comm_size(P2P_MODULE(win)->p2p_comm), - MPI_SHORT, - MPI_SUM, - P2P_MODULE(win)->p2p_comm); - incoming_reqs = P2P_MODULE(win)-> - p2p_fence_coll_results[P2P_MODULE(win)->p2p_comm->c_my_rank]; - break; - - case OSC_SYNC_ALLTOALL: - ret = P2P_MODULE(win)->p2p_comm-> - c_coll.coll_alltoall(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, - 1, - MPI_SHORT, - P2P_MODULE(win)->p2p_fence_coll_results, - 1, - MPI_SHORT, - P2P_MODULE(win)->p2p_comm); - incoming_reqs = 0; - for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) { - incoming_reqs += P2P_MODULE(win)->p2p_fence_coll_results[i]; - } - break; - default: - assert(0 == 1); - } + ret = P2P_MODULE(win)->p2p_comm-> + c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, + &incoming_reqs, + P2P_MODULE(win)->p2p_fence_coll_counts, + MPI_SHORT, + MPI_SUM, + P2P_MODULE(win)->p2p_comm); if (OMPI_SUCCESS != ret) { /* put the stupid data back for the user. This is not @@ -201,7 +167,7 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win) /* now we know how many things we're waiting for - wait for them... 
*/ while (P2P_MODULE(win)->p2p_num_pending_in > 0 || 0 != P2P_MODULE(win)->p2p_num_pending_out) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } } @@ -255,7 +221,7 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win) /* wait for all the post messages */ while (0 != P2P_MODULE(win)->p2p_num_pending_in) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win)); @@ -311,7 +277,7 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win) /* wait for all the requests */ while (0 != P2P_MODULE(win)->p2p_num_pending_out) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } cleanup: @@ -374,7 +340,7 @@ ompi_osc_pt2pt_module_wait(ompi_win_t *win) while (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } ompi_win_set_mode(win, 0); @@ -400,7 +366,7 @@ ompi_osc_pt2pt_module_test(ompi_win_t *win, if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { *flag = 0; @@ -472,7 +438,7 @@ ompi_osc_pt2pt_module_unlock(int target, ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi; while (0 == P2P_MODULE(win)->p2p_lock_received_ack) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } P2P_MODULE(win)->p2p_lock_received_ack = 0; @@ -501,7 +467,7 @@ ompi_osc_pt2pt_module_unlock(int target, /* wait for all the requests */ while (0 != P2P_MODULE(win)->p2p_num_pending_out) { - ompi_osc_pt2pt_progress(P2P_MODULE(win)); + ompi_osc_pt2pt_progress_long(P2P_MODULE(win)); } /* send the unlock request */ @@ -586,7 +552,7 @@ ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module, OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count); while (0 != module->p2p_num_pending_in) { - ompi_osc_pt2pt_progress(module); + ompi_osc_pt2pt_progress_long(module); } OPAL_THREAD_LOCK(&(module->p2p_lock)); diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h index ddbd655237..0048bb693a 100644 --- a/ompi/mca/osc/rdma/osc_rdma.h +++ b/ompi/mca/osc/rdma/osc_rdma.h @@ -14,8 +14,8 @@ * $HEADER$ */ -#ifndef OMPI_OSC_PT2PT_H -#define OMPI_OSC_PT2PT_H +#ifndef OMPI_OSC_RDMA_H +#define OMPI_OSC_RDMA_H #include "opal/class/opal_list.h" #include "opal/class/opal_free_list.h" @@ -257,4 +257,4 @@ int ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module, } #endif -#endif /* OMPI_OSC_PT2PT_H */ +#endif /* OMPI_OSC_RDMA_H */ diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c index d9dbc24c24..98e6e5dfbb 100644 --- a/ompi/mca/osc/rdma/osc_rdma_comm.c +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -69,7 +69,7 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count, } /* create sendreq */ - ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_ACC, + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_ACC, origin_addr, origin_count, origin_dt, @@ -115,7 +115,7 @@ ompi_osc_rdma_module_get(void *origin_addr, } /* create sendreq */ - ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_GET, + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_GET, origin_addr, origin_count, origin_dt, 
@@ -173,7 +173,7 @@ ompi_osc_rdma_module_put(void *origin_addr, int origin_count, } /* create sendreq */ - ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_PUT, + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_PUT, origin_addr, origin_count, origin_dt, diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c index 4ef4821de0..ef1f06b897 100644 --- a/ompi/mca/osc/rdma/osc_rdma_component.c +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ -40,10 +40,10 @@ ompi_osc_rdma_component_t mca_osc_rdma_component = { { /* ompi_osc_base_component_t */ { /* ompi_base_component_t */ OMPI_OSC_BASE_VERSION_1_0_0, - "pt2pt", - 1, - 0, - 0, + "rdma", + OMPI_MAJOR_VERSION, /* MCA component major version */ + OMPI_MINOR_VERSION, /* MCA component minor version */ + OMPI_RELEASE_VERSION, /* MCA component release version */ ompi_osc_rdma_component_open, NULL }, @@ -115,7 +115,7 @@ check_config_value_bool(char *key, ompi_info_t *info) return result; info_not_found: - param = mca_base_param_find("osc", "pt2pt", key); + param = mca_base_param_find("osc", "rdma", key); if (param == OPAL_ERROR) return false; ret = mca_base_param_lookup_int(param, &flag); @@ -139,18 +139,18 @@ ompi_osc_rdma_component_open(void) "How to synchronize fence: reduce_scatter, allreduce, alltoall", false, false, "reduce_scatter", NULL); - mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version, - "eager_send", - "Attempt to start data movement during communication call, " - "instead of at synchrnoization time. " - "Info key of same name overrides this value, " - "if info key given.", - false, false, 0, NULL); + mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version, + "eager_send", + "Attempt to start data movement during communication call, " + "instead of at synchrnoization time. " + "Info key of same name overrides this value, " + "if info key given.", + false, false, 0, NULL); - mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version, - "no_locks", - "Enable optimizations available only if MPI_LOCK is not used.", - false, false, 0, NULL); + mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version, + "no_locks", + "Enable optimizations available only if MPI_LOCK is not used.", + false, false, 0, NULL); return OMPI_SUCCESS; } @@ -160,6 +160,8 @@ int ompi_osc_rdma_component_init(bool enable_progress_threads, bool enable_mpi_threads) { + if (!mca_bml_base_inited()) return OMPI_ERROR; + /* we can run with either threads or not threads (may not be able to do win locks)... */ mca_osc_rdma_component.p2p_c_have_progress_threads = @@ -204,9 +206,7 @@ ompi_osc_rdma_component_finalize(void) num_modules); } -#if 0 - mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL); -#endif + mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA, NULL, NULL); OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_longreqs); OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs); @@ -223,7 +223,10 @@ ompi_osc_rdma_component_query(ompi_win_t *win, ompi_info_t *info, ompi_communicator_t *comm) { - /* we can always run - return a low priority */ + /* if we inited, then the BMLs are available and we have a path to + each peer. 
       Return slightly higher priority than the
+       point-to-point code */
+    return 10;
 }
@@ -373,7 +376,7 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
     opal_atomic_mb();
     /* register to receive fragment callbacks */
-    ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT,
+    ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA,
                                ompi_osc_rdma_component_fragment_cb,
                                NULL);
@@ -406,7 +409,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
     /* handle message */
     switch (((ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
-    case OMPI_OSC_PT2PT_HDR_PUT:
+    case OMPI_OSC_RDMA_HDR_PUT:
         {
             ompi_osc_rdma_send_header_t *header;
@@ -416,8 +419,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             payload = (void*) (header + 1);
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
             }
 #endif
@@ -429,7 +432,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_ACC:
+    case OMPI_OSC_RDMA_HDR_ACC:
         {
             ompi_osc_rdma_send_header_t *header;
@@ -439,8 +442,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             payload = (void*) (header + 1);
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
             }
 #endif
@@ -453,7 +456,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_GET:
+    case OMPI_OSC_RDMA_HDR_GET:
         {
             ompi_datatype_t *datatype;
             ompi_osc_rdma_send_header_t *header;
@@ -466,8 +469,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             payload = (void*) (header + 1);
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
             }
 #endif
@@ -496,7 +499,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_REPLY:
+    case OMPI_OSC_RDMA_HDR_REPLY:
         {
             ompi_osc_rdma_reply_header_t *header;
             ompi_osc_rdma_sendreq_t *sendreq;
@@ -507,8 +510,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             payload = (void*) (header + 1);
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
             }
 #endif
@@ -520,15 +523,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             ompi_osc_rdma_replyreq_recv(module, sendreq, header, payload);
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_POST:
+    case OMPI_OSC_RDMA_HDR_POST:
         {
             ompi_osc_rdma_control_header_t *header =
                 (ompi_osc_rdma_control_header_t*) descriptor->des_dst[0].seg_addr.pval;
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
             }
 #endif
@@ -539,15 +542,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
             OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_COMPLETE:
+    case OMPI_OSC_RDMA_HDR_COMPLETE:
         {
             ompi_osc_rdma_control_header_t *header =
                 (ompi_osc_rdma_control_header_t*) descriptor->des_dst[0].seg_addr.pval;
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
             }
 #endif
@@ -562,15 +565,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_LOCK_REQ:
+    case OMPI_OSC_RDMA_HDR_LOCK_REQ:
         {
             ompi_osc_rdma_control_header_t *header =
                 (ompi_osc_rdma_control_header_t*) descriptor->des_dst[0].seg_addr.pval;
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
             }
 #endif
@@ -587,15 +590,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
         }
         break;
-    case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:
+    case OMPI_OSC_RDMA_HDR_UNLOCK_REQ:
         {
             ompi_osc_rdma_control_header_t *header =
                 (ompi_osc_rdma_control_header_t*) descriptor->des_dst[0].seg_addr.pval;
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
+            if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+                OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
             }
 #endif
diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c
index 9b612ad867..cc65b58b0a 100644
--- a/ompi/mca/osc/rdma/osc_rdma_data_move.c
+++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c
@@ -105,10 +105,10 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
        wait for this completion before exiting a synchronization point
        in the case of get, as we really don't care when it completes -
        only when the data arrives. */
-    if (OMPI_OSC_PT2PT_HDR_GET != header->hdr_base.hdr_type) {
+    if (OMPI_OSC_RDMA_HDR_GET != header->hdr_base.hdr_type) {
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-        if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-            OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
+        if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+            OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
         }
 #endif
         /* do we need to post a send? */
@@ -170,7 +170,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
     /* we always need to send the ddt */
     needed_len += packed_ddt_len;
-    if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
+    if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
         needed_len += sendreq->req_origin_bytes_packed;
     }
@@ -208,20 +208,20 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
     header->hdr_target_count = sendreq->req_target_count;
     switch (sendreq->req_type) {
-    case OMPI_OSC_PT2PT_PUT:
-        header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_PUT;
+    case OMPI_OSC_RDMA_PUT:
+        header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_PUT;
 #if OMPI_ENABLE_MEM_DEBUG
         header->hdr_target_op = 0;
 #endif
         break;
-    case OMPI_OSC_PT2PT_ACC:
-        header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_ACC;
+    case OMPI_OSC_RDMA_ACC:
+        header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_ACC;
         header->hdr_target_op = sendreq->req_op_id;
         break;
-    case OMPI_OSC_PT2PT_GET:
-        header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_GET;
+    case OMPI_OSC_RDMA_GET:
+        header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_GET;
 #if OMPI_ENABLE_MEM_DEBUG
         header->hdr_target_op = 0;
 #endif
@@ -235,7 +235,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
                                    packed_ddt, packed_ddt_len);
     written_data += packed_ddt_len;
-    if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
+    if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
         /* if sending data and it fits, pack payload */
         if (descriptor->des_src[0].seg_len >=
             written_data + sendreq->req_origin_bytes_packed) {
@@ -269,11 +269,11 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
     }
 #ifdef WORDS_BIGENDIAN
-    header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
+    header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
 #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
     if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
-        header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
-        OMPI_OSC_PT2PT_SEND_HDR_HTON(*header);
+        header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
+        OMPI_OSC_RDMA_SEND_HDR_HTON(*header);
     }
 #endif
@@ -282,7 +282,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
                 sendreq->req_module->p2p_comm->c_my_rank,
                 sendreq->req_target_rank);
-    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
+    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
     goto done;
 cleanup:
@@ -335,8 +335,8 @@ ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
     }
 #if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
-    if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
-        OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
+    if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
+        OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
     }
 #endif
@@ -410,7 +410,7 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
     /* pack header */
     header = (ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
     written_data += sizeof(ompi_osc_rdma_reply_header_t);
-    header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
+    header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_REPLY;
     header->hdr_base.hdr_flags = 0;
     header->hdr_origin_sendreq = replyreq->rep_origin_sendreq;
     header->hdr_target_tag = 0;
@@ -444,16 +444,16 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
     }
 #ifdef WORDS_BIGENDIAN
-    header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
+    header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
 #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
     if (replyreq->rep_origin_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
-        header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
-        OMPI_OSC_PT2PT_REPLY_HDR_HTON(*header);
+        header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
+        OMPI_OSC_RDMA_REPLY_HDR_HTON(*header);
     }
 #endif
     /* send fragment */
-    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
+    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
     goto done;
 cleanup:
@@ -820,16 +820,16 @@ ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
     header->hdr_windx = module->p2p_comm->c_contextid;
 #ifdef WORDS_BIGENDIAN
-    header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
+    header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
 #elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
     if (proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
-        header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
-        OMPI_OSC_PT2PT_CONTROL_HDR_HTON(*header);
+        header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
+        OMPI_OSC_RDMA_CONTROL_HDR_HTON(*header);
     }
 #endif
     /* send fragment */
-    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
+    ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
     goto done;
 cleanup:
diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.h b/ompi/mca/osc/rdma/osc_rdma_data_move.h
index b13ad9b146..eb50e967c5 100644
--- a/ompi/mca/osc/rdma/osc_rdma_data_move.h
+++ b/ompi/mca/osc/rdma/osc_rdma_data_move.h
@@ -14,8 +14,8 @@
  * $HEADER$
  */
-#ifndef OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
-#define OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
+#ifndef OMPI_MCA_OSC_RDMA_DATA_MOVE_H
+#define OMPI_MCA_OSC_RDMA_DATA_MOVE_H
 #include "osc_rdma_sendreq.h"
 #include "osc_rdma_replyreq.h"
diff --git a/ompi/mca/osc/rdma/osc_rdma_header.h b/ompi/mca/osc/rdma/osc_rdma_header.h
index dc2cc6e7b2..0fe610cddc 100644
--- a/ompi/mca/osc/rdma/osc_rdma_header.h
+++ b/ompi/mca/osc/rdma/osc_rdma_header.h
@@ -14,8 +14,8 @@
  * $HEADER$
  */
-#ifndef OMPI_MCA_OSC_PT2PT_HDR_H
-#define OMPI_MCA_OSC_PT2PT_HDR_H
+#ifndef OMPI_MCA_OSC_RDMA_HDR_H
+#define OMPI_MCA_OSC_RDMA_HDR_H
 #ifdef HAVE_NETINET_IN_H
 #include <netinet/in.h>
@@ -23,16 +23,16 @@
 #include "opal/types.h"
-#define OMPI_OSC_PT2PT_HDR_PUT 0x0001
-#define OMPI_OSC_PT2PT_HDR_ACC 0x0002
-#define OMPI_OSC_PT2PT_HDR_GET 0x0004
-#define OMPI_OSC_PT2PT_HDR_REPLY 0x0008
-#define OMPI_OSC_PT2PT_HDR_POST 0x0010
-#define OMPI_OSC_PT2PT_HDR_COMPLETE 0x0020
-#define OMPI_OSC_PT2PT_HDR_LOCK_REQ 0x0040
-#define OMPI_OSC_PT2PT_HDR_UNLOCK_REQ 0x0080
+#define OMPI_OSC_RDMA_HDR_PUT 0x0001
+#define OMPI_OSC_RDMA_HDR_ACC 0x0002
+#define OMPI_OSC_RDMA_HDR_GET 0x0004
+#define OMPI_OSC_RDMA_HDR_REPLY 0x0008
+#define OMPI_OSC_RDMA_HDR_POST 0x0010
+#define OMPI_OSC_RDMA_HDR_COMPLETE 0x0020
+#define OMPI_OSC_RDMA_HDR_LOCK_REQ 0x0040
+#define OMPI_OSC_RDMA_HDR_UNLOCK_REQ 0x0080
-#define OMPI_OSC_PT2PT_HDR_FLAG_NBO 0x0001
+#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x0001
 struct ompi_osc_rdma_base_header_t {
     uint8_t hdr_type;
@@ -41,8 +41,8 @@
 };
 typedef struct ompi_osc_rdma_base_header_t ompi_osc_rdma_base_header_t;
-#define OMPI_OSC_PT2PT_BASE_HDR_NTOH(h)
-#define OMPI_OSC_PT2PT_BASE_HDR_HTON(h)
+#define OMPI_OSC_RDMA_BASE_HDR_NTOH(h)
+#define OMPI_OSC_RDMA_BASE_HDR_HTON(h)
 struct ompi_osc_rdma_send_header_t {
     ompi_osc_rdma_base_header_t hdr_base;
@@ -60,9 +60,9 @@
 };
 typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t;
-#define OMPI_OSC_PT2PT_SEND_HDR_HTON(hdr) \
+#define OMPI_OSC_RDMA_SEND_HDR_HTON(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
         (hdr).hdr_windx = htons((hdr).hdr_windx); \
         (hdr).hdr_origin = htonl((hdr).hdr_origin); \
         (hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
@@ -72,9 +72,9 @@ typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t;
         (hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
     } while (0)
-#define OMPI_OSC_PT2PT_SEND_HDR_NTOH(hdr) \
+#define OMPI_OSC_RDMA_SEND_HDR_NTOH(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
         (hdr).hdr_windx = ntohs((hdr).hdr_windx); \
         (hdr).hdr_origin = ntohl((hdr).hdr_origin); \
         (hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
@@ -95,16 +95,16 @@ struct ompi_osc_rdma_reply_header_t {
 };
 typedef struct ompi_osc_rdma_reply_header_t ompi_osc_rdma_reply_header_t;
-#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \
+#define OMPI_OSC_RDMA_REPLY_HDR_HTON(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
         (hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \
         (hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
     } while (0)
-#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \
+#define OMPI_OSC_RDMA_REPLY_HDR_NTOH(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
         (hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \
         (hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
     } while (0)
@@ -117,20 +117,20 @@ struct ompi_osc_rdma_control_header_t {
 };
 typedef struct ompi_osc_rdma_control_header_t ompi_osc_rdma_control_header_t;
-#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \
+#define OMPI_OSC_RDMA_CONTROL_HDR_HTON(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
         (hdr).hdr_windx = htons((hdr).hdr_windx); \
         (hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \
         (hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \
     } while (0)
-#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
+#define OMPI_OSC_RDMA_CONTROL_HDR_NTOH(hdr) \
     do { \
-        OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
+        OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
         (hdr).hdr_windx = ntohs((hdr).hdr_windx); \
         (hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \
         (hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \
     } while (0)
-#endif /* OMPI_MCA_OSC_PT2PT_HDR_H */
+#endif /* OMPI_MCA_OSC_RDMA_HDR_H */
diff --git a/ompi/mca/osc/rdma/osc_rdma_longreq.h b/ompi/mca/osc/rdma/osc_rdma_longreq.h
index 1330562e0c..450af0aa8f 100644
--- a/ompi/mca/osc/rdma/osc_rdma_longreq.h
+++ b/ompi/mca/osc/rdma/osc_rdma_longreq.h
@@ -14,8 +14,8 @@
  * $HEADER$
  */
-#ifndef OSC_PT2PT_LONGREQ_H
-#define OSC_PT2PT_LONGREQ_H
+#ifndef OSC_RDMA_LONGREQ_H
+#define OSC_RDMA_LONGREQ_H
 #include "osc_rdma.h"
diff --git a/ompi/mca/osc/rdma/osc_rdma_replyreq.h b/ompi/mca/osc/rdma/osc_rdma_replyreq.h
index 1de566e8ff..78f7703305 100644
--- a/ompi/mca/osc/rdma/osc_rdma_replyreq.h
+++ b/ompi/mca/osc/rdma/osc_rdma_replyreq.h
@@ -14,8 +14,8 @@
  * $HEADER$
  */
-#ifndef OMPI_OSC_PT2PT_REPLYREQ_H
-#define OMPI_OSC_PT2PT_REPLYREQ_H
+#ifndef OMPI_OSC_RDMA_REPLYREQ_H
+#define OMPI_OSC_RDMA_REPLYREQ_H
 #include "osc_rdma.h"
 #include "osc_rdma_longreq.h"
@@ -136,4 +136,4 @@ ompi_osc_rdma_replyreq_free(ompi_osc_rdma_replyreq_t *replyreq)
     return OMPI_SUCCESS;
 }
-#endif /* OMPI_OSC_PT2PT_REPLYREQ_H */
+#endif /* OMPI_OSC_RDMA_REPLYREQ_H */
diff --git a/ompi/mca/osc/rdma/osc_rdma_sendreq.h b/ompi/mca/osc/rdma/osc_rdma_sendreq.h
index 1dc780a95d..fae7ce953f 100644
--- a/ompi/mca/osc/rdma/osc_rdma_sendreq.h
+++ b/ompi/mca/osc/rdma/osc_rdma_sendreq.h
@@ -14,8 +14,8 @@
  * $HEADER$
  */
-#ifndef OMPI_OSC_PT2PT_SENDREQ_H
-#define OMPI_OSC_PT2PT_SENDREQ_H
+#ifndef OMPI_OSC_RDMA_SENDREQ_H
+#define OMPI_OSC_RDMA_SENDREQ_H
 #include "osc_rdma.h"
 #include "osc_rdma_longreq.h"
@@ -30,9 +30,9 @@
 #include "ompi/mca/pml/pml.h"
 typedef enum {
-    OMPI_OSC_PT2PT_GET,
-    OMPI_OSC_PT2PT_ACC,
-    OMPI_OSC_PT2PT_PUT
+    OMPI_OSC_RDMA_GET,
+    OMPI_OSC_RDMA_ACC,
+    OMPI_OSC_RDMA_PUT
 } ompi_osc_rdma_req_type_t;
@@ -118,7 +118,7 @@ ompi_osc_rdma_sendreq_init_origin(ompi_osc_rdma_sendreq_t *sendreq,
     sendreq->req_origin_datatype = origin_dt;
     sendreq->req_type = req_type;
-    if (req_type != OMPI_OSC_PT2PT_GET) {
+    if (req_type != OMPI_OSC_RDMA_GET) {
         ompi_convertor_copy_and_prepare_for_send(sendreq->req_target_proc->proc_convertor,
                                                  origin_dt,
                                                  origin_count,
@@ -172,4 +172,4 @@ ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq)
     return OMPI_SUCCESS;
 }
-#endif /* OMPI_OSC_PT2PT_SENDREQ_H */
+#endif /* OMPI_OSC_RDMA_SENDREQ_H */
diff --git a/ompi/mca/osc/rdma/osc_rdma_sync.c b/ompi/mca/osc/rdma/osc_rdma_sync.c
index 61432066f6..9a40e8d208 100644
--- a/ompi/mca/osc/rdma/osc_rdma_sync.c
+++ b/ompi/mca/osc/rdma/osc_rdma_sync.c
@@ -287,7 +287,7 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
                     P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
         ompi_osc_rdma_control_send(P2P_MODULE(win),
                                    P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
-                                   OMPI_OSC_PT2PT_HDR_COMPLETE,
+                                   OMPI_OSC_RDMA_HDR_COMPLETE,
                                    P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank], 0);
     }
@@ -360,7 +360,7 @@ ompi_osc_rdma_module_post(ompi_group_t *group,
     for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
         ompi_osc_rdma_control_send(P2P_MODULE(win),
                                    group->grp_proc_pointers[i],
-                                   OMPI_OSC_PT2PT_HDR_POST, 1, 0);
+                                   OMPI_OSC_RDMA_HDR_POST, 1, 0);
     }
     return OMPI_SUCCESS;
@@ -453,7 +453,7 @@ ompi_osc_rdma_module_lock(int lock_type,
     /* generate a lock request */
     ompi_osc_rdma_control_send(P2P_MODULE(win),
                                proc,
-                               OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                               OMPI_OSC_RDMA_HDR_LOCK_REQ,
                                P2P_MODULE(win)->p2p_comm->c_my_rank,
                                lock_type);
@@ -509,7 +509,7 @@ ompi_osc_rdma_module_unlock(int target,
                 P2P_MODULE(win)->p2p_comm->c_my_rank, target);
     ompi_osc_rdma_control_send(P2P_MODULE(win),
                                proc,
-                               OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,
+                               OMPI_OSC_RDMA_HDR_UNLOCK_REQ,
                                P2P_MODULE(win)->p2p_comm->c_my_rank,
                                out_count);
@@ -565,7 +565,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
         opal_output(-1, "%d sending lock ack to %d",
                     module->p2p_comm->c_my_rank, origin);
         ompi_osc_rdma_control_send(module, proc,
-                                   OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                   OMPI_OSC_RDMA_HDR_LOCK_REQ,
                                    module->p2p_comm->c_my_rank,
                                    OMPI_SUCCESS);
     }
@@ -610,7 +610,7 @@ ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
         module->p2p_lock_status = new_pending->lock_type;
         ompi_osc_rdma_control_send(module,
                                    new_pending->proc,
-                                   OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                   OMPI_OSC_RDMA_HDR_LOCK_REQ,
                                    module->p2p_comm->c_my_rank,
                                    OMPI_SUCCESS);
         OBJ_DESTRUCT(new_pending);
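The hunks above repeatedly apply one pattern: each wire header carries an NBO flag, the sender byte-swaps the header only when the peer actually needs network byte order, and the receiver swaps back only when the flag is set. The following is a minimal standalone sketch of that flag-gated conversion, for illustration only; the names used here (demo_header_t, DEMO_HDR_FLAG_NBO, demo_hdr_hton, demo_hdr_ntoh) are invented for the example and are not part of this patch or of the Open MPI API.

#include <stdint.h>
#include <stdio.h>
#include <arpa/inet.h>   /* htons/ntohs/htonl/ntohl */

/* Hypothetical wire header, analogous in spirit to the osc/rdma headers. */
typedef struct {
    uint8_t  hdr_type;        /* e.g. PUT / GET / ACC */
    uint8_t  hdr_flags;       /* DEMO_HDR_FLAG_NBO set when fields are big-endian */
    uint16_t hdr_windx;
    uint32_t hdr_msg_length;
} demo_header_t;

#define DEMO_HDR_FLAG_NBO 0x01

/* Sender side: convert to network byte order only when the peer needs it,
   and record that fact in the flags so the receiver knows whether to swap. */
static void demo_hdr_hton(demo_header_t *h, int peer_is_big_endian)
{
    if (peer_is_big_endian) {
        h->hdr_flags |= DEMO_HDR_FLAG_NBO;
        h->hdr_windx = htons(h->hdr_windx);
        h->hdr_msg_length = htonl(h->hdr_msg_length);
    }
}

/* Receiver side: only swap if the sender marked the header as network order. */
static void demo_hdr_ntoh(demo_header_t *h)
{
    if (h->hdr_flags & DEMO_HDR_FLAG_NBO) {
        h->hdr_windx = ntohs(h->hdr_windx);
        h->hdr_msg_length = ntohl(h->hdr_msg_length);
    }
}

int main(void)
{
    demo_header_t h = { 1, 0, 42, 4096 };
    demo_hdr_hton(&h, 1);   /* pretend the target process is big-endian */
    demo_hdr_ntoh(&h);      /* receiver restores host order */
    printf("windx=%u len=%u\n", (unsigned) h.hdr_windx, (unsigned) h.hdr_msg_length);
    return 0;
}

On a little-endian host the round trip restores 42 and 4096; on a big-endian host the conversions are no-ops, which is roughly the effect the WORDS_BIGENDIAN branches rely on in the code above.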