diff --git a/ompi/mca/coll/hcoll/coll_hcoll.h b/ompi/mca/coll/hcoll/coll_hcoll.h index edd32b721c..e2215a4534 100644 --- a/ompi/mca/coll/hcoll/coll_hcoll.h +++ b/ompi/mca/coll/hcoll/coll_hcoll.h @@ -61,6 +61,12 @@ struct mca_coll_hcoll_component_t { /** MCA parameter: Enable FCA */ int hcoll_enable; + /** r/o MCA parameter: libhcoll compiletime version */ + char* compiletime_version; + + /** r/o MCA parameter: libhcoll runtime version */ + const char* runtime_version; + /** MCA parameter: Minimal number of processes in the communicator for the corresponding hcoll context to be created */ int hcoll_np; @@ -73,12 +79,6 @@ struct mca_coll_hcoll_component_t { /** MCA parameter: ON/OFF user defined datatype through HCOLL */ int hcoll_datatype_fallback; - /** r/o MCA parameter: libhcoll runtime version */ - const char* runtime_version; - - /** r/o MCA parameter: libhcoll compiletime version */ - char* compiletime_version; - /* FCA global stuff */ mca_coll_hcoll_ops_t hcoll_ops; ompi_free_list_t requests; @@ -132,6 +132,8 @@ struct mca_coll_hcoll_module_t { mca_coll_base_module_t *previous_iallgather_module; mca_coll_base_module_iallreduce_fn_t previous_iallreduce; mca_coll_base_module_t *previous_iallreduce_module; + mca_coll_base_module_igatherv_fn_t previous_igatherv; + mca_coll_base_module_t *previous_igatherv_module; }; typedef struct mca_coll_hcoll_module_t mca_coll_hcoll_module_t; @@ -170,7 +172,6 @@ int mca_coll_hcoll_gather(void *sbuf, int scount, struct ompi_communicator_t *comm, mca_coll_base_module_t *module); - int mca_coll_hcoll_allreduce(void *sbuf, void *rbuf, int count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, @@ -184,6 +185,14 @@ int mca_coll_hcoll_alltoall(void *sbuf, int scount, struct ompi_communicator_t *comm, mca_coll_base_module_t *module); +int mca_coll_hcoll_gatherv(void* sbuf, int scount, + struct ompi_datatype_t *sdtype, + void* rbuf, int *rcounts, int *displs, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module); + int mca_coll_hcoll_ibarrier(struct ompi_communicator_t *comm, ompi_request_t** request, mca_coll_base_module_t *module); @@ -208,6 +217,16 @@ int mca_coll_hcoll_iallreduce(void *sbuf, void *rbuf, int count, struct ompi_communicator_t *comm, ompi_request_t** request, mca_coll_base_module_t *module); + +int mca_coll_hcoll_igatherv(void* sbuf, int scount, + struct ompi_datatype_t *sdtype, + void* rbuf, int *rcounts, int *displs, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module); + int mca_coll_hcoll_progress(void); void mca_coll_hcoll_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc); END_C_DECLS diff --git a/ompi/mca/coll/hcoll/coll_hcoll_module.c b/ompi/mca/coll/hcoll/coll_hcoll_module.c index 394ed79496..1fdfe38d28 100644 --- a/ompi/mca/coll/hcoll/coll_hcoll_module.c +++ b/ompi/mca/coll/hcoll/coll_hcoll_module.c @@ -40,6 +40,7 @@ static void mca_coll_hcoll_module_clear(mca_coll_hcoll_module_t *hcoll_module) hcoll_module->previous_ibcast = NULL; hcoll_module->previous_iallreduce = NULL; hcoll_module->previous_iallgather = NULL; + hcoll_module->previous_igatherv = NULL; } static void mca_coll_hcoll_module_construct(mca_coll_hcoll_module_t *hcoll_module) @@ -77,11 +78,13 @@ static void mca_coll_hcoll_module_destruct(mca_coll_hcoll_module_t *hcoll_module OBJ_RELEASE(hcoll_module->previous_bcast_module); OBJ_RELEASE(hcoll_module->previous_allreduce_module); OBJ_RELEASE(hcoll_module->previous_allgather_module); + OBJ_RELEASE(hcoll_module->previous_gatherv_module); OBJ_RELEASE(hcoll_module->previous_ibarrier_module); OBJ_RELEASE(hcoll_module->previous_ibcast_module); OBJ_RELEASE(hcoll_module->previous_iallreduce_module); OBJ_RELEASE(hcoll_module->previous_iallgather_module); + OBJ_RELEASE(hcoll_module->previous_igatherv_module); /* OBJ_RELEASE(hcoll_module->previous_allgatherv_module); @@ -122,11 +125,13 @@ static int mca_coll_hcoll_save_coll_handlers(mca_coll_hcoll_module_t *hcoll_modu HCOL_SAVE_PREV_COLL_API(bcast); HCOL_SAVE_PREV_COLL_API(allreduce); HCOL_SAVE_PREV_COLL_API(allgather); + HCOL_SAVE_PREV_COLL_API(gatherv); HCOL_SAVE_PREV_COLL_API(ibarrier); HCOL_SAVE_PREV_COLL_API(ibcast); HCOL_SAVE_PREV_COLL_API(iallreduce); HCOL_SAVE_PREV_COLL_API(iallgather); + HCOL_SAVE_PREV_COLL_API(igatherv); /* These collectives are not yet part of hcoll, so @@ -135,7 +140,6 @@ static int mca_coll_hcoll_save_coll_handlers(mca_coll_hcoll_module_t *hcoll_modu HCOL_SAVE_PREV_COLL_API(gather); HCOL_SAVE_PREV_COLL_API(reduce); HCOL_SAVE_PREV_COLL_API(allgatherv); - HCOL_SAVE_PREV_COLL_API(gatherv); HCOL_SAVE_PREV_COLL_API(alltoall); HCOL_SAVE_PREV_COLL_API(alltoallv); HCOL_SAVE_PREV_COLL_API(alltoallw); @@ -294,11 +298,13 @@ mca_coll_hcoll_comm_query(struct ompi_communicator_t *comm, int *priority) hcoll_module->super.coll_allgather = hcoll_collectives.coll_allgather ? mca_coll_hcoll_allgather : NULL; hcoll_module->super.coll_allreduce = hcoll_collectives.coll_allreduce ? mca_coll_hcoll_allreduce : NULL; hcoll_module->super.coll_alltoall = /*hcoll_collectives.coll_alltoall ? mca_coll_hcoll_alltoall : */ NULL; + hcoll_module->super.coll_gatherv = hcoll_collectives.coll_gatherv ? mca_coll_hcoll_gatherv : NULL; hcoll_module->super.coll_ibarrier = hcoll_collectives.coll_ibarrier ? mca_coll_hcoll_ibarrier : NULL; hcoll_module->super.coll_ibcast = hcoll_collectives.coll_ibcast ? mca_coll_hcoll_ibcast : NULL; hcoll_module->super.coll_iallgather = hcoll_collectives.coll_iallgather ? mca_coll_hcoll_iallgather : NULL; hcoll_module->super.coll_iallreduce = hcoll_collectives.coll_iallreduce ? mca_coll_hcoll_iallreduce : NULL; hcoll_module->super.coll_gather = /*hcoll_collectives.coll_gather ? mca_coll_hcoll_gather :*/ NULL; + hcoll_module->super.coll_igatherv = hcoll_collectives.coll_igatherv ? mca_coll_hcoll_igatherv : NULL; *priority = cm->hcoll_priority; module = &hcoll_module->super; diff --git a/ompi/mca/coll/hcoll/coll_hcoll_ops.c b/ompi/mca/coll/hcoll/coll_hcoll_ops.c index 5a80bf2a8d..621e83f157 100644 --- a/ompi/mca/coll/hcoll/coll_hcoll_ops.c +++ b/ompi/mca/coll/hcoll/coll_hcoll_ops.c @@ -216,6 +216,44 @@ int mca_coll_hcoll_alltoall(void *sbuf, int scount, return rc; } +int mca_coll_hcoll_gatherv(void* sbuf, int scount, + struct ompi_datatype_t *sdtype, + void* rbuf, int *rcounts, int *displs, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + dte_data_representation_t stype; + dte_data_representation_t rtype; + int rc; + HCOL_VERBOSE(20,"RUNNING HCOL GATHERV"); + mca_coll_hcoll_module_t *hcoll_module = (mca_coll_hcoll_module_t*)module; + stype = ompi_dtype_2_dte_dtype(sdtype); + rtype = ompi_dtype_2_dte_dtype(rdtype); + if (OPAL_UNLIKELY(HCOL_DTE_IS_ZERO(stype) || HCOL_DTE_IS_ZERO(rtype))){ + /*If we are here then datatype is not simple predefined datatype */ + /*In future we need to add more complex mapping to the dte_data_representation_t */ + /* Now use fallback */ + HCOL_VERBOSE(20,"Ompi_datatype is not supported: sdtype = %s, rdtype = %s; calling fallback gatherv;", + sdtype->super.name, + rdtype->super.name); + rc = hcoll_module->previous_gatherv(sbuf,scount,sdtype, + rbuf, rcounts, displs, rdtype,root, + comm, hcoll_module->previous_gatherv_module); + return rc; + } + rc = hcoll_collectives.coll_gatherv(sbuf,scount,stype,rbuf,rcounts,displs, rtype, root, hcoll_module->hcoll_context); + if (HCOLL_SUCCESS != rc){ + HCOL_VERBOSE(20,"RUNNING FALLBACK GATHERV"); + rc = hcoll_module->previous_gatherv(sbuf,scount,sdtype, + rbuf, rcounts, displs, rdtype,root, + comm, hcoll_module->previous_igatherv_module); + } + return rc; + +} + int mca_coll_hcoll_ibarrier(struct ompi_communicator_t *comm, ompi_request_t ** request, mca_coll_base_module_t *module) @@ -357,3 +395,46 @@ int mca_coll_hcoll_iallreduce(void *sbuf, void *rbuf, int count, return rc; } +int mca_coll_hcoll_igatherv(void* sbuf, int scount, + struct ompi_datatype_t *sdtype, + void* rbuf, int *rcounts, int *displs, + struct ompi_datatype_t *rdtype, + int root, + struct ompi_communicator_t *comm, + ompi_request_t ** request, + mca_coll_base_module_t *module) +{ + dte_data_representation_t stype; + dte_data_representation_t rtype; + int rc; + void** rt_handle; + HCOL_VERBOSE(20,"RUNNING HCOL IGATHERV"); + mca_coll_hcoll_module_t *hcoll_module = (mca_coll_hcoll_module_t*)module; + rt_handle = (void**) request; + stype = ompi_dtype_2_dte_dtype(sdtype); + rtype = ompi_dtype_2_dte_dtype(rdtype); + if (OPAL_UNLIKELY(HCOL_DTE_IS_ZERO(stype) || HCOL_DTE_IS_ZERO(rtype))){ + /*If we are here then datatype is not simple predefined datatype */ + /*In future we need to add more complex mapping to the dte_data_representation_t */ + /* Now use fallback */ + HCOL_VERBOSE(20,"Ompi_datatype is not supported: sdtype = %s, rdtype = %s; calling fallback igatherv;", + sdtype->super.name, + rdtype->super.name); + rc = hcoll_module->previous_igatherv(sbuf,scount,sdtype, + rbuf, rcounts, displs, rdtype,root, + comm, request, + hcoll_module->previous_igatherv_module); + return rc; + } + rc = hcoll_collectives.coll_igatherv(sbuf,scount,stype,rbuf,rcounts,displs, rtype, root, hcoll_module->hcoll_context, rt_handle); + if (HCOLL_SUCCESS != rc){ + HCOL_VERBOSE(20,"RUNNING FALLBACK IGATHERV"); + rc = hcoll_module->previous_igatherv(sbuf,scount,sdtype, + rbuf, rcounts, displs, rdtype,root, + comm, request, + hcoll_module->previous_igatherv_module); + } + return rc; + +} + diff --git a/ompi/mca/coll/hcoll/coll_hcoll_rte.c b/ompi/mca/coll/hcoll/coll_hcoll_rte.c index 62a7b33f1b..4d6c8acc60 100644 --- a/ompi/mca/coll/hcoll/coll_hcoll_rte.c +++ b/ompi/mca/coll/hcoll/coll_hcoll_rte.c @@ -130,7 +130,7 @@ void hcoll_rte_fns_setup(void) 0, 0, /* NOTE: hack - need to parametrize this */ 10, - 50, + -1, 10, /* No Mpool */ NULL,