
* Implement long-ago discussed RFC to add a callback data pointer in the
  request completion callback
* Use the completion callback pointer to remove all need for opal_progress
  calls in the one-sided layer

This commit was SVN r24848.
This commit is contained in:
Brian Barrett 2011-06-30 20:05:16 +00:00
parent e6295159ae
commit a4b2bd903b
19 changed files with 499 additions and 766 deletions
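
The core of the change is visible in the request-layer diff at the bottom of this page: ompi_request_t gains a req_complete_cb_data field alongside the existing req_complete_cb, and the one-sided components hand every PML request a completion callback instead of keeping a pending-request list that opal_progress had to poll. Below is a condensed sketch of the registration pattern the new ompi_osc_*_component_irecv/isend helpers use; the function name is illustrative, but the request fields and ompi_request_lock come straight from the diff.

/* Sketch: attach a completion callback plus user data to a PML request.
 * ompi_request_lock is held while the callback fields are written so the
 * PML cannot mark the request complete mid-update; if the request already
 * completed before registration, the callback is fired exactly once here. */
static int osc_irecv_with_callback(void *buf, size_t count,
                                   struct ompi_datatype_t *datatype,
                                   int src, int tag,
                                   struct ompi_communicator_t *comm,
                                   ompi_request_t **request,
                                   ompi_request_complete_fn_t callback,
                                   void *cbdata)
{
    bool missed_callback;
    int ret = MCA_PML_CALL(irecv(buf, count, datatype, src, tag, comm, request));
    if (OMPI_SUCCESS != ret) return ret;

    OPAL_THREAD_LOCK(&ompi_request_lock);
    (*request)->req_complete_cb = callback;
    (*request)->req_complete_cb_data = cbdata;
    missed_callback = (*request)->req_complete;
    OPAL_THREAD_UNLOCK(&ompi_request_lock);

    if (missed_callback) {
        /* clear the pointer before invoking so the callback runs only once,
           mirroring the check added to ompi_request_complete() */
        ompi_request_complete_fn_t tmp = (*request)->req_complete_cb;
        (*request)->req_complete_cb = NULL;
        tmp(*request);
    }
    return OMPI_SUCCESS;
}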

View file

@@ -29,8 +29,6 @@ pt2pt_sources = \
osc_pt2pt_header.h \
osc_pt2pt_longreq.h \
osc_pt2pt_longreq.c \
osc_pt2pt_mpireq.h \
osc_pt2pt_mpireq.c \
osc_pt2pt_replyreq.h \
osc_pt2pt_replyreq.c \
osc_pt2pt_sendreq.h \

View file

@@ -22,7 +22,6 @@
#include "osc_pt2pt_sendreq.h"
#include "ompi/mca/osc/base/base.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
@@ -34,7 +33,6 @@ int
ompi_osc_pt2pt_module_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
int tmp;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);
opal_output_verbose(1, ompi_osc_base_output,
@@ -47,26 +45,6 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
module->p2p_comm->c_coll.coll_barrier_module);
}
/* remove from component information */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
tmp = opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
ompi_comm_get_cid(module->p2p_comm));
/* only take the output of hash_table_remove if there wasn't already an error */
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
if (0 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) {
#if OMPI_ENABLE_PROGRESS_THREADS
void *foo;
mca_osc_pt2pt_component.p2p_c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
opal_thread_join(&mca_osc_pt2pt_component.p2p_c_thread, &foo);
#else
opal_progress_unregister(ompi_osc_pt2pt_component_progress);
#endif
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
win->w_osc_module = NULL;
OBJ_DESTRUCT(&module->p2p_unlocks_pending);

View file

@@ -28,6 +28,7 @@
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "ompi/mca/osc/osc.h"
BEGIN_C_DECLS
@@ -38,24 +39,9 @@ struct ompi_osc_pt2pt_component_t {
/** Extend the basic osc component interface */
ompi_osc_base_component_t super;
/** store the state of progress threads for this instance of OMPI */
bool p2p_c_have_progress_threads;
/** lock access to datastructures in the component structure */
opal_mutex_t p2p_c_lock;
/** List of active modules (windows) */
opal_hash_table_t p2p_c_modules;
/** max size of eager message */
size_t p2p_c_eager_size;
/** Lock for request management */
opal_mutex_t p2p_c_request_lock;
/** Condition variable for request management */
opal_condition_t p2p_c_request_cond;
/** free list of ompi_osc_pt2pt_sendreq_t structures */
opal_free_list_t p2p_c_sendreqs;
/** free list of ompi_osc_pt2pt_replyreq_t structures */
@@ -64,14 +50,6 @@ struct ompi_osc_pt2pt_component_t {
opal_free_list_t p2p_c_longreqs;
/** free list for eager / control messages */
opal_free_list_t p2p_c_buffers;
/** list of outstanding requests, of type ompi_osc_pt2pt_mpireq_t */
opal_list_t p2p_c_pending_requests;
#if OMPI_ENABLE_PROGRESS_THREADS
opal_thread_t p2p_c_thread;
bool p2p_c_thread_run;
#endif
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
@@ -179,7 +157,26 @@ int ompi_osc_pt2pt_component_select(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
int ompi_osc_pt2pt_component_progress(void);
/* helper function that properly sets up request handling */
int ompi_osc_pt2pt_component_irecv(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *data);
int ompi_osc_pt2pt_component_isend(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *data);
/*
* Module interface function types

View file

@@ -17,10 +17,11 @@
#include "ompi_config.h"
#include "osc_pt2pt_mpireq.h"
#include "osc_pt2pt_buffer.h"
#include "opal/class/opal_free_list.h"
#include "opal/types.h"
#include "osc_pt2pt_buffer.h"
static void ompi_osc_pt2pt_buffer_construct(ompi_osc_pt2pt_buffer_t *buf)
{
/* adjust payload location to account for alignment issues */
@@ -36,7 +37,7 @@ static void ompi_osc_pt2pt_buffer_destruct(ompi_osc_pt2pt_buffer_t *buf)
}
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, ompi_osc_pt2pt_mpireq_t,
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, opal_free_list_item_t,
ompi_osc_pt2pt_buffer_construct,
ompi_osc_pt2pt_buffer_destruct);

View file

@@ -17,12 +17,16 @@
#ifndef OMPI_OSC_PT2PT_BUFFER_H
#define OMPI_OSC_PT2PT_BUFFER_H
#include "osc_pt2pt_mpireq.h"
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
BEGIN_C_DECLS
struct ompi_osc_pt2pt_buffer_t {
ompi_osc_pt2pt_mpireq_t mpireq;
ompi_free_list_item_t super;
ompi_request_t *request;
void *data;
void *payload;
size_t len;
};

View file

@@ -12,6 +12,7 @@
* reserved.
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2006-2008 University of Houston. All rights reserved.
* Copyright (c) 2010 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@@ -40,10 +41,7 @@
#include "ompi/mca/pml/pml.h"
static int component_open(void);
static void component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq);
#if OMPI_ENABLE_PROGRESS_THREADS
static void* component_thread_fn(opal_object_t *obj);
#endif
static int component_fragment_cb(ompi_request_t *request);
ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
{ /* ompi_osc_base_component_t */
@@ -57,7 +55,6 @@ ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
NULL
},
{ /* mca_base_component_data */
/* The component is checkpoint ready - JJH Double check this... */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
ompi_osc_pt2pt_component_init,
@@ -90,62 +87,16 @@ ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
};
/* look up parameters for configuring this window. The code first
looks in the info structure passed by the user, then through mca
parameters. */
static bool
check_config_value_bool(char *key, ompi_info_t *info)
{
char *value_string;
int value_len, ret, flag, param;
bool result;
ret = ompi_info_get_valuelen(info, key, &value_len, &flag);
if (OMPI_SUCCESS != ret) goto info_not_found;
if (flag == 0) goto info_not_found;
value_len++;
value_string = (char*)malloc(sizeof(char) * value_len + 1); /* malloc 1 extra char for NUL termination */
if (NULL == value_string) goto info_not_found;
ret = ompi_info_get(info, key, value_len, value_string, &flag);
if (OMPI_SUCCESS != ret) {
free(value_string);
goto info_not_found;
}
assert(flag != 0);
ret = ompi_info_value_to_bool(value_string, &result);
free(value_string);
if (OMPI_SUCCESS != ret) goto info_not_found;
return result;
info_not_found:
param = mca_base_param_find("osc", "pt2pt", key);
if (0 > param) return false;
ret = mca_base_param_lookup_int(param, &flag);
if (OMPI_SUCCESS != ret) return false;
return OPAL_INT_TO_BOOL(flag);
}
static int
component_open(void)
{
int tmp;
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"eager_limit",
"Max size of eagerly sent data",
false, false, 16 * 1024,
&tmp);
mca_osc_pt2pt_component.p2p_c_eager_size = tmp;
return OMPI_SUCCESS;
@@ -158,22 +109,6 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
{
size_t aligned_size;
/* we can run with either threads or not threads (may not be able
to do win locks)... */
mca_osc_pt2pt_component.p2p_c_have_progress_threads =
enable_progress_threads;
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_modules,
opal_hash_table_t);
opal_hash_table_init(&mca_osc_pt2pt_component.p2p_c_modules, 2);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_request_lock,
opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_request_cond,
opal_condition_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_pt2pt_component.p2p_c_sendreqs,
sizeof(ompi_osc_pt2pt_sendreq_t),
@@ -202,14 +137,6 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
OBJ_CLASS(ompi_osc_pt2pt_buffer_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_pending_requests,
opal_list_t);
#if OMPI_ENABLE_PROGRESS_THREADS
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_thread, opal_thread_t);
mca_osc_pt2pt_component.p2p_c_thread_run = false;
#endif
return OMPI_SUCCESS;
}
@@ -217,35 +144,10 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
int
ompi_osc_pt2pt_component_finalize(void)
{
size_t num_modules;
#if OMPI_ENABLE_PROGRESS_THREADS
void* ret;
#endif
if (0 !=
(num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules))) {
opal_output(ompi_osc_base_output,
"WARNING: There were %d Windows created but not freed.",
(int) num_modules);
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_pt2pt_component.p2p_c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
opal_thread_join(&mca_osc_pt2pt_component.p2p_c_thread, &ret);
#else
opal_progress_unregister(ompi_osc_pt2pt_component_progress);
#endif
}
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_pending_requests);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_longreqs);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_replyreqs);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_request_lock);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_request_cond);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_modules);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_lock);
return OMPI_SUCCESS;
}
@@ -360,32 +262,8 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
module->p2p_shared_count = 0;
module->p2p_lock_received_ack = 0;
/* update component data */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
ompi_comm_get_cid(module->p2p_comm),
module);
ret = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules);
if (ret == 1) {
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_pt2pt_component.p2p_c_thread_run = true;
mca_osc_pt2pt_component.p2p_c_thread.t_run = component_thread_fn;
mca_osc_pt2pt_component.p2p_c_thread.t_arg = NULL;
ret = opal_thread_start(&mca_osc_pt2pt_component.p2p_c_thread);
#else
ret = opal_progress_register(ompi_osc_pt2pt_component_progress);
#endif
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
if (OMPI_SUCCESS != ret) goto cleanup;
/* fill in window information */
win->w_osc_module = (ompi_osc_base_module_t*) module;
if (check_config_value_bool("no_locks", info)) {
win->w_flags |= OMPI_WIN_NO_LOCKS;
}
/* sync memory - make sure all initialization completed */
opal_atomic_mb();
@@ -394,24 +272,19 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
item, ret);
if (OMPI_SUCCESS != ret) goto cleanup;
buffer = (ompi_osc_pt2pt_buffer_t*) item;
buffer->mpireq.cbfunc = component_fragment_cb;
buffer->mpireq.cbdata = (void*) module;
ret = MCA_PML_CALL(irecv(buffer->payload,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->mpireq.request));
if (OMPI_SUCCESS != ret) goto cleanup;
buffer->data = (void*) module;
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&buffer->mpireq.super.super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_irecv(buffer->payload,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
CONTROL_MSG_TAG,
module->p2p_comm,
&(buffer->request),
component_fragment_cb,
buffer);
if (OMPI_SUCCESS != ret) goto cleanup;
return OMPI_SUCCESS;
@@ -454,17 +327,16 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
/* dispatch for callback on message completion */
static void
component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
component_fragment_cb(ompi_request_t *request)
{
int ret;
ompi_osc_pt2pt_buffer_t *buffer =
(ompi_osc_pt2pt_buffer_t*) mpireq;
(ompi_osc_pt2pt_buffer_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module =
(ompi_osc_pt2pt_module_t*) mpireq->cbdata;
(ompi_osc_pt2pt_module_t*) buffer->data;
assert(mpireq->status._ucount >= sizeof(ompi_osc_pt2pt_base_header_t));
assert(request->req_status._ucount >= (int) sizeof(ompi_osc_pt2pt_base_header_t));
/* handle message */
switch (((ompi_osc_pt2pt_base_header_t*) buffer->payload)->hdr_type) {
@@ -684,90 +556,97 @@ component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
opal_output_verbose(5, ompi_osc_base_output,
"received one-sided packet for with unknown type");
}
ret = MCA_PML_CALL(irecv(buffer->payload,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->mpireq.request));
/* BWB -- FIX ME -- handle irecv errors */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&buffer->mpireq.super.super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ompi_request_free(&request);
ret = ompi_osc_pt2pt_component_irecv(buffer->payload,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->request,
component_fragment_cb,
buffer);
return ret;
}
int
ompi_osc_pt2pt_component_progress(void)
ompi_osc_pt2pt_component_irecv(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *cbdata)
{
opal_list_item_t *item;
int ret, done = 0;
int ret;
bool missed_callback;
ompi_request_complete_fn_t tmp;
#if OMPI_ENABLE_PROGRESS_THREADS
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
#else
ret = OPAL_THREAD_TRYLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
if (ret != 0) return 0;
#endif
ret = MCA_PML_CALL(irecv(buf, count, datatype,
src, tag, comm, request));
if (OMPI_SUCCESS != ret) return ret;
for (item = opal_list_get_first(&mca_osc_pt2pt_component.p2p_c_pending_requests) ;
item != opal_list_get_end(&mca_osc_pt2pt_component.p2p_c_pending_requests) ;
item = opal_list_get_next(item)) {
ompi_osc_pt2pt_mpireq_t *buffer =
(ompi_osc_pt2pt_mpireq_t*) item;
/* lock the giant request mutex to update the callback data so
that the PML can't mark the request as complete while we're
updating the callback data, which means we can
deterministically ensure the callback is only fired once and
that we didn't miss it. */
OPAL_THREAD_LOCK(&ompi_request_lock);
(*request)->req_complete_cb = callback;
(*request)->req_complete_cb_data = cbdata;
missed_callback = (*request)->req_complete;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* BWB - FIX ME */
#if OMPI_ENABLE_PROGRESS_THREADS == 0
if (buffer->request->req_state == OMPI_REQUEST_INACTIVE ||
buffer->request->req_complete) {
ret = ompi_request_test(&buffer->request,
&done,
&buffer->status);
} else {
done = 0;
ret = OMPI_SUCCESS;
}
#else
ret = ompi_request_test(&buffer->request,
&done,
&buffer->status);
#endif
if (OMPI_SUCCESS == ret && 0 != done) {
opal_list_remove_item(&mca_osc_pt2pt_component.p2p_c_pending_requests,
item);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
buffer->cbfunc(buffer);
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
break;
}
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
return done;
}
#if OMPI_ENABLE_PROGRESS_THREADS
static void*
component_thread_fn(opal_object_t *obj)
{
struct timespec waittime;
while (mca_osc_pt2pt_component.p2p_c_thread_run) {
/* wake up whenever a request completes, to make sure it's not
for us */
waittime.tv_sec = 1;
waittime.tv_nsec = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_condition_timedwait(&ompi_request_cond, &ompi_request_lock, &waittime);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
ompi_osc_pt2pt_component_progress();
if (missed_callback) {
tmp = (*request)->req_complete_cb;
(*request)->req_complete_cb = NULL;
tmp(*request);
}
return NULL;
return OMPI_SUCCESS;
}
#endif
int
ompi_osc_pt2pt_component_isend(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm,
ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *cbdata)
{
int ret;
bool missed_callback;
ompi_request_complete_fn_t tmp;
ret = MCA_PML_CALL(isend(buf, count, datatype,
dest, tag, MCA_PML_BASE_SEND_STANDARD, comm, request));
if (OMPI_SUCCESS != ret) return ret;
/* lock the giant request mutex to update the callback data so
that the PML can't mark the request as complete while we're
updating the callback data, which means we can
deterministically ensure the callback is only fired once and
that we didn't miss it. */
OPAL_THREAD_LOCK(&ompi_request_lock);
(*request)->req_complete_cb = callback;
(*request)->req_complete_cb_data = cbdata;
missed_callback = (*request)->req_complete;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if (missed_callback) {
tmp = (*request)->req_complete_cb;
(*request)->req_complete_cb = NULL;
tmp(*request);
}
return OMPI_SUCCESS;
}

View file

@@ -86,13 +86,12 @@ inmsg_mark_complete(ompi_osc_pt2pt_module_t *module)
* Sending a sendreq to target
*
**********************************************************************/
static void
ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_sendreq_send_long_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata;
(ompi_osc_pt2pt_longreq_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_sendreq_t *sendreq = longreq->req_basereq.req_sendreq;
int32_t count;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
@@ -108,16 +107,20 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
static void
ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_sendreq_send_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_buffer_t *buffer =
(ompi_osc_pt2pt_buffer_t*) mpireq;
(ompi_osc_pt2pt_buffer_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) mpireq->cbdata;
(ompi_osc_pt2pt_sendreq_t*) buffer->data;
ompi_osc_pt2pt_send_header_t *header =
(ompi_osc_pt2pt_send_header_t*) buffer->payload;
int32_t count;
@@ -135,7 +138,7 @@ ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
}
#endif
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_LOCK(&sendreq->req_module->p2p_lock);
@@ -147,8 +150,11 @@
}
/* release the buffer */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&mpireq->super);
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, buffer);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -190,8 +196,7 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
buffer = (ompi_osc_pt2pt_buffer_t*) item;
/* setup buffer */
buffer->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_send_cb;
buffer->mpireq.cbdata = (void*) sendreq;
buffer->data = sendreq;
/* pack header */
header = (ompi_osc_pt2pt_send_header_t*) buffer->payload;
@@ -284,22 +289,16 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
"%d sending sendreq to %d",
ompi_comm_rank(sendreq->req_module->p2p_comm),
sendreq->req_target_rank));
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
sendreq->req_target_rank,
CONTROL_MSG_TAG,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->mpireq.request));
ret = ompi_osc_pt2pt_component_isend(buffer->payload,
buffer->len,
MPI_BYTE,
sendreq->req_target_rank,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->request,
ompi_osc_pt2pt_sendreq_send_cb,
buffer);
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&buffer->mpireq.super.super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
/* Needs to be fixed.
* The payload is made undefined due to the isend call.
*/
MEMCHECKER(
opal_memchecker_base_mem_defined(buffer->payload, buffer->len);
);
@@ -307,29 +306,23 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
header->hdr_msg_length == 0) {
ompi_osc_pt2pt_longreq_t *longreq;
ompi_osc_pt2pt_longreq_alloc(&longreq);
longreq->req_basereq.req_sendreq = sendreq;
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_send_long_cb;
longreq->mpireq.cbdata = sendreq;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
ompi_comm_rank(sendreq->req_module->p2p_comm),
sendreq->req_target_rank,
header->hdr_origin_tag));
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->p2p_comm,
&(longreq->mpireq.request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
sendreq->req_module->p2p_comm,
&(longreq->req_pml_request),
ompi_osc_pt2pt_sendreq_send_long_cb,
longreq);
}
goto done;
@@ -350,28 +343,31 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
* Sending a replyreq back to origin
*
**********************************************************************/
static void
ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_replyreq_send_long_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_replyreq_t *replyreq =
(ompi_osc_pt2pt_replyreq_t*) mpireq->cbdata;
(ompi_osc_pt2pt_longreq_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_replyreq_t *replyreq = longreq->req_basereq.req_replyreq;
inmsg_mark_complete(replyreq->rep_module);
ompi_osc_pt2pt_longreq_free(longreq);
ompi_osc_pt2pt_replyreq_free(replyreq);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
static void
ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_replyreq_send_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_buffer_t *buffer =
(ompi_osc_pt2pt_buffer_t*) mpireq;
(ompi_osc_pt2pt_buffer_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_replyreq_t *replyreq =
(ompi_osc_pt2pt_replyreq_t*) mpireq->cbdata;
(ompi_osc_pt2pt_replyreq_t*) buffer->data;
ompi_osc_pt2pt_reply_header_t *header =
(ompi_osc_pt2pt_reply_header_t*) buffer->payload;
@@ -389,8 +385,11 @@ ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
}
/* release the descriptor and replyreq */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&mpireq->super);
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, buffer);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -420,8 +419,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
}
/* setup buffer */
buffer->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_send_cb;
buffer->mpireq.cbdata = (void*) replyreq;
buffer->data = replyreq;
/* pack header */
header = (ompi_osc_pt2pt_reply_header_t*) buffer->payload;
@@ -482,18 +480,15 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
#endif
/* send fragment */
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
replyreq->rep_origin_rank,
CONTROL_MSG_TAG,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->mpireq.request));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&buffer->mpireq.super.super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_isend(buffer->payload,
buffer->len,
MPI_BYTE,
replyreq->rep_origin_rank,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->request,
ompi_osc_pt2pt_replyreq_send_cb,
buffer);
/* Needs to be fixed.
* The payload is made undefined due to the isend call.
@@ -504,24 +499,17 @@
if (header->hdr_msg_length == 0) {
ompi_osc_pt2pt_longreq_t *longreq;
ompi_osc_pt2pt_longreq_alloc(&longreq);
longreq->req_basereq.req_replyreq = replyreq;
longreq->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_send_long_cb;
longreq->mpireq.cbdata = replyreq;
mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
replyreq->rep_target_datatype,
replyreq->rep_origin_rank,
header->hdr_target_tag,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&(longreq->mpireq.request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&longreq->mpireq.super.super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
replyreq->rep_target_datatype,
replyreq->rep_origin_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->req_pml_request),
ompi_osc_pt2pt_replyreq_send_long_cb,
longreq);
}
goto done;
@@ -541,16 +529,20 @@
* Receive a put on the target side
*
**********************************************************************/
static void
ompi_osc_pt2pt_sendreq_recv_put_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_sendreq_recv_put_long_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) request->req_complete_cb_data;
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_pt2pt_longreq_free(longreq);
inmsg_mark_complete(longreq->req_module);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -617,24 +609,18 @@ ompi_osc_pt2pt_sendreq_recv_put(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_longreq_t *longreq;
ompi_osc_pt2pt_longreq_alloc(&longreq);
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_recv_put_long_cb;
longreq->mpireq.cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
ret = mca_pml.pml_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->mpireq.request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_request),
ompi_osc_pt2pt_sendreq_recv_put_long_cb,
longreq);
}
return ret;
@@ -646,14 +632,13 @@
* Receive an accumulate on the target side
*
**********************************************************************/
static void
ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
(ompi_osc_pt2pt_longreq_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = longreq->req_module;
ompi_osc_pt2pt_send_header_t *header =
(ompi_osc_pt2pt_send_header_t*) mpireq->cbdata;
ompi_osc_pt2pt_send_header_t *header = longreq->req_basereq.req_sendhdr;
void *payload = (void*) (header + 1);
int ret;
void *target = (unsigned char*) module->p2p_win->w_baseptr +
@@ -717,7 +702,7 @@ ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
header->hdr_origin));
/* free the temp buffer */
free(mpireq->cbdata);
free(longreq->req_basereq.req_sendhdr);
/* Release datatype & op */
OBJ_RELEASE(longreq->req_datatype);
@@ -726,6 +711,10 @@
inmsg_mark_complete(longreq->req_module);
ompi_osc_pt2pt_longreq_free(longreq);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -881,39 +870,34 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
/* get a longreq and fill it in */
ompi_osc_pt2pt_longreq_alloc(&longreq);
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_recv_accum_long_cb;
longreq->req_datatype = datatype;
longreq->req_op = op;
longreq->req_module = module;
/* allocate a buffer to receive into ... */
longreq->mpireq.cbdata = malloc(buflen + sizeof(ompi_osc_pt2pt_send_header_t));
if (NULL == longreq->mpireq.cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->mpireq.cbdata, header,
sizeof(ompi_osc_pt2pt_send_header_t));
((ompi_osc_pt2pt_send_header_t*) longreq->mpireq.cbdata)->hdr_msg_length = buflen;
longreq->req_basereq.req_sendhdr = malloc(buflen + sizeof(ompi_osc_pt2pt_send_header_t));
ret = mca_pml.pml_irecv(((char*) longreq->mpireq.cbdata) + sizeof(ompi_osc_pt2pt_send_header_t),
primitive_count,
primitive_datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->mpireq.request));
if (NULL == longreq->req_basereq.req_sendhdr) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->req_basereq.req_sendhdr, header,
sizeof(ompi_osc_pt2pt_send_header_t));
((ompi_osc_pt2pt_send_header_t*) longreq->req_basereq.req_sendhdr)->hdr_msg_length = buflen;
ret = ompi_osc_pt2pt_component_irecv(((char*) longreq->req_basereq.req_sendhdr) + sizeof(ompi_osc_pt2pt_send_header_t),
primitive_count,
primitive_datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_request),
ompi_osc_pt2pt_sendreq_recv_accum_long_cb,
longreq);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d started long recv accum message from %d (%d)",
ompi_comm_rank(module->p2p_comm),
header->hdr_origin,
header->hdr_origin_tag));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
}
return ret;
@@ -925,13 +909,12 @@
* Receive a get on the origin side
*
**********************************************************************/
static void
ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata;
(ompi_osc_pt2pt_longreq_t*) request->req_complete_cb_data;
ompi_osc_pt2pt_sendreq_t *sendreq = longreq->req_basereq.req_sendreq;
int32_t count;
OPAL_THREAD_LOCK(&sendreq->req_module->p2p_lock);
@@ -942,6 +925,10 @@ ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
int
@@ -996,24 +983,18 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_longreq_t *longreq;
ompi_osc_pt2pt_longreq_alloc(&longreq);
longreq->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_recv_long_cb;
longreq->mpireq.cbdata = sendreq;
longreq->req_basereq.req_sendreq = sendreq;
longreq->req_module = module;
/* BWB - FIX ME - George is going to kill me for this */
ret = mca_pml.pml_irecv(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->mpireq.request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_irecv(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->req_pml_request),
ompi_osc_pt2pt_replyreq_recv_long_cb,
longreq);
}
return ret;
@@ -1025,12 +1006,17 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
* Control message communication
*
**********************************************************************/
static void
ompi_osc_pt2pt_control_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
static int
ompi_osc_pt2pt_control_send_cb(ompi_request_t *request)
{
opal_free_list_item_t *item = (opal_free_list_item_t*) request->req_complete_cb_data;
/* release the descriptor and sendreq */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&mpireq->super);
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, item);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -1068,8 +1054,7 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
}
/* setup buffer */
buffer->mpireq.cbfunc = ompi_osc_pt2pt_control_send_cb;
buffer->mpireq.cbdata = NULL;
buffer->data = NULL;
buffer->len = sizeof(ompi_osc_pt2pt_control_header_t);
/* pack header */
@@ -1089,19 +1074,15 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
#endif
/* send fragment */
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
rank,
CONTROL_MSG_TAG,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->mpireq.request));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(buffer->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
ret = ompi_osc_pt2pt_component_isend(buffer->payload,
buffer->len,
MPI_BYTE,
rank,
CONTROL_MSG_TAG,
module->p2p_comm,
&buffer->request,
ompi_osc_pt2pt_control_send_cb,
buffer);
goto done;
cleanup:

View file

@@ -16,10 +16,8 @@
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"
#include "osc_pt2pt_longreq.h"
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_longreq_t, ompi_osc_pt2pt_mpireq_t,
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_longreq_t, opal_free_list_item_t,
NULL, NULL);

View file

@@ -17,15 +17,23 @@
#ifndef OSC_PT2PT_LONGREQ_H
#define OSC_PT2PT_LONGREQ_H
#include "opal/class/opal_free_list.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"
struct ompi_osc_pt2pt_longreq_t {
ompi_osc_pt2pt_mpireq_t mpireq;
opal_free_list_item_t super;
/* warning - this doesn't always have a sane value */
ompi_osc_pt2pt_module_t *req_module;
struct ompi_request_t *req_pml_request; /* PML request */
union {
struct ompi_osc_pt2pt_sendreq_t *req_sendreq;
struct ompi_osc_pt2pt_replyreq_t *req_replyreq;
struct ompi_osc_pt2pt_send_header_t *req_sendhdr;
} req_basereq;
/* This may not always be filled in... */
struct ompi_osc_pt2pt_module_t *req_module;
struct ompi_op_t *req_op;
struct ompi_datatype_t *req_datatype;
};
@@ -49,7 +57,7 @@ static inline int
ompi_osc_pt2pt_longreq_free(ompi_osc_pt2pt_longreq_t *longreq)
{
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_longreqs,
&longreq->mpireq.super.super);
&longreq->super);
return OMPI_SUCCESS;
}

View file

@@ -1,27 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"
#include "opal/class/opal_free_list.h"
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_mpireq_t, opal_free_list_item_t,
NULL, NULL);

View file

@@ -1,44 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_MPIREQ_H
#define OMPI_OSC_PT2PT_MPIREQ_H
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
BEGIN_C_DECLS
struct ompi_osc_pt2pt_mpireq_t;
typedef void (*ompi_osc_pt2pt_mpireq_cb_fn_t)(
struct ompi_osc_pt2pt_mpireq_t *mpireq);
struct ompi_osc_pt2pt_mpireq_t {
opal_free_list_item_t super;
ompi_request_t *request;
ompi_status_public_t status;
ompi_osc_pt2pt_mpireq_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct ompi_osc_pt2pt_mpireq_t ompi_osc_pt2pt_mpireq_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_mpireq_t);
END_C_DECLS
#endif

View file

@@ -18,7 +18,6 @@
#include "osc_pt2pt.h"
#include "osc_pt2pt_sendreq.h"
#include "osc_pt2pt_longreq.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"

View file

@@ -21,7 +21,6 @@
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
@@ -34,7 +33,7 @@ int
ompi_osc_rdma_module_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
int tmp, i;
int i;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
opal_output_verbose(1, ompi_osc_base_output,
@@ -49,22 +48,8 @@ ompi_osc_rdma_module_free(ompi_win_t *win)
/* remove from component information */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
tmp = opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.c_modules,
opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.c_modules,
ompi_comm_get_cid(module->m_comm));
/* only take the output of hash_table_remove if there wasn't already an error */
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
if (0 == opal_hash_table_get_size(&mca_osc_rdma_component.c_modules)) {
#if OMPI_ENABLE_PROGRESS_THREADS
void *foo;
mca_osc_rdma_component.c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
opal_thread_join(&mca_osc_rdma_component.c_thread, &foo);
#else
opal_progress_unregister(ompi_osc_rdma_component_progress);
#endif
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
win->w_osc_module = NULL;

View file

@@ -28,6 +28,7 @@
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
@@ -45,9 +46,6 @@ struct ompi_osc_rdma_component_t {
/** Extend the basic osc component interface */
ompi_osc_base_component_t super;
/** store the state of progress threads for this instance of OMPI */
bool c_have_progress_threads;
/** lock access to datastructures in the component structure */
opal_mutex_t c_lock;
@@ -69,14 +67,6 @@ struct ompi_osc_rdma_component_t {
/** free list of ompi_osc_rdma_longreq_t structures */
opal_free_list_t c_longreqs;
/** list of outstanding requests, of type ompi_osc_pt2pt_longreq_t */
opal_list_t c_pending_requests;
#if OMPI_ENABLE_PROGRESS_THREADS
opal_thread_t c_thread;
bool c_thread_run;
#endif
bool c_btl_registered;
uint32_t c_sequence_number;
@@ -237,7 +227,26 @@ int ompi_osc_rdma_component_select(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
int ompi_osc_rdma_component_progress(void);
/* helper function that properly sets up request handling */
int ompi_osc_rdma_component_irecv(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *data);
int ompi_osc_rdma_component_isend(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *data);
int ompi_osc_rdma_peer_info_free(ompi_osc_rdma_peer_info_t *peer_info);

View file

@@ -43,15 +43,13 @@
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/pml/pml.h"
static int component_open(void);
static void component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata);
#if OMPI_ENABLE_PROGRESS_THREADS
static void* component_thread_fn(opal_object_t *obj);
#endif
static int setup_rdma(ompi_osc_rdma_module_t *module);
ompi_osc_rdma_component_t mca_osc_rdma_component = {
@@ -186,11 +184,6 @@ ompi_osc_rdma_component_init(bool enable_progress_threads,
{
if (!mca_bml_base_inited()) return OMPI_ERROR;
/* we can run with either threads or not threads (may not be able
to do win locks)... */
mca_osc_rdma_component.c_have_progress_threads =
enable_progress_threads;
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_modules,
@@ -220,14 +213,6 @@ ompi_osc_rdma_component_init(bool enable_progress_threads,
OBJ_CLASS(ompi_osc_rdma_longreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_pending_requests,
opal_list_t);
#if OMPI_ENABLE_PROGRESS_THREADS
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_thread, opal_thread_t);
mca_osc_rdma_component.c_thread_run = false;
#endif
mca_osc_rdma_component.c_btl_registered = false;
mca_osc_rdma_component.c_sequence_number = 0;
@@ -246,24 +231,10 @@ ompi_osc_rdma_component_finalize(void)
opal_output(ompi_osc_base_output,
"WARNING: There were %d Windows created but not freed.",
(int) num_modules);
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_rdma_component.c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
{
void* ret;
opal_thread_join(&mca_osc_rdma_component.c_thread, &ret);
}
#else
opal_progress_unregister(ompi_osc_rdma_component_progress);
#endif
}
mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA, NULL, NULL);
#if OMPI_ENABLE_PROGRESS_THREADS
OBJ_DESTRUCT(&mca_osc_rdma_component.c_thread);
#endif
OBJ_DESTRUCT(&mca_osc_rdma_component.c_pending_requests);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_longreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_replyreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_sendreqs);
@@ -413,19 +384,6 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
opal_hash_table_set_value_uint32(&mca_osc_rdma_component.c_modules,
ompi_comm_get_cid(module->m_comm),
module);
ret = opal_hash_table_get_size(&mca_osc_rdma_component.c_modules);
if (ret == 1) {
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_rdma_component.c_thread_run = true;
mca_osc_rdma_component.c_thread.t_run = component_thread_fn;
mca_osc_rdma_component.c_thread.t_arg = NULL;
ret = opal_thread_start(&mca_osc_rdma_component.c_thread);
#else
ret = opal_progress_register(ompi_osc_rdma_component_progress);
#endif
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
if (OMPI_SUCCESS != ret) goto cleanup;
@@ -935,77 +893,85 @@ component_fragment_cb(struct mca_btl_base_module_t *btl,
}
}
int
ompi_osc_rdma_component_progress(void)
ompi_osc_rdma_component_irecv(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *cbdata)
{
opal_list_item_t *item;
int ret, done = 0;
int ret;
bool missed_callback;
ompi_request_complete_fn_t tmp;
#if OMPI_ENABLE_PROGRESS_THREADS
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
#else
ret = OPAL_THREAD_TRYLOCK(&mca_osc_rdma_component.c_lock);
if (ret != 0) return 0;
#endif
ret = MCA_PML_CALL(irecv(buf, count, datatype,
src, tag, comm, request));
if (OMPI_SUCCESS != ret) return ret;
for (item = opal_list_get_first(&mca_osc_rdma_component.c_pending_requests) ;
item != opal_list_get_end(&mca_osc_rdma_component.c_pending_requests) ;
item = opal_list_get_next(item)) {
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) item;
/* lock the giant request mutex to update the callback data so
that the PML can't mark the request as complete while we're
updating the callback data, which means we can
deterministically ensure the callback is only fired once and
that we didn't miss it. */
OPAL_THREAD_LOCK(&ompi_request_lock);
(*request)->req_complete_cb = callback;
(*request)->req_complete_cb_data = cbdata;
missed_callback = (*request)->req_complete;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* BWB - FIX ME */
#if OMPI_ENABLE_PROGRESS_THREADS == 0
if (longreq->request->req_state == OMPI_REQUEST_INACTIVE ||
longreq->request->req_complete) {
ret = ompi_request_test(&longreq->request,
&done,
0);
} else {
done = 0;
ret = OMPI_SUCCESS;
}
#else
ret = ompi_request_test(&longreq->request,
&done,
0);
#endif
if (OMPI_SUCCESS == ret && 0 != done) {
opal_list_remove_item(&mca_osc_rdma_component.c_pending_requests,
item);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
longreq->cbfunc(longreq);
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
break;
}
if (missed_callback) {
tmp = (*request)->req_complete_cb;
(*request)->req_complete_cb = NULL;
tmp(*request);
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
return done;
return OMPI_SUCCESS;
}
#if OMPI_ENABLE_PROGRESS_THREADS
static void*
component_thread_fn(opal_object_t *obj)
int
ompi_osc_rdma_component_isend(void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm,
ompi_request_t **request,
ompi_request_complete_fn_t callback,
void *cbdata)
{
struct timespec waittime;
int ret;
bool missed_callback;
ompi_request_complete_fn_t tmp;
while (mca_osc_rdma_component.c_thread_run) {
/* wake up whenever a request completes, to make sure it's not
for us */
waittime.tv_sec = 1;
waittime.tv_nsec = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_condition_timedwait(&ompi_request_cond, &ompi_request_lock, &waittime);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
ompi_osc_rdma_component_progress();
ret = MCA_PML_CALL(isend(buf, count, datatype,
dest, tag, MCA_PML_BASE_SEND_STANDARD, comm, request));
if (OMPI_SUCCESS != ret) return ret;
/* lock the giant request mutex to update the callback data so
that the PML can't mark the request as complete while we're
updating the callback data, which means we can
deterministically ensure the callback is only fired once and
that we didn't miss it. */
OPAL_THREAD_LOCK(&ompi_request_lock);
(*request)->req_complete_cb = callback;
(*request)->req_complete_cb_data = cbdata;
missed_callback = (*request)->req_complete;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if (missed_callback) {
tmp = (*request)->req_complete_cb;
(*request)->req_complete_cb = NULL;
tmp(*request);
}
return NULL;
return OMPI_SUCCESS;
}
#endif
/*********** RDMA setup stuff ***********/

View file

@@ -253,11 +253,12 @@ ompi_osc_rdma_sendreq_rdma(ompi_osc_rdma_module_t *module,
* Sending a sendreq to target
*
**********************************************************************/
static void
ompi_osc_rdma_sendreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
static int
ompi_osc_rdma_sendreq_send_long_cb(ompi_request_t *request)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->cbdata;
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) request->req_complete_cb_data;
ompi_osc_rdma_sendreq_t *sendreq = longreq->req_basereq.req_sendreq;
int32_t count;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
@@ -273,6 +274,9 @@ ompi_osc_rdma_sendreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -332,28 +336,23 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_send_long_cb;
longreq->cbdata = sendreq;
longreq->req_basereq.req_sendreq = sendreq;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
ompi_comm_rank(sendreq->req_module->m_comm),
sendreq->req_target_rank,
header->hdr_origin_tag));
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
ompi_osc_rdma_component_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
sendreq->req_module->m_comm,
&(longreq->request),
ompi_osc_rdma_sendreq_send_long_cb,
longreq);
}
} else {
ompi_osc_rdma_sendreq_free(sendreq);
@@ -647,16 +646,21 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
* Sending a replyreq back to origin
*
**********************************************************************/
static void
ompi_osc_rdma_replyreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
static int
ompi_osc_rdma_replyreq_send_long_cb(ompi_request_t *request)
{
ompi_osc_rdma_replyreq_t *replyreq =
(ompi_osc_rdma_replyreq_t*) longreq->cbdata;
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) request->req_complete_cb_data;
ompi_osc_rdma_replyreq_t *replyreq = longreq->req_basereq.req_replyreq;
inmsg_mark_complete(replyreq->rep_module);
ompi_osc_rdma_longreq_free(longreq);
ompi_osc_rdma_replyreq_free(replyreq);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -692,24 +696,17 @@ ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_basereq.req_replyreq = replyreq;
longreq->cbfunc = ompi_osc_rdma_replyreq_send_long_cb;
longreq->cbdata = replyreq;
mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
replyreq->rep_target_datatype,
replyreq->rep_origin_rank,
header->hdr_target_tag,
MCA_PML_BASE_SEND_STANDARD,
replyreq->rep_module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
ompi_osc_rdma_component_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
replyreq->rep_target_datatype,
replyreq->rep_origin_rank,
header->hdr_target_tag,
replyreq->rep_module->m_comm,
&(longreq->request),
ompi_osc_rdma_replyreq_send_long_cb,
longreq);
}
/* release the descriptor and replyreq */
@@ -821,17 +818,24 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
* Receive a put on the target side
*
**********************************************************************/
static void
ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq)
static int
ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_request_t *request)
{
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) request->req_complete_cb_data;
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_rdma_longreq_free(longreq);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d finished receiving long put message",
ompi_comm_rank(longreq->req_module->m_comm)));
inmsg_mark_complete(longreq->req_module);
ompi_osc_rdma_longreq_free(longreq);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -897,31 +901,24 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
ret = mca_pml.pml_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request));
ompi_osc_rdma_component_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request),
ompi_osc_rdma_sendreq_recv_put_long_cb,
longreq);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d started long recv put message from %d (%d)",
ompi_comm_rank(module->m_comm),
header->hdr_origin,
header->hdr_origin_tag));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
@@ -937,11 +934,12 @@
**********************************************************************/
static void
ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq)
static int
ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_request_t *request)
{
ompi_osc_rdma_send_header_t *header =
(ompi_osc_rdma_send_header_t*) longreq->cbdata;
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) request->req_complete_cb_data;
ompi_osc_rdma_send_header_t *header = longreq->req_basereq.req_sendhdr;
void *payload = (void*) (header + 1);
int ret;
ompi_osc_rdma_module_t *module = longreq->req_module;
@@ -1004,7 +1002,7 @@ ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq)
header->hdr_origin));
/* free the temp buffer */
free(longreq->cbdata);
free(longreq->req_basereq.req_sendhdr);
/* Release datatype & op */
OBJ_RELEASE(longreq->req_datatype);
@@ -1013,6 +1011,10 @@
inmsg_mark_complete(longreq->req_module);
ompi_osc_rdma_longreq_free(longreq);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -1173,39 +1175,34 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
/* get a longreq and fill it in */
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_accum_long_cb;
longreq->req_datatype = datatype;
longreq->req_op = op;
longreq->req_module = module;
/* allocate a buffer to receive into ... */
longreq->cbdata = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t));
longreq->req_basereq.req_sendhdr = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t));
if (NULL == longreq->cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
if (NULL == longreq->req_basereq.req_sendhdr) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->cbdata, header,
memcpy(longreq->req_basereq.req_sendhdr, header,
sizeof(ompi_osc_rdma_send_header_t));
((ompi_osc_rdma_send_header_t*) longreq->cbdata)->hdr_msg_length = buflen;
longreq->req_basereq.req_sendhdr->hdr_msg_length = buflen;
ret = mca_pml.pml_irecv(((char*) longreq->cbdata) + sizeof(ompi_osc_rdma_send_header_t),
primitive_count,
primitive_datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request));
ompi_osc_rdma_component_irecv(longreq->req_basereq.req_sendhdr + 1,
primitive_count,
primitive_datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request),
ompi_osc_rdma_sendreq_recv_accum_long_cb,
longreq);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d started long recv accum message from %d (%d)",
ompi_comm_rank(module->m_comm),
header->hdr_origin,
header->hdr_origin_tag));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
@@ -1217,11 +1214,13 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
* Receive a get on the origin side
*
**********************************************************************/
static void
ompi_osc_rdma_replyreq_recv_long_cb(ompi_osc_rdma_longreq_t *longreq)
static int
ompi_osc_rdma_replyreq_recv_long_cb(ompi_request_t *request)
{
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) request->req_complete_cb_data;
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->cbdata;
(ompi_osc_rdma_sendreq_t*) longreq->req_basereq.req_sendreq;
int32_t count;
OPAL_THREAD_LOCK(&sendreq->req_module->m_lock);
@@ -1232,6 +1231,10 @@ ompi_osc_rdma_replyreq_recv_long_cb(ompi_osc_rdma_longreq_t *longreq)
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
ompi_request_free(&request);
return OMPI_SUCCESS;
}
@@ -1277,24 +1280,18 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_replyreq_recv_long_cb;
longreq->cbdata = sendreq;
longreq->req_basereq.req_sendreq = sendreq;
longreq->req_module = module;
/* BWB - FIX ME - George is going to kill me for this */
ret = mca_pml.pml_irecv(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
ret = ompi_osc_rdma_component_irecv(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->m_comm,
&(longreq->request),
ompi_osc_rdma_replyreq_recv_long_cb,
longreq);
}
return ret;

View file

@@ -26,16 +26,15 @@
#include "ompi/request/request.h"
#include "ompi/op/op.h"
struct ompi_osc_rdma_longreq_t;
typedef struct ompi_osc_rdma_longreq_t ompi_osc_rdma_longreq_t;
typedef void (*ompi_osc_rdma_longreq_cb_fn_t)(ompi_osc_rdma_longreq_t *longreq);
struct ompi_osc_rdma_longreq_t {
opal_free_list_item_t super;
ompi_request_t *request;
ompi_osc_rdma_longreq_cb_fn_t cbfunc;
void *cbdata;
union {
struct ompi_osc_rdma_sendreq_t *req_sendreq;
struct ompi_osc_rdma_replyreq_t *req_replyreq;
struct ompi_osc_rdma_send_header_t *req_sendhdr;
} req_basereq;
/* warning - this doesn't always have a sane value */
ompi_osc_rdma_module_t *req_module;
@@ -44,6 +43,7 @@ struct ompi_osc_rdma_longreq_t {
struct ompi_op_t *req_op;
struct ompi_datatype_t *req_datatype;
};
typedef struct ompi_osc_rdma_longreq_t ompi_osc_rdma_longreq_t;
OBJ_CLASS_DECLARATION(ompi_osc_rdma_longreq_t);
static inline int

View file

@@ -52,6 +52,7 @@ static void ompi_request_construct(ompi_request_t* req)
req->req_free = NULL;
req->req_cancel = NULL;
req->req_complete_cb = NULL;
req->req_complete_cb_data = NULL;
req->req_f_to_c_index = MPI_UNDEFINED;
req->req_mpi_object.comm = (struct ompi_communicator_t*) NULL;
}

View file

@@ -106,6 +106,7 @@ struct ompi_request_t {
ompi_request_free_fn_t req_free; /**< Called by free */
ompi_request_cancel_fn_t req_cancel; /**< Optional function to cancel the request */
ompi_request_complete_fn_t req_complete_cb; /**< Called when the request is MPI completed */
void *req_complete_cb_data;
ompi_mpi_object_t req_mpi_object; /**< Pointer to MPI object that created this request */
};
@@ -392,8 +393,10 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
*/
static inline int ompi_request_complete(ompi_request_t* request, bool with_signal)
{
if( NULL != request->req_complete_cb ) {
request->req_complete_cb( request );
ompi_request_complete_fn_t tmp = request->req_complete_cb;
if( NULL != tmp ) {
request->req_complete_cb = NULL;
tmp( request );
}
ompi_request_completed++;
request->req_complete = true;