Merge pull request #1983 from hjelmn/request_cb
ompi/request: change semantics of ompi request callbacks
Этот коммит содержится в:
Коммит
e5c7512692
@ -265,9 +265,6 @@ struct ompi_osc_pt2pt_module_t {
|
||||
/** Lock for garbage collection lists */
|
||||
opal_mutex_t gc_lock;
|
||||
|
||||
/** List of requests that need to be freed */
|
||||
opal_list_t request_gc;
|
||||
|
||||
/** List of buffers that need to be freed */
|
||||
opal_list_t buffer_gc;
|
||||
};
|
||||
@ -658,14 +655,13 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, con
|
||||
}
|
||||
|
||||
/**
|
||||
* osc_pt2pt_request_gc_clean:
|
||||
* osc_pt2pt_gc_clean:
|
||||
*
|
||||
* @short Release finished PML requests and accumulate buffers.
|
||||
*
|
||||
* @long This function exists because it is not possible to free a PML request
|
||||
* or buffer from a request completion callback. We instead put requests
|
||||
* and buffers on the module's garbage collection lists and release then
|
||||
* at a later time.
|
||||
* @long This function exists because it is not possible to free a buffer from
|
||||
* a request completion callback. We instead put requests and buffers on the
|
||||
* module's garbage collection lists and release then at a later time.
|
||||
*/
|
||||
static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
|
||||
{
|
||||
@ -673,26 +669,12 @@ static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->gc_lock);
|
||||
|
||||
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
|
||||
OPAL_THREAD_UNLOCK(&module->gc_lock);
|
||||
ompi_request_free (&request);
|
||||
OPAL_THREAD_LOCK(&module->gc_lock);
|
||||
}
|
||||
|
||||
while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->gc_lock);
|
||||
}
|
||||
|
||||
static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
|
||||
{
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
|
||||
opal_list_append (&module->request_gc, (opal_list_item_t *) request));
|
||||
}
|
||||
|
||||
static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
|
||||
{
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
|
||||
|
@ -45,10 +45,9 @@ static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request)
|
||||
|
||||
mark_outgoing_completion(module);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
ompi_request_free (&request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
|
||||
@ -101,10 +100,9 @@ static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
assert (NULL != module);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
ompi_request_free (&request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* self communication optimizations */
|
||||
|
@ -320,7 +320,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
|
||||
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);
|
||||
|
@ -238,10 +238,8 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
|
||||
/* free the temporary buffer */
|
||||
free (ctx);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
ompi_request_free (&request);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -437,10 +435,8 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
|
||||
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
ompi_request_free (&request);
|
||||
return 1;
|
||||
}
|
||||
|
||||
struct osc_pt2pt_get_post_send_cb_data_t {
|
||||
@ -460,10 +456,8 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
|
||||
/* mark this as a completed "incoming" request */
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
ompi_request_free (&request);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -699,9 +693,7 @@ static int accumulate_cb (ompi_request_t *request)
|
||||
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
|
||||
}
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
ompi_request_free (&request);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -771,13 +763,11 @@ static int replace_cb (ompi_request_t *request)
|
||||
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
/* unlock the accumulate lock */
|
||||
ompi_osc_pt2pt_accumulate_unlock (module);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
ompi_request_free (&request);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1435,13 +1425,11 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
/* free the datatype buffer */
|
||||
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
ompi_request_free (&request);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -37,11 +37,9 @@ static int frag_send_cb (ompi_request_t *request)
|
||||
mark_outgoing_completion(module);
|
||||
opal_free_list_return (&mca_osc_pt2pt_component.frags, &frag->super);
|
||||
|
||||
ompi_request_free (&request);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
|
||||
|
@ -79,7 +79,6 @@ int ompi_osc_pt2pt_free(ompi_win_t *win)
|
||||
OPAL_LIST_DESTRUCT(&module->pending_acc);
|
||||
|
||||
osc_pt2pt_gc_clean (module);
|
||||
OPAL_LIST_DESTRUCT(&module->request_gc);
|
||||
OPAL_LIST_DESTRUCT(&module->buffer_gc);
|
||||
OBJ_DESTRUCT(&module->gc_lock);
|
||||
|
||||
|
@ -67,8 +67,10 @@ typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag
|
||||
|
||||
/*
|
||||
* Optional function called when the request is completed from the MPI
|
||||
* library perspective. This function is not allowed to release any
|
||||
* ressources related to the request.
|
||||
* library perspective. This function is allowed to release the request if
|
||||
* the request will not be used with ompi_request_wait* or ompi_request_test.
|
||||
* If the function reposts (using start) a request or calls ompi_request_free()
|
||||
* on the request it *MUST* return 1. It should return 0 otherwise.
|
||||
*/
|
||||
typedef int (*ompi_request_complete_fn_t)(struct ompi_request_t* request);
|
||||
|
||||
@ -412,24 +414,28 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
|
||||
*/
|
||||
static inline int ompi_request_complete(ompi_request_t* request, bool with_signal)
|
||||
{
|
||||
int rc = 0;
|
||||
|
||||
if( NULL != request->req_complete_cb) {
|
||||
request->req_complete_cb( request );
|
||||
rc = request->req_complete_cb( request );
|
||||
request->req_complete_cb = NULL;
|
||||
}
|
||||
|
||||
if( OPAL_LIKELY(with_signal) ) {
|
||||
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
|
||||
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
|
||||
REQUEST_COMPLETED);
|
||||
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
|
||||
if( REQUEST_PENDING != tmp_sync )
|
||||
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
|
||||
}
|
||||
} else
|
||||
request->req_complete = REQUEST_COMPLETED;
|
||||
if (0 == rc) {
|
||||
if( OPAL_LIKELY(with_signal) ) {
|
||||
if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, REQUEST_COMPLETED)) {
|
||||
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
|
||||
REQUEST_COMPLETED);
|
||||
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
|
||||
if( REQUEST_PENDING != tmp_sync )
|
||||
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
|
||||
}
|
||||
} else
|
||||
request->req_complete = REQUEST_COMPLETED;
|
||||
|
||||
if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
|
||||
ompi_request_failed++;
|
||||
if( OPAL_UNLIKELY(MPI_SUCCESS != request->req_status.MPI_ERROR) ) {
|
||||
ompi_request_failed++;
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user