diff --git a/src/mca/base/mca_base_module_exchange.c b/src/mca/base/mca_base_module_exchange.c index 04efa426b6..7fcd65f360 100644 --- a/src/mca/base/mca_base_module_exchange.c +++ b/src/mca/base/mca_base_module_exchange.c @@ -236,7 +236,7 @@ int mca_base_modex_exchange(void) rc = mca_oob.oob_send( proc->proc_job, proc->proc_vpid, - MCA_OOB_ANY_TAG, + 0, self_module->module_data, self_module->module_data_size); if(rc != LAM_SUCCESS) { @@ -255,7 +255,7 @@ int mca_base_modex_exchange(void) for(i=0; iptlm_control(param,value,size); + if(rc != LAM_SUCCESS) + return rc; + } + return LAM_SUCCESS; +} + /* * For each proc setup a datastructure that indicates the PTLs * that can be used to reach the destination. diff --git a/src/mca/pml/teg/src/pml_teg.h b/src/mca/pml/teg/src/pml_teg.h index 76a388cc59..5541da407f 100644 --- a/src/mca/pml/teg/src/pml_teg.h +++ b/src/mca/pml/teg/src/pml_teg.h @@ -17,6 +17,7 @@ #include "mca/pml/base/pml_base_request.h" #include "mca/ptl/ptl.h" +#define MCA_PML_TEG_STATISTICS 1 /** * TEG PML Interface @@ -42,6 +43,12 @@ struct mca_pml_teg_t { /* free list of recv requests */ lam_free_list_t teg_recv_requests; +#if MCA_PML_TEG_STATISTICS + long teg_waits; + long teg_condition_waits; + long teg_condition_broadcasts; +#endif + /* request completion */ lam_mutex_t teg_request_lock; lam_condition_t teg_request_cond; @@ -99,6 +106,14 @@ extern int mca_pml_teg_add_ptls( lam_list_t *ptls ); +extern int mca_pml_teg_control( + int param, + void *size, + size_t value +); + +extern int mca_pml_teg_progress(void); + extern int mca_pml_teg_isend_init( void *buf, size_t count, diff --git a/src/mca/pml/teg/src/pml_teg_module.c b/src/mca/pml/teg/src/pml_teg_module.c index 7eee3875dd..ea3e8e2bca 100644 --- a/src/mca/pml/teg/src/pml_teg_module.c +++ b/src/mca/pml/teg/src/pml_teg_module.c @@ -71,6 +71,12 @@ int mca_pml_teg_module_open(void) teg_null->req_status.MPI_ERROR = LAM_SUCCESS; teg_null->req_status._count = 0; +#if MCA_PML_TEG_STATISTICS + mca_pml_teg.teg_waits = 0; + mca_pml_teg.teg_condition_waits = 0; + mca_pml_teg.teg_condition_broadcasts = 0; +#endif + mca_pml_teg.teg_free_list_num = mca_pml_teg_param_register_int("free_list_num", 256); mca_pml_teg.teg_free_list_max = @@ -85,6 +91,22 @@ int mca_pml_teg_module_open(void) int mca_pml_teg_module_close(void) { +#if MCA_PML_TEG_STATISTICS + lam_output(0, "mca_pml_teg.teg_waits = %d\n", + mca_pml_teg.teg_waits); + lam_output(0, "mca_pml_teg.teg_condition_waits = %d\n", + mca_pml_teg.teg_condition_waits); + lam_output(0, "mca_pml_teg.teg_condition_broadcast = %d\n", + mca_pml_teg.teg_condition_broadcasts); +#endif + + if (mca_pml_teg.teg_recv_requests.fl_num_allocated != + mca_pml_teg.teg_recv_requests.super.lam_list_length) { + lam_output(0, "teg recv requests: %d allocated %d returned\n", + mca_pml_teg.teg_recv_requests.fl_num_allocated, + mca_pml_teg.teg_recv_requests.super.lam_list_length); + } + if(NULL != mca_pml_teg.teg_ptl_modules) free(mca_pml_teg.teg_ptl_modules); if(NULL != mca_pml_teg.teg_ptls) diff --git a/src/mca/pml/teg/src/pml_teg_progress.c b/src/mca/pml/teg/src/pml_teg_progress.c index a1897e84fb..32a41bc817 100644 --- a/src/mca/pml/teg/src/pml_teg_progress.c +++ b/src/mca/pml/teg/src/pml_teg_progress.c @@ -4,8 +4,7 @@ int mca_pml_teg_progress(void) { -#if 0 - mca_ptl_base_tstamp_t tstamp; + mca_ptl_tstamp_t tstamp = 0; size_t i; /* @@ -13,7 +12,6 @@ int mca_pml_teg_progress(void) */ for(i=0; iptlm_progress(tstamp); -#endif return LAM_SUCCESS; } diff --git a/src/mca/pml/teg/src/pml_teg_recvreq.c b/src/mca/pml/teg/src/pml_teg_recvreq.c index 43ebcb5d8f..2426001765 100644 --- a/src/mca/pml/teg/src/pml_teg_recvreq.c +++ b/src/mca/pml/teg/src/pml_teg_recvreq.c @@ -15,10 +15,13 @@ void mca_pml_teg_recv_request_progress( req->super.req_status.MPI_TAG = req->super.req_tag; req->super.req_status.MPI_ERROR = LAM_SUCCESS; req->super.req_status._count = req->req_bytes_delivered; - req->super.req_mpi_done = true; req->super.req_pml_done = true; + req->super.req_mpi_done = true; if(mca_pml_teg.teg_request_waiting) { +#if MCA_PML_TEG_STATISTICS + mca_pml_teg.teg_condition_broadcasts++; lam_condition_broadcast(&mca_pml_teg.teg_request_cond); +#endif } } lam_mutex_unlock(&mca_pml_teg.teg_request_lock); diff --git a/src/mca/pml/teg/src/pml_teg_wait.c b/src/mca/pml/teg/src/pml_teg_wait.c index 658bbb6fe3..082cd64042 100644 --- a/src/mca/pml/teg/src/pml_teg_wait.c +++ b/src/mca/pml/teg/src/pml_teg_wait.c @@ -14,6 +14,10 @@ int mca_pml_teg_wait( int completed = -1; mca_pml_base_request_t* pml_request; +#if MCA_PML_TEG_STATISTICS + mca_pml_teg.teg_waits++; +#endif + #if LAM_HAVE_THREADS /* poll for completion */ for(c=0; completed < 0 && c < mca_pml_teg.teg_poll_iterations; c++) { @@ -41,8 +45,12 @@ int mca_pml_teg_wait( break; } } - if(completed < 0) + if(completed < 0) { +#if MCA_PML_TEG_STATISTICS + mca_pml_teg.teg_condition_waits++; +#endif lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock); + } } while(completed < 0); mca_pml_teg.teg_request_waiting--; lam_mutex_unlock(&mca_pml_teg.teg_request_lock); diff --git a/src/mca/ptl/base/ptl_base_comm.c b/src/mca/ptl/base/ptl_base_comm.c index 2c105cdb98..cdddb3ea6e 100644 --- a/src/mca/ptl/base/ptl_base_comm.c +++ b/src/mca/ptl/base/ptl_base_comm.c @@ -42,16 +42,16 @@ int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size) size_t i; /* send message sequence-number support - sender side */ - comm->c_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); + comm->c_msg_seq = malloc(sizeof(mca_ptl_sequence_t) * size); if(NULL == comm->c_msg_seq) return LAM_ERR_OUT_OF_RESOURCE; - memset(comm->c_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size); + memset(comm->c_msg_seq, 0, sizeof(mca_ptl_sequence_t) * size); /* send message sequence-number support - receiver side */ - comm->c_next_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size); + comm->c_next_msg_seq = malloc(sizeof(mca_ptl_sequence_t) * size); if(NULL == comm->c_next_msg_seq) return LAM_ERR_OUT_OF_RESOURCE; - memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size); + memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_sequence_t) * size); /* unexpected fragments queues */ comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size); diff --git a/src/mca/ptl/base/ptl_base_comm.h b/src/mca/ptl/base/ptl_base_comm.h index d5109b9e4b..61d6faa973 100644 --- a/src/mca/ptl/base/ptl_base_comm.h +++ b/src/mca/ptl/base/ptl_base_comm.h @@ -20,9 +20,9 @@ extern lam_class_t mca_pml_ptl_comm_t_class; */ struct mca_pml_comm_t { lam_object_t super; - mca_ptl_base_sequence_t *c_msg_seq; /**< send message sequence number - sender side */ - mca_ptl_base_sequence_t *c_next_msg_seq; /**< send message sequence number - receiver side */ - mca_ptl_base_sequence_t c_recv_seq; /**< recv request sequence number - receiver side */ + mca_ptl_sequence_t *c_msg_seq; /**< send message sequence number - sender side */ + mca_ptl_sequence_t *c_next_msg_seq; /**< send message sequence number - receiver side */ + mca_ptl_sequence_t c_recv_seq; /**< recv request sequence number - receiver side */ lam_mutex_t c_matching_lock; /**< matching lock */ lam_list_t *c_unexpected_frags; /**< unexpected fragment queues */ lam_list_t *c_frags_cant_match; /**< out-of-order fragment queues */ @@ -50,9 +50,9 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size); * @return Next available sequence number. */ -static inline mca_ptl_base_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst) +static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst) { - mca_ptl_base_sequence_t sequence; + mca_ptl_sequence_t sequence; lam_mutex_lock(&comm->c_matching_lock); sequence = comm->c_msg_seq[dst]++; lam_mutex_unlock(&comm->c_matching_lock); diff --git a/src/mca/ptl/base/ptl_base_header.h b/src/mca/ptl/base/ptl_base_header.h index 2ba8617a52..a02b721593 100644 --- a/src/mca/ptl/base/ptl_base_header.h +++ b/src/mca/ptl/base/ptl_base_header.h @@ -37,7 +37,7 @@ struct mca_ptl_base_frag_header_t { mca_ptl_base_common_header_t hdr_common; /**< common attributes */ uint32_t hdr_frag_length; /**< fragment length */ uint32_t hdr_frag_offset; /**< offset into message */ - mca_ptl_base_sequence_t hdr_frag_seq; /**< fragment sequence number */ + mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */ lam_ptr_t hdr_src_ptr; /**< pointer to source fragment */ lam_ptr_t hdr_dst_ptr; /**< pointer to matched receive */ }; @@ -55,7 +55,7 @@ struct mca_ptl_base_match_header_t { int32_t hdr_dst; /**< destination rank */ int32_t hdr_tag; /**< user tag */ uint32_t hdr_msg_length; /**< message length */ - mca_ptl_base_sequence_t hdr_msg_seq; /**< message sequence number */ + mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */ }; typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t; diff --git a/src/mca/ptl/base/ptl_base_match.c b/src/mca/ptl/base/ptl_base_match.c index 451153dcdc..4e6660cf06 100644 --- a/src/mca/ptl/base/ptl_base_match.c +++ b/src/mca/ptl/base/ptl_base_match.c @@ -79,7 +79,7 @@ bool mca_ptl_base_match( lam_list_t *additional_matches) { /* local variables */ - mca_ptl_base_sequence_t frag_msg_seq,next_msg_seq_expected; + mca_ptl_sequence_t frag_msg_seq,next_msg_seq_expected; lam_communicator_t *comm_ptr; mca_ptl_base_recv_request_t *matched_receive; mca_pml_ptl_comm_t *pml_comm; @@ -374,7 +374,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive { /* local variables */ mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match; - mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq; + mca_ptl_sequence_t wild_recv_seq, specific_recv_seq; int frag_src,frag_tag, wild_recv_tag, specific_recv_tag; /* initialization */ @@ -513,7 +513,7 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche { /* local parameters */ int match_found; - mca_ptl_base_sequence_t next_msg_seq_expected, frag_seq; + mca_ptl_sequence_t next_msg_seq_expected, frag_seq; mca_ptl_base_recv_frag_t *frag_desc; mca_ptl_base_recv_request_t *matched_receive; diff --git a/src/mca/ptl/ptl.h b/src/mca/ptl/ptl.h index feed5cc426..85209af2d4 100644 --- a/src/mca/ptl/ptl.h +++ b/src/mca/ptl/ptl.h @@ -144,9 +144,13 @@ struct mca_ptl_base_recv_frag_t; struct mca_ptl_base_send_frag_t; struct mca_ptl_base_match_header_t; -typedef uint64_t mca_ptl_base_sequence_t; -typedef uint64_t mca_ptl_base_tstamp_t; -typedef lam_list_t mca_ptl_base_queue_t; +typedef uint64_t mca_ptl_sequence_t; +typedef uint64_t mca_ptl_tstamp_t; +typedef lam_list_t mca_ptl_queue_t; + +typedef enum { + MCA_PTL_ENABLE +} mca_ptl_control_t; /* * PTL module interface functions and datatype. @@ -179,6 +183,33 @@ typedef struct mca_ptl_t** (*mca_ptl_base_module_init_fn_t)( bool *have_hidden_threads ); + +/** + * MCA->PTL Called to dynamically change a module parameter. + * + * @param flag (IN) Parameter to change. + * @param value (IN) Optional parameter value. + * + * @return LAM_SUCCESS or error code on failure. + */ +typedef int (*mca_ptl_base_module_control_fn_t)( + int param, + void* value, + size_t size +); + + +/** + * MCA->PTL Called to progress outstanding requests for + * non-threaded polling environments. + * + * @param tstamp Current time. + * @return LAM_SUCCESS or error code on failure. + */ +typedef int (*mca_ptl_base_module_progress_fn_t)( + mca_ptl_tstamp_t tstamp +); + /** * PTL module descriptor. Contains module version information @@ -189,6 +220,8 @@ struct mca_ptl_base_module_1_0_0_t { mca_base_module_t ptlm_version; mca_base_module_data_1_0_0_t ptlm_data; mca_ptl_base_module_init_fn_t ptlm_init; + mca_ptl_base_module_control_fn_t ptlm_control; + mca_ptl_base_module_progress_fn_t ptlm_progress; }; typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_1_0_0_t; typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_t; diff --git a/src/mca/ptl/tcp/src/ptl_tcp.h b/src/mca/ptl/tcp/src/ptl_tcp.h index 2c61e953dc..ced839875b 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp.h +++ b/src/mca/ptl/tcp/src/ptl_tcp.h @@ -77,6 +77,22 @@ extern mca_ptl_t** mca_ptl_tcp_module_init( bool *have_hidden_threads ); +/** + * TCP module control. + */ +extern int mca_ptl_tcp_module_control( + int param, + void* value, + size_t size +); + +/** + * TCP module progress. + */ +extern int mca_ptl_tcp_module_progress( + mca_ptl_tstamp_t tstamp +); + /** * TCP PTL Interface */ diff --git a/src/mca/ptl/tcp/src/ptl_tcp_module.c b/src/mca/ptl/tcp/src/ptl_tcp_module.c index 7ae39f466b..6e444939be 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_module.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_module.c @@ -55,7 +55,9 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = { false }, - mca_ptl_tcp_module_init /* module init */ + mca_ptl_tcp_module_init, + mca_ptl_tcp_module_control, + mca_ptl_tcp_module_progress, } }; @@ -135,6 +137,25 @@ int mca_ptl_tcp_module_open(void) int mca_ptl_tcp_module_close(void) { + if (mca_ptl_tcp_module.tcp_send_requests.fl_num_allocated != + mca_ptl_tcp_module.tcp_send_requests.super.lam_list_length) { + lam_output(0, "tcp send requests: %d allocated %d returned\n", + mca_ptl_tcp_module.tcp_send_requests.fl_num_allocated != + mca_ptl_tcp_module.tcp_send_requests.super.lam_list_length); + } + if (mca_ptl_tcp_module.tcp_send_frags.fl_num_allocated != + mca_ptl_tcp_module.tcp_send_frags.super.lam_list_length) { + lam_output(0, "tcp send frags: %d allocated %d returned\n", + mca_ptl_tcp_module.tcp_send_frags.fl_num_allocated != + mca_ptl_tcp_module.tcp_send_frags.super.lam_list_length); + } + if (mca_ptl_tcp_module.tcp_recv_frags.fl_num_allocated != + mca_ptl_tcp_module.tcp_recv_frags.super.lam_list_length) { + lam_output(0, "tcp recv frags: %d allocated %d returned\n", + mca_ptl_tcp_module.tcp_recv_frags.fl_num_allocated != + mca_ptl_tcp_module.tcp_recv_frags.super.lam_list_length); + } + free(mca_ptl_tcp_module.tcp_if_include); free(mca_ptl_tcp_module.tcp_if_exclude); if (NULL != mca_ptl_tcp_module.tcp_ptls) @@ -202,11 +223,14 @@ static int mca_ptl_tcp_module_create_instances(void) if(if_index < 0) { lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name); } else { + lam_output(0,"interface: %s\n", if_name); mca_ptl_tcp_create(if_index); } argv++; } lam_argv_free(include); + if(mca_ptl_tcp_module.tcp_num_ptls) + return LAM_SUCCESS; /* if the interface list was not specified by the user, create * a PTL for each interface that was not excluded. @@ -224,8 +248,10 @@ static int mca_ptl_tcp_module_create_instances(void) argv++; } /* if this interface was not found in the excluded list - create a PTL */ - if(argv == 0 || *argv == 0) + if(argv == 0 || *argv == 0) { + lam_output(0,"interface: %s\n", if_name); mca_ptl_tcp_create(if_index); + } } lam_argv_free(exclude); return LAM_SUCCESS; @@ -292,7 +318,6 @@ static int mca_ptl_tcp_module_create_listen(void) LAM_EV_READ|LAM_EV_PERSIST, mca_ptl_tcp_module_recv_handler, 0); - lam_event_add(&mca_ptl_tcp_module.tcp_recv_event, 0); return LAM_SUCCESS; } @@ -387,6 +412,37 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls, return ptls; } +/* + * TCP module control + */ + +int mca_ptl_tcp_module_control(int param, void* value, size_t size) +{ + switch(param) { + case MCA_PTL_ENABLE: + if(*(int*)value) + lam_event_add(&mca_ptl_tcp_module.tcp_recv_event, 0); + else + lam_event_del(&mca_ptl_tcp_module.tcp_recv_event); + break; + default: + break; + } + return LAM_SUCCESS; +} + + +/* + * TCP module progress. + */ + +int mca_ptl_tcp_module_progress(mca_ptl_tstamp_t tstamp) +{ + lam_event_loop(LAM_EVLOOP_ONCE); + return LAM_SUCCESS; +} + + /* * Called by mca_ptl_tcp_module_recv() when the TCP listen * socket has pending connection requests. Accept incoming diff --git a/src/mca/ptl/tcp/src/ptl_tcp_peer.c b/src/mca/ptl/tcp/src/ptl_tcp_peer.c index 38410f8c7e..5bb69f887f 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_peer.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_peer.c @@ -111,6 +111,7 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) { mca_ptl_tcp_send_frag_progress(frag); } else { + lam_output(0, "send incomplete\n"); ptl_peer->peer_send_frag = frag; lam_event_add(&ptl_peer->peer_send_event, 0); } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_proc.c b/src/mca/ptl/tcp/src/ptl_tcp_proc.c index 83b5ee2db3..8b2894afe7 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_proc.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_proc.c @@ -221,8 +221,10 @@ bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t* ptl_proc, struct sockaddr_in* a THREAD_LOCK(&ptl_proc->proc_lock); for(i=0; iproc_peer_count; i++) { mca_ptl_base_peer_t* ptl_peer = ptl_proc->proc_peers[i]; - if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd)) + if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd)) { + THREAD_UNLOCK(&ptl_proc->proc_lock); return true; + } } THREAD_UNLOCK(&ptl_proc->proc_lock); return false; diff --git a/src/runtime/lam_mpi_init.c b/src/runtime/lam_mpi_init.c index 6def0cb6d8..30746a374a 100644 --- a/src/runtime/lam_mpi_init.c +++ b/src/runtime/lam_mpi_init.c @@ -122,8 +122,7 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided) default */ mca_base_param_lookup_int(param, &value); - /* lam_mpi_param_check = (bool) value; */ - lam_mpi_param_check = false; + lam_mpi_param_check = (bool) value; /* do module exchange */ if (LAM_SUCCESS != (ret = mca_base_modex_exchange())) { @@ -139,6 +138,11 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided) } free(procs); + /* start PTL's */ + param = 1; + if (LAM_SUCCESS != (ret = mca_pml.pml_control(MCA_PTL_ENABLE, ¶m, sizeof(param)))) + return ret; + /* save the resulting thread levels */ lam_mpi_thread_requested = requested;