Create a new queue (to simplify locking) for requests that are started but
can not be started by the BTL. This commit was SVN r14757.
Этот коммит содержится в:
родитель
bd5be6ed79
Коммит
5ec421e1b0
@ -41,12 +41,6 @@ ompi_osc_rdma_module_free(ompi_win_t *win)
|
||||
"rdma component destroying window with id %d",
|
||||
module->m_comm->c_contextid);
|
||||
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
while (OMPI_WIN_EXPOSE_EPOCH & ompi_win_get_mode(win)) {
|
||||
opal_condition_wait(&module->m_cond, &module->m_lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->m_lock);
|
||||
|
||||
/* finish with a barrier */
|
||||
if (ompi_group_size(win->w_group) > 1) {
|
||||
ret = module->m_comm->c_coll.coll_barrier(module->m_comm);
|
||||
@ -76,6 +70,7 @@ ompi_osc_rdma_module_free(ompi_win_t *win)
|
||||
|
||||
OBJ_DESTRUCT(&module->m_unlocks_pending);
|
||||
OBJ_DESTRUCT(&module->m_locks_pending);
|
||||
OBJ_DESTRUCT(&module->m_queued_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_copy_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_acc_lock);
|
||||
|
@ -130,6 +130,8 @@ struct ompi_osc_rdma_module_t {
|
||||
opal_list_t m_copy_pending_sendreqs;
|
||||
unsigned int *m_copy_num_pending_sendreqs;
|
||||
|
||||
opal_list_t m_queued_sendreqs;
|
||||
|
||||
/** start sending data eagerly */
|
||||
bool m_eager_send;
|
||||
|
||||
|
@ -282,6 +282,7 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
|
||||
OBJ_CONSTRUCT(&module->m_acc_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->m_pending_sendreqs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->m_copy_pending_sendreqs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->m_queued_sendreqs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->m_locks_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->m_unlocks_pending, opal_list_t);
|
||||
|
||||
@ -405,6 +406,7 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&module->m_unlocks_pending);
|
||||
OBJ_DESTRUCT(&module->m_locks_pending);
|
||||
OBJ_DESTRUCT(&module->m_queued_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_copy_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_pending_sendreqs);
|
||||
OBJ_DESTRUCT(&module->m_acc_lock);
|
||||
|
@ -43,7 +43,7 @@ create_send_tag(ompi_osc_rdma_module_t *module)
|
||||
newval = (oldval + 1) % mca_pml.pml_max_tag;
|
||||
} while (0 == opal_atomic_cmpset_32(&module->m_tag_counter, oldval, newval));
|
||||
return newval;
|
||||
#elif OMPI_HAVE_THREAD_SUPPORT
|
||||
#else
|
||||
int32_t ret;
|
||||
/* no compare and swap - have to lock the module */
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
@ -51,9 +51,6 @@ create_send_tag(ompi_osc_rdma_module_t *module)
|
||||
ret = module->m_tag_counter;
|
||||
OPAL_THREAD_UNLOCK(&module->m_lock);
|
||||
return ret;
|
||||
#else
|
||||
module->m_tag_counter = (module->m_tag_counter + 1) % mca_pml.pml_max_tag;
|
||||
return module->m_tag_counter;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -117,7 +114,6 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
|
||||
(ompi_osc_rdma_sendreq_t*) descriptor->des_cbdata;
|
||||
ompi_osc_rdma_send_header_t *header =
|
||||
(ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
|
||||
opal_list_item_t *item;
|
||||
ompi_osc_rdma_module_t *module = sendreq->req_module;
|
||||
int32_t count;
|
||||
|
||||
@ -179,25 +175,21 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
|
||||
/* release the descriptor and sendreq */
|
||||
btl->btl_free(btl, descriptor);
|
||||
|
||||
/* any other sendreqs to restart? */
|
||||
while (NULL !=
|
||||
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
|
||||
ompi_osc_rdma_sendreq_t *req =
|
||||
(ompi_osc_rdma_sendreq_t*) item;
|
||||
while (opal_list_get_size(&module->m_queued_sendreqs)) {
|
||||
opal_list_item_t *item;
|
||||
int ret;
|
||||
|
||||
ret = ompi_osc_rdma_sendreq_send(module, req);
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
item = opal_list_remove_first(&module->m_queued_sendreqs);
|
||||
OPAL_THREAD_UNLOCK(&module->m_lock);
|
||||
if (NULL == item) break;
|
||||
|
||||
ret = ompi_osc_rdma_sendreq_send(module, (ompi_osc_rdma_sendreq_t*) item);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output_verbose(5, ompi_osc_base_output,
|
||||
"fence: failure in starting sendreq (%d). Will try later.",
|
||||
ret);
|
||||
opal_list_append(&(module->m_copy_pending_sendreqs), item);
|
||||
|
||||
if (OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret ||
|
||||
OMPI_ERR_OUT_OF_RESOURCE == ret) {
|
||||
break;
|
||||
}
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
opal_list_append(&(module->m_queued_sendreqs), item);
|
||||
OPAL_THREAD_UNLOCK(&module->m_lock);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,22 +122,21 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
|
||||
(ompi_osc_rdma_sendreq_t*) item;
|
||||
|
||||
ret = ompi_osc_rdma_sendreq_send(module, req);
|
||||
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output_verbose(5, ompi_osc_base_output,
|
||||
"fence: failure in starting sendreq (%d). "
|
||||
"Will try later.",
|
||||
ret);
|
||||
opal_list_append(&(module->m_copy_pending_sendreqs), item);
|
||||
|
||||
if (OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret ||
|
||||
OMPI_ERR_OUT_OF_RESOURCE == ret) {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
/* if some requests couldn't be started, push into the
|
||||
"queued" list, where we will try to restart them later. */
|
||||
if (opal_list_get_size(&module->m_copy_pending_sendreqs)) {
|
||||
opal_list_join(&module->m_queued_sendreqs,
|
||||
opal_list_get_end(&module->m_queued_sendreqs),
|
||||
&module->m_copy_pending_sendreqs);
|
||||
}
|
||||
|
||||
/* possible we've already received a couple in messages, so
|
||||
atomicall add however many we're going to wait for */
|
||||
module->m_num_pending_in += incoming_reqs;
|
||||
@ -277,17 +276,22 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
|
||||
(ompi_osc_rdma_sendreq_t*) item;
|
||||
|
||||
ret = ompi_osc_rdma_sendreq_send(module, req);
|
||||
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output_verbose(5, ompi_osc_base_output,
|
||||
"complete: failure in starting sendreq (%d). Will try later.",
|
||||
ret);
|
||||
opal_list_append(&(module->m_copy_pending_sendreqs), item);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* wait for all the requests */
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
/* if some requests couldn't be started, push into the
|
||||
"queued" list, where we will try to restart them later. */
|
||||
if (opal_list_get_size(&module->m_copy_pending_sendreqs)) {
|
||||
opal_list_join(&module->m_queued_sendreqs,
|
||||
opal_list_get_end(&module->m_queued_sendreqs),
|
||||
&module->m_copy_pending_sendreqs);
|
||||
}
|
||||
|
||||
/* wait for all the requests */
|
||||
while (0 != module->m_num_pending_out) {
|
||||
opal_condition_wait(&module->m_cond, &module->m_lock);
|
||||
}
|
||||
@ -480,24 +484,31 @@ ompi_osc_rdma_module_unlock(int target,
|
||||
module->m_comm->c_my_rank,
|
||||
out_count);
|
||||
|
||||
/* try to start all the requests. We've copied everything we
|
||||
need out of pending_sendreqs, so don't need the lock
|
||||
here */
|
||||
while (NULL !=
|
||||
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
|
||||
ompi_osc_rdma_sendreq_t *req =
|
||||
(ompi_osc_rdma_sendreq_t*) item;
|
||||
|
||||
ret = ompi_osc_rdma_sendreq_send(module, req);
|
||||
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output_verbose(5, ompi_osc_base_output,
|
||||
"unlock: failure in starting sendreq (%d). Will try later.",
|
||||
ret);
|
||||
opal_list_append(&(module->m_copy_pending_sendreqs), item);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* wait for all the requests */
|
||||
OPAL_THREAD_LOCK(&module->m_lock);
|
||||
/* if some requests couldn't be started, push into the
|
||||
"queued" list, where we will try to restart them later. */
|
||||
if (opal_list_get_size(&module->m_copy_pending_sendreqs)) {
|
||||
opal_list_join(&module->m_queued_sendreqs,
|
||||
opal_list_get_end(&module->m_queued_sendreqs),
|
||||
&module->m_copy_pending_sendreqs);
|
||||
}
|
||||
|
||||
/* wait for all the requests */
|
||||
while (0 != module->m_num_pending_out) {
|
||||
opal_condition_wait(&module->m_cond, &module->m_lock);
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user