0425a7a7d8
This fix ensures that all progress functions return the number of
completed events.
Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
(cherry picked from commit 72501f8f9c
)
273 строки
8.4 KiB
C
273 строки
8.4 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2013-2016 Los Alamos National Security, LLC. All rights
|
|
* reseved.
|
|
* Copyright (c) 2015 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* Copyright (c) 2004-2016 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2016 IBM Corporation. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "comm_request.h"
|
|
|
|
#include "opal/class/opal_free_list.h"
|
|
#include "opal/include/opal/sys/atomic.h"
|
|
|
|
static opal_free_list_t ompi_comm_requests;
|
|
static opal_list_t ompi_comm_requests_active;
|
|
static opal_mutex_t ompi_comm_request_mutex;
|
|
bool ompi_comm_request_progress_active = false;
|
|
bool ompi_comm_request_initialized = false;
|
|
|
|
typedef struct ompi_comm_request_item_t {
|
|
opal_list_item_t super;
|
|
ompi_comm_request_callback_fn_t callback;
|
|
ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
|
|
int subreq_count;
|
|
} ompi_comm_request_item_t;
|
|
OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
|
|
|
|
static int ompi_comm_request_progress (void);
|
|
|
|
void ompi_comm_request_init (void)
|
|
{
|
|
OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t);
|
|
(void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t), 8,
|
|
OBJ_CLASS(ompi_comm_request_t), 0, 0, 0, -1, 8,
|
|
NULL, 0, NULL, NULL, NULL);
|
|
|
|
OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
|
|
ompi_comm_request_progress_active = false;
|
|
OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
|
|
ompi_comm_request_initialized = true;
|
|
}
|
|
|
|
void ompi_comm_request_fini (void)
|
|
{
|
|
if (!ompi_comm_request_initialized) {
|
|
return;
|
|
}
|
|
|
|
ompi_comm_request_initialized = false;
|
|
|
|
opal_mutex_lock (&ompi_comm_request_mutex);
|
|
if (ompi_comm_request_progress_active) {
|
|
opal_progress_unregister (ompi_comm_request_progress);
|
|
}
|
|
opal_mutex_unlock (&ompi_comm_request_mutex);
|
|
OBJ_DESTRUCT(&ompi_comm_request_mutex);
|
|
OBJ_DESTRUCT(&ompi_comm_requests_active);
|
|
OBJ_DESTRUCT(&ompi_comm_requests);
|
|
}
|
|
|
|
|
|
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
|
|
ompi_request_t *subreqs[], int subreq_count)
|
|
{
|
|
ompi_comm_request_item_t *request_item;
|
|
int i;
|
|
|
|
if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
|
|
return OMPI_ERR_BAD_PARAM;
|
|
}
|
|
|
|
request_item = OBJ_NEW(ompi_comm_request_item_t);
|
|
if (NULL == request_item) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
request_item->callback = callback;
|
|
|
|
for (i = 0 ; i < subreq_count ; ++i) {
|
|
request_item->subreqs[i] = subreqs[i];
|
|
}
|
|
|
|
request_item->subreq_count = subreq_count;
|
|
|
|
opal_list_append (&request->schedule, &request_item->super);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
static int ompi_comm_request_progress (void)
|
|
{
|
|
ompi_comm_request_t *request, *next;
|
|
static int32_t progressing = 0;
|
|
int completed = 0;
|
|
|
|
/* don't allow re-entry */
|
|
if (opal_atomic_swap_32 (&progressing, 1)) {
|
|
return 0;
|
|
}
|
|
|
|
opal_mutex_lock (&ompi_comm_request_mutex);
|
|
|
|
OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) {
|
|
int rc = OMPI_SUCCESS;
|
|
|
|
if (opal_list_get_size (&request->schedule)) {
|
|
ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule);
|
|
int item_complete = true;
|
|
|
|
/* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
|
|
while (request_item->subreq_count) {
|
|
ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
|
|
if( REQUEST_COMPLETE(subreq) ) {
|
|
ompi_request_free (&subreq);
|
|
request_item->subreq_count--;
|
|
completed++;
|
|
} else {
|
|
item_complete = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (item_complete) {
|
|
if (request_item->callback) {
|
|
opal_mutex_unlock (&ompi_comm_request_mutex);
|
|
rc = request_item->callback (request);
|
|
opal_mutex_lock (&ompi_comm_request_mutex);
|
|
}
|
|
OBJ_RELEASE(request_item);
|
|
} else {
|
|
opal_list_prepend (&request->schedule, &request_item->super);
|
|
}
|
|
}
|
|
|
|
/* if the request schedule is empty then the request is complete */
|
|
if (0 == opal_list_get_size (&request->schedule)) {
|
|
opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
|
|
request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : MPI_ERR_INTERN;
|
|
ompi_request_complete (&request->super, true);
|
|
}
|
|
}
|
|
|
|
if (0 == opal_list_get_size (&ompi_comm_requests_active)) {
|
|
/* no more active requests. disable this progress function */
|
|
ompi_comm_request_progress_active = false;
|
|
opal_progress_unregister (ompi_comm_request_progress);
|
|
}
|
|
|
|
opal_mutex_unlock (&ompi_comm_request_mutex);
|
|
progressing = 0;
|
|
|
|
return completed;
|
|
}
|
|
|
|
void ompi_comm_request_start (ompi_comm_request_t *request)
|
|
{
|
|
opal_mutex_lock (&ompi_comm_request_mutex);
|
|
opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
|
|
|
|
/* check if we need to start the communicator request progress function */
|
|
if (!ompi_comm_request_progress_active) {
|
|
opal_progress_register (ompi_comm_request_progress);
|
|
ompi_comm_request_progress_active = true;
|
|
}
|
|
|
|
request->super.req_state = OMPI_REQUEST_ACTIVE;
|
|
|
|
opal_mutex_unlock (&ompi_comm_request_mutex);
|
|
}
|
|
|
|
static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete)
|
|
{
|
|
ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req;
|
|
ompi_comm_request_item_t *item, *next;
|
|
|
|
opal_mutex_lock (&ompi_comm_request_mutex);
|
|
|
|
OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) {
|
|
for (int i = 0 ; i < item->subreq_count ; ++i) {
|
|
ompi_request_cancel (item->subreqs[i]);
|
|
}
|
|
|
|
opal_list_remove_item (&request->schedule, &item->super);
|
|
OBJ_RELEASE(item);
|
|
}
|
|
|
|
/* remove the request for the list of active requests */
|
|
OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) {
|
|
if (tmp == request) {
|
|
opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
|
|
break;
|
|
}
|
|
}
|
|
|
|
opal_mutex_unlock (&ompi_comm_request_mutex);
|
|
|
|
return MPI_ERR_REQUEST;
|
|
}
|
|
|
|
static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
|
|
{
|
|
ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
|
|
|
|
if( !REQUEST_COMPLETE(*ompi_req) ) {
|
|
return MPI_ERR_REQUEST;
|
|
}
|
|
|
|
OMPI_REQUEST_FINI(*ompi_req);
|
|
ompi_comm_request_return (request);
|
|
|
|
*ompi_req = MPI_REQUEST_NULL;
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
static void ompi_comm_request_construct (ompi_comm_request_t *request)
|
|
{
|
|
request->context = NULL;
|
|
|
|
request->super.req_type = OMPI_REQUEST_COMM;
|
|
request->super.req_status._cancelled = 0;
|
|
request->super.req_free = ompi_comm_request_free;
|
|
request->super.req_cancel = ompi_comm_request_cancel;
|
|
|
|
OBJ_CONSTRUCT(&request->schedule, opal_list_t);
|
|
}
|
|
|
|
static void ompi_comm_request_destruct (ompi_comm_request_t *request)
|
|
{
|
|
OBJ_DESTRUCT(&request->schedule);
|
|
}
|
|
|
|
OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t,
|
|
ompi_comm_request_construct,
|
|
ompi_comm_request_destruct);
|
|
|
|
OBJ_CLASS_INSTANCE(ompi_comm_request_item_t, opal_list_item_t, NULL, NULL);
|
|
|
|
ompi_comm_request_t *ompi_comm_request_get (void)
|
|
{
|
|
opal_free_list_item_t *item;
|
|
|
|
item = opal_free_list_get (&ompi_comm_requests);
|
|
if (OPAL_UNLIKELY(NULL == item)) {
|
|
return NULL;
|
|
}
|
|
|
|
OMPI_REQUEST_INIT((ompi_request_t *) item, false);
|
|
|
|
return (ompi_comm_request_t *) item;
|
|
}
|
|
|
|
void ompi_comm_request_return (ompi_comm_request_t *request)
|
|
{
|
|
if (request->context) {
|
|
OBJ_RELEASE (request->context);
|
|
request->context = NULL;
|
|
}
|
|
|
|
OMPI_REQUEST_FINI(&request->super);
|
|
opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request);
|
|
}
|
|
|