1
1

* support non-predefined datatypes for all three communication mechanisms

* rework the thread locking so that it at least makes sense to me.  Still
  need to do a bunch of testing before I'm happy with it, but it's a tad
  bit closer...

This commit was SVN r8918.
Этот коммит содержится в:
Brian Barrett 2006-02-07 12:16:23 +00:00
родитель e0a814d3a7
Коммит 340bf14191
8 изменённых файлов: 414 добавлений и 321 удалений

Просмотреть файл

@ -30,7 +30,7 @@ int
ompi_osc_pt2pt_module_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
int i, tmp;
int tmp;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);
if ((OMPI_WIN_ACCESS_EPOCH & win->w_flags) ||
@ -56,19 +56,24 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
/* clean up p2p module part */
for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
OBJ_DESTRUCT(&(module->p2p_pending_out_sendreqs[i]));
}
free(module->p2p_pending_out_sendreqs);
module->p2p_pending_out_sendreqs = NULL;
assert(module->p2p_sc_group == NULL);
assert(module->p2p_pw_group == NULL);
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&(module->p2p_copy_pending_sendreqs));
OBJ_DESTRUCT(&(module->p2p_long_msgs));
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
ompi_comm_free(&(module->p2p_comm));
module->p2p_comm = NULL;
module->p2p_win = NULL;
OBJ_DESTRUCT(&(module->p2p_long_msgs));
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));

Просмотреть файл

@ -67,35 +67,53 @@ struct ompi_osc_pt2pt_module_t {
/** communicator created with this window */
ompi_communicator_t *p2p_comm;
/** array of opal_list_ts, not a pointer to one of them. Array is
of size <num ranks in communicator>, although only the first
<num ranks in group> are used for PWSC synchronization */
opal_list_t *p2p_pending_out_sendreqs;
/** list of ompi_osc_pt2pt_sendreq_t structures, and includes all
requests for this access epoch that have not already been
started. p2p_lock must be held when modifying this field. */
opal_list_t p2p_pending_sendreqs;
/* For MPI_Fence synchronization, the number of messages to send
in epoch. For Start/Complete, the number of updates for this
Complete. For Post/Wait (poorly named), the number of Complete
counters we're waiting for.*/
/** list of int16_t counters for the number of requests to a
particular rank in p2p_comm for this access epoch. p2p_lock
must be held when modifying this field */
short *p2p_num_pending_sendreqs;
/** For MPI_Fence synchronization, the number of messages to send
in epoch. For Start/Complete, the number of updates for this
Complete. For Post/Wait (poorly named), the number of
Complete counters we're waiting for. Not protected by
p2p_lock - must use atomic counter operations. */
volatile int32_t p2p_num_pending_out;
/* For MPI_Fence synchronization, the number of expected incoming
messages. For Start/Complete, the number of expected Post
messages. For Post/Wait, the number of expected updates from
complete. */
/** For MPI_Fence synchronization, the number of expected incoming
messages. For Start/Complete, the number of expected Post
messages. For Post/Wait, the number of expected updates from
complete. Not protected by p2p_lock - must use atomic counter
operations. */
volatile int32_t p2p_num_pending_in;
/* cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to
create a send tag */
/** cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to
create a send tag */
volatile int32_t p2p_tag_counter;
/** list of outstanding long messages that must be processed
(ompi_osc_pt2pt_request_long) */
(ompi_osc_pt2pt_request_long). Protected by p2p_lock. */
opal_list_t p2p_long_msgs;
/** number of outstanding long messages */
volatile int32_t p2p_num_long_msgs;
struct ompi_group_t *pw_group;
struct ompi_group_t *sc_group;
opal_list_t p2p_copy_pending_sendreqs;
short *p2p_copy_num_pending_sendreqs;
/* ********************* FENCE data ************************ */
/* an array of <sizeof(p2p_comm)> ints, each containing the value
1. */
int *p2p_fence_coll_counts;
/* ********************* PWSC data ************************ */
struct ompi_group_t *p2p_pw_group;
struct ompi_group_t *p2p_sc_group;
/* ********************* LOCK data ************************ */
};
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;

Просмотреть файл

@ -24,8 +24,9 @@ enqueue_sendreq(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_sendreq_t *sendreq)
{
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_pending_out_sendreqs[sendreq->req_target_rank]),
opal_list_append(&(module->p2p_pending_sendreqs),
(opal_list_item_t*) sendreq);
module->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
return OMPI_SUCCESS;

Просмотреть файл

@ -16,6 +16,8 @@
#include "ompi_config.h"
#include <string.h>
#include "osc_pt2pt.h"
#include "osc_pt2pt_sendreq.h"
#include "osc_pt2pt_replyreq.h"
@ -192,7 +194,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
{
ompi_osc_pt2pt_module_t *module;
int ret, i;
/* create module structure */
module = malloc(sizeof(ompi_osc_pt2pt_module_t));
if (NULL == module) return OMPI_ERROR;
@ -215,11 +216,52 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
return ret;
}
if (!want_locks(info)) win->w_flags |= OMPI_WIN_NO_LOCKS;
module->p2p_pending_out_sendreqs = malloc(sizeof(opal_list_t) *
OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
module->p2p_num_pending_sendreqs = malloc(sizeof(short) *
ompi_comm_size(module->p2p_comm));
if (NULL == module) {
if (NULL == module->p2p_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
module->p2p_num_pending_out = 0;
module->p2p_num_pending_in = 0;
module->p2p_tag_counter = 0;
OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t);
OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t);
module->p2p_copy_num_pending_sendreqs = malloc(sizeof(short) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_copy_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
/* fence data */
module->p2p_fence_coll_counts = malloc(sizeof(int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_fence_coll_counts) {
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
@ -227,15 +269,14 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
return ret;
}
for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
OBJ_CONSTRUCT(&(module->p2p_pending_out_sendreqs[i]), opal_list_t);
module->p2p_fence_coll_counts[i] = 1;
}
module->p2p_num_pending_out = 0;
module->p2p_num_pending_in = 0;
module->p2p_tag_counter = 0;
/* pwsc data */
module->p2p_pw_group = NULL;
module->p2p_sc_group = NULL;
OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t);
module->p2p_num_long_msgs = 0;
/* lock data */
/* update component data */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
@ -246,6 +287,7 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
/* fill in window information */
win->w_osc_module = (ompi_osc_base_module_t*) module;
if (!want_locks(info)) win->w_flags |= OMPI_WIN_NO_LOCKS;
/* register to receive fragment callbacks */
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT,
@ -272,10 +314,10 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
void *payload;
assert(descriptor->des_dst[0].seg_len >=
sizeof(ompi_osc_pt2pt_type_header_t));
sizeof(ompi_osc_pt2pt_base_header_t));
/* handle message */
switch (((ompi_osc_pt2pt_type_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
switch (((ompi_osc_pt2pt_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
case OMPI_OSC_PT2PT_HDR_PUT:
{
ompi_osc_pt2pt_send_header_t *header;
@ -316,6 +358,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_datatype_t *datatype;
ompi_osc_pt2pt_send_header_t *header;
ompi_osc_pt2pt_replyreq_t *replyreq;
ompi_proc_t *proc;
/* get our header and payload */
header = (ompi_osc_pt2pt_send_header_t*)
@ -327,7 +370,8 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
if (NULL == module) return;
/* create or get a pointer to our datatype */
datatype = ompi_osc_pt2pt_datatype_create(header->hdr_target_dt_id, &payload);
proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
datatype = ompi_osc_pt2pt_datatype_create(proc, &payload);
/* create replyreq sendreq */
ret = ompi_osc_pt2pt_replyreq_alloc_init(module,

Просмотреть файл

@ -28,7 +28,7 @@
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/datatype/datatype.h"
static inline int32_t
create_send_tag(ompi_osc_pt2pt_module_t *module)
@ -66,10 +66,7 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) longreq->req_comp_cbdata;
OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock));
opal_list_remove_item(&(sendreq->req_module->p2p_long_msgs), &(longreq->super));
sendreq->req_module->p2p_num_long_msgs--;
OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock));
ompi_osc_pt2pt_longreq_free(longreq);
@ -121,7 +118,6 @@ ompi_osc_pt2pt_sendreq_send_cb(struct mca_btl_base_module_t* btl,
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock));
opal_list_append(&(sendreq->req_module->p2p_long_msgs), &(longreq->super));
sendreq->req_module->p2p_num_long_msgs++;
OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock));
}
}
@ -146,11 +142,21 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
mca_btl_base_descriptor_t *descriptor = NULL;
ompi_osc_pt2pt_send_header_t *header = NULL;
size_t written_data = 0;
/* Get a BTL and a fragment to go with it */
size_t needed_len = sizeof(ompi_osc_pt2pt_send_header_t);
const void *packed_ddt;
size_t packed_ddt_len = ompi_ddt_pack_description_length(sendreq->req_target_datatype);
/* we always need to send the ddt */
needed_len += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
needed_len += sendreq->req_origin_bytes_packed;
}
/* Get a BTL so we have the eager limit */
endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
needed_len < bml_btl->btl_eager_limit ? needed_len :
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
@ -180,19 +186,19 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
switch (sendreq->req_type) {
case OMPI_OSC_PT2PT_PUT:
header->hdr_type = OMPI_OSC_PT2PT_HDR_PUT;
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_PUT;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
break;
case OMPI_OSC_PT2PT_ACC:
header->hdr_type = OMPI_OSC_PT2PT_HDR_ACC;
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_ACC;
header->hdr_target_op = sendreq->req_op_id;
break;
case OMPI_OSC_PT2PT_GET:
header->hdr_type = OMPI_OSC_PT2PT_HDR_GET;
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_GET;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
@ -200,16 +206,11 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
}
/* Set datatype id and / or pack datatype */
if (ompi_ddt_is_predefined(sendreq->req_target_datatype)) {
header->hdr_target_dt_id = sendreq->req_target_datatype->d_f_to_c_index;
/* does not extend written_data, as nothing extra added */
} else {
header->hdr_target_dt_id = -1;
/* BWB - FIX ME - implement this datatype thing */
opal_output(0, "Datatype is not predefined. aborting.");
abort();
}
ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt);
if (OMPI_SUCCESS != ret) goto cleanup;
memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data,
packed_ddt, packed_ddt_len);
written_data += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
/* if sending data and it fits, pack payload */
@ -274,10 +275,7 @@ ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
ompi_osc_pt2pt_replyreq_t *replyreq =
(ompi_osc_pt2pt_replyreq_t*) longreq->req_comp_cbdata;
OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock));
opal_list_remove_item(&(replyreq->rep_module->p2p_long_msgs), &(longreq->super));
replyreq->rep_module->p2p_num_long_msgs--;
OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock));
ompi_osc_pt2pt_longreq_free(longreq);
@ -328,7 +326,6 @@ ompi_osc_pt2pt_replyreq_send_cb(struct mca_btl_base_module_t* btl,
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock));
opal_list_append(&(replyreq->rep_module->p2p_long_msgs), &(longreq->super));
replyreq->rep_module->p2p_num_long_msgs++;
OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock));
}
@ -374,7 +371,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
/* pack header */
header = (ompi_osc_pt2pt_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
written_data += sizeof(ompi_osc_pt2pt_reply_header_t);
header->hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
header->hdr_origin_sendreq = replyreq->rep_origin_sendreq;
header->hdr_target_tag = 0;
@ -433,10 +430,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
static void
ompi_osc_pt2pt_sendreq_recv_put_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
{
OPAL_THREAD_LOCK(&(longreq->req_module->p2p_lock));
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), &(longreq->super));
longreq->req_module->p2p_num_long_msgs--;
OPAL_THREAD_UNLOCK(&(longreq->req_module->p2p_lock));
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_pt2pt_longreq_free(longreq);
@ -453,7 +447,9 @@ ompi_osc_pt2pt_sendreq_recv_put(ompi_osc_pt2pt_module_t *module,
int ret = OMPI_SUCCESS;
void *target = (unsigned char*) module->p2p_win->w_baseptr +
(header->hdr_target_disp * module->p2p_win->w_disp_unit);
struct ompi_datatype_t *datatype = ompi_osc_pt2pt_datatype_create(header->hdr_target_dt_id, &inbuf);
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
struct ompi_datatype_t *datatype =
ompi_osc_pt2pt_datatype_create(proc, &inbuf);
if (header->hdr_msg_length > 0) {
ompi_convertor_t convertor;
@ -505,7 +501,6 @@ ompi_osc_pt2pt_sendreq_recv_put(ompi_osc_pt2pt_module_t *module,
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs), &(longreq->super));
module->p2p_num_long_msgs++;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
@ -551,10 +546,7 @@ ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
OPAL_THREAD_LOCK(&(longreq->req_module->p2p_lock));
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), &(longreq->super));
longreq->req_module->p2p_num_long_msgs--;
OPAL_THREAD_UNLOCK(&(longreq->req_module->p2p_lock));
OBJ_RELEASE(longreq->req_datatype);
OBJ_RELEASE(longreq->req_op);
@ -570,8 +562,10 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
void *payload)
{
int ret = OMPI_SUCCESS;
struct ompi_datatype_t *datatype = ompi_osc_pt2pt_datatype_create(header->hdr_target_dt_id, &payload);
struct ompi_op_t *op = ompi_osc_pt2pt_op_create(header->hdr_target_op);
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
struct ompi_datatype_t *datatype =
ompi_osc_pt2pt_datatype_create(proc, &payload);
if (header->hdr_msg_length > 0) {
/* lock the window for accumulates */
@ -637,10 +631,7 @@ ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) longreq->req_comp_cbdata;
OPAL_THREAD_LOCK(&(longreq->req_module->p2p_lock));
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), &(longreq->super));
longreq->req_module->p2p_num_long_msgs--;
OPAL_THREAD_UNLOCK(&(longreq->req_module->p2p_lock));
ompi_osc_pt2pt_longreq_free(longreq);
@ -696,7 +687,6 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs), &(longreq->super));
module->p2p_num_long_msgs++;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
@ -755,7 +745,7 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
/* pack header */
header = (ompi_osc_pt2pt_control_header_t*) descriptor->des_src[0].seg_addr.pval;
header->hdr_type = type;
header->hdr_base.hdr_type = type;
header->hdr_value = value;
header->hdr_windx = module->p2p_comm->c_contextid;

Просмотреть файл

@ -21,50 +21,6 @@
#include <netinet/in.h>
#endif
struct ompi_osc_pt2pt_type_header_t {
uint8_t hdr_type;
};
typedef struct ompi_osc_pt2pt_type_header_t ompi_osc_pt2pt_type_header_t;
struct ompi_osc_pt2pt_send_header_t {
uint8_t hdr_type;
uint32_t hdr_windx;
int32_t hdr_origin;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_origin_tag;
int16_t hdr_target_dt_id;
int32_t hdr_target_disp;
int32_t hdr_target_count;
int32_t hdr_target_op;
int32_t hdr_msg_length; /* 0 if payload is not included */
};
typedef struct ompi_osc_pt2pt_send_header_t ompi_osc_pt2pt_send_header_t;
struct ompi_osc_pt2pt_reply_header_t {
uint8_t hdr_type;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_target_tag;
int32_t hdr_msg_length;
};
typedef struct ompi_osc_pt2pt_reply_header_t ompi_osc_pt2pt_reply_header_t;
struct ompi_osc_pt2pt_control_header_t {
uint8_t hdr_type;
int32_t hdr_windx;
int32_t hdr_value;
};
typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
struct ompi_osc_pt2pt_lock_header_t {
uint8_t hdr_type;
int32_t hdr_windx;
int32_t hdr_origin;
};
typedef struct ompi_osc_pt2pt_lock_header_t ompi_osc_pt2pt_lock_header_t;
#define OMPI_OSC_PT2PT_HDR_PUT 0x0001
#define OMPI_OSC_PT2PT_HDR_ACC 0x0002
@ -74,6 +30,108 @@ typedef struct ompi_osc_pt2pt_lock_header_t ompi_osc_pt2pt_lock_header_t;
#define OMPI_OSC_PT2PT_HDR_COMPLETE 0x0020
#define OMPI_OSC_PT2PT_HDR_LOCK 0x0040
struct ompi_osc_pt2pt_base_header_t {
uint8_t hdr_type;
/* eventually, this will include endian information */
uint8_t hdr_flags;
};
typedef struct ompi_osc_pt2pt_base_header_t ompi_osc_pt2pt_base_header_t;
#define OMPI_OSC_PT2PT_BASE_HDR_NTOH(h)
#define OMPI_OSC_PT2PT_BASE_HDR_HTON(h)
struct ompi_osc_pt2pt_send_header_t {
ompi_osc_pt2pt_base_header_t hdr_base;
uint16_t hdr_windx;
int32_t hdr_origin;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_origin_tag;
int32_t hdr_target_disp;
int32_t hdr_target_count;
int32_t hdr_target_op;
int32_t hdr_msg_length; /* 0 if payload is not included */
};
typedef struct ompi_osc_pt2pt_send_header_t ompi_osc_pt2pt_send_header_t;
#define OMPI_OSC_PT2PT_REQ_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_origin = htonl((hdr).hdr_origin); \
(hdr).hdr_origin_sendreq = hton64((hdr).hdr_origin_sendreq); \
(hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = htonl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = htonl((hdr).hdr_target_count); \
(hdr).hdr_target_op = htonl((hdr).hdr_target_op); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_REQ_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_origin = ntohl((hdr).hdr_origin); \
(hdr).hdr_origin_sendreq = ntoh64((hdr).hdr_origin_sendreq); \
(hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = ntohl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = ntohl((hdr).hdr_target_count); \
(hdr).hdr_target_op = ntohl((hdr).hdr_target_op); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
struct ompi_osc_pt2pt_reply_header_t {
ompi_osc_pt2pt_base_header_t hdr_base;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_target_tag;
int32_t hdr_msg_length;
};
typedef struct ompi_osc_pt2pt_reply_header_t ompi_osc_pt2pt_reply_header_t;
#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_origin_sendreq = hton64((hdr).hdr_origin_sendreq); \
(hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_origin_sendreq = ntoh64((hdr).hdr_origin_sendreq); \
(hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
struct ompi_osc_pt2pt_control_header_t {
ompi_osc_pt2pt_base_header_t hdr_base;
int16_t hdr_windx;
int32_t hdr_value;
};
typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_value = htonl((hdr).hdr_value); \
} while (0)
#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_value = ntohl((hdr).hdr_value); \
} while (0)
/*
* Convert a 64 bit value to network byte order.
*/
@ -112,44 +170,4 @@ static inline uint64_t ntoh64(uint64_t val)
}
#define OMPI_OSC_PT2PT_REQ_HDR_HTON(hdr) \
do { \
(hdr).hdr_windx = htonl((hdr).hdr_windx); \
(hdr).hdr_origin = htonl((hdr).hdr_origin); \
(hdr).hdr_origin_sendreq = hton64((hdr).hdr_origin_sendreq); \
(hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
(hdr).hdr_target_dt_id = htons((hdr).hdr_target_dt_id); \
(hdr).hdr_target_disp = htonl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = htonl((hdr).hdr_target_count); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
(hdr).hdr_target_op = htonl((hdr).hdr_target_op); \
} while (0)
#define OMPI_OSC_PT2PT_REQ_HDR_NTOH(hdr) \
do { \
(hdr).hdr_windx = ntohl((hdr).hdr_windx); \
(hdr).hdr_origin = ntohl((hdr).hdr_origin); \
(hdr).hdr_origin_sendreq = ntoh64((hdr).hdr_origin_sendreq); \
(hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
(hdr).hdr_target_dt_id = ntohs((hdr).hdr_target_dt_id); \
(hdr).hdr_target_disp = ntohl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = ntohl((hdr).hdr_target_count); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
(hdr).hdr_target_op = ntohl((hdr).hdr_target_op); \
} while (0)
#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \
do { \
(hdr).hdr_origin_sendreq = hton64((hdr).hdr_origin_sendreq); \
(hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \
do { \
(hdr).hdr_origin_sendreq = ntoh64((hdr).hdr_origin_sendreq); \
(hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
#endif /* OMPI_MCA_OSC_PT2PT_HDR_H */

Просмотреть файл

@ -20,25 +20,15 @@
#include "mpi.h"
#include "ompi/datatype/datatype.h"
static inline
struct ompi_datatype_t*
ompi_osc_pt2pt_datatype_create(int datatype_id, void **payload)
ompi_osc_pt2pt_datatype_create(ompi_proc_t *remote_proc, void **payload)
{
struct ompi_datatype_t *datatype;
if (datatype_id == -1) {
/* not predefined datatype - need to construct out of payload */
/* BWB - FIX ME - implement dt sending */
opal_output(0, "remote datatypes not supported. aborting.");
abort();
/* don't forget to move the payload pointer */
} else {
/* retain datatype so that can be released at end */
datatype = MPI_Type_f2c(datatype_id);
OBJ_RETAIN(datatype);
}
struct ompi_datatype_t *datatype =
ompi_ddt_create_from_packed_description(payload, remote_proc);
if (ompi_ddt_is_predefined(datatype)) OBJ_RETAIN(datatype);
return datatype;
}

Просмотреть файл

@ -32,9 +32,11 @@
static inline void
ompi_osc_pt2pt_progress(ompi_osc_pt2pt_module_t *module)
{
if (0 != module->p2p_num_long_msgs) {
if (0 != opal_list_get_size(&(module->p2p_long_msgs))) {
opal_list_item_t *item, *next;
OPAL_THREAD_LOCK(&(module->p2p_lock));
/* Have to go the convoluted while() route instead of a for()
loop because the callback will likely remove the request
from the list and free it, and that would lead to much
@ -52,104 +54,119 @@ ompi_osc_pt2pt_progress(ompi_osc_pt2pt_module_t *module)
longreq->req_comp_cb(longreq);
}
}
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
opal_progress();
}
static inline void
ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module)
{
short *tmp;
OPAL_THREAD_LOCK(&(module->p2p_lock));
/* user has not promised nothing has happened - need to make
sure we've done all our requests */
module->p2p_num_pending_out = 0;
tmp = module->p2p_copy_num_pending_sendreqs;
module->p2p_copy_num_pending_sendreqs =
module->p2p_num_pending_sendreqs;
module->p2p_num_pending_sendreqs = tmp;
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
/* Copy in all the pending requests */
opal_list_join(&module->p2p_copy_pending_sendreqs,
opal_list_get_end(&module->p2p_copy_pending_sendreqs),
&module->p2p_pending_sendreqs);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
int
ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
{
short *outgoing_reqs = NULL;
short incoming_reqs;
int *counts = NULL;
int ret = OMPI_SUCCESS;
int i;
int ret, i;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
int num_pending;
if (0 == (assert & MPI_MODE_NOPRECEDE)) {
/* user has not promised nothing has happened - need to make
sure we've done all our requests */
P2P_MODULE(win)->p2p_num_pending_out = 0;
outgoing_reqs = malloc(sizeof(short) *
ompi_comm_size(P2P_MODULE(win)->p2p_comm));
if (NULL == outgoing_reqs) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
outgoing_reqs[i] =
opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_out_sendreqs[i]));
P2P_MODULE(win)->p2p_num_pending_out += outgoing_reqs[i];
}
counts = malloc(sizeof(int) *
ompi_comm_size(P2P_MODULE(win)->p2p_comm));
if (NULL == counts) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
counts[i] = 1;
}
/* find out how much data everyone is going to send us... */
ret = P2P_MODULE(win)->p2p_comm->c_coll.coll_reduce_scatter(outgoing_reqs,
&incoming_reqs,
counts,
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
if (OMPI_SUCCESS != ret) goto cleanup;
P2P_MODULE(win)->p2p_num_pending_in += incoming_reqs;
/* start all the requests */
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
opal_list_item_t *item;
opal_list_t *req_list =
&(P2P_MODULE(win)->p2p_pending_out_sendreqs[i]);
while (NULL != (item = opal_list_remove_first(req_list))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "fence: failure in starting sendreq");
opal_list_prepend(req_list, item);
goto cleanup;
}
}
/* check that the user didn't lie to us - since MPI_MODE_NOPRECEDE
must be specified by all processes if it is specified by
any process, if we see this it is safe to assume that there
are no pending operations anywhere needed to close out this
epoch. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
num_pending = opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_sendreqs));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
if (0 != num_pending) {
return MPI_ERR_RMA_SYNC;
}
} else {
/* Don't trust the user that nothing has happened in this
epoch and count through all the pending sendreqs to
verify */
int tmp = 0;
opal_list_item_t *item;
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
tmp +=
opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_out_sendreqs[i]));
ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
/* find out how much data everyone is going to send us. Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
if (OMPI_SUCCESS != ret) {
/* put the stupid data back for the user. This is not
cheap, but the user lost his data if we don't. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
opal_list_join(&P2P_MODULE(win)->p2p_pending_sendreqs,
opal_list_get_end(&P2P_MODULE(win)->p2p_pending_sendreqs),
&P2P_MODULE(win)->p2p_copy_pending_sendreqs);
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
P2P_MODULE(win)->p2p_num_pending_sendreqs[i] +=
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[i];
}
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
return ret;
}
if (0 != tmp) {
ret = MPI_ERR_ASSERT;
goto cleanup;
/* it's possible we've already received a couple of incoming messages,
so atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)));
/* try to start all the requests. We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "fence: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
}
}
incoming_reqs = 0;
}
/* now we know how many things we're waiting for - wait for them... */
while (0 != P2P_MODULE(win)->p2p_num_pending_in ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
/* now we know how many things we're waiting for - wait for them... */
while (0 != P2P_MODULE(win)->p2p_num_pending_in ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
}
}
/* all transfers are done - back to the real world we go */
@ -159,13 +176,7 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
ompi_win_set_mode(win, 0);
}
cleanup:
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
if (NULL != outgoing_reqs) free(outgoing_reqs);
if (NULL != counts) free(counts);
return ret;
return OMPI_SUCCESS;
}
@ -174,12 +185,17 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
P2P_MODULE(win)->sc_group = group;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
if (NULL != P2P_MODULE(win)->p2p_sc_group || NULL != P2P_MODULE(win)->p2p_pw_group) {
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
return MPI_ERR_RMA_CONFLICT;
}
P2P_MODULE(win)->p2p_sc_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* Set our mode to access w/ start */
ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);
@ -187,9 +203,7 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,
/* possible we've already received a couple of in messages, so
atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in),
ompi_group_size(P2P_MODULE(win)->sc_group));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
ompi_group_size(P2P_MODULE(win)->p2p_sc_group));
return OMPI_SUCCESS;
}
@ -200,30 +214,29 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
{
int i;
int ret = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
ompi_group_t *group;
opal_list_item_t *item;
/* wait for all the post messages */
while (0 != P2P_MODULE(win)->p2p_num_pending_in) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
}
ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
/* for each process in group, send a control message with number
of updates coming, then start all the requests */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->sc_group) ; ++i) {
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_sc_group) ; ++i) {
int comm_rank = -1, j;
opal_list_item_t *item;
opal_list_t *req_list;
/* no need to increment ref count - the communicator isn't
going anywhere while we're here */
ompi_group_t *comm_group = P2P_MODULE(win)->p2p_comm->c_local_group;
int32_t num_reqs;
/* find the rank in the communicator associated with this window */
for (j = 0 ;
j < ompi_group_size(comm_group) ;
++j) {
if (P2P_MODULE(win)->sc_group->grp_proc_pointers[i] ==
if (P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i] ==
comm_group->grp_proc_pointers[j]) {
comm_rank = j;
break;
@ -234,25 +247,28 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
goto cleanup;
}
req_list = &(P2P_MODULE(win)->p2p_pending_out_sendreqs[comm_rank]);
num_reqs = opal_list_get_size(req_list);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), num_reqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
ompi_osc_pt2pt_control_send(P2P_MODULE(win),
P2P_MODULE(win)->sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE, num_reqs);
P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE,
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
}
while (NULL != (item = opal_list_remove_first(req_list))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
/* try to start all the requests. We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;
if (OMPI_SUCCESS != ret) {
opal_output(0, "complete: failure in starting sendreq");
opal_list_prepend(req_list, item);
goto cleanup;
}
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "complete: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_prepend(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
}
}
@ -265,13 +281,15 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
/* set our mode back to nothing */
ompi_win_set_mode(win, 0);
/* BWB - do I need this? */
ompi_group_decrement_proc_count(P2P_MODULE(win)->sc_group);
OBJ_RELEASE(P2P_MODULE(win)->sc_group);
P2P_MODULE(win)->sc_group = NULL;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return ret;
}
@ -282,24 +300,27 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group,
{
int i;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
P2P_MODULE(win)->pw_group = group;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
if (NULL != P2P_MODULE(win)->p2p_sc_group || NULL != P2P_MODULE(win)->p2p_pw_group) {
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
return MPI_ERR_RMA_CONFLICT;
}
P2P_MODULE(win)->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* Set our mode to expose w/ post */
ompi_win_set_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
/* count how many complete messages we're still waiting on */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
ompi_group_size(P2P_MODULE(win)->pw_group));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
ompi_group_size(P2P_MODULE(win)->p2p_pw_group));
/* send a hello counter to everyone in group */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->pw_group) ; ++i) {
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
ompi_osc_pt2pt_control_send(P2P_MODULE(win),
group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_POST, 1);
@ -312,21 +333,24 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group,
int
ompi_osc_pt2pt_module_wait(ompi_win_t *win)
{
ompi_group_t *group;
while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
}
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
ompi_win_set_mode(win, 0);
/* BWB - do I need this? */
ompi_group_decrement_proc_count(P2P_MODULE(win)->pw_group);
OBJ_RELEASE(P2P_MODULE(win)->pw_group);
P2P_MODULE(win)->pw_group = NULL;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
@ -335,6 +359,8 @@ int
ompi_osc_pt2pt_module_test(ompi_win_t *win,
int *flag)
{
ompi_group_t *group;
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
@ -347,16 +373,17 @@ ompi_osc_pt2pt_module_test(ompi_win_t *win,
*flag = 1;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
ompi_win_set_mode(win, 0);
/* BWB - do I need this? */
ompi_group_decrement_proc_count(P2P_MODULE(win)->pw_group);
OBJ_RELEASE(P2P_MODULE(win)->pw_group);
P2P_MODULE(win)->pw_group = NULL;
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}