From 9280e182f5ffb5b0aba19270fab07435d4cdebed Mon Sep 17 00:00:00 2001
From: Tim Woodall
Date: Mon, 2 Aug 2004 21:24:00 +0000
Subject: [PATCH] - changed oob i/f functions to accept tag parameter
 - tcp implementation in progress
 - changed calls to oob to reflect additional parameter

This commit was SVN r1839.
---
 src/class/ompi_free_list.h | 98 ++++--
 src/mca/base/mca_base_module_exchange.c | 12 +-
 src/mca/ns/base/ns_base_remote_fns.c | 4 +-
 src/mca/ns/replica/src/ns_replica_component.c | 2 +-
 src/mca/oob/base/base.h | 4 +-
 src/mca/oob/base/oob_base_init.c | 22 +-
 src/mca/oob/base/oob_base_open.c | 7 +-
 src/mca/oob/base/oob_base_recv.c | 19 +-
 src/mca/oob/base/oob_base_recv_nb.c | 17 +-
 src/mca/oob/base/oob_base_send.c | 10 +-
 src/mca/oob/base/oob_base_send_nb.c | 29 +-
 src/mca/oob/cofs/src/oob_cofs.c | 56 ++--
 src/mca/oob/cofs/src/oob_cofs.h | 6 +-
 src/mca/oob/cofs/src/oob_cofs_component.c | 5 +-
 src/mca/oob/oob.h | 83 +++--
 src/mca/oob/tcp/oob_tcp.c | 291 ++++++++++++++----
 src/mca/oob/tcp/oob_tcp.h | 101 +++++-
 src/mca/oob/tcp/oob_tcp_hdr.h | 30 ++
 src/mca/oob/tcp/oob_tcp_msg.c | 168 +++++++---
 src/mca/oob/tcp/oob_tcp_msg.h | 126 ++++++--
 src/mca/oob/tcp/oob_tcp_peer.c | 258 +++++++++++++---
 src/mca/oob/tcp/oob_tcp_peer.h | 26 +-
 src/mca/oob/tcp/oob_tcp_recv.c | 266 ++++++++--------
 src/mca/oob/tcp/oob_tcp_send.c | 82 +++--
 src/mca/ptl/ptl.h | 18 +-
 src/threads/condition_spinlock.h | 2 +
 26 files changed, 1263 insertions(+), 479 deletions(-)
 create mode 100644 src/mca/oob/tcp/oob_tcp_hdr.h

diff --git a/src/class/ompi_free_list.h b/src/class/ompi_free_list.h
index b4a9d5b254..b5afd8ab26 100644
--- a/src/class/ompi_free_list.h
+++ b/src/class/ompi_free_list.h
@@ -7,6 +7,8 @@
 #include "ompi_config.h"
 #include "class/ompi_list.h"
+#include "threads/thread.h"
+#include "threads/condition.h"
 #include "include/constants.h"
 #include "mca/mpool/mpool.h"
@@ -20,16 +22,30 @@ struct ompi_free_list_t
 int fl_max_to_alloc;
 int fl_num_allocated;
 int fl_num_per_alloc;
+ int fl_num_waiting;
 size_t fl_elem_size;
 ompi_class_t* fl_elem_class;
 mca_mpool_base_module_t* fl_mpool;
 ompi_mutex_t fl_lock;
+ ompi_condition_t fl_condition;
 };
 typedef struct ompi_free_list_t ompi_free_list_t;

+/**
+ * Initialize a free list.
+ *
+ * @param free_list (IN) Free list.
+ * @param element_size (IN) Size of each element.
+ * @param element_class (IN) ompi_class_t of element - used to initialize allocated elements.
+ * @param num_elements_to_alloc (IN) Initial number of elements to allocate.
+ * @param max_elements_to_alloc (IN) Maximum number of elements to allocate.
+ * @param num_elements_per_alloc (IN) Number of elements to grow by per allocation.
+ * @param mpool (IN) Optional memory pool for allocations.
+ */
+
 int ompi_free_list_init(
- ompi_free_list_t *flist,
+ ompi_free_list_t *free_list,
 size_t element_size,
 ompi_class_t* element_class,
 int num_elements_to_alloc,
@@ -39,7 +55,19 @@ int ompi_free_list_init(

 int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements);

-
+/**
+ * Attempt to obtain an item from a free list.
+ *
+ * @param fl (IN) Free list.
+ * @param item (OUT) Allocated item.
+ * @param rc (OUT) OMPI_SUCCESS or error status on failure.
+ *
+ * If the requested item is not available the free list is grown to
+ * accommodate the request - unless the max number of allocations has
+ * been reached. If this is the case - an out of resource error is
+ * returned to the caller.
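+ *
+ * A minimal usage sketch (illustrative only - the free list instance
+ * and how its items are used are hypothetical, not part of this patch):
+ *
+ *   ompi_list_item_t* item;
+ *   int rc;
+ *   OMPI_FREE_LIST_GET(&my_free_list, item, rc);
+ *   if(OMPI_SUCCESS == rc) {
+ *       ... use item ...
+ *       OMPI_FREE_LIST_RETURN(&my_free_list, item);
+ *   }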
+ */
+
 #define OMPI_FREE_LIST_GET(fl, item, rc) \
 { \
 if(ompi_using_threads()) { \
@@ -60,30 +88,56 @@ int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements);
 rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
 }

+/**
+ * Blocking call to obtain an item from a free list.
+ *
+ * @param fl (IN) Free list.
+ * @param item (OUT) Allocated item.
+ * @param rc (OUT) OMPI_SUCCESS or error status on failure.
+ *
+ * If the requested item is not available the free list is grown to
+ * accommodate the request - unless the max number of allocations has
+ * been reached. In this case the caller is blocked until an item
+ * is returned to the list.
+ */
+
-#define OMPI_FREE_LIST_WAIT(fl, item, rc) \
-{ \
- if(ompi_using_threads()) { \
- ompi_mutex_lock(&((fl)->fl_lock)); \
- item = ompi_list_remove_first(&((fl)->super)); \
- if(NULL == item) { \
- ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
- item = ompi_list_remove_first(&((fl)->super)); \
- } \
- ompi_mutex_unlock(&((fl)->fl_lock)); \
- } else { \
- item = ompi_list_remove_first(&((fl)->super)); \
- if(NULL == item) { \
- ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
- item = ompi_list_remove_first(&((fl)->super)); \
- } \
- } \
- rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
+#define OMPI_FREE_LIST_WAIT(fl, item, rc) \
+{ \
+ OMPI_THREAD_LOCK(&((fl)->fl_lock)); \
+ item = ompi_list_remove_first(&((fl)->super)); \
+ while(NULL == item) { \
+ if((fl)->fl_max_to_alloc < 0 || \
+ (fl)->fl_num_allocated < (fl)->fl_max_to_alloc) { \
+ ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
+ } else { \
+ (fl)->fl_num_waiting++; \
+ ompi_condition_wait(&((fl)->fl_condition), &((fl)->fl_lock)); \
+ (fl)->fl_num_waiting--; \
+ } \
+ item = ompi_list_remove_first(&((fl)->super)); \
+ } \
+ OMPI_THREAD_UNLOCK(&((fl)->fl_lock)); \
+ rc = (NULL == item) ? OMPI_ERR_OUT_OF_RESOURCE : OMPI_SUCCESS; \
 }

-#define OMPI_FREE_LIST_RETURN(fl, item) \
- THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_prepend(&((fl)->super), (item)));
+/**
+ * Return an item to a free list.
+ *
+ * @param fl (IN) Free list.
+ * @param item (IN) Item being returned to the list.
+ *
+ */
+
+#define OMPI_FREE_LIST_RETURN(fl, item) \
+{ \
+ OMPI_THREAD_LOCK(&(fl)->fl_lock); \
+ ompi_list_prepend(&((fl)->super), (item)); \
+ if((fl)->fl_num_waiting > 0) { \
+ ompi_condition_signal(&((fl)->fl_condition)); \
+ } \
+ OMPI_THREAD_UNLOCK(&(fl)->fl_lock); \
+}
 #endif
diff --git a/src/mca/base/mca_base_module_exchange.c b/src/mca/base/mca_base_module_exchange.c
index 6af9399e7a..83261b14e5 100644
--- a/src/mca/base/mca_base_module_exchange.c
+++ b/src/mca/base/mca_base_module_exchange.c
@@ -125,8 +125,10 @@ static mca_base_modex_module_t* mca_base_modex_create_module(
 * during mca_base_modex_exchange().
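 *
 * For illustration, a component might publish a small address blob for
 * its peers to pick up (the component and address type are hypothetical):
 *
 *   struct my_addr_t addr;
 *   ... fill in addr ...
 *   mca_base_modex_send(&my_component.super, &addr, sizeof(addr));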
*/ -int mca_base_modex_send(mca_base_component_t *source_component, - const void *buffer, size_t size) +int mca_base_modex_send( + mca_base_component_t *source_component, + const void *buffer, + size_t size) { ompi_proc_t *self = ompi_proc_local(); mca_base_modex_t* modex; @@ -243,7 +245,7 @@ int mca_base_modex_exchange(void) iov.iov_base = self_module->module_data; iov.iov_len = self_module->module_data_size; - rc = mca_oob_send(&proc->proc_name, &iov, 1, 0); + rc = mca_oob_send(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0); if(rc != iov.iov_len) { free(procs); OMPI_THREAD_UNLOCK(&self->proc_lock); @@ -284,7 +286,7 @@ int mca_base_modex_exchange(void) return OMPI_ERR_OUT_OF_RESOURCE; } - size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TRUNC|MCA_OOB_PEEK); + size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TAG_ANY, MCA_OOB_TRUNC|MCA_OOB_PEEK); if(size <= 0) { free(procs); OMPI_THREAD_UNLOCK(&proc->proc_lock); @@ -297,7 +299,7 @@ int mca_base_modex_exchange(void) iov.iov_base = proc_module->module_data; iov.iov_len = size; - rc = mca_oob_recv(&proc->proc_name, &iov, 1, 0); + rc = mca_oob_recv(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0); if(rc != size) { free(procs); OMPI_THREAD_UNLOCK(&proc->proc_lock); diff --git a/src/mca/ns/base/ns_base_remote_fns.c b/src/mca/ns/base/ns_base_remote_fns.c index 74b0e33675..8fee8e4cdd 100644 --- a/src/mca/ns/base/ns_base_remote_fns.c +++ b/src/mca/ns/base/ns_base_remote_fns.c @@ -31,11 +31,11 @@ mca_ns_base_cellid_t ns_base_create_cellid(void) msg.iov_base = (char*)&cmd; msg.iov_len = sizeof(cmd); - if (0 > mca_oob_send(&mca_ns_my_replica, &msg, 1, 0)) { /* error on send */ + if (0 > mca_oob_send(&mca_ns_my_replica, &msg, 1, MCA_OOB_TAG_ANY, 0)) { /* error on send */ return 0; } - if (0 > mca_oob_recv(&mca_ns_my_replica, &msg, 1, 0)) { /* error on recv */ + if (0 > mca_oob_recv(&mca_ns_my_replica, &msg, 1, MCA_OOB_TAG_ANY, 0)) { /* error on recv */ return 0; } diff --git a/src/mca/ns/replica/src/ns_replica_component.c b/src/mca/ns/replica/src/ns_replica_component.c index 152011cd17..3793e1b0ef 100644 --- a/src/mca/ns/replica/src/ns_replica_component.c +++ b/src/mca/ns/replica/src/ns_replica_component.c @@ -187,7 +187,7 @@ mca_oob_callback_fn_t mca_ns_replica_recv(int status, const ompi_process_name_t reply.iov_base = (char*)&answer; reply.iov_len = sizeof(answer); - mca_oob_send(sender, &reply, 1, 0); + mca_oob_send(sender, &reply, 1, MCA_OOB_TAG_ANY, 0); } } return OMPI_SUCCESS; diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h index f1fd405cf8..5313789e22 100644 --- a/src/mca/oob/base/base.h +++ b/src/mca/oob/base/base.h @@ -19,7 +19,7 @@ * This is the first module on the list. 
This is here temporarily * to make things work */ -extern mca_oob_base_module_t mca_oob; +extern mca_oob_t mca_oob; /** * associate a component and a module that belongs to it @@ -27,7 +27,7 @@ extern mca_oob_base_module_t mca_oob; struct mca_oob_base_info_t { ompi_list_item_t super; mca_oob_base_component_t *oob_component; - mca_oob_base_module_t *oob_module; + mca_oob_t *oob_module; }; typedef struct mca_oob_base_info_t mca_oob_base_info_t; diff --git a/src/mca/oob/base/oob_base_init.c b/src/mca/oob/base/oob_base_init.c index c45730fd85..df33bd6030 100644 --- a/src/mca/oob/base/oob_base_init.c +++ b/src/mca/oob/base/oob_base_init.c @@ -16,7 +16,7 @@ OBJ_CLASS_INSTANCE( - mca_oob_base_module_t, + mca_oob_t, ompi_list_item_t, NULL, NULL @@ -28,8 +28,9 @@ OBJ_CLASS_INSTANCE( NULL ); -ompi_process_name_t mca_oob_base_self; -ompi_process_name_t mca_oob_base_any; +ompi_process_name_t mca_oob_name_seed; +ompi_process_name_t mca_oob_name_self; +ompi_process_name_t mca_oob_name_any; /** * Function for selecting one module from all those that are @@ -43,21 +44,26 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads) mca_base_component_list_item_t *cli; mca_oob_base_info_t * first; mca_oob_base_component_t *component; - mca_oob_base_module_t *module; + mca_oob_t *module; extern ompi_list_t mca_oob_base_components; ompi_process_name_t *self; /* setup local name */ + OBJ_CONSTRUCT(&mca_oob_name_self, ompi_process_name_t); self = mca_pcm.pcm_self(); if(NULL == self) { return OMPI_ERROR; } - mca_oob_base_self = *self; + mca_oob_name_self = *self; /* setup wildcard name */ - mca_oob_base_any.cellid = -1; - mca_oob_base_any.jobid = -1; - mca_oob_base_any.vpid = -1; + OBJ_CONSTRUCT(&mca_oob_name_any, ompi_process_name_t); + mca_oob_name_any.cellid = -1; + mca_oob_name_any.jobid = -1; + mca_oob_name_any.vpid = -1; + + /* setup seed daemons name */ + OBJ_CONSTRUCT(&mca_oob_name_seed, ompi_process_name_t); /* Traverse the list of available modules; call their init functions. */ for (item = ompi_list_get_first(&mca_oob_base_components); diff --git a/src/mca/oob/base/oob_base_open.c b/src/mca/oob/base/oob_base_open.c index 306f926ea0..11a322acdd 100644 --- a/src/mca/oob/base/oob_base_open.c +++ b/src/mca/oob/base/oob_base_open.c @@ -21,7 +21,7 @@ /* * Global variables */ -mca_oob_base_module_t mca_oob; +mca_oob_t mca_oob; int mca_oob_base_output = -1; ompi_list_t mca_oob_base_components; ompi_list_t mca_oob_base_modules; @@ -35,13 +35,14 @@ int mca_oob_base_open(void) { /* Open up all available components */ + OBJ_CONSTRUCT(&mca_oob_base_components, ompi_list_t); + OBJ_CONSTRUCT(&mca_oob_base_modules, ompi_list_t); + if (OMPI_SUCCESS != mca_base_components_open("oob", 0, mca_oob_base_static_components, &mca_oob_base_components)) { return OMPI_ERROR; } - OBJ_CONSTRUCT(&mca_oob_base_components, ompi_list_t); - OBJ_CONSTRUCT(&mca_oob_base_modules, ompi_list_t); /* All done */ diff --git a/src/mca/oob/base/oob_base_recv.c b/src/mca/oob/base/oob_base_recv.c index 1154d39a0f..bb91a31f22 100644 --- a/src/mca/oob/base/oob_base_recv.c +++ b/src/mca/oob/base/oob_base_recv.c @@ -10,13 +10,14 @@ * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. 
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int flags) +int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags) { - return(mca_oob.oob_recv(peer, msg, count, flags)); + return(mca_oob.oob_recv(peer, msg, count, tag, flags)); } /* @@ -26,12 +27,18 @@ int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. + * @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg, - const mca_oob_base_type_t *types, int count, int flags) +int mca_oob_recv_ntoh( + ompi_process_name_t* peer, + const struct iovec *msg, + const mca_oob_base_type_t *types, + int count, + int tag, + int flags) { int rc, num, i = 0; struct iovec * orig; @@ -63,7 +70,7 @@ int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg, } } /* now the new buffers are ready. do the recieve */ - rc = mca_oob.oob_recv(peer, orig, count, flags); + rc = mca_oob.oob_recv(peer, orig, count, tag, flags); /* now we have to do the conversions */ for(i = 0; i < count; i++) { if(types[i] == MCA_OOB_BASE_INT16) { @@ -85,7 +92,7 @@ int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg, /* free the iovecs we allocated */ free(orig); } else { - rc = mca_oob.oob_recv(peer, msg, count, flags); + rc = mca_oob.oob_recv(peer, msg, count, tag, flags); } return rc; } diff --git a/src/mca/oob/base/oob_base_recv_nb.c b/src/mca/oob/base/oob_base_recv_nb.c index 5df83110b1..275dad81ca 100644 --- a/src/mca/oob/base/oob_base_recv_nb.c +++ b/src/mca/oob/base/oob_base_recv_nb.c @@ -12,11 +12,11 @@ struct mca_oob_base_cb_data_t { /* this is the callback function we will register when we have to do any conversion */ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, const struct iovec* msg, - size_t count, void* cbdata); + int count, int tag, void* cbdata); static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, const struct iovec* msg, - size_t count, void* cbdata) + int count, int tag, void* cbdata) { int i, num; struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; @@ -43,7 +43,7 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, /* free the iovecs we allocated */ free((void *)msg); /* call the user callback function */ - cb_struct->user_callback(status, peer, user_iovec, count, cb_struct->user_data); + cb_struct->user_callback(status, peer, user_iovec, count, tag, cb_struct->user_data); /* free the cb structure */ free(cb_struct); return; @@ -60,10 +60,10 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, * @param cbdata (IN) User data that is passed to callback function. * @return OMPI error code (<0) on error or number of bytes actually received. 
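*
* For illustration, posting a wildcard non-blocking receive (the buffer,
* tag constant, and callback are hypothetical):
*
*   struct iovec iov;
*   iov.iov_base = buf;
*   iov.iov_len = sizeof(buf);
*   mca_oob_recv_nb(MCA_OOB_NAME_ANY, &iov, 1, MY_TAG, 0, my_cbfunc, NULL);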
*/ -int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int flags, +int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { - return(mca_oob.oob_recv_nb(peer, msg, count, flags, cbfunc, cbdata)); + return(mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata)); } /* @@ -80,7 +80,7 @@ int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int coun */ int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg, - const mca_oob_base_type_t* types, int count, int flags, + const mca_oob_base_type_t* types, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { int rc, i = 0; @@ -120,10 +120,9 @@ int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg, } } /* now the new buffers are ready. do the recieve */ - rc = mca_oob.oob_recv_nb(peer, orig, count, flags, &mca_oob_base_recv_cb, - (void *) cb_struct); + rc = mca_oob.oob_recv_nb(peer, orig, count, tag, flags, mca_oob_base_recv_cb, cb_struct); } else { - rc = mca_oob.oob_recv_nb(peer, msg, count, flags, cbfunc, cbdata); + rc = mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata); } return rc; } diff --git a/src/mca/oob/base/oob_base_send.c b/src/mca/oob/base/oob_base_send.c index 1c35601454..d55e82f6a4 100644 --- a/src/mca/oob/base/oob_base_send.c +++ b/src/mca/oob/base/oob_base_send.c @@ -11,9 +11,9 @@ * @return OMPI error code (<0) on error number of bytes actually sent. */ -int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags) +int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags) { - return(mca_oob.oob_send(peer, msg, count, flags)); + return(mca_oob.oob_send(peer, msg, count, tag, flags)); } /* @@ -28,7 +28,7 @@ int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int c */ int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg, - const mca_oob_base_type_t *types, int count, int flags) + const mca_oob_base_type_t *types, int count, int tag, int flags) { int rc, i = 0; struct iovec * converted; @@ -65,7 +65,7 @@ int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg, mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); } } - rc = mca_oob.oob_send(peer, converted, count, flags); + rc = mca_oob.oob_send(peer, converted, count, tag, flags); /* clean up any space we allocated */ for(i = 0; i < count; i++) { if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { @@ -74,7 +74,7 @@ int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg, } free(converted); } else { - rc = mca_oob.oob_send(peer, msg, count, flags); + rc = mca_oob.oob_send(peer, msg, count, tag, flags); } return rc; diff --git a/src/mca/oob/base/oob_base_send_nb.c b/src/mca/oob/base/oob_base_send_nb.c index 3f8fc8ddaf..2e9b6dcd5b 100644 --- a/src/mca/oob/base/oob_base_send_nb.c +++ b/src/mca/oob/base/oob_base_send_nb.c @@ -10,13 +10,13 @@ struct mca_oob_base_cb_data_t { }; /* this is the callback function we will register when we have to do any conversion */ -static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, - const struct iovec* msg, - size_t count, void* cbdata); - -static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, - const struct iovec* msg, - size_t count, void* cbdata) 
+static void mca_oob_base_send_cb( + int status, + const ompi_process_name_t* peer, + const struct iovec* msg, + int count, + int tag, + void* cbdata) { int i; struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; @@ -28,12 +28,11 @@ static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, } free((void *)msg); /* call the user callback function */ - cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, cb_struct->user_data); + cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, tag, cb_struct->user_data); free(cb_struct); return; } - /* @@ -49,10 +48,10 @@ static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, * */ -int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, +int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { - return(mca_oob.oob_send_nb(peer, msg, count, flags, cbfunc, cbdata)); + return(mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata)); } /* @@ -69,7 +68,7 @@ int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, in */ int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* msg, - const mca_oob_base_type_t* types, int count, int flags, + const mca_oob_base_type_t* types, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { int rc, i = 0; @@ -113,11 +112,9 @@ int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* ms mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); } } - rc = mca_oob.oob_send_nb(peer, converted, count, flags, - &mca_oob_base_send_cb, - (void *) cb_struct); + rc = mca_oob.oob_send_nb(peer, converted, count, tag, flags, mca_oob_base_send_cb, cb_struct); } else { - rc = mca_oob.oob_send_nb(peer, msg, count, flags, cbfunc, cbdata); + rc = mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata); } return rc; } diff --git a/src/mca/oob/cofs/src/oob_cofs.c b/src/mca/oob/cofs/src/oob_cofs.c index 2a91f1dc17..ce5cd184cc 100644 --- a/src/mca/oob/cofs/src/oob_cofs.c +++ b/src/mca/oob/cofs/src/oob_cofs.c @@ -20,7 +20,7 @@ #include #include -static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int flags); +static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags); /** * Similiar to unix send(2). @@ -36,6 +36,7 @@ int mca_oob_cofs_send( const ompi_process_name_t* peer, const struct iovec *iov, int count, + int tag, int flags) { FILE *fp; @@ -45,14 +46,14 @@ int mca_oob_cofs_send( char msg_file_tmp[OMPI_PATH_MAX]; /* create the file and open it... 
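 * (the name encodes jobid, sender vpid, receiver vpid, tag and serial,
 * e.g. "0_1_2_5_17.msg" for the hypothetical values jobid 0, sender
 * vpid 1, receiver vpid 2, tag 5, serial 17; find_match() parses these
 * fields back out on the receiving side)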
*/ - snprintf(msg_file, OMPI_PATH_MAX, "%s/%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, - ompi_name_server.get_jobid(&mca_oob_base_self), - ompi_name_server.get_vpid(&mca_oob_base_self), - ompi_name_server.get_vpid(peer), (long)mca_oob_cofs_serial); - snprintf(msg_file_tmp, OMPI_PATH_MAX, "%s/.%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, - ompi_name_server.get_jobid(&mca_oob_base_self), - ompi_name_server.get_vpid(&mca_oob_base_self), - ompi_name_server.get_vpid(peer), (long)mca_oob_cofs_serial); + snprintf(msg_file, OMPI_PATH_MAX, "%s/%d_%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, + ompi_name_server.get_jobid(&mca_oob_name_self), + ompi_name_server.get_vpid(&mca_oob_name_self), + ompi_name_server.get_vpid(peer), tag, (long)mca_oob_cofs_serial); + snprintf(msg_file_tmp, OMPI_PATH_MAX, "%s/.%d_%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, + ompi_name_server.get_jobid(&mca_oob_name_self), + ompi_name_server.get_vpid(&mca_oob_name_self), + ompi_name_server.get_vpid(peer), tag, (long)mca_oob_cofs_serial); fp = fopen(msg_file_tmp, "w"); if (fp == NULL) { @@ -90,24 +91,30 @@ int mca_oob_cofs_send_nb( const ompi_process_name_t* peer, const struct iovec *iov, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { - int status = mca_oob_cofs_send(peer, iov, count, flags); + int status = mca_oob_cofs_send(peer, iov, count, tag, flags); if(NULL != cbfunc) - cbfunc(status, peer, iov, count, cbdata); + cbfunc(status, peer, iov, count, tag, cbdata); return status; } int -mca_oob_cofs_recv(ompi_process_name_t* peer, const struct iovec* iov, int count, int flags) +mca_oob_cofs_recv( + ompi_process_name_t* peer, + const struct iovec* iov, + int count, + int tag, + int flags) { int ret = OMPI_ERR_WOULD_BLOCK; while (ret == OMPI_ERR_WOULD_BLOCK) { ret = do_recv(ompi_name_server.get_jobid(peer), - ompi_name_server.get_vpid(peer), iov, count, flags); + ompi_name_server.get_vpid(peer), iov, count, tag, flags); sleep(1); } return ret; @@ -119,24 +126,25 @@ mca_oob_cofs_recv_nb( ompi_process_name_t* peer, const struct iovec* iov, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { - int status = mca_oob_cofs_recv(peer, iov, count, flags); + int status = mca_oob_cofs_recv(peer, iov, count, tag, flags); if(NULL != cbfunc) - cbfunc(status, peer, iov, count, cbdata); + cbfunc(status, peer, iov, count, tag, cbdata); return status; } static char* -find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid) +find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag) { DIR* dir; struct dirent *ent; unsigned long tmp_serial; - int tmp_jobid, tmp_procid, tmp_myprocid; + int tmp_jobid, tmp_procid, tmp_myprocid, tmp_tag; int ret; bool found = false; char best_name[OMPI_PATH_MAX]; @@ -150,21 +158,23 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid) while ((ent = readdir(dir)) != NULL) { if (ent->d_name[0] == '.') continue; - ret = sscanf(ent->d_name, "%d_%d_%d_%lu.msg", &tmp_jobid, &tmp_procid, - &tmp_myprocid, &tmp_serial); - if (ret != 4) { + ret = sscanf(ent->d_name, "%d_%d_%d_%d_%lu.msg", &tmp_jobid, &tmp_procid, + &tmp_myprocid, &tmp_tag, &tmp_serial); + if (ret != 5) { continue; } if (tmp_jobid != jobid) { continue; } - if (tmp_myprocid != ompi_name_server.get_vpid(&mca_oob_base_self)) { + if (tmp_myprocid != ompi_name_server.get_vpid(&mca_oob_name_self)) { continue; } if (tmp_procid != procid) { continue; } + if (tag != MCA_OOB_TAG_ANY && tag != tmp_tag) + continue; /* do best one here... 
*/ found = true; @@ -184,7 +194,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid) static int -do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int flags) +do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags) { char *fname; char full_fname[OMPI_PATH_MAX]; @@ -192,7 +202,7 @@ do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec size_t rlen; size_t size; - fname = find_match(jobid, procid); + fname = find_match(jobid, procid, tag); if (fname == NULL) { return OMPI_ERR_WOULD_BLOCK; } diff --git a/src/mca/oob/cofs/src/oob_cofs.h b/src/mca/oob/cofs/src/oob_cofs.h index f76bbdf88b..801a73401b 100644 --- a/src/mca/oob/cofs/src/oob_cofs.h +++ b/src/mca/oob/cofs/src/oob_cofs.h @@ -20,7 +20,7 @@ int mca_oob_cofs_close(void); /* * Startup / Shutdown */ -mca_oob_base_module_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads); +mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads); int mca_oob_cofs_finalize(void); @@ -39,6 +39,7 @@ int mca_oob_cofs_send( const ompi_process_name_t*, const struct iovec* msg, int count, + int tag, int flags); @@ -58,6 +59,7 @@ int mca_oob_cofs_recv( ompi_process_name_t* peer, const struct iovec *msg, int count, + int tag, int flags); @@ -78,6 +80,7 @@ int mca_oob_cofs_send_nb( const ompi_process_name_t* peer, const struct iovec* msg, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -99,6 +102,7 @@ int mca_oob_cofs_recv_nb( ompi_process_name_t* peer, const struct iovec* msg, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); diff --git a/src/mca/oob/cofs/src/oob_cofs_component.c b/src/mca/oob/cofs/src/oob_cofs_component.c index 94a1892403..07c066049c 100644 --- a/src/mca/oob/cofs/src/oob_cofs_component.c +++ b/src/mca/oob/cofs/src/oob_cofs_component.c @@ -41,7 +41,7 @@ mca_oob_base_component_1_0_0_t mca_oob_cofs_component = { mca_oob_cofs_finalize }; -mca_oob_base_module_t mca_oob_cofs = { +mca_oob_t mca_oob_cofs = { mca_oob_cofs_send, mca_oob_cofs_recv, mca_oob_cofs_send_nb, @@ -54,8 +54,7 @@ int mca_oob_cofs_my_procid; uint64_t mca_oob_cofs_serial; -struct mca_oob_base_module_1_0_0_t* -mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads) +mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads) { int len; char *tmp; diff --git a/src/mca/oob/oob.h b/src/mca/oob/oob.h index 6ef2c712ff..04eca958c1 100644 --- a/src/mca/oob/oob.h +++ b/src/mca/oob/oob.h @@ -12,16 +12,25 @@ #include "mca/base/base.h" #include "mca/ns/ns.h" #include + + /* - * Address for wildcard receives. + * Well known address */ -extern ompi_process_name_t mca_oob_base_any; -extern ompi_process_name_t mca_oob_base_self; +extern ompi_process_name_t mca_oob_name_any; +extern ompi_process_name_t mca_oob_name_seed; +extern ompi_process_name_t mca_oob_name_self; -#define MCA_OOB_BASE_ANY &mca_oob_base_any -#define MCA_OOB_BASE_SELF &mca_oob_base_self +#define MCA_OOB_NAME_ANY &mca_oob_name_any +#define MCA_OOB_NAME_SEED &mca_oob_name_seed +#define MCA_OOB_NAME_SELF &mca_oob_name_self +/* + * Other constants + */ + +#define MCA_OOB_TAG_ANY 0 /* * OOB API @@ -58,28 +67,31 @@ extern "C" { /** * Similiar to unix writev(2). * -* @param peer (IN) Opaque name of peer process. -* @param msg (IN) Array of iovecs describing user buffers and lengths. 
-* @param count (IN) Number of elements in iovec array. -* @param flags (IN) Currently unused. -* @return OMPI error code (<0) on error number of bytes actually sent. +* @param peer (IN) Opaque name of peer process. +* @param msg (IN) Array of iovecs describing user buffers and lengths. +* @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. +* @param flags (IN) Currently unused. +* @return OMPI error code (<0) on error number of bytes actually sent. */ int mca_oob_send( const ompi_process_name_t* peer, const struct iovec *msg, int count, + int tag, int flags); /** * Convert data (if required) to network byte order prior to sending to peer. * -* @param peer (IN) Opaque name of peer process. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param types (IN) Parallel array to iovecs describing data type of each iovec element. -* @param count (IN) Number of elements in iovec array. -* @param flags (IN) Currently unused. -* @return OMPI error code (<0) on error number of bytes actually sent. +* @param peer (IN) Opaque name of peer process. +* @param msg (IN) Array of iovecs describing user buffers and lengths. +* @param types (IN) Parallel array to iovecs describing data type of each iovec element. +* @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. +* @param flags (IN) Currently unused. +* @return OMPI error code (<0) on error number of bytes actually sent. */ int mca_oob_send_hton( @@ -87,6 +99,7 @@ int mca_oob_send_hton( const struct iovec *msg, const mca_oob_base_type_t *types, int count, + int tag, int flags); @@ -96,12 +109,18 @@ int mca_oob_send_hton( * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int flags); +int mca_oob_recv( + ompi_process_name_t* peer, + const struct iovec *msg, + int count, + int tag, + int flags); /** * Receive data and convert (if required) to host byte order. @@ -110,6 +129,7 @@ int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. @@ -120,6 +140,7 @@ int mca_oob_recv_ntoh( const struct iovec *msg, const mca_oob_base_type_t *types, int count, + int tag, int flags); /* @@ -134,6 +155,7 @@ int mca_oob_recv_ntoh( * @param peer (IN) Opaque name of peer process. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. 
* @param cbdata (IN) User data. */ @@ -141,7 +163,8 @@ typedef void (*mca_oob_callback_fn_t)( int status, const ompi_process_name_t* peer, const struct iovec* msg, - size_t count, + int count, + int tag, void* cbdata); /** @@ -150,6 +173,7 @@ typedef void (*mca_oob_callback_fn_t)( * @param peer (IN) Opaque name of peer process. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) Currently unused. * @param cbfunc (IN) Callback function on send completion. * @param cbdata (IN) User data that is passed to callback function. @@ -161,6 +185,7 @@ int mca_oob_send_nb( const ompi_process_name_t* peer, const struct iovec* msg, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -172,6 +197,7 @@ int mca_oob_send_nb( * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) Currently unused. * @param cbfunc (IN) Callback function on send completion. * @param cbdata (IN) User data that is passed to callback function. @@ -184,6 +210,7 @@ int mca_oob_send_hton_nb( const struct iovec* msg, const mca_oob_base_type_t* types, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -194,6 +221,7 @@ int mca_oob_send_hton_nb( * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param cbfunc (IN) Callback function on recv completion. * @param cbdata (IN) User data that is passed to callback function. @@ -204,6 +232,7 @@ int mca_oob_recv_nb( ompi_process_name_t* peer, const struct iovec* msg, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -215,6 +244,7 @@ int mca_oob_recv_nb( * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param cbfunc (IN) Callback function on recv completion. * @param cbdata (IN) User data that is passed to callback function. @@ -226,6 +256,7 @@ int mca_oob_recv_ntoh_nb( const struct iovec* msg, const mca_oob_base_type_t* types, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -243,6 +274,7 @@ int mca_oob_recv_ntoh_nb( * @param peer (IN) Opaque name of peer process. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) Currently unused. * @return OMPI error code (<0) on error number of bytes actually sent. 
*/ @@ -251,6 +283,7 @@ typedef int (*mca_oob_base_module_send_fn_t)( const ompi_process_name_t* peer, const struct iovec *msg, int count, + int tag, int flags); @@ -261,6 +294,7 @@ typedef int (*mca_oob_base_module_send_fn_t)( * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. @@ -270,6 +304,7 @@ typedef int (*mca_oob_base_module_recv_fn_t)( ompi_process_name_t* peer, const struct iovec *msg, int count, + int tag, int flags); /** @@ -278,6 +313,7 @@ typedef int (*mca_oob_base_module_recv_fn_t)( * @param peer (IN) Opaque name of peer process. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) Currently unused. * @param cbfunc (IN) Callback function on send completion. * @param cbdata (IN) User data that is passed to callback function. @@ -289,6 +325,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)( const ompi_process_name_t* peer, const struct iovec* msg, int count, + int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); @@ -299,6 +336,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)( * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param msg (IN) Array of iovecs describing user buffers and lengths. * @param count (IN) Number of elements in iovec array. +* @param tag (IN) User defined tag for matching send/recv. * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param cbfunc (IN) Callback function on recv completion. * @param cbdata (IN) User data that is passed to callback function. 
@@ -309,6 +347,7 @@ typedef int (*mca_oob_base_module_recv_nb_fn_t)(
 ompi_process_name_t* peer,
 const struct iovec* msg,
 int count,
+ int tag,
 int flags,
 mca_oob_callback_fn_t cbfunc,
 void* cbdata);
@@ -317,20 +356,20 @@
 * OOB Module
 */

-struct mca_oob_base_module_1_0_0_t {
+struct mca_oob_1_0_0_t {
 mca_oob_base_module_send_fn_t oob_send;
 mca_oob_base_module_recv_fn_t oob_recv;
 mca_oob_base_module_send_nb_fn_t oob_send_nb;
 mca_oob_base_module_recv_nb_fn_t oob_recv_nb;
 };
-typedef struct mca_oob_base_module_1_0_0_t mca_oob_base_module_1_0_0_t;
-typedef struct mca_oob_base_module_1_0_0_t mca_oob_base_module_t;
+typedef struct mca_oob_1_0_0_t mca_oob_1_0_0_t;
+typedef struct mca_oob_1_0_0_t mca_oob_t;

 /**
 * OOB Component
 */
-typedef mca_oob_base_module_t* (*mca_oob_base_component_init_fn_t)(
+typedef mca_oob_t* (*mca_oob_base_component_init_fn_t)(
 bool *allow_multi_user_threads,
 bool *have_hidden_threads);

diff --git a/src/mca/oob/tcp/oob_tcp.c b/src/mca/oob/tcp/oob_tcp.c
index 208d811614..40491650ef 100644
--- a/src/mca/oob/tcp/oob_tcp.c
+++ b/src/mca/oob/tcp/oob_tcp.c
@@ -4,8 +4,18 @@
 *
 */

+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include "util/output.h"
 #include "mca/oob/tcp/oob_tcp.h"
+
+
+static int mca_oob_tcp_create_listen(void);
+static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
+static void mca_oob_tcp_accept(void);
+
+
 /*
 * Struct of function pointers and all that to let us be initialized
 */
@@ -28,28 +38,48 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
 }
 };

-static struct mca_oob_base_module_1_0_0_t mca_oob_tcp = {
+static mca_oob_t mca_oob_tcp = {
 mca_oob_tcp_send,
 mca_oob_tcp_recv,
 mca_oob_tcp_send_nb,
 mca_oob_tcp_recv_nb
 };

+
+/*
+ * Utility function to register/lookup module parameters.
+ */
+
+static inline int mca_oob_tcp_param_register_int(
+ const char* param_name,
+ int default_value)
+{
+ int id = mca_base_param_register_int("oob","tcp",param_name,NULL,default_value);
+ int param_value = default_value;
+ mca_base_param_lookup_int(id,&param_value);
+ return param_value;
+}
+
+
 /*
 * Initialize global variables used w/in this module.
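 *
 * Note: with the registration above, the two parameters are settable
 * at run time as "oob_tcp_peer_limit" and "oob_tcp_peer_retries",
 * e.g. (hypothetical invocation) "mpirun -mca oob_tcp_peer_retries 10".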
 */
int mca_oob_tcp_open(void)
{
-#if 0
- mca_oob_tcp_module.tcp_listen_port = 1;
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_list, ompi_list_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_tree, ompi_rb_tree_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_free, ompi_free_list_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_lock, ompi_mutex_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_condition, ompi_condition_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_post_recv, ompi_list_t);
- OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_msg_recv, ompi_list_t);
-#endif
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
+ OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);
+
+ /* register oob module parameters */
+ mca_oob_tcp_component.tcp_peer_limit =
+ mca_oob_tcp_param_register_int("peer_limit", -1);
+ mca_oob_tcp_component.tcp_peer_retries =
+ mca_oob_tcp_param_register_int("peer_retries", 60);
 return OMPI_SUCCESS;
}

@@ -60,65 +90,214 @@ int mca_oob_tcp_open(void)
int mca_oob_tcp_close(void)
{
-#if 0
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_list);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_tree);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_free);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_condition);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_lock);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_post_recv);
- OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_msg_recv);
-#endif
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
+ OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
 return OMPI_SUCCESS;
}

-/**
-* Compare two process names for equality.
-*
-* @param n1 Process name 1.
-* @param n2 Process name 2.
-* @return (-1 for n1<n2 0 for equality 1 for n1>n2)
-*
-* Note that the definition of < or > is somewhat arbitrary -
-* just needs to be consistently applied to maintain an ordering
-* when process names are used as indices.
+/*
+ * Called by mca_oob_tcp_recv_handler() when the TCP listen
+ * socket has pending connection requests. Accept incoming
+ * requests and queue for completion of the connection handshake.
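+ *
+ * Connection handshake sketch (see mca_oob_tcp_recv_handler() below):
+ * the connecting peer is expected to send its ompi_process_name_t, in
+ * network byte order, as the first data on the new socket; the handler
+ * reads the name, looks up (or creates) the peer, and
+ * mca_oob_tcp_peer_accept() then adopts or rejects the socket.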
 */
-static int ompi_process_name_compare(ompi_process_name_t* n1,
- ompi_process_name_t* n2)
+static void mca_oob_tcp_accept(void)
 {
- if(n1->cellid < n2->cellid)
- return -1;
- else if(n1->cellid > n2->cellid)
- return 1;
- else if(n1->jobid < n2->jobid)
- return -1;
- else if(n1->jobid > n2->jobid)
- return 1;
- else if(n1->vpid < n2->vpid)
- return -1;
- else if(n1->vpid > n2->vpid)
- return 1;
- return(0);
-}
-
+ while(true) {
+ ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
+ struct sockaddr_in addr;
+ ompi_event_t* event;
+ int sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
+ if(sd < 0) {
+ if(errno == EINTR)
+ continue;
+ if(errno != EAGAIN && errno != EWOULDBLOCK)
+ ompi_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", errno);
+ return;
+ }
+
+ /* wait for receipt of the peer's process identifier to complete this connection */
+ event = malloc(sizeof(ompi_event_t));
+ ompi_event_set(event, sd, OMPI_EV_READ|OMPI_EV_PERSIST, mca_oob_tcp_recv_handler, event);
+ ompi_event_add(event, 0);
+ }
}

 /*
- * this function will temporarily return NULL so we don't use it
+ * Create a listen socket and bind to all interfaces
 */
-struct mca_oob_base_module_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads,
- bool *have_hidden_threads)
+
+static int mca_oob_tcp_create_listen(void)
 {
-#if 0
- /* initialize data structures */
- ompi_rb_tree_init(&mca_oob_tcp_module.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare);
-#endif
- /* return &mca_oob_tcp; */
- return NULL;
+ int flags;
+ struct sockaddr_in inaddr;
+ ompi_socklen_t addrlen;
+
+ /* create a listen socket for incoming connections */
+ mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
+ if(mca_oob_tcp_component.tcp_listen_sd < 0) {
+ ompi_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", errno);
+ return OMPI_ERROR;
+ }
+
+ /* bind to all addresses and dynamically assigned port */
+ memset(&inaddr, 0, sizeof(inaddr));
+ inaddr.sin_family = AF_INET;
+ inaddr.sin_addr.s_addr = INADDR_ANY;
+ inaddr.sin_port = htons(5000+mca_oob_name_self.vpid);
+
+ if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
+ ompi_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", errno);
+ return OMPI_ERROR;
+ }
+
+ /* resolve system assigned port */
+ addrlen = sizeof(struct sockaddr_in);
+ if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
+ ompi_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", errno);
+ return OMPI_ERROR;
+ }
+ mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
+
+ /* setup listen backlog to maximum allowed by kernel */
+ if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
+ ompi_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", errno);
+ return OMPI_ERROR;
+ }
+
+ /* set socket up to be non-blocking, otherwise accept could block */
+ if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
+ ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", errno);
+ return OMPI_ERROR;
+ } else {
+ flags |= O_NONBLOCK;
+ if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
+ ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", errno);
+ return OMPI_ERROR;
+ }
+ }
+
+ /* register listen port */
+ ompi_event_set(
+ &mca_oob_tcp_component.tcp_recv_event,
+ mca_oob_tcp_component.tcp_listen_sd,
+ OMPI_EV_READ|OMPI_EV_PERSIST, + mca_oob_tcp_recv_handler, + 0); + ompi_event_add(&mca_oob_tcp_component.tcp_recv_event, 0); + return OMPI_SUCCESS; } +/* + * Event callback when there is data available on the registered + * socket to recv. + */ + +static void mca_oob_tcp_recv_handler(int sd, short flags, void* user) +{ + ompi_process_name_t name; + mca_oob_tcp_peer_t* peer; + int rc; + + /* accept new connections on the listen socket */ + if(mca_oob_tcp_component.tcp_listen_sd == sd) { + mca_oob_tcp_accept(); + return; + } + ompi_event_del((ompi_event_t*)user); + free(user); + + /* recv the process identifier */ + rc = recv(sd, &name, sizeof(name), 0); + if(rc != sizeof(name)) { + ompi_output(0, "mca_oob_tcp_recv_handler: recv() return value %d != %d, errno = %d", + rc, sizeof(name), errno); + close(sd); + return; + } + OMPI_PROCESS_NAME_NTOH(name); + + /* now set socket up to be non-blocking */ + if((flags = fcntl(sd, F_GETFL, 0)) < 0) { + ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno); + } else { + flags |= O_NONBLOCK; + if(fcntl(sd, F_SETFL, flags) < 0) { + ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno); + } + } + + /* lookup the corresponding process */ + peer = mca_oob_tcp_peer_lookup(&name, true); + if(NULL == peer) { + ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer"); + close(sd); + return; + } + + /* is the peer instance willing to accept this connection */ + if(mca_oob_tcp_peer_accept(peer, sd) == false) { + close(sd); + return; + } +} + +/* + * Module initialization. + * (1) initialize static resources + * (2) create listen socket + */ +mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads) +{ + /* initialize data structures */ + ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare); + + ompi_free_list_init(&mca_oob_tcp_component.tcp_peer_free, + sizeof(mca_oob_tcp_peer_t), + OBJ_CLASS(mca_oob_tcp_peer_t), + 8, /* initial number */ + mca_oob_tcp_component.tcp_peer_limit, /* maximum number */ + 8, /* increment to grow by */ + NULL); /* use default allocator */ + + ompi_free_list_init(&mca_oob_tcp_component.tcp_msgs, + sizeof(mca_oob_tcp_msg_t), + OBJ_CLASS(mca_oob_tcp_msg_t), + 8, /* initial number */ + -1, /* maximum number */ + 8, /* increment to grow by */ + NULL); /* use default allocator */ + +#if 0 + /* intialize event library */ + if(ompi_event_init() != OMPI_SUCCESS) { + ompi_output(0, "mca_oob_tcp_init: unable to initialize event library\n"); + return NULL; + } + + /* create a listen socket */ + if(mca_oob_tcp_create_listen() != OMPI_SUCCESS) { + ompi_output(0, "mca_oob_tcp_init: unable to create listen socket\n"); + return NULL; + } + return &mca_oob_tcp; +#else + return NULL; +#endif +} + + +/* + * Module cleanup. + */ int mca_oob_tcp_finalize(void) { /* probably want to try to finish all sends and recieves here diff --git a/src/mca/oob/tcp/oob_tcp.h b/src/mca/oob/tcp/oob_tcp.h index 4c30d38acf..af395cef1e 100644 --- a/src/mca/oob/tcp/oob_tcp.h +++ b/src/mca/oob/tcp/oob_tcp.h @@ -29,22 +29,78 @@ extern "C" { */ int mca_oob_tcp_open(void); int mca_oob_tcp_close(void); -struct mca_oob_base_module_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads, - bool *have_hidden_threads); +mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads); int mca_oob_tcp_finalize(void); +/* + * Convert process name from network to host byte order. 
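+ * (applied, for example, to the peer name just read off the wire in
+ * mca_oob_tcp_recv_handler() before it is used in local comparisons)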
+ *
+ * @param name
+ */
+#define OMPI_PROCESS_NAME_NTOH(n) \
+ n.cellid = ntohl(n.cellid); \
+ n.jobid = ntohl(n.jobid); \
+ n.vpid = ntohl(n.vpid);
+
+/*
+ * Convert process name from host to network byte order.
+ *
+ * @param name
+ */
+
+#define OMPI_PROCESS_NAME_HTON(n) \
+ n.cellid = htonl(n.cellid); \
+ n.jobid = htonl(n.jobid); \
+ n.vpid = htonl(n.vpid);
+
+
+/**
+* Compare two process names for equality.
+*
+* @param n1 Process name 1.
+* @param n2 Process name 2.
+* @return (-1 for n1<n2 0 for equality 1 for n1>n2)
+*
+* Note that the definition of < or > is somewhat arbitrary -
+* just needs to be consistently applied to maintain an ordering
+* when process names are used as indices.
+*/
+
+static inline int ompi_process_name_compare(const ompi_process_name_t* n1, const ompi_process_name_t* n2)
+{
+ if(n1->cellid < n2->cellid)
+ return -1;
+ else if(n1->cellid > n2->cellid)
+ return 1;
+ else if(n1->jobid < n2->jobid)
+ return -1;
+ else if(n1->jobid > n2->jobid)
+ return 1;
+ else if(n1->vpid < n2->vpid)
+ return -1;
+ else if(n1->vpid > n2->vpid)
+ return 1;
+ return(0);
+}
+
 /**
 * Similiar to unix writev(2).
 *
 * @param peer (IN) Opaque name of peer process.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) Currently unused.
 * @return OMPI error code (<0) on error number of bytes actually sent.
 */
-int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags);
+int mca_oob_tcp_send(
+ const ompi_process_name_t* peer,
+ const struct iovec *msg,
+ int count,
+ int tag,
+ int flags);

 /**
 * Similiar to unix readv(2)
 *
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
 * iovec array without removing the message from the queue.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
-int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int count, int flags);
+int mca_oob_tcp_recv(
+ ompi_process_name_t* peer,
+ const struct iovec *iov,
+ int count,
+ int tag,
+ int flags);


 /*
@@ -70,6 +132,7 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int cou
 * @param peer (IN) Opaque name of peer process.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) Currently unused.
 * @param cbfunc (IN) Callback function on send completion.
 * @param cbdata (IN) User data that is passed to callback function.
 * @return OMPI error code (<0) on error number of bytes actually sent.
 *
 */

-int mca_oob_tcp_send_nb(const ompi_process_name_t* peer, const struct iovec* iov, int count,
- int flags, mca_oob_callback_fn_t cbfunc, void* cbdata);
+int mca_oob_tcp_send_nb(
+ const ompi_process_name_t* peer,
+ const struct iovec* iov,
+ int count,
+ int tag,
+ int flags,
+ mca_oob_callback_fn_t cbfunc,
+ void* cbdata);

 /**
 * Non-blocking version of mca_oob_recv().
@@ -86,14 +155,21 @@ int mca_oob_tcp_send_nb(const ompi_process_name_t* peer, const struct iovec* iov
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
 * @param cbfunc (IN) Callback function on recv completion.
 * @param cbdata (IN) User data that is passed to callback function.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
-int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* iov, int count, int flags,
- mca_oob_callback_fn_t cbfunc, void* cbdata);
+int mca_oob_tcp_recv_nb(
+ ompi_process_name_t* peer,
+ const struct iovec* iov,
+ int count,
+ int tag,
+ int flags,
+ mca_oob_callback_fn_t cbfunc,
+ void* cbdata);


 /**
@@ -101,19 +177,20 @@ int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* iov, int
 */
 struct mca_oob_tcp_component_t {
 mca_oob_base_component_1_0_0_t super; /**< base OOB component */
- int tcp_listen_sd; /**< listen socket for incoming connection requests */
+ int tcp_listen_sd; /**< listen socket for incoming connection requests */
 unsigned short tcp_listen_port; /**< listen port */
 ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
 ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
 ompi_free_list_t tcp_peer_free; /**< free list of peers */
+ size_t tcp_peer_limit; /**< max size of tcp peer cache */
+ int tcp_peer_retries; /**< max number of retries before declaring peer gone */
 ompi_free_list_t tcp_msgs; /**< free list of messages */
 ompi_event_t tcp_send_event; /**< event structure for sends */
 ompi_event_t tcp_recv_event; /**< event structure for recvs */
 ompi_mutex_t tcp_lock; /**< lock for accessing module state */
- ompi_condition_t tcp_condition; /**< condition variable for blocking sends */
- size_t tcp_cache_size; /**< max size of tcp peer cache */
- ompi_list_t tcp_post_recv; /**< list of the recvs the user has posted */
+ ompi_list_t tcp_msg_post; /**< list of receives user has posted */
 ompi_list_t tcp_msg_recv; /**< list of recieved messages */
+ ompi_mutex_t tcp_match_lock; /**< lock held while searching/posting messages */
 };
 typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t;
diff --git a/src/mca/oob/tcp/oob_tcp_hdr.h b/src/mca/oob/tcp/oob_tcp_hdr.h
new file mode 100644
index 0000000000..03da288fd7
--- /dev/null
+++ b/src/mca/oob/tcp/oob_tcp_hdr.h
@@ -0,0 +1,30 @@
+/*
+ * $HEADER$
+ */
+/** @file:
+ *
+ * Contains header used by tcp oob.
+ */
+
+#ifndef _MCA_OOB_TCP_HDR_H_
+#define _MCA_OOB_TCP_HDR_H_
+
+/*
+ * Header used by tcp oob protocol.
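+ *
+ * Wire format sketch: each message on the socket is this fixed size
+ * header - msg_size and msg_tag, 32 bits each, sent in network byte
+ * order - followed by msg_size bytes of message body.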
+ */
+struct mca_oob_tcp_hdr_t {
+ uint32_t msg_size; /**< the total size of the message body - excluding header */
+ int32_t msg_tag; /**< user provided tag */
+};
+typedef struct mca_oob_tcp_hdr_t mca_oob_tcp_hdr_t;
+
+#define MCA_OOB_TCP_HDR_NTOHL(h) \
+ (h)->msg_size = ntohl((h)->msg_size); \
+ (h)->msg_tag = ntohl((h)->msg_tag);
+
+#define MCA_OOB_TCP_HDR_HTONL(h) \
+ (h)->msg_size = htonl((h)->msg_size); \
+ (h)->msg_tag = htonl((h)->msg_tag);
+
+#endif /* _MCA_OOB_TCP_HDR_H_ */
+
diff --git a/src/mca/oob/tcp/oob_tcp_msg.c b/src/mca/oob/tcp/oob_tcp_msg.c
index 2ac9da91b6..e7b7977288 100644
--- a/src/mca/oob/tcp/oob_tcp_msg.c
+++ b/src/mca/oob/tcp/oob_tcp_msg.c
@@ -30,20 +30,24 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
 /**
 * Wait for a msg to complete.
 * @param msg (IN) Message to wait on.
- * @param size (OUT) Number of bytes delivered.
+ * @param rc (OUT) Return code (number of bytes read on success or error code on failure).
 * @retval OMPI_SUCCESS or error code on failure.
 */
-int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size)
+int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
 {
- int rc = OMPI_SUCCESS;
+ /* wait for message to complete */
 ompi_mutex_lock(&msg->msg_lock);
- while(msg->msg_complete == false)
+ while(msg->msg_complete == false) {
 ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);
+ }
 ompi_mutex_unlock(&msg->msg_lock);
- *size = msg->msg_state;
- MCA_OOB_TCP_MSG_RETURN(msg);
- return rc;
+
+ /* return status */
+ if(NULL != rc) {
+ *rc = msg->msg_rc;
+ }
+ return OMPI_SUCCESS;
 }

 /**
@@ -57,7 +61,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer)
 ompi_mutex_lock(&msg->msg_lock);
 msg->msg_complete = true;
 if(NULL != msg->msg_cbfunc) {
- msg->msg_cbfunc(msg->msg_state, peer, msg->msg_iov, msg->msg_count, msg->msg_cbdata);
+ msg->msg_cbfunc(msg->msg_rc, peer, msg->msg_uiov, msg->msg_ucnt, ntohl(msg->msg_hdr.msg_tag), msg->msg_cbdata);
 ompi_mutex_unlock(&msg->msg_lock);
 MCA_OOB_TCP_MSG_RETURN(msg);
 } else {
@@ -78,69 +82,59 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
 {
 int rc;
 while(1) {
- rc = writev(peer->peer_sd, msg->msg_rwptr, msg->msg_rwcnt);
- if(rc <= 0) {
+ rc = writev(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
+ if(rc < 0) {
 if(errno == EINTR)
 continue;
 else if (errno == EAGAIN)
 return false;
 else {
- close(peer->peer_sd);
- peer->peer_state = MCA_OOB_TCP_CLOSED;
+ mca_oob_tcp_peer_close(peer);
 return false;
 }
 }
- msg->msg_state += rc;
- do {/* while there is still more iovects to write */
+
+ msg->msg_rc += rc;
+ do {/* while there are still more iovecs to write */
 if(rc < msg->msg_rwptr->iov_len) {
 msg->msg_rwptr->iov_len -= rc;
 msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
 break;
 } else {
 rc -= msg->msg_rwptr->iov_len;
- (msg->msg_rwcnt)--;
+ (msg->msg_rwnum)--;
 (msg->msg_rwptr)++;
- if(0 == msg->msg_rwcnt) {
- ompi_list_remove_item(&peer->peer_send_queue, (ompi_list_item_t *) msg);
- mca_oob_tcp_msg_complete(msg, &peer->peer_name);
+ if(0 == msg->msg_rwnum) {
 return true;
 }
 }
- } while(msg->msg_rwcnt);
+ } while(msg->msg_rwnum);
 }
 }

 /**
- * Actually recieves the data
- *
+ * Receives message data.
 /**
- * Actually recieves the data
- *
+ * Receives message data.
 * @param msg the message to be recieved into
 * @param peer the peer to recieve from
- * @retval true if the whole message was recieved
- * @retval false if the whole message was not recieved
+ * @retval true if the whole message was received
+ * @retval false if the whole message was not received
 */
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
{
    int rc;
-    /* get the first sizeof(unint32_t) bytes of the message
-     * to either match this with a posted recieve, or to create
-     * message
-     * then use this information to allocate an array of size 2
-     * of iovecs and a buffer for the second part large enough to hold the
-     * whole message */
    while(1) {
-        rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwcnt);
+        rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
        if(rc <= 0) {
            if(errno == EINTR)
                continue;
            else if (errno == EAGAIN)
                return false;
            else {
-                close(peer->peer_sd);
-                peer->peer_state = MCA_OOB_TCP_CLOSED;
+                mca_oob_tcp_peer_close(peer);
                return false;
            }
        }
-        msg->msg_state += rc;
+
        do {
            if(rc < msg->msg_rwptr->iov_len) {
                msg->msg_rwptr->iov_len -= rc;
@@ -148,14 +142,114 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
                break;
            } else {
                rc -= msg->msg_rwptr->iov_len;
-                (msg->msg_rwcnt)--;
+                (msg->msg_rwnum)--;
                (msg->msg_rwptr)++;
-                if(0 == msg->msg_rwcnt) {
-                    mca_oob_tcp_msg_complete(msg, &peer->peer_name);
+                if(0 == msg->msg_rwnum) {
                    return true;
                }
            }
-        } while(msg->msg_rwcnt);
+        } while(msg->msg_rwnum);
    }
}

+/**
+ * Called to copy the results of a message into user supplied iovec array.
+ * @param msg (IN) Message send that is in progress.
+ * @param iov (IN) Iovec array of user supplied buffers.
+ * @param count (IN) Number of elements in iovec array.
+ * @retval Number of bytes copied.
+ */
+
+int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count)
+{
+    int i;
+    const struct iovec *src = msg->msg_rwiov;
+    const struct iovec *dst = iov;
+    unsigned char* src_ptr = (unsigned char*)src->iov_base;
+    size_t src_len = src->iov_len;
+    int src_cnt = 0;
+    int rc = 0;
+
+    for(i=0; i<count; i++) {
+        unsigned char* dst_ptr = (unsigned char*)dst->iov_base;
+        size_t dst_len = dst->iov_len;
+        while(dst_len > 0) {
+            size_t len = (dst_len <= src_len) ? dst_len : src_len;
+            memcpy(dst_ptr, src_ptr, len);
+            rc += len;
+            dst_ptr += len;
+            dst_len -= len;
+            src_ptr += len;
+            src_len -= len;
+            if(src_len == 0) {
+                if(++src_cnt == msg->msg_rwcnt)
+                    return rc;
+                src++;
+                src_ptr = src->iov_base;
+                src_len = src->iov_len;
+            }
+        }
+        dst++;
+    }
+    return rc;
+}
+
+/**
+ * Match name to a message that has been received asynchronously (unexpected).
+ *
+ * @param name (IN) Name associated with peer or wildcard to match first posted recv.
+ * @param tag (IN) Message tag.
+ * @return msg Matched message or NULL.
+ *
+ * Note - this routine requires the caller to be holding the module lock.
+ */
+
+mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag)
+{
+    mca_oob_tcp_msg_t* msg;
+    for(msg =  (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
+        msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_recv);
+        msg =  (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
+
+        if((0 == ompi_process_name_compare(name,MCA_OOB_NAME_ANY) ||
+            (0 == ompi_process_name_compare(name, &msg->msg_peer)))) {
+            if (tag == MCA_OOB_TAG_ANY || tag == msg->msg_hdr.msg_tag) {
+                return msg;
+            }
+        }
+    }
+    return NULL;
+}
+
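mca_oob_tcp_msg_copy() above is a plain scatter/gather copy between two iovec arrays. The same algorithm can be checked standalone; this sketch carries nothing from the patch beyond the copy loop itself (all names are illustrative):

#include <assert.h>
#include <string.h>
#include <sys/uio.h>

/* Copy from src[0..scnt) into dst[0..dcnt); returns the number of bytes copied. */
static int iov_copy(const struct iovec *src, int scnt, const struct iovec *dst, int dcnt)
{
    unsigned char *sp = src->iov_base;
    size_t slen = src->iov_len;
    int si = 0, i, rc = 0;

    for (i = 0; i < dcnt; i++) {
        unsigned char *dp = dst[i].iov_base;
        size_t dlen = dst[i].iov_len;
        while (dlen > 0) {
            size_t len = (dlen <= slen) ? dlen : slen;
            memcpy(dp, sp, len);
            rc += len; dp += len; dlen -= len; sp += len; slen -= len;
            if (slen == 0) {           /* source iovec exhausted: step to the next */
                if (++si == scnt)
                    return rc;
                src++;
                sp = src->iov_base;
                slen = src->iov_len;
            }
        }
    }
    return rc;
}

int main(void)
{
    char s1[] = "hello ", s2[] = "world";
    char d1[4], d2[7];
    struct iovec src[2] = { { s1, 6 }, { s2, 5 } };
    struct iovec dst[2] = { { d1, sizeof(d1) }, { d2, sizeof(d2) } };
    assert(iov_copy(src, 2, dst, 2) == 11);  /* 11 bytes fit exactly */
    return 0;
}
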
+/**
+ * Match name to a posted recv request.
+ *
+ * @param name (IN) Name associated with peer or wildcard to match first posted recv.
+ * @param tag (IN) Message tag.
+ * @param peek (IN) Match message with MCA_OOB_PEEK flag set.
+ * @return msg Matched message or NULL.
+ *
+ * Note - this routine requires the caller to be holding the module lock.
+ */
+
+mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek)
+{
+    mca_oob_tcp_msg_t* msg;
+    for(msg =  (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_post);
+        msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_post);
+        msg =  (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
+
+        if((0 == ompi_process_name_compare(&msg->msg_peer,MCA_OOB_NAME_ANY) ||
+            (0 == ompi_process_name_compare(&msg->msg_peer,name)))) {
+            if (msg->msg_hdr.msg_tag == MCA_OOB_TAG_ANY || msg->msg_hdr.msg_tag == tag) {
+                if((msg->msg_flags & MCA_OOB_PEEK) == 0 || peek) {
+                    ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_post, &msg->super);
+                    return msg;
+                } else {
+                    return NULL;
+                }
+            }
+        }
+    }
+    return NULL;
+}
+
+
diff --git a/src/mca/oob/tcp/oob_tcp_msg.h b/src/mca/oob/tcp/oob_tcp_msg.h
index e19221ad3c..9166315aeb 100644
--- a/src/mca/oob/tcp/oob_tcp_msg.h
+++ b/src/mca/oob/tcp/oob_tcp_msg.h
@@ -10,29 +10,43 @@
 #define _MCA_OOB_TCP_MESSAGE_H_
 
 #include "class/ompi_list.h"
-#include "mca/oob/tcp/oob_tcp_peer.h"
 #include "mca/oob/oob.h"
+#include "oob_tcp_peer.h"
+#include "oob_tcp_hdr.h"
 #include <sys/uio.h>
 
+struct mca_oob_tcp_peer_t;
+
+#define MCA_OOB_TCP_IOV_MAX 16
+
+typedef enum { MCA_OOB_TCP_POSTED, MCA_OOB_TCP_UNEXPECTED } mca_oob_tcp_type_t;
+
+
 /**
 * describes each message being progressed.
 */
struct mca_oob_tcp_msg_t {
-    ompi_list_item_t super;            /**< make it so we can put this on a list */
-    int msg_state;                     /**< the amount sent or recieved or errno */
-    uint32_t msg_size;                 /**< the total size of the message */
-    const struct iovec * msg_user;     /**< the data of the message */
-    struct iovec * msg_iov;            /**< copy of iovec array - not data */
-    struct iovec * msg_rwptr;          /**< current read/write pointer into msg_iov */
-    int msg_rwcnt;                     /**< number of iovecs left for read/write */
-    int msg_count;                     /**< the number of items in the iovec array */
-    mca_oob_callback_fn_t msg_cbfunc;  /**< the callback function for the send/recieve */
-    void *msg_cbdata;                  /**< the data for the callback fnuction */
-    bool msg_complete;                 /**< whether the message is done sending or not */
-    ompi_process_name_t * msg_peer;    /**< the name of the peer */
-    ompi_mutex_t msg_lock;             /**< lock for the condition variable */
-    ompi_condition_t msg_condition;    /**< the message condition */
+    ompi_list_item_t super;                    /**< allow this item to be put on a list */
+    mca_oob_tcp_type_t msg_type;               /**< posted receive or unexpected */
+    int msg_flags;                             /**< flags to send/recv */
+    int msg_rc;                                /**< the return code for the send/recv (amount sent/recvd or errno) */
+    mca_oob_tcp_hdr_t msg_hdr;                 /**< header used to convey message properties to peer */
+    const struct iovec* msg_uiov;              /**< the user supplied iovec array */
+    int msg_ucnt;                              /**< the number of items in the user iovec array */
+    struct iovec * msg_rwiov;                  /**< copy of iovec array - not data */
+    struct iovec * msg_rwptr;                  /**< current read/write pointer into msg_iov */
+    int msg_rwnum;                             /**< number of iovecs left for read/write */
+    int msg_rwcnt;                             /**< total number of iovecs for read/write */
+    void* msg_rwbuf;                           /**< optional buffer for send/recv */
+    mca_oob_callback_fn_t msg_cbfunc;          /**< the callback function for the send/receive */
+    void * msg_cbdata;                         /**< the data for the callback function */
+    bool msg_complete;                         /**< whether the message is done sending or not */
+    ompi_process_name_t msg_peer;              /**< the name of the peer */
+    ompi_mutex_t msg_lock;                     /**< lock for the condition variable */
+    ompi_condition_t msg_condition;            /**< condition variable for completion */
+    struct iovec msg_iov[MCA_OOB_TCP_IOV_MAX]; /**< preallocated space for iovec array */
};

+
/**
 * Convenience typedef
 */
@@ -55,15 +69,18 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
 */
#define MCA_OOB_TCP_MSG_RETURN(msg) \
{ \
-    /* frees the iovec allocated during the send/recieve */ \
-    if(NULL != msg->msg_iov) free(msg->msg_iov); \
+    /* frees the iovec allocated during the send/receive */ \
+    if(NULL != msg->msg_rwiov) \
+        mca_oob_tcp_msg_iov_return(msg,msg->msg_rwiov); \
+    if(NULL != msg->msg_rwbuf) \
+        free(msg->msg_rwbuf); \
    OMPI_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_msgs, (ompi_list_item_t*)msg); \
}

/**
 *  Wait for a msg to complete.
- *  @param msg (IN) Message to wait on.
- *  @param size (OUT) Number of bytes delivered.
+ *  @param msg (IN) Message to wait on.
+ *  @param rc (OUT) Return code (number of bytes delivered on success or error code on failure).
 *  @retval OMPI_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
@@ -72,16 +89,25 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
 * Signal that a message has completed. Wakes up any pending threads (for blocking send)
 * or invokes callbacks for non-blocking case.
 * @param msg (IN) Message send/recv that has completed.
- * @param peer (IN) The peer the send/recieve was from
+ * @param peer (IN) The peer the send/receive was from
 * @retval OMPI_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer);

+/**
+ * Called to copy the results of a message into user supplied iovec array.
+ * @param msg (IN) Message send that is in progress.
+ * @param iov (IN) Iovec array of user supplied buffers.
+ * @param count (IN) Number of elements in iovec array.
+ * @retval Number of bytes copied.
+ */
+
+int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count);
+
/**
 * Called asynchronously to progress sending a message from the event library thread.
 * @param msg (IN) Message send that is in progress.
 * @param sd (IN) Socket descriptor to use for send.
- * @retval bool Bool flag indicating wether operation has completed.
+ * @retval bool Flag indicating whether the send has completed.
 */
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);

@@ -94,5 +120,63 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);

+/**
+ * Match name to a message that has been received asynchronously (unexpected).
+ *
+ * @param name (IN) Name associated with peer or wildcard to match first posted recv.
+ * @param tag (IN) Message tag.
+ * @return msg Matched message or NULL.
+ *
+ * Note - this routine requires the caller to be holding the module lock.
+ */
+
+mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t*, int tag);
+
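The wait/complete pair declared above is the usual mutex-plus-condition-variable completion idiom: the waiter sleeps on the predicate, the completer sets it and signals under the same lock. A self-contained pthreads equivalent, for readers unfamiliar with the ompi_condition_t wrappers (illustrative only, not the OMPI API):

#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            complete;
    int             rc;
} completion_t;

/* Block until another thread marks the completion done; returns its rc. */
static int completion_wait(completion_t *c)
{
    pthread_mutex_lock(&c->lock);
    while (!c->complete)
        pthread_cond_wait(&c->cond, &c->lock);
    pthread_mutex_unlock(&c->lock);
    return c->rc;
}

/* Mark done and wake any waiter. */
static void completion_signal(completion_t *c, int rc)
{
    pthread_mutex_lock(&c->lock);
    c->rc = rc;
    c->complete = true;
    pthread_cond_signal(&c->cond);
    pthread_mutex_unlock(&c->lock);
}

static void *worker(void *arg)
{
    completion_signal((completion_t *)arg, 42);
    return NULL;
}

int main(void)
{
    completion_t c = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, 0 };
    pthread_t t;
    pthread_create(&t, NULL, worker, &c);
    int rc = completion_wait(&c);
    pthread_join(t, NULL);
    return rc == 42 ? 0 : 1;
}
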
+/**
+ * Match name to a posted recv request.
+ *
+ * @param name (IN) Name associated with peer or wildcard to match first posted recv.
+ * @param tag (IN) Message tag.
+ * @param peek (IN) Match message with MCA_OOB_PEEK flag set.
+ * @return msg Matched message or NULL.
+ *
+ * Note - this routine requires the caller to be holding the module lock.
+ */
+
+mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t*, int tag, bool peek);
+
+/**
+ * Allocate space for iovec array - if the requested number of elements is less than
+ * MCA_OOB_TCP_IOV_MAX then use the array allocated along w/ the message - otherwise
+ * allocate count elements.
+ *
+ * @param msg (IN) Message to allocate array.
+ * @return Array of iovec elements.
+ *
+ */
+
+static inline struct iovec* mca_oob_tcp_msg_iov_alloc(mca_oob_tcp_msg_t* msg, int count)
+{
+    if(count <= MCA_OOB_TCP_IOV_MAX)
+        return msg->msg_iov;
+    return malloc(sizeof(struct iovec) * count);
+}
+
+
+/**
+ * Release resource held by iovec array if this is not part of the message.
+ *
+ * @param msg (IN) Message that owns the array.
+ * @param iov (IN) Iovec array to return.
+ *
+ */
+
+static inline void mca_oob_tcp_msg_iov_return(mca_oob_tcp_msg_t* msg, struct iovec* iov)
+{
+    if(iov != msg->msg_iov)
+        free(iov);
+}
+
 #endif /* _MCA_OOB_TCP_MESSAGE_H_ */
 
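The iov_alloc/iov_return pair above is a small-size optimization: requests of MCA_OOB_TCP_IOV_MAX elements or fewer reuse storage embedded in the message rather than hitting malloc. The same pattern standalone (the names here are stand-ins, not the patch's types):

#include <stdlib.h>
#include <sys/uio.h>

#define EMBEDDED_IOV_MAX 16

struct msg {
    struct iovec iov[EMBEDDED_IOV_MAX]; /* preallocated along with the message */
};

static struct iovec *iov_alloc(struct msg *m, int count)
{
    if (count <= EMBEDDED_IOV_MAX)
        return m->iov;              /* no allocation in the common case */
    return malloc(sizeof(struct iovec) * count);
}

static void iov_return(struct msg *m, struct iovec *iov)
{
    if (iov != m->iov)              /* only free what malloc handed out */
        free(iov);
}

int main(void)
{
    struct msg m;
    struct iovec *small = iov_alloc(&m, 4);   /* embedded array */
    struct iovec *large = iov_alloc(&m, 64);  /* heap allocated */
    iov_return(&m, small);
    iov_return(&m, large);
    return 0;
}
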
diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c
index 308f5f55df..ba457a9fa1 100644
--- a/src/mca/oob/tcp/oob_tcp_peer.c
+++ b/src/mca/oob/tcp/oob_tcp_peer.c
@@ -12,7 +12,6 @@
 
 static int  mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
 static int  mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
-static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
 static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
@@ -82,7 +81,8 @@ static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
 }
 
 /*
- *
+ * Initiate the appropriate action based on the state of the connection
+ * to the peer.
 *
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
@@ -154,11 +154,17 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
        return NULL;
    }
 
+    /* initialize peer state */
    peer->peer_name = *name;
+    peer->peer_sd = -1;
+    peer->peer_state = MCA_OOB_TCP_CLOSED;
+
    /******
     * need to add the peer's address to the structure
     ******/
-
+    peer->peer_addr.sin_family = AF_INET;
+    peer->peer_addr.sin_port = htons(5000+peer->peer_name.vpid);
+    peer->peer_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree,
       (ompi_process_name_t *) name, peer)) {
@@ -168,10 +174,11 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
        }
        return NULL;
    }
-    ompi_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (ompi_list_item_t *) peer);
 
+    /* if the peer list is over the maximum size, remove one unused peer */
+    ompi_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (ompi_list_item_t *) peer);
    if(ompi_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
-        mca_oob_tcp_component.tcp_cache_size) {
+        mca_oob_tcp_component.tcp_peer_limit) {
        old = (mca_oob_tcp_peer_t *)
              ompi_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
        while(1) {
@@ -183,7 +190,7 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
                break;
            } else {
                old = (mca_oob_tcp_peer_t *) ompi_list_get_prev(old);
-                if(NULL == old) {
+                if(ompi_list_get_begin(&mca_oob_tcp_component.tcp_peer_list) == (ompi_list_item_t*)old) {
                    /* we tried, but we couldn't find one that was valid to get rid
                     * of. Oh well.
                     */
                    break;
                }
            }
        }
    }
 
    return peer;
}
 
@@ -208,7 +215,6 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
    int rc,flags;
-
    peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
    if (peer->peer_sd < 0) {
        peer->peer_retries++;
@@ -250,6 +256,7 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
        return rc;
    }
 
+
/*
 * Check the status of the connection. If the connection failed, will retry
 * later. Otherwise, send this processes identifier to the peer on the
@@ -259,21 +266,30 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
{
    int so_error = 0;
    ompi_socklen_t so_length = sizeof(so_error);
-
+    /* unregister from event notifications */
    ompi_event_del(&peer->peer_send_event);
-
+    /* check connect completion status */
    if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
-        ompi_output(0, "mca_ptl_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
+        ompi_output(0, "mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
        mca_oob_tcp_peer_close(peer);
        return;
    }
    if(so_error == EINPROGRESS) {
        ompi_event_add(&peer->peer_send_event, 0);
        return;
-    }
-    if(so_error != 0) {
+    } else if (so_error == ECONNREFUSED) {
+        if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
+            ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to contact peer after %d retries\n", peer->peer_retries);
+            mca_oob_tcp_peer_close(peer);
+            return;
+        }
+        mca_oob_tcp_peer_close(peer);
+        sleep(1);
+        mca_oob_tcp_peer_start_connect(peer);
+        return;
+    } else if(so_error != 0) {
        ompi_output(0, "mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
        mca_oob_tcp_peer_close(peer);
        return;
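The completion check above relies on getsockopt(SO_ERROR) to fetch the deferred result of a non-blocking connect() once the socket selects writable. The pattern in isolation (a sketch with error handling trimmed; the actual connect/poll steps are elided):

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

/* After the socket becomes writable, fetch the deferred connect(2) result. */
static int connect_result(int sd)
{
    int so_error = 0;
    socklen_t len = sizeof(so_error);
    if (getsockopt(sd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0)
        return errno;
    return so_error; /* 0 on success, or EINPROGRESS, ECONNREFUSED, ... */
}

int main(void)
{
    int sd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(sd, F_SETFL, fcntl(sd, F_GETFL, 0) | O_NONBLOCK);
    /* ... connect(sd, ...) and poll/select for writability would go here ... */
    printf("pending connect status: %d\n", connect_result(sd));
    close(sd);
    return 0;
}
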
@@ -308,9 +324,10 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
 * and update the peer state to reflect the connection has
 * been closed.
 */
-static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
+void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
-    if(peer->peer_sd >= 0) {
+    if(peer->peer_state != MCA_OOB_TCP_CLOSED &&
+       peer->peer_sd >= 0) {
        ompi_event_del(&peer->peer_recv_event);
        ompi_event_del(&peer->peer_send_event);
        close(peer->peer_sd);
@@ -327,8 +344,9 @@ static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
    /* send process identifier to remote peer */
-    if(mca_oob_tcp_peer_send_blocking( peer, &mca_oob_base_self, sizeof(mca_oob_base_self)) !=
-        sizeof(mca_oob_base_self)) {
+    ompi_process_name_t guid = mca_oob_name_self;
+    OMPI_PROCESS_NAME_HTON(guid);
+    if(mca_oob_tcp_peer_send_blocking( peer, &guid, sizeof(guid)) != sizeof(guid)) {
        return OMPI_ERR_UNREACH;
    }
    return OMPI_SUCCESS;
@@ -345,6 +363,7 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
    if((mca_oob_tcp_peer_recv_blocking(peer, &guid, sizeof(ompi_process_name_t))) != sizeof(ompi_process_name_t)) {
        return OMPI_ERR_UNREACH;
    }
+    OMPI_PROCESS_NAME_NTOH(guid);
 
    /* compare this to the expected values */
    if(memcmp(&peer->peer_name, &guid, sizeof(ompi_process_name_t)) != 0) {
@@ -417,41 +436,159 @@ static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data,
    return cnt;
}
 
+/*
+ * Progress a completed recv:
+ * (1) signal a posted recv as complete
+ * (2) queue an unexpected message in the recv list
+ */
+static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t *msg)
+{
+    /* was this a posted recv? */
+    if(msg->msg_type == MCA_OOB_TCP_POSTED) {
+        mca_oob_tcp_msg_complete(msg, &peer->peer_name);
+    } else {
+        /* if not attempt to match unexpected message to a posted recv */
+        mca_oob_tcp_msg_t* post;
+        OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
+        post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true);
+        if(NULL != post) {
+
+            /* copy msg data into posted recv */
+            post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
+            if(post->msg_flags & MCA_OOB_TRUNC) {
+                int i, size = 0;
+                for(i=0; i<msg->msg_rwcnt; i++)
+                    size += msg->msg_rwiov[i].iov_len;
+                post->msg_rc = size;
+            }
+            if(post->msg_flags & MCA_OOB_PEEK) {
+                /* will need message for actual receive */
+                ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);
+            } else {
+                MCA_OOB_TCP_MSG_RETURN(msg);
+            }
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            mca_oob_tcp_msg_complete(post, &peer->peer_name);
+
+        } else {
+            ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t*)msg);
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+        }
+    }
+}
+
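The progress routine above implements the classic two-queue rendezvous: a completed receive either satisfies a posted request or is parked on an unexpected-message queue for a later recv to claim. Sketched abstractly, with the list machinery reduced to a singly linked list (none of these names are in the patch):

#include <stdlib.h>

struct item { int tag; struct item *next; };

/* Remove and return the first item matching tag, or NULL. */
static struct item *match(struct item **queue, int tag)
{
    struct item **p = queue;
    while (*p) {
        if ((*p)->tag == tag) {
            struct item *m = *p;
            *p = m->next;
            return m;
        }
        p = &(*p)->next;
    }
    return NULL;
}

/* Arrival: complete a matching posted recv, else queue as unexpected. */
static void arrive(struct item **posted, struct item **unexpected, struct item *msg)
{
    struct item *post = match(posted, msg->tag);
    if (post) {
        /* copy msg into post's buffers and signal completion here */
        free(post);
        free(msg);
    } else {
        msg->next = *unexpected;
        *unexpected = msg;
    }
}

int main(void)
{
    struct item *posted = NULL, *unexpected = NULL;
    struct item *msg = calloc(1, sizeof(*msg));
    msg->tag = 7;
    arrive(&posted, &unexpected, msg); /* no posted recv: parks on unexpected */
    return (unexpected != NULL && unexpected->tag == 7) ? 0 : 1;
}
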
+/*
+ * Start receiving a new message.
+ * (1) receive header
+ * (2) attempt to match posted receives
+ * (3) if a posted receive is available - receive into user's buffer
+ * (4) otherwise, allocate a new message and buffer for receive
+ */
+static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
+{
+    mca_oob_tcp_msg_t* msg;
+    mca_oob_tcp_hdr_t hdr;
+    uint32_t size;
+
+    /* blocking receive of the message header */
+    if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
+        return;
+    size = ntohl(hdr.msg_size);
+
+    /* attempt to match posted receive
+     * however - don't match message w/ peek attribute, as we need to
+     * queue the message anyway to match subsequent recv.
+     */
+    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
+    msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr.msg_tag, false);
+    if(NULL != msg) {
+        uint32_t posted_size = 0;
+        int i;
+
+        /* setup buffer for receive */
+        for(i=0; i<msg->msg_ucnt; i++)
+            posted_size += msg->msg_uiov[i].iov_len;
+
+        /* allocate an additional buffer to receive entire message */
+        if(posted_size < size) {
+            uint32_t alloc_size = size - posted_size;
+            msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt+1);
+            memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
+            msg->msg_rwbuf = malloc(alloc_size);
+            msg->msg_rwiov[msg->msg_ucnt].iov_base = msg->msg_rwbuf;
+            msg->msg_rwiov[msg->msg_ucnt].iov_len = alloc_size;
+            msg->msg_rwnum = msg->msg_ucnt+1;
+        } else {
+            msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt);
+            memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
+            msg->msg_rwnum = msg->msg_ucnt;
+        }
+
+    } else {
+        /* allocate a new message along with buffer */
+        int rc;
+        MCA_OOB_TCP_MSG_ALLOC(msg, rc);
+        if(NULL == msg) {
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            return;
+        }
+        msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
+        msg->msg_rc = 0;
+        msg->msg_flags = 0;
+        msg->msg_peer = peer->peer_name;
+        msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,1);
+        msg->msg_rwbuf = malloc(size);
+        msg->msg_rwiov->iov_base = msg->msg_rwbuf;
+        msg->msg_rwiov->iov_len = size;
+        msg->msg_rwcnt = msg->msg_rwnum = 1;
+    }
+
+    msg->msg_rwptr = msg->msg_rwiov;
+    msg->msg_hdr = hdr;
+
+    /* if receive of message data completed - queue the receive message */
+    if(mca_oob_tcp_msg_recv_handler(msg, peer)) {
+        mca_oob_tcp_peer_recv_progress(peer, msg);
+    } else {
+        /* continue processing until complete */
+        peer->peer_recv_msg = msg;
+    }
+    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+}
+
+
+/*
+ * Dispatch to the appropriate action routine based on the state
+ * of the connection with the peer. 
+ */ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user) { mca_oob_tcp_peer_t* peer = user; OMPI_THREAD_LOCK(&peer->peer_lock); switch(peer->peer_state) { - case MCA_OOB_TCP_CONNECT_ACK: + case MCA_OOB_TCP_CONNECT_ACK: { - mca_oob_tcp_peer_recv_connect_ack(peer); - break; + mca_oob_tcp_peer_recv_connect_ack(peer); + break; } - case MCA_OOB_TCP_CONNECTED: + case MCA_OOB_TCP_CONNECTED: { - mca_oob_tcp_msg_t* msg = peer->peer_recv_msg; - if(NULL == msg) { - int rc; - MCA_OOB_TCP_MSG_ALLOC(msg, rc); - if(NULL == msg) { - OMPI_THREAD_UNLOCK(&peer->peer_lock); - return; + if(NULL == peer->peer_recv_msg) { + mca_oob_tcp_peer_recv_start(peer); + } else if (mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) { + mca_oob_tcp_peer_recv_progress(peer, peer->peer_recv_msg); + peer->peer_recv_msg = NULL; } + break; } - - /* check for completion of non-blocking recv on the current fragment */ - if(mca_oob_tcp_msg_recv_handler(msg, peer) == false) - peer->peer_recv_msg = msg; - else - peer->peer_recv_msg = 0; - break; - } - default: + default: { - ompi_output(0, "mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", peer->peer_state); - mca_oob_tcp_peer_close(peer); - break; + ompi_output(0, "mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", peer->peer_state); + mca_oob_tcp_peer_close(peer); + break; } } OMPI_THREAD_UNLOCK(&peer->peer_lock); @@ -471,14 +608,16 @@ static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user) break; case MCA_OOB_TCP_CONNECTED: { - /* complete the current send */ do { + /* complete the current send */ mca_oob_tcp_msg_t* msg = peer->peer_send_msg; - if(mca_oob_tcp_msg_send_handler(msg, peer) == false) { + if(mca_oob_tcp_msg_send_handler(msg, peer)) { + mca_oob_tcp_msg_complete(msg, &peer->peer_name); + } else { break; } - /* if required - update request status and release fragment */ - /* progress any pending sends */ + + /* if current completed - progress any pending sends */ peer->peer_send_msg = (mca_oob_tcp_msg_t*) ompi_list_remove_first(&peer->peer_send_queue); } while (NULL != peer->peer_send_msg); @@ -551,3 +690,36 @@ static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg) ompi_output(0, buff); } + +/* + * Accept incoming connection - if not already connected. 
+ */
+
+bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
+{
+    OMPI_THREAD_LOCK(&peer->peer_lock);
+    if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
+        (peer->peer_state != MCA_OOB_TCP_CONNECTED &&
+         ompi_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0)) {
+        mca_oob_tcp_peer_close(peer);
+        peer->peer_sd = sd;
+        mca_oob_tcp_peer_event_init(peer);
+
+        if(mca_oob_tcp_peer_send_connect_ack(peer) != OMPI_SUCCESS) {
+            mca_oob_tcp_peer_close(peer);
+            OMPI_THREAD_UNLOCK(&peer->peer_lock);
+            return false;
+        }
+        ompi_event_add(&peer->peer_recv_event, 0);
+        mca_oob_tcp_peer_connected(peer);
+#if OMPI_ENABLE_DEBUG
+        mca_oob_tcp_peer_dump(peer, "accepted");
+#endif
+        OMPI_THREAD_UNLOCK(&peer->peer_lock);
+        return true;
+    }
+    OMPI_THREAD_UNLOCK(&peer->peer_lock);
+    return false;
+}
+
+
diff --git a/src/mca/oob/tcp/oob_tcp_peer.h b/src/mca/oob/tcp/oob_tcp_peer.h
index 0bbd803dfd..e6cebe307a 100644
--- a/src/mca/oob/tcp/oob_tcp_peer.h
+++ b/src/mca/oob/tcp/oob_tcp_peer.h
@@ -30,7 +30,7 @@ typedef enum {
 
 
 /**
- * This structire describes a peer
+ * This structure describes a peer
 */
struct mca_oob_tcp_peer_t {
    ompi_list_item_t super;           /**< allow this to be on a list */
@@ -41,13 +41,19 @@ struct mca_oob_tcp_peer_t {
    int peer_sd;                      /**< socket descriptor of the connection */
    ompi_event_t peer_send_event;     /**< registration with event thread for send events */
    ompi_event_t peer_recv_event;     /**< registration with event thread for recv events */
-    ompi_mutex_t peer_lock;           /**< make sure only one thread accesses critical data structures */
+    ompi_mutex_t peer_lock;           /**< protect critical data structures */
    ompi_list_t peer_send_queue;      /**< list of messages to send */
    mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */
    mca_oob_tcp_msg_t *peer_recv_msg; /**< current recv in progress */
};
typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t;

+/*
+ * Class declaration.
+ */
+
+OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_t);
+
/**
 * Get a new peer data structure
 */
@@ -92,6 +98,22 @@ mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg);

+/**
+ * Connection request for this peer. Check the status of our connection
+ * before accepting the peer's connection.
+ *
+ * @param peer The peer process.
+ * @param sd Incoming connection request.
+ */
+bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd);
+
+/**
+ * Cleanup/close the connection to the peer.
+ *
+ * @param peer The peer process.
+ */
+void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
+
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
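mca_oob_tcp_peer_accept() above breaks simultaneous-connect ties by comparing process names: an incoming connection replaces an in-progress one only when the peer orders below self. A toy model of that tiebreak, with a stand-in for ompi_process_name_compare and the state machine reduced to a flag (all names illustrative):

#include <stdbool.h>
#include <stdio.h>

typedef struct { unsigned cellid, jobid, vpid; } name_t;

static int name_compare(const name_t *a, const name_t *b)
{
    if (a->vpid != b->vpid) return a->vpid < b->vpid ? -1 : 1;
    return 0; /* cellid/jobid comparison elided for brevity */
}

/* Accept unless we are fully connected, or if the peer orders below us. */
static bool should_accept(bool connected, const name_t *peer, const name_t *self)
{
    return !connected || name_compare(peer, self) < 0;
}

int main(void)
{
    name_t self = { 0, 0, 2 }, peer = { 0, 0, 1 };
    printf("accept while connecting: %s\n",
           should_accept(false, &peer, &self) ? "yes" : "no");
    return 0;
}

Both sides apply the same deterministic ordering, so exactly one of the two crossing connections survives.
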
diff --git a/src/mca/oob/tcp/oob_tcp_recv.c b/src/mca/oob/tcp/oob_tcp_recv.c
index 0f1e2a544a..4340ac3aaa 100644
--- a/src/mca/oob/tcp/oob_tcp_recv.c
+++ b/src/mca/oob/tcp/oob_tcp_recv.c
@@ -7,89 +7,79 @@
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User supplied tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
 * iovec array without removing the message from the queue.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
-int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int count, int flags)
+int mca_oob_tcp_recv(
+    ompi_process_name_t* peer,
+    const struct iovec *iov,
+    int count,
+    int tag,
+    int flags)
{
-    mca_oob_tcp_msg_t * msg = (mca_oob_tcp_msg_t *)
-        ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
-    int i, amount, read, size = 0;
-    char * base;
+    mca_oob_tcp_msg_t *msg;
+    int i, rc, size = 0;
+
+    /* lock the tcp struct */
+    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
+
+    /* check to see if a matching receive is on the list */
+    msg = mca_oob_tcp_msg_match_recv(peer, tag);
+    if(NULL != msg) {
+
+        /* if an error occurred on this message, return the error code */
+        if(msg->msg_rc < 0) {
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            return msg->msg_rc;
+        }
+
+        /* copy the message data; if we are just doing peek, return bytes
+         * without dequeuing the message */
+        rc = mca_oob_tcp_msg_copy(msg, iov, count);
+        if(rc >= 0 && MCA_OOB_TRUNC & flags) {
+            rc = 0;
+            for(i=0; i<msg->msg_rwcnt; i++)
+                rc += msg->msg_rwiov[i].iov_len;
+        }
+        if(MCA_OOB_PEEK & flags) {
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            return rc;
+        }
+
+        /* otherwise dequeue the message and return to free list */
+        ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
+        MCA_OOB_TCP_MSG_RETURN(msg);
+        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+        return rc;
+    }
+
+    /* the message has not already been received. So we add it to the receive queue */
+    MCA_OOB_TCP_MSG_ALLOC(msg, rc);
+    if(NULL == msg) {
+        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+        return rc;
+    }
+
+    /* determine overall size of user supplied buffer */
    for(i = 0; i < count; i++) {
        size += iov[i].iov_len;
    }
-    /* lock the tcp struct */
-    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
-    /* check to see if a matching recieve is on the list */
-    while(NULL != msg) {
-        if(MCA_OOB_BASE_ANY == peer ||
-            (0 == memcmp(peer, &msg->msg_peer, sizeof(ompi_process_name_t)))) {
-            /* if we are just doing peek, all we need to do is match the peer name,
-             * nothing else. */
-            if(MCA_OOB_PEEK & flags) {
-                base = (char *) msg->msg_iov[1].iov_base;
-                size = msg->msg_iov[i].iov_len;
-                read = 0;
-                for(i = 0; i < count; i++) {
-                    amount = ((iov[i].iov_len < size) ? iov[i].iov_len : size);
-                    memcpy(iov[i].iov_base, base, amount);
-                    size -= amount;
-                    base +=amount;
-                    read += amount;
-                }
-                OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-                return read;
-            }
-            /* what are we going to define as matching? we don't know the number
-             * of iovecs we got off the network, we only know the total size
-             * For now, we wil just use the size == the size we have available */
-            if(size == msg->msg_size) {
-                /* match */
-                base = (char *) msg->msg_iov[1].iov_base;
-                size = msg->msg_iov[i].iov_len;
-                read = 0;
-                for(i = 0; i < count; i++) {
-                    memcpy(iov[i].iov_base, base, iov[i].iov_len);
-                    base +=amount;
-                    read += amount;
-                }
-                ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv,
-                                      (ompi_list_item_t *) msg);
-                MCA_OOB_TCP_MSG_RETURN(msg);
-                OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-                return read;
-            }
-            msg = (mca_oob_tcp_msg_t *) ompi_list_get_next(msg);
-        }
-    }
-    /* the message has not already been recieved. So we add it to the recieve queue */
-    MCA_OOB_TCP_MSG_ALLOC(msg, i);
-    if(NULL == msg) {
-        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-        return i;
-    }
+
    /* fill in the struct */
-    msg->msg_state = 0;
-    msg->msg_size = size;
-    msg->msg_user = iov;
-    msg->msg_iov = NULL;
-    msg->msg_rwcnt = msg->msg_count = count;
+    msg->msg_type = MCA_OOB_TCP_POSTED;
+    msg->msg_rc = 0;
+    msg->msg_flags = flags;
+    msg->msg_uiov = iov;
+    msg->msg_ucnt = count;
    msg->msg_cbfunc = NULL;
+    msg->msg_cbdata = NULL;
    msg->msg_complete = false;
-    if(0 == memcmp(MCA_OOB_BASE_ANY, peer, sizeof(ompi_process_name_t))) {
-        msg->msg_peer = MCA_OOB_BASE_ANY;
-    } else {
-        mca_oob_tcp_peer_t * other = mca_oob_tcp_peer_lookup(peer, false);
-        msg->msg_peer = &other->peer_name;
-    }
-    /* add to list */
-    ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
-    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
+    msg->msg_peer = *peer;
+    ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
+    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
 
-    /* wait for the recieve to complete */
-    mca_oob_tcp_msg_wait(msg, &read);
-    return read;
+
+    /* wait for the receive to complete */
+    mca_oob_tcp_msg_wait(msg, &rc);
+    MCA_OOB_TCP_MSG_RETURN(msg);
+    return rc;
}
 
/*
@@ -98,91 +88,79 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int cou
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
+ * @param tag (IN) User supplied tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
 * @param cbfunc (IN) Callback function on recv completion.
 * @param cbdata (IN) User data that is passed to callback function.
- * @return OMPI error code (<0) on error or number of bytes actually received.
+ * @return OMPI error code (<0) on error.
 */
-int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* iov, int count,
-    int flags, mca_oob_callback_fn_t cbfunc, void* cbdata)
+int mca_oob_tcp_recv_nb(
+    ompi_process_name_t* peer,
+    const struct iovec* iov,
+    int count,
+    int tag,
+    int flags,
+    mca_oob_callback_fn_t cbfunc,
+    void* cbdata)
{
-    mca_oob_tcp_msg_t * msg = (mca_oob_tcp_msg_t *)
-        ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
-    int i, amount, read, size = 0;
-    char * base;
+    mca_oob_tcp_msg_t *msg;
+    int i, rc, size = 0;
+
+    /* lock the tcp struct */
+    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
+
+    /* check to see if a matching receive is on the list */
+    msg = mca_oob_tcp_msg_match_recv(peer, tag);
+    if(NULL != msg) {
+
+        /* if an error occurred on this message, return the error code */
+        if(msg->msg_rc < 0) {
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            return msg->msg_rc;
+        }
+
+        /* copy the message data; if we are just doing peek, return bytes
+         * without dequeuing the message */
+        rc = mca_oob_tcp_msg_copy(msg, iov, count);
+        if(rc >= 0 && MCA_OOB_TRUNC & flags) {
+            rc = 0;
+            for(i=0; i<msg->msg_rwcnt; i++)
+                rc += msg->msg_rwiov[i].iov_len;
+        }
+        if(MCA_OOB_PEEK & flags) {
+            OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+            cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
+            return 0;
+        }
+
+        /* otherwise dequeue the message and return to free list */
+        ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
+        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+        cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
+        MCA_OOB_TCP_MSG_RETURN(msg);
+        return 0;
+    }
+
+    /* the message has not already been received. So we add it to the receive queue */
+    MCA_OOB_TCP_MSG_ALLOC(msg, rc);
+    if(NULL == msg) {
+        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+        return rc;
+    }
+
+    /* determine overall size of user supplied buffer */
    for(i = 0; i < count; i++) {
        size += iov[i].iov_len;
    }
-    /* lock the tcp struct */
-    OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
-    /* check to see if a matching recieve is on the list */
-    while(NULL != msg) {
-        if(MCA_OOB_BASE_ANY == peer ||
-            (0 == memcmp(peer, &msg->msg_peer, sizeof(ompi_process_name_t)))) {
-            /* if we are just doing peek, all we need to do is match the peer name,
-             * nothing else. */
-            if(MCA_OOB_PEEK & flags) {
-                base = (char *) msg->msg_iov[1].iov_base;
-                size = msg->msg_iov[i].iov_len;
-                read = 0;
-                for(i = 0; i < count; i++) {
-                    amount = ((iov[i].iov_len < size) ? iov[i].iov_len : size);
-                    memcpy(iov[i].iov_base, base, amount);
-                    size -= amount;
-                    base +=amount;
-                    read += amount;
-                }
-                cbfunc(read, peer, iov, count, cbdata);
-                OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-                return read;
-            }
-            /* what are we going to define as matching? we don't know the number
-             * of iovecs we got off the network, we only know the total size
-             * For now, we wil just use the size == the size we have available */
-            if(size == msg->msg_size) {
-                /* match */
-                base = (char *) msg->msg_iov[1].iov_base;
-                size = msg->msg_iov[i].iov_len;
-                read = 0;
-                for(i = 0; i < count; i++) {
-                    memcpy(iov[i].iov_base, base, iov[i].iov_len);
-                    base +=amount;
-                    read += amount;
-                }
-                ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv,
-                                      (ompi_list_item_t *) msg);
-                MCA_OOB_TCP_MSG_RETURN(msg);
-                OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-                cbfunc(read, peer, iov, count, cbdata);
-                return read;
-            }
-            msg = (mca_oob_tcp_msg_t *) ompi_list_get_next(msg);
-        }
-    }
-    /* the message has not already been recieved. So we add it to the recieve queue */
-    MCA_OOB_TCP_MSG_ALLOC(msg, i);
-    if(NULL == msg) {
-        OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-        return i;
-    }
+
    /* fill in the struct */
-    msg->msg_state = 0;
-    msg->msg_size = size;
-    msg->msg_user = iov;
-    msg->msg_iov = NULL;
-    msg->msg_rwcnt = msg->msg_count = count;
+    msg->msg_type = MCA_OOB_TCP_POSTED;
+    msg->msg_rc = 0;
+    msg->msg_flags = flags;
+    msg->msg_uiov = iov;
+    msg->msg_ucnt = count;
    msg->msg_cbfunc = cbfunc;
    msg->msg_cbdata = cbdata;
-    if(0 == memcmp(MCA_OOB_BASE_ANY, peer, sizeof(ompi_process_name_t))) {
-        msg->msg_peer = MCA_OOB_BASE_ANY;
-    } else {
-        mca_oob_tcp_peer_t * other = mca_oob_tcp_peer_lookup(peer, false);
-        msg->msg_peer = &other->peer_name;
-    }
    msg->msg_complete = false;
-    /* add to list */
-    ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
-    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
-    return OMPI_SUCCESS;
+    msg->msg_peer = *peer;
+    ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
+    OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
+    return 0;
}
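Usage of the reworked, tagged receive interface might look as follows. This is a sketch against the signatures introduced in this patch; the callback parameter order is inferred from the cbfunc invocations above (rc, peer, iov, count, tag, cbdata), and the tag value and helper names are illustrative:

#include "mca/oob/oob.h"   /* ompi_process_name_t, mca_oob_callback_fn_t */
#include "oob_tcp.h"       /* mca_oob_tcp_recv(), mca_oob_tcp_recv_nb() as changed above */

/* completion callback for the non-blocking form */
static void recv_done(int rc, ompi_process_name_t* peer, const struct iovec* iov,
                      int count, int tag, void* cbdata)
{
    /* rc: bytes received or negative error; tag echoes the matched tag */
}

static char buffer[256];
static struct iovec iov = { buffer, sizeof(buffer) };

int recv_blocking_example(ompi_process_name_t* peer)
{
    /* blocks until a message with tag 1 arrives from peer */
    return mca_oob_tcp_recv(peer, &iov, 1, 1, 0);
}

int recv_nonblocking_example(ompi_process_name_t* peer)
{
    /* posts the receive and returns; recv_done fires when matched */
    return mca_oob_tcp_recv_nb(peer, &iov, 1, 1, 0, recv_done, NULL);
}

Passing MCA_OOB_PEEK in flags would return the data while leaving the message queued for a subsequent receive.
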
diff --git a/src/mca/oob/tcp/oob_tcp_send.c b/src/mca/oob/tcp/oob_tcp_send.c
index 97295c9126..80e9f073a2 100644
--- a/src/mca/oob/tcp/oob_tcp_send.c
+++ b/src/mca/oob/tcp/oob_tcp_send.c
@@ -10,11 +10,18 @@
 * @return OMPI error code (<0) on error number of bytes actually sent.
 */
-int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, int count, int flags)
+int mca_oob_tcp_send(
+    const ompi_process_name_t* name,
+    const struct iovec *iov,
+    int count,
+    int tag,
+    int flags)
{
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
    mca_oob_tcp_msg_t* msg;
-    int rc, sent;
+    int size;
+    int rc;
+
    if(NULL == peer)
        return OMPI_ERR_UNREACH;
 
@@ -23,25 +30,32 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
        return rc;
 
    /* calculate the size of the message */
-    msg->msg_size = sizeof(uint32_t);
+    size = 0;
    for(rc = 0; rc < count; rc++) {
-        msg->msg_size += iov[rc].iov_len;
+        size += iov[rc].iov_len;
    }
+
    /* turn the size to network byte order so there will be no problems */
-    msg->msg_size = htonl(msg->msg_size);
-    msg->msg_user = iov;
-    /* create one additional iovect that will hold the size of the message */
-    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*(count + 1));
-    msg->msg_iov[0].iov_base = &msg->msg_size;
-    msg->msg_iov[0].iov_len = sizeof(uint32_t);
-    msg->msg_rwptr = msg->msg_iov;
-    msg->msg_count = msg->msg_rwcnt = count + 1;
-    memcpy(msg->msg_iov, &(msg->msg_user[1]), sizeof(struct iovec)*count);
+    msg->msg_hdr.msg_size = htonl(size);
+    msg->msg_hdr.msg_tag = htonl(tag);
+
+    /* create one additional iovec that will hold the header */
+    msg->msg_type = MCA_OOB_TCP_POSTED;
+    msg->msg_rc = 0;
+    msg->msg_flags = flags;
+    msg->msg_uiov = iov;
+    msg->msg_ucnt = count;
+    msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, count+1);
+    msg->msg_rwiov[0].iov_base = &msg->msg_hdr;
+    msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
+    msg->msg_rwptr = msg->msg_rwiov;
+    msg->msg_rwcnt = msg->msg_rwnum = count + 1;
+    memcpy(msg->msg_rwiov+1, msg->msg_uiov, sizeof(struct iovec)*msg->msg_ucnt);
+    msg->msg_rwbuf = NULL;
    msg->msg_cbfunc = NULL;
    msg->msg_cbdata = NULL;
    msg->msg_complete = false;
-    msg->msg_peer = &peer->peer_name;
-    msg->msg_state = 0;
+    msg->msg_peer = peer->peer_name;
 
    rc = mca_oob_tcp_peer_send(peer, msg);
    if(rc != OMPI_SUCCESS) {
@@ -49,10 +63,12 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
        return rc;
    }
 
-    rc = mca_oob_tcp_msg_wait(msg, &sent);
+    rc = mca_oob_tcp_msg_wait(msg, &size);
+    MCA_OOB_TCP_MSG_RETURN(msg);
    if(rc != OMPI_SUCCESS)
        return rc;
-    return sent;
+    size -= sizeof(mca_oob_tcp_hdr_t);
+    return size;
}
 
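The send path above frames each message by placing the header in iovec slot 0, so the header and the user payload go out in a single writev(). That framing step in isolation, with the types reduced to the patch's two-field header (names illustrative):

#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>
#include <sys/uio.h>

struct hdr { uint32_t msg_size; int32_t msg_tag; };

/* Build rwiov = [header | user iovecs]; returns the total payload size. */
static size_t frame(struct hdr *h, int tag,
                    const struct iovec *uiov, int ucnt, struct iovec *rwiov)
{
    size_t size = 0;
    int i;
    for (i = 0; i < ucnt; i++)
        size += uiov[i].iov_len;
    h->msg_size = htonl((uint32_t)size);      /* network byte order on the wire */
    h->msg_tag = htonl(tag);
    rwiov[0].iov_base = h;                    /* header rides in slot 0 */
    rwiov[0].iov_len = sizeof(*h);
    memcpy(rwiov + 1, uiov, sizeof(struct iovec) * ucnt);
    return size;
}

int main(void)
{
    char payload[] = "ping";
    struct iovec uiov = { payload, sizeof(payload) };
    struct iovec rwiov[2];
    struct hdr h;
    return frame(&h, 1, &uiov, 1, rwiov) == sizeof(payload) ? 0 : 1;
}
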
/*
@@ -72,13 +88,16 @@ int mca_oob_tcp_send_nb(
    const ompi_process_name_t* name,
    const struct iovec* iov,
    int count,
+    int tag,
    int flags,
    mca_oob_callback_fn_t cbfunc,
    void* cbdata)
{
    mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
    mca_oob_tcp_msg_t* msg;
+    int size;
    int rc;
+
    if(NULL == peer)
        return OMPI_ERR_UNREACH;
 
@@ -87,26 +106,31 @@ int mca_oob_tcp_send_nb(
        return rc;
 
    /* calculate the size of the message */
-    msg->msg_size = sizeof(size_t);
+    size = 0;
    for(rc = 0; rc < count; rc++) {
-        msg->msg_size += iov[rc].iov_len;
+        size += iov[rc].iov_len;
    }
 
    /* turn the size to network byte order so there will be no problems */
-    msg->msg_size = htonl(msg->msg_size);
+    msg->msg_hdr.msg_size = htonl(size);
+    msg->msg_hdr.msg_tag = htonl(tag);
 
-    msg->msg_user = iov;
    /* create one additional iovect that will hold the size of the message */
-    msg->msg_iov = (struct iovec*)malloc(sizeof(struct iovec)*(count + 1));
-    msg->msg_iov[0].iov_base = &msg->msg_size;
-    msg->msg_iov[0].iov_len = sizeof(size_t);
-    msg->msg_rwptr = msg->msg_iov;
-    msg->msg_count = msg->msg_rwcnt = count + 1;
-    memcpy(msg->msg_iov, &(msg->msg_user[1]), sizeof(struct iovec)*count);
+    msg->msg_type = MCA_OOB_TCP_POSTED;
+    msg->msg_rc = 0;
+    msg->msg_flags = flags;
+    msg->msg_uiov = iov;
+    msg->msg_ucnt = count;
+    msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,count+1);
+    msg->msg_rwiov[0].iov_base = &msg->msg_hdr;
+    msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
+    msg->msg_rwptr = msg->msg_rwiov;
+    msg->msg_rwcnt = msg->msg_rwnum = count + 1;
+    memcpy(msg->msg_rwiov+1, msg->msg_uiov, sizeof(struct iovec)*msg->msg_ucnt);
+    msg->msg_rwbuf = NULL;
    msg->msg_cbfunc = cbfunc;
    msg->msg_cbdata = cbdata;
    msg->msg_complete = false;
-    msg->msg_peer = &peer->peer_name;
-    msg->msg_state = 0;
+    msg->msg_peer = peer->peer_name;
 
    rc = mca_oob_tcp_peer_send(peer, msg);
    if(rc != OMPI_SUCCESS) {
diff --git a/src/mca/ptl/ptl.h b/src/mca/ptl/ptl.h
index d61c8e80cb..f609c7b109 100644
--- a/src/mca/ptl/ptl.h
+++ b/src/mca/ptl/ptl.h
@@ -328,16 +328,20 @@ typedef int (*mca_ptl_base_module_del_procs_fn_t)(
 * PML->PTL Initialize a send request for use by the PTL.
 *
 * @param ptl (IN)       PTL instance
- * @param request (OUT)  Pointer to allocated request.
+ * @param request (IN)   Pointer to allocated request.
 *
 * To reduce latency (number of required allocations), the PML allocates additional
 * space along w/ each request - that may be used by the PTL for additional control
- * information (e.g. first fragment descriptor).
+ * information (e.g. first fragment descriptor). If the PTL intends to use this space
+ * the ptl_cache_bytes attribute should be set to reflect the number of bytes needed
+ * by the PTL on a per-request basis. This space is allocated contiguously along with
+ * the mca_pml_base_send_request_t, w/ the space available to the PTL immediately
+ * following the request.
 *
 * The init function is called the first time the request is ued by the PTL. On
 * completion of the request - the PML will cache the request for later use by the
 * same PTL. When the request is re-used from the cache, the init function is NOT
- * called for subsequent sends.
+ * called for subsequent sends.
 */
typedef int (*mca_ptl_base_module_request_init_fn_t)(
    struct mca_ptl_base_module_t* ptl,
@@ -350,12 +354,12 @@ typedef int (*mca_ptl_base_module_request_init_fn_t)(
 * request by the PTL.
 *
 * @param ptl (IN)       PTL instance
- * @param request (OUT)  Pointer to allocated request.
+ * @param request (IN)   Pointer to allocated request.
 *
 * The fini function is called when the PML removes a request from the PTLs
- * cache (due to resource constraints) or reaching cache limit, prior to re-using
- * the request for another PTL. This provides the PTL the chance to cleanup/release
- * any resources cached on the send descriptor by the PTL.
+ * cache (due to resource constraints) or the cache limit has been reached, prior
+ * to re-using the request for another PTL. This provides the PTL the chance to
+ * cleanup/release any resources cached on the send descriptor by the PTL.
 */
 
typedef void (*mca_ptl_base_module_request_fini_fn_t)(
diff --git a/src/threads/condition_spinlock.h b/src/threads/condition_spinlock.h
index 0fcdf2b622..afb731e08e 100644
--- a/src/threads/condition_spinlock.h
+++ b/src/threads/condition_spinlock.h
@@ -30,7 +30,9 @@ static inline int ompi_condition_wait(ompi_condition_t* c, ompi_mutex_t* m)
        }
    } else {
        while(c->c_signaled == 0) {
+            ompi_mutex_unlock(m);
            ompi_progress();
+            ompi_mutex_lock(m);
        }
    }
    c->c_signaled--;
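The condition_spinlock change above releases the mutex around ompi_progress() so that other threads, or callbacks fired from the progress loop, can acquire the lock and signal the condition; spinning with the lock held would prevent any signaler from ever getting in. The shape of that polling wait, abstracted into a sketch (the function pointers stand in for the ompi_mutex_t operations and ompi_progress(); this is not the OMPI API):

/* Poll-based condition wait: the lock must be dropped while progressing,
 * or whoever wants to signal can never acquire it. */
static inline int condition_wait_polling(volatile int *signaled,
                                         void (*unlock)(void), void (*lock)(void),
                                         void (*progress)(void))
{
    /* caller holds the lock on entry, as in ompi_condition_wait() */
    while (*signaled == 0) {
        unlock();     /* let signalers take the lock */
        progress();   /* drive pending events; may invoke callbacks */
        lock();       /* re-take before re-checking the predicate */
    }
    (*signaled)--;
    return 0;
}
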