
- changed oob interface functions to accept a tag parameter

- tcp implementation in progress
- changed calls to oob to reflect the additional parameter

This commit was SVN r1839.
This commit is contained in:
Tim Woodall 2004-08-02 21:24:00 +00:00
parent 9f15b8c553
commit 9280e182f5
26 changed files with 1263 additions and 479 deletions

View File

@ -7,6 +7,8 @@
#include "ompi_config.h" #include "ompi_config.h"
#include "class/ompi_list.h" #include "class/ompi_list.h"
#include "threads/thread.h"
#include "threads/condition.h"
#include "include/constants.h" #include "include/constants.h"
#include "mca/mpool/mpool.h" #include "mca/mpool/mpool.h"
@ -20,16 +22,30 @@ struct ompi_free_list_t
int fl_max_to_alloc; int fl_max_to_alloc;
int fl_num_allocated; int fl_num_allocated;
int fl_num_per_alloc; int fl_num_per_alloc;
int fl_num_waiting;
size_t fl_elem_size; size_t fl_elem_size;
ompi_class_t* fl_elem_class; ompi_class_t* fl_elem_class;
mca_mpool_base_module_t* fl_mpool; mca_mpool_base_module_t* fl_mpool;
ompi_mutex_t fl_lock; ompi_mutex_t fl_lock;
ompi_condition_t fl_condition;
}; };
typedef struct ompi_free_list_t ompi_free_list_t; typedef struct ompi_free_list_t ompi_free_list_t;
/**
* Initialize a free list.
*
* @param free_list (IN) Free list.
* @param element_size (IN) Size of each element.
* @param element_class (IN) ompi_class_t of element - used to initialize allocated elements.
* @param num_elements_to_alloc (IN) Initial number of elements to allocate.
* @param max_elements_to_alloc (IN) Maximum number of elements to allocate.
* @param num_elements_per_alloc (IN) Number of elements to grow by per allocation.
* @param mpool (IN) Optional memory pool for allocations.
*/
int ompi_free_list_init( int ompi_free_list_init(
ompi_free_list_t *flist, ompi_free_list_t *free_list,
size_t element_size, size_t element_size,
ompi_class_t* element_class, ompi_class_t* element_class,
int num_elements_to_alloc, int num_elements_to_alloc,
@ -39,7 +55,19 @@ int ompi_free_list_init(
int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements); int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements);
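For orientation, a minimal sketch of a typical call to the initializer documented above, modeled on the way the TCP component later in this commit sets up its free lists; the element type my_item_t and its class are illustrative only, not part of this change:

    /* illustrative fragment: my_item_t and OBJ_CLASS(my_item_t) are assumed to exist */
    ompi_free_list_t my_list;
    OBJ_CONSTRUCT(&my_list, ompi_free_list_t);
    ompi_free_list_init(
        &my_list,
        sizeof(my_item_t),
        OBJ_CLASS(my_item_t),
        8,      /* initial number of elements to allocate */
        -1,     /* no maximum - grow on demand */
        8,      /* number of elements to grow by per allocation */
        NULL);  /* no mpool - use the default allocator */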
/**
* Attempt to obtain an item from a free list.
*
* @param fl (IN) Free list.
* @param item (OUT) Allocated item.
* @param rc (OUT) OMPI_SUCCESS or error status on failure.
*
* If the requested item is not available the free list is grown to
* accommodate the request - unless the max number of allocations has
* been reached. If this is the case - an out of resource error is
* returned to the caller.
*/
#define OMPI_FREE_LIST_GET(fl, item, rc) \ #define OMPI_FREE_LIST_GET(fl, item, rc) \
{ \ { \
if(ompi_using_threads()) { \ if(ompi_using_threads()) { \
@ -60,30 +88,56 @@ int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements);
rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \ rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
} }
/**
* Blocking call to obtain an item from a free list.
*
* @param fl (IN) Free list.
* @param item (OUT) Allocated item.
* @param rc (OUT) OMPI_SUCCESS or error status on failure.
*
* If the requested item is not available the free list is grown to
* accommodate the request - unless the max number of allocations has
* been reached. In this case the caller is blocked until an item
* is returned to the list.
*/
#define OMPI_FREE_LIST_WAIT(fl, item, rc) \ #define OMPI_FREE_LIST_WAIT(fl, item, rc) \
{ \ { \
if(ompi_using_threads()) { \ OMPI_THREAD_LOCK(&((fl)->fl_lock)); \
ompi_mutex_lock(&((fl)->fl_lock)); \ item = ompi_list_remove_first(&((fl)->super)); \
item = ompi_list_remove_first(&((fl)->super)); \ while(NULL == item) { \
if(NULL == item) { \ if((fl)->fl_max_to_alloc > (fl)->fl_num_allocated) { \
ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \ (fl)->fl_num_waiting++; \
item = ompi_list_remove_first(&((fl)->super)); \ ompi_condition_wait(&((fl)->fl_condition), &((fl)->fl_lock)); \
} \ (fl)->fl_num_waiting--; \
ompi_mutex_unlock(&((fl)->fl_lock)); \ } else { \
} else { \ ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
item = ompi_list_remove_first(&((fl)->super)); \ } \
if(NULL == item) { \ item = ompi_list_remove_first(&((fl)->super)); \
ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \ } \
item = ompi_list_remove_first(&((fl)->super)); \ OMPI_THREAD_UNLOCK(&((fl)->fl_lock)); \
} \ rc = (NULL == item) ? OMPI_ERR_OUT_OF_RESOURCE : OMPI_SUCCESS; \
} \
rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
} }
#define OMPI_FREE_LIST_RETURN(fl, item) \ /**
THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_prepend(&((fl)->super), (item))); * Return an item to a free list.
*
* @param fl (IN) Free list.
* @param item (IN) Item returned to the free list.
*
*/
#define OMPI_FREE_LIST_RETURN(fl, item) \
{ \
OMPI_THREAD_LOCK(&(fl)->fl_lock); \
ompi_list_prepend(&((fl)->super), (item)); \
if((fl)->fl_num_waiting > 0) { \
ompi_condition_signal(&((fl)->fl_condition)); \
} \
OMPI_THREAD_UNLOCK(&(fl)->fl_lock); \
}
#endif #endif
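The new WAIT/RETURN pair above is the classic bounded-pool pattern: grow while the allocation limit has not been reached, block on a condition variable once it has, and signal a waiter whenever an item comes back. A small self-contained pthreads sketch of that documented behaviour (an analogy, not the OMPI macros themselves; names and sizes are illustrative):

    #include <pthread.h>
    #include <stdlib.h>

    typedef struct node { struct node *next; } node_t;

    typedef struct {
        node_t *head;          /* list of free items */
        int allocated, max;    /* current / maximum number of allocations */
        int waiting;           /* threads blocked in pool_wait() */
        pthread_mutex_t lock;
        pthread_cond_t cond;
    } pool_t;

    /* blocking take: grow while under the limit, otherwise wait for a return */
    static node_t *pool_wait(pool_t *p)
    {
        node_t *item;
        pthread_mutex_lock(&p->lock);
        while (NULL == (item = p->head)) {
            if (p->allocated < p->max) {
                node_t *n = malloc(sizeof(node_t));  /* "grow" by one element */
                n->next = NULL;
                p->head = n;
                p->allocated++;
            } else {
                p->waiting++;
                pthread_cond_wait(&p->cond, &p->lock);
                p->waiting--;
            }
        }
        p->head = item->next;
        pthread_mutex_unlock(&p->lock);
        return item;
    }

    /* return an item and wake one waiter, as OMPI_FREE_LIST_RETURN now does */
    static void pool_return(pool_t *p, node_t *item)
    {
        pthread_mutex_lock(&p->lock);
        item->next = p->head;
        p->head = item;
        if (p->waiting > 0)
            pthread_cond_signal(&p->cond);
        pthread_mutex_unlock(&p->lock);
    }

    int main(void)
    {
        pool_t p = { NULL, 0, 4, 0, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
        node_t *item = pool_wait(&p);   /* grows, since nothing is allocated yet */
        pool_return(&p, item);          /* back on the list; no waiter to wake */
        return 0;
    }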

View File

@ -125,8 +125,10 @@ static mca_base_modex_module_t* mca_base_modex_create_module(
* during mca_base_modex_exchange(). * during mca_base_modex_exchange().
*/ */
int mca_base_modex_send(mca_base_component_t *source_component, int mca_base_modex_send(
const void *buffer, size_t size) mca_base_component_t *source_component,
const void *buffer,
size_t size)
{ {
ompi_proc_t *self = ompi_proc_local(); ompi_proc_t *self = ompi_proc_local();
mca_base_modex_t* modex; mca_base_modex_t* modex;
@ -243,7 +245,7 @@ int mca_base_modex_exchange(void)
iov.iov_base = self_module->module_data; iov.iov_base = self_module->module_data;
iov.iov_len = self_module->module_data_size; iov.iov_len = self_module->module_data_size;
rc = mca_oob_send(&proc->proc_name, &iov, 1, 0); rc = mca_oob_send(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0);
if(rc != iov.iov_len) { if(rc != iov.iov_len) {
free(procs); free(procs);
OMPI_THREAD_UNLOCK(&self->proc_lock); OMPI_THREAD_UNLOCK(&self->proc_lock);
@ -284,7 +286,7 @@ int mca_base_modex_exchange(void)
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TRUNC|MCA_OOB_PEEK); size = mca_oob_recv(&proc->proc_name, 0, 0, MCA_OOB_TAG_ANY, MCA_OOB_TRUNC|MCA_OOB_PEEK);
if(size <= 0) { if(size <= 0) {
free(procs); free(procs);
OMPI_THREAD_UNLOCK(&proc->proc_lock); OMPI_THREAD_UNLOCK(&proc->proc_lock);
@ -297,7 +299,7 @@ int mca_base_modex_exchange(void)
iov.iov_base = proc_module->module_data; iov.iov_base = proc_module->module_data;
iov.iov_len = size; iov.iov_len = size;
rc = mca_oob_recv(&proc->proc_name, &iov, 1, 0); rc = mca_oob_recv(&proc->proc_name, &iov, 1, MCA_OOB_TAG_ANY, 0);
if(rc != size) { if(rc != size) {
free(procs); free(procs);
OMPI_THREAD_UNLOCK(&proc->proc_lock); OMPI_THREAD_UNLOCK(&proc->proc_lock);
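The exchange above first sizes the incoming message with a zero-length MCA_OOB_TRUNC|MCA_OOB_PEEK receive, allocates exactly that much, and only then posts the real receive. The same sizing idiom exists for plain datagram sockets on Linux, where MSG_PEEK|MSG_TRUNC reports the full message length; a self-contained sketch of the pattern, offered purely as an analogy:

    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/socket.h>
    #include <sys/types.h>

    int main(void)
    {
        int sv[2];
        char *buf;
        ssize_t size;

        /* a connected datagram pair stands in for the OOB peer */
        if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0)
            return 1;
        send(sv[0], "hello, peer", 11, 0);

        /* peek at the message to learn its size without consuming it */
        size = recv(sv[1], NULL, 0, MSG_PEEK | MSG_TRUNC);
        if (size <= 0)
            return 1;

        /* allocate exactly enough space, then do the real receive */
        buf = malloc(size + 1);
        size = recv(sv[1], buf, size, 0);
        buf[size] = '\0';
        printf("received %zd bytes: %s\n", size, buf);
        free(buf);
        return 0;
    }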

View File

@ -31,11 +31,11 @@ mca_ns_base_cellid_t ns_base_create_cellid(void)
msg.iov_base = (char*)&cmd; msg.iov_base = (char*)&cmd;
msg.iov_len = sizeof(cmd); msg.iov_len = sizeof(cmd);
if (0 > mca_oob_send(&mca_ns_my_replica, &msg, 1, 0)) { /* error on send */ if (0 > mca_oob_send(&mca_ns_my_replica, &msg, 1, MCA_OOB_TAG_ANY, 0)) { /* error on send */
return 0; return 0;
} }
if (0 > mca_oob_recv(&mca_ns_my_replica, &msg, 1, 0)) { /* error on recv */ if (0 > mca_oob_recv(&mca_ns_my_replica, &msg, 1, MCA_OOB_TAG_ANY, 0)) { /* error on recv */
return 0; return 0;
} }

View File

@ -187,7 +187,7 @@ mca_oob_callback_fn_t mca_ns_replica_recv(int status, const ompi_process_name_t
reply.iov_base = (char*)&answer; reply.iov_base = (char*)&answer;
reply.iov_len = sizeof(answer); reply.iov_len = sizeof(answer);
mca_oob_send(sender, &reply, 1, 0); mca_oob_send(sender, &reply, 1, MCA_OOB_TAG_ANY, 0);
} }
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;

View File

@ -19,7 +19,7 @@
* This is the first module on the list. This is here temporarily * This is the first module on the list. This is here temporarily
* to make things work * to make things work
*/ */
extern mca_oob_base_module_t mca_oob; extern mca_oob_t mca_oob;
/** /**
* associate a component and a module that belongs to it * associate a component and a module that belongs to it
@ -27,7 +27,7 @@ extern mca_oob_base_module_t mca_oob;
struct mca_oob_base_info_t { struct mca_oob_base_info_t {
ompi_list_item_t super; ompi_list_item_t super;
mca_oob_base_component_t *oob_component; mca_oob_base_component_t *oob_component;
mca_oob_base_module_t *oob_module; mca_oob_t *oob_module;
}; };
typedef struct mca_oob_base_info_t mca_oob_base_info_t; typedef struct mca_oob_base_info_t mca_oob_base_info_t;

View File

@ -16,7 +16,7 @@
OBJ_CLASS_INSTANCE( OBJ_CLASS_INSTANCE(
mca_oob_base_module_t, mca_oob_t,
ompi_list_item_t, ompi_list_item_t,
NULL, NULL,
NULL NULL
@ -28,8 +28,9 @@ OBJ_CLASS_INSTANCE(
NULL NULL
); );
ompi_process_name_t mca_oob_base_self; ompi_process_name_t mca_oob_name_seed;
ompi_process_name_t mca_oob_base_any; ompi_process_name_t mca_oob_name_self;
ompi_process_name_t mca_oob_name_any;
/** /**
* Function for selecting one module from all those that are * Function for selecting one module from all those that are
@ -43,21 +44,26 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
mca_base_component_list_item_t *cli; mca_base_component_list_item_t *cli;
mca_oob_base_info_t * first; mca_oob_base_info_t * first;
mca_oob_base_component_t *component; mca_oob_base_component_t *component;
mca_oob_base_module_t *module; mca_oob_t *module;
extern ompi_list_t mca_oob_base_components; extern ompi_list_t mca_oob_base_components;
ompi_process_name_t *self; ompi_process_name_t *self;
/* setup local name */ /* setup local name */
OBJ_CONSTRUCT(&mca_oob_name_self, ompi_process_name_t);
self = mca_pcm.pcm_self(); self = mca_pcm.pcm_self();
if(NULL == self) { if(NULL == self) {
return OMPI_ERROR; return OMPI_ERROR;
} }
mca_oob_base_self = *self; mca_oob_name_self = *self;
/* setup wildcard name */ /* setup wildcard name */
mca_oob_base_any.cellid = -1; OBJ_CONSTRUCT(&mca_oob_name_any, ompi_process_name_t);
mca_oob_base_any.jobid = -1; mca_oob_name_any.cellid = -1;
mca_oob_base_any.vpid = -1; mca_oob_name_any.jobid = -1;
mca_oob_name_any.vpid = -1;
/* setup seed daemons name */
OBJ_CONSTRUCT(&mca_oob_name_seed, ompi_process_name_t);
/* Traverse the list of available modules; call their init functions. */ /* Traverse the list of available modules; call their init functions. */
for (item = ompi_list_get_first(&mca_oob_base_components); for (item = ompi_list_get_first(&mca_oob_base_components);

View File

@ -21,7 +21,7 @@
/* /*
* Global variables * Global variables
*/ */
mca_oob_base_module_t mca_oob; mca_oob_t mca_oob;
int mca_oob_base_output = -1; int mca_oob_base_output = -1;
ompi_list_t mca_oob_base_components; ompi_list_t mca_oob_base_components;
ompi_list_t mca_oob_base_modules; ompi_list_t mca_oob_base_modules;
@ -35,13 +35,14 @@ int mca_oob_base_open(void)
{ {
/* Open up all available components */ /* Open up all available components */
OBJ_CONSTRUCT(&mca_oob_base_components, ompi_list_t);
OBJ_CONSTRUCT(&mca_oob_base_modules, ompi_list_t);
if (OMPI_SUCCESS != if (OMPI_SUCCESS !=
mca_base_components_open("oob", 0, mca_oob_base_static_components, mca_base_components_open("oob", 0, mca_oob_base_static_components,
&mca_oob_base_components)) { &mca_oob_base_components)) {
return OMPI_ERROR; return OMPI_ERROR;
} }
OBJ_CONSTRUCT(&mca_oob_base_components, ompi_list_t);
OBJ_CONSTRUCT(&mca_oob_base_modules, ompi_list_t);
/* All done */ /* All done */

View File

@ -10,13 +10,14 @@
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
* iovec array without removing the message from the queue. * iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
*/ */
int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int flags) int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags)
{ {
return(mca_oob.oob_recv(peer, msg, count, flags)); return(mca_oob.oob_recv(peer, msg, count, tag, flags));
} }
/* /*
@ -26,12 +27,18 @@ int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count,
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
* iovec array without removing the message from the queue. * iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
*/ */
int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg, int mca_oob_recv_ntoh(
const mca_oob_base_type_t *types, int count, int flags) ompi_process_name_t* peer,
const struct iovec *msg,
const mca_oob_base_type_t *types,
int count,
int tag,
int flags)
{ {
int rc, num, i = 0; int rc, num, i = 0;
struct iovec * orig; struct iovec * orig;
@ -63,7 +70,7 @@ int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg,
} }
} }
/* now the new buffers are ready. do the receive */ /* now the new buffers are ready. do the receive */
rc = mca_oob.oob_recv(peer, orig, count, flags); rc = mca_oob.oob_recv(peer, orig, count, tag, flags);
/* now we have to do the conversions */ /* now we have to do the conversions */
for(i = 0; i < count; i++) { for(i = 0; i < count; i++) {
if(types[i] == MCA_OOB_BASE_INT16) { if(types[i] == MCA_OOB_BASE_INT16) {
@ -85,7 +92,7 @@ int mca_oob_recv_ntoh(ompi_process_name_t* peer, const struct iovec *msg,
/* free the iovecs we allocated */ /* free the iovecs we allocated */
free(orig); free(orig);
} else { } else {
rc = mca_oob.oob_recv(peer, msg, count, flags); rc = mca_oob.oob_recv(peer, msg, count, tag, flags);
} }
return rc; return rc;
} }
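mca_oob_recv_ntoh above walks the parallel types[] array and byte-swaps only the 16- and 32-bit integer entries, leaving byte data untouched. A self-contained sketch of that per-element conversion; the enum values and names are stand-ins, not the MCA_OOB_BASE_* constants:

    #include <arpa/inet.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <sys/uio.h>

    enum { EX_BYTE, EX_INT16, EX_INT32 };   /* stand-ins for MCA_OOB_BASE_* */

    /* convert received iovec contents from network to host byte order in place */
    static void iov_ntoh(const struct iovec *iov, const int *types, int count)
    {
        int i;
        size_t j;
        for (i = 0; i < count; i++) {
            if (types[i] == EX_INT16) {
                uint16_t *p = (uint16_t *)iov[i].iov_base;
                for (j = 0; j < iov[i].iov_len / sizeof(uint16_t); j++)
                    p[j] = ntohs(p[j]);
            } else if (types[i] == EX_INT32) {
                uint32_t *p = (uint32_t *)iov[i].iov_base;
                for (j = 0; j < iov[i].iov_len / sizeof(uint32_t); j++)
                    p[j] = ntohl(p[j]);
            }
            /* EX_BYTE entries are left as-is */
        }
    }

    int main(void)
    {
        uint32_t v = htonl(0x01020304);
        struct iovec iov = { &v, sizeof(v) };
        int type = EX_INT32;
        iov_ntoh(&iov, &type, 1);
        return v == 0x01020304 ? 0 : 1;
    }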

View File

@ -12,11 +12,11 @@ struct mca_oob_base_cb_data_t {
/* this is the callback function we will register when we have to do any conversion */ /* this is the callback function we will register when we have to do any conversion */
static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
size_t count, void* cbdata); int count, int tag, void* cbdata);
static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
size_t count, void* cbdata) int count, int tag, void* cbdata)
{ {
int i, num; int i, num;
struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata;
@ -43,7 +43,7 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
/* free the iovecs we allocated */ /* free the iovecs we allocated */
free((void *)msg); free((void *)msg);
/* call the user callback function */ /* call the user callback function */
cb_struct->user_callback(status, peer, user_iovec, count, cb_struct->user_data); cb_struct->user_callback(status, peer, user_iovec, count, tag, cb_struct->user_data);
/* free the cb structure */ /* free the cb structure */
free(cb_struct); free(cb_struct);
return; return;
@ -60,10 +60,10 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
*/ */
int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int flags, int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, int flags,
mca_oob_callback_fn_t cbfunc, void* cbdata) mca_oob_callback_fn_t cbfunc, void* cbdata)
{ {
return(mca_oob.oob_recv_nb(peer, msg, count, flags, cbfunc, cbdata)); return(mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata));
} }
/* /*
@ -80,7 +80,7 @@ int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int coun
*/ */
int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg, int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg,
const mca_oob_base_type_t* types, int count, int flags, const mca_oob_base_type_t* types, int count, int tag, int flags,
mca_oob_callback_fn_t cbfunc, void* cbdata) mca_oob_callback_fn_t cbfunc, void* cbdata)
{ {
int rc, i = 0; int rc, i = 0;
@ -120,10 +120,9 @@ int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg,
} }
} }
/* now the new buffers are ready. do the receive */ /* now the new buffers are ready. do the receive */
rc = mca_oob.oob_recv_nb(peer, orig, count, flags, &mca_oob_base_recv_cb, rc = mca_oob.oob_recv_nb(peer, orig, count, tag, flags, mca_oob_base_recv_cb, cb_struct);
(void *) cb_struct);
} else { } else {
rc = mca_oob.oob_recv_nb(peer, msg, count, flags, cbfunc, cbdata); rc = mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata);
} }
return rc; return rc;
} }
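Because the non-blocking ntoh variant cannot convert data that has not arrived yet, it wraps the caller's callback: the user callback and user data are stashed in a small struct, an internal callback is registered instead, and on completion the internal callback converts the buffers and then invokes the user's function. A generic self-contained sketch of that callback-wrapping pattern (the names and the trivial start_op are illustrative):

    #include <stdio.h>
    #include <stdlib.h>

    typedef void (*user_cb_fn_t)(int status, void *data, void *cbdata);

    /* state carried from post time to completion time */
    struct wrap_cb {
        user_cb_fn_t user_cb;
        void *user_data;
    };

    /* internal completion handler: fix up the data, then hand off to the user */
    static void internal_cb(int status, void *data, void *cbdata)
    {
        struct wrap_cb *w = (struct wrap_cb *)cbdata;
        /* ...byte-order conversion of 'data' would happen here... */
        w->user_cb(status, data, w->user_data);
        free(w);
    }

    /* post a non-blocking operation; start_op stands in for oob_recv_nb */
    static int post_nb(void (*start_op)(user_cb_fn_t, void *),
                       user_cb_fn_t cb, void *cbdata)
    {
        struct wrap_cb *w = malloc(sizeof(*w));
        if (NULL == w)
            return -1;
        w->user_cb = cb;
        w->user_data = cbdata;
        start_op(internal_cb, w);   /* the internal callback sees our wrapper */
        return 0;
    }

    static void fake_start(user_cb_fn_t cb, void *cbdata)
    {
        cb(0, NULL, cbdata);        /* "complete" immediately for the demo */
    }

    static void my_cb(int status, void *data, void *cbdata)
    {
        printf("user callback: status=%d note=%s\n", status, (char *)cbdata);
        (void)data;
    }

    int main(void)
    {
        return post_nb(fake_start, my_cb, "converted and delivered");
    }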

View File

@ -11,9 +11,9 @@
* @return OMPI error code (<0) on error or number of bytes actually sent. * @return OMPI error code (<0) on error or number of bytes actually sent.
*/ */
int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int flags) int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags)
{ {
return(mca_oob.oob_send(peer, msg, count, flags)); return(mca_oob.oob_send(peer, msg, count, tag, flags));
} }
/* /*
@ -28,7 +28,7 @@ int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int c
*/ */
int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg, int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg,
const mca_oob_base_type_t *types, int count, int flags) const mca_oob_base_type_t *types, int count, int tag, int flags)
{ {
int rc, i = 0; int rc, i = 0;
struct iovec * converted; struct iovec * converted;
@ -65,7 +65,7 @@ int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg,
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32);
} }
} }
rc = mca_oob.oob_send(peer, converted, count, flags); rc = mca_oob.oob_send(peer, converted, count, tag, flags);
/* clean up any space we allocated */ /* clean up any space we allocated */
for(i = 0; i < count; i++) { for(i = 0; i < count; i++) {
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
@ -74,7 +74,7 @@ int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg,
} }
free(converted); free(converted);
} else { } else {
rc = mca_oob.oob_send(peer, msg, count, flags); rc = mca_oob.oob_send(peer, msg, count, tag, flags);
} }
return rc; return rc;

View File

@ -10,13 +10,13 @@ struct mca_oob_base_cb_data_t {
}; };
/* this is the callback function we will register when we have to do any conversion */ /* this is the callback function we will register when we have to do any conversion */
static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, static void mca_oob_base_send_cb(
const struct iovec* msg, int status,
size_t count, void* cbdata); const ompi_process_name_t* peer,
const struct iovec* msg,
static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer, int count,
const struct iovec* msg, int tag,
size_t count, void* cbdata) void* cbdata)
{ {
int i; int i;
struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata;
@ -28,12 +28,11 @@ static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer,
} }
free((void *)msg); free((void *)msg);
/* call the user callback function */ /* call the user callback function */
cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, cb_struct->user_data); cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, tag, cb_struct->user_data);
free(cb_struct); free(cb_struct);
return; return;
} }
/* /*
@ -49,10 +48,10 @@ static void mca_oob_base_send_cb(int status, const ompi_process_name_t* peer,
* *
*/ */
int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, int tag,
int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) int flags, mca_oob_callback_fn_t cbfunc, void* cbdata)
{ {
return(mca_oob.oob_send_nb(peer, msg, count, flags, cbfunc, cbdata)); return(mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata));
} }
/* /*
@ -69,7 +68,7 @@ int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, in
*/ */
int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* msg, int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* msg,
const mca_oob_base_type_t* types, int count, int flags, const mca_oob_base_type_t* types, int count, int tag, int flags,
mca_oob_callback_fn_t cbfunc, void* cbdata) mca_oob_callback_fn_t cbfunc, void* cbdata)
{ {
int rc, i = 0; int rc, i = 0;
@ -113,11 +112,9 @@ int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* ms
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32);
} }
} }
rc = mca_oob.oob_send_nb(peer, converted, count, flags, rc = mca_oob.oob_send_nb(peer, converted, count, tag, flags, mca_oob_base_send_cb, cb_struct);
&mca_oob_base_send_cb,
(void *) cb_struct);
} else { } else {
rc = mca_oob.oob_send_nb(peer, msg, count, flags, cbfunc, cbdata); rc = mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata);
} }
return rc; return rc;
} }

View File

@ -20,7 +20,7 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int flags); static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags);
/** /**
* Similar to unix send(2). * Similar to unix send(2).
@ -36,6 +36,7 @@ int mca_oob_cofs_send(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec *iov, const struct iovec *iov,
int count, int count,
int tag,
int flags) int flags)
{ {
FILE *fp; FILE *fp;
@ -45,14 +46,14 @@ int mca_oob_cofs_send(
char msg_file_tmp[OMPI_PATH_MAX]; char msg_file_tmp[OMPI_PATH_MAX];
/* create the file and open it... */ /* create the file and open it... */
snprintf(msg_file, OMPI_PATH_MAX, "%s/%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, snprintf(msg_file, OMPI_PATH_MAX, "%s/%d_%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc,
ompi_name_server.get_jobid(&mca_oob_base_self), ompi_name_server.get_jobid(&mca_oob_name_self),
ompi_name_server.get_vpid(&mca_oob_base_self), ompi_name_server.get_vpid(&mca_oob_name_self),
ompi_name_server.get_vpid(peer), (long)mca_oob_cofs_serial); ompi_name_server.get_vpid(peer), tag, (long)mca_oob_cofs_serial);
snprintf(msg_file_tmp, OMPI_PATH_MAX, "%s/.%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc, snprintf(msg_file_tmp, OMPI_PATH_MAX, "%s/.%d_%d_%d_%d_%ld.msg", mca_oob_cofs_comm_loc,
ompi_name_server.get_jobid(&mca_oob_base_self), ompi_name_server.get_jobid(&mca_oob_name_self),
ompi_name_server.get_vpid(&mca_oob_base_self), ompi_name_server.get_vpid(&mca_oob_name_self),
ompi_name_server.get_vpid(peer), (long)mca_oob_cofs_serial); ompi_name_server.get_vpid(peer), tag, (long)mca_oob_cofs_serial);
fp = fopen(msg_file_tmp, "w"); fp = fopen(msg_file_tmp, "w");
if (fp == NULL) { if (fp == NULL) {
@ -90,24 +91,30 @@ int mca_oob_cofs_send_nb(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec *iov, const struct iovec *iov,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata) void* cbdata)
{ {
int status = mca_oob_cofs_send(peer, iov, count, flags); int status = mca_oob_cofs_send(peer, iov, count, tag, flags);
if(NULL != cbfunc) if(NULL != cbfunc)
cbfunc(status, peer, iov, count, cbdata); cbfunc(status, peer, iov, count, tag, cbdata);
return status; return status;
} }
int int
mca_oob_cofs_recv(ompi_process_name_t* peer, const struct iovec* iov, int count, int flags) mca_oob_cofs_recv(
ompi_process_name_t* peer,
const struct iovec* iov,
int count,
int tag,
int flags)
{ {
int ret = OMPI_ERR_WOULD_BLOCK; int ret = OMPI_ERR_WOULD_BLOCK;
while (ret == OMPI_ERR_WOULD_BLOCK) { while (ret == OMPI_ERR_WOULD_BLOCK) {
ret = do_recv(ompi_name_server.get_jobid(peer), ret = do_recv(ompi_name_server.get_jobid(peer),
ompi_name_server.get_vpid(peer), iov, count, flags); ompi_name_server.get_vpid(peer), iov, count, tag, flags);
sleep(1); sleep(1);
} }
return ret; return ret;
@ -119,24 +126,25 @@ mca_oob_cofs_recv_nb(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec* iov, const struct iovec* iov,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata) void* cbdata)
{ {
int status = mca_oob_cofs_recv(peer, iov, count, flags); int status = mca_oob_cofs_recv(peer, iov, count, tag, flags);
if(NULL != cbfunc) if(NULL != cbfunc)
cbfunc(status, peer, iov, count, cbdata); cbfunc(status, peer, iov, count, tag, cbdata);
return status; return status;
} }
static char* static char*
find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid) find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag)
{ {
DIR* dir; DIR* dir;
struct dirent *ent; struct dirent *ent;
unsigned long tmp_serial; unsigned long tmp_serial;
int tmp_jobid, tmp_procid, tmp_myprocid; int tmp_jobid, tmp_procid, tmp_myprocid, tmp_tag;
int ret; int ret;
bool found = false; bool found = false;
char best_name[OMPI_PATH_MAX]; char best_name[OMPI_PATH_MAX];
@ -150,21 +158,23 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid)
while ((ent = readdir(dir)) != NULL) { while ((ent = readdir(dir)) != NULL) {
if (ent->d_name[0] == '.') continue; if (ent->d_name[0] == '.') continue;
ret = sscanf(ent->d_name, "%d_%d_%d_%lu.msg", &tmp_jobid, &tmp_procid, ret = sscanf(ent->d_name, "%d_%d_%d_%d_%lu.msg", &tmp_jobid, &tmp_procid,
&tmp_myprocid, &tmp_serial); &tmp_myprocid, &tmp_tag, &tmp_serial);
if (ret != 4) { if (ret != 5) {
continue; continue;
} }
if (tmp_jobid != jobid) { if (tmp_jobid != jobid) {
continue; continue;
} }
if (tmp_myprocid != ompi_name_server.get_vpid(&mca_oob_base_self)) { if (tmp_myprocid != ompi_name_server.get_vpid(&mca_oob_name_self)) {
continue; continue;
} }
if (tmp_procid != procid) { if (tmp_procid != procid) {
continue; continue;
} }
if (tag != MCA_OOB_TAG_ANY && tag != tmp_tag)
continue;
/* do best one here... */ /* do best one here... */
found = true; found = true;
@ -184,7 +194,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid)
static int static int
do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int flags) do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags)
{ {
char *fname; char *fname;
char full_fname[OMPI_PATH_MAX]; char full_fname[OMPI_PATH_MAX];
@ -192,7 +202,7 @@ do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec
size_t rlen; size_t rlen;
size_t size; size_t size;
fname = find_match(jobid, procid); fname = find_match(jobid, procid, tag);
if (fname == NULL) { if (fname == NULL) {
return OMPI_ERR_WOULD_BLOCK; return OMPI_ERR_WOULD_BLOCK;
} }
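With the tag folded into the message file name, find_match() above only has to sscanf() five fields and skip files whose tag differs, with MCA_OOB_TAG_ANY (0) acting as a wildcard. A small self-contained sketch of that match test; the file name layout follows the snprintf format in this diff, everything else is illustrative:

    #include <stdbool.h>
    #include <stdio.h>

    #define EX_TAG_ANY 0   /* mirrors MCA_OOB_TAG_ANY */

    /* true if a "<jobid>_<src>_<dst>_<tag>_<serial>.msg" name matches the request */
    static bool msg_file_matches(const char *name, int jobid, int src, int dst, int tag)
    {
        int f_jobid, f_src, f_dst, f_tag;
        unsigned long f_serial;

        if (sscanf(name, "%d_%d_%d_%d_%lu.msg",
                   &f_jobid, &f_src, &f_dst, &f_tag, &f_serial) != 5)
            return false;
        if (f_jobid != jobid || f_src != src || f_dst != dst)
            return false;
        return (tag == EX_TAG_ANY || tag == f_tag);
    }

    int main(void)
    {
        const char *name = "1_0_1_7_42.msg";   /* the sender encoded tag 7 */
        printf("any: %d exact: %d other: %d\n",
               msg_file_matches(name, 1, 0, 1, EX_TAG_ANY),
               msg_file_matches(name, 1, 0, 1, 7),
               msg_file_matches(name, 1, 0, 1, 8));
        return 0;
    }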

View File

@ -20,7 +20,7 @@ int mca_oob_cofs_close(void);
/* /*
* Startup / Shutdown * Startup / Shutdown
*/ */
mca_oob_base_module_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads); mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
int mca_oob_cofs_finalize(void); int mca_oob_cofs_finalize(void);
@ -39,6 +39,7 @@ int mca_oob_cofs_send(
const ompi_process_name_t*, const ompi_process_name_t*,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags); int flags);
@ -58,6 +59,7 @@ int mca_oob_cofs_recv(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec *msg, const struct iovec *msg,
int count, int count,
int tag,
int flags); int flags);
@ -78,6 +80,7 @@ int mca_oob_cofs_send_nb(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -99,6 +102,7 @@ int mca_oob_cofs_recv_nb(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);

View File

@ -41,7 +41,7 @@ mca_oob_base_component_1_0_0_t mca_oob_cofs_component = {
mca_oob_cofs_finalize mca_oob_cofs_finalize
}; };
mca_oob_base_module_t mca_oob_cofs = { mca_oob_t mca_oob_cofs = {
mca_oob_cofs_send, mca_oob_cofs_send,
mca_oob_cofs_recv, mca_oob_cofs_recv,
mca_oob_cofs_send_nb, mca_oob_cofs_send_nb,
@ -54,8 +54,7 @@ int mca_oob_cofs_my_procid;
uint64_t mca_oob_cofs_serial; uint64_t mca_oob_cofs_serial;
struct mca_oob_base_module_1_0_0_t* mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
{ {
int len; int len;
char *tmp; char *tmp;

View File

@ -12,16 +12,25 @@
#include "mca/base/base.h" #include "mca/base/base.h"
#include "mca/ns/ns.h" #include "mca/ns/ns.h"
#include <sys/uio.h> #include <sys/uio.h>
/* /*
* Address for wildcard receives. * Well known address
*/ */
extern ompi_process_name_t mca_oob_base_any; extern ompi_process_name_t mca_oob_name_any;
extern ompi_process_name_t mca_oob_base_self; extern ompi_process_name_t mca_oob_name_seed;
extern ompi_process_name_t mca_oob_name_self;
#define MCA_OOB_BASE_ANY &mca_oob_base_any #define MCA_OOB_NAME_ANY &mca_oob_name_any
#define MCA_OOB_BASE_SELF &mca_oob_base_self #define MCA_OOB_NAME_SEED &mca_oob_name_seed
#define MCA_OOB_NAME_SELF &mca_oob_name_self
/*
* Other constants
*/
#define MCA_OOB_TAG_ANY 0
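With the tag added, every send/recv in the revised API below carries it between the iovec count and the flags, and the well-known names above serve as destinations. A hedged fragment showing the shape of a tagged send; it assumes the OOB headers from this tree are included, and the command value is purely hypothetical:

    /* illustrative fragment, not a standalone program */
    struct iovec iov;
    uint32_t cmd = 1;   /* hypothetical command code */
    int rc;

    iov.iov_base = (char *)&cmd;
    iov.iov_len = sizeof(cmd);

    /* the tag now sits between the iovec count and the flags */
    rc = mca_oob_send(MCA_OOB_NAME_SEED, &iov, 1, MCA_OOB_TAG_ANY, 0);
    if (rc < 0) {
        /* handle the error */
    }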
/* /*
* OOB API * OOB API
@ -58,28 +67,31 @@ extern "C" {
/** /**
* Similar to unix writev(2). * Similar to unix writev(2).
* *
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param flags (IN) Currently unused. * @param tag (IN) User defined tag for matching send/recv.
* @return OMPI error code (<0) on error number of bytes actually sent. * @param flags (IN) Currently unused.
* @return OMPI error code (<0) on error number of bytes actually sent.
*/ */
int mca_oob_send( int mca_oob_send(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec *msg, const struct iovec *msg,
int count, int count,
int tag,
int flags); int flags);
/** /**
* Convert data (if required) to network byte order prior to sending to peer. * Convert data (if required) to network byte order prior to sending to peer.
* *
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param flags (IN) Currently unused. * @param tag (IN) User defined tag for matching send/recv.
* @return OMPI error code (<0) on error number of bytes actually sent. * @param flags (IN) Currently unused.
* @return OMPI error code (<0) on error number of bytes actually sent.
*/ */
int mca_oob_send_hton( int mca_oob_send_hton(
@ -87,6 +99,7 @@ int mca_oob_send_hton(
const struct iovec *msg, const struct iovec *msg,
const mca_oob_base_type_t *types, const mca_oob_base_type_t *types,
int count, int count,
int tag,
int flags); int flags);
@ -96,12 +109,18 @@ int mca_oob_send_hton(
* @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
* iovec array without removing the message from the queue. * iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
*/ */
int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int flags); int mca_oob_recv(
ompi_process_name_t* peer,
const struct iovec *msg,
int count,
int tag,
int flags);
/** /**
* Receive data and convert (if required) to host byte order. * Receive data and convert (if required) to host byte order.
@ -110,6 +129,7 @@ int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count,
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
* iovec array without removing the message from the queue. * iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
@ -120,6 +140,7 @@ int mca_oob_recv_ntoh(
const struct iovec *msg, const struct iovec *msg,
const mca_oob_base_type_t *types, const mca_oob_base_type_t *types,
int count, int count,
int tag,
int flags); int flags);
/* /*
@ -134,6 +155,7 @@ int mca_oob_recv_ntoh(
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param cbdata (IN) User data. * @param cbdata (IN) User data.
*/ */
@ -141,7 +163,8 @@ typedef void (*mca_oob_callback_fn_t)(
int status, int status,
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
size_t count, int count,
int tag,
void* cbdata); void* cbdata);
/** /**
@ -150,6 +173,7 @@ typedef void (*mca_oob_callback_fn_t)(
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) Currently unused. * @param flags (IN) Currently unused.
* @param cbfunc (IN) Callback function on send completion. * @param cbfunc (IN) Callback function on send completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -161,6 +185,7 @@ int mca_oob_send_nb(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -172,6 +197,7 @@ int mca_oob_send_nb(
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) Currently unused. * @param flags (IN) Currently unused.
* @param cbfunc (IN) Callback function on send completion. * @param cbfunc (IN) Callback function on send completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -184,6 +210,7 @@ int mca_oob_send_hton_nb(
const struct iovec* msg, const struct iovec* msg,
const mca_oob_base_type_t* types, const mca_oob_base_type_t* types,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -194,6 +221,7 @@ int mca_oob_send_hton_nb(
* @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
* @param cbfunc (IN) Callback function on recv completion. * @param cbfunc (IN) Callback function on recv completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -204,6 +232,7 @@ int mca_oob_recv_nb(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -215,6 +244,7 @@ int mca_oob_recv_nb(
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
* @param cbfunc (IN) Callback function on recv completion. * @param cbfunc (IN) Callback function on recv completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -226,6 +256,7 @@ int mca_oob_recv_ntoh_nb(
const struct iovec* msg, const struct iovec* msg,
const mca_oob_base_type_t* types, const mca_oob_base_type_t* types,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -243,6 +274,7 @@ int mca_oob_recv_ntoh_nb(
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) Currently unused. * @param flags (IN) Currently unused.
* @return OMPI error code (<0) on error or number of bytes actually sent. * @return OMPI error code (<0) on error or number of bytes actually sent.
*/ */
@ -251,6 +283,7 @@ typedef int (*mca_oob_base_module_send_fn_t)(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec *msg, const struct iovec *msg,
int count, int count,
int tag,
int flags); int flags);
@ -261,6 +294,7 @@ typedef int (*mca_oob_base_module_send_fn_t)(
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param types (IN) Parallel array to iovecs describing data type of each iovec element. * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
* iovec array without removing the message from the queue. * iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received. * @return OMPI error code (<0) on error or number of bytes actually received.
@ -270,6 +304,7 @@ typedef int (*mca_oob_base_module_recv_fn_t)(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec *msg, const struct iovec *msg,
int count, int count,
int tag,
int flags); int flags);
/** /**
@ -278,6 +313,7 @@ typedef int (*mca_oob_base_module_recv_fn_t)(
* @param peer (IN) Opaque name of peer process. * @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) Currently unused. * @param flags (IN) Currently unused.
* @param cbfunc (IN) Callback function on send completion. * @param cbfunc (IN) Callback function on send completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -289,6 +325,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)(
const ompi_process_name_t* peer, const ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -299,6 +336,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)(
* @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive. * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
* @param msg (IN) Array of iovecs describing user buffers and lengths. * @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array. * @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
* @param cbfunc (IN) Callback function on recv completion. * @param cbfunc (IN) Callback function on recv completion.
* @param cbdata (IN) User data that is passed to callback function. * @param cbdata (IN) User data that is passed to callback function.
@ -309,6 +347,7 @@ typedef int (*mca_oob_base_module_recv_nb_fn_t)(
ompi_process_name_t* peer, ompi_process_name_t* peer,
const struct iovec* msg, const struct iovec* msg,
int count, int count,
int tag,
int flags, int flags,
mca_oob_callback_fn_t cbfunc, mca_oob_callback_fn_t cbfunc,
void* cbdata); void* cbdata);
@ -317,20 +356,20 @@ typedef int (*mca_oob_base_module_recv_nb_fn_t)(
* OOB Module * OOB Module
*/ */
struct mca_oob_base_module_1_0_0_t { struct mca_oob_1_0_0_t {
mca_oob_base_module_send_fn_t oob_send; mca_oob_base_module_send_fn_t oob_send;
mca_oob_base_module_recv_fn_t oob_recv; mca_oob_base_module_recv_fn_t oob_recv;
mca_oob_base_module_send_nb_fn_t oob_send_nb; mca_oob_base_module_send_nb_fn_t oob_send_nb;
mca_oob_base_module_recv_nb_fn_t oob_recv_nb; mca_oob_base_module_recv_nb_fn_t oob_recv_nb;
}; };
typedef struct mca_oob_base_module_1_0_0_t mca_oob_base_module_1_0_0_t; typedef struct mca_oob_1_0_0_t mca_oob_1_0_0_t;
typedef struct mca_oob_base_module_1_0_0_t mca_oob_base_module_t; typedef struct mca_oob_1_0_0_t mca_oob_t;
/** /**
* OOB Component * OOB Component
*/ */
typedef mca_oob_base_module_t* (*mca_oob_base_component_init_fn_t)( typedef mca_oob_t* (*mca_oob_base_component_init_fn_t)(
bool *allow_multi_user_threads, bool *allow_multi_user_threads,
bool *have_hidden_threads); bool *have_hidden_threads);

View File

@ -4,8 +4,18 @@
* *
*/ */
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include "util/output.h"
#include "mca/oob/tcp/oob_tcp.h" #include "mca/oob/tcp/oob_tcp.h"
static int mca_oob_tcp_create_listen(void);
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_accept(void);
/* /*
* Struct of function pointers and all that to let us be initialized * Struct of function pointers and all that to let us be initialized
*/ */
@ -28,28 +38,48 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
} }
}; };
static struct mca_oob_base_module_1_0_0_t mca_oob_tcp = { static mca_oob_t mca_oob_tcp = {
mca_oob_tcp_send, mca_oob_tcp_send,
mca_oob_tcp_recv, mca_oob_tcp_recv,
mca_oob_tcp_send_nb, mca_oob_tcp_send_nb,
mca_oob_tcp_recv_nb mca_oob_tcp_recv_nb
}; };
/*
* Utility function to register/lookup module parameters.
*/
static inline int mca_oob_tcp_param_register_int(
const char* param_name,
int default_value)
{
int id = mca_base_param_register_int("ptl","tcp",param_name,NULL,default_value);
int param_value = default_value;
mca_base_param_lookup_int(id,&param_value);
return param_value;
}
/* /*
* Initialize global variables used w/in this module. * Initialize global variables used w/in this module.
*/ */
int mca_oob_tcp_open(void) int mca_oob_tcp_open(void)
{ {
#if 0 OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_list, ompi_list_t);
mca_oob_tcp_module.tcp_listen_port = 1; OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_tree, ompi_rb_tree_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_list, ompi_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_peer_free, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_tree, ompi_rb_tree_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msgs, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_peer_free, ompi_free_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_lock, ompi_mutex_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_post, ompi_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_condition, ompi_condition_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_msg_recv, ompi_list_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_post_recv, ompi_list_t); OBJ_CONSTRUCT(&mca_oob_tcp_component.tcp_match_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_oob_tcp_module.tcp_msg_recv, ompi_list_t);
#endif /* register oob module parameters */
mca_oob_tcp_component.tcp_peer_limit =
mca_oob_tcp_param_register_int("peer_limit", -1);
mca_oob_tcp_component.tcp_peer_retries =
mca_oob_tcp_param_register_int("peer_retries", 60);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -60,65 +90,214 @@ int mca_oob_tcp_open(void)
int mca_oob_tcp_close(void) int mca_oob_tcp_close(void)
{ {
#if 0 OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_list);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_list); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_tree);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_tree); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_peer_free);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_peer_free); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msgs);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_condition); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_lock);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_lock); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_post);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_post_recv); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_msg_recv);
OBJ_DESTRUCT(&mca_oob_tcp_module.tcp_msg_recv); OBJ_DESTRUCT(&mca_oob_tcp_component.tcp_match_lock);
#endif
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/** /*
* Compare two process names for equality. * Called by mca_oob_tcp_recv_handler() when the TCP listen
* * socket has pending connection requests. Accept incoming
* @param n1 Process name 1. * requests and queue for completion of the connection handshake.
* @param n2 Process name 2.
* @return (-1 for n1<n2 0 for equality, 1 for n1>n2)
*
* Note that the definition of < or > is somewhat arbitrary -
* just needs to be consistently applied to maintain an ordering
* when process names are used as indices.
*/ */
static int ompi_process_name_compare(ompi_process_name_t* n1, static void mca_oob_tcp_accept(void)
ompi_process_name_t* n2)
{ {
if(n1->cellid < n2->cellid) while(true) {
return -1; ompi_socklen_t addrlen = sizeof(struct sockaddr_in);
else if(n1->cellid > n2->cellid) struct sockaddr_in addr;
return 1; ompi_event_t* event;
else if(n1->jobid < n2->jobid) int sd = accept(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&addr, &addrlen);
return -1; if(sd < 0) {
else if(n1->jobid > n2->jobid) if(errno == EINTR)
return 1; continue;
else if(n1->vpid < n2->vpid) if(errno != EAGAIN || errno != EWOULDBLOCK)
return -1; ompi_output(0, "mca_oob_tcp_accept: accept() failed with errno %d.", errno);
else if(n1->vpid > n2->vpid) return;
return 1; }
return(0);
} /* wait for receipt of peers process identifier to complete this connection */
event = malloc(sizeof(ompi_event_t));
ompi_event_set(event, sd, OMPI_EV_READ|OMPI_EV_PERSIST, mca_oob_tcp_recv_handler, event);
ompi_event_add(event, 0);
}
}
/* /*
* this function will temporarily return NULL so we don't use it * Create a listen socket and bind to all interfaces
*/ */
struct mca_oob_base_module_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads,
bool *have_hidden_threads) static int mca_oob_tcp_create_listen(void)
{ {
#if 0 int flags;
/* initialize data structures */ struct sockaddr_in inaddr;
ompi_rb_tree_init(&mca_oob_tcp_module.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare); ompi_socklen_t addrlen;
#endif
/* return &mca_oob_tcp; */ /* create a listen socket for incoming connections */
return NULL; mca_oob_tcp_component.tcp_listen_sd = socket(AF_INET, SOCK_STREAM, 0);
if(mca_oob_tcp_component.tcp_listen_sd < 0) {
ompi_output(0,"mca_oob_tcp_component_init: socket() failed with errno=%d", errno);
return OMPI_ERROR;
}
/* bind to all addresses and dynamically assigned port */
memset(&inaddr, 0, sizeof(inaddr));
inaddr.sin_family = AF_INET;
inaddr.sin_addr.s_addr = INADDR_ANY;
inaddr.sin_port = htons(5000+mca_oob_name_self.vpid);
if(bind(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, sizeof(inaddr)) < 0) {
ompi_output(0,"mca_oob_tcp_create_listen: bind() failed with errno=%d", errno);
return OMPI_ERROR;
}
/* resolve system assigned port */
addrlen = sizeof(struct sockaddr_in);
if(getsockname(mca_oob_tcp_component.tcp_listen_sd, (struct sockaddr*)&inaddr, &addrlen) < 0) {
ompi_output(0, "mca_oob_tcp_create_listen: getsockname() failed with errno=%d", errno);
return OMPI_ERROR;
}
mca_oob_tcp_component.tcp_listen_port = inaddr.sin_port;
/* setup listen backlog to maximum allowed by kernel */
if(listen(mca_oob_tcp_component.tcp_listen_sd, SOMAXCONN) < 0) {
ompi_output(0, "mca_oob_tcp_component_init: listen() failed with errno=%d", errno);
return OMPI_ERROR;
}
/* set socket up to be non-blocking, otherwise accept could block */
if((flags = fcntl(mca_oob_tcp_component.tcp_listen_sd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_GETFL) failed with errno=%d", errno);
return OMPI_ERROR;
} else {
flags |= O_NONBLOCK;
if(fcntl(mca_oob_tcp_component.tcp_listen_sd, F_SETFL, flags) < 0) {
ompi_output(0, "mca_oob_tcp_component_init: fcntl(F_SETFL) failed with errno=%d", errno);
return OMPI_ERROR;
}
}
/* register listen port */
ompi_event_set(
&mca_oob_tcp_component.tcp_recv_event,
mca_oob_tcp_component.tcp_listen_sd,
OMPI_EV_READ|OMPI_EV_PERSIST,
mca_oob_tcp_recv_handler,
0);
ompi_event_add(&mca_oob_tcp_component.tcp_recv_event, 0);
return OMPI_SUCCESS;
} }
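mca_oob_tcp_create_listen() and mca_oob_tcp_accept() together follow the standard non-blocking listener recipe: bind to INADDR_ANY, recover the assigned port with getsockname(), listen with the maximum backlog, switch the socket to O_NONBLOCK, and drain accept() until it would block. A self-contained POSIX sketch of that sequence without the event library (the kernel picks the port here; error handling is minimal):

    #include <arpa/inet.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>

    int main(void)
    {
        struct sockaddr_in inaddr;
        socklen_t addrlen = sizeof(inaddr);
        int sd, flags;

        /* listen socket bound to all interfaces, port chosen by the kernel */
        sd = socket(AF_INET, SOCK_STREAM, 0);
        if (sd < 0) {
            perror("socket");
            return 1;
        }
        memset(&inaddr, 0, sizeof(inaddr));
        inaddr.sin_family = AF_INET;
        inaddr.sin_addr.s_addr = INADDR_ANY;
        inaddr.sin_port = 0;
        if (bind(sd, (struct sockaddr *)&inaddr, sizeof(inaddr)) < 0 ||
            getsockname(sd, (struct sockaddr *)&inaddr, &addrlen) < 0 ||
            listen(sd, SOMAXCONN) < 0) {
            perror("listen setup");
            return 1;
        }
        printf("listening on port %d\n", ntohs(inaddr.sin_port));

        /* non-blocking, so an event loop can poll accept() without stalling */
        flags = fcntl(sd, F_GETFL, 0);
        fcntl(sd, F_SETFL, flags | O_NONBLOCK);

        /* drain all pending connections; stop once accept() would block */
        while (1) {
            int newsd = accept(sd, NULL, NULL);
            if (newsd < 0) {
                if (errno == EINTR)
                    continue;
                if (errno != EAGAIN && errno != EWOULDBLOCK)
                    perror("accept");
                break;
            }
            close(newsd);   /* a real server would hand newsd to the event loop */
        }
        close(sd);
        return 0;
    }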
/*
* Event callback when there is data available on the registered
* socket to recv.
*/
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
{
ompi_process_name_t name;
mca_oob_tcp_peer_t* peer;
int rc;
/* accept new connections on the listen socket */
if(mca_oob_tcp_component.tcp_listen_sd == sd) {
mca_oob_tcp_accept();
return;
}
ompi_event_del((ompi_event_t*)user);
free(user);
/* recv the process identifier */
rc = recv(sd, &name, sizeof(name), 0);
if(rc != sizeof(name)) {
ompi_output(0, "mca_oob_tcp_recv_handler: recv() return value %d != %d, errno = %d",
rc, sizeof(name), errno);
close(sd);
return;
}
OMPI_PROCESS_NAME_NTOH(name);
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno);
}
}
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(&name, true);
if(NULL == peer) {
ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
close(sd);
return;
}
/* is the peer instance willing to accept this connection */
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
close(sd);
return;
}
}
/*
* Module initialization.
* (1) initialize static resources
* (2) create listen socket
*/
mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
{
/* initialize data structures */
ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare);
ompi_free_list_init(&mca_oob_tcp_component.tcp_peer_free,
sizeof(mca_oob_tcp_peer_t),
OBJ_CLASS(mca_oob_tcp_peer_t),
8, /* initial number */
mca_oob_tcp_component.tcp_peer_limit, /* maximum number */
8, /* increment to grow by */
NULL); /* use default allocator */
ompi_free_list_init(&mca_oob_tcp_component.tcp_msgs,
sizeof(mca_oob_tcp_msg_t),
OBJ_CLASS(mca_oob_tcp_msg_t),
8, /* initial number */
-1, /* maximum number */
8, /* increment to grow by */
NULL); /* use default allocator */
#if 0
/* initialize event library */
if(ompi_event_init() != OMPI_SUCCESS) {
ompi_output(0, "mca_oob_tcp_init: unable to initialize event library\n");
return NULL;
}
/* create a listen socket */
if(mca_oob_tcp_create_listen() != OMPI_SUCCESS) {
ompi_output(0, "mca_oob_tcp_init: unable to create listen socket\n");
return NULL;
}
return &mca_oob_tcp;
#else
return NULL;
#endif
}
/*
* Module cleanup.
*/
int mca_oob_tcp_finalize(void)
{
/* probably want to try to finish all sends and receives here

View file

@ -29,22 +29,78 @@ extern "C" {
 */
int mca_oob_tcp_open(void);
int mca_oob_tcp_close(void);
mca_oob_t* mca_oob_tcp_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
int mca_oob_tcp_finalize(void);
/*
* Convert process name from network to host byte order.
*
* @param name
*/
#define OMPI_PROCESS_NAME_NTOH(n) \
n.cellid = ntohl(n.cellid); \
n.jobid = ntohl(n.jobid); \
n.vpid = ntohl(n.vpid);
/*
* Convert process name from host to network byte order.
*
* @param name
*/
#define OMPI_PROCESS_NAME_HTON(n) \
n.cellid = htonl(n.cellid); \
n.jobid = htonl(n.jobid); \
n.vpid = htonl(n.vpid);
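A small usage sketch for the two macros above, modelled on the connect-ack exchange further down; send_all()/recv_all() are hypothetical blocking helpers standing in for mca_oob_tcp_peer_send_blocking/recv_blocking:

static void exchange_names(int sd)
{
    ompi_process_name_t guid = mca_oob_name_self;
    ompi_process_name_t remote;

    OMPI_PROCESS_NAME_HTON(guid);           /* host -> network order before sending */
    send_all(sd, &guid, sizeof(guid));

    recv_all(sd, &remote, sizeof(remote));
    OMPI_PROCESS_NAME_NTOH(remote);         /* network -> host order after receiving */
}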
/**
* Compare two process names for equality.
*
* @param n1 Process name 1.
* @param n2 Process name 2.
 * @return (-1 for n1<n2, 0 for equality, 1 for n1>n2)
*
* Note that the definition of < or > is somewhat arbitrary -
* just needs to be consistently applied to maintain an ordering
* when process names are used as indices.
*/
static int ompi_process_name_compare(const ompi_process_name_t* n1, const ompi_process_name_t* n2)
{
if(n1->cellid < n2->cellid)
return -1;
else if(n1->cellid > n2->cellid)
return 1;
else if(n1->jobid < n2->jobid)
return -1;
else if(n1->jobid > n2->jobid)
return 1;
else if(n1->vpid < n2->vpid)
return -1;
else if(n1->vpid > n2->vpid)
return 1;
return(0);
}
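Because the comparison yields a total order (-1/0/1), the same routine can drive the red-black tree of peers and serve as an equality test when matching receives. A hedged sketch (field values are made up; the field names come from the comparator above):

ompi_process_name_t a, b;
a.cellid = 0; a.jobid = 1; a.vpid = 2;   /* illustrative values */
b.cellid = 0; b.jobid = 1; b.vpid = 3;

if (ompi_process_name_compare(&a, &b) < 0) {
    /* a orders before b: same cell/job, lower vpid */
}
if (0 == ompi_process_name_compare(&a, &a)) {
    /* equal names compare as 0 - this is how posted receives are matched */
}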
/**
 * Similar to unix writev(2).
 *
 * @param peer (IN) Opaque name of peer process.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) Currently unused.
 * @return OMPI error code (<0) on error or number of bytes actually sent.
 */
int mca_oob_tcp_send(
const ompi_process_name_t* peer,
const struct iovec *msg,
int count,
int tag,
int flags);
/**
 * Similar to unix readv(2)
@ -52,12 +108,18 @@ int mca_oob_tcp_send(const ompi_process_name_t* peer, const struct iovec *msg, i
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
 * iovec array without removing the message from the queue.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
int mca_oob_tcp_recv(
ompi_process_name_t* peer,
const struct iovec *iov,
int count,
int tag,
int flags);
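A hedged usage sketch of the blocking, tagged interfaces declared above; peer_name and the tag value are illustrative:

char buf[64] = "hello";
struct iovec iov;
int rc;

iov.iov_base = buf;
iov.iov_len  = sizeof(buf);

/* sender side: returns bytes sent (excluding the internal header) or an error */
rc = mca_oob_tcp_send(&peer_name, &iov, 1, /*tag=*/1, 0);

/* receiver side: blocks until a message with a matching peer and tag arrives */
rc = mca_oob_tcp_recv(&peer_name, &iov, 1, /*tag=*/1, 0);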
/*
@ -70,6 +132,7 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int cou
 * @param peer (IN) Opaque name of peer process.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) Currently unused.
 * @param cbfunc (IN) Callback function on send completion.
 * @param cbdata (IN) User data that is passed to callback function.
@ -77,8 +140,14 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int cou
 *
 */
int mca_oob_tcp_send_nb(
const ompi_process_name_t* peer,
const struct iovec* iov,
int count,
int tag,
int flags,
mca_oob_callback_fn_t cbfunc,
void* cbdata);
/**
 * Non-blocking version of mca_oob_recv().
@ -86,14 +155,21 @@ int mca_oob_tcp_send_nb(const ompi_process_name_t* peer, const struct iovec* iov
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User defined tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
 * @param cbfunc (IN) Callback function on recv completion.
 * @param cbdata (IN) User data that is passed to callback function.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
int mca_oob_tcp_recv_nb(
ompi_process_name_t* peer,
const struct iovec* iov,
int count,
int tag,
int flags,
mca_oob_callback_fn_t cbfunc,
void* cbdata);
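A hedged sketch of the non-blocking receive path; the callback signature follows the way msg_cbfunc is invoked in oob_tcp_msg.c (status, peer, iovecs, count, tag, cbdata), and the buffer plus the MCA_OOB_NAME_ANY / MCA_OOB_TAG_ANY wildcards are illustrative:

static char buffer[256];

static void recv_done(int status, ompi_process_name_t* sender,
                      const struct iovec* iov, int count, int tag, void* cbdata)
{
    /* status is the number of bytes delivered (or an error code) */
}

static void post_wildcard_recv(void)
{
    struct iovec iov;
    iov.iov_base = buffer;
    iov.iov_len  = sizeof(buffer);
    mca_oob_tcp_recv_nb(MCA_OOB_NAME_ANY, &iov, 1, MCA_OOB_TAG_ANY, 0, recv_done, NULL);
}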
/**
@ -101,19 +177,20 @@ int mca_oob_tcp_recv_nb(ompi_process_name_t* peer, const struct iovec* iov, int
 */
struct mca_oob_tcp_component_t {
mca_oob_base_component_1_0_0_t super; /**< base OOB component */
int tcp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< listen port */
ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
ompi_free_list_t tcp_peer_free; /**< free list of peers */
size_t tcp_peer_limit; /**< max size of tcp peer cache */
int tcp_peer_retries; /**< max number of retries before declaring peer gone */
ompi_free_list_t tcp_msgs; /**< free list of messages */
ompi_event_t tcp_send_event; /**< event structure for sends */
ompi_event_t tcp_recv_event; /**< event structure for recvs */
ompi_mutex_t tcp_lock; /**< lock for accessing module state */
ompi_list_t tcp_msg_post; /**< list of receives the user has posted */
ompi_list_t tcp_msg_recv; /**< list of received messages */
ompi_mutex_t tcp_match_lock; /**< lock held while searching/posting messages */
};
typedef struct mca_oob_tcp_component_t mca_oob_tcp_component_t;

src/mca/oob/tcp/oob_tcp_hdr.h (new file, 30 lines)

View file

@ -0,0 +1,30 @@
/*
* $HEADER$
*/
/** @file:
*
* Contains header used by tcp oob.
*/
#ifndef _MCA_OOB_TCP_HDR_H_
#define _MCA_OOB_TCP_HDR_H_
/*
* Header used by tcp oob protocol.
*/
struct mca_oob_tcp_hdr_t {
uint32_t msg_size; /**< the total size of the message body - excluding header */
int32_t msg_tag; /**< user provided tag */
};
typedef struct mca_oob_tcp_hdr_t mca_oob_tcp_hdr_t;
#define MCA_OOB_TCP_HDR_NTOHL(h) \
    (h)->msg_size = ntohl((h)->msg_size); \
    (h)->msg_tag = ntohl((h)->msg_tag);
#define MCA_OOB_TCP_HDR_HTONL(h) \
    (h)->msg_size = htonl((h)->msg_size); \
    (h)->msg_tag = htonl((h)->msg_tag);
#endif /* _MCA_OOB_TCP_HDR_H_ */
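A hedged sketch of how the header travels: the sender fills it in host order, converts it, and writes it as the first iovec ahead of the payload; the receiver converts it back before trusting msg_size/msg_tag. payload_len and user_tag are assumed caller values:

mca_oob_tcp_hdr_t hdr;
hdr.msg_size = payload_len;        /* bytes that follow the header */
hdr.msg_tag  = user_tag;
MCA_OOB_TCP_HDR_HTONL((&hdr));     /* to wire (network) byte order */
/* ... writev(): iovec[0] = hdr, iovec[1..n] = user data ... */
MCA_OOB_TCP_HDR_NTOHL((&hdr));     /* back to host order on the receiving side */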

View file

@ -30,20 +30,24 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
/**
 * Wait for a msg to complete.
 * @param msg (IN) Message to wait on.
 * @param rc (OUT) Return code (number of bytes read on success or error code on failure).
 * @retval OMPI_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
{
/* wait for message to complete */
ompi_mutex_lock(&msg->msg_lock);
while(msg->msg_complete == false) {
ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);
}
ompi_mutex_unlock(&msg->msg_lock);

/* return status */
if(NULL != rc) {
*rc = msg->msg_rc;
}
return OMPI_SUCCESS;
}

/**
@ -57,7 +61,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer)
ompi_mutex_lock(&msg->msg_lock);
msg->msg_complete = true;
if(NULL != msg->msg_cbfunc) {
msg->msg_cbfunc(msg->msg_rc, peer, msg->msg_uiov, msg->msg_ucnt, ntohl(msg->msg_hdr.msg_tag), msg->msg_cbdata);
ompi_mutex_unlock(&msg->msg_lock);
MCA_OOB_TCP_MSG_RETURN(msg);
} else {
@ -78,69 +82,59 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
{
int rc;
while(1) {
rc = writev(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
if(rc < 0) {
if(errno == EINTR)
continue;
else if (errno == EAGAIN)
return false;
else {
mca_oob_tcp_peer_close(peer);
return false;
}
}

msg->msg_rc += rc;
do {/* while there are still more iovecs to write */
if(rc < msg->msg_rwptr->iov_len) {
msg->msg_rwptr->iov_len -= rc;
msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
break;
} else {
rc -= msg->msg_rwptr->iov_len;
(msg->msg_rwnum)--;
(msg->msg_rwptr)++;
if(0 == msg->msg_rwnum) {
return true;
}
}
} while(msg->msg_rwnum);
}
}
/**
 * Receives message data.
 *
 * @param msg the message to be received into
 * @param peer the peer to receive from
 * @retval true if the whole message was received
 * @retval false if the whole message was not received
 */
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
{
int rc;
while(1) {
rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
if(rc <= 0) {
if(errno == EINTR)
continue;
else if (errno == EAGAIN)
return false;
else {
mca_oob_tcp_peer_close(peer);
return false;
}
}

do {
if(rc < msg->msg_rwptr->iov_len) {
msg->msg_rwptr->iov_len -= rc;
@ -148,14 +142,114 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
break;
} else {
rc -= msg->msg_rwptr->iov_len;
(msg->msg_rwnum)--;
(msg->msg_rwptr)++;
if(0 == msg->msg_rwnum) {
return true;
}
}
} while(msg->msg_rwnum);
}
}
/**
 * Called to copy the results of a message into a user supplied iovec array.
 * @param msg (IN) Message whose data is to be copied.
 * @param iov (IN) Iovec array of user supplied buffers.
 * @param count (IN) Number of elements in iovec array.
 * @return Number of bytes copied.
*/
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count)
{
int i;
const struct iovec *src = msg->msg_rwiov;
const struct iovec *dst = iov;
unsigned char* src_ptr = (unsigned char*)src->iov_base;
size_t src_len = src->iov_len;
int src_cnt = 0;
int rc = 0;
for(i=0; i<count; i++) {
unsigned char* dst_ptr = (unsigned char*)dst->iov_base;
size_t dst_len = dst->iov_len;
while(dst_len > 0) {
size_t len = (dst_len <= src_len) ? dst_len : src_len;
memcpy(dst_ptr, src_ptr, len);
rc += len;
dst_ptr += len;
dst_len -= len;
src_ptr += len;
src_len -= len;
if(src_len == 0) {
if(++src_cnt == count)
return rc;
src++;
src_ptr = src->iov_base;
src_len = src->iov_len;
}
}
dst++;
}
return rc;
}
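The copy stops as soon as either the source or the destination iovecs run out, so a short user buffer simply truncates the message. A standalone illustration of that scatter/gather copy in plain C (not the component's routine, just the same idea):

#include <string.h>
#include <sys/uio.h>

static size_t iov_copy(const struct iovec* src, int scnt,
                       const struct iovec* dst, int dcnt)
{
    size_t copied = 0, soff = 0, doff = 0;
    int si = 0, di = 0;
    while (si < scnt && di < dcnt) {
        size_t avail = src[si].iov_len - soff;
        size_t space = dst[di].iov_len - doff;
        size_t len = avail < space ? avail : space;
        memcpy((char*)dst[di].iov_base + doff,
               (const char*)src[si].iov_base + soff, len);
        copied += len;
        soff += len;
        doff += len;
        if (soff == src[si].iov_len) { si++; soff = 0; }
        if (doff == dst[di].iov_len) { di++; doff = 0; }
    }
    return copied;   /* bytes actually transferred */
}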
/**
* Match name to a message that has been received asynchronously (unexpected).
*
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
* @return msg Matched message or NULL.
*
* Note - this routine requires the caller to be holding the module lock.
*/
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag)
{
mca_oob_tcp_msg_t* msg;
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_recv);
msg = (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
if((0 == ompi_process_name_compare(name,MCA_OOB_NAME_ANY) ||
(0 == ompi_process_name_compare(name, &msg->msg_peer)))) {
if (tag == MCA_OOB_TAG_ANY || tag == msg->msg_hdr.msg_tag) {
return msg;
}
}
}
return NULL;
}
/**
* Match name to a posted recv request.
*
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
* @return msg Matched message or NULL.
*
* Note - this routine requires the caller to be holding the module lock.
*/
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek)
{
mca_oob_tcp_msg_t* msg;
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_post);
msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_post);
msg = (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
if((0 == ompi_process_name_compare(&msg->msg_peer,MCA_OOB_NAME_ANY) ||
(0 == ompi_process_name_compare(&msg->msg_peer,name)))) {
if (msg->msg_hdr.msg_tag == MCA_OOB_TAG_ANY || msg->msg_hdr.msg_tag == tag) {
if((msg->msg_flags & MCA_OOB_PEEK) == 0 || peek) {
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_post, &msg->super);
return msg;
} else {
return NULL;
}
}
}
}
return NULL;
}
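The two matchers are two sides of one protocol: when data arrives, the progress code calls match_post to look for a waiting receive, and when the user posts a receive, the recv code calls match_recv to look for an already-queued unexpected message. A hedged fragment of the arrival side (post, incoming, peer_name and tag are assumed locals):

OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
post = mca_oob_tcp_msg_match_post(&peer_name, tag, false);
if (NULL == post) {
    /* nobody is waiting - queue the message on the unexpected list */
    ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t*)incoming);
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);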

View file

@ -10,29 +10,43 @@
#define _MCA_OOB_TCP_MESSAGE_H_

#include "class/ompi_list.h"
#include "mca/oob/oob.h"
#include "oob_tcp_peer.h"
#include "oob_tcp_hdr.h"
#include <errno.h>

struct mca_oob_tcp_peer_t;

#define MCA_OOB_TCP_IOV_MAX 16

typedef enum { MCA_OOB_TCP_POSTED, MCA_OOB_TCP_UNEXPECTED } mca_oob_tcp_type_t;

/**
 * describes each message being progressed.
 */
struct mca_oob_tcp_msg_t {
ompi_list_item_t super; /**< allow this item to be put on a list */
mca_oob_tcp_type_t msg_type; /**< posted receive or unexpected */
int msg_flags; /**< flags to send/recv */
int msg_rc; /**< the return code for the send/recv (amount sent/recvd or errno) */
mca_oob_tcp_hdr_t msg_hdr; /**< header used to convey message properties to peer */
const struct iovec* msg_uiov; /**< the user supplied iovec array */
int msg_ucnt; /**< the number of items in the user iovec array */
struct iovec * msg_rwiov; /**< copy of iovec array - not data */
struct iovec * msg_rwptr; /**< current read/write pointer into msg_rwiov */
int msg_rwnum; /**< number of iovecs left for read/write */
int msg_rwcnt; /**< total number of iovecs for read/write */
void* msg_rwbuf; /**< optional buffer for send/recv */
mca_oob_callback_fn_t msg_cbfunc; /**< the callback function for the send/receive */
void * msg_cbdata; /**< the data for the callback function */
bool msg_complete; /**< whether the message is done sending or not */
ompi_process_name_t msg_peer; /**< the name of the peer */
ompi_mutex_t msg_lock; /**< lock for the condition variable */
ompi_condition_t msg_condition; /**< condition variable for completion */
struct iovec msg_iov[MCA_OOB_TCP_IOV_MAX]; /**< preallocated space for iovec array */
};

/**
 * Convenience typedef
 */
@ -55,15 +69,18 @@ OBJ_CLASS_DECLARATION(mca_oob_tcp_msg_t);
 */
#define MCA_OOB_TCP_MSG_RETURN(msg) \
{ \
/* frees the iovec allocated during the send/receive */ \
if(NULL != msg->msg_rwiov) \
mca_oob_tcp_msg_iov_return(msg,msg->msg_rwiov); \
if(NULL != msg->msg_rwbuf) \
free(msg->msg_rwbuf); \
OMPI_FREE_LIST_RETURN(&mca_oob_tcp_component.tcp_msgs, (ompi_list_item_t*)msg); \
}

/**
 * Wait for a msg to complete.
 * @param msg (IN) Message to wait on.
 * @param size (OUT) Number of bytes delivered.
 * @retval OMPI_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
@ -72,16 +89,25 @@ int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* size);
 * Signal that a message has completed. Wakes up any pending threads (for blocking send)
 * or invokes callbacks for non-blocking case.
 * @param msg (IN) Message send/recv that has completed.
 * @param peer (IN) The peer the send/receive was from
 * @retval OMPI_SUCCESS or error code on failure.
 */
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer);
/**
 * Called to copy the results of a message into a user supplied iovec array.
 * @param msg (IN) Message whose data is to be copied.
 * @param iov (IN) Iovec array of user supplied buffers.
 * @param count (IN) Number of elements in iovec array.
 * @return Number of bytes copied.
*/
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count);
/**
 * Called asynchronously to progress sending a message from the event library thread.
 * @param msg (IN) Message send that is in progress.
 * @param sd (IN) Socket descriptor to use for send.
 * @retval bool Flag indicating whether the operation has completed.
 */
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
@ -94,5 +120,63 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
/**
* Match name to a message that has been received asynchronously (unexpected).
*
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
* @param tag (IN) Message tag.
* @return msg Matched message or NULL.
*
* Note - this routine requires the caller to be holding the module lock.
*/
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t*, int tag);
/**
* Match name to a posted recv request.
*
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
* @param tag (IN) Message tag.
* @param peek (IN) Match message with MCA_OOB_PEEK flag set.
* @return msg Matched message or NULL.
*
* Note - this routine requires the caller to be holding the module lock.
*/
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t*, int tag, bool peek);
/**
 * Allocate space for iovec array - if the requested number of elements is less than
* MCA_OOB_TCP_IOV_MAX then use the array allocated along w/ the message - otherwise
* allocate count elements.
*
* @param msg (IN) Message to allocate array.
* @return Array of iovec elements.
*
*/
static inline struct iovec* mca_oob_tcp_msg_iov_alloc(mca_oob_tcp_msg_t* msg, int count)
{
if(count <= MCA_OOB_TCP_IOV_MAX)
return msg->msg_iov;
return malloc(sizeof(struct iovec) * count);
}
/**
* Release resource held by iovec array if this is not part of the message.
*
* @param msg (IN) Message to allocate array.
* @param iov (IN) Iovec array to return.
*
*/
static inline void mca_oob_tcp_msg_iov_return(mca_oob_tcp_msg_t* msg, struct iovec* iov)
{
if(iov != msg->msg_iov)
free(iov);
}
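The two inline helpers are meant to be used as a pair; a hedged sketch of the pattern (also visible in mca_oob_tcp_send and MCA_OOB_TCP_MSG_RETURN):

/* count <= MCA_OOB_TCP_IOV_MAX reuses msg->msg_iov, larger counts fall back to malloc */
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, count + 1);   /* header + user iovecs */
/* ... drive the writev()/readv() loop with msg->msg_rwiov ... */
mca_oob_tcp_msg_iov_return(msg, msg->msg_rwiov);              /* frees only if it was malloc'd */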
#endif /* _MCA_OOB_TCP_MESSAGE_H_ */

View file

@ -12,7 +12,6 @@
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer);
static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_peer_destruct(mca_oob_tcp_peer_t* peer);
@ -82,7 +81,8 @@ static int mca_oob_tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
}

/*
 * Initiate the appropriate action based on the state of the connection
 * to the peer.
 *
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
@ -154,11 +154,17 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
return NULL;
}
/* initialize peer state */
peer->peer_name = *name;
peer->peer_sd = -1;
peer->peer_state = MCA_OOB_TCP_CLOSED;

/******
 * need to add the peer's address to the structure
 ******/
peer->peer_addr.sin_family = AF_INET;
peer->peer_addr.sin_port = htons(5000+peer->peer_name.vpid);
peer->peer_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree,
(ompi_process_name_t *) name, peer)) {
@ -168,10 +174,11 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
}
return NULL;
}
/* if the peer list is over the maximum size, remove one unused peer */
ompi_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (ompi_list_item_t *) peer);
if(ompi_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
mca_oob_tcp_component.tcp_peer_limit) {
old = (mca_oob_tcp_peer_t *)
ompi_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
while(1) {
@ -183,7 +190,7 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
break;
} else {
old = (mca_oob_tcp_peer_t *) ompi_list_get_prev(old);
if(ompi_list_get_begin(&mca_oob_tcp_component.tcp_peer_list) == (ompi_list_item_t*)old) {
/* we tried, but we couldn't find one that was valid to get rid
 * of. Oh well. */
break;
@ -208,7 +215,6 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bo
static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
{
int rc,flags;
peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (peer->peer_sd < 0) {
peer->peer_retries++;
@ -250,6 +256,7 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
return rc;
}
/*
 * Check the status of the connection. If the connection failed, will retry
 * later. Otherwise, send this process's identifier to the peer on the
@ -259,21 +266,30 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
{
int so_error = 0;
ompi_socklen_t so_length = sizeof(so_error);

/* unregister from receiving event notifications */
ompi_event_del(&peer->peer_send_event);

/* check connect completion status */
if(getsockopt(peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
mca_oob_tcp_peer_close(peer);
return;
}

if(so_error == EINPROGRESS) {
ompi_event_add(&peer->peer_send_event, 0);
return;
} else if (so_error == ECONNREFUSED) {
if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to contact peer after %d retries\n", peer->peer_retries);
mca_oob_tcp_peer_close(peer);
return;
}
mca_oob_tcp_peer_close(peer);
sleep(1);
mca_oob_tcp_peer_start_connect(peer);
return;
} else if(so_error != 0) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
mca_oob_tcp_peer_close(peer);
return;
@ -308,9 +324,10 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
 * and update the peer state to reflect the connection has
 * been closed.
 */
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
if(peer->peer_state != MCA_OOB_TCP_CLOSED &&
peer->peer_sd >= 0) {
ompi_event_del(&peer->peer_recv_event);
ompi_event_del(&peer->peer_send_event);
close(peer->peer_sd);
@ -327,8 +344,9 @@ static void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
static int mca_oob_tcp_peer_send_connect_ack(mca_oob_tcp_peer_t* peer)
{
/* send process identifier to remote peer */
ompi_process_name_t guid = mca_oob_name_self;
OMPI_PROCESS_NAME_HTON(guid);
if(mca_oob_tcp_peer_send_blocking( peer, &guid, sizeof(guid)) != sizeof(guid)) {
return OMPI_ERR_UNREACH;
}
return OMPI_SUCCESS;
@ -345,6 +363,7 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
if((mca_oob_tcp_peer_recv_blocking(peer, &guid, sizeof(ompi_process_name_t))) != sizeof(ompi_process_name_t)) {
return OMPI_ERR_UNREACH;
}
OMPI_PROCESS_NAME_NTOH(guid);

/* compare this to the expected values */
if(memcmp(&peer->peer_name, &guid, sizeof(ompi_process_name_t)) != 0) {
@ -417,41 +436,159 @@ static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data,
return cnt;
}
/*
* Progress a completed recv:
* (1) signal a posted recv as complete
* (2) queue an unexpected message in the recv list
*/
static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t *msg)
{
/* was this a posted recv? */
if(msg->msg_type == MCA_OOB_TCP_POSTED) {
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
} else {
/* if not attempt to match unexpected message to a posted recv */
mca_oob_tcp_msg_t* post;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true);
if(NULL != post) {
/* copy msg data into posted recv */
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) {
int i, size = 0;
for(i=0; i<msg->msg_rwcnt; i++)
size += msg->msg_rwiov[i].iov_len;
post->msg_rc = size;
}
if(post->msg_flags & MCA_OOB_PEEK) {
/* will need message for actual receive */
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);
} else {
MCA_OOB_TCP_MSG_RETURN(msg);
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
mca_oob_tcp_msg_complete(post, &peer->peer_name);
} else {
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t*)msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
}
}
}
/*
* Start receiving a new message.
* (1) receive header
* (2) attempt to match posted receives
* (3) if a posted receive is available - receive into users buffer
* (4) otherwise, allocate a new message and buffer for receive
*/
static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_msg_t* msg;
mca_oob_tcp_hdr_t hdr;
uint32_t size;
/* blocking receive of the message header */
if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
return;
size = ntohl(hdr.msg_size);
/* attempt to match posted receive
* however - dont match message w/ peek attribute, as we need to
* queue the message anyway to match subsequent recv.
*/
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr.msg_tag, false);
if(NULL != msg) {
uint32_t posted_size = 0;
int i;
/* setup buffer for receive */
for(i=0; i<msg->msg_ucnt; i++)
posted_size += msg->msg_uiov[i].iov_len;
/* allocate an additional buffer to receive entire message */
if(posted_size < size) {
uint32_t alloc_size = size - posted_size;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt+1);
memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
msg->msg_rwbuf = malloc(alloc_size);
msg->msg_rwiov[msg->msg_ucnt].iov_base = msg->msg_rwbuf;
msg->msg_rwiov[msg->msg_ucnt].iov_len = alloc_size;
msg->msg_rwnum = msg->msg_ucnt+1;
} else {
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt);
memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
msg->msg_rwnum = msg->msg_ucnt;
}
} else {
/* allocate a new message along with buffer */
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return;
}
msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
msg->msg_rc = 0;
msg->msg_flags = 0;
msg->msg_peer = peer->peer_name;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,1);
msg->msg_rwbuf = malloc(size);
msg->msg_rwiov->iov_base = msg->msg_rwbuf;
msg->msg_rwiov->iov_len = size;
msg->msg_rwcnt = msg->msg_rwnum = 1;
}
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_hdr = hdr;
/* if receive of message data completed - queue the receive message */
if(mca_oob_tcp_msg_recv_handler(msg, peer)) {
mca_oob_tcp_peer_recv_progress(peer, msg);
} else {
/* continue processing until complete */
peer->peer_recv_msg = msg;
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
}
/*
* Dispatch to the appropriate action routine based on the state
* of the connection with the peer.
*/
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
{
mca_oob_tcp_peer_t* peer = user;
OMPI_THREAD_LOCK(&peer->peer_lock);
switch(peer->peer_state) {
case MCA_OOB_TCP_CONNECT_ACK:
{
mca_oob_tcp_peer_recv_connect_ack(peer);
break;
}
case MCA_OOB_TCP_CONNECTED:
{
if(NULL == peer->peer_recv_msg) {
mca_oob_tcp_peer_recv_start(peer);
} else if (mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
mca_oob_tcp_peer_recv_progress(peer, peer->peer_recv_msg);
peer->peer_recv_msg = NULL;
}
break;
}
default:
{
ompi_output(0, "mca_oob_tcp_peer_recv_handler: invalid socket state(%d)", peer->peer_state);
mca_oob_tcp_peer_close(peer);
break;
}
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
@ -471,14 +608,16 @@ static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user)
break;
case MCA_OOB_TCP_CONNECTED:
{
/* complete the current send */
do {
mca_oob_tcp_msg_t* msg = peer->peer_send_msg;
if(mca_oob_tcp_msg_send_handler(msg, peer)) {
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
} else {
break;
}

/* if current completed - progress any pending sends */
peer->peer_send_msg = (mca_oob_tcp_msg_t*)
ompi_list_remove_first(&peer->peer_send_queue);
} while (NULL != peer->peer_send_msg);
@ -551,3 +690,36 @@ static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg)
ompi_output(0, buff);
}
/*
* Accept incoming connection - if not already connected.
*/
bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
{
OMPI_THREAD_LOCK(&peer->peer_lock);
if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
(peer->peer_state != MCA_OOB_TCP_CONNECTED &&
ompi_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0)) {
mca_oob_tcp_peer_close(peer);
peer->peer_sd = sd;
mca_oob_tcp_peer_event_init(peer);
if(mca_oob_tcp_peer_send_connect_ack(peer) != OMPI_SUCCESS) {
mca_oob_tcp_peer_close(peer);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return false;
}
ompi_event_add(&peer->peer_recv_event, 0);
mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG
mca_oob_tcp_peer_dump(peer, "accepted");
#endif
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return true;
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return false;
}
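The accept path above only replaces an existing (not yet established) connection when the remote name orders below our own, so if two processes connect to each other simultaneously exactly one of the two connections survives. A hedged illustration of that tie-break:

if (ompi_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0) {
    /* remote name is "lower": accept its incoming connection, drop ours */
} else {
    /* keep our outgoing connection: the remote side will accept it instead */
}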

Просмотреть файл

@ -30,7 +30,7 @@ typedef enum {
/**
 * This structure describes a peer
 */
struct mca_oob_tcp_peer_t {
ompi_list_item_t super; /**< allow this to be on a list */
@ -41,13 +41,19 @@ struct mca_oob_tcp_peer_t {
int peer_sd; /**< socket descriptor of the connection */
ompi_event_t peer_send_event; /**< registration with event thread for send events */
ompi_event_t peer_recv_event; /**< registration with event thread for recv events */
ompi_mutex_t peer_lock; /**< protect critical data structures */
ompi_list_t peer_send_queue; /**< list of messages to send */
mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */
mca_oob_tcp_msg_t *peer_recv_msg; /**< current recv in progress */
};
typedef struct mca_oob_tcp_peer_t mca_oob_tcp_peer_t;
/*
* Class declaration.
*/
OBJ_CLASS_DECLARATION(mca_oob_tcp_peer_t);
/**
 * Get a new peer data structure
 */
@ -92,6 +98,22 @@ mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name
 */
int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg);
/**
* Connection request for this peer. Check the status of our connection
 * before accepting the peer's.
*
* @param peer The peer process.
* @param sd Incoming connection request.
*/
bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd);
/**
* Cleanup/close the connection to the peer.
*
* @param peer The peer process.
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

View file

@ -7,89 +7,79 @@
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param types (IN) Parallel array to iovecs describing data type of each iovec element.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User supplied tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
 * iovec array without removing the message from the queue.
 * @return OMPI error code (<0) on error or number of bytes actually received.
 */
int mca_oob_tcp_recv(
ompi_process_name_t* peer,
const struct iovec *iov,
int count,
int tag,
int flags)
{
mca_oob_tcp_msg_t *msg;
int i, rc, size = 0;

/* lock the tcp struct */
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
/* check to see if a matching receive is on the list */
msg = mca_oob_tcp_msg_match_recv(peer, tag);
if(NULL != msg) {
/* if we are just doing peek, return bytes without dequeing message */
if(msg->msg_rc < 0)
return msg->msg_rc;
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}
/* otherwise dequeue the message and return to free list */
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
MCA_OOB_TCP_MSG_RETURN(msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}
/* the message has not already been received. So we add it to the receive queue */
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}
/* determine overall size of user supplied buffer */
for(i = 0; i < count; i++) {
size += iov[i].iov_len;
}
/* fill in the struct */
msg->msg_type = MCA_OOB_TCP_POSTED;
msg->msg_rc = 0;
msg->msg_flags = flags;
msg->msg_uiov = iov;
msg->msg_ucnt = count;
msg->msg_cbfunc = NULL;
msg->msg_cbdata = NULL;
msg->msg_complete = false;
msg->msg_peer = *peer;
ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);

/* wait for the receive to complete */
mca_oob_tcp_msg_wait(msg, &rc);
MCA_OOB_TCP_MSG_RETURN(msg);
return rc;
}
/*
@ -98,91 +88,79 @@ int mca_oob_tcp_recv(ompi_process_name_t* peer, const struct iovec *iov, int cou
 * @param peer (IN) Opaque name of peer process or MCA_OOB_BASE_ANY for wildcard receive.
 * @param msg (IN) Array of iovecs describing user buffers and lengths.
 * @param count (IN) Number of elements in iovec array.
 * @param tag (IN) User supplied tag for matching send/recv.
 * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
 * @param cbfunc (IN) Callback function on recv completion.
 * @param cbdata (IN) User data that is passed to callback function.
 * @return OMPI error code (<0) on error.
 */
int mca_oob_tcp_recv_nb(
ompi_process_name_t* peer,
const struct iovec* iov,
int count,
int tag,
int flags,
mca_oob_callback_fn_t cbfunc,
void* cbdata)
{
mca_oob_tcp_msg_t *msg;
int i, rc, size = 0;

/* lock the tcp struct */
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
/* check to see if a matching receive is on the list */
msg = mca_oob_tcp_msg_match_recv(peer, tag);
if(NULL != msg) {
if(msg->msg_rc < 0)
return msg->msg_rc;
/* if we are just doing peek, return bytes without dequeing message */
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
return 0;
}
/* otherwise dequeue the message and return to free list */
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
MCA_OOB_TCP_MSG_RETURN(msg);
return 0;
}
/* the message has not already been received. So we add it to the receive queue */
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}
/* determine overall size of user supplied buffer */
for(i = 0; i < count; i++) {
size += iov[i].iov_len;
}
/* fill in the struct */
msg->msg_type = MCA_OOB_TCP_POSTED;
msg->msg_rc = 0;
msg->msg_flags = flags;
msg->msg_uiov = iov;
msg->msg_ucnt = count;
msg->msg_cbfunc = cbfunc;
msg->msg_cbdata = cbdata;
msg->msg_complete = false;
msg->msg_peer = *peer;
ompi_list_append(&mca_oob_tcp_component.tcp_msg_post, (ompi_list_item_t *) msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return 0;
}

View file

@ -10,11 +10,18 @@
 * @return OMPI error code (<0) on error or number of bytes actually sent.
 */
int mca_oob_tcp_send(
const ompi_process_name_t* name,
const struct iovec *iov,
int count,
int tag,
int flags)
{
mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
mca_oob_tcp_msg_t* msg;
int size;
int rc;

if(NULL == peer)
return OMPI_ERR_UNREACH;
@ -23,25 +30,32 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
return rc;

/* calculate the size of the message */
size = 0;
for(rc = 0; rc < count; rc++) {
size += iov[rc].iov_len;
}

/* turn the size to network byte order so there will be no problems */
msg->msg_hdr.msg_size = htonl(size);
msg->msg_hdr.msg_tag = htonl(tag);

/* create one additional iovec that will hold the header */
msg->msg_type = MCA_OOB_TCP_POSTED;
msg->msg_rc = 0;
msg->msg_flags = flags;
msg->msg_uiov = iov;
msg->msg_ucnt = count;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, count+1);
msg->msg_rwiov[0].iov_base = &msg->msg_hdr;
msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_rwcnt = msg->msg_rwnum = count + 1;
memcpy(msg->msg_rwiov+1, msg->msg_uiov, sizeof(struct iovec)*msg->msg_ucnt);
msg->msg_rwbuf = NULL;
msg->msg_cbfunc = NULL;
msg->msg_cbdata = NULL;
msg->msg_complete = false;
msg->msg_peer = peer->peer_name;

rc = mca_oob_tcp_peer_send(peer, msg);
if(rc != OMPI_SUCCESS) {
@ -49,10 +63,12 @@ int mca_oob_tcp_send(const ompi_process_name_t* name, const struct iovec *iov, i
return rc; return rc;
} }
rc = mca_oob_tcp_msg_wait(msg, &sent); rc = mca_oob_tcp_msg_wait(msg, &size);
MCA_OOB_TCP_MSG_RETURN(msg);
if(rc != OMPI_SUCCESS) if(rc != OMPI_SUCCESS)
return rc; return rc;
return sent; size -= sizeof(mca_oob_tcp_hdr_t);
return size;
} }
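For orientation, a hypothetical caller of the new blocking interface above could look like the sketch below; everything except mca_oob_tcp_send() and ompi_process_name_t is illustrative, and the appropriate oob_tcp headers are assumed to be included.

/* Hypothetical wrapper around the blocking send above: sends a header
 * and a payload to "peer" with an arbitrary tag.  Only mca_oob_tcp_send()
 * and ompi_process_name_t come from the code in this commit. */
static int send_two_part(const ompi_process_name_t *peer,
                         void *hdr, size_t hdr_len,
                         void *payload, size_t payload_len)
{
    struct iovec iov[2];

    iov[0].iov_base = hdr;
    iov[0].iov_len  = hdr_len;
    iov[1].iov_base = payload;
    iov[1].iov_len  = payload_len;

    /* returns an OMPI error code (<0) or the number of bytes sent */
    return mca_oob_tcp_send(peer, iov, 2, 1 /* tag */, 0 /* flags */);
}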
/*
@@ -72,13 +88,16 @@ int mca_oob_tcp_send_nb(
const ompi_process_name_t* name,
const struct iovec* iov,
int count,
int tag,
int flags,
mca_oob_callback_fn_t cbfunc,
void* cbdata)
{
mca_oob_tcp_peer_t* peer = mca_oob_tcp_peer_lookup(name, true);
mca_oob_tcp_msg_t* msg;
int size;
int rc;
if(NULL == peer)
return OMPI_ERR_UNREACH;
@@ -87,26 +106,31 @@
return rc;
/* calculate the size of the message */
size = 0;
for(rc = 0; rc < count; rc++) {
size += iov[rc].iov_len;
}
/* convert the size and tag to network byte order */
msg->msg_hdr.msg_size = htonl(size);
msg->msg_hdr.msg_tag = htonl(tag);
/* create one additional iovec that will hold the message header */
msg->msg_type = MCA_OOB_TCP_POSTED;
msg->msg_rc = 0;
msg->msg_flags = flags;
msg->msg_uiov = iov;
msg->msg_ucnt = count;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg, count+1);
msg->msg_rwiov[0].iov_base = &msg->msg_hdr;
msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_rwcnt = msg->msg_rwnum = count + 1;
memcpy(msg->msg_rwiov+1, msg->msg_uiov, sizeof(struct iovec)*msg->msg_ucnt);
msg->msg_rwbuf = NULL;
msg->msg_cbfunc = cbfunc;
msg->msg_cbdata = cbdata;
msg->msg_complete = false;
msg->msg_peer = peer->peer_name;
msg->msg_state = 0;
rc = mca_oob_tcp_peer_send(peer, msg);
if(rc != OMPI_SUCCESS) {

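A corresponding non-blocking usage sketch follows. The callback prototype is only inferred from how cbfunc() is invoked in the receive path earlier in this commit (status, peer, iov, count, cbdata); treat it as an assumption rather than the actual mca_oob_callback_fn_t definition.

/* Hypothetical completion callback; the argument list mirrors the
 * cbfunc(read, peer, iov, count, cbdata) call in the receive path. */
static void send_done(int status, ompi_process_name_t *peer,
                      struct iovec *iov, int count, void *cbdata)
{
    /* status < 0 is an OMPI error code, otherwise bytes sent */
    (void)status; (void)peer; (void)iov; (void)count; (void)cbdata;
}

/* Non-blocking counterpart of the earlier sketch.  The iovec array and
 * the buffers it references must stay valid until send_done() fires,
 * since only the iovec descriptors are copied into the message. */
static int send_two_part_nb(const ompi_process_name_t *peer,
                            struct iovec iov[2], void *cbdata)
{
    return mca_oob_tcp_send_nb(peer, iov, 2, 1 /* tag */, 0 /* flags */,
                               (mca_oob_callback_fn_t)send_done, cbdata);
}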

@@ -328,16 +328,20 @@ typedef int (*mca_ptl_base_module_del_procs_fn_t)(
* PML->PTL Initialize a send request for use by the PTL.
*
* @param ptl (IN) PTL instance
* @param request (IN) Pointer to allocated request.
*
* To reduce latency (number of required allocations), the PML allocates additional
* space along w/ each request - that may be used by the PTL for additional control
* information (e.g. first fragment descriptor). If the PTL intends to use this space,
* the ptl_cache_bytes attribute should be set to reflect the number of bytes needed
* by the PTL on a per-request basis. This space is allocated contiguously along with
* the mca_pml_base_send_request_t, w/ the space available to the PTL immediately
* following the request.
*
* The init function is called the first time the request is used by the PTL. On
* completion of the request - the PML will cache the request for later use by the
* same PTL. When the request is re-used from the cache, the init function is NOT
* called for subsequent sends.
*/
typedef int (*mca_ptl_base_module_request_init_fn_t)(
struct mca_ptl_base_module_t* ptl,
@@ -350,12 +354,12 @@ typedef int (*mca_ptl_base_module_request_init_fn_t)(
* request by the PTL.
*
* @param ptl (IN) PTL instance
* @param request (IN) Pointer to allocated request.
*
* The fini function is called when the PML removes a request from the PTL's
* cache (due to resource constraints) or when the cache limit has been reached, prior
* to re-using the request for another PTL. This provides the PTL the chance to
* cleanup/release any resources cached on the send descriptor by the PTL.
*/
typedef void (*mca_ptl_base_module_request_fini_fn_t)(

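To illustrate the request-cache contract described above, a PTL might locate its per-request scratch area directly after the request structure, roughly as sketched here. The fragment type and the exact init prototype are assumptions for illustration; only the layout (ptl_cache_bytes of space immediately following the mca_pml_base_send_request_t) comes from the documentation.

/* Hypothetical first-fragment descriptor cached with each request. */
struct my_ptl_send_frag_t {
    int frag_flags;
};

/* Sketch of a request-init function: the PTL's scratch space starts
 * immediately after the request, provided the module advertised
 * ptl_cache_bytes >= sizeof(struct my_ptl_send_frag_t). */
static int my_ptl_request_init(struct mca_ptl_base_module_t *ptl,
                               struct mca_pml_base_send_request_t *request)
{
    struct my_ptl_send_frag_t *frag =
        (struct my_ptl_send_frag_t *)(request + 1);
    frag->frag_flags = 0;   /* one-time setup; skipped when the PML
                             * re-uses the cached request */
    (void)ptl;
    return 0;               /* OMPI_SUCCESS in the real code */
}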

@@ -30,7 +30,9 @@ static inline int ompi_condition_wait(ompi_condition_t* c, ompi_mutex_t* m)
}
} else {
while(c->c_signaled == 0) {
ompi_mutex_unlock(m);
ompi_progress();
ompi_mutex_lock(m);
}
}
c->c_signaled--;
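The added unlock/lock pair releases the mutex while ompi_progress() runs, presumably so that whoever signals the condition can acquire the same mutex instead of spinning against a holder that never yields it. The general pattern, with illustrative names rather than the ompi_condition_t internals, is:

/* Polling wait: drop the lock while driving progress, re-take it, and
 * re-check the predicate.  "signaled" stands in for c->c_signaled. */
static void poll_wait(volatile int *signaled, ompi_mutex_t *m)
{
    while(*signaled == 0) {
        ompi_mutex_unlock(m);   /* let the signaler grab the mutex */
        ompi_progress();        /* drive pending communication */
        ompi_mutex_lock(m);     /* re-acquire before re-checking */
    }
    (*signaled)--;              /* consume the signal */
}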