diff --git a/src/mca/ptl/portals/configure.stub b/src/mca/ptl/portals/configure.stub index 2e0ceae099..55a96bd263 100644 --- a/src/mca/ptl/portals/configure.stub +++ b/src/mca/ptl/portals/configure.stub @@ -168,6 +168,10 @@ AC_DEFUN([MCA_CONFIGURE_STUB],[ [PTL_PORTALS_DEFAULT_FIRST_FRAG_ENTRY_SIZE], [1048576], [Default size of memory associeted with first fag md]) + MCA_PTL_PORTALS_CONFIG_VAL([first-frag-queue-size], + [PTL_PORTALS_DEFAULT_FIRST_FRAG_QUEUE_SIZE], [512], + [Default size of event queue for first frag mds]) + MCA_PTL_PORTALS_CONFIG_VAL([rndv-frag-min-size], [PTL_PORTALS_DEFAULT_RNDV_FRAG_MIN_SIZE], [0], [Default minimum size of rndv fragments]) diff --git a/src/mca/ptl/portals/ptl_portals.c b/src/mca/ptl/portals/ptl_portals.c index 9a896b158a..bb17b0427f 100644 --- a/src/mca/ptl/portals/ptl_portals.c +++ b/src/mca/ptl/portals/ptl_portals.c @@ -24,6 +24,8 @@ #include "include/constants.h" #include "util/output.h" +#include "mca/pml/pml.h" +#include "mca/pml/base/pml_base_sendreq.h" #include "ptl_portals.h" #include "ptl_portals_compat.h" @@ -144,6 +146,14 @@ mca_ptl_portals_module_enable(struct mca_ptl_portals_module_t *ptl, ptl->first_frag_queue_size, PTL_EQ_HANDLER_NONE, &(ptl->frag_receive_eq_handle)); + if (ret != PTL_OK) { + ompi_output(mca_ptl_portals_component.portals_output, + "Failed to allocate event queue: %d", ret); + return OMPI_ERROR; + } + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "allocated event queue: %d", + ptl->frag_receive_eq_handle); for (i = 0 ; i < ptl->first_frag_num_entries ; ++i) { ret = ptl_portals_new_frag_entry(ptl); @@ -166,7 +176,6 @@ ptl_portals_new_frag_entry(struct mca_ptl_portals_module_t *ptl) int ret; ptl_process_id_t proc = { PTL_NID_ANY, PTL_PID_ANY }; - /* create match entry */ ret = PtlMEAttach(ptl->ni_handle, PTL_PORTALS_FRAG_TABLE_ID, @@ -192,7 +201,7 @@ ptl_portals_new_frag_entry(struct mca_ptl_portals_module_t *ptl) md.max_size = md.length - ptl->super.ptl_first_frag_size; md.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE; md.user_ptr = NULL; - md.eventq = ptl->frag_receive_eq_handle; + md.eq_handle = ptl->frag_receive_eq_handle; ret = PtlMDAttach(me_handle, md, @@ -204,12 +213,92 @@ ptl_portals_new_frag_entry(struct mca_ptl_portals_module_t *ptl) } ompi_output_verbose(50, mca_ptl_portals_component.portals_output, - "new fragment added"); + "new receive buffer posted"); return OMPI_SUCCESS; } +int +mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl_base, + struct mca_ptl_base_peer_t *ptl_peer, + struct mca_pml_base_send_request_t *sendreq, + size_t offset, size_t size, int flags) +{ + mca_ptl_portals_module_t* ptl = (mca_ptl_portals_module_t*) ptl_base; + ptl_process_id_t *peer_id = (ptl_process_id_t*) ptl_peer; + mca_ptl_portals_send_frag_t* sendfrag; + mca_ptl_base_header_t* hdr; + int ret; + ptl_md_t md; + + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "mca_ptl_portals_send to %lu, %lu", + peer_id->nid, peer_id->pid); + + if (sendreq->req_cached) { + sendfrag = (mca_ptl_portals_send_frag_t*)(sendreq+1); + } else { + ompi_output(mca_ptl_portals_component.portals_output, + "request not cached - not implemented."); + return OMPI_ERROR; + } + + /* initialize convertor */ + if (size > 0) { + ompi_output(mca_ptl_portals_component.portals_output, + "request size > 0, not implemented"); + return OMPI_ERROR; + } else { + sendfrag->frag_send.frag_base.frag_addr = NULL; + sendfrag->frag_send.frag_base.frag_size = 0; + } + + /* setup message header */ + hdr = &sendfrag->frag_send.frag_base.frag_header; + if(offset == 0) { + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; + hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; + hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; + hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; + hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; + } else { + ompi_output(mca_ptl_portals_component.portals_output, + "offset > 0, not implemented"); + return OMPI_ERROR; + } + + /* fragment state */ +#if 0 + sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super; +#endif + sendfrag->frag_send.frag_request = sendreq; +#if 0 + sendfrag->frag_send.frag_base.frag_peer = ptl_peer; +#endif + + + /* must update the offset after actual fragment size is determined + * before attempting to send the fragment + */ + mca_pml_base_send_request_offset(sendreq, + sendfrag->frag_send.frag_base.frag_size); +#if 0 + md.start = mem; + md.length = ptl->first_frag_entry_size; + md.threshold = PTL_MD_THRESH_INF; + md.max_size = md.length - ptl->super.ptl_first_frag_size; + md.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE; + md.user_ptr = NULL; + md.eq_handle = ptl->frag_receive_eq_handle; +#endif + + return OMPI_ERROR; +} + + int mca_ptl_portals_finalize(struct mca_ptl_base_module_t *ptl_base) { diff --git a/src/mca/ptl/portals/ptl_portals_compat_utcp.c b/src/mca/ptl/portals/ptl_portals_compat_utcp.c index a15dc5dc46..e361404510 100644 --- a/src/mca/ptl/portals/ptl_portals_compat_utcp.c +++ b/src/mca/ptl/portals/ptl_portals_compat_utcp.c @@ -50,7 +50,8 @@ mca_ptl_portals_init(mca_ptl_portals_component_t *comp) info.nid = htonl(utcp_my_nid(mca_ptl_portals_component.portals_ifname)); info.pid = htonl((ptl_pid_t) getpid()); ompi_output_verbose(100, mca_ptl_portals_component.portals_output, - "contact info: %u, %u", info.nid, info.pid); + "contact info: %u, %u", ntohl(info.nid), + ntohl(info.pid)); ret = mca_base_modex_send(&mca_ptl_portals_component.super.ptlm_version, &info, sizeof(ptl_process_id_t)); @@ -144,8 +145,8 @@ mca_ptl_portals_add_procs_compat(struct mca_ptl_portals_module_t* ptl, } /* update my local array of proc structs */ - (*portals_procs)[i].nid = info->nid; - (*portals_procs)[i].pid = info->pid; + (*portals_procs)[i].nid = ntohl(info->nid); + (*portals_procs)[i].pid = ntohl(info->pid); free(info); } diff --git a/src/mca/ptl/portals/ptl_portals_component.c b/src/mca/ptl/portals/ptl_portals_component.c index ac02bad3a9..084cc3a8c0 100644 --- a/src/mca/ptl/portals/ptl_portals_component.c +++ b/src/mca/ptl/portals/ptl_portals_component.c @@ -145,6 +145,10 @@ mca_ptl_portals_component_open(void) mca_ptl_portals_module.first_frag_entry_size = mca_ptl_portals_param_register_int("first_frag_entry_size", PTL_PORTALS_DEFAULT_FIRST_FRAG_ENTRY_SIZE); + mca_ptl_portals_module.first_frag_queue_size = + mca_ptl_portals_param_register_int("first_frag_queue_size", + PTL_PORTALS_DEFAULT_FIRST_FRAG_QUEUE_SIZE); + /* finish with objects */ mca_ptl_portals_component.portals_output = @@ -256,11 +260,34 @@ int mca_ptl_portals_component_progress(mca_ptl_tstamp_t tstamp) { int num_progressed = 0; + size_t i; + int ret; - ompi_output_verbose(110, mca_ptl_portals_component.portals_output, - "mca_ptl_portals_component_progress(%ld)", tstamp); + for (i = 0 ; i < mca_ptl_portals_component.portals_num_modules ; ++i) { + struct mca_ptl_portals_module_t *module = + mca_ptl_portals_component.portals_modules[i]; + ptl_event_t my_event; - /* BWB - write me */ + if (! module->frag_queues_created) continue; + + ret = PtlEQGet(module->frag_receive_eq_handle, &my_event); + if (PTL_EQ_EMPTY == ret) { + continue; + } else if (!(PTL_OK == ret || PTL_EQ_DROPPED == ret)) { + ompi_output(mca_ptl_portals_component.portals_output, + "Error calling PtlEQGet: %d", ret); + continue; + } else if (PTL_EQ_DROPPED == ret) { + ompi_output_verbose(20, mca_ptl_portals_component.portals_output, + "Progress found dropped packets"); + } + + ompi_output_verbose(100, mca_ptl_portals_component.portals_output, + "my_event: %d, %d, %d, %d %d %d", + my_event.type, my_event.rlength, my_event.offset, + my_event.link, my_event.ni_fail_type, my_event.sequence); + num_progressed++; + } return num_progressed; } diff --git a/src/mca/ptl/portals/ptl_portals_sendfrag.h b/src/mca/ptl/portals/ptl_portals_sendfrag.h index 02f649077c..5805d5ea6f 100644 --- a/src/mca/ptl/portals/ptl_portals_sendfrag.h +++ b/src/mca/ptl/portals/ptl_portals_sendfrag.h @@ -27,9 +27,6 @@ extern "C" { struct mca_ptl_portals_send_frag_t { mca_ptl_base_send_frag_t frag_send; - ptl_md_t memory_descriptor; /* BWB - not sure if we need this */ - ptl_handle_eq_t eq_handle; - int status; }; typedef struct mca_ptl_portals_send_frag_t mca_ptl_portals_send_frag_t; diff --git a/src/mca/ptl/portals/ptl_portals_stubs.c b/src/mca/ptl/portals/ptl_portals_stubs.c index 48a62557de..0af8bb2789 100644 --- a/src/mca/ptl/portals/ptl_portals_stubs.c +++ b/src/mca/ptl/portals/ptl_portals_stubs.c @@ -35,7 +35,9 @@ int mca_ptl_portals_request_init(struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t *req) { - return OMPI_ERROR; + ompi_output(mca_ptl_portals_component.portals_output, + "unimplemented function mca_ptl_request_init"); + return OMPI_SUCCESS; } @@ -43,6 +45,8 @@ void mca_ptl_portals_request_fini(struct mca_ptl_base_module_t *ptl, struct mca_pml_base_send_request_t *req) { + ompi_output(mca_ptl_portals_component.portals_output, + "unimplemented function mca_ptl_request_fini"); return; } @@ -50,23 +54,7 @@ void mca_ptl_portals_matched(struct mca_ptl_base_module_t *ptl, struct mca_ptl_base_recv_frag_t *frag) { + ompi_output(mca_ptl_portals_component.portals_output, + "unimplemented function mca_ptl_portals_matched"); return; } - -int -mca_ptl_portals_send(struct mca_ptl_base_module_t *ptl, - struct mca_ptl_base_peer_t *ptl_peer, - struct mca_pml_base_send_request_t *req, - size_t offset, size_t size, int flags) -{ - return OMPI_ERROR; -} - -int -mca_ptl_portals_send_continue(struct mca_ptl_base_module_t *ptl, - struct mca_ptl_base_peer_t *ptl_peer, - struct mca_pml_base_send_request_t *req, - size_t offset, size_t size, int flags) -{ - return OMPI_ERROR; -}