1
1
openmpi/ompi/communicator/comm_request.c

269 строки
8.3 KiB
C
Исходник Обычный вид История

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2013-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2004-2016 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "comm_request.h"
#include "opal/class/opal_free_list.h"
#include "opal/include/opal/sys/atomic.h"
/* Free list that ompi_comm_request_get draws requests from and that
 * ompi_comm_request_return hands them back to. */
static opal_free_list_t ompi_comm_requests;
/* Requests that have been started; walked by ompi_comm_request_progress. */
static opal_list_t ompi_comm_requests_active;
/* Held in this file while the active list and the progress-active flag are
 * read or modified. */
static opal_mutex_t ompi_comm_request_mutex;
/* Set when ompi_comm_request_progress is registered with opal_progress and
 * cleared when the progress function unregisters itself. */
bool ompi_comm_request_progress_active = false;
/* Set by ompi_comm_request_init, cleared by ompi_comm_request_fini. */
bool ompi_comm_request_initialized = false;
/* One entry in a communicator request's schedule: a group of subrequests plus
 * an optional callback that runs once all of them have completed. */
typedef struct ompi_comm_request_item_t {
opal_list_item_t super;
/* invoked with the owning request after every subrequest completed; may be NULL */
ompi_comm_request_callback_fn_t callback;
/* subrequests of this entry; only the first subreq_count slots are in use */
ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
/* number of subrequests not yet freed; counted down by the progress function */
int subreq_count;
} ompi_comm_request_item_t;
OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
/* Forward declaration: ompi_comm_request_fini refers to the progress callback
 * before its definition. */
static int ompi_comm_request_progress (void);
/*
 * Prepare the module-level state used by communicator requests: the mutex,
 * the (initially empty) list of active requests and the free list that
 * ompi_comm_request_t objects are allocated from. The progress function is
 * not registered here; the progress-active flag starts out false.
 *
 * The result of opal_free_list_init is deliberately discarded because this
 * function has no way to report it.
 */
void ompi_comm_request_init (void)
{
    /* lock and bookkeeping for started requests */
    OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
    OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
    ompi_comm_request_progress_active = false;

    /* pool of request objects */
    OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t);
    (void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t), 8,
                                OBJ_CLASS(ompi_comm_request_t), 0, 0, 0, -1, 8,
                                NULL, 0, NULL, NULL, NULL);

    ompi_comm_request_initialized = true;
}
/*
 * Tear down the module-level state created by ompi_comm_request_init.
 *
 * Does nothing if the module is not initialized. Otherwise the progress
 * callback is unregistered if it is still registered, and the mutex, the
 * active list and the free list are destructed.
 */
void ompi_comm_request_fini (void)
{
    if (!ompi_comm_request_initialized) {
        return;
    }

    ompi_comm_request_initialized = false;

    opal_mutex_lock (&ompi_comm_request_mutex);
    if (ompi_comm_request_progress_active) {
        opal_progress_unregister (ompi_comm_request_progress);
        /* keep the flag in sync with the registration state, exactly as the
         * progress function does when it unregisters itself */
        ompi_comm_request_progress_active = false;
    }
    opal_mutex_unlock (&ompi_comm_request_mutex);

    OBJ_DESTRUCT(&ompi_comm_request_mutex);
    OBJ_DESTRUCT(&ompi_comm_requests_active);
    OBJ_DESTRUCT(&ompi_comm_requests);
}
/*
 * Append one entry to the end of a request's schedule.
 *
 * request       request whose schedule is extended
 * callback      function stored in the entry; may be NULL
 * subreqs       array of subrequests copied into the entry; may be NULL only
 *               when subreq_count is 0
 * subreq_count  number of entries in subreqs, 0..OMPI_COMM_REQUEST_MAX_SUBREQ
 *
 * Returns OMPI_SUCCESS on success, OMPI_ERR_BAD_PARAM if subreq_count is out
 * of range or subreqs is NULL while subreq_count is positive, and
 * OMPI_ERR_OUT_OF_RESOURCE if the entry cannot be allocated.
 */
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
                                       ompi_request_t *subreqs[], int subreq_count)
{
    ompi_comm_request_item_t *request_item;

    /* A negative count must be rejected as well: the progress function loops
     * while subreq_count is non-zero and indexes subreqs[subreq_count - 1],
     * so a negative value would read outside the array. */
    if (subreq_count < 0 || subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
        return OMPI_ERR_BAD_PARAM;
    }

    if (subreq_count > 0 && NULL == subreqs) {
        return OMPI_ERR_BAD_PARAM;
    }

    request_item = OBJ_NEW(ompi_comm_request_item_t);
    if (NULL == request_item) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    request_item->callback = callback;

    for (int i = 0 ; i < subreq_count ; ++i) {
        request_item->subreqs[i] = subreqs[i];
    }

    request_item->subreq_count = subreq_count;

    opal_list_append (&request->schedule, &request_item->super);

    return OMPI_SUCCESS;
}
/*
 * Progress callback that advances every started communicator request.
 *
 * For each request on the active list the first entry of its schedule is
 * examined. Completed subrequests are freed starting from the end of the
 * entry's array; once none remain, the entry's callback (if any) is invoked
 * with the module mutex released, and the entry is released. A request whose
 * schedule is empty is removed from the active list and marked complete; its
 * status carries MPI_ERR_INTERN if the callback invoked during this pass did
 * not return OMPI_SUCCESS. When the active list becomes empty the function
 * unregisters itself from opal_progress.
 *
 * Returns the number of events progressed by this call (subrequests freed
 * plus schedule entries retired), or 0 when nothing advanced or when another
 * invocation is already in progress.
 */
static int ompi_comm_request_progress (void)
{
    ompi_comm_request_t *request, *next;
    static int32_t progressing = 0;
    int completed = 0;

    /* don't allow re-entry */
    if (opal_atomic_swap_32 (&progressing, 1)) {
        return 0;
    }

    opal_mutex_lock (&ompi_comm_request_mutex);

    OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) {
        int rc = OMPI_SUCCESS;

        if (opal_list_get_size (&request->schedule)) {
            ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule);
            int item_complete = true;

            /* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
            while (request_item->subreq_count) {
                ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
                if( REQUEST_COMPLETE(subreq) ) {
                    ompi_request_free (&subreq);
                    request_item->subreq_count--;
                    completed++;
                } else {
                    item_complete = false;
                    break;
                }
            }

            if (item_complete) {
                if (request_item->callback) {
                    /* the callback runs without the module mutex held */
                    opal_mutex_unlock (&ompi_comm_request_mutex);
                    rc = request_item->callback (request);
                    opal_mutex_lock (&ompi_comm_request_mutex);
                }
                OBJ_RELEASE(request_item);
                completed++;
            } else {
                /* not finished yet: put the entry back at the head so it is
                 * looked at again on the next pass */
                opal_list_prepend (&request->schedule, &request_item->super);
            }
        }

        /* if the request schedule is empty then the request is complete */
        if (0 == opal_list_get_size (&request->schedule)) {
            opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
            request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : MPI_ERR_INTERN;
            ompi_request_complete (&request->super, true);
        }
    }

    if (0 == opal_list_get_size (&ompi_comm_requests_active)) {
        /* no more active requests. disable this progress function */
        ompi_comm_request_progress_active = false;
        opal_progress_unregister (ompi_comm_request_progress);
    }

    opal_mutex_unlock (&ompi_comm_request_mutex);
    progressing = 0;

    /* report what actually advanced instead of unconditionally claiming
     * progress, so the caller can tell an idle pass from a productive one */
    return completed;
}
/*
 * Start a communicator request: put it on the active list, make sure the
 * progress callback is registered, and mark the request active. All three
 * steps happen while the module mutex is held.
 */
void ompi_comm_request_start (ompi_comm_request_t *request)
{
    opal_list_item_t *entry = (opal_list_item_t *) request;

    opal_mutex_lock (&ompi_comm_request_mutex);

    opal_list_append (&ompi_comm_requests_active, entry);

    /* the progress function is only registered while there is work for it */
    if (false == ompi_comm_request_progress_active) {
        opal_progress_register (ompi_comm_request_progress);
        ompi_comm_request_progress_active = true;
    }

    request->super.req_state = OMPI_REQUEST_ACTIVE;

    opal_mutex_unlock (&ompi_comm_request_mutex);
}
static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete)
{
ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req;
ompi_comm_request_item_t *item, *next;
opal_mutex_lock (&ompi_comm_request_mutex);
OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) {
for (int i = 0 ; i < item->subreq_count ; ++i) {
ompi_request_cancel (item->subreqs[i]);
}
opal_list_remove_item (&request->schedule, &item->super);
OBJ_RELEASE(item);
}
/* remove the request for the list of active requests */
OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) {
if (tmp == request) {
opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
break;
}
}
opal_mutex_unlock (&ompi_comm_request_mutex);
return MPI_ERR_REQUEST;
}
static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
{
ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
if( !REQUEST_COMPLETE(*ompi_req) ) {
return MPI_ERR_REQUEST;
}
OMPI_REQUEST_FINI(*ompi_req);
ompi_comm_request_return (request);
*ompi_req = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
/*
 * Class constructor for ompi_comm_request_t: builds the empty schedule list,
 * clears the context pointer and fills in the request type, the cancelled
 * flag and the free/cancel hooks of the embedded ompi_request_t.
 */
static void ompi_comm_request_construct (ompi_comm_request_t *request)
{
    ompi_request_t *base = &request->super;

    OBJ_CONSTRUCT(&request->schedule, opal_list_t);
    request->context = NULL;

    base->req_type = OMPI_REQUEST_COMM;
    base->req_status._cancelled = 0;
    base->req_cancel = ompi_comm_request_cancel;
    base->req_free = ompi_comm_request_free;
}
/* Class destructor for ompi_comm_request_t: destructs the schedule list that
 * ompi_comm_request_construct constructed. */
static void ompi_comm_request_destruct (ompi_comm_request_t *request)
{
OBJ_DESTRUCT(&request->schedule);
}
/* ompi_comm_request_t derives from ompi_request_t and uses
 * ompi_comm_request_construct / ompi_comm_request_destruct. */
OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t,
ompi_comm_request_construct,
ompi_comm_request_destruct);
/* Schedule entries derive from opal_list_item_t and have no constructor or
 * destructor of their own. */
OBJ_CLASS_INSTANCE(ompi_comm_request_item_t, opal_list_item_t, NULL, NULL);
ompi_comm_request_t *ompi_comm_request_get (void)
{
opal_free_list_item_t *item;
item = opal_free_list_get (&ompi_comm_requests);
if (OPAL_UNLIKELY(NULL == item)) {
return NULL;
}
OMPI_REQUEST_INIT((ompi_request_t *) item, false);
return (ompi_comm_request_t *) item;
}
/*
 * Give a communicator request back to the module's free list.
 *
 * Drops the reference held on request->context (if any), finalizes the
 * embedded ompi_request_t and returns the object to the free list.
 */
void ompi_comm_request_return (ompi_comm_request_t *request)
{
    if (request->context) {
        OBJ_RELEASE (request->context);
        /* The object goes back to the free list and ompi_comm_request_get
         * does not reset context, so clear it here explicitly. Otherwise a
         * recycled request could still carry this pointer and a later return
         * would release it a second time. (OBJ_RELEASE is not relied upon to
         * clear the pointer; see opal_object.h.) */
        request->context = NULL;
    }

    OMPI_REQUEST_FINI(&request->super);
    opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request);
}