resolve a couple of locking/progress issues
This commit was SVN r5472.
Этот коммит содержится в:
родитель
7713012d92
Коммит
cf7a1c7631
@ -193,7 +193,6 @@ static orte_iof_base_endpoint_t* orte_iof_base_endpoint_lookup(
|
||||
int tag)
|
||||
{
|
||||
ompi_list_item_t* item;
|
||||
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
|
||||
for(item = ompi_list_get_first(&orte_iof_base.iof_endpoints);
|
||||
item != ompi_list_get_end(&orte_iof_base.iof_endpoints);
|
||||
item = ompi_list_get_next(item)) {
|
||||
@ -205,7 +204,6 @@ static orte_iof_base_endpoint_t* orte_iof_base_endpoint_lookup(
|
||||
return endpoint;
|
||||
}
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -412,12 +410,14 @@ int orte_iof_base_endpoint_forward(
|
||||
if(ompi_list_get_size(&endpoint->ep_frags) == 1) {
|
||||
ompi_event_add(&endpoint->ep_event,0);
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
|
||||
} else {
|
||||
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
|
||||
|
||||
/* acknowledge fragment */
|
||||
orte_iof_base_endpoint_ack(endpoint, frag->frag_hdr.hdr_msg.msg_seq + frag->frag_hdr.hdr_msg.msg_len);
|
||||
orte_iof_base_frag_ack(frag);
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -49,19 +49,23 @@ int orte_iof_base_flush(void)
|
||||
|
||||
/* flush any pending output */
|
||||
fflush(NULL);
|
||||
|
||||
|
||||
/* force all file descriptors to be progressed at least once,
|
||||
* wait on a timer callback to be called out of the event loop
|
||||
*/
|
||||
|
||||
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
|
||||
if(ompi_event_progress_thread() == false) {
|
||||
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
|
||||
ompi_evtimer_set(&ev, orte_iof_base_timer_cb, &flushed);
|
||||
ompi_event_add(&ev, &tv);
|
||||
while(flushed == 0)
|
||||
ompi_condition_wait(&orte_iof_base.iof_condition, &orte_iof_base.iof_lock);
|
||||
} else {
|
||||
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
|
||||
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
|
||||
}
|
||||
orte_iof_base.iof_waiting++;
|
||||
|
||||
ompi_evtimer_set(&ev, orte_iof_base_timer_cb, &flushed);
|
||||
ompi_event_add(&ev, &tv);
|
||||
while(flushed == 0)
|
||||
ompi_condition_wait(&orte_iof_base.iof_condition, &orte_iof_base.iof_lock);
|
||||
|
||||
/* wait for all of the endpoints to reach an idle state */
|
||||
pending = ompi_list_get_size(&orte_iof_base.iof_endpoints);
|
||||
while(pending > 0) {
|
||||
@ -75,7 +79,13 @@ int orte_iof_base_flush(void)
|
||||
}
|
||||
}
|
||||
if(pending != 0) {
|
||||
ompi_condition_wait(&orte_iof_base.iof_condition, &orte_iof_base.iof_lock);
|
||||
if(ompi_event_progress_thread() == false) {
|
||||
ompi_condition_wait(&orte_iof_base.iof_condition, &orte_iof_base.iof_lock);
|
||||
} else {
|
||||
OMPI_THREAD_UNLOCK(&orte_iof_base.iof_lock);
|
||||
ompi_event_loop(OMPI_EVLOOP_ONCE);
|
||||
OMPI_THREAD_LOCK(&orte_iof_base.iof_lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
orte_iof_base.iof_waiting--;
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user