diff --git a/ompi/mca/mtl/base/mtl_base_datatype.h b/ompi/mca/mtl/base/mtl_base_datatype.h index 741739c07e..3e09b69025 100644 --- a/ompi/mca/mtl/base/mtl_base_datatype.h +++ b/ompi/mca/mtl/base/mtl_base_datatype.h @@ -42,14 +42,15 @@ ompi_mtl_datatype_pack(struct ompi_convertor_t *convertor, iov.iov_base = malloc(max_data); if (NULL == iov.iov_base) return OMPI_ERR_OUT_OF_RESOURCE; *freeAfter = true; + } else { iov.iov_base = NULL; *freeAfter = false; } - + ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, &free_after); - + *buffer = iov.iov_base; *buffer_len = iov.iov_len; @@ -98,10 +99,10 @@ ompi_mtl_datatype_unpack(struct ompi_convertor_t *convertor, max_data = iov.iov_len; - ompi_convertor_unpack(convertor, &iov, &iov_count, + if (max_data > 0 && ompi_convertor_need_buffers(convertor)) { + ompi_convertor_unpack(convertor, &iov, &iov_count, &max_data, &free_after); - if (max_data > 0 && ompi_convertor_need_buffers(convertor)) { free(buffer); } diff --git a/ompi/mca/mtl/mx/mtl_mx.c b/ompi/mca/mtl/mx/mtl_mx.c index a3e4b599ad..5b250654e5 100644 --- a/ompi/mca/mtl/mx/mtl_mx.c +++ b/ompi/mca/mtl/mx/mtl_mx.c @@ -39,7 +39,7 @@ mca_mtl_mx_module_t ompi_mtl_mx = { ompi_mtl_mx_del_procs, ompi_mtl_mx_finalize, - NULL, + ompi_mtl_mx_send, /* don't use ompi_mtl_mx_send.. */ ompi_mtl_mx_isend, ompi_mtl_mx_irecv, @@ -114,7 +114,12 @@ ompi_mtl_mx_finalize(struct mca_mtl_base_module_t* mtl) { opal_progress_unregister(ompi_mtl_mx_progress); /* free resources */ - + mx_return = mx_close_endpoint(ompi_mtl_mx.mx_endpoint); + if(mx_return != MX_SUCCESS){ + opal_output(0, "Error in mx_close_endpoint (error %s)\n", mx_strerror(mx_return)); + return OMPI_ERROR; + } + mx_return = mx_finalize(); if(mx_return != MX_SUCCESS){ opal_output(0, "Error in mx_finalize (error %s)\n", mx_strerror(mx_return)); @@ -164,68 +169,74 @@ int ompi_mtl_mx_progress( void ) { mx_status_t mx_status; uint32_t result; mca_mtl_mx_request_t* mtl_mx_request; + int completed = 0; - mx_return = mx_ipeek(ompi_mtl_mx.mx_endpoint, - &mx_request, - &result); - - if(mx_return != MX_SUCCESS) { - opal_output(0, "Error in mx_ipeek (error %s)\n", mx_strerror(mx_return)); - } - if(result) { - mx_return = mx_test(ompi_mtl_mx.mx_endpoint, - &mx_request, - &mx_status, - &result); + while(1){ + mx_return = mx_ipeek(ompi_mtl_mx.mx_endpoint, + &mx_request, + &result); + if(mx_return != MX_SUCCESS) { - opal_output(0, "Error in mx_test (error %s)\n", mx_strerror(mx_return)); - abort(); + opal_output(0, "Error in mx_ipeek (error %s)\n", mx_strerror(mx_return)); } - if(0 == result) { - opal_output(0, "Error in ompi_mtl_mx_progress, mx_ipeek returned a request, mx_test on the request resulted failure.\n"); - abort(); - } - if(mx_status.code != MX_STATUS_SUCCESS) { - opal_output(0, "Error in ompi_mtl_mx_progress, mx_test returned something other than MX_STATUS_SUCCESS: mx_status(%d).\n", + if(result) { + completed++; + mx_return = mx_test(ompi_mtl_mx.mx_endpoint, + &mx_request, + &mx_status, + &result); + if(mx_return != MX_SUCCESS) { + opal_output(0, "Error in mx_test (error %s)\n", mx_strerror(mx_return)); + abort(); + } + if(0 == result) { + opal_output(0, "Error in ompi_mtl_mx_progress, mx_ipeek returned a request, mx_test on the request resulted failure.\n"); + abort(); + } + if(mx_status.code != MX_STATUS_SUCCESS) { + opal_output(0, "Error in ompi_mtl_mx_progress, mx_test returned something other than MX_STATUS_SUCCESS: mx_status(%d).\n", mx_status); - abort(); - } - mtl_mx_request = (mca_mtl_mx_request_t*) mx_status.context; - if(OMPI_MTL_MX_ISEND == mtl_mx_request->type) { - if(mtl_mx_request->free_after) { - free(mtl_mx_request->mx_segment[0].segment_ptr); + abort(); } - mtl_mx_request->super.completion_callback(&mtl_mx_request->super); - } - if(OMPI_MTL_MX_IRECV == mtl_mx_request->type) { - - ompi_mtl_datatype_unpack(mtl_mx_request->convertor, - mtl_mx_request->mx_segment[0].segment_ptr, - mx_status.xfer_length); - /* set the status */ - MX_GET_SRC(mx_status.match_info, - mtl_mx_request->super.ompi_req->req_status.MPI_SOURCE); - MX_GET_TAG(mx_status.match_info, - mtl_mx_request->super.ompi_req->req_status.MPI_TAG); - mtl_mx_request->super.ompi_req->req_status._count = - mx_status.xfer_length; - - switch (mx_status.code) { - case MX_STATUS_SUCCESS: - mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = - OMPI_SUCCESS; - break; - case MX_STATUS_TRUNCATED: - mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = - MPI_ERR_TRUNCATE; - break; - default: - mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = - MPI_ERR_INTERN; + mtl_mx_request = (mca_mtl_mx_request_t*) mx_status.context; + if(OMPI_MTL_MX_ISEND == mtl_mx_request->type) { + if(mtl_mx_request->free_after) { + free(mtl_mx_request->mx_segment[0].segment_ptr); + } + mtl_mx_request->super.completion_callback(&mtl_mx_request->super); } - mtl_mx_request->super.completion_callback(&mtl_mx_request->super); - } + if(OMPI_MTL_MX_IRECV == mtl_mx_request->type) { + ompi_mtl_datatype_unpack(mtl_mx_request->convertor, + mtl_mx_request->mx_segment[0].segment_ptr, + mx_status.xfer_length); + /* set the status */ + MX_GET_SRC(mx_status.match_info, + mtl_mx_request->super.ompi_req->req_status.MPI_SOURCE); + MX_GET_TAG(mx_status.match_info, + mtl_mx_request->super.ompi_req->req_status.MPI_TAG); + mtl_mx_request->super.ompi_req->req_status._count = + mx_status.xfer_length; + + switch (mx_status.code) { + case MX_STATUS_SUCCESS: + mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = + OMPI_SUCCESS; + break; + case MX_STATUS_TRUNCATED: + mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = + MPI_ERR_TRUNCATE; + break; + default: + mtl_mx_request->super.ompi_req->req_status.MPI_ERROR = + MPI_ERR_INTERN; + } + mtl_mx_request->super.completion_callback(&mtl_mx_request->super); + } + + } else { + return completed; + } } - return 1; } + diff --git a/ompi/mca/mtl/mx/mtl_mx.h b/ompi/mca/mtl/mx/mtl_mx.h index 0f6716d050..6bcd85924b 100644 --- a/ompi/mca/mtl/mx/mtl_mx.h +++ b/ompi/mca/mtl/mx/mtl_mx.h @@ -49,6 +49,14 @@ extern int ompi_mtl_mx_del_procs(struct mca_mtl_base_module_t* mtl, struct ompi_proc_t** procs, struct mca_mtl_base_endpoint_t **mtl_peer_data); +int +ompi_mtl_mx_send(struct mca_mtl_base_module_t* mtl, + struct ompi_communicator_t* comm, + int dest, + int tag, + struct ompi_convertor_t *convertor, + mca_pml_base_send_mode_t mode); + extern int ompi_mtl_mx_isend(struct mca_mtl_base_module_t* mtl, struct ompi_communicator_t* comm, int dest, diff --git a/ompi/mca/mtl/mx/mtl_mx_send.c b/ompi/mca/mtl/mx/mtl_mx_send.c index 8ec660e691..9f61e89f65 100644 --- a/ompi/mca/mtl/mx/mtl_mx_send.c +++ b/ompi/mca/mtl/mx/mtl_mx_send.c @@ -20,11 +20,108 @@ #include "ompi/datatype/datatype.h" #include "ompi/communicator/communicator.h" #include "ompi/datatype/convertor.h" -#include "ompi/mca/mtl/base/mtl_base_datatype.h" #include "mtl_mx.h" #include "mtl_mx_request.h" +#include "ompi/mca/mtl/base/mtl_base_datatype.h" +int +ompi_mtl_mx_send(struct mca_mtl_base_module_t* mtl, + struct ompi_communicator_t* comm, + int dest, + int tag, + struct ompi_convertor_t *convertor, + mca_pml_base_send_mode_t mode) +{ + mx_return_t mx_return; + uint64_t match_bits; + int ret; + mca_mtl_mx_request_t mtl_mx_request; + size_t length; + mx_status_t mx_status; + uint32_t result; + uint32_t timeout = 1000; + + mca_mtl_mx_endpoint_t* mx_endpoint = + (mca_mtl_mx_endpoint_t*) comm->c_pml_procs[dest]->proc_ompi->proc_pml; + + assert(mtl == &ompi_mtl_mx.super); + + MX_SET_SEND_BITS(match_bits, comm->c_contextid, comm->c_my_rank, tag); + + ret = ompi_mtl_datatype_pack(convertor, + &mtl_mx_request.mx_segment[0].segment_ptr, + &length, + &mtl_mx_request.free_after); + + + mtl_mx_request.mx_segment[0].segment_length = length; + mtl_mx_request.convertor = convertor; + mtl_mx_request.type = OMPI_MTL_MX_ISEND; + + if (OMPI_SUCCESS != ret) return ret; + + if(mode == MCA_PML_BASE_SEND_SYNCHRONOUS) { + +#if 0 + printf("issend bits: 0x%016llx\n", match_bits); +#endif + mx_return = mx_issend( ompi_mtl_mx.mx_endpoint, + mtl_mx_request.mx_segment, + 1, + mx_endpoint->mx_peer_addr, + match_bits, + &mtl_mx_request, + &mtl_mx_request.mx_request + ); + if(mx_return != MX_SUCCESS ) { + char peer_name[MX_MAX_HOSTNAME_LEN]; + if(MX_SUCCESS != mx_nic_id_to_hostname( mx_endpoint->mx_peer->nic_id, peer_name)) { + sprintf( peer_name, "unknown %lx nic_id", (long)mx_endpoint->mx_peer->nic_id ); + } + opal_output(0, "Error in mx_issend (error %s) sending to %s\n", mx_strerror(mx_return), peer_name); + } + } else { +#if 0 + printf("isend bits: 0x%016llx\n", match_bits); +#endif + mx_return = mx_isend( ompi_mtl_mx.mx_endpoint, + mtl_mx_request.mx_segment, + 1, + mx_endpoint->mx_peer_addr, + match_bits, + &mtl_mx_request, + &mtl_mx_request.mx_request + ); + + if(mx_return != MX_SUCCESS ) { + char peer_name[MX_MAX_HOSTNAME_LEN]; + if(MX_SUCCESS != mx_nic_id_to_hostname( mx_endpoint->mx_peer->nic_id, peer_name)) { + sprintf( peer_name, "unknown %lx nic_id", (long)mx_endpoint->mx_peer->nic_id ); + } + opal_output(0, "Error in mx_isend (error %s) sending to %s\n", mx_strerror(mx_return), peer_name); + } + + } + + do { + mx_return = mx_test(ompi_mtl_mx.mx_endpoint, + &mtl_mx_request.mx_request, + &mx_status, + &result); + if(mx_return != MX_SUCCESS) { + opal_output(0, "Error in mx_wait (error %s)\n", mx_strerror(mx_return)); + abort(); + } + if(result && mx_status.code != MX_STATUS_SUCCESS) { + opal_output(0, "Error in ompi_mtl_mx_send, mx_wait returned something other than MX_STATUS_SUCCESS: mx_status(%d).\n", + mx_status); + abort(); + } + } while(!result); + + return OMPI_SUCCESS; +} int ompi_mtl_mx_isend(struct mca_mtl_base_module_t* mtl, @@ -43,7 +140,7 @@ ompi_mtl_mx_isend(struct mca_mtl_base_module_t* mtl, size_t length; -mca_mtl_mx_endpoint_t* mx_endpoint = + mca_mtl_mx_endpoint_t* mx_endpoint = (mca_mtl_mx_endpoint_t*) comm->c_pml_procs[dest]->proc_ompi->proc_pml; assert(mtl == &ompi_mtl_mx.super);