diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c
index 9a34a793d3..f660367116 100644
--- a/ompi/mca/osc/rdma/osc_rdma_comm.c
+++ b/ompi/mca/osc/rdma/osc_rdma_comm.c
@@ -26,6 +26,7 @@
 #include "osc_rdma_header.h"
 #include "osc_rdma_data_move.h"
 #include "ompi/memchecker.h"
+#include "ompi/mca/osc/base/osc_base_obj_convert.h"
 
 static int
 enqueue_sendreq(ompi_osc_rdma_module_t *module,
@@ -87,7 +88,45 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
 
     sendreq->req_op_id = op->o_f_to_c_index;
 
-    if (0 && module->m_eager_send_active) {
+    if (module->m_eager_send_active) {
+        /* accumulate semantics require send to self, which is bloody
+           expensive with the extra copies.  Put a shortcut in for the
+           common case. */
+        if (target == ompi_comm_rank(sendreq->req_module->m_comm) &&
+            ompi_ddt_is_contiguous_memory_layout(sendreq->req_target_datatype,
+                                                 sendreq->req_target_count) &&
+            !ompi_convertor_need_buffers(&sendreq->req_origin_convertor) &&
+            0 == OPAL_THREAD_TRYLOCK(&module->m_acc_lock)) {
+            void *target_buffer = (unsigned char*) module->m_win->w_baseptr +
+                ((unsigned long) target_disp * module->m_win->w_disp_unit);
+
+            struct iovec iov;
+            uint32_t iov_count = 1;
+            size_t max_data = sendreq->req_origin_bytes_packed;
+
+            iov.iov_len = max_data;
+            iov.iov_base = NULL;
+            ret = ompi_convertor_pack(&sendreq->req_origin_convertor,
+                                      &iov, &iov_count, &max_data);
+            if (ret < 0) {
+                OPAL_THREAD_UNLOCK(&module->m_acc_lock);
+                return OMPI_ERR_FATAL;
+            }
+
+            ret = ompi_osc_base_process_op(target_buffer,
+                                           iov.iov_base,
+                                           max_data,
+                                           target_dt,
+                                           target_count,
+                                           op);
+
+            /* unlock the window for accumulates */
+            OPAL_THREAD_UNLOCK(&module->m_acc_lock);
+            ompi_osc_rdma_sendreq_free(sendreq);
+            return ret;
+        }
+
         OPAL_THREAD_LOCK(&module->m_lock);
         sendreq->req_module->m_num_pending_out += 1;
         module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c
index 252e731bf9..83b0cfb209 100644
--- a/ompi/mca/osc/rdma/osc_rdma_data_move.c
+++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c
@@ -372,22 +372,30 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
     /* release the descriptor and sendreq */
     btl->btl_free(btl, descriptor);
 
-    while (opal_list_get_size(&module->m_queued_sendreqs)) {
+    if (opal_list_get_size(&module->m_queued_sendreqs) > 0) {
         opal_list_item_t *item;
-        int ret;
+        int ret, i, len;
 
-        OPAL_THREAD_LOCK(&module->m_lock);
-        item = opal_list_remove_first(&module->m_queued_sendreqs);
-        OPAL_THREAD_UNLOCK(&module->m_lock);
-        if (NULL == item) break;
-
-        ret = ompi_osc_rdma_sendreq_send(module, (ompi_osc_rdma_sendreq_t*) item);
-        if (OMPI_SUCCESS != ret) {
+        len = opal_list_get_size(&module->m_queued_sendreqs);
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                             "%d items in restart queue",
+                             len));
+        for (i = 0 ; i < len ; ++i) {
             OPAL_THREAD_LOCK(&module->m_lock);
-            opal_list_append(&(module->m_queued_sendreqs), item);
+            item = opal_list_remove_first(&module->m_queued_sendreqs);
             OPAL_THREAD_UNLOCK(&module->m_lock);
-            break;
+            if (NULL == item) break;
+
+            ret = ompi_osc_rdma_sendreq_send(module, (ompi_osc_rdma_sendreq_t*) item);
+            if (OMPI_SUCCESS != ret) {
+                OPAL_THREAD_LOCK(&module->m_lock);
+                opal_list_append(&(module->m_queued_sendreqs), item);
+                OPAL_THREAD_UNLOCK(&module->m_lock);
+            }
         }
+
+        /* flush so things actually get sent out and resources restored */
+        ompi_osc_rdma_flush(module);
     }
 }
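The accumulate hunk above enables an eager shortcut for the self-target case: when the target rank is the caller itself and both buffers are contiguous, the origin data is packed once and the reduction is applied directly to the window memory under the module's accumulate lock, skipping the send path entirely. The standalone sketch below models that control flow only; toy_window, try_local_accumulate(), and the integer-sum op are hypothetical stand-ins for OMPI's window, convertor, and op-processing machinery, and a plain pthread trylock replaces OPAL_THREAD_TRYLOCK.

/* Sketch of the "accumulate to self" fast path; all toy_* names are
 * hypothetical, not OMPI API. */
#include <pthread.h>
#include <stdio.h>

struct toy_window {
    unsigned char   *baseptr;    /* start of the exposed window memory */
    int              disp_unit;  /* displacement unit in bytes */
    pthread_mutex_t  acc_lock;   /* serializes local accumulates */
};

/* Apply a sum reduction directly into the window when the target is
 * ourselves.  Returns 0 on success, -1 when the fast path cannot be
 * taken and the caller must fall back to the normal send path. */
static int try_local_accumulate(struct toy_window *win, int target, int my_rank,
                                const int *origin, int count, long target_disp)
{
    int *target_buffer;
    int i;

    if (target != my_rank) return -1;                          /* not a self target */
    if (0 != pthread_mutex_trylock(&win->acc_lock)) return -1; /* busy: fall back */

    /* same address arithmetic as the patch: base + disp * disp_unit */
    target_buffer = (int*) (win->baseptr +
                            (unsigned long) target_disp * win->disp_unit);
    for (i = 0 ; i < count ; ++i) {
        target_buffer[i] += origin[i];   /* the reduction op (MPI_SUM here) */
    }

    pthread_mutex_unlock(&win->acc_lock);
    return 0;
}

int main(void)
{
    int memory[4] = { 1, 2, 3, 4 };
    int origin[4] = { 10, 10, 10, 10 };
    struct toy_window win;

    win.baseptr = (unsigned char*) memory;
    win.disp_unit = sizeof(int);
    pthread_mutex_init(&win.acc_lock, NULL);

    if (0 == try_local_accumulate(&win, 0, 0, origin, 4, 0)) {
        printf("after accumulate: %d %d %d %d\n",
               memory[0], memory[1], memory[2], memory[3]);
    }

    pthread_mutex_destroy(&win.acc_lock);
    return 0;
}

The trylock is the point: if another accumulate already holds the lock, the fast path backs off without blocking and the request falls through to the queued send path, which is exactly what the OPAL_THREAD_TRYLOCK test in the patch does.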
diff --git a/ompi/mca/osc/rdma/osc_rdma_sync.c b/ompi/mca/osc/rdma/osc_rdma_sync.c
index 0c84f7cfac..7f94f97ced 100644
--- a/ompi/mca/osc/rdma/osc_rdma_sync.c
+++ b/ompi/mca/osc/rdma/osc_rdma_sync.c
@@ -55,7 +55,7 @@ int
 ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
 {
     unsigned int incoming_reqs;
-    int ret = OMPI_SUCCESS, i;
+    int ret = OMPI_SUCCESS, i, len, started_send;
     ompi_osc_rdma_module_t *module = GET_MODULE(win);
     int num_outgoing = 0;
 
@@ -112,18 +112,45 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
     /* try to start all the requests.  We've copied everything we
        need out of pending_sendreqs, so don't need the lock here */
-    while (NULL !=
-           (item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
-        ompi_osc_rdma_sendreq_t *req =
-            (ompi_osc_rdma_sendreq_t*) item;
+    len = opal_list_get_size(&(module->m_copy_pending_sendreqs));
+    started_send = 0;
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                         "fence: trying to start %d reqs",
+                         len));
+    for (i = 0 ; i < len ; ++i) {
+        ompi_osc_rdma_sendreq_t *req = (ompi_osc_rdma_sendreq_t*)
+            opal_list_remove_first(&(module->m_copy_pending_sendreqs));
 
         ret = ompi_osc_rdma_sendreq_send(module, req);
 
         if (OMPI_SUCCESS != ret) {
             opal_list_append(&(module->m_copy_pending_sendreqs), item);
-            break;
+        } else {
+            started_send = 1;
         }
     }
 
+    /* we need to start at least one send, so that the callback
+       will restart the rest. */
+    while (0 == started_send && len != 0) {
+        opal_progress();
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                             "fence: restarting %d reqs", len));
+        len = opal_list_get_size(&(module->m_copy_pending_sendreqs));
+        for (i = 0 ; i < len ; ++i) {
+            ompi_osc_rdma_sendreq_t *req = (ompi_osc_rdma_sendreq_t*)
+                opal_list_remove_first(&(module->m_copy_pending_sendreqs));
+
+            ret = ompi_osc_rdma_sendreq_send(module, req);
+            if (OMPI_SUCCESS != ret) {
+                opal_list_append(&(module->m_copy_pending_sendreqs), item);
+            } else {
+                started_send = 1;
+            }
+        }
+    }
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
+                         "fence: done with initial start"));
+
     if (module->m_use_rdma) {
         if (module->m_rdma_wait_completion) {
             OPAL_THREAD_LOCK(&module->m_lock);
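The fence hunk above replaces the old drain loop, which gave up at the first send failure, with two phases: a single pass over the copied queue, then a progress loop that runs until at least one send has actually started, since only a send-completion callback will restart whatever is still queued. Below is a minimal standalone model of that logic, assuming a toy ring queue and a resource counter standing in for BTL descriptors; all toy_* names are hypothetical, and toy_progress() plays the role of opal_progress() returning resources to the pool.

/* Model of the "start at least one send" fence logic; toy_* names are
 * hypothetical, not OMPI API. */
#include <stdio.h>

#define QLEN 8

static int toy_queue[QLEN];      /* pending request ids */
static int toy_head, toy_tail;   /* simple ring queue */
static int toy_resources;        /* free "descriptors" */

static int toy_dequeue(int *req) {
    if (toy_head == toy_tail) return -1;        /* queue empty */
    *req = toy_queue[toy_head++ % QLEN];
    return 0;
}

static void toy_enqueue(int req) { toy_queue[toy_tail++ % QLEN] = req; }

/* Fails when no resources are available, like a BTL out of descriptors. */
static int toy_send(int req) {
    if (0 == toy_resources) return -1;
    --toy_resources;
    printf("started req %d\n", req);
    return 0;
}

/* Models opal_progress(): completions return resources to the pool. */
static void toy_progress(void) { ++toy_resources; }

int main(void)
{
    int i, len, req, started_send = 0;

    toy_resources = 0;                     /* start fully exhausted */
    for (i = 0 ; i < 4 ; ++i) toy_enqueue(i);

    /* phase one: try every pending request exactly once */
    len = toy_tail - toy_head;
    for (i = 0 ; i < len ; ++i) {
        if (0 != toy_dequeue(&req)) break;
        if (0 != toy_send(req)) toy_enqueue(req);   /* requeue on failure */
        else started_send = 1;
    }

    /* phase two: at least one send must start, or nothing ever completes */
    while (0 == started_send && toy_head != toy_tail) {
        toy_progress();
        len = toy_tail - toy_head;
        for (i = 0 ; i < len ; ++i) {
            if (0 != toy_dequeue(&req)) break;
            if (0 != toy_send(req)) toy_enqueue(req);
            else started_send = 1;
        }
    }
    return 0;
}

Without the second phase, a fully resource-exhausted first pass would leave every request queued with no completion callback pending to restart them; that is the stall this change removes.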
@@ -167,7 +194,7 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
         module->m_num_pending_in += incoming_reqs;
         module->m_num_pending_out += num_outgoing;
 
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "fence: waiting on %d in and %d out, now %d, %d",
                              incoming_reqs,
                              num_outgoing,
@@ -484,7 +511,7 @@ ompi_osc_rdma_module_lock(int lock_type,
     ompi_win_remove_mode(win, OMPI_WIN_FENCE);
     ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
 
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "%d sending lock request to %d",
                          ompi_comm_rank(module->m_comm), target));
     /* generate a lock request */
@@ -532,7 +559,7 @@ ompi_osc_rdma_module_unlock(int target,
     OPAL_THREAD_UNLOCK(&module->m_lock);
 
     /* send the unlock request */
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "%d sending unlock request to %d with %d requests",
                          ompi_comm_rank(module->m_comm), target,
                          out_count));
@@ -600,7 +627,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
             ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
             send_ack = true;
         } else {
-            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+            OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                                  "%d queuing lock request from %d (%d)",
                                  ompi_comm_rank(module->m_comm), origin, lock_type));
@@ -616,7 +643,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
             ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
             send_ack = true;
         } else {
-            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+            OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                                  "queuing lock request from %d (%d) lock_type:%d",
                                  ompi_comm_rank(module->m_comm), origin, lock_type));
@@ -631,7 +658,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
     OPAL_THREAD_UNLOCK(&(module->m_lock));
 
     if (send_ack) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "%d sending lock ack to %d",
                              ompi_comm_rank(module->m_comm), origin));
         ompi_osc_rdma_control_send(module, proc,
@@ -654,7 +681,7 @@ ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
 
     assert(module->m_lock_status != 0);
 
-    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+    OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                          "received unlock request from %d with %d requests\n",
                          origin, count));
@@ -706,7 +733,7 @@ ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module)
     /* issue whichever unlock acks we should issue */
     while (NULL != (new_pending = (ompi_osc_rdma_pending_lock_t*)
                     opal_list_remove_first(&copy_unlock_acks))) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "sending unlock reply to proc"));
         ompi_osc_rdma_control_send(module,
                                    new_pending->proc,
@@ -735,7 +762,7 @@ ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module)
     OPAL_THREAD_UNLOCK(&(module->m_lock));
 
     if (NULL != new_pending) {
-        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
+        OPAL_OUTPUT_VERBOSE((40, ompi_osc_base_output,
                              "sending lock request to proc"));
         ompi_osc_rdma_control_send(module,
                                    new_pending->proc,
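The remaining hunks only lower a set of debug messages from verbosity level 50 to 40 so they appear at less chatty settings: OPAL_OUTPUT_VERBOSE emits a message only when the stream's configured verbosity is at least the message's level, and it compiles away entirely in non-debug builds. The double parentheses in each call are the classic pre-C99 trick for forwarding a whole argument list through a single macro parameter. A minimal sketch of the idiom follows; toy_output_verbose() with its fixed signature is a hypothetical stand-in, not OPAL's implementation.

/* Sketch of the OPAL_OUTPUT_VERBOSE((level, stream, ...)) idiom. */
#include <stdio.h>

static int osc_verbose = 40;   /* e.g. configured via an MCA parameter */

/* The inner parentheses of the call site become the argument list. */
#define TOY_OUTPUT_VERBOSE(args) toy_output_verbose args

static void toy_output_verbose(int level, const char *fmt, int value)
{
    /* print only if the configured verbosity reaches the message level */
    if (osc_verbose >= level) {
        printf("[osc] ");
        printf(fmt, value);
        printf("\n");
    }
}

int main(void)
{
    /* visible at verbosity 40 after the change above ... */
    TOY_OUTPUT_VERBOSE((40, "fence: trying to start %d reqs", 3));
    /* ... while a level-50 message stays silent at this setting */
    TOY_OUTPUT_VERBOSE((50, "fence: waiting on %d requests", 3));
    return 0;
}

With osc_verbose set to 40, the first call prints and the second does not, which is why moving these messages from 50 to 40 makes them visible during ordinary debugging runs.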