* Enable eager sending for Accumulate
* If the accumulate is local, make it short-circuit the request path. Accumulate requires local ops due to its window rules, so this is likely to help a bunch (on the codes I'm messing with, at least)
* Do a better job of flushing everything that can go out on the wire in a resource-constrained problem
* Move some debugging values around to make large problems somewhat easier to deal with

This commit was SVN r20277.
This commit is contained in:
parent 333f419f4f
commit cfc400eb57
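The centerpiece is the new fast path in ompi_osc_rdma_module_accumulate (second hunk below): when the target rank is the caller itself, the target datatype is contiguous, the origin data needs no conversion, and the per-window accumulate lock can be taken without blocking, the reduction is applied straight into the window memory and the send request is freed, instead of round-tripping a message to ourselves. A rough standalone model of that pattern — window_t, apply_sum, and try_local_accumulate are illustrative stand-ins, not the OMPI API:

/*
 * Minimal standalone model of the local-accumulate fast path (not the
 * real OMPI code): if the target is our own rank and the per-window
 * accumulate lock is free, apply the reduction directly to the window
 * memory instead of queueing a send to ourselves.
 */
#include <pthread.h>
#include <stdio.h>

typedef struct {
    void *baseptr;              /* start of the exposed window memory */
    int disp_unit;              /* displacement unit, as in MPI_Win_create */
    int my_rank;                /* this process's rank in the window's comm */
    pthread_mutex_t acc_lock;   /* serializes accumulates into the window */
} window_t;

/* MPI_SUM on doubles, the only op this sketch supports */
static void apply_sum(double *target, const double *origin, int count)
{
    for (int i = 0; i < count; ++i) target[i] += origin[i];
}

/* Returns 1 if the accumulate completed locally, 0 if the caller must
   fall back to the regular (queued send request) path. */
static int try_local_accumulate(window_t *win, int target_rank,
                                long target_disp, const double *origin,
                                int count)
{
    if (target_rank != win->my_rank) return 0;                /* remote target */
    if (pthread_mutex_trylock(&win->acc_lock) != 0) return 0; /* busy: fall back */

    double *target = (double *)((unsigned char *)win->baseptr +
                                (unsigned long)target_disp * win->disp_unit);
    apply_sum(target, origin, count);

    pthread_mutex_unlock(&win->acc_lock);
    return 1;
}

int main(void)
{
    static double winmem[4];
    static window_t win = { winmem, sizeof(double), 0,
                            PTHREAD_MUTEX_INITIALIZER };
    double contrib[2] = { 1, 2 };

    /* rank 0 accumulating into its own window: takes the fast path */
    if (try_local_accumulate(&win, 0, 2, contrib, 2))
        printf("fast path: win = {%g, %g, %g, %g}\n",
               winmem[0], winmem[1], winmem[2], winmem[3]);
    return 0;
}

The trylock is the important part: if another accumulate already holds the lock, the code falls back to the normal queued path rather than blocking inside the API call.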
@@ -26,6 +26,7 @@
 #include "osc_rdma_header.h"
 #include "osc_rdma_data_move.h"
 #include "ompi/memchecker.h"
+#include "ompi/mca/osc/base/osc_base_obj_convert.h"
 
 static int
 enqueue_sendreq(ompi_osc_rdma_module_t *module,
@@ -87,7 +88,45 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
 
     sendreq->req_op_id = op->o_f_to_c_index;
 
-    if (0 && module->m_eager_send_active) {
+    if (module->m_eager_send_active) {
+        /* accumulate semantics require send to self, which is bloody
+           expensive with the extra copies.  Put a shortcut in for the
+           common case. */
+        if (target == ompi_comm_rank(sendreq->req_module->m_comm) &&
+            ompi_ddt_is_contiguous_memory_layout(sendreq->req_target_datatype,
+                                                 sendreq->req_target_count) &&
+            !ompi_convertor_need_buffers(&sendreq->req_origin_convertor) &&
+            0 == OPAL_THREAD_TRYLOCK(&module->m_acc_lock)) {
+            void *target_buffer = (unsigned char*) module->m_win->w_baseptr +
+                ((unsigned long) target_disp *
+                 module->m_win->w_disp_unit);
+
+            struct iovec iov;
+            uint32_t iov_count = 1;
+            size_t max_data = sendreq->req_origin_bytes_packed;
+
+            iov.iov_len = max_data;
+            iov.iov_base = NULL;
+            ret = ompi_convertor_pack(&sendreq->req_origin_convertor,
+                                      &iov, &iov_count,
+                                      &max_data);
+            if (ret < 0) {
+                OPAL_THREAD_UNLOCK(&module->m_acc_lock);
+                return OMPI_ERR_FATAL;
+            }
+
+            ret = ompi_osc_base_process_op(target_buffer,
+                                           iov.iov_base,
+                                           max_data,
+                                           target_dt,
+                                           target_count,
+                                           op);
+            /* unlock the window for accumulates */
+            OPAL_THREAD_UNLOCK(&module->m_acc_lock);
+            ompi_osc_rdma_sendreq_free(sendreq);
+            return ret;
+        }
+
     OPAL_THREAD_LOCK(&module->m_lock);
     sendreq->req_module->m_num_pending_out += 1;
     module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
@@ -372,22 +372,30 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
     /* release the descriptor and sendreq */
     btl->btl_free(btl, descriptor);
 
-    while (opal_list_get_size(&module->m_queued_sendreqs)) {
-        opal_list_item_t *item;
-        int ret;
-
-        OPAL_THREAD_LOCK(&module->m_lock);
-        item = opal_list_remove_first(&module->m_queued_sendreqs);
-        OPAL_THREAD_UNLOCK(&module->m_lock);
-        if (NULL == item) break;
-
-        ret = ompi_osc_rdma_sendreq_send(module, (ompi_osc_rdma_sendreq_t*) item);
-        if (OMPI_SUCCESS != ret) {
-            OPAL_THREAD_LOCK(&module->m_lock);
-            opal_list_append(&(module->m_queued_sendreqs), item);
-            OPAL_THREAD_UNLOCK(&module->m_lock);
-            break;
-        }
-    }
-}
-
+    if (opal_list_get_size(&module->m_queued_sendreqs) > 0) {
+        opal_list_item_t *item;
+        int ret, i, len;
+
+        len = opal_list_get_size(&module->m_queued_sendreqs);
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                             "%d items in restart queue",
+                             len));
+        for (i = 0 ; i < len ; ++i) {
+            OPAL_THREAD_LOCK(&module->m_lock);
+            item = opal_list_remove_first(&module->m_queued_sendreqs);
+            OPAL_THREAD_UNLOCK(&module->m_lock);
+            if (NULL == item) break;
+
+            ret = ompi_osc_rdma_sendreq_send(module, (ompi_osc_rdma_sendreq_t*) item);
+            if (OMPI_SUCCESS != ret) {
+                OPAL_THREAD_LOCK(&module->m_lock);
+                opal_list_append(&(module->m_queued_sendreqs), item);
+                OPAL_THREAD_UNLOCK(&module->m_lock);
+            }
+        }
+
+        /* flush so things actually get sent out and resources restored */
+        ompi_osc_rdma_flush(module);
+    }
+}
+
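Two things changed in the callback above: the drain loop is now bounded by a snapshot of the queue length, so a request that fails and gets re-queued is not immediately retried in the same pass, and the loop is followed by an explicit flush so whatever did start actually leaves the node and returns its resources. A self-contained sketch of that shape — queue_t, try_send, and flush are hypothetical stand-ins, not the OMPI API:

#include <stdio.h>
#include <stddef.h>

typedef struct req { struct req *next; int id; } req_t;
typedef struct { req_t *head, *tail; size_t size; } queue_t;

static req_t *queue_pop(queue_t *q)
{
    req_t *r = q->head;
    if (r != NULL) {
        q->head = r->next;
        if (q->head == NULL) q->tail = NULL;
        q->size--;
    }
    return r;
}

static void queue_push(queue_t *q, req_t *r)
{
    r->next = NULL;
    if (q->tail != NULL) q->tail->next = r; else q->head = r;
    q->tail = r;
    q->size++;
}

/* stand-in for ompi_osc_rdma_sendreq_send: pretend only even ids find
   enough network resources on this pass */
static int try_send(req_t *r) { return (r->id % 2 == 0) ? 0 : -1; }

/* stand-in for ompi_osc_rdma_flush */
static void flush(void) { puts("flush: pushing started sends onto the wire"); }

static void restart_queued(queue_t *q)
{
    size_t len = q->size;              /* snapshot: one retry per request */
    for (size_t i = 0; i < len; ++i) {
        req_t *r = queue_pop(q);
        if (r == NULL) break;
        if (try_send(r) != 0)
            queue_push(q, r);          /* still resource-starved: retry later */
    }
    flush();                           /* let started sends free resources */
}

int main(void)
{
    queue_t q = { NULL, NULL, 0 };
    req_t reqs[4] = { {NULL,0}, {NULL,1}, {NULL,2}, {NULL,3} };
    for (int i = 0; i < 4; ++i) queue_push(&q, &reqs[i]);
    restart_queued(&q);
    printf("%zu requests still queued\n", q.size);   /* the two odd ids */
    return 0;
}

Without the snapshot, a re-queued request would be popped again immediately and the loop could spin without making progress.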
@@ -55,7 +55,7 @@ int
 ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
 {
     unsigned int incoming_reqs;
-    int ret = OMPI_SUCCESS, i;
+    int ret = OMPI_SUCCESS, i, len, started_send;
     ompi_osc_rdma_module_t *module = GET_MODULE(win);
     int num_outgoing = 0;
 
@@ -112,18 +112,45 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
     /* try to start all the requests.  We've copied everything we
        need out of pending_sendreqs, so don't need the lock
        here */
-    while (NULL !=
-           (item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
-        ompi_osc_rdma_sendreq_t *req =
-            (ompi_osc_rdma_sendreq_t*) item;
+    len = opal_list_get_size(&(module->m_copy_pending_sendreqs));
+    started_send = 0;
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                         "fence: trying to start %d reqs",
+                         len));
+    for (i = 0 ; i < len ; ++i) {
+        ompi_osc_rdma_sendreq_t *req = (ompi_osc_rdma_sendreq_t*)
+            opal_list_remove_first(&(module->m_copy_pending_sendreqs));
 
         ret = ompi_osc_rdma_sendreq_send(module, req);
         if (OMPI_SUCCESS != ret) {
             opal_list_append(&(module->m_copy_pending_sendreqs), item);
-            break;
+        } else {
+            started_send = 1;
         }
     }
 
+    /* we need to start at least one send, so that the callback
+       will restart the rest. */
+    while (0 == started_send && len != 0) {
+        opal_progress();
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                             "fence: restarting %d reqs", len));
+        len = opal_list_get_size(&(module->m_copy_pending_sendreqs));
+        for (i = 0 ; i < len ; ++i) {
+            ompi_osc_rdma_sendreq_t *req = (ompi_osc_rdma_sendreq_t*)
+                opal_list_remove_first(&(module->m_copy_pending_sendreqs));
+
+            ret = ompi_osc_rdma_sendreq_send(module, req);
+            if (OMPI_SUCCESS != ret) {
+                opal_list_append(&(module->m_copy_pending_sendreqs), item);
+            } else {
+                started_send = 1;
+            }
+        }
+    }
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                         "fence: done with initial start"));
+
     if (module->m_use_rdma) {
         if (module->m_rdma_wait_completion) {
             OPAL_THREAD_LOCK(&module->m_lock);
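The second loop added to fence above covers a subtle corner: restarts of queued requests are driven from the send-completion callback, so if the first pass failed to start anything, no completion is coming and fence would hang. It therefore spins the progress engine until at least one send has started. The shape of that logic as a toy simulation — fake_progress, try_start_one, and the resource gate are invented for illustration:

#include <stdio.h>

/* toy stand-ins: a count of queued sends and a resource gate that
   opens only after the progress engine has run a couple of times */
static int queued = 3;
static int progress_calls = 0;

static void fake_progress(void) { ++progress_calls; }

/* succeeds only once the fake network has freed some resources */
static int try_start_one(void)
{
    if (progress_calls < 2) return -1;
    --queued;
    return 0;
}

int main(void)
{
    int started_send = 0;

    /* first pass, as in fence: give every pending request one try */
    for (int i = 0, len = queued; i < len; ++i)
        if (try_start_one() == 0) started_send = 1;

    /* we need to start at least one send, so that the completion
       callback will restart the rest */
    while (0 == started_send && queued != 0) {
        fake_progress();               /* let completions free resources */
        for (int i = 0, len = queued; i < len; ++i)
            if (try_start_one() == 0) started_send = 1;
    }

    printf("started_send=%d, queued=%d\n", started_send, queued);
    return 0;
}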
@@ -167,7 +194,7 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
     module->m_num_pending_in += incoming_reqs;
     module->m_num_pending_out += num_outgoing;
 
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "fence: waiting on %d in and %d out, now %d, %d",
                          incoming_reqs,
                          num_outgoing,
@@ -484,7 +511,7 @@ ompi_osc_rdma_module_lock(int lock_type,
     ompi_win_remove_mode(win, OMPI_WIN_FENCE);
     ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
 
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "%d sending lock request to %d",
                          ompi_comm_rank(module->m_comm), target));
     /* generate a lock request */
@@ -532,7 +559,7 @@ ompi_osc_rdma_module_unlock(int target,
     OPAL_THREAD_UNLOCK(&module->m_lock);
 
     /* send the unlock request */
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "%d sending unlock request to %d with %d requests",
                          ompi_comm_rank(module->m_comm), target,
                          out_count));
@@ -600,7 +627,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
             ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
             send_ack = true;
         } else {
-            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+            OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                                  "%d queuing lock request from %d (%d)",
                                  ompi_comm_rank(module->m_comm),
                                  origin, lock_type));
@@ -616,7 +643,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
             ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
             send_ack = true;
         } else {
-            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+            OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                                  "queuing lock request from %d (%d) lock_type:%d",
                                  ompi_comm_rank(module->m_comm),
                                  origin, lock_type));
@@ -631,7 +658,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
     OPAL_THREAD_UNLOCK(&(module->m_lock));
 
     if (send_ack) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "%d sending lock ack to %d",
                              ompi_comm_rank(module->m_comm), origin));
         ompi_osc_rdma_control_send(module, proc,
@@ -654,7 +681,7 @@ ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
 
     assert(module->m_lock_status != 0);
 
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "received unlock request from %d with %d requests\n",
                          origin, count));
 
@@ -706,7 +733,7 @@ ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module)
     /* issue whichever unlock acks we should issue */
     while (NULL != (new_pending = (ompi_osc_rdma_pending_lock_t*)
                     opal_list_remove_first(&copy_unlock_acks))) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "sending unlock reply to proc"));
         ompi_osc_rdma_control_send(module,
                                    new_pending->proc,
@@ -735,7 +762,7 @@ ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module)
     OPAL_THREAD_UNLOCK(&(module->m_lock));
 
     if (NULL != new_pending) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "sending lock request to proc"));
         ompi_osc_rdma_control_send(module,
                                    new_pending->proc,