diff --git a/ompi/mca/btl/portals/configure.m4 b/ompi/mca/btl/portals/configure.m4 index 16848e5a79..10127d9225 100644 --- a/ompi/mca/btl/portals/configure.m4 +++ b/ompi/mca/btl/portals/configure.m4 @@ -33,7 +33,7 @@ AC_DEFUN([MCA_btl_portals_CONFIG_VAL], [ AC_MSG_ERROR([--without-btl-portals-$1 is invalid argument]) ;; *) - $2="$with_m4_bpatsubst([btl-portals-$1], -, _)" + $2="[$with_]m4_bpatsubst([btl-portals-$1], -, _)" AC_MSG_RESULT([[$]$2]) ;; esac diff --git a/ompi/mca/btl/portals/src/btl_portals.c b/ompi/mca/btl/portals/src/btl_portals.c index 498360cdc2..f5c042b7b2 100644 --- a/ompi/mca/btl/portals/src/btl_portals.c +++ b/ompi/mca/btl/portals/src/btl_portals.c @@ -60,7 +60,7 @@ mca_btl_portals_module_t mca_btl_portals_module = { mca_btl_portals_prepare_dst, mca_btl_portals_send, mca_btl_portals_put, - mca_btl_portals_get, + mca_btl_portals_get }, }; @@ -125,14 +125,30 @@ mca_btl_portals_add_procs(struct mca_btl_base_module_t* btl, if (NULL != portals_procs) free(portals_procs); - OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock); - if (need_recv_setup) { + /* create eqs */ + int i; + for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) { + int ptl_ret = PtlEQAlloc(ptl_btl->portals_ni_h, + ptl_btl->portals_eq_sizes[i], + PTL_EQ_HANDLER_NONE, + &(ptl_btl->portals_eq_handles[i])); + if (PTL_OK != ptl_ret) { + opal_output(mca_btl_portals_component.portals_output, + "Error creating EQ %d: %d", i, ptl_ret); + OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock); + /* BWB - better error code? */ + return OMPI_ERROR; + } + } + ret = mca_btl_portals_recv_enable(ptl_btl); } else { ret = OMPI_SUCCESS; } + OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock); + return ret; } @@ -149,6 +165,9 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl, int ret = OMPI_SUCCESS; bool need_recv_shutdown = false; + opal_output_verbose(100, mca_btl_portals_component.portals_output, + "del_procs called for %ld procs", (long) nprocs); + OPAL_THREAD_LOCK(&ptl_btl->portals_lock); for (i = 0 ; i < nprocs ; ++i) { @@ -161,14 +180,26 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl, need_recv_shutdown = true; } - OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock); - if (need_recv_shutdown) { + int i; + ret = mca_btl_portals_recv_disable(ptl_btl); + + /* destroy eqs */ + for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) { + int ptl_ret = PtlEQFree(ptl_btl->portals_eq_handles[i]); + if (PTL_OK != ptl_ret) { + opal_output(mca_btl_portals_component.portals_output, + "Error freeing EQ %d: %d", i, ptl_ret); + } + } + } else { ret = OMPI_SUCCESS; } + OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock); + return ret; } @@ -208,6 +239,7 @@ mca_btl_portals_alloc(struct mca_btl_base_module_t* btl, } frag->base.des_flags = 0; + frag->type = MCA_BTL_PORTALS_FRAG_SEND; return (mca_btl_base_descriptor_t*) frag; } @@ -217,16 +249,22 @@ int mca_btl_portals_free(struct mca_btl_base_module_t* btl, mca_btl_base_descriptor_t* des) { + mca_btl_portals_module_t* portals_btl = (mca_btl_portals_module_t*) btl; mca_btl_portals_frag_t* frag = (mca_btl_portals_frag_t*) des; - if (frag->size == 0) { - MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag); - } else if (frag->size == btl->btl_eager_limit){ - MCA_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag); - } else if (frag->size == btl->btl_max_send_size) { - MCA_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag); - } else { - return OMPI_ERR_BAD_PARAM; + if (frag->type == MCA_BTL_PORTALS_FRAG_SEND) { + if (frag->size == 0) { + MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag); + } else if (frag->size == btl->btl_eager_limit){ + MCA_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag); + } else if (frag->size == btl->btl_max_send_size) { + MCA_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag); + } else { + return OMPI_ERR_BAD_PARAM; + } + } else { + mca_btl_portals_return_chunk_part(portals_btl, frag); + MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag); } return OMPI_SUCCESS; @@ -238,7 +276,39 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base) { struct mca_btl_portals_module_t *btl = (struct mca_btl_portals_module_t *) btl_base; - int ret; + int ret, i; + opal_list_item_t *item; + + OPAL_THREAD_LOCK(&ptl_btl->portals_lock); + + if (0 != opal_list_get_size(&btl->portals_endpoint_list)) { + OPAL_THREAD_LOCK(&ptl_btl->portals_lock); + while (NULL != + (item = opal_list_remove_first(&btl->portals_endpoint_list))) { + OBJ_RELEASE(item); + } + + /* only do this if there was something in the endpoint list. + otherwise, it has alredy been done. */ + + /* shut down recv queues */ + ret = mca_btl_portals_recv_disable(btl); + + /* destroy eqs */ + for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) { + int ptl_ret = PtlEQFree(btl->portals_eq_handles[i]); + if (PTL_OK != ptl_ret) { + opal_output(mca_btl_portals_component.portals_output, + "Error freeing EQ %d: %d", i, ptl_ret); + } + } + } + + OBJ_DESTRUCT(&btl->portals_endpoint_list); + OBJ_DESTRUCT(&btl->portals_recv_chunks); + + + OPAL_THREAD_UNLOCK(&btl->portals_lock); if (PTL_INVALID_HANDLE != btl->portals_ni_h) { ret = PtlNIFini(btl->portals_ni_h); @@ -248,6 +318,9 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base) return OMPI_ERROR; } } + + OBJ_DESTRUCT(&btl->portals_lock); + opal_output_verbose(20, mca_btl_portals_component.portals_output, "successfully finalized module"); diff --git a/ompi/mca/btl/portals/src/btl_portals.h b/ompi/mca/btl/portals/src/btl_portals.h index 9a36e945ed..a8c7f6451c 100644 --- a/ompi/mca/btl/portals/src/btl_portals.h +++ b/ompi/mca/btl/portals/src/btl_portals.h @@ -99,12 +99,20 @@ struct mca_btl_portals_module_t { int portals_recv_mds_num; /* size of each md for first frags */ int portals_recv_mds_size; + /* list of recv chunks */ + opal_list_t portals_recv_chunks; /* size for event queue */ int portals_eq_sizes[MCA_BTL_PORTALS_EQ_SIZE]; /* frag receive event queue */ ptl_handle_eq_t portals_eq_handles[MCA_BTL_PORTALS_EQ_SIZE]; + /* "reject" entry for recv match list */ + ptl_handle_me_t portals_recv_reject_me_h; + + /* number outstanding sends */ + volatile int32_t portals_outstanding_sends; + /* our portals network interface */ ptl_handle_ni_t portals_ni_h; /* the limits returned from PtlNIInit for interface */ @@ -137,7 +145,7 @@ int mca_btl_portals_component_progress(void); * Not part of the BTL interface. Need to be implemented for every * version of Portals */ -int mca_btl_portals_init(mca_btl_portals_component_t *comp); +int mca_btl_portals_init_compat(mca_btl_portals_component_t *comp); /* 4th argument is a ptl_peers array, as that's what we'll get back from many of the access functions... */ diff --git a/ompi/mca/btl/portals/src/btl_portals_compat_utcp.c b/ompi/mca/btl/portals/src/btl_portals_compat_utcp.c index e6b84e51d6..afb502b01e 100644 --- a/ompi/mca/btl/portals/src/btl_portals_compat_utcp.c +++ b/ompi/mca/btl/portals/src/btl_portals_compat_utcp.c @@ -40,7 +40,7 @@ FILE* utcp_api_out; FILE* utcp_lib_out; int -mca_btl_portals_init(mca_btl_portals_component_t *comp) +mca_btl_portals_init_compat(mca_btl_portals_component_t *comp) { ptl_process_id_t info; int ret; diff --git a/ompi/mca/btl/portals/src/btl_portals_component.c b/ompi/mca/btl/portals/src/btl_portals_component.c index 4f9fdcfab3..01d27b4a4e 100644 --- a/ompi/mca/btl/portals/src/btl_portals_component.c +++ b/ompi/mca/btl/portals/src/btl_portals_component.c @@ -123,10 +123,6 @@ mca_btl_portals_component_open(void) mca_btl_portals_component.portals_ifname = param_register_string("ifname", "eth0"); #endif - portals_output_stream.lds_verbose_level = - param_register_int("debug_level", - BTL_PORTALS_DEFAULT_DEBUG_LEVEL); - mca_btl_portals_component.portals_free_list_init_num = param_register_int("free_list_init_num", BTL_PORTALS_DEFAULT_FREE_LIST_INIT_NUM); @@ -138,24 +134,15 @@ mca_btl_portals_component_open(void) BTL_PORTALS_DEFAULT_FREE_LIST_INC_NUM); /* start up debugging output */ + portals_output_stream.lds_verbose_level = + param_register_int("debug_level", + BTL_PORTALS_DEFAULT_DEBUG_LEVEL); asprintf(&(portals_output_stream.lds_prefix), "btl: portals (%5d): ", getpid()); - mca_btl_portals_component.portals_output = opal_output_open(&portals_output_stream); /* fill default module state */ - mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_SEND; - - for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) { - mca_btl_portals_module.portals_eq_sizes[i] = 0; - mca_btl_portals_module.portals_eq_handles[i] = PTL_EQ_NONE; - } - - mca_btl_portals_module.portals_ni_h = PTL_INVALID_HANDLE; - mca_btl_portals_module.portals_sr_dropped = 0; - - /* get configured state for default module */ mca_btl_portals_module.super.btl_eager_limit = param_register_int("eager_limit", BTL_PORTALS_DEFAULT_EAGER_LIMIT); @@ -178,6 +165,36 @@ mca_btl_portals_component_open(void) mca_btl_portals_module.super.btl_bandwidth = param_register_int("bandwidth", 1000); + mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_SEND; + + bzero(&(mca_btl_portals_module.portals_reg), + sizeof(mca_btl_portals_module.portals_reg)); + + for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) { + mca_btl_portals_module.portals_eq_sizes[i] = 0; + mca_btl_portals_module.portals_eq_handles[i] = PTL_EQ_NONE; + } + /* eq handles will be created when the module is instantiated. + Set sizes here */ + mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RECV] = + param_register_int("eq_recv_size", BTL_PORTALS_DEFAULT_RECV_QUEUE_SIZE); + /* sends_pending * 3 for start, end, ack */ + mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_SEND] = + param_register_int("eq_send_max_pending", BTL_PORTALS_MAX_SENDS_PENDING) * 3; + mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RDMA] = + param_register_int("eq_rdma_size", 512); /* BWB - FIXME - make param */ + + mca_btl_portals_module.portals_recv_reject_me_h = PTL_INVALID_HANDLE; + + mca_btl_portals_module.portals_recv_mds_num = + param_register_int("recv_md_num", 3); /* BWB - FIXME - make param */ + mca_btl_portals_module.portals_recv_mds_size = + param_register_int("recv_md_size", 524288); /* BWB - FIXME - make param */ + + mca_btl_portals_module.portals_ni_h = PTL_INVALID_HANDLE; + mca_btl_portals_module.portals_sr_dropped = 0; + mca_btl_portals_module.portals_outstanding_sends = 0; + return OMPI_SUCCESS; } @@ -200,6 +217,7 @@ mca_btl_portals_component_close(void) free(portals_output_stream.lds_prefix); } + /* close debugging stream */ opal_output_close(mca_btl_portals_component.portals_output); mca_btl_portals_component.portals_output = -1; @@ -217,15 +235,15 @@ mca_btl_portals_component_init(int *num_btls, *num_btls = 0; - if (enable_progress_threads) { + if (enable_progress_threads || enable_mpi_threads) { opal_output_verbose(20, mca_btl_portals_component.portals_output, - "disabled because progress threads enabled"); + "disabled because threads enabled"); return NULL; } /* initialize portals btl. note that this is in the compat code because it's fairly non-portable between implementations */ - if (OMPI_SUCCESS != mca_btl_portals_init(&mca_btl_portals_component)) { + if (OMPI_SUCCESS != mca_btl_portals_init_compat(&mca_btl_portals_component)) { opal_output_verbose(20, mca_btl_portals_component.portals_output, "disabled because compatibility init failed"); return NULL; @@ -275,6 +293,15 @@ mca_btl_portals_component_init(int *num_btls, mca_btl_portals_component.portals_free_list_max_num, mca_btl_portals_component.portals_free_list_inc_num, NULL); + + /* endpoint list */ + OBJ_CONSTRUCT(&(ptl_btl->portals_endpoint_list), opal_list_t); + + /* receive chunk list */ + OBJ_CONSTRUCT(&(ptl_btl->portals_recv_chunks), opal_list_t); + + /* lock */ + OBJ_CONSTRUCT(&(ptl_btl->portals_lock), opal_mutex_t); } *num_btls = mca_btl_portals_component.portals_num_modules; @@ -304,7 +331,8 @@ mca_btl_portals_component_progress(void) PTL_EQ_NONE) continue; /* they are all initialized at once */ #if OMPI_ENABLE_DEBUG - /* BWB - this is going to kill performance */ + /* check for dropped packets. In theory, our protocol covers + this, but it can't hurt to check while we're debugging */ PtlNIStatus(module->portals_ni_h, PTL_SR_DROP_COUNT, &numdropped); @@ -318,7 +346,7 @@ mca_btl_portals_component_progress(void) ret = PtlEQPoll(module->portals_eq_handles, MCA_BTL_PORTALS_EQ_SIZE, /* number of eq handles */ - 10, + 10, /* poll time */ &ev, &which); if (PTL_EQ_EMPTY == ret) { @@ -334,17 +362,6 @@ mca_btl_portals_component_progress(void) "*** Event queue entries were dropped"); } -#if BTL_PORTALS_HAVE_EVENT_UNLINK && OMPI_ENABLE_DEBUG - /* not everyone has UNLINK. Use it only to print the event, - so we can make sure we properly re-initialize the ones that - need to be re-initialized */ - if (PTL_EVENT_UNLINK == ev.type) { - OPAL_OUTPUT_VERBOSE((100, mca_btl_portals_component.portals_output, - "unlink event occurred")); - continue; - } -#endif - switch (which) { case MCA_BTL_PORTALS_EQ_RECV: mca_btl_portals_process_recv(module, &ev); diff --git a/ompi/mca/btl/portals/src/btl_portals_endpoint.c b/ompi/mca/btl/portals/src/btl_portals_endpoint.c index 18f165ef14..993142eff1 100644 --- a/ompi/mca/btl/portals/src/btl_portals_endpoint.c +++ b/ompi/mca/btl/portals/src/btl_portals_endpoint.c @@ -38,7 +38,7 @@ static void mca_btl_portals_endpoint_construct(mca_btl_base_endpoint_t* endpoint OBJ_CLASS_INSTANCE( mca_btl_portals_endpoint_t, - opal_object_t, + opal_list_item_t, mca_btl_portals_endpoint_construct, NULL); diff --git a/ompi/mca/btl/portals/src/btl_portals_frag.c b/ompi/mca/btl/portals/src/btl_portals_frag.c index c5d5d79957..760715da86 100644 --- a/ompi/mca/btl/portals/src/btl_portals_frag.c +++ b/ompi/mca/btl/portals/src/btl_portals_frag.c @@ -21,7 +21,7 @@ static void -mca_btl_portals_frag_common_constructor(mca_btl_portals_frag_t* frag) +mca_btl_portals_frag_common_send_constructor(mca_btl_portals_frag_t* frag) { frag->base.des_dst = 0; frag->base.des_dst_cnt = 0; @@ -31,6 +31,8 @@ mca_btl_portals_frag_common_constructor(mca_btl_portals_frag_t* frag) frag->segment.seg_addr.pval = frag + sizeof(mca_btl_portals_frag_t); frag->segment.seg_len = frag->size; frag->segment.seg_key.key64 = 0; + + frag->type = MCA_BTL_PORTALS_FRAG_SEND; } @@ -38,7 +40,7 @@ static void mca_btl_portals_frag_eager_constructor(mca_btl_portals_frag_t* frag) { frag->size = mca_btl_portals_module.super.btl_eager_limit; - mca_btl_portals_frag_common_constructor(frag); + mca_btl_portals_frag_common_send_constructor(frag); } @@ -46,7 +48,7 @@ static void mca_btl_portals_frag_max_constructor(mca_btl_portals_frag_t* frag) { frag->size = mca_btl_portals_module.super.btl_max_send_size; - mca_btl_portals_frag_common_constructor(frag); + mca_btl_portals_frag_common_send_constructor(frag); } diff --git a/ompi/mca/btl/portals/src/btl_portals_frag.h b/ompi/mca/btl/portals/src/btl_portals_frag.h index b181690ca6..6be22fb856 100644 --- a/ompi/mca/btl/portals/src/btl_portals_frag.h +++ b/ompi/mca/btl/portals/src/btl_portals_frag.h @@ -22,6 +22,22 @@ extern "C" { #endif OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t); +typedef enum { + MCA_BTL_PORTALS_FRAG_SEND, + MCA_BTL_PORTALS_FRAG_RECV +} mca_btl_portals_frag_type_t; + +struct mca_btl_portals_send_frag_t { + struct mca_btl_portals_module_t *btl; + struct mca_btl_base_endpoint_t *endpoint; + mca_btl_base_header_t hdr; +}; +typedef struct mca_btl_portals_send_frag_t mca_btl_portals_send_frag_t; + +struct mca_btl_portals_recv_frag_t { + struct mca_btl_portals_recv_chunk_t *chunk; +}; +typedef struct mca_btl_portals_recv_frag_t mca_btl_portals_recv_frag_t; /** @@ -30,10 +46,13 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t); struct mca_btl_portals_frag_t { mca_btl_base_descriptor_t base; mca_btl_base_segment_t segment; - struct mca_btl_portals_module_t *btl; - struct mca_btl_base_endpoint_t *endpoint; - mca_btl_base_header_t hdr; + mca_btl_portals_frag_type_t type; size_t size; + + union { + mca_btl_portals_send_frag_t send_frag; + mca_btl_portals_recv_frag_t recv_frag; + } u; }; typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_t; OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t); diff --git a/ompi/mca/btl/portals/src/btl_portals_recv.c b/ompi/mca/btl/portals/src/btl_portals_recv.c index 87a8f1129a..6af0fe41f1 100644 --- a/ompi/mca/btl/portals/src/btl_portals_recv.c +++ b/ompi/mca/btl/portals/src/btl_portals_recv.c @@ -21,11 +21,69 @@ #include "btl_portals.h" #include "btl_portals_recv.h" +#include "btl_portals_frag.h" +OBJ_CLASS_INSTANCE(mca_btl_portals_recv_chunk_t, + opal_list_item_t, + NULL, NULL); + int mca_btl_portals_recv_enable(mca_btl_portals_module_t *module) { + ptl_md_t md; + ptl_handle_md_t md_h; + ptl_process_id_t any_proc = {PTL_NID_ANY, PTL_PID_ANY}; + int ret; + int i; + + /* create the reject entry */ + md.start = NULL; + md.length = 0; + md.threshold = PTL_MD_THRESH_INF; + md.max_size = 0; + md.options = PTL_MD_TRUNCATE; + md.user_ptr = NULL; + md.eq_handle = PTL_EQ_NONE; + + ret = PtlMEAttach(module->portals_ni_h, + BTL_PORTALS_SEND_TABLE_ID, + any_proc, + 0, /* match */ + 0, /* ignore */ + PTL_RETAIN, + PTL_INS_AFTER, + &(module->portals_recv_reject_me_h)); + if (PTL_OK != ret) { + opal_output(mca_btl_portals_component.portals_output, + "Error creating recv reject ME: %d", ret); + return OMPI_ERROR; + } + + ret = PtlMDAttach(module->portals_recv_reject_me_h, + md, + PTL_RETAIN, + &md_h); + if (PTL_OK != ret) { + opal_output(mca_btl_portals_component.portals_output, + "Error attaching recv reject MD: %d", ret); + mca_btl_portals_recv_disable(module); + return OMPI_ERROR; + } + + /* create the recv chunks */ + for (i = 0 ; i < module->portals_recv_mds_num ; ++i) { + mca_btl_portals_recv_chunk_t *chunk = + mca_btl_portals_recv_chunk_init(module); + if (NULL == chunk) { + mca_btl_portals_recv_disable(module); + return OMPI_ERROR; + } + opal_list_append(&(module->portals_recv_chunks), + (opal_list_item_t*) chunk); + mca_btl_portals_activate_chunk(chunk); + } + return OMPI_SUCCESS; } @@ -33,14 +91,23 @@ mca_btl_portals_recv_enable(mca_btl_portals_module_t *module) int mca_btl_portals_recv_disable(mca_btl_portals_module_t *module) { - return OMPI_SUCCESS; -} + opal_list_item_t *item; + if (opal_list_get_size(&module->portals_recv_chunks) > 0) { + while (NULL != + (item = opal_list_remove_first(&module->portals_recv_chunks))) { + mca_btl_portals_recv_chunk_t *chunk = + (mca_btl_portals_recv_chunk_t*) item; + mca_btl_portals_recv_chunk_free(chunk); + } + } + + if (PTL_INVALID_HANDLE != module->portals_recv_reject_me_h) { + /* destroy the reject entry */ + PtlMEUnlink(module->portals_recv_reject_me_h); + module->portals_recv_reject_me_h = PTL_INVALID_HANDLE; + } -int -mca_btl_portals_process_recv(mca_btl_portals_module_t *module, - ptl_event_t *ev) -{ return OMPI_SUCCESS; } @@ -69,12 +136,7 @@ mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *module) int mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk) { - - if (PTL_INVALID_HANDLE != chunk->me_h) { - PtlMEUnlink(chunk->me_h); - chunk->me_h = PTL_INVALID_HANDLE; - } - + /* need to clear out the md */ while (chunk->pending != 0) { mca_btl_portals_component_progress(); } @@ -95,6 +157,78 @@ mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk) } -OBJ_CLASS_INSTANCE(mca_btl_portals_recv_chunk_t, - opal_list_item_t, - NULL, NULL); +int +mca_btl_portals_process_recv(mca_btl_portals_module_t *module, + ptl_event_t *ev) +{ + mca_btl_portals_frag_t *frag = NULL; + mca_btl_portals_recv_chunk_t *chunk = ev->md.user_ptr; + mca_btl_base_tag_t tag = (mca_btl_base_tag_t) ev->hdr_data; + + int ret; + + switch (ev->type) { + case PTL_EVENT_PUT_START: + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "recv: PTL_EVENT_PUT_START for tag %d, link %d", + tag, (int) ev->link); + + if (ev->ni_fail_type != PTL_NI_OK) { + opal_output(mca_btl_portals_component.portals_output, + "Failure to start event\n"); + + OPAL_THREAD_ADD32(&(chunk->pending), 1); + } + break; + case PTL_EVENT_PUT_END: + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "recv: PTL_EVENT_PUT_END for tag %d, link %d", + tag, (int) ev->link); + + if (ev->ni_fail_type != PTL_NI_OK) { + opal_output(mca_btl_portals_component.portals_output, + "Failure to start event\n"); + return OMPI_ERROR; + } + + /* ok, we've got data */ + opal_output_verbose(95, mca_btl_portals_component.portals_output, + "received data for tag %d\n", tag); + + /* it's a user, so we have to manually setup the segment */ + MCA_BTL_PORTALS_FRAG_ALLOC_USER(module, frag, ret); + frag->type = MCA_BTL_PORTALS_FRAG_RECV; + frag->size = ev->mlength; + frag->base.des_dst = &frag->segment; + frag->base.des_dst_cnt = 1; + frag->base.des_src = NULL; + frag->base.des_src_cnt = 0; + + frag->segment.seg_addr.pval = (((char*) ev->md.start) + ev->offset); + frag->segment.seg_len = frag->size; + frag->segment.seg_key.key64 = 0; + + frag->u.recv_frag.chunk = chunk; + + if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) { + /* the chunk is full. It's deactivated, automagically but we + can't start it up again until everyone is done with it. + The actual reactivation and all that will happen after the + free completes the last operation... */ + chunk->full = true; + opal_atomic_mb(); + } + + module->portals_reg[tag].cbfunc(&module->super, + tag, + &frag->base, + module->portals_reg[tag].cbdata); + break; + default: + break; + } + + return OMPI_SUCCESS; +} + + diff --git a/ompi/mca/btl/portals/src/btl_portals_recv.h b/ompi/mca/btl/portals/src/btl_portals_recv.h index fb10de9d95..b939a5d258 100644 --- a/ompi/mca/btl/portals/src/btl_portals_recv.h +++ b/ompi/mca/btl/portals/src/btl_portals_recv.h @@ -17,6 +17,8 @@ #ifndef MCA_BTL_PORTALS_RECV_H #define MCA_BTL_PORTALS_RECV_H +#include "btl_portals_frag.h" + struct mca_btl_portals_recv_chunk_t { opal_list_item_t base; @@ -44,20 +46,28 @@ int mca_btl_portals_process_recv(mca_btl_portals_module_t *module, /** * Create a chunk of memory for receiving send messages. Must call * activate_chunk on the returned chunk of memory before it will be - * active with the POrtals library */ + * active with the POrtals library + * + * Module lock must be held before calling this function + */ mca_btl_portals_recv_chunk_t* mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *module); + /** * Free a chunk of memory. Will remove the match entry, then progress * Portals until the pending count is returned to 0. Will then free * all resources associated with chunk. + * + * Module lock must be held before calling this function */ int mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk); -/* + +/** * activate a chunk. Chunks that are full (have gone inactive) can be - * re-activated with this call + * re-activated with this call. There is no need to hold the lock + * before calling this function */ static inline int mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk) @@ -92,7 +102,10 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk) md.user_ptr = chunk; md.eq_handle = chunk->btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_RECV]; + chunk->pending = 0; chunk->full = false; + /* make sure that everyone sees the update on full value */ + opal_atomic_mb(); ret = PtlMDAttach(chunk->me_h, md, @@ -109,4 +122,23 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk) return OMPI_SUCCESS; } + +static inline void +mca_btl_portals_return_chunk_part(mca_btl_portals_module_t *module, + mca_btl_portals_frag_t *frag) +{ + mca_btl_portals_recv_chunk_t *chunk = frag->u.recv_frag.chunk; + int ret; + + if (chunk->full == true) { + OPAL_THREAD_ADD32(&(chunk->pending), -1); + if (chunk->pending == 0) { + ret = mca_btl_portals_activate_chunk(chunk); + if (OMPI_SUCCESS != ret) { + /* BWB - now what? */ + } + } + } +} + #endif /* MCA_BTL_PORTALS_RECV_H */ diff --git a/ompi/mca/btl/portals/src/btl_portals_send.c b/ompi/mca/btl/portals/src/btl_portals_send.c index 319a4480ab..1bef4ea4f0 100644 --- a/ompi/mca/btl/portals/src/btl_portals_send.c +++ b/ompi/mca/btl/portals/src/btl_portals_send.c @@ -27,8 +27,82 @@ int mca_btl_portals_process_send(mca_btl_portals_module_t *module, ptl_event_t *ev) { - opal_output_verbose(99, mca_btl_portals_component.portals_output, - "process_send"); + mca_btl_portals_frag_t *frag = + (mca_btl_portals_frag_t*) ev->md.user_ptr; + + switch (ev->type) { + case PTL_EVENT_SEND_START: + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "send: PTL_EVENT_SEND_START for 0x%x", + frag); + + if (ev->ni_fail_type != PTL_NI_OK) { + opal_output(mca_btl_portals_component.portals_output, + "Failure to start send event\n"); + frag->base.des_cbfunc(&module->super, + frag->u.send_frag.endpoint, + &frag->base, + OMPI_ERROR); + } + break; + case PTL_EVENT_SEND_END: + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "send: PTL_EVENT_SEND_END for 0x%x", + frag); + + if (ev->ni_fail_type != PTL_NI_OK) { + opal_output(mca_btl_portals_component.portals_output, + "Failure to end send event\n"); + frag->base.des_cbfunc(&module->super, + frag->u.send_frag.endpoint, + &frag->base, + OMPI_ERROR); + } + break; + case PTL_EVENT_ACK: + /* ok, this is the real work - the message has been received + on the other side. If mlength == 0, that means that we hit + the reject md and we need to try to retransmit */ + + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "send: PTL_EVENT_ACK for 0x%x", + frag); + + if (0 == ev->mlength) { + /* other side did not receive the message */ + + /* BWB - implement check for retransmit */ + opal_output(mca_btl_portals_component.portals_output, + "message was dropped and retransmits not implemented"); + frag->base.des_cbfunc(&module->super, + frag->u.send_frag.endpoint, + &frag->base, + OMPI_ERROR); + } else { + /* the other side received the message */ + OPAL_THREAD_ADD32(&module->portals_outstanding_sends, -1); + /* we're done with the md - return it. Do this before + anything else in case the PML releases resources, then + gets more resources (ie, what's currently in this + md) */ + PtlMDUnlink(ev->md_handle); + + /* let the PML know we're done... */ + frag->base.des_cbfunc(&module->super, + frag->u.send_frag.endpoint, + &frag->base, + OMPI_SUCCESS); + } + break; + default: + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "send: unexpected event %d for 0x%x", + ev->type, frag); + + break; + } + + return OMPI_SUCCESS; } @@ -42,9 +116,17 @@ mca_btl_portals_send(struct mca_btl_base_module_t* btl, { mca_btl_portals_module_t *ptl_btl = (mca_btl_portals_module_t*) btl; mca_btl_portals_frag_t *frag = (mca_btl_portals_frag_t*) descriptor; - frag->endpoint = endpoint; - frag->hdr.tag = tag; - frag->btl = ptl_btl; + int32_t num_sends; + + frag->u.send_frag.endpoint = endpoint; + frag->u.send_frag.hdr.tag = tag; + frag->u.send_frag.btl = ptl_btl; + + num_sends = OPAL_THREAD_ADD32(&ptl_btl->portals_outstanding_sends, 1); + + /* BWB - implement check for too many pending messages */ + opal_output_verbose(90, mca_btl_portals_component.portals_output, + "send called for frag 0x%x", frag); return mca_btl_portals_send_frag(frag); } diff --git a/ompi/mca/btl/portals/src/btl_portals_send.h b/ompi/mca/btl/portals/src/btl_portals_send.h index f47e010457..2ff2357233 100644 --- a/ompi/mca/btl/portals/src/btl_portals_send.h +++ b/ompi/mca/btl/portals/src/btl_portals_send.h @@ -30,7 +30,6 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag) ptl_handle_md_t md_h; int ret; - /* setup the send */ md.start = frag->segment.seg_addr.pval; md.length = frag->segment.seg_len; @@ -38,10 +37,10 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag) md.max_size = 0; md.options = 0; /* BWB - can we optimize? */ md.user_ptr = frag; /* keep a pointer to ourselves */ - md.eq_handle = frag->btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_SEND]; + md.eq_handle = frag->u.send_frag.btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_SEND]; /* make a free-floater */ - ret = PtlMDBind(frag->btl->portals_ni_h, + ret = PtlMDBind(frag->u.send_frag.btl->portals_ni_h, md, PTL_UNLINK, &md_h); @@ -53,12 +52,12 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag) ret = PtlPut(md_h, PTL_ACK_REQ, - frag->endpoint->endpoint_ptl_id, + frag->u.send_frag.endpoint->endpoint_ptl_id, BTL_PORTALS_SEND_TABLE_ID, - 0, /* ac_index */ - 0, /* match bits */ + 0, /* ac_index - not used*/ + frag->segment.seg_key.key64, /* match bits */ 0, /* remote offset - not used */ - frag->hdr.tag); /* hdr_data - tag */ + frag->u.send_frag.hdr.tag); /* hdr_data - tag */ if (ret != PTL_OK) { opal_output(mca_btl_portals_component.portals_output, "PtlPut failed with error %d", ret);