* Fix some error checking code for Lock / Unlock at the MPI layer
* Implement win_lock and win_unlock in the pt2pt component. Not well tested,
  but appears to move bits if properly motivated...

This commit was SVN r8922.
This commit is contained in:
parent 5c750cd8b9
commit ec7b60d501
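For context: this change wires MPI's passive-target synchronization down into the pt2pt one-sided component. A minimal sketch of the MPI-level pattern it enables (the helper name and buffers are illustrative, not from this commit):

    #include <mpi.h>

    /* Lock rank `target` exclusively, push `count` ints into its window,
       then unlock; MPI_Win_unlock does not return until the transfer is
       complete at both the origin and the target. */
    static void update_remote(MPI_Win win, int target, int *local, int count)
    {
        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);  /* begin access epoch */
        MPI_Put(local, count, MPI_INT, target, 0, count, MPI_INT, win);
        MPI_Win_unlock(target, win);                       /* end epoch, flush */
    }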
@@ -56,6 +56,8 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
         ret = (ret != OMPI_SUCCESS) ? ret : tmp;
     OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
 
+    OBJ_DESTRUCT(&(module->p2p_locks_pending));
+
     assert(module->p2p_sc_group == NULL);
     assert(module->p2p_pw_group == NULL);
     free(module->p2p_fence_coll_counts);
@@ -80,15 +80,17 @@ struct ompi_osc_pt2pt_module_t {
     /** For MPI_Fence synchronization, the number of messages to send
         in epoch. For Start/Complete, the number of updates for this
         Complete. For Post/Wait (poorly named), the number of
-        Complete counters we're waiting for. Not protected by
-        p2p_lock - must use atomic counter operations. */
+        Complete counters we're waiting for. For lock, the number of
+        messages waiting for completion on the origin side. Not
+        protected by p2p_lock - must use atomic counter operations. */
     volatile int32_t p2p_num_pending_out;
 
     /** For MPI_Fence synchronization, the number of expected incoming
         messages. For Start/Complete, the number of expected Post
        messages. For Post/Wait, the number of expected updates from
-        complete. Not protected by p2p_lock - must use atomic counter
-        operations. */
+        complete. For lock, the number of messages on the passive side
+        we are waiting for. Not protected by p2p_lock - must use
+        atomic counter operations. */
     volatile int32_t p2p_num_pending_in;
 
     /** cyclic counter for a unique tag for long messages. Not
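The counter discipline those comments describe is the backbone of the new unlock path later in this diff; a condensed sketch of the pattern (excerpted and simplified from this commit, not new code):

    /* post N outgoing updates, then progress until completion callbacks
       have decremented the counter back to zero */
    OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), out_count);
    while (0 != module->p2p_num_pending_out) {
        ompi_osc_pt2pt_progress(module);
    }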
@@ -114,6 +116,10 @@ struct ompi_osc_pt2pt_module_t {
     struct ompi_group_t *p2p_sc_group;
 
+    /* ********************* LOCK data ************************ */
+    int32_t p2p_lock_status;   /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */
+    int32_t p2p_shared_count;
+    opal_list_t p2p_locks_pending;
+    int32_t p2p_lock_received_ack;
 };
 typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
@@ -232,6 +238,16 @@ int ompi_osc_pt2pt_module_lock(int lock_type,
 int ompi_osc_pt2pt_module_unlock(int target,
                                  struct ompi_win_t *win);
 
+/*
+ * passive side sync interface functions
+ */
+int ompi_osc_pt2pt_passive_lock(ompi_osc_pt2pt_module_t *module,
+                                int32_t origin,
+                                int32_t lock_type);
+
+int ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
+                                  int32_t origin,
+                                  int32_t count);
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }
@@ -277,6 +277,10 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
     module->p2p_sc_group = NULL;
 
+    /* lock data */
+    module->p2p_lock_status = 0;
+    module->p2p_shared_count = 0;
+    OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t);
+    module->p2p_lock_received_ack = 0;
+
     /* update component data */
     OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
@@ -434,7 +438,41 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
             /* we've heard from one more place, and have value reqs to
                process */
             OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);
-            OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value);
+            OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
         }
         break;
 
+    case OMPI_OSC_PT2PT_HDR_LOCK_REQ:
+        {
+            ompi_osc_pt2pt_control_header_t *header =
+                (ompi_osc_pt2pt_control_header_t*)
+                descriptor->des_dst[0].seg_addr.pval;
+
+            /* get our module pointer */
+            module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
+            if (NULL == module) return;
+
+            if (header->hdr_value[1] > 0) {
+                ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0],
+                                            header->hdr_value[1]);
+            } else {
+                OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
+            }
+        }
+        break;
+
+    case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:
+        {
+            ompi_osc_pt2pt_control_header_t *header =
+                (ompi_osc_pt2pt_control_header_t*)
+                descriptor->des_dst[0].seg_addr.pval;
+
+            /* get our module pointer */
+            module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
+            if (NULL == module) return;
+
+            ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0],
+                                          header->hdr_value[1]);
+        }
+        break;
+
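Note how the dispatch above overloads LOCK_REQ: the same message type carries both lock requests and lock grants, distinguished by the second payload word (both Open MPI lock constants are positive, and the grant carries OMPI_SUCCESS, which is 0):

    /* hdr_value[0] : rank of the sender (origin or passive side)
     * hdr_value[1] > 0  : lock request; MPI_LOCK_EXCLUSIVE or MPI_LOCK_SHARED
     * hdr_value[1] == 0 : lock grant (ack) from the passive side
     * UNLOCK_REQ reuses the layout: hdr_value[1] is the number of data
     * messages the passive side must drain before releasing the lock. */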
@@ -713,7 +713,7 @@ ompi_osc_pt2pt_control_send_cb(struct mca_btl_base_module_t* btl,
 int
 ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
                             ompi_proc_t *proc,
-                            uint8_t type, int32_t value)
+                            uint8_t type, int32_t value0, int32_t value1)
 {
     int ret = OMPI_SUCCESS;
     mca_bml_base_endpoint_t *endpoint = NULL;
@@ -746,7 +746,8 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
     /* pack header */
     header = (ompi_osc_pt2pt_control_header_t*) descriptor->des_src[0].seg_addr.pval;
     header->hdr_base.hdr_type = type;
-    header->hdr_value = value;
+    header->hdr_value[0] = value0;
+    header->hdr_value[1] = value1;
     header->hdr_windx = module->p2p_comm->c_contextid;
 
 #if 0 /* BWB - FIX ME */
@@ -50,6 +50,6 @@ int ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
 
 int ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
                                 ompi_proc_t *proc,
-                                uint8_t type, int32_t value);
+                                uint8_t type, int32_t value0, int32_t value1);
 
 #endif
@@ -22,13 +22,14 @@
 #endif
 
-#define OMPI_OSC_PT2PT_HDR_PUT        0x0001
-#define OMPI_OSC_PT2PT_HDR_ACC        0x0002
-#define OMPI_OSC_PT2PT_HDR_GET        0x0004
-#define OMPI_OSC_PT2PT_HDR_REPLY      0x0008
-#define OMPI_OSC_PT2PT_HDR_POST       0x0010
-#define OMPI_OSC_PT2PT_HDR_COMPLETE   0x0020
-#define OMPI_OSC_PT2PT_HDR_LOCK       0x0040
+#define OMPI_OSC_PT2PT_HDR_PUT        0x0001
+#define OMPI_OSC_PT2PT_HDR_ACC        0x0002
+#define OMPI_OSC_PT2PT_HDR_GET        0x0004
+#define OMPI_OSC_PT2PT_HDR_REPLY      0x0008
+#define OMPI_OSC_PT2PT_HDR_POST       0x0010
+#define OMPI_OSC_PT2PT_HDR_COMPLETE   0x0020
+#define OMPI_OSC_PT2PT_HDR_LOCK_REQ   0x0040
+#define OMPI_OSC_PT2PT_HDR_UNLOCK_REQ 0x0080
 
 struct ompi_osc_pt2pt_base_header_t {
     uint8_t hdr_type;
@@ -113,7 +114,7 @@ typedef struct ompi_osc_pt2pt_reply_header_t ompi_osc_pt2pt_reply_header_t;
 struct ompi_osc_pt2pt_control_header_t {
     ompi_osc_pt2pt_base_header_t hdr_base;
     int16_t hdr_windx;
-    int32_t hdr_value;
+    int32_t hdr_value[2];
 };
 typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
@@ -121,14 +122,16 @@ typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
     do { \
        OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
        (hdr).hdr_windx = htons((hdr).hdr_windx); \
-       (hdr).hdr_value = htonl((hdr).hdr_value); \
+       (hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \
+       (hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \
     } while (0)
 
 #define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
     do { \
        OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
        (hdr).hdr_windx = ntohs((hdr).hdr_windx); \
-       (hdr).hdr_value = ntohl((hdr).hdr_value); \
+       (hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \
+       (hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \
     } while (0)
@@ -252,7 +252,8 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
         ompi_osc_pt2pt_control_send(P2P_MODULE(win),
                                     P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
                                     OMPI_OSC_PT2PT_HDR_COMPLETE,
-                                    P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
+                                    P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank],
+                                    0);
     }
 
     /* try to start all the requests. We've copied everything we
@@ -268,7 +269,7 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
         if (OMPI_SUCCESS != ret) {
             opal_output(0, "complete: failure in starting sendreq (%d). Will try later.",
                         ret);
-            opal_list_prepend(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
+            opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
         }
     }
@@ -323,7 +324,7 @@ ompi_osc_pt2pt_module_post(ompi_group_t *group,
     for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
         ompi_osc_pt2pt_control_send(P2P_MODULE(win),
                                     group->grp_proc_pointers[i],
-                                    OMPI_OSC_PT2PT_HDR_POST, 1);
+                                    OMPI_OSC_PT2PT_HDR_POST, 1, 0);
     }
 
     return OMPI_SUCCESS;
@@ -388,13 +389,39 @@ ompi_osc_pt2pt_module_test(ompi_win_t *win,
 }
 
 
+struct ompi_osc_pt2pt_pending_lock_t {
+    opal_list_item_t super;
+    ompi_proc_t *proc;
+    int32_t lock_type;
+};
+typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
+OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
+                   NULL, NULL);
+
+
 int
 ompi_osc_pt2pt_module_lock(int lock_type,
                            int target,
                            int assert,
                            ompi_win_t *win)
 {
-    return OMPI_ERR_NOT_IMPLEMENTED;
+    ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi;
+
+    assert(lock_type != 0);
+
+    /* set our mode on the window */
+    ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
+
+    opal_output(0, "sending lock request to %d", target);
+    /* generate a lock request */
+    ompi_osc_pt2pt_control_send(P2P_MODULE(win),
+                                proc,
+                                OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                P2P_MODULE(win)->p2p_comm->c_my_rank,
+                                lock_type);
+
+    /* return */
+    return OMPI_SUCCESS;
 }
@@ -402,5 +429,149 @@ int
 ompi_osc_pt2pt_module_unlock(int target,
                              ompi_win_t *win)
 {
-    return OMPI_ERR_NOT_IMPLEMENTED;
+    int32_t out_count;
+    opal_list_item_t *item;
+    int ret;
+    ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi;
+
+    while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {
+        ompi_osc_pt2pt_progress(P2P_MODULE(win));
+    }
+    P2P_MODULE(win)->p2p_lock_received_ack = 0;
+
+    /* start all the requests */
+    ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
+
+    /* try to start all the requests. We've copied everything we need
+       out of pending_sendreqs, so don't need the lock here */
+    out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs));
+
+    OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count);
+
+    while (NULL !=
+           (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
+        ompi_osc_pt2pt_sendreq_t *req =
+            (ompi_osc_pt2pt_sendreq_t*) item;
+
+        ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
+
+        if (OMPI_SUCCESS != ret) {
+            opal_output(0, "unlock: failure in starting sendreq (%d). Will try later.",
+                        ret);
+            opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
+        }
+    }
+
+    /* wait for all the requests */
+    while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
+        ompi_osc_pt2pt_progress(P2P_MODULE(win));
+    }
+
+    /* send the unlock request */
+    opal_output(0, "sending unlock request to %d", target);
+    ompi_osc_pt2pt_control_send(P2P_MODULE(win),
+                                proc,
+                                OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,
+                                P2P_MODULE(win)->p2p_comm->c_my_rank,
+                                out_count);
+
+    /* set our mode on the window */
+    ompi_win_set_mode(win, 0);
+
+    return OMPI_SUCCESS;
 }
+
+
+int
+ompi_osc_pt2pt_passive_lock(ompi_osc_pt2pt_module_t *module,
+                            int32_t origin,
+                            int32_t lock_type)
+{
+    bool send_ack = false;
+    int ret = OMPI_SUCCESS;
+    ompi_proc_t *proc = module->p2p_comm->c_pml_procs[origin]->proc_ompi;
+    ompi_osc_pt2pt_pending_lock_t *new_pending;
+
+    OPAL_THREAD_LOCK(&(module->p2p_lock));
+    if (lock_type == MPI_LOCK_EXCLUSIVE) {
+        if (module->p2p_lock_status == 0) {
+            module->p2p_lock_status = MPI_LOCK_EXCLUSIVE;
+            send_ack = true;
+        } else {
+            opal_output(0, "queuing lock request from %d (%d)", origin, lock_type);
+            new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);
+            new_pending->proc = proc;
+            new_pending->lock_type = lock_type;
+            opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
+        }
+    } else if (lock_type == MPI_LOCK_SHARED) {
+        if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) {
+            module->p2p_lock_status = MPI_LOCK_SHARED;
+            module->p2p_shared_count++;
+            send_ack = true;
+        } else {
+            opal_output(0, "queuing lock request from %d (%d)", origin, lock_type);
+            new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);
+            new_pending->proc = proc;
+            new_pending->lock_type = lock_type;
+            opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
+        }
+    } else {
+        ret = OMPI_ERROR;
+    }
+    OPAL_THREAD_UNLOCK(&(module->p2p_lock));
+
+    if (send_ack) {
+        opal_output(0, "sending lock ack to %d", origin);
+        ompi_osc_pt2pt_control_send(module, proc,
+                                    OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                    module->p2p_comm->c_my_rank,
+                                    OMPI_SUCCESS);
+    }
+
+    return OMPI_SUCCESS;
+}
+
+
+int
+ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
+                              int32_t origin,
+                              int32_t count)
+{
+    ompi_osc_pt2pt_pending_lock_t *new_pending = NULL;
+
+    assert(module->p2p_lock_status != 0);
+
+    OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
+
+    while (0 != module->p2p_num_pending_in) {
+        ompi_osc_pt2pt_progress(module);
+    }
+
+    OPAL_THREAD_LOCK(&(module->p2p_lock));
+    if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) {
+        module->p2p_lock_status = 0;
+    } else {
+        module->p2p_shared_count--;
+        if (module->p2p_shared_count == 0) {
+            module->p2p_lock_status = 0;
+        }
+    }
+
+    /* if we were really unlocked, see if we have more to process */
+    new_pending = (ompi_osc_pt2pt_pending_lock_t*)
+        opal_list_remove_first(&(module->p2p_locks_pending));
+    OPAL_THREAD_UNLOCK(&(module->p2p_lock));
+
+    if (NULL != new_pending) {
+        opal_output(0, "sending lock request to proc");
+        /* generate a lock request */
+        ompi_osc_pt2pt_control_send(module,
+                                    new_pending->proc,
+                                    OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                    module->p2p_comm->c_my_rank,
+                                    new_pending->lock_type);
+    }
+
+    return OMPI_SUCCESS;
+}
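Taken together, the new functions implement the following message flow (a summary of the code above, not part of the diff). Note that ompi_osc_pt2pt_module_lock returns as soon as the request is sent; the origin only blocks waiting for the grant at unlock time, via p2p_lock_received_ack:

    origin (MPI_Win_lock/unlock)                 passive target
    ----------------------------                 --------------
    lock:   -- LOCK_REQ(my_rank, lock_type) -->  grant now, or queue an
                                                 ompi_osc_pt2pt_pending_lock_t
            <-- LOCK_REQ(rank, 0) (ack) -------  sent when granted
    ... Put/Get/Accumulate calls are queued locally ...
    unlock: wait for the ack, start the queued
            sendreqs, wait for local completion,
            -- UNLOCK_REQ(my_rank, out_count) -> drain out_count updates,
                                                 release the lock, pop the
                                                 next pending waiter (if any)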
@@ -50,13 +50,8 @@ int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win)
         } else if (0 != (assert & ~(MPI_MODE_NOCHECK))) {
             return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_ASSERT, FUNC_NAME);
         } else if (0 != (ompi_win_get_mode(win) &
-                         (OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED |
-                          OMPI_WIN_STARTED))) {
-            /* can't be in exposure epoch, or posted, or started */
-            return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_RMA_CONFLICT, FUNC_NAME);
-        } else if ((0 != (ompi_win_get_mode(win) & OMPI_WIN_ACCESS_EPOCH)) &&
-                   (0 == (ompi_win_get_mode(win) & OMPI_WIN_LOCK_ACCESS))) {
-            /* can't be in an access epoch unless we're locked */
+                         (OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_EXPOSE_EPOCH))) {
+            /* can't be in either an access or exposure epoch (even a lock one) */
             return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_RMA_CONFLICT, FUNC_NAME);
         } else if (! ompi_win_allow_locks(win)) {
             return OMPI_ERRHANDLER_INVOKE(win, MPI_ERR_RMA_SYNC, FUNC_NAME);
@@ -127,7 +127,7 @@ static inline int ompi_win_rank(ompi_win_t *win) {
 }
 
 static inline bool ompi_win_allow_locks(ompi_win_t *win) {
-    return (0 != (win->w_flags & OMPI_WIN_NO_LOCKS));
+    return (0 == (win->w_flags & OMPI_WIN_NO_LOCKS));
 }
 
 static inline int16_t ompi_win_get_mode(ompi_win_t *win) {
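The ompi_win_allow_locks change above is the "fix some error checking" half of the commit: OMPI_WIN_NO_LOCKS is set when the user asked for a window without lock support (presumably via the no_locks info key), so the old `0 !=` test permitted locks exactly on the windows that had forbidden them, and MPI_Win_lock on an ordinary window always failed with MPI_ERR_RMA_SYNC.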