diff --git a/ompi/mca/pml/yalla/Makefile.am b/ompi/mca/pml/yalla/Makefile.am new file mode 100644 index 0000000000..a0f6487476 --- /dev/null +++ b/ompi/mca/pml/yalla/Makefile.am @@ -0,0 +1,45 @@ +# +# Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + + +# Make the output library in this directory, and name it either +# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la +# (for static builds). + +AM_CPPFLAGS = $(pml_yalla_CPPFLAGS) + +local_sources = \ + pml_yalla.h \ + pml_yalla.c \ + pml_yalla_request.h \ + pml_yalla_request.c \ + pml_yalla_datatype.h \ + pml_yalla_datatype.c \ + pml_yalla_freelist.h \ + pml_yalla_component.c + +if MCA_BUILD_ompi_pml_yalla_DSO +component_noinst = +component_install = mca_pml_yalla.la +else +component_noinst = libmca_pml_yalla.la +component_install = +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_pml_yalla_la_SOURCES = $(local_sources) +mca_pml_yalla_la_LIBADD = $(pml_yalla_LIBS) +mca_pml_yalla_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_pml_yalla_la_SOURCES = $(local_sources) +libmca_pml_yalla_la_LIBADD = $(pml_yalla_LIBS) +libmca_pml_yalla_la_LDFLAGS = -module -avoid-version + diff --git a/ompi/mca/pml/yalla/configure.m4 b/ompi/mca/pml/yalla/configure.m4 new file mode 100644 index 0000000000..5a44f8d981 --- /dev/null +++ b/ompi/mca/pml/yalla/configure.m4 @@ -0,0 +1,31 @@ +# +# Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
# MCA_ompi_pml_yalla_CONFIG([action-if-found], [action-if-not-found])
# -------------------------------------------------------------------
# Configure-time probe for the yalla PML.  Registers the component's
# Makefile, asks OMPI_CHECK_MXM to look for MXM (results are stored under
# the "pml_yalla" variable prefix), then runs the first argument when MXM
# was found and the second argument otherwise.  The pml_yalla_* build
# flags are exported with AC_SUBST in either case.
AC_DEFUN([MCA_ompi_pml_yalla_CONFIG], [
    AC_CONFIG_FILES([ompi/mca/pml/yalla/Makefile])

    OMPI_CHECK_MXM([pml_yalla],
                   [pml_yalla_happy="yes"],
                   [pml_yalla_happy="no"])

    AS_IF([test "$pml_yalla_happy" = "yes"],
          [$1],
          [$2])

    # substitute in the things needed to build mxm
    AC_SUBST([pml_yalla_CFLAGS])
    AC_SUBST([pml_yalla_CPPFLAGS])
    AC_SUBST([pml_yalla_LDFLAGS])
    AC_SUBST([pml_yalla_LIBS])
])
+* $COPYRIGHT$ +* $HEADER$ +*/ + + +#include "pml_yalla.h" +#include "pml_yalla_request.h" + +#include "opal/runtime/opal.h" +#include "opal/memoryhooks/memory.h" +#include "opal/mca/pmix/pmix.h" +#include "ompi/mca/pml/base/pml_base_bsend.h" +#include "ompi/message/message.h" + +#define MODEX_KEY "yalla-mxm" + +mca_pml_yalla_module_t ompi_pml_yalla = { + { + mca_pml_yalla_add_procs, + mca_pml_yalla_del_procs, + mca_pml_yalla_enable, + NULL, + mca_pml_yalla_add_comm, + mca_pml_yalla_del_comm, + mca_pml_yalla_irecv_init, + mca_pml_yalla_irecv, + mca_pml_yalla_recv, + mca_pml_yalla_isend_init, + mca_pml_yalla_isend, + mca_pml_yalla_send, + mca_pml_yalla_iprobe, + mca_pml_yalla_probe, + mca_pml_yalla_start, + mca_pml_yalla_improbe, + mca_pml_yalla_mprobe, + mca_pml_yalla_imrecv, + mca_pml_yalla_mrecv, + mca_pml_yalla_dump, + NULL, /* FT */ + 1ul << (sizeof(mxm_ctxid_t)*8) - 1, + 1ul << (sizeof(mxm_tag_t)*8 - 1) - 1, + }, + NULL, + NULL +}; + +static int send_ep_address(void) +{ + mxm_error_t error; + void *address; + size_t addrlen; + int rc; + + addrlen = 0; + mxm_ep_get_address(ompi_pml_yalla.mxm_ep, NULL, &addrlen); + + address = alloca(addrlen); + error = mxm_ep_get_address(ompi_pml_yalla.mxm_ep, address, &addrlen); + if (MXM_OK != error) { + PML_YALLA_ERROR("Failed to get EP address"); + return OMPI_ERROR; + } + + OPAL_MODEX_SEND(rc, PMIX_SYNC_REQD, PMIX_GLOBAL, + &mca_pml_yalla_component.pmlm_version, address, addrlen); + if (OMPI_SUCCESS != rc) { + PML_YALLA_ERROR("Open MPI couldn't distribute EP connection details"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static int recv_ep_address(ompi_proc_t *proc, void **address_p, size_t *addrlen_p) +{ + int rc; + + OPAL_MODEX_RECV(rc, &mca_pml_yalla_component.pmlm_version, &proc->super, + address_p, addrlen_p); + if (rc < 0) { + PML_YALLA_ERROR("Failed to receive EP address"); + } + return rc; +} + +static void mca_pml_yalla_mem_release_cb(void *buf, size_t length, + void *cbdata, bool from_alloc) +{ + 
mxm_mem_unmap(ompi_pml_yalla.mxm_context, buf, length, + from_alloc ? MXM_MEM_UNMAP_MARK_INVALID : 0); +} + +int mca_pml_yalla_init(void) +{ + mxm_context_opts_t *ctx_opts; + mxm_ep_opts_t *ep_opts; + mxm_error_t error; + int rc; + + PML_YALLA_VERBOSE(1, "mca_pml_yalla_init"); + + /* Set memory hooks */ + if ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) == + ((OPAL_MEMORY_FREE_SUPPORT | OPAL_MEMORY_MUNMAP_SUPPORT) & + opal_mem_hooks_support_level())) + { + PML_YALLA_VERBOSE(1, "enabling on-demand memory mapping"); + setenv("MXM_PML_MEM_ON_DEMAND_MAP", "y", 0); + ompi_pml_yalla.using_mem_hooks = 1; + } else { + PML_YALLA_VERBOSE(1, "disabling on-demand memory mapping"); + ompi_pml_yalla.using_mem_hooks = 0; + } + setenv("MXM_PML_SINGLE_THREAD", ompi_mpi_thread_multiple ? "n" : "y" , 0); + + /* Read options */ + error = mxm_config_read_opts(&ctx_opts, &ep_opts, "PML", NULL, 0); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + error = mxm_init(ctx_opts, &ompi_pml_yalla.mxm_context); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + if (ompi_pml_yalla.using_mem_hooks) { + opal_mem_hooks_register_release(mca_pml_yalla_mem_release_cb, NULL); + } + + error = mxm_ep_create(ompi_pml_yalla.mxm_context, ep_opts, &ompi_pml_yalla.mxm_ep); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + mxm_config_free_context_opts(ctx_opts); + mxm_config_free_ep_opts(ep_opts); + + rc = send_ep_address(); + if (rc < 0) { + return rc; + } + + OBJ_CONSTRUCT(&ompi_pml_yalla.send_reqs, mca_pml_yalla_freelist_t); + OBJ_CONSTRUCT(&ompi_pml_yalla.bsend_reqs, mca_pml_yalla_freelist_t); + OBJ_CONSTRUCT(&ompi_pml_yalla.recv_reqs, mca_pml_yalla_freelist_t); + OBJ_CONSTRUCT(&ompi_pml_yalla.convs, mca_pml_yalla_freelist_t); + + opal_progress_register(mca_pml_yalla_progress); + + PML_YALLA_VERBOSE(2, "created mxm context %p ep %p", ompi_pml_yalla.mxm_context, + ompi_pml_yalla.mxm_ep); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_cleanup(void) +{ + PML_YALLA_VERBOSE(1, 
"mca_pml_yalla_cleanup"); + + opal_progress_unregister(mca_pml_yalla_progress); + + OBJ_DESTRUCT(&ompi_pml_yalla.convs); + OBJ_DESTRUCT(&ompi_pml_yalla.recv_reqs); + OBJ_DESTRUCT(&ompi_pml_yalla.bsend_reqs); + OBJ_DESTRUCT(&ompi_pml_yalla.send_reqs); + + if (ompi_pml_yalla.mxm_ep) { + mxm_ep_destroy(ompi_pml_yalla.mxm_ep); + ompi_pml_yalla.mxm_ep = NULL; + } + if (ompi_pml_yalla.using_mem_hooks) { + opal_mem_hooks_unregister_release(mca_pml_yalla_mem_release_cb); + } + if (ompi_pml_yalla.mxm_context) { + mxm_cleanup(ompi_pml_yalla.mxm_context); + ompi_pml_yalla.mxm_context = NULL; + } + return OMPI_SUCCESS; +} + +int mca_pml_yalla_add_procs(struct ompi_proc_t **procs, size_t nprocs) +{ + size_t i; + int ret; + void *address; + mxm_conn_h conn; + size_t addrlen; + mxm_error_t error; + + if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("yalla", + procs, + nprocs))) { + return ret; + } + + for (i = 0; i < nprocs; ++i) { + ret = recv_ep_address(procs[i], &address, &addrlen); + if (ret < 0) { + return ret; + } + + if (procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) { + PML_YALLA_VERBOSE(3, "already connected to proc. %ld", + procs[i]->super.proc_name); + continue; + } + + PML_YALLA_VERBOSE(2, "connecting to proc. 
%ld", + procs[i]->super.proc_name); + error = mxm_ep_connect(ompi_pml_yalla.mxm_ep, address, &conn); + free(address); + + if (MXM_OK != error) { + PML_YALLA_ERROR("Failed to connect"); + return OMPI_ERROR; + } + + procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = conn; + } + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_del_procs(struct ompi_proc_t **procs, size_t nprocs) +{ + size_t i; + + if (ompi_mpi_finalized) { + PML_YALLA_VERBOSE(3, "using bulk powerdown"); + mxm_ep_powerdown(ompi_pml_yalla.mxm_ep); + } + + for (i = 0; i < nprocs; ++i) { + mxm_ep_disconnect(procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]); + PML_YALLA_VERBOSE(2, "disconnected from rank %ld", procs[i]->super.proc_name); + procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = NULL; + } + return OMPI_SUCCESS; +} + +int mca_pml_yalla_enable(bool enable) +{ + mca_pml_yalla_init_reqs(); + mca_pml_yalla_init_datatype(); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_progress(void) +{ + mxm_progress(ompi_pml_yalla.mxm_context); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_add_comm(struct ompi_communicator_t* comm) +{ + mxm_error_t error; + mxm_mq_h mq; + + error = mxm_mq_create(ompi_pml_yalla.mxm_context, comm->c_contextid, &mq); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + comm->c_pml_comm = (void*)mq; + PML_YALLA_VERBOSE(2, "created mq ctxid %d for comm %s", comm->c_contextid, + comm->c_name); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_del_comm(struct ompi_communicator_t* comm) +{ + mxm_mq_h mq = (void*)comm->c_pml_comm; + + PML_YALLA_VERBOSE(2, "destroying mq ctxid %d of comm %s", comm->c_contextid, + comm->c_name); + mxm_mq_destroy(mq); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_yalla_recv_request_t *rreq; + + rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm, + OMPI_REQUEST_INACTIVE); 
+ rreq->super.ompi.req_persistent = true; + rreq->super.flags = 0; + *request = &rreq->super.ompi; + PML_YALLA_VERBOSE(9, "init recv request %p src %d tag %d comm %s", *request, + src, tag, comm->c_name); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_irecv(void *buf, size_t count, ompi_datatype_t *datatype, + int src, int tag, struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_yalla_recv_request_t *rreq; + mxm_error_t error; + + rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, src, tag, comm, + OMPI_REQUEST_ACTIVE); + rreq->super.ompi.req_persistent = false; + rreq->super.flags = 0; + + PML_YALLA_VERBOSE(8, "receive request *%p=%p from %d tag %d dtype %s count %Zu", + request, rreq, src, tag, datatype->name, count); + + error = mxm_req_recv(&rreq->mxm); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + *request = &rreq->super.ompi; + return OMPI_SUCCESS; +} + +int mca_pml_yalla_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src, + int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + mxm_error_t error; + + PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, src, tag, comm, recv); + PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq); + + PML_YALLA_VERBOSE(8, "receive from %d tag %d dtype %s count %Zu", src, tag, + datatype->name, count); + + error = mxm_req_recv(&rreq); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + PML_YALLA_WAIT_MXM_REQ(&rreq.base); + PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %Zu", + mxm_error_string(rreq.base.error), + rreq.completion.sender_imm, rreq.completion.sender_tag, + rreq.tag, rreq.tag_mask, + rreq.completion.actual_len); + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status); + PML_YALLA_FREE_BLOCKING_MXM_REQ(&rreq.base); + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_isend_init(void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, 
mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_yalla_send_request_t *sreq; + + sreq = MCA_PML_YALLA_SREQ_INIT(buf, count, datatype, dst, tag, mode, comm, + OMPI_REQUEST_INACTIVE); + sreq->super.ompi.req_persistent = true; + sreq->super.flags = MCA_PML_YALLA_REQUEST_FLAG_SEND; + if (mode == MCA_PML_BASE_SEND_BUFFERED) { + sreq->super.flags |= MCA_PML_YALLA_REQUEST_FLAG_BSEND; + } + + *request = &sreq->super.ompi; + PML_YALLA_VERBOSE(9, "init send request %p dst %d tag %d comm %s", *request, + dst, tag, comm->c_name); + return OMPI_SUCCESS; +} + +static int mca_pml_yalla_bsend(mxm_send_req_t *mxm_sreq) +{ + mca_pml_yalla_bsend_request_t *bsreq = PML_YALLA_FREELIST_GET(&ompi_pml_yalla.bsend_reqs); + mxm_error_t error; + size_t length; + + /* Create a new send request using MPI internal buffer */ + + bsreq->mxm.base.state = mxm_sreq->base.state; + bsreq->mxm.base.mq = mxm_sreq->base.mq; + bsreq->mxm.base.conn = mxm_sreq->base.conn; + + bsreq->mxm.base.data_type = MXM_REQ_DATA_BUFFER; + switch (mxm_sreq->base.data_type) { + case MXM_REQ_DATA_BUFFER: + length = mxm_sreq->base.data.buffer.length; + bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length); + bsreq->mxm.base.data.buffer.length = length; + memcpy(bsreq->mxm.base.data.buffer.ptr, mxm_sreq->base.data.buffer.ptr, length); + break; + case MXM_REQ_DATA_STREAM: + length = mxm_sreq->base.data.stream.length; + bsreq->mxm.base.data.buffer.ptr = mca_pml_base_bsend_request_alloc_buf(length); + bsreq->mxm.base.data.buffer.length = length; + mxm_sreq->base.data.stream.cb(bsreq->mxm.base.data.buffer.ptr, length, + 0, mxm_sreq->base.context); + break; + default: + return OMPI_ERROR; + } + + bsreq->mxm.opcode = mxm_sreq->opcode; + bsreq->mxm.flags = mxm_sreq->flags; + bsreq->mxm.op.send = mxm_sreq->op.send; + + error = mxm_req_send(&bsreq->mxm); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + /* Make the completion handler 
believe it's ok to release the original request */ + mxm_sreq->base.state = MXM_REQ_COMPLETED; + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_isend(void *buf, size_t count, ompi_datatype_t *datatype, + int dst, int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request) +{ + mca_pml_yalla_send_request_t *sreq; + mxm_error_t error; + int rc; + + sreq = MCA_PML_YALLA_SREQ_INIT(buf, count, datatype, dst, tag, mode, comm, + OMPI_REQUEST_ACTIVE); + sreq->super.ompi.req_persistent = false; + sreq->super.flags = 0; + + PML_YALLA_VERBOSE(8, "send request *%p=%p to %d mode %d tag %d dtype %s count %Zu", + request, sreq, dst, mode, tag, datatype->name, count); + + if (mode == MCA_PML_BASE_SEND_BUFFERED) { + rc = mca_pml_yalla_bsend(&sreq->mxm); + ompi_request_complete(&sreq->super.ompi, true); + *request = &sreq->super.ompi; + return rc; + } + + error = mxm_req_send(&sreq->mxm); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + *request = &sreq->super.ompi; + return OMPI_SUCCESS; +} + +int mca_pml_yalla_send(void *buf, size_t count, ompi_datatype_t *datatype, int dst, + int tag, mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm) +{ + mxm_send_req_t sreq; + mxm_error_t error; + + PML_YALLA_INIT_MXM_SEND_REQ(&sreq, buf, count, datatype, dst, tag, mode, comm, send); + PML_YALLA_INIT_BLOCKING_MXM_SEND_REQ(&sreq); + + PML_YALLA_VERBOSE(8, "send to %d tag %d dtype %s count %Zu", dst, tag, + datatype->name, count); + + if (mode == MCA_PML_BASE_SEND_BUFFERED) { + return mca_pml_yalla_bsend(&sreq); + } + + error = mxm_req_send(&sreq); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + PML_YALLA_WAIT_MXM_REQ(&sreq.base); + if (MXM_OK != sreq.base.error) { + return OMPI_ERROR; + } + + PML_YALLA_FREE_BLOCKING_MXM_REQ(&sreq.base); + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_iprobe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + 
mxm_error_t error; + + PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm); + + error = mxm_req_probe(&rreq); + switch (error) { + case MXM_OK: + *matched = 1; + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status); + return OMPI_SUCCESS; + case MXM_ERR_NO_MESSAGE: + *matched = 0; + return OMPI_SUCCESS; + default: + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_probe(int src, int tag, struct ompi_communicator_t* comm, + ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + mxm_error_t error; + + PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm); + for (;;) { + error = mxm_req_probe(&rreq); + switch (error) { + case MXM_OK: + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status); + return OMPI_SUCCESS; + case MXM_ERR_NO_MESSAGE: + continue; + default: + return OMPI_ERROR; + } + + opal_progress(); + } +} + +int mca_pml_yalla_improbe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, struct ompi_message_t **message, + ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + mxm_message_h mxm_msg; + mxm_error_t error; + + PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm); + + error = mxm_req_mprobe(&rreq, &mxm_msg); + switch (error) { + case MXM_OK: + *matched = 1; + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status); + PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message); + return OMPI_SUCCESS; + case MXM_ERR_NO_MESSAGE: + *matched = 0; + return OMPI_SUCCESS; + default: + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +int mca_pml_yalla_mprobe(int src, int tag, struct ompi_communicator_t* comm, + struct ompi_message_t **message, + ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + mxm_message_h mxm_msg; + mxm_error_t error; + + PML_YALLA_INIT_MXM_PROBE_REQ(&rreq, src, tag, comm); + for (;;) { + error = mxm_req_mprobe(&rreq, &mxm_msg); + switch (error) { + case MXM_OK: + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.sender_len, status); + 
PML_YALLA_SET_MESSAGE(&rreq, comm, mxm_msg, message); + return OMPI_SUCCESS; + case MXM_ERR_NO_MESSAGE: + continue; + default: + return OMPI_ERROR; + } + + opal_progress(); + } + return OMPI_SUCCESS; +} + +int mca_pml_yalla_imrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + struct ompi_request_t **request) +{ + mca_pml_yalla_recv_request_t *rreq; + mxm_error_t error; + + rreq = MCA_PML_YALLA_RREQ_INIT(buf, count, datatype, -1, 0, (*message)->comm, + OMPI_REQUEST_ACTIVE); + rreq->super.ompi.req_persistent = false; + rreq->super.flags = 0; + + PML_YALLA_VERBOSE(8, "receive request *%p=%p message *%p=%p dtype %s count %Zu", + request, rreq, message, *message, datatype->name, count); + + error = mxm_message_recv(&rreq->mxm, (*message)->req_ptr); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + PML_YALLA_MESSAGE_RELEASE(message); + + *request = &rreq->super.ompi; + return OMPI_SUCCESS; +} + +int mca_pml_yalla_mrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + ompi_status_public_t* status) +{ + mxm_recv_req_t rreq; + mxm_error_t error; + + PML_YALLA_INIT_MXM_RECV_REQ(&rreq, buf, count, datatype, -1, 0, (*message)->comm, recv); + PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(&rreq); + + PML_YALLA_VERBOSE(8, "receive message *%p=%p dtype %s count %Zu", message, + *message, datatype->name, count); + + error = mxm_message_recv(&rreq, (*message)->req_ptr); + if (MXM_OK != error) { + return OMPI_ERROR; + } + + PML_YALLA_MESSAGE_RELEASE(message); + + PML_YALLA_WAIT_MXM_REQ(&rreq.base); + PML_YALLA_VERBOSE(8, "receive completed with status %s source %d rtag %d(%d/0x%x) len %Zu", + mxm_error_string(rreq.base.error), + rreq.completion.sender_imm, rreq.completion.sender_tag, + rreq.tag, rreq.tag_mask, + rreq.completion.actual_len); + PML_YALLA_SET_RECV_STATUS(&rreq, rreq.completion.actual_len, status); + return OMPI_SUCCESS; +} + +int mca_pml_yalla_start(size_t count, ompi_request_t** requests) +{ 
/*
 * PML dump entry point.  This implementation produces no output: both
 * arguments are ignored and OMPI_SUCCESS is returned unconditionally.
 */
int mca_pml_yalla_dump(struct ompi_communicator_t* comm, int verbose)
{
    return OMPI_SUCCESS;
}
/*
 * Emit a verbose message on the component's output stream, prefixed with
 * file, line and function.  The message is produced only when _level is at
 * most PML_YALLA_MAX_VERBOSE and at most the current
 * ompi_pml_yalla.verbose setting.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement: a bare "if { }" would capture a following "else" when the
 * macro is used as the body of an un-braced if.  Invoke it with a trailing
 * semicolon, like a function call.
 */
#define PML_YALLA_VERBOSE(_level, format, ... ) \
    do { \
        if (((_level) <= PML_YALLA_MAX_VERBOSE) && ((_level) <= ompi_pml_yalla.verbose)) { \
            opal_output_verbose(_level, ompi_pml_yalla.output, "%s:%d - %s() " format, \
                                __FILE__, __LINE__, __FUNCTION__, ## __VA_ARGS__); \
        } \
    } while (0)
mca_pml_yalla_improbe(int src, int tag, struct ompi_communicator_t* comm, + int *matched, struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_yalla_mprobe(int src, int tag, struct ompi_communicator_t* comm, + struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_yalla_imrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + struct ompi_request_t **request); + +int mca_pml_yalla_mrecv(void *buf, size_t count, ompi_datatype_t *datatype, + struct ompi_message_t **message, + ompi_status_public_t* status); + +int mca_pml_yalla_start(size_t count, ompi_request_t** requests); + +int mca_pml_yalla_dump(struct ompi_communicator_t* comm, int verbose); + +#endif /* PML_YALLA_H_ */ diff --git a/ompi/mca/pml/yalla/pml_yalla_component.c b/ompi/mca/pml/yalla/pml_yalla_component.c new file mode 100644 index 0000000000..fc013244d0 --- /dev/null +++ b/ompi/mca/pml/yalla/pml_yalla_component.c @@ -0,0 +1,103 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. +* This software product is a proprietary product of Mellanox Technologies Ltd. +* (the "Company") and all right, title, and interest and to the software product, +* including all associated intellectual property rights, are and shall +* remain exclusively with the Company. +* +* This software product is governed by the End User License Agreement +* provided with the software product. 
+* $COPYRIGHT$ +* $HEADER$ +*/ + +#include "pml_yalla.h" + + +static int mca_pml_yalla_component_register(void); +static int mca_pml_yalla_component_open(void); +static int mca_pml_yalla_component_close(void); + +static mca_pml_base_module_t* +mca_pml_yalla_component_init(int* priority, bool enable_progress_threads, + bool enable_mpi_threads); +static int mca_pml_yalla_component_fini(void); + + +mca_pml_base_component_2_0_0_t mca_pml_yalla_component = { + + /* First, the mca_base_component_t struct containing meta + * information about the component itself */ + { + MCA_PML_BASE_VERSION_2_0_0, + + "yalla", /* MCA component name */ + OMPI_MAJOR_VERSION, /* MCA component major version */ + OMPI_MINOR_VERSION, /* MCA component minor version */ + OMPI_RELEASE_VERSION, /* MCA component release version */ + mca_pml_yalla_component_open, /* component open */ + mca_pml_yalla_component_close, /* component close */ + NULL, + mca_pml_yalla_component_register, + }, + { + /* This component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + + mca_pml_yalla_component_init, /* component init */ + mca_pml_yalla_component_fini /* component finalize */ +}; + +static int mca_pml_yalla_component_register(void) +{ + ompi_pml_yalla.verbose = 0; + (void) mca_base_component_var_register(&mca_pml_yalla_component.pmlm_version, "verbose", + "Verbose level of the yalla component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_LOCAL, + &ompi_pml_yalla.verbose); + + ompi_pml_yalla.priority = 50; + (void) mca_base_component_var_register(&mca_pml_yalla_component.pmlm_version, "priority", + "Priority of the yalla component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_LOCAL, + &ompi_pml_yalla.priority); + return 0; +} + +static int mca_pml_yalla_component_open(void) +{ + ompi_pml_yalla.output = opal_output_open(NULL); + opal_output_set_verbosity(ompi_pml_yalla.output, ompi_pml_yalla.verbose); + return 0; +} + +static int 
mca_pml_yalla_component_close(void) +{ + opal_output_close(ompi_pml_yalla.output); + return 0; +} + +static mca_pml_base_module_t* +mca_pml_yalla_component_init(int* priority, bool enable_progress_threads, + bool enable_mpi_threads) +{ + int ret; + + if ( (ret = mca_pml_yalla_init()) != 0) { + return NULL; + } + + *priority = ompi_pml_yalla.priority; + return &ompi_pml_yalla.super; +} + +static int mca_pml_yalla_component_fini(void) +{ + return mca_pml_yalla_cleanup(); +} + diff --git a/ompi/mca/pml/yalla/pml_yalla_datatype.c b/ompi/mca/pml/yalla/pml_yalla_datatype.c new file mode 100644 index 0000000000..e6015de386 --- /dev/null +++ b/ompi/mca/pml/yalla/pml_yalla_datatype.c @@ -0,0 +1,167 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. +* This software product is a proprietary product of Mellanox Technologies Ltd. +* (the "Company") and all right, title, and interest and to the software product, +* including all associated intellectual property rights, are and shall +* remain exclusively with the Company. +* +* This software product is governed by the End User License Agreement +* provided with the software product. 
+* $COPYRIGHT$
+* $HEADER$
+*/
+
+#include "pml_yalla_datatype.h"
+#include "pml_yalla_request.h"
+
+/* Take a convertor from the convs free list, retain the datatype, and prepare the convertor for sending count items from buf. NOTE(review): the free-list result is used without a NULL check. */
+static mca_pml_yalla_convertor_t *mca_pml_yalla_get_send_convertor(void *buf, size_t count,
+                                                                   ompi_datatype_t *datatype)
+{
+    mca_pml_yalla_convertor_t *convertor = PML_YALLA_FREELIST_GET(&ompi_pml_yalla.convs);
+
+    convertor->datatype = datatype;
+    OBJ_RETAIN(datatype);
+    opal_convertor_copy_and_prepare_for_send(ompi_proc_local_proc->super.proc_convertor,
+                                             &datatype->super, count, buf, 0,
+                                             &convertor->convertor);
+    return convertor;
+}
+/* Receive-side twin of the function above: same steps, but prepares the convertor for receiving. NOTE(review): same unchecked free-list result. */
+static mca_pml_yalla_convertor_t *mca_pml_yalla_get_recv_convertor(void *buf, size_t count,
+                                                                   ompi_datatype_t *datatype)
+{
+    mca_pml_yalla_convertor_t *convertor = PML_YALLA_FREELIST_GET(&ompi_pml_yalla.convs);
+
+    convertor->datatype = datatype;
+    OBJ_RETAIN(datatype);
+    opal_convertor_copy_and_prepare_for_recv(ompi_proc_local_proc->super.proc_convertor,
+                                             &datatype->super, count, buf, 0,
+                                             &convertor->convertor);
+    return convertor;
+}
+/* Mark an MXM request as stream data: install stream_cb and set the stream length to the convertor's packed size. */
+static void mca_pml_yalla_noncontig_req_init(mxm_req_base_t *mxm_req,
+                                             mca_pml_yalla_convertor_t *convertor,
+                                             mxm_stream_cb_t stream_cb)
+{
+    mxm_req->data_type = MXM_REQ_DATA_STREAM;
+    mxm_req->data.stream.cb = stream_cb;
+    opal_convertor_get_packed_size(&convertor->convertor, &mxm_req->data.stream.length);
+}
+/* Position the convertor at offset, unpack from buffer (one iovec of length bytes), and return length as updated by opal_convertor_unpack(). */
+static size_t mca_pml_yalla_stream_unpack(void *buffer, size_t length, size_t offset,
+                                          opal_convertor_t *convertor)
+{
+    uint32_t iov_count;
+    struct iovec iov;
+
+    iov_count = 1;
+    iov.iov_base = buffer;
+    iov.iov_len = length;
+
+    opal_convertor_set_position(convertor, &offset);
+    opal_convertor_unpack(convertor, &iov, &iov_count, &length);
+    return length;
+}
+/* Position the convertor at offset, pack into buffer (one iovec of length bytes), and return length as updated by opal_convertor_pack(). */
+static size_t mca_pml_yalla_stream_pack(void *buffer, size_t length, size_t offset,
+                                        opal_convertor_t *convertor)
+{
+    uint32_t iov_count;
+    struct iovec iov;
+
+    iov_count = 1;
+    iov.iov_base = buffer;
+    iov.iov_len = length;
+
+    opal_convertor_set_position(convertor, &offset);
+    opal_convertor_pack(convertor, &iov, &iov_count, &length);
+    return length;
+}
+/* irecv stream callback: context is a PML base request; unpack through the convertor attached to it. */
+static size_t mxm_pml_yalla_irecv_stream_cb(void *buffer, size_t length,
+                                            size_t offset, void *context)
+{
+    mca_pml_yalla_base_request_t *req = context;
+    return mca_pml_yalla_stream_unpack(buffer, length, offset, &req->convertor->convertor);
+}
+/* recv stream callback: context is the convertor itself. */
+static size_t mxm_pml_yalla_recv_stream_cb(void *buffer, size_t length,
+                                           size_t offset, void *context)
+{
+    mca_pml_yalla_convertor_t *convertor = context;
+    return mca_pml_yalla_stream_unpack(buffer, length, offset, &convertor->convertor);
+}
+/* isend stream callback: context is a PML base request; pack through the convertor attached to it. */
+static size_t mxm_pml_yalla_isend_stream_cb(void *buffer, size_t length,
+                                            size_t offset, void *context)
+{
+    mca_pml_yalla_base_request_t *req = context;
+    return mca_pml_yalla_stream_pack(buffer, length, offset, &req->convertor->convertor);
+}
+/* send stream callback: context is the convertor itself. */
+static size_t mxm_pml_yalla_send_stream_cb(void *buffer, size_t length,
+                                           size_t offset, void *context)
+{
+    mca_pml_yalla_convertor_t *convertor = context;
+    return mca_pml_yalla_stream_pack(buffer, length, offset, &convertor->convertor);
+}
+/* Attach a receive convertor to rreq and set mxm_req up as a stream request using the irecv callback. */
+void mca_pml_yalla_set_noncontig_data_irecv(mxm_req_base_t *mxm_req, void *buf,
+                                            size_t count, ompi_datatype_t *datatype,
+                                            mca_pml_yalla_recv_request_t *rreq)
+{
+    rreq->super.convertor = mca_pml_yalla_get_recv_convertor(buf, count, datatype);
+    mca_pml_yalla_noncontig_req_init(mxm_req, rreq->super.convertor, mxm_pml_yalla_irecv_stream_cb);
+}
+/* Variant without a PML request object: the receive convertor is stored in mxm_req->context for the recv callback. */
+void mca_pml_yalla_set_noncontig_data_recv(mxm_req_base_t *mxm_req, void *buf,
+                                           size_t count, ompi_datatype_t *datatype)
+{
+    mca_pml_yalla_convertor_t *convertor;
+
+    convertor = mca_pml_yalla_get_recv_convertor(buf, count, datatype);
+    mca_pml_yalla_noncontig_req_init(mxm_req, convertor, mxm_pml_yalla_recv_stream_cb);
+    mxm_req->context = convertor;
+}
+/* Attach a send convertor to sreq and set mxm_req up as a stream request using the isend callback. */
+void mca_pml_yalla_set_noncontig_data_isend(mxm_req_base_t *mxm_req, void *buf,
+                                            size_t count, ompi_datatype_t *datatype,
+                                            mca_pml_yalla_send_request_t *sreq)
+{
+    sreq->super.convertor = mca_pml_yalla_get_send_convertor(buf, count, datatype);
+    mca_pml_yalla_noncontig_req_init(mxm_req, sreq->super.convertor, mxm_pml_yalla_isend_stream_cb);
+}
+/* Variant without a PML request object: the send convertor is stored in mxm_req->context for the send callback. */
+void mca_pml_yalla_set_noncontig_data_send(mxm_req_base_t *mxm_req, void *buf,
+                                           size_t count, ompi_datatype_t *datatype)
+{
+    mca_pml_yalla_convertor_t *convertor;
+
+    convertor = mca_pml_yalla_get_send_convertor(buf, count, datatype);
+    mca_pml_yalla_noncontig_req_init(mxm_req, convertor, mxm_pml_yalla_send_stream_cb);
+    mxm_req->context = convertor;
+}
+/* Object constructor: constructs the embedded opal_convertor_t. */
+static void mca_pml_yalla_convertor_construct(mca_pml_yalla_convertor_t *convertor)
+{
+    OBJ_CONSTRUCT(&convertor->convertor, opal_convertor_t);
+}
+/* Object destructor: destructs the embedded opal_convertor_t. */
+static void mca_pml_yalla_convertor_destruct(mca_pml_yalla_convertor_t *convertor)
+{
+    OBJ_DESTRUCT(&convertor->convertor);
+}
+/* Initialize the convertor free list; 128, -1, 128 are the initial, max and batch arguments of PML_YALLA_FREELIST_INIT. */
+void mca_pml_yalla_init_datatype(void)
+{
+    PML_YALLA_FREELIST_INIT(&ompi_pml_yalla.convs, mca_pml_yalla_convertor_t,
+                            128, -1, 128);
+}
+/* Class definition for mca_pml_yalla_convertor_t, wiring in the constructor and destructor above. */
+OBJ_CLASS_INSTANCE(mca_pml_yalla_convertor_t,
+                   ompi_free_list_item_t,
+                   mca_pml_yalla_convertor_construct,
+                   mca_pml_yalla_convertor_destruct);
+
diff --git a/ompi/mca/pml/yalla/pml_yalla_datatype.h b/ompi/mca/pml/yalla/pml_yalla_datatype.h
new file mode 100644
index 0000000000..c3be5ab469
--- /dev/null
+++ b/ompi/mca/pml/yalla/pml_yalla_datatype.h
@@ -0,0 +1,77 @@
+/**
+* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
+* This software product is a proprietary product of Mellanox Technologies Ltd.
+* (the "Company") and all right, title, and interest and to the software product,
+* including all associated intellectual property rights, are and shall
+* remain exclusively with the Company.
+*
+* This software product is governed by the End User License Agreement
+* provided with the software product.
+* $COPYRIGHT$ +* $HEADER$ +*/ + +#ifndef PML_YALLA_DATATYPE_H_ +#define PML_YALLA_DATATYPE_H_ + +#include "pml_yalla.h" + +struct pml_yalla_convertor { + ompi_free_list_item_t super; + ompi_datatype_t *datatype; + opal_convertor_t convertor; +}; + +OBJ_CLASS_DECLARATION(mca_pml_yalla_convertor_t); + +#define PML_YALLA_INIT_MXM_REQ_DATA(_req_base, _buf, _count, _dtype, _stream_type, ...) \ + { \ + ptrdiff_t size, lb; \ + \ + if (opal_datatype_is_contiguous_memory_layout(&(_dtype)->super, _count)) { \ + ompi_datatype_type_size(_dtype, &size); \ + ompi_datatype_type_lb(_dtype, &lb); \ + (_req_base)->data_type = MXM_REQ_DATA_BUFFER; \ + (_req_base)->data.buffer.ptr = _buf + lb; \ + (_req_base)->data.buffer.length = size * (_count); \ + } else { \ + mca_pml_yalla_set_noncontig_data_ ## _stream_type(_req_base, \ + _buf, _count, \ + _dtype, ## __VA_ARGS__); \ + } \ + } + +#define PML_YALLA_RESET_PML_REQ_DATA(_pml_req) \ + { \ + if ((_pml_req)->convertor != NULL) { \ + size_t _position = 0; \ + opal_convertor_set_position(&(_pml_req)->convertor->convertor, &_position); \ + } \ + } + + +static inline void mca_pml_yalla_convertor_free(mca_pml_yalla_convertor_t *convertor) +{ + opal_convertor_cleanup(&convertor->convertor); + OBJ_RELEASE(convertor->datatype); + PML_YALLA_FREELIST_RETURN(&ompi_pml_yalla.convs, &convertor->super); +} + +void mca_pml_yalla_set_noncontig_data_irecv(mxm_req_base_t *mxm_req, void *buf, + size_t count, ompi_datatype_t *datatype, + mca_pml_yalla_recv_request_t *rreq); + +void mca_pml_yalla_set_noncontig_data_recv(mxm_req_base_t *mxm_req, void *buf, + size_t count, ompi_datatype_t *datatype); + +void mca_pml_yalla_set_noncontig_data_isend(mxm_req_base_t *mxm_req, void *buf, + size_t count, ompi_datatype_t *datatype, + mca_pml_yalla_send_request_t *sreq); + +void mca_pml_yalla_set_noncontig_data_send(mxm_req_base_t *mxm_req, void *buf, + size_t count, ompi_datatype_t *datatype); + +void mca_pml_yalla_init_datatype(void); + + +#endif /* 
PML_YALLA_DATATYPE_H_ */
diff --git a/ompi/mca/pml/yalla/pml_yalla_freelist.h b/ompi/mca/pml/yalla/pml_yalla_freelist.h
new file mode 100644
index 0000000000..5279cd190f
--- /dev/null
+++ b/ompi/mca/pml/yalla/pml_yalla_freelist.h
@@ -0,0 +1,40 @@
+/**
+* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
+* This software product is a proprietary product of Mellanox Technologies Ltd.
+* (the "Company") and all right, title, and interest and to the software product,
+* including all associated intellectual property rights, are and shall
+* remain exclusively with the Company.
+*
+* This software product is governed by the End User License Agreement
+* provided with the software product.
+* $COPYRIGHT$
+* $HEADER$
+*/
+
+#ifndef PML_YALLA_FREELIST_H_
+#define PML_YALLA_FREELIST_H_
+
+#include "ompi_config.h"
+#include "opal/class/opal_free_list.h"
+
+/* The PML free list is a plain opal_free_list_t; the macros below are thin wrappers over the OPAL free-list API. */
+#define mca_pml_yalla_freelist_t opal_free_list_t
+/* Statement expression yielding the item obtained via OPAL_FREE_LIST_GET as void*; rc is handed to that macro but never inspected here. */
+#define PML_YALLA_FREELIST_GET(_freelist) \
+    ({ \
+        opal_free_list_item_t *item; \
+        int rc; \
+        OPAL_FREE_LIST_GET(_freelist, item, rc); \
+        (void*)(item); \
+    })
+/* Hand _item back to _freelist via OPAL_FREE_LIST_RETURN. */
+#define PML_YALLA_FREELIST_RETURN(_freelist, _item) \
+    { \
+        OPAL_FREE_LIST_RETURN(_freelist, _item); \
+    }
+/* Initialize _fl for objects of class _type; note the expansion already ends with a semicolon. */
+#define PML_YALLA_FREELIST_INIT(_fl, _type, _initial, _max, _batch) \
+    opal_free_list_init(_fl, sizeof(_type), OBJ_CLASS(_type), \
+                        _initial, _max, _batch);
+
+#endif /* PML_YALLA_FREELIST_H_ */
diff --git a/ompi/mca/pml/yalla/pml_yalla_request.c b/ompi/mca/pml/yalla/pml_yalla_request.c
new file mode 100644
index 0000000000..4ce05f0969
--- /dev/null
+++ b/ompi/mca/pml/yalla/pml_yalla_request.c
@@ -0,0 +1,288 @@
+/**
+* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
+* This software product is a proprietary product of Mellanox Technologies Ltd.
+* (the "Company") and all right, title, and interest and to the software product,
+* including all associated intellectual property rights, are and shall
+* remain exclusively with the Company.
+* +* This software product is governed by the End User License Agreement +* provided with the software product. +* $COPYRIGHT$ +* $HEADER$ +*/ + + +#include "pml_yalla_request.h" + +#include "ompi/mca/pml/base/pml_base_bsend.h" +#include "ompi/message/message.h" + + +static inline void mca_pml_yalla_request_release(mca_pml_yalla_base_request_t *req, + mca_pml_yalla_freelist_t *fl) +{ + if (req->convertor != NULL) { + mca_pml_yalla_convertor_free(req->convertor); + req->convertor = NULL; + } + OBJ_RELEASE(req->ompi.req_mpi_object.comm); + +#if PML_YALLA_ENABLE_DEBUG + req->ompi.req_state = OMPI_REQUEST_INVALID; +#endif + PML_YALLA_FREELIST_RETURN(fl, &req->ompi.super); +} + +static inline int +mca_pml_yalla_check_request_state(mca_pml_yalla_base_request_t *req) +{ + if (req->mxm_base->state != MXM_REQ_COMPLETED) { + PML_YALLA_VERBOSE(8, "request %p free called before completed", req); + req->flags |= MCA_PML_YALLA_REQUEST_FLAG_FREE_CALLED; + return 0; + } + + return 1; +} + +static int mca_pml_yalla_send_request_free(ompi_request_t **request) +{ + mca_pml_yalla_base_request_t *req = (mca_pml_yalla_base_request_t*)(*request); + + PML_YALLA_VERBOSE(9, "free send request *%p=%p", request, *request); + + OPAL_THREAD_LOCK(&ompi_request_lock); + if (mca_pml_yalla_check_request_state(req)) { + mca_pml_yalla_request_release(req, &ompi_pml_yalla.send_reqs); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +static int mca_pml_yalla_send_request_cancel(ompi_request_t *request, int flag) +{ + mca_pml_yalla_send_request_t *sreq = (mca_pml_yalla_send_request_t*)request; + mxm_error_t error; + + if (request->req_complete) { + /* + * This might be a buffered send request which has completed anyway, so + * we cannot cancel it anymore. Just hope for the best. 
+ */ + PML_YALLA_VERBOSE(7, "not canceling a completed send request %p", request); + return OMPI_SUCCESS; + } + + error = mxm_req_cancel_send(&sreq->mxm); + if ((error != MXM_OK) && (error != MXM_ERR_NO_PROGRESS)) { + PML_YALLA_ERROR("failed to cancel send request %p: %s", request, + mxm_error_string(error)); + return OMPI_ERROR; + } + + PML_YALLA_VERBOSE(9, "canceled send request %p", request); + return OMPI_SUCCESS; +} + +static int mca_pml_yalla_recv_request_free(ompi_request_t **request) +{ + mca_pml_yalla_base_request_t *req = (mca_pml_yalla_base_request_t*)(*request); + + PML_YALLA_VERBOSE(9, "free receive request *%p=%p", request, *request); + + OPAL_THREAD_LOCK(&ompi_request_lock); + if (mca_pml_yalla_check_request_state(req)) { + mca_pml_yalla_request_release(req, &ompi_pml_yalla.recv_reqs); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); + + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +static int mca_pml_yalla_recv_request_cancel(ompi_request_t *request, int flag) +{ + mca_pml_yalla_recv_request_t *rreq = (mca_pml_yalla_recv_request_t*)request; + mxm_error_t error; + + error = mxm_req_cancel_recv(&rreq->mxm); + if ((error != MXM_OK) && (error != MXM_ERR_NO_PROGRESS)) { + PML_YALLA_ERROR("failed to cancel receive request %p: %s", request, + mxm_error_string(error)); + return OMPI_ERROR; + } + + PML_YALLA_VERBOSE(9, "canceled receive request %p", request); + return OMPI_SUCCESS; +} + +static void init_mxm_base_req(mxm_req_base_t *mxm_req_base) +{ + mxm_req_base->state = MXM_REQ_NEW; + mxm_req_base->mq = NULL; + mxm_req_base->conn = NULL; + mxm_req_base->data_type = MXM_REQ_DATA_BUFFER; + mxm_req_base->data.buffer.ptr = NULL; + mxm_req_base->data.buffer.length = 0; + mxm_req_base->data.buffer.memh = 0; + mxm_req_base->context = NULL; + mxm_req_base->completed_cb = NULL; +} + +static void init_mxm_send_req(mxm_send_req_t *mxm_sreq) +{ + init_mxm_base_req(&mxm_sreq->base); + mxm_sreq->opcode = MXM_REQ_OP_SEND; + mxm_sreq->op.send.imm_data = 0; + 
mxm_sreq->op.send.tag = 0; +#if defined(MXM_REQ_SEND_FLAG_REENTRANT) + mxm_sreq->flags = MXM_REQ_SEND_FLAG_REENTRANT; +#else + mxm_sreq->flags = 0; +#endif +} + +static void init_mxm_recv_req(mxm_recv_req_t *mxm_rreq) +{ + init_mxm_base_req(&mxm_rreq->base); + mxm_rreq->tag = 0; + mxm_rreq->tag_mask = 0x7fffffff; +} + +static void init_base_req(mca_pml_yalla_base_request_t *req) +{ + OMPI_REQUEST_INIT(&req->ompi, false); + req->ompi.req_type = OMPI_REQUEST_PML; + req->ompi.req_cancel = NULL; + req->ompi.req_complete_cb = NULL; + req->ompi.req_complete_cb_data = NULL; + req->convertor = NULL; +} + +static void mca_pml_yalla_send_completion_cb(void *context) +{ + mca_pml_yalla_send_request_t* sreq = context; + + switch (sreq->mxm.base.error) { + case MXM_OK: + sreq->super.ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + break; + case MXM_ERR_CANCELED: + sreq->super.ompi.req_status._cancelled = true; + break; + default: + sreq->super.ompi.req_status.MPI_ERROR = MPI_ERR_INTERN; + break; + } + + PML_YALLA_VERBOSE(8, "send request %p completed with status %s", sreq, + mxm_error_string(sreq->mxm.base.error)); + + OPAL_THREAD_LOCK(&ompi_request_lock); + ompi_request_complete(&sreq->super.ompi, true); + if (sreq->super.flags & MCA_PML_YALLA_REQUEST_FLAG_FREE_CALLED) { + PML_YALLA_VERBOSE(7, "release request %p because free was already called", sreq); + mca_pml_yalla_request_release(&sreq->super, &ompi_pml_yalla.send_reqs); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +static void mca_pml_yalla_bsend_completion_cb(void *context) +{ + mca_pml_yalla_bsend_request_t *bsreq = context; + + PML_YALLA_VERBOSE(8, "bsend request %p completed with status %s", bsreq, + mxm_error_string(bsreq->mxm.base.error)); + + mca_pml_base_bsend_request_free(bsreq->mxm.base.data.buffer.ptr); + PML_YALLA_FREELIST_RETURN(&ompi_pml_yalla.bsend_reqs, &bsreq->super); +} + +static void mca_pml_yalla_recv_completion_cb(void *context) +{ + mca_pml_yalla_recv_request_t* rreq = context; + + 
PML_YALLA_SET_RECV_STATUS(&rreq->mxm, rreq->mxm.completion.actual_len, + &rreq->super.ompi.req_status); + + PML_YALLA_VERBOSE(8, "receive request %p completed with status %s source %d rtag %d(%d/0x%x) len %Zu", + rreq, mxm_error_string(rreq->mxm.base.error), + rreq->mxm.completion.sender_imm, rreq->mxm.completion.sender_tag, + rreq->mxm.tag, rreq->mxm.tag_mask, + rreq->mxm.completion.actual_len); + + OPAL_THREAD_LOCK(&ompi_request_lock); + ompi_request_complete(&rreq->super.ompi, true); + if (rreq->super.flags & MCA_PML_YALLA_REQUEST_FLAG_FREE_CALLED) { + PML_YALLA_VERBOSE(7, "release request %p because free was already called", rreq); + mca_pml_yalla_request_release(&rreq->super, &ompi_pml_yalla.recv_reqs); + } + OPAL_THREAD_UNLOCK(&ompi_request_lock); +} + +static void mca_pml_yalla_send_request_construct(mca_pml_yalla_send_request_t* sreq) +{ + init_base_req(&sreq->super); + init_mxm_send_req(&sreq->mxm); + sreq->super.ompi.req_free = mca_pml_yalla_send_request_free; + sreq->super.ompi.req_cancel = mca_pml_yalla_send_request_cancel; + sreq->mxm.base.context = sreq; + sreq->mxm.base.completed_cb = mca_pml_yalla_send_completion_cb; +} + +static void mca_pml_yalla_send_request_destruct(mca_pml_yalla_send_request_t *sreq) +{ + OMPI_REQUEST_FINI(&sreq->super.ompi); +} + +static void mca_pml_yalla_bsend_request_construct(mca_pml_yalla_bsend_request_t* bsreq) +{ + init_mxm_send_req(&bsreq->mxm); + bsreq->mxm.base.context = bsreq; + bsreq->mxm.base.completed_cb = mca_pml_yalla_bsend_completion_cb; +} + +static void mca_pml_yalla_recv_request_construct(mca_pml_yalla_recv_request_t* rreq) +{ + init_base_req(&rreq->super); + init_mxm_recv_req(&rreq->mxm); + rreq->super.ompi.req_free = mca_pml_yalla_recv_request_free; + rreq->super.ompi.req_cancel = mca_pml_yalla_recv_request_cancel; + rreq->mxm.base.context = rreq; + rreq->mxm.base.completed_cb = mca_pml_yalla_recv_completion_cb; +} + +static void mca_pml_yalla_recv_request_destruct(mca_pml_yalla_recv_request_t *rreq) +{ + 
OMPI_REQUEST_FINI(&rreq->super.ompi); +} + +void mca_pml_yalla_init_reqs(void) +{ + PML_YALLA_FREELIST_INIT(&ompi_pml_yalla.send_reqs, mca_pml_yalla_send_request_t, + 128, -1, 128); + + PML_YALLA_FREELIST_INIT(&ompi_pml_yalla.bsend_reqs, mca_pml_yalla_bsend_request_t, + 128, -1, 128); + + PML_YALLA_FREELIST_INIT(&ompi_pml_yalla.recv_reqs, mca_pml_yalla_recv_request_t, + 128, -1, 128); +} + +OBJ_CLASS_INSTANCE(mca_pml_yalla_send_request_t, + ompi_request_t, + mca_pml_yalla_send_request_construct, + mca_pml_yalla_send_request_destruct); + +OBJ_CLASS_INSTANCE(mca_pml_yalla_bsend_request_t, + ompi_free_list_item_t, + mca_pml_yalla_bsend_request_construct, + NULL); + +OBJ_CLASS_INSTANCE(mca_pml_yalla_recv_request_t, + ompi_request_t, + mca_pml_yalla_recv_request_construct, + mca_pml_yalla_recv_request_destruct); + diff --git a/ompi/mca/pml/yalla/pml_yalla_request.h b/ompi/mca/pml/yalla/pml_yalla_request.h new file mode 100644 index 0000000000..8f50452708 --- /dev/null +++ b/ompi/mca/pml/yalla/pml_yalla_request.h @@ -0,0 +1,219 @@ +/** +* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. +* This software product is a proprietary product of Mellanox Technologies Ltd. +* (the "Company") and all right, title, and interest and to the software product, +* including all associated intellectual property rights, are and shall +* remain exclusively with the Company. +* +* This software product is governed by the End User License Agreement +* provided with the software product. 
+* $COPYRIGHT$ +* $HEADER$ +*/ + +#ifndef PML_YALLA_REQUEST_H_ +#define PML_YALLA_REQUEST_H_ + +#include "pml_yalla.h" +#include "pml_yalla_datatype.h" + + +#define MCA_PML_YALLA_REQUEST_FLAG_SEND 0x1 /* Persistent send */ +#define MCA_PML_YALLA_REQUEST_FLAG_BSEND 0x2 /* Persistent buffered send */ +#define MCA_PML_YALLA_REQUEST_FLAG_FREE_CALLED 0x4 + +struct pml_yalla_base_request { + ompi_request_t ompi; + mca_pml_yalla_convertor_t *convertor; + int flags; + mxm_req_base_t mxm_base[0]; /* overlaps with base of send/recv */ +}; + +struct pml_yalla_send_request { + mca_pml_yalla_base_request_t super; + mxm_send_req_t mxm; +}; + +struct pml_yalla_bsend_request { + ompi_free_list_item_t super; + mxm_send_req_t mxm; +}; + +struct pml_yalla_recv_request { + mca_pml_yalla_base_request_t super; + mxm_recv_req_t mxm; +}; + + +OBJ_CLASS_DECLARATION(mca_pml_yalla_send_request_t); +OBJ_CLASS_DECLARATION(mca_pml_yalla_bsend_request_t); +OBJ_CLASS_DECLARATION(mca_pml_yalla_recv_request_t); + +void mca_pml_yalla_init_reqs(void); + +#define PML_YALLA_RESET_OMPI_REQ(_ompi_req, _state) \ + { \ + (_ompi_req)->req_state = _state; \ + (_ompi_req)->req_complete = false; \ + (_ompi_req)->req_status._cancelled = false; \ + } + +#define PML_YALLA_INIT_OMPI_REQ(_ompi_req, _comm, _state) \ + { \ + PML_YALLA_RESET_OMPI_REQ(_ompi_req, _state); \ + (_ompi_req)->req_mpi_object.comm = _comm; \ + OBJ_RETAIN(_comm); \ + } + +#define PML_YALLA_RESET_PML_REQ(_pml_req) \ + { \ + (_pml_req)->mxm_base[0].state = MXM_REQ_NEW; \ + PML_YALLA_RESET_PML_REQ_DATA(_pml_req); \ + } + +#define PML_YALLA_INIT_MXM_REQ_BASE(_req_base, _comm) \ + { \ + (_req_base)->state = MXM_REQ_NEW; \ + (_req_base)->mq = (void*)(_comm)->c_pml_comm; \ + } + +#define PML_YALLA_PEER_CONN(_comm, _rank) \ + ompi_comm_peer_lookup(_comm, _rank)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] + +#define PML_YALLA_INIT_MXM_SEND_REQ(_sreq, _buf, _count, _dtype, _rank, _tag, _mode, _comm, _stream_type, ...) 
\ + { \ + PML_YALLA_INIT_MXM_REQ_BASE(&(_sreq)->base, _comm); \ + PML_YALLA_INIT_MXM_REQ_DATA(&(_sreq)->base, _buf, _count, _dtype, _stream_type, ## __VA_ARGS__); \ + (_sreq)->base.conn = PML_YALLA_PEER_CONN(_comm, _rank); \ + (_sreq)->opcode = ((_mode) == MCA_PML_BASE_SEND_SYNCHRONOUS) ? MXM_REQ_OP_SEND_SYNC : MXM_REQ_OP_SEND; \ + (_sreq)->op.send.tag = _tag; \ + (_sreq)->op.send.imm_data = ompi_comm_rank(_comm); \ + } + +#define PML_YALLA_INIT_MXM_RECV_REQ_ENVELOPE(_rreq, _rank, _tag, _comm) \ + { \ + (_rreq)->base.conn = ((_rank) == MPI_ANY_SOURCE) ? NULL : PML_YALLA_PEER_CONN(_comm, _rank); \ + if ((_tag) == MPI_ANY_TAG) { \ + (_rreq)->tag = 0; \ + (_rreq)->tag_mask = 0x80000000u; \ + } else { \ + (_rreq)->tag = _tag; \ + (_rreq)->tag_mask = 0xffffffffu; \ + } \ + } + +#define PML_YALLA_INIT_MXM_RECV_REQ(_rreq, _buf, _count, _dtype, _rank, _tag, _comm, _stream_type, ...) \ + { \ + PML_YALLA_INIT_MXM_REQ_BASE(&(_rreq)->base, _comm); \ + PML_YALLA_INIT_MXM_REQ_DATA(&(_rreq)->base, _buf, _count, _dtype, _stream_type, ## __VA_ARGS__); \ + PML_YALLA_INIT_MXM_RECV_REQ_ENVELOPE(_rreq, _rank, _tag, _comm); \ + } + +#define PML_YALLA_INIT_BLOCKING_MXM_SEND_REQ(_sreq) \ + { \ + (_sreq)->base.completed_cb = NULL; \ + (_sreq)->flags = MXM_REQ_SEND_FLAG_BLOCKING; \ + } + +#define PML_YALLA_INIT_BLOCKING_MXM_RECV_REQ(_rreq) \ + { \ + (_rreq)->base.completed_cb = NULL; \ + } + +#define PML_YALLA_FREE_BLOCKING_MXM_REQ(_req) \ + { \ + if ((_req)->completed_cb != NULL) { \ + mca_pml_yalla_convertor_free((mca_pml_yalla_convertor_t*)((_req)->context)); \ + } \ + } + +#define MCA_PML_YALLA_RREQ_INIT(_buf, _count, _datatype, _src, _tag, _comm, _state) \ + ({ \ + mca_pml_yalla_recv_request_t *rreq = PML_YALLA_FREELIST_GET(&ompi_pml_yalla.recv_reqs); \ + \ + PML_YALLA_INIT_OMPI_REQ(&rreq->super.ompi, _comm, _state); \ + PML_YALLA_INIT_MXM_RECV_REQ(&rreq->mxm, _buf, _count, _datatype, _src, _tag, \ + _comm, irecv, rreq); \ + rreq; \ + }) + +#define MCA_PML_YALLA_SREQ_INIT(_buf, _count, 
_datatype, _dst, _tag, _mode, _comm, _state) \ + ({ \ + mca_pml_yalla_send_request_t *sreq = PML_YALLA_FREELIST_GET(&ompi_pml_yalla.send_reqs); \ + \ + PML_YALLA_INIT_OMPI_REQ(&sreq->super.ompi, _comm, _state); \ + PML_YALLA_INIT_MXM_SEND_REQ(&sreq->mxm, _buf, _count, _datatype, _dst, _tag, \ + mode, _comm, isend, sreq); \ + sreq->super.ompi.req_status.MPI_TAG = _tag; \ + sreq->super.ompi.req_status.MPI_SOURCE = (_comm)->c_my_rank; \ + sreq->super.ompi.req_status._ucount = _count; \ + sreq; \ + }) + +#define PML_YALLA_INIT_MXM_PROBE_REQ(_rreq, _rank, _tag, _comm) \ + { \ + PML_YALLA_INIT_MXM_REQ_BASE(&(_rreq)->base, _comm); \ + PML_YALLA_INIT_MXM_RECV_REQ_ENVELOPE(_rreq, _rank, _tag, _comm); \ + } + +/* + * For multi-threaded MPI, avoid blocking inside mxm_wait(), since it prevents + * from other threads making progress. + */ +#define PML_YALLA_WAIT_MXM_REQ(_req_base) \ + { \ + if (opal_using_threads()) { \ + while (!mxm_req_test(_req_base)) { \ + sched_yield(); \ + opal_progress(); \ + } \ + } else if (!mxm_req_test(_req_base)) { \ + mxm_wait_t wait; \ + wait.progress_cb = (mxm_progress_cb_t)opal_progress; \ + wait.progress_arg = NULL; \ + wait.req = (_req_base); \ + wait.state = MXM_REQ_COMPLETED; \ + mxm_wait(&wait); \ + } \ + } + +#define PML_YALLA_SET_RECV_STATUS(_rreq, _length, _mpi_status) \ + { \ + if ((_mpi_status) != MPI_STATUS_IGNORE) { \ + switch ((_rreq)->base.error) { \ + case MXM_OK: \ + (_mpi_status)->MPI_ERROR = OMPI_SUCCESS; \ + break; \ + case MXM_ERR_CANCELED: \ + (_mpi_status)->_cancelled = true; \ + break; \ + case MXM_ERR_MESSAGE_TRUNCATED: \ + (_mpi_status)->MPI_ERROR = MPI_ERR_TRUNCATE; \ + break; \ + default: \ + (_mpi_status)->MPI_ERROR = MPI_ERR_INTERN; \ + break; \ + } \ + \ + (_mpi_status)->MPI_TAG = (_rreq)->completion.sender_tag; \ + (_mpi_status)->MPI_SOURCE = (_rreq)->completion.sender_imm; \ + (_mpi_status)->_ucount = (_length); \ + } \ + } + +#define PML_YALLA_SET_MESSAGE(_rreq, _comm, _mxm_msg, _message) \ + { \ + *(_message) = 
ompi_message_alloc(); \ + (*(_message))->comm = (_comm); \ + (*(_message))->count = (_rreq)->completion.sender_len; \ + (*(_message))->peer = (_rreq)->completion.sender_imm; \ + (*(_message))->req_ptr = (_mxm_msg); \ + } + +#define PML_YALLA_MESSAGE_RELEASE(_message) \ + { \ + ompi_message_return(*(_message)); \ + *(_message) = NULL; \ + } + +#endif /* PML_YALLA_REQUEST_H_ */