/* * Copyright (C) Mellanox Technologies Ltd. 2001-2011. ALL RIGHTS RESERVED. * Copyright (c) 2013 Intel, Inc. All rights reserved * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "ompi_config.h" #include "ompi/mca/mtl/mtl.h" #include "ompi/runtime/ompi_module_exchange.h" #include "ompi/mca/mtl/base/mtl_base_datatype.h" #include "ompi/proc/proc.h" #include "ompi/communicator/communicator.h" #include "opal/memoryhooks/memory.h" #include "opal/util/show_help.h" #include "mtl_mxm.h" #include "mtl_mxm_types.h" #include "mtl_mxm_endpoint.h" #include "mtl_mxm_request.h" mca_mtl_mxm_module_t ompi_mtl_mxm = { { 0, /* max context id */ 0, /* max tag value */ 0, /* request reserve space */ 0, /* flags */ ompi_mtl_mxm_add_procs, ompi_mtl_mxm_del_procs, ompi_mtl_mxm_finalize, ompi_mtl_mxm_send, ompi_mtl_mxm_isend, ompi_mtl_mxm_irecv, ompi_mtl_mxm_iprobe, ompi_mtl_mxm_imrecv, ompi_mtl_mxm_improbe, ompi_mtl_mxm_cancel, ompi_mtl_mxm_add_comm, ompi_mtl_mxm_del_comm }, 0, 0, NULL, NULL }; static uint32_t ompi_mtl_mxm_get_job_id(void) { uint8_t unique_job_key[16]; uint32_t job_key; unsigned long long *uu; char *generated_key; uu = (unsigned long long *) unique_job_key; generated_key = getenv("OMPI_MCA_orte_precondition_transports"); memset(uu, 0, sizeof(unique_job_key)); if (!generated_key || (strlen(generated_key) != 33) || sscanf(generated_key, "%016llx-%016llx", &uu[0], &uu[1]) != 2) { opal_show_help("help-mtl-mxm.txt", "no uuid present", true, generated_key ? "could not be parsed from" : "not present in", ompi_process_info.nodename); return 0; } /* * decode OMPI_MCA_orte_precondition_transports that looks as * 000003ca00000000-0000000100000000 * jobfam-stepid * to get jobid coded with ORTE_CONSTRUCT_LOCAL_JOBID() */ #define GET_LOCAL_JOBID(local, job) \ ( ((local) & 0xffff0000) | ((job) & 0x0000ffff) ) job_key = GET_LOCAL_JOBID((uu[0]>>(8 * sizeof(int))) << 16, uu[1]>>(8 * sizeof(int))); return job_key; } int ompi_mtl_mxm_progress(void); #if MXM_API >= MXM_VERSION(2,0) static void ompi_mtl_mxm_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc); #endif #if MXM_API < MXM_VERSION(2,0) static int ompi_mtl_mxm_get_ep_address(ompi_mtl_mxm_ep_conn_info_t *ep_info, mxm_ptl_id_t ptlid) { size_t addrlen; mxm_error_t err; addrlen = sizeof(ep_info->ptl_addr[ptlid]); err = mxm_ep_address(ompi_mtl_mxm.ep, ptlid, (struct sockaddr *) &ep_info->ptl_addr[ptlid], &addrlen); if (MXM_OK != err) { opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint ptl address", true, (int)ptlid, mxm_error_string(err)); return OMPI_ERROR; } return OMPI_SUCCESS; } #else static int ompi_mtl_mxm_get_ep_address(void **address_p, size_t *address_len_p) { mxm_error_t err; *address_len_p = 0; err = mxm_ep_get_address(ompi_mtl_mxm.ep, NULL, address_len_p); if (err != MXM_ERR_BUFFER_TOO_SMALL) { MXM_ERROR("Failed to get ep address length"); return OMPI_ERROR; } *address_p = malloc(*address_len_p); if (*address_p == NULL) { MXM_ERROR("Failed to allocate ep address buffer"); return OMPI_ERR_OUT_OF_RESOURCE; } err = mxm_ep_get_address(ompi_mtl_mxm.ep, *address_p, address_len_p); if (MXM_OK != err) { opal_show_help("help-mtl-mxm.txt", "unable to extract endpoint address", true, mxm_error_string(err)); return OMPI_ERROR; } return OMPI_SUCCESS; } #endif #define max(a,b) ((a)>(b)?(a):(b)) static mxm_error_t ompi_mtl_mxm_create_ep(mxm_h ctx, mxm_ep_h *ep, unsigned ptl_bitmap, int lr, uint32_t jobid, uint64_t mxlr, int nlps) { mxm_error_t err; #if MXM_API < MXM_VERSION(2,0) ompi_mtl_mxm.mxm_ep_opts->job_id = jobid; ompi_mtl_mxm.mxm_ep_opts->local_rank = lr; ompi_mtl_mxm.mxm_ep_opts->num_local_procs = nlps; err = mxm_ep_create(ctx, ompi_mtl_mxm.mxm_ep_opts, ep); #else err = mxm_ep_create(ctx, ompi_mtl_mxm.mxm_ep_opts, ep); #endif return err; } /* * send information using modex (in some case there is limitation on data size for example ess/pmi) * set size of data sent for once * */ static int ompi_mtl_mxm_send_ep_address(void *address, size_t address_len) { char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version); char *modex_name = malloc(strlen(modex_component_name) + 5); const size_t modex_max_size = 0x60; unsigned char *modex_buf_ptr; size_t modex_buf_size; size_t modex_cur_size; int modex_name_id = 0; int rc; /* Send address length */ sprintf(modex_name, "%s-len", modex_component_name); rc = ompi_modex_send_string((const char *)modex_name, &address_len, sizeof(address_len)); if (OMPI_SUCCESS != rc) { MXM_ERROR("failed to send address length"); goto bail; } /* Send address, in parts. * modex name looks as mtl.mxm.1.5-18 where mtl.mxm.1.5 is the component and 18 is part index. */ modex_buf_size = address_len; modex_buf_ptr = address; while (modex_buf_size) { sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id); modex_cur_size = (modex_buf_size < modex_max_size) ? modex_buf_size : modex_max_size; rc = ompi_modex_send_string(modex_name, modex_buf_ptr, modex_cur_size); if (OMPI_SUCCESS != rc) { MXM_ERROR("Open MPI couldn't distribute EP connection details"); goto bail; } modex_name_id++; modex_buf_ptr += modex_cur_size; modex_buf_size -= modex_cur_size; } rc = OMPI_SUCCESS; bail: free(modex_component_name); free(modex_name); return rc; } /* * recieve information using modex */ static int ompi_mtl_mxm_recv_ep_address(ompi_proc_t *source_proc, void **address_p, size_t *address_len_p) { char *modex_component_name = mca_base_component_to_string(&mca_mtl_mxm_component.super.mtl_version); char *modex_name = malloc(strlen(modex_component_name) + 5); unsigned char *modex_buf_ptr; size_t modex_cur_size; size_t modex_buf_size; size_t *address_len_buf_ptr; int modex_name_id = 0; int rc; *address_p = NULL; *address_len_p = 0; /* Receive address length */ sprintf(modex_name, "%s-len", modex_component_name); rc = ompi_modex_recv_string(modex_name, source_proc, (void**)&address_len_buf_ptr, &modex_cur_size); if (OMPI_SUCCESS != rc) { MXM_ERROR("Failed to receive ep address length"); goto bail; } /* Allocate buffer to hold the address */ *address_len_p = *address_len_buf_ptr; *address_p = malloc(*address_len_p); if (*address_p == NULL) { MXM_ERROR("Failed to allocate modex receive buffer"); rc = OMPI_ERR_OUT_OF_RESOURCE; goto bail; } /* Receive the data, in parts */ modex_buf_size = 0; while (modex_buf_size < *address_len_p) { sprintf(modex_name, "%s-%d", modex_component_name, modex_name_id); rc = ompi_modex_recv_string(modex_name, source_proc, (void**)&modex_buf_ptr, &modex_cur_size); if (OMPI_SUCCESS != rc) { MXM_ERROR("Open MPI couldn't distribute EP connection details"); goto bail; } memcpy((char*)(*address_p) + modex_buf_size, modex_buf_ptr, modex_cur_size); modex_buf_size += modex_cur_size; modex_name_id++; } rc = OMPI_SUCCESS; bail: free(modex_component_name); free(modex_name); return rc; } int ompi_mtl_mxm_module_init(void) { #if MXM_API < MXM_VERSION(2,0) ompi_mtl_mxm_ep_conn_info_t ep_info; #endif void *ep_address; size_t ep_address_len; mxm_error_t err; uint32_t jobid; uint64_t mxlr; ompi_proc_t **procs; unsigned ptl_bitmap; size_t totps, proc; int lr, nlps; int rc; mxlr = 0; lr = -1; jobid = ompi_mtl_mxm_get_job_id(); if (0 == jobid) { MXM_ERROR("Failed to generate jobid"); return OMPI_ERROR; } if (NULL == (procs = ompi_proc_world(&totps))) { MXM_ERROR("Unable to obtain process list"); return OMPI_ERROR; } if (totps < (size_t)ompi_mtl_mxm.mxm_np) { MXM_VERBOSE(1, "MXM support will be disabled because of total number " "of processes (%lu) is less than the minimum set by the " "mtl_mxm_np MCA parameter (%u)", totps, ompi_mtl_mxm.mxm_np); return OMPI_ERR_NOT_SUPPORTED; } MXM_VERBOSE(1, "MXM support enabled"); if (ORTE_NODE_RANK_INVALID == (lr = ompi_process_info.my_node_rank)) { MXM_ERROR("Unable to obtain local node rank"); return OMPI_ERROR; } nlps = ompi_process_info.num_local_peers + 1; for (proc = 0; proc < totps; proc++) { if (OPAL_PROC_ON_LOCAL_NODE(procs[proc]->proc_flags)) { mxlr = max(mxlr, procs[proc]->proc_name.vpid); } } /* Setup the endpoint options and local addresses to bind to. */ #if MXM_API < MXM_VERSION(2,0) ptl_bitmap = ompi_mtl_mxm.mxm_ctx_opts->ptl_bitmap; #else ptl_bitmap = 0; #endif /* Open MXM endpoint */ err = ompi_mtl_mxm_create_ep(ompi_mtl_mxm.mxm_context, &ompi_mtl_mxm.ep, ptl_bitmap, lr, jobid, mxlr, nlps); if (MXM_OK != err) { opal_show_help("help-mtl-mxm.txt", "unable to create endpoint", true, mxm_error_string(err)); return OMPI_ERROR; } /* * Get address for each PTL on this endpoint, and share it with other ranks. */ #if MXM_API < MXM_VERSION(2,0) if ((ptl_bitmap & MXM_BIT(MXM_PTL_SELF)) && OMPI_SUCCESS != ompi_mtl_mxm_get_ep_address(&ep_info, MXM_PTL_SELF)) { return OMPI_ERROR; } if ((ptl_bitmap & MXM_BIT(MXM_PTL_RDMA)) && OMPI_SUCCESS != ompi_mtl_mxm_get_ep_address(&ep_info, MXM_PTL_RDMA)) { return OMPI_ERROR; } if ((ptl_bitmap & MXM_BIT(MXM_PTL_SHM)) && OMPI_SUCCESS != ompi_mtl_mxm_get_ep_address(&ep_info, MXM_PTL_SHM)) { return OMPI_ERROR; } ep_address = &ep_info; ep_address_len = sizeof(ep_info); #else rc = ompi_mtl_mxm_get_ep_address(&ep_address, &ep_address_len); if (OMPI_SUCCESS != rc) { return rc; } #endif rc = ompi_mtl_mxm_send_ep_address(ep_address, ep_address_len); if (OMPI_SUCCESS != rc) { MXM_ERROR("Modex session failed."); return rc; } #if MXM_API >= MXM_VERSION(2,0) free(ep_address); #endif /* Register the MXM progress function */ opal_progress_register(ompi_mtl_mxm_progress); #if MXM_API >= MXM_VERSION(2,0) if (ompi_mtl_mxm.using_mem_hooks) { opal_mem_hooks_register_release(ompi_mtl_mxm_mem_release_cb, NULL); } #endif return OMPI_SUCCESS; } int ompi_mtl_mxm_finalize(struct mca_mtl_base_module_t* mtl) { #if MXM_API >= MXM_VERSION(2,0) if (ompi_mtl_mxm.using_mem_hooks) { opal_mem_hooks_unregister_release(ompi_mtl_mxm_mem_release_cb); } #endif opal_progress_unregister(ompi_mtl_mxm_progress); mxm_ep_destroy(ompi_mtl_mxm.ep); return OMPI_SUCCESS; } int ompi_mtl_mxm_add_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs, struct ompi_proc_t** procs) { #if MXM_API < MXM_VERSION(2,0) ompi_mtl_mxm_ep_conn_info_t *ep_info; mxm_conn_req_t *conn_reqs; #endif void *ep_address; size_t ep_address_len; mxm_error_t err; size_t i, ep_index = 0; int rc; mca_mtl_mxm_endpoint_t *endpoint; assert(mtl == &ompi_mtl_mxm.super); #if MXM_API < MXM_VERSION(2,0) /* Allocate connection requests */ conn_reqs = calloc(nprocs, sizeof(mxm_conn_req_t)); ep_info = calloc(nprocs, sizeof(ompi_mtl_mxm_ep_conn_info_t)); if (NULL == conn_reqs || NULL == ep_info) { rc = OMPI_ERR_OUT_OF_RESOURCE; goto bail; } #endif /* Get the EP connection requests for all the processes from modex */ for (i = 0; i < nprocs; ++i) { if (NULL != procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]) { continue; /* already connected to this endpoint */ } rc = ompi_mtl_mxm_recv_ep_address(procs[i], &ep_address, &ep_address_len); if (rc != OMPI_SUCCESS) { goto bail; } #if MXM_API < MXM_VERSION(2,0) if (ep_address_len != sizeof(ep_info[i])) { MXM_ERROR("Invalid endpoint address length"); rc = OMPI_ERROR; goto bail; } memcpy(&ep_info[i], ep_address, ep_address_len); conn_reqs[ep_index].ptl_addr[MXM_PTL_SELF] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_SELF]); conn_reqs[ep_index].ptl_addr[MXM_PTL_SHM] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_SHM]); conn_reqs[ep_index].ptl_addr[MXM_PTL_RDMA] = (struct sockaddr *)&(ep_info[i].ptl_addr[MXM_PTL_RDMA]); ep_index++; #else endpoint = OBJ_NEW(mca_mtl_mxm_endpoint_t); endpoint->mtl_mxm_module = &ompi_mtl_mxm; err = mxm_ep_connect(ompi_mtl_mxm.ep, ep_address, &endpoint->mxm_conn); if (err != MXM_OK) { MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err)); rc = OMPI_ERROR; goto bail; } procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = endpoint; #endif free(ep_address); } #if MXM_API < MXM_VERSION(2,0) /* Connect to remote peers */ err = mxm_ep_connect(ompi_mtl_mxm.ep, conn_reqs, ep_index, -1); if (MXM_OK != err) { MXM_ERROR("MXM returned connect error: %s\n", mxm_error_string(err)); for (i = 0; i < ep_index; ++i) { if (MXM_OK != conn_reqs[i].error) { MXM_ERROR("MXM EP connect to %s error: %s\n", (NULL == procs[i]->proc_hostname) ? "unknown" : procs[i]->proc_hostname, mxm_error_string(conn_reqs[i].error)); } } rc = OMPI_ERROR; goto bail; } /* Save returned connections */ for (i = 0; i < ep_index; ++i) { endpoint = OBJ_NEW(mca_mtl_mxm_endpoint_t); endpoint->mtl_mxm_module = &ompi_mtl_mxm; endpoint->mxm_conn = conn_reqs[i].conn; procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL] = endpoint; } #endif rc = OMPI_SUCCESS; bail: #if MXM_API < MXM_VERSION(2,0) free(conn_reqs); free(ep_info); #endif return rc; } int ompi_mtl_mxm_del_procs(struct mca_mtl_base_module_t *mtl, size_t nprocs, struct ompi_proc_t** procs) { size_t i; for (i = 0; i < nprocs; ++i) { mca_mtl_mxm_endpoint_t *endpoint = (mca_mtl_mxm_endpoint_t*) procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_MTL]; mxm_ep_disconnect(endpoint->mxm_conn); OBJ_RELEASE(endpoint); } return OMPI_SUCCESS; } int ompi_mtl_mxm_add_comm(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm) { mxm_error_t err; mxm_mq_h mq; assert(mtl == &ompi_mtl_mxm.super); assert(NULL != ompi_mtl_mxm.mxm_context); err = mxm_mq_create(ompi_mtl_mxm.mxm_context, comm->c_contextid, &mq); if (MXM_OK != err) { opal_show_help("help-mtl-mxm.txt", "mxm mq create", true, mxm_error_string(err)); return OMPI_ERROR; } comm->c_pml_comm = (void*)mq; return OMPI_SUCCESS; } int ompi_mtl_mxm_del_comm(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm) { assert(mtl == &ompi_mtl_mxm.super); if (NULL != ompi_mtl_mxm.mxm_context) { mxm_mq_destroy((mxm_mq_h)comm->c_pml_comm); } return OMPI_SUCCESS; } int ompi_mtl_mxm_progress(void) { mxm_error_t err; err = mxm_progress(ompi_mtl_mxm.mxm_context); if ((MXM_OK != err) && (MXM_ERR_NO_PROGRESS != err) ) { opal_show_help("help-mtl-mxm.txt", "errors during mxm_progress", true, mxm_error_string(err)); } return 1; } #if MXM_API >= MXM_VERSION(2,0) static void ompi_mtl_mxm_mem_release_cb(void *buf, size_t length, void *cbdata, bool from_alloc) { mxm_mem_unmap(ompi_mtl_mxm.mxm_context, buf, length, from_alloc ? MXM_MEM_UNMAP_MARK_INVALID : 0); } #endif OBJ_CLASS_INSTANCE( ompi_mtl_mxm_message_t, ompi_free_list_item_t, NULL, NULL);