From 5f939c53bef0a3ac90e88851d2627d62d43b63f8 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Mon, 1 May 2006 20:03:49 +0000 Subject: [PATCH] * first take at send / receive for a portals pml (still really dumb and simple) This commit was SVN r9786. --- ompi/mca/pml/portals/Makefile.am | 4 + ompi/mca/pml/portals/pml_portals.c | 169 +++++++++-- ompi/mca/pml/portals/pml_portals.h | 163 +++++++++-- ompi/mca/pml/portals/pml_portals_compat.c | 33 +++ ompi/mca/pml/portals/pml_portals_compat.h | 48 ++++ .../pml/portals/pml_portals_compat_redstorm.c | 101 +++++++ .../mca/pml/portals/pml_portals_compat_utcp.c | 263 ++++++++++++++++++ ompi/mca/pml/portals/pml_portals_component.c | 117 ++++++-- ompi/mca/pml/portals/pml_portals_datatype.h | 118 ++++++++ ompi/mca/pml/portals/pml_portals_probe.c | 15 +- ompi/mca/pml/portals/pml_portals_progress.c | 4 +- ompi/mca/pml/portals/pml_portals_recv.c | 169 ++++++++++- ompi/mca/pml/portals/pml_portals_send.c | 91 +++++- ompi/mca/pml/portals/pml_portals_start.c | 6 +- 14 files changed, 1206 insertions(+), 95 deletions(-) create mode 100644 ompi/mca/pml/portals/pml_portals_compat.c create mode 100644 ompi/mca/pml/portals/pml_portals_compat.h create mode 100644 ompi/mca/pml/portals/pml_portals_compat_redstorm.c create mode 100644 ompi/mca/pml/portals/pml_portals_compat_utcp.c create mode 100644 ompi/mca/pml/portals/pml_portals_datatype.h diff --git a/ompi/mca/pml/portals/Makefile.am b/ompi/mca/pml/portals/Makefile.am index 5354eebc35..1d2f230b8d 100644 --- a/ompi/mca/pml/portals/Makefile.am +++ b/ompi/mca/pml/portals/Makefile.am @@ -30,9 +30,13 @@ component_noinst = libmca_pml_portals.la component_install = endif +EXTRA_DIST = pml_portals_compat_redstorm.c pml_portals_compat_utcp.c + local_sources = \ pml_portals.c \ pml_portals.h \ + pml_portals_compat.c \ + pml_portals_compat.h \ pml_portals_component.c \ pml_portals_probe.c \ pml_portals_recv.c \ diff --git a/ompi/mca/pml/portals/pml_portals.c b/ompi/mca/pml/portals/pml_portals.c index 
adf95e79bd..37fd4615a5 100644 --- a/ompi/mca/pml/portals/pml_portals.c +++ b/ompi/mca/pml/portals/pml_portals.c @@ -22,56 +22,173 @@ #include "ompi/communicator/communicator.h" #include "opal/class/opal_list.h" -mca_pml_portals_t mca_pml_portals = { +ompi_pml_portals_t ompi_pml_portals = { { - mca_pml_portals_add_procs, - mca_pml_portals_del_procs, - mca_pml_portals_enable, - mca_pml_portals_progress, - mca_pml_portals_add_comm, - mca_pml_portals_del_comm, - mca_pml_portals_irecv_init, - mca_pml_portals_irecv, - mca_pml_portals_recv, - mca_pml_portals_isend_init, - mca_pml_portals_isend, - mca_pml_portals_send, - mca_pml_portals_iprobe, - mca_pml_portals_probe, - mca_pml_portals_start, - mca_pml_portals_dump, - 32768, /* max tag value */ - 100 /* max cid - BWB - fix me */ + ompi_pml_portals_add_procs, + ompi_pml_portals_del_procs, + ompi_pml_portals_enable, + ompi_pml_portals_progress, + ompi_pml_portals_add_comm, + ompi_pml_portals_del_comm, + ompi_pml_portals_irecv_init, + ompi_pml_portals_irecv, + ompi_pml_portals_recv, + ompi_pml_portals_isend_init, + ompi_pml_portals_isend, + ompi_pml_portals_send, + ompi_pml_portals_iprobe, + ompi_pml_portals_probe, + ompi_pml_portals_start, + ompi_pml_portals_dump, + (1UL << 30), /* max tag value - must allow negatives */ + 8191 /* max cid - 2^13 - 1 */ } }; -int mca_pml_portals_enable(bool enable) + +int +ompi_pml_portals_enable(bool enable) { return OMPI_SUCCESS; } -int mca_pml_portals_add_comm(ompi_communicator_t* comm) + +int +ompi_pml_portals_add_comm(ompi_communicator_t* comm) { + size_t comm_size = comm->c_remote_group->grp_proc_count; + size_t i; + + /* allocate portals pml specific information */ + comm->c_pml_comm = NULL; + comm->c_pml_procs = malloc(comm_size * sizeof(ompi_pml_portals_proc_t*)); + + if (NULL == comm->c_pml_procs) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + for (i = 0 ; i < comm_size ; ++i) { + comm->c_pml_procs[i] = comm->c_remote_group->grp_proc_pointers[i]->proc_pml; + 
OBJ_RETAIN(comm->c_pml_procs[i]); + } + return OMPI_SUCCESS; } -int mca_pml_portals_del_comm(ompi_communicator_t* comm) + +int +ompi_pml_portals_del_comm(ompi_communicator_t* comm) { + size_t comm_size = comm->c_remote_group->grp_proc_count; + size_t i; + + for (i = 0 ; i < comm_size ; ++i) { + OBJ_RELEASE(comm->c_pml_procs[i]); + } + free(comm->c_pml_procs); + return OMPI_SUCCESS; } -int mca_pml_portals_add_procs(struct ompi_proc_t** procs, size_t nprocs) + +int +ompi_pml_portals_add_procs(struct ompi_proc_t** procs, size_t nprocs) { + size_t i; + bool done_init = false; + int ret; + uint64_t match_bits = 0; + ptl_process_id_t portals_proc; + + if (0 == nprocs) return OMPI_SUCCESS; + + /* allocate space for our pml information */ + for (i = 0 ; i < nprocs ; ++i) { + procs[i]->proc_pml = (mca_pml_proc_t*) OBJ_NEW(ompi_pml_portals_proc_t); + procs[i]->proc_pml->proc_ompi = procs[i]; + } + + ompi_pml_portals_add_procs_compat(procs, nprocs); + + if (!done_init) { + ptl_md_t md; + ptl_handle_md_t md_h; + + /* setup our event queues */ + ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h, + 10, /* BWB - fix me */ + PTL_EQ_HANDLER_NONE, + &(ompi_pml_portals.portals_blocking_send_queue)); + assert(ret == PTL_OK); + + ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h, + 10, /* BWB - fix me */ + PTL_EQ_HANDLER_NONE, + &(ompi_pml_portals.portals_blocking_receive_queue)); + + ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h, + 1024, /* BWB - fix me */ + PTL_EQ_HANDLER_NONE, + &(ompi_pml_portals.portals_unexpected_receive_queue)); + + ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h, + 1024, /* BWB - fix me */ + PTL_EQ_HANDLER_NONE, + &(ompi_pml_portals.portals_nonblocking_queue)); + + /* create unexpected message match entry */ + portals_proc.nid = PTL_NID_ANY; + portals_proc.pid = PTL_PID_ANY; + + PtlMEAttach(ompi_pml_portals.portals_ni_h, + PML_PTLS_INDEX_RECV, + portals_proc, + match_bits, + ~match_bits, + PTL_RETAIN, + PTL_INS_AFTER, + &(ompi_pml_portals.portals_unexpected_me_h)); + 
+ md.start = NULL; + md.length = 0; + md.threshold = PTL_MD_THRESH_INF; + md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE | PTL_MD_EVENT_START_DISABLE); + md.eq_handle = ompi_pml_portals.portals_unexpected_receive_queue; + + PtlMDAttach(ompi_pml_portals.portals_unexpected_me_h, + md, PTL_RETAIN, &md_h); + + done_init = true; + } + return OMPI_SUCCESS; } -int mca_pml_portals_del_procs(struct ompi_proc_t** procs, size_t nprocs) + +int +ompi_pml_portals_del_procs(struct ompi_proc_t** procs, size_t nprocs) { + size_t i; + + if (0 == nprocs) return OMPI_SUCCESS; + + /* allocate space for our pml information */ + for (i = 0 ; i < nprocs ; ++i) { + OBJ_RELEASE(procs[i]->proc_pml); + } + return OMPI_SUCCESS; } /* print any available useful information from this communicator */ -int mca_pml_portals_dump(struct ompi_communicator_t* comm, int verbose) +int +ompi_pml_portals_dump(struct ompi_communicator_t* comm, int verbose) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } + + +/* class information for OMPI proc structure */ +OBJ_CLASS_INSTANCE(ompi_pml_portals_proc_t, opal_list_item_t, + NULL, NULL); + diff --git a/ompi/mca/pml/portals/pml_portals.h b/ompi/mca/pml/portals/pml_portals.h index fca596cdca..eba3133535 100644 --- a/ompi/mca/pml/portals/pml_portals.h +++ b/ompi/mca/pml/portals/pml_portals.h @@ -25,13 +25,16 @@ #include "opal/util/cmd_line.h" #include "ompi/request/request.h" #include "ompi/mca/pml/pml.h" +#include "ompi/mca/pml/base/base.h" #include "ompi/datatype/datatype.h" +#include "pml_portals_compat.h" + #if defined(c_plusplus) || defined(__cplusplus) extern "C" { #endif -struct mca_pml_portals_t { +struct ompi_pml_portals_t { mca_pml_base_module_t super; /* free list of requests */ @@ -40,25 +43,145 @@ struct mca_pml_portals_t { /* list of pending send requests */ opal_list_t portals_send_pending; -}; -typedef struct mca_pml_portals_t mca_pml_portals_t; -extern mca_pml_portals_t mca_pml_portals; +#if OMPI_BTL_PORTALS_UTCP + 
/* ethernet interface to use - only has meaning with utcp + reference */ + char *portals_ifname; +#endif + + /* output channel for debugging. Value settings when using + * output_verbose: + * + * - 0 : critical user information + * - 10: general execution diagnostic information + * - 20: initialization / shutdown diagnostic information + * - 30: basic debugging information + * - 90: useful only to developers + * - 100: lots and lots of performance impacting output + */ + int portals_output; + + /* switch over point for eager -> long messages */ + int portals_eager_limit; + + /* our portals network interface */ + ptl_handle_ni_t portals_ni_h; + + /* blocking send event queue */ + ptl_handle_eq_t portals_blocking_send_queue; + + /* blocking receive event queue */ + ptl_handle_eq_t portals_blocking_receive_queue; + + /* unexpected receive event queue */ + ptl_handle_eq_t portals_unexpected_receive_queue; + + /* nonblocking event queue */ + ptl_handle_eq_t portals_nonblocking_queue; + + ptl_handle_me_t portals_unexpected_me_h; + + opal_list_t portals_unexpected_events; +}; +typedef struct ompi_pml_portals_t ompi_pml_portals_t; +extern ompi_pml_portals_t ompi_pml_portals; + +extern mca_pml_base_component_1_0_0_t mca_pml_portals_component; + +/* a pointer to this structure is hung off each ompi_proc_t when + add_procs is called */ +struct ompi_pml_portals_proc_t { + mca_pml_proc_t super; /**< see pml.h */ + ptl_process_id_t proc_id; /**< Portals process id */ +}; +typedef struct ompi_pml_portals_proc_t ompi_pml_portals_proc_t; +extern opal_class_t ompi_pml_portals_proc_t_class; + + + +/* + * Portals match info + */ + +#define PML_PTLS_READY 0x4000000000000000ULL +#define PML_PTLS_LONG 0x2000000000000000ULL +#define PML_PTLS_SHORT 0x1000000000000000ULL + +#define PML_PTLS_PROT_MASK 0xE000000000000000ULL +#define PML_PTLS_CTX_MASK 0x1FFF000000000000ULL +#define PML_PTLS_SOURCE_MASK 0x0000FFFF00000000ULL +#define PML_PTLS_TAG_MASK 0x00000000FFFFFFFFULL + +#define 
PML_PTLS_RECV_BITS(match_bits, ignore_bits, ctxid, src, tag) \ +{ \ + match_bits = 0; \ + ignore_bits = PML_PTLS_PROT_MASK; \ +\ + match_bits = ctxid; \ + match_bits = (match_bits << 16); \ +\ + if (src == MPI_ANY_SOURCE) { \ + match_bits = (match_bits << 32); \ + ignore_bits |= PML_PTLS_SOURCE_MASK; \ + } else { \ + match_bits |= src; \ + match_bits = (match_bits << 32); \ + } \ +\ + if (tag == MPI_ANY_TAG) { \ + ignore_bits |= PML_PTLS_TAG_MASK; \ + } else { \ + match_bits |= tag; \ + } \ +} \ + +#define PML_PTLS_SEND_BITS(match_bits, ctxid, src, tag) \ +{ \ + match_bits = ctxid; \ + match_bits = (match_bits << 16); \ + match_bits |= src; \ + match_bits = (match_bits << 32); \ + match_bits |= tag; \ + match_bits |= PML_PTLS_LONG; \ +} + +#define PML_PTLS_IS_LONG(match_bits) (match_bits & PML_PTLS_LONG) +#define PML_PTLS_GET_SOURCE(match_bits, src) \ +{ \ + src = (int)((PML_PTLS_SOURCE_MASK & match_bits) >> 32); \ +} + +#define PML_PTLS_GET_TAG(match_bits, tag) \ +{ \ + tag = (int)(PML_PTLS_TAG_MASK & match_bits); \ +} + +#define PML_PTLS_GET_CONTEXT(match_bits, ctxid) \ +{ \ + ctxid = (int)((PML_PTLS_CTX_MASK & match_bits) >> 48); \ +} + + +/* table indexes */ +#define PML_PTLS_INDEX_RECV (OMPI_PML_PORTALS_STARTING_TABLE_ID) +#define PML_PTLS_INDEX_READ (OMPI_PML_PORTALS_STARTING_TABLE_ID) +#define PML_PTLS_INDEX_ACK (OMPI_PML_PORTALS_STARTING_TABLE_ID) /* * PML interface functions. 
*/ -extern int mca_pml_portals_add_procs(struct ompi_proc_t **procs, size_t nprocs); -extern int mca_pml_portals_del_procs(struct ompi_proc_t **procs, size_t nprocs); +extern int ompi_pml_portals_add_procs(struct ompi_proc_t **procs, size_t nprocs); +extern int ompi_pml_portals_del_procs(struct ompi_proc_t **procs, size_t nprocs); -extern int mca_pml_portals_enable(bool enable); -extern int mca_pml_portals_progress(void); +extern int ompi_pml_portals_enable(bool enable); +extern int ompi_pml_portals_progress(void); -extern int mca_pml_portals_add_comm(struct ompi_communicator_t* comm); -extern int mca_pml_portals_del_comm(struct ompi_communicator_t* comm); +extern int ompi_pml_portals_add_comm(struct ompi_communicator_t* comm); +extern int ompi_pml_portals_del_comm(struct ompi_communicator_t* comm); -extern int mca_pml_portals_irecv_init(void *buf, +extern int ompi_pml_portals_irecv_init(void *buf, size_t count, ompi_datatype_t *datatype, int src, @@ -66,7 +189,7 @@ extern int mca_pml_portals_irecv_init(void *buf, struct ompi_communicator_t* comm, struct ompi_request_t **request); -extern int mca_pml_portals_irecv(void *buf, +extern int ompi_pml_portals_irecv(void *buf, size_t count, ompi_datatype_t *datatype, int src, @@ -74,7 +197,7 @@ extern int mca_pml_portals_irecv(void *buf, struct ompi_communicator_t* comm, struct ompi_request_t **request); -extern int mca_pml_portals_recv(void *buf, +extern int ompi_pml_portals_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src, @@ -82,7 +205,7 @@ extern int mca_pml_portals_recv(void *buf, struct ompi_communicator_t* comm, ompi_status_public_t* status ); -extern int mca_pml_portals_isend_init(void *buf, +extern int ompi_pml_portals_isend_init(void *buf, size_t count, ompi_datatype_t *datatype, int dst, @@ -91,7 +214,7 @@ extern int mca_pml_portals_isend_init(void *buf, struct ompi_communicator_t* comm, struct ompi_request_t **request); -extern int mca_pml_portals_isend(void *buf, +extern int 
ompi_pml_portals_isend(void *buf, size_t count, ompi_datatype_t *datatype, int dst, @@ -100,7 +223,7 @@ extern int mca_pml_portals_isend(void *buf, struct ompi_communicator_t* comm, struct ompi_request_t **request); -extern int mca_pml_portals_send(void *buf, +extern int ompi_pml_portals_send(void *buf, size_t count, ompi_datatype_t *datatype, int dst, @@ -108,21 +231,21 @@ extern int mca_pml_portals_send(void *buf, mca_pml_base_send_mode_t mode, struct ompi_communicator_t* comm); -extern int mca_pml_portals_iprobe(int dst, +extern int ompi_pml_portals_iprobe(int dst, int tag, struct ompi_communicator_t* comm, int *matched, ompi_status_public_t* status); -extern int mca_pml_portals_probe(int dst, +extern int ompi_pml_portals_probe(int dst, int tag, struct ompi_communicator_t* comm, ompi_status_public_t* status); -extern int mca_pml_portals_start(size_t count, ompi_request_t** requests); +extern int ompi_pml_portals_start(size_t count, ompi_request_t** requests); -extern int mca_pml_portals_dump(struct ompi_communicator_t* comm, +extern int ompi_pml_portals_dump(struct ompi_communicator_t* comm, int verbose); diff --git a/ompi/mca/pml/portals/pml_portals_compat.c b/ompi/mca/pml/portals/pml_portals_compat.c new file mode 100644 index 0000000000..8b83db3372 --- /dev/null +++ b/ompi/mca/pml/portals/pml_portals_compat.c @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#if OMPI_PML_PORTALS_UTCP + +#include "pml_portals_compat_utcp.c" + +#elif OMPI_PML_PORTALS_REDSTORM + +#include "pml_portals_compat_redstorm.c" + +#else + +#error "Unknown Portals library configuration" + +#endif diff --git a/ompi/mca/pml/portals/pml_portals_compat.h b/ompi/mca/pml/portals/pml_portals_compat.h new file mode 100644 index 0000000000..3c3c7697d9 --- /dev/null +++ b/ompi/mca/pml/portals/pml_portals_compat.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#ifndef OMPI_PML_PORTALS_COMPAT_H +#define OMPI_PML_PORTALS_COMPAT_H + +#if OMPI_PML_PORTALS_UTCP + +#include + +#include +#include +#include +#include + +#elif OMPI_PML_PORTALS_REDSTORM + +#include + +#define PTL_EQ_HANDLER_NONE NULL + +#else + +#error "Unknown Portals library configuration" + +#endif + +extern int ompi_pml_portals_init_compat(void); +extern int ompi_pml_portals_add_procs_compat(struct ompi_proc_t **procs, + size_t nprocs); + +#endif /* OMPI_PML_PORTALS_NAL_H */ diff --git a/ompi/mca/pml/portals/pml_portals_compat_redstorm.c b/ompi/mca/pml/portals/pml_portals_compat_redstorm.c new file mode 100644 index 0000000000..674c91c49d --- /dev/null +++ b/ompi/mca/pml/portals/pml_portals_compat_redstorm.c @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/constants.h" +#include "opal/util/output.h" + +#include "pml_portals.h" +#include "pml_portals_compat.h" + +#include + +int +ompi_pml_portals_init_compat(void) +{ + int ret, max_interfaces; + uint32_t i; + + /* + * Initialize Portals interface + */ + ret = PtlInit(&max_interfaces); + if (PTL_OK != ret) { + opal_output_verbose(10, mca_pml_portals_component.portals_output, + "PtlInit failed, returning %d\n", ret); + return OMPI_ERR_FATAL; + } + + /* + * Initialize a network device + */ + ret = PtlNIInit(PTL_IFACE_DEFAULT, /* interface to initialize */ + PTL_PID_ANY, /* let library assign our pid */ + NULL, /* no desired limits */ + NULL, /* actual limits */ + &(mca_pml_portals_module.portals_ni_h) /* our interface handle */ + ); + if (PTL_OK != ret && PTL_IFACE_DUP != ret) { + opal_output_verbose(10, mca_pml_portals_component.portals_output, + "PtlNIInit failed, returning %d\n", ret); + return OMPI_ERR_FATAL; + } + + return OMPI_SUCCESS; +} + + +int +ompi_pml_portals_add_procs_compat(struct ompi_proc_t **procs, size_t nprocs) +{ + int nptl_procs = 0; + cnos_nidpid_map_t *map; + int i; + + /* + * FIXME - XXX - FIXME + * BWB - implicit assumption that cnos procs list will match our + * procs list. Don't know what to do about that... 
+ */ + + nptl_procs = cnos_get_nidpid_map(&map); + if (nptl_procs <= 0) { + opal_output_verbose(10, mca_pml_portals_component.portals_output, + "cnos_get_nidpid_map() returned %d", nptl_procs); + return OMPI_ERR_FATAL; + } else if (nptl_procs != nprocs) { + opal_output_verbose(10, mca_pml_portals_component.portals_output, + "nptl_procs != nprocs (%d, %d)", nptl_procs, + nprocs); + return OMPI_ERR_FATAL; + } else { + opal_output_verbose(10, mca_pml_portals_component.portals_output, + "nptl_procs: %d", nptl_procs); + } + + for (i = 0 ; i < nprocs ; ++i) { + opal_output_verbose(120, mca_pml_portals_component.portals_output, + "rank %d: nid %ld, pid %ld", i, + map[i].nid, map[i].pid); + + ((ompi_pml_portals_proc_t*) procs[i]->proc_pml)->proc_id = map[i]; + } + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/pml/portals/pml_portals_compat_utcp.c b/ompi/mca/pml/portals/pml_portals_compat_utcp.c new file mode 100644 index 0000000000..e073474b41 --- /dev/null +++ b/ompi/mca/pml/portals/pml_portals_compat_utcp.c @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "ompi_config.h" + +#include +#include +#include +#include +#include + +#include "ompi/constants.h" +#include "opal/util/output.h" +#include "ompi/proc/proc.h" +#include "ompi/mca/pml/base/pml_base_module_exchange.h" + +#include "pml_portals.h" +#include "pml_portals_compat.h" + +#include + +/* how's this for source code diving? 
- find private method for + getting interface */ +extern int p3tcp_my_nid(const char *if_str, unsigned int *nid); + +static bool use_modex = true; + +int +ompi_pml_portals_init_compat(void) +{ + ptl_process_id_t info; + int ret, max_interfaces; + + /* if the environment variables for the utcp implementation are + already set, assume the user is running without the full Open + RTE and is doing RTE testing for a more tightly-coupled + platform (like, say, Red Storm). Otherwise, be nice and use + the modex to setup everything for the user */ + if (NULL == getenv("PTL_MY_RID")) { + use_modex = true; + } else { + use_modex = false; + } + + if (use_modex) { + unsigned int nid; + + p3tcp_my_nid(ompi_pml_portals.portals_ifname, &nid); + + /* post our contact info in the registry */ + info.nid = htonl(nid); + info.pid = htonl((ptl_pid_t) getpid()); + opal_output_verbose(100, ompi_pml_portals.portals_output, + "contact info: %u, %u", ntohl(info.nid), + ntohl(info.pid)); + + ret = mca_pml_base_modex_send(&mca_pml_portals_component.pmlm_version, + &info, sizeof(ptl_process_id_t)); + if (OMPI_SUCCESS != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "mca_pml_base_modex_send failed: %d", ret); + return ret; + } + } else { + /* + * Initialize Portals interface + */ + ret = PtlInit(&max_interfaces); + if (PTL_OK != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "PtlInit failed, returning %d\n", ret); + return OMPI_ERR_FATAL; + } + + /* tell the UTCP runtime code to read the env variables */ + PtlSetRank(PTL_INVALID_HANDLE, -1, -1); + + /* + * Initialize a network device + */ + ret = PtlNIInit(PTL_IFACE_DEFAULT, /* interface to initialize */ + PTL_PID_ANY, /* let library assign our pid */ + NULL, /* no desired limits */ + NULL, /* no need to have limits around */ + &ompi_pml_portals.portals_ni_h /* our interface handle */ + ); + if (PTL_OK != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "PtlNIInit failed, returning 
%d\n", ret); + return OMPI_ERR_FATAL; + } + } + + return OMPI_SUCCESS; +} + + +int +ompi_pml_portals_add_procs_compat(struct ompi_proc_t **procs, size_t nprocs) +{ + int ret; + + if (use_modex) { + int my_rid; + ptl_process_id_t *info; + char *nidmap = NULL; + char *pidmap = NULL; + char *nid_str; + char *pid_str; + const size_t map_size = nprocs * 12 + 1; /* 12 is max length of long in decimal */ + size_t size, i; + char *tmp; + ompi_proc_t* proc_self = ompi_proc_local(); + int max_interfaces; + + /* + * Do all the NID/PID map setup + */ + /* each nid is a int, so need 10 there, plus the : */ + nidmap = malloc(map_size); + pidmap = malloc(map_size); + nid_str = malloc(12 + 1); + pid_str = malloc(12 + 1); + if (NULL == nidmap || NULL == pidmap || + NULL == nid_str || NULL == pid_str) + return OMPI_ERROR; + + for (i = 0 ; i < nprocs ; ++i) { + if (proc_self == procs[i]) my_rid = i; + + ret = mca_pml_base_modex_recv(&mca_pml_portals_component.pmlm_version, + procs[i], (void**) &info, &size); + if (OMPI_SUCCESS != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "mca_pml_base_modex_recv failed: %d", ret); + return ret; + } else if (sizeof(ptl_process_id_t) != size) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "mca_pml_base_modex_recv returned size %d, expected %d", + size, sizeof(ptl_process_id_t)); + return OMPI_ERROR; + } + + if (i == 0) { + snprintf(nidmap, map_size, "%u", ntohl(info->nid)); + snprintf(pidmap, map_size, "%u", ntohl(info->pid)); + } else { + snprintf(nid_str, 12 + 1, ":%u", ntohl(info->nid)); + snprintf(pid_str, 12 + 1, ":%u", ntohl(info->pid)); + strncat(nidmap, nid_str, 12); + strncat(pidmap, pid_str, 12); + } + + /* update pml_proc information */ + ((ompi_pml_portals_proc_t*) procs[i]->proc_pml)->proc_id.nid = + ntohl(info->nid); + ((ompi_pml_portals_proc_t*) procs[i]->proc_pml)->proc_id.pid = + ntohl(info->pid); + + free(info); + } + + opal_output_verbose(100, ompi_pml_portals.portals_output, + "my rid: %u", 
my_rid); + opal_output_verbose(100, ompi_pml_portals.portals_output, + "nid map: %s", nidmap); + opal_output_verbose(100, ompi_pml_portals.portals_output, + "pid map: %s", pidmap); + opal_output_verbose(100, ompi_pml_portals.portals_output, + "iface: %s", + ompi_pml_portals.portals_ifname); + + asprintf(&tmp, "PTL_MY_RID=%u", my_rid); + putenv(tmp); + asprintf(&tmp, "PTL_NIDMAP=%s", nidmap); + putenv(tmp); + asprintf(&tmp, "PTL_PIDMAP=%s", pidmap); + putenv(tmp); + asprintf(&tmp, "PTL_IFACE=%s", ompi_pml_portals.portals_ifname); + putenv(tmp); + + free(pidmap); + free(nidmap); + free(pid_str); + free(nid_str); + + /* + * Initialize Portals + */ + ret = PtlInit(&max_interfaces); + if (PTL_OK != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "PtlInit failed, returning %d\n", ret); + return OMPI_ERR_FATAL; + } + + /* tell the UTCP runtime code to read the env variables */ + PtlSetRank(PTL_INVALID_HANDLE, -1, -1); + + ret = PtlNIInit(PTL_IFACE_DEFAULT, /* interface to initialize */ + PTL_PID_ANY, /* let library assign our pid */ + NULL, /* no desired limits */ + NULL, /* save our limits somewhere */ + &(ompi_pml_portals.portals_ni_h) /* our interface handle */ + ); + if (PTL_OK != ret) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "PtlNIInit failed, returning %d\n", ret); + return OMPI_ERR_FATAL; + } + } else { /* use_modex */ + unsigned int nptl_procs, rank, i; + + /* + */ + ret = PtlGetRank(ompi_pml_portals.portals_ni_h, &rank, &nptl_procs); + if (ret != PTL_OK) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "PtlGetRank() returned %d", ret); + return OMPI_ERR_FATAL; + } else if (nptl_procs != nprocs) { + opal_output_verbose(10, ompi_pml_portals.portals_output, + "nptl_procs != nprocs (%d, %d)", nptl_procs, + nprocs); + return OMPI_ERR_FATAL; + } + + /* fill in all the proc info structs */ + for (i = 0 ; i < nprocs ; ++i) { + ret = PtlGetRankId(ompi_pml_portals.portals_ni_h, i, + &(((ompi_pml_portals_proc_t*) 
procs[i]->proc_pml)->proc_id)); + if (PTL_OK != ret) { + opal_output_verbose(10, + ompi_pml_portals.portals_output, + "PtlGetRankId(%d) failed: %d\n", i, ret); + return OMPI_ERR_FATAL; + } + } + } + +#if 0 + PtlNIDebug(mca_pml_portals_module.portals_ni_h, PTL_DBG_ALL); +#endif + + return OMPI_SUCCESS; +} + diff --git a/ompi/mca/pml/portals/pml_portals_component.c b/ompi/mca/pml/portals/pml_portals_component.c index 952a1996c0..7b13ee6250 100644 --- a/ompi/mca/pml/portals/pml_portals_component.c +++ b/ompi/mca/pml/portals/pml_portals_component.c @@ -22,11 +22,11 @@ #include "pml_portals.h" #include "opal/mca/base/mca_base_param.h" -static int mca_pml_portals_component_open(void); -static int mca_pml_portals_component_close(void); -static mca_pml_base_module_t* mca_pml_portals_component_init( int* priority, +static int ompi_pml_portals_component_open(void); +static int ompi_pml_portals_component_close(void); +static mca_pml_base_module_t* ompi_pml_portals_component_init( int* priority, bool enable_progress_threads, bool enable_mpi_threads); -static int mca_pml_portals_component_fini(void); +static int ompi_pml_portals_component_fini(void); mca_pml_base_component_1_0_0_t mca_pml_portals_component = { @@ -43,8 +43,8 @@ mca_pml_base_component_1_0_0_t mca_pml_portals_component = { OMPI_MAJOR_VERSION, /* MCA component major version */ OMPI_MINOR_VERSION, /* MCA component minor version */ OMPI_RELEASE_VERSION, /* MCA component release version */ - mca_pml_portals_component_open, /* component open */ - mca_pml_portals_component_close /* component close */ + ompi_pml_portals_component_open, /* component open */ + ompi_pml_portals_component_close /* component close */ }, /* Next the MCA v1.0.0 component meta data */ @@ -54,42 +54,119 @@ mca_pml_base_component_1_0_0_t mca_pml_portals_component = { false }, - mca_pml_portals_component_init, /* component init */ - mca_pml_portals_component_fini /* component finalize */ + ompi_pml_portals_component_init, /* component init */ + 
ompi_pml_portals_component_fini /* component finalize */ }; +static opal_output_stream_t portals_output_stream; -static int mca_pml_portals_component_open(void) +static int +ompi_pml_portals_component_open(void) { + /* start up debugging output */ + OBJ_CONSTRUCT(&portals_output_stream, opal_output_stream_t); + portals_output_stream.lds_is_debugging = true; + portals_output_stream.lds_want_stdout = true; + portals_output_stream.lds_file_suffix = "pml-portals"; + mca_base_param_reg_int(&mca_pml_portals_component.pmlm_version, + "debug_level", + "Debugging verbosity (0 - 100)", + false, + false, + 1000, + &(portals_output_stream.lds_verbose_level)); +#if OMPI_BTL_PORTALS_REDSTORM + asprintf(&(portals_output_stream.lds_prefix), + "pml: portals (%2d): ", cnos_get_rank()); +#else + asprintf(&(portals_output_stream.lds_prefix), + "pml: portals (%5d): ", getpid()); +#endif + ompi_pml_portals.portals_output = + opal_output_open(&portals_output_stream); + + /* utcp configuration */ +#if OMPI_BTL_PORTALS_UTCP + mca_base_param_reg_string(&mca_pml_portals_component.pmlm_version, + "ifname", + "Interface name to use for communication", + false, + false, + "eth0", + &(ompi_pml_portals.portals_ifname)); +#endif + + /* eager limit */ + mca_base_param_reg_int(&mca_pml_portals_component.pmlm_version, + "eager_limit", + "short message eager send limit", + false, + false, + 8192, + &(ompi_pml_portals.portals_eager_limit)); + return OMPI_SUCCESS; } -static int mca_pml_portals_component_close(void) + +static int +ompi_pml_portals_component_close(void) { +#if OMPI_BTL_PORTALS_UTCP + if (NULL != ompi_pml_portals.portals_ifname) { + free(ompi_pml_portals.portals_ifname); + } +#endif + + if (NULL != portals_output_stream.lds_prefix) { + free(portals_output_stream.lds_prefix); + } + + /* close debugging stream */ + opal_output_close(ompi_pml_portals.portals_output); + ompi_pml_portals.portals_output = -1; + return OMPI_SUCCESS; } + static mca_pml_base_module_t* 
-mca_pml_portals_component_init( int* priority, +ompi_pml_portals_component_init(int* priority, bool enable_progress_threads, bool enable_mpi_threads) { - mca_base_param_reg_int(&mca_pml_portals_component.pmlm_version, - "priority", - "Component priority", - false, - false, - 0, - priority); + *priority = 10; /* we don't run with no stinkin' threads */ if (enable_progress_threads || enable_mpi_threads) return NULL; - return &mca_pml_portals.super; + /* initialize our interface */ + if (OMPI_SUCCESS != ompi_pml_portals_init_compat()) { + opal_output_verbose(20, ompi_pml_portals.portals_output, + "disabled because compatibility init failed"); + return NULL; + } + + OBJ_CONSTRUCT(&(ompi_pml_portals.portals_unexpected_events), + opal_list_t); + + return &ompi_pml_portals.super; } -static int mca_pml_portals_component_fini(void) + +static int +ompi_pml_portals_component_fini(void) { + PtlEQFree(ompi_pml_portals.portals_nonblocking_queue); + PtlEQFree(ompi_pml_portals.portals_unexpected_receive_queue); + PtlEQFree(ompi_pml_portals.portals_blocking_receive_queue); + PtlEQFree(ompi_pml_portals.portals_blocking_send_queue); + + PtlNIFini(ompi_pml_portals.portals_ni_h); + + opal_output_verbose(20, ompi_pml_portals.portals_output, + "successfully finalized portals pml"); + return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/portals/pml_portals_datatype.h b/ompi/mca/pml/portals/pml_portals_datatype.h new file mode 100644 index 0000000000..28aa75cd37 --- /dev/null +++ b/ompi/mca/pml/portals/pml_portals_datatype.h @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PML_PORTALS_DATATYPE_H +#define PML_PORTALS_DATATYPE_H + + +static inline int +ompi_pml_portals_prepare_md_send(ompi_convertor_t *convertor, + ptl_md_t *md, + int *free_after) +{ + struct iovec iov; + uint32_t iov_count = 1; + int32_t conv_free_after; + size_t bufsize; + + ompi_convertor_get_packed_size(convertor, &bufsize); + iov.iov_len = bufsize; + if (0 == ompi_convertor_need_buffers(convertor)) { + iov.iov_base = NULL; + *free_after = 0; + } else { + /* BWB - try to use the iovec option here */ + + iov.iov_base = malloc(bufsize); + if (NULL == iov.iov_base) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + *free_after = 1; + } + + ompi_convertor_pack(convertor, &iov, &iov_count, &bufsize, + &conv_free_after); + + md->start = iov.iov_base; + md->length = iov.iov_len; + md->options = 0; + + return OMPI_SUCCESS; +} + + +static inline int +ompi_pml_portals_free_md_send(ptl_md_t *md, int free_after) +{ + if (free_after) { + free(md->start); + } + + return OMPI_SUCCESS; +} + + +static inline int +ompi_pml_portals_prepare_md_recv(ompi_convertor_t *convertor, + ptl_md_t *md, + int *free_after) +{ + struct iovec iov; + size_t bufsize; + long lb; + + ompi_convertor_get_packed_size(convertor, &bufsize); + iov.iov_len = bufsize; + if (0 == ompi_convertor_need_buffers(convertor)) { + ompi_ddt_type_lb(convertor->pDesc, &lb); + iov.iov_base = convertor->pBaseBuf + lb + convertor->bConverted; + *free_after = 0; + } else { + /* BWB - try to use the iovec option here */ + + iov.iov_base = malloc(bufsize); + if (NULL == iov.iov_base) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + *free_after = 1; + } + + md->start = iov.iov_base; + md->length = iov.iov_len; + md->options = 0; + + return OMPI_SUCCESS; +} + +static inline int +ompi_pml_portals_free_md_recv(ompi_convertor_t *convertor, + ptl_md_t *md, + int free_after) +{ + uint32_t iov_count = 1; + size_t max_data; + struct iovec iov; + + iov.iov_len = 
md->length; + iov.iov_base = md->start; + + if (free_after) { + /* need to unpack into user buffer */ + ompi_convertor_unpack(convertor, &iov, &iov_count, + &max_data, &free_after); + free(md->start); + } + + return OMPI_SUCCESS; +} + +#endif diff --git a/ompi/mca/pml/portals/pml_portals_probe.c b/ompi/mca/pml/portals/pml_portals_probe.c index 0b9cd362cb..f67cfdf670 100644 --- a/ompi/mca/pml/portals/pml_portals_probe.c +++ b/ompi/mca/pml/portals/pml_portals_probe.c @@ -20,16 +20,19 @@ #include "ompi/request/request.h" #include "pml_portals.h" -int mca_pml_portals_iprobe( int src, int tag, +int +ompi_pml_portals_iprobe(int src, int tag, struct ompi_communicator_t *comm, - int *matched, ompi_status_public_t * status ) + int *matched, ompi_status_public_t * status) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } -int mca_pml_portals_probe( int src, int tag, + +int +ompi_pml_portals_probe(int src, int tag, struct ompi_communicator_t *comm, - ompi_status_public_t * status ) + ompi_status_public_t * status) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } diff --git a/ompi/mca/pml/portals/pml_portals_progress.c b/ompi/mca/pml/portals/pml_portals_progress.c index 1cce221d9b..b008a37045 100644 --- a/ompi/mca/pml/portals/pml_portals_progress.c +++ b/ompi/mca/pml/portals/pml_portals_progress.c @@ -19,7 +19,9 @@ #include "ompi_config.h" #include "pml_portals.h" -int mca_pml_portals_progress(void) + +int +ompi_pml_portals_progress(void) { return 0; } diff --git a/ompi/mca/pml/portals/pml_portals_recv.c b/ompi/mca/pml/portals/pml_portals_recv.c index 808989e7a3..3d18d9f086 100644 --- a/ompi/mca/pml/portals/pml_portals_recv.c +++ b/ompi/mca/pml/portals/pml_portals_recv.c @@ -16,41 +16,186 @@ * $HEADER$ */ + #include "ompi_config.h" #include "pml_portals.h" #include "ompi/request/request.h" +#include "ompi/datatype/datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/datatype/convertor.h" +#include "pml_portals_datatype.h" -int 
mca_pml_portals_irecv_init( void *addr, - size_t count, - ompi_datatype_t * datatype, - int src, - int tag, - struct ompi_communicator_t *comm, - struct ompi_request_t **request ) + +struct pml_portals_recv_info_t { + opal_list_item_t super; + ptl_event_t ev; +}; +typedef struct pml_portals_recv_info_t pml_portals_recv_info_t; +OBJ_CLASS_INSTANCE(pml_portals_recv_info_t, opal_list_item_t, NULL, NULL); + + +static int +get_data(ptl_event_t ev, ptl_md_t md, ompi_convertor_t *convertor) { + ptl_handle_md_t md_h; + + /* create the floating md */ + md.threshold = 1; + md.options = PTL_MD_EVENT_START_DISABLE; + md.eq_handle = ompi_pml_portals.portals_blocking_receive_queue; + + PtlMDBind(ompi_pml_portals.portals_ni_h, md, + PTL_UNLINK, &md_h); + + PtlGet(md_h, ev.initiator, PML_PTLS_INDEX_READ, + 0, ev.hdr_data, 0); + + PtlEQWait(ompi_pml_portals.portals_blocking_receive_queue, &ev); + assert(ev.type == PTL_EVENT_GET_END); + return OMPI_SUCCESS; } -int mca_pml_portals_irecv( void *addr, +int +ompi_pml_portals_irecv_init(void *addr, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_request_t **request) +{ + return OMPI_ERR_NOT_IMPLEMENTED; +} + + +int +ompi_pml_portals_irecv(void *addr, size_t count, ompi_datatype_t * datatype, int src, int tag, struct ompi_communicator_t *comm, - struct ompi_request_t **request ) + struct ompi_request_t **request) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } -int mca_pml_portals_recv( void *addr, +int +ompi_pml_portals_recv(void *buf, size_t count, ompi_datatype_t * datatype, int src, int tag, struct ompi_communicator_t *comm, - ompi_status_public_t * status ) + ompi_status_public_t * status) { + ompi_convertor_t convertor; + uint64_t ignore_bits, match_bits; + opal_list_item_t *list_item; + int ret, free_after; + ptl_md_t md; + ptl_handle_md_t md_h; + ptl_handle_me_t me_h; + ptl_process_id_t portals_proc; + ompi_pml_portals_proc_t *pml_portals_proc = + 
(ompi_pml_portals_proc_t*) comm->c_pml_procs[src]; + ptl_event_t ev; + + OBJ_CONSTRUCT(&convertor, ompi_convertor_t); + /* BWB - fix me - need some way of finding source in ANY_SOURCE case */ + ompi_convertor_copy_and_prepare_for_send(comm->c_pml_procs[comm->c_my_rank]->proc_ompi->proc_convertor, + datatype, + count, + buf, + 0, + &convertor); + + if (MPI_ANY_SOURCE == src) { + portals_proc.nid = PTL_NID_ANY; + portals_proc.pid = PTL_PID_ANY; + } else { + portals_proc = pml_portals_proc->proc_id; + } + ompi_pml_portals_prepare_md_recv(&convertor, &md, &free_after); + + PML_PTLS_RECV_BITS(match_bits, ignore_bits, comm->c_contextid, src, tag); + + /* first, make sure it's not in the queue of processed unexpected msgs */ + list_item = opal_list_get_first(&(ompi_pml_portals.portals_unexpected_events)); + while (list_item != opal_list_get_end(&(ompi_pml_portals.portals_unexpected_events))) { + opal_list_item_t *next = opal_list_get_next(list_item); + pml_portals_recv_info_t * info = (pml_portals_recv_info_t*) list_item; + + if ((info->ev.match_bits & ~ignore_bits) == match_bits) { + /* we have a match... */ + get_data(ev, md, &convertor); + opal_list_remove_item(&(ompi_pml_portals.portals_unexpected_events), + list_item); + OBJ_RELEASE(list_item); + goto cleanup; + } + list_item = next; + } + + restart_search: + /* now check the unexpected event queue */ + while (true) { + int ret = PtlEQGet(ompi_pml_portals.portals_unexpected_receive_queue, + &ev); + if (PTL_OK == ret) { + if ((ev.match_bits & ~ignore_bits) == match_bits) { + /* we have a match... 
*/ + get_data(ev, md, &convertor); + goto cleanup; + } else { + pml_portals_recv_info_t *item = OBJ_NEW(pml_portals_recv_info_t); + item->ev = ev; + opal_list_append(&(ompi_pml_portals.portals_unexpected_events), + &(item->super)); + } + } else if (PTL_EQ_EMPTY == ret) { + break; + } + + } + + /* now post a receive */ + printf("receive from: %d, %d\n", portals_proc.nid, portals_proc.pid); + printf("receive match bits: %lx, %lx\n", match_bits, ignore_bits); + PtlMEInsert(ompi_pml_portals.portals_unexpected_me_h, + portals_proc, + match_bits, + ignore_bits, + PTL_UNLINK, + PTL_INS_BEFORE, + &me_h); + + md.threshold = 1; + md.options |= PTL_MD_EVENT_START_DISABLE; + md.eq_handle = ompi_pml_portals.portals_blocking_receive_queue; + PtlMDAttach(me_h, md, PTL_UNLINK, &md_h); + + /* now try to make active */ + md.options |= PTL_MD_OP_PUT; + ret = PtlMDUpdate(md_h, NULL, &md, ompi_pml_portals.portals_unexpected_receive_queue); + if (ret == PTL_MD_NO_UPDATE) { + /* a message has arrived since we searched - look again */ + PtlMDUnlink(md_h); + if (free_after) { free(md.start); } + goto restart_search; + } + + /* wait for our completion event */ + PtlEQWait(ompi_pml_portals.portals_blocking_receive_queue, &ev); + assert(ev.type == PTL_EVENT_PUT_END); + + cleanup: + ompi_pml_portals_free_md_recv(&convertor, &md, free_after); + + OBJ_DESTRUCT(&convertor); + return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/portals/pml_portals_send.c b/ompi/mca/pml/portals/pml_portals_send.c index 0ea2060046..cdf440c3c4 100644 --- a/ompi/mca/pml/portals/pml_portals_send.c +++ b/ompi/mca/pml/portals/pml_portals_send.c @@ -20,40 +20,115 @@ #include "pml_portals.h" #include "ompi/datatype/datatype.h" #include "ompi/communicator/communicator.h" +#include "ompi/datatype/convertor.h" +#include "pml_portals_datatype.h" -int mca_pml_portals_isend_init( void* buf, +int +ompi_pml_portals_isend_init(void* buf, size_t count, ompi_datatype_t* datatype, int dst, int tag, mca_pml_base_send_mode_t sendmode, 
ompi_communicator_t* comm, - ompi_request_t** request ) + ompi_request_t** request) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } -int mca_pml_portals_isend( void* buf, +int +ompi_pml_portals_isend(void* buf, size_t count, ompi_datatype_t* datatype, int dst, int tag, mca_pml_base_send_mode_t sendmode, ompi_communicator_t* comm, - ompi_request_t** request ) + ompi_request_t** request) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; } -int mca_pml_portals_send( void *buf, + +int +ompi_pml_portals_send(void *buf, size_t count, ompi_datatype_t* datatype, int dst, int tag, mca_pml_base_send_mode_t sendmode, - ompi_communicator_t* comm ) + ompi_communicator_t* comm) { + ompi_convertor_t convertor; + int ret, free_after; + static int msg_count = 0; + uint64_t match_bits; + + ptl_md_t md; + ptl_handle_me_t me_h; + ptl_handle_md_t md_h; + ptl_event_t ev; + ompi_pml_portals_proc_t *portals_proc = + (ompi_pml_portals_proc_t*) comm->c_pml_procs[dst]; + + + if (MCA_PML_BASE_SEND_SYNCHRONOUS == sendmode) abort(); + + OBJ_CONSTRUCT(&convertor, ompi_convertor_t); + ompi_convertor_copy_and_prepare_for_send(comm->c_pml_procs[dst]->proc_ompi->proc_convertor, + datatype, + count, + buf, + 0, + &convertor); + + PtlMEAttach(ompi_pml_portals.portals_ni_h, + PML_PTLS_INDEX_READ, + portals_proc->proc_id, + msg_count, + 0, + PTL_UNLINK, + PTL_INS_AFTER, + &me_h); + + ompi_pml_portals_prepare_md_send(&convertor, &md, &free_after); + md.threshold = 2; + md.options |= (PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE); + md.eq_handle = ompi_pml_portals.portals_blocking_send_queue; + + PtlMDAttach(me_h, md, PTL_UNLINK, &md_h); + + PML_PTLS_SEND_BITS(match_bits, comm->c_contextid, comm->c_my_rank, tag); + + printf("send to: %d, %d\n", portals_proc->proc_id.nid, portals_proc->proc_id.pid); + printf("send match bits: %lx\n", match_bits); + PtlPut(md_h, PTL_ACK_REQ, portals_proc->proc_id, + PML_PTLS_INDEX_RECV, 0, + match_bits, 0, msg_count); + + msg_count++; + + /* our send 
end event */ + ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev); + assert(ret == PTL_OK); + assert(ev.type == PTL_EVENT_SEND_END); + + /* our ack / get event */ + ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev); + assert(ret == PTL_OK); +#if OMPI_PML_PORTALS_HAVE_EVENT_UNLINK + if (ev.type == PTL_EVENT_UNLINK) { + ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev); + assert(ret == PTL_OK); + } +#endif + assert((ev.type == PTL_EVENT_ACK) || (ev.type == PTL_EVENT_GET_END)); + + ompi_pml_portals_free_md_send(&md, free_after); + + OBJ_DESTRUCT(&convertor); + return OMPI_SUCCESS; } diff --git a/ompi/mca/pml/portals/pml_portals_start.c b/ompi/mca/pml/portals/pml_portals_start.c index 2f6d1a3be0..0383483dc3 100644 --- a/ompi/mca/pml/portals/pml_portals_start.c +++ b/ompi/mca/pml/portals/pml_portals_start.c @@ -19,7 +19,9 @@ #include "ompi_config.h" #include "pml_portals.h" -int mca_pml_portals_start(size_t count, ompi_request_t** requests) + +int +ompi_pml_portals_start(size_t count, ompi_request_t** requests) { - return OMPI_SUCCESS; + return OMPI_ERR_NOT_IMPLEMENTED; }