openmpi/ompi/mca/coll/ml/coll_ml_inlines.h
George Bosilca c9e5ab9ed1 Our macros for the OMPI-level free list had one extra argument, a possible return
value to signal that the operation of retrieving the element from the free list
failed. However, in this case the returned pointer was set to NULL as well, so the
error code was redundant. Moreover, this was a continuous source of warnings when
picky mode is on.

The attached patch removes the rc argument from the OMPI_FREE_LIST_GET and
OMPI_FREE_LIST_WAIT macros, and changes the callers to check whether the item is
NULL instead of using the return code.

This commit was SVN r28722.
2013-07-04 08:34:37 +00:00
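A minimal sketch of the calling-convention change described above (the exact argument order of the old macros is assumed from the description, not shown in this file):

    /* old form (assumed): status reported through the extra rc argument */
    OMPI_FREE_LIST_GET(&ml_module->coll_ml_collective_descriptors, item, rc);
    if (OMPI_SUCCESS != rc) { /* allocation failed */ }

    /* new form: rc is gone; callers test the returned item instead */
    OMPI_FREE_LIST_GET(&ml_module->coll_ml_collective_descriptors, item);
    if (NULL == item) { /* allocation failed */ }

The blocking OMPI_FREE_LIST_WAIT variant used throughout this file loses the rc argument in the same way.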

582 lines
22 KiB
C

/*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* Copyright (c) 2013 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#ifndef MCA_COLL_ML_INLINES_H
#define MCA_COLL_ML_INLINES_H
#include "ompi_config.h"
BEGIN_C_DECLS
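/* printf-style helper that writes an error message to stderr and returns
 * vfprintf's return value (number of characters written, or negative on error). */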
static inline int mca_coll_ml_err(const char* fmt, ...)
{
va_list list;
int ret;
va_start(list, fmt);
ret = vfprintf(stderr, fmt, list);
va_end(list);
return ret;
}
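/* Return floor(log2(num)) for num > 0 (the zero-based index of the highest set
 * bit); returns 0 when num is 0. */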
static inline __opal_attribute_always_inline__ int ml_fls(int num)
{
int i = 1;
int j = 0;
if (0 == num) {
return 0;
}
while (i < num) {
i *= 2;
j++;
}
if (i > num) {
j--;
}
return j;
}
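/* Return a payload buffer to its memory bank: bump the bank's release counter
 * and, once every buffer in the bank has been released, mark the bank ready for
 * memory synchronization. When it is this bank's turn (memsync_counter), start a
 * memsync (service barrier) collective for each consecutive ready bank. */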
static inline __opal_attribute_always_inline__
int mca_coll_ml_buffer_recycling(mca_coll_ml_collective_operation_progress_t *ml_request)
{
mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *)ml_request->coll_module;
ml_memory_block_desc_t *ml_memblock = ml_module->payload_block;
uint64_t bank_index = ml_request->fragment_data.buffer_desc->bank_index;
int rc;
opal_atomic_add(&ml_memblock->bank_release_counters[bank_index], 1);
/* Check if the bank is ready for recycling */
if (ml_memblock->bank_release_counters[bank_index] ==
ml_memblock->num_buffers_per_bank ) {
ml_memblock->ready_for_memsync[bank_index] = true;
ML_VERBOSE(10, ("Sync count %d, bank %d", ml_memblock->memsync_counter, bank_index));
assert(ml_memblock->bank_is_busy);
if (ml_memblock->memsync_counter == (int)bank_index) {
while(ml_memblock->ready_for_memsync[ml_memblock->memsync_counter]) {
ML_VERBOSE(10, ("Calling for service barrier: ml_buffer_index - %d %d %d == %d.\n",
ml_request->fragment_data.buffer_desc->buffer_index,
ml_memblock->memsync_counter,
ml_memblock->bank_release_counters[ml_memblock->memsync_counter],
ml_memblock->num_buffers_per_bank));
/* Setting the ready flag to 0 - unready - done */
ml_memblock->ready_for_memsync[ml_memblock->memsync_counter] = false;
rc = mca_coll_ml_memsync_intra(ml_module, ml_memblock->memsync_counter);
if (OMPI_SUCCESS != rc) {
ML_ERROR(("Failed to start memory sync !!!"));
return rc;
}
opal_atomic_add(&ml_memblock->memsync_counter, 1);
if (ml_memblock->memsync_counter == (int)ml_memblock->num_banks) {
ml_memblock->memsync_counter = 0;
}
ML_VERBOSE(10, ("After service barrier."));
}
} else {
ML_VERBOSE(10, ("Out of order %d\n", ml_memblock->memsync_counter));
}
}
return OMPI_SUCCESS;
}
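/* Per-fragment completion: run the pack/unpack callback, update the delivered
 * byte count on the full message, complete the MPI-level request when the whole
 * message has been delivered (or launch the next fragment otherwise), then
 * recycle the ML payload buffer and, for any fragment but the first, return the
 * descriptor to the free list. */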
static inline __opal_attribute_always_inline__ int coll_ml_fragment_completion_processing(
mca_coll_ml_collective_operation_progress_t *coll_op)
{
/* local variables */
int ret = OMPI_SUCCESS;
size_t bytes_in_this_frag;
struct full_message_t *full_msg_desc = coll_op->fragment_data.message_descriptor;
bool ready_to_release = true, out_of_resource = false;
ML_VERBOSE(10, ("Coll_op %p processing completion", coll_op));
/* Call unpack/pack function */
if (OPAL_LIKELY(NULL != coll_op->process_fn)) {
ret = coll_op->process_fn(coll_op);
switch(ret) {
case OMPI_SUCCESS:
ML_VERBOSE(10, ("unpack done"));
ready_to_release = true;
break;
case ORTE_ERR_NO_MATCH_YET:
ML_VERBOSE(10, ("unexpected packet"));
ready_to_release = false;
break;
default:
ML_ERROR(("Error, unexpected error code %d", ret));
return ret;
}
}
bytes_in_this_frag = coll_op->fragment_data.fragment_size;
ML_VERBOSE(10, ("Delivered %d bytes in frag %d total %d",
full_msg_desc->n_bytes_delivered,
bytes_in_this_frag,
full_msg_desc->n_bytes_total));
/* check for full message completion */
if(full_msg_desc->n_bytes_delivered + bytes_in_this_frag ==
full_msg_desc->n_bytes_total) {
/* message complete - update the number of bytes delivered and
* mark the message complete
*/
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
/* decrement the number of fragments */
full_msg_desc->n_active--;
ML_VERBOSE(10, ("Signaling completion"));
/* here we need to be sure that we point to the first fragment only */
ompi_request_complete(&(coll_op->fragment_data.message_descriptor->super), true);
coll_op->fragment_data.message_descriptor->super.req_status.MPI_ERROR = OMPI_SUCCESS;
} else {
assert(NULL != coll_op->fragment_data.buffer_desc);
/* update the number of bytes delivered */
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
/* decrement the number of fragments */
full_msg_desc->n_active--;
/* here we need to start the next fragment */
ML_VERBOSE(10, ("Launch frags for %p", coll_op));
if (full_msg_desc->n_bytes_scheduled < full_msg_desc->n_bytes_total) {
ret = coll_op->fragment_data.message_descriptor->fragment_launcher(coll_op);
if (OPAL_UNLIKELY(OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret)) {
out_of_resource = true;
} else if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ML_VERBOSE(10, ("Failed to launch fragment"));
return ret;
}
}
}
if (ready_to_release) {
/* Check if we have to recycle memory.
* Note: It is safe to recycle ML buffers since the ML buffer data
* was already unpacked to the user buffer
*/
if (NULL != coll_op->fragment_data.buffer_desc) {
ret = mca_coll_ml_buffer_recycling(coll_op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
/* if this is not fragment 0, return fragment to the free list.
* fragment 0 will be returned in mca_ml_request_free() which
* is called from the MPI wait() and test() routines.
* We can recover the pointer to the fragment descriptor from
* the MPI level request object, which is the first element
* in the fragment descriptor.
*/
ML_VERBOSE(10, ("Master ? %p %d", coll_op, coll_op->fragment_data.offset_into_user_buffer));
/* offset_into_user_buffer == 0 is not a valid test for the first frag.
* The old check was:
* if (0 != coll_op->fragment_data.offset_into_user_buffer &&
* !out_of_resource) {
* A well-posed definition of the first frag is the following:
*/
if((&coll_op->full_message !=
coll_op->fragment_data.message_descriptor) &&
!out_of_resource){
/* the descriptor does not own the full message ==> this is not fragment 0 */
CHECK_AND_RECYCLE(coll_op);
}
}
/* return */
return OMPI_SUCCESS;
}
/* task completion: bump the satisfied-dependency count of every task that
* depends on the completed task */
static inline __opal_attribute_always_inline__ int coll_ml_task_dependency_processing(
mca_coll_ml_task_status_t *task)
{
/* update dependencies */
mca_coll_ml_collective_operation_progress_t *my_schedule_instance =
task->ml_coll_operation;
int n_dependent_tasks = task->rt_num_dependent_tasks;
int dep_task;
for (dep_task = 0; dep_task < n_dependent_tasks; dep_task++)
{
int task_index;
task_index = task->rt_dependent_task_indecies[dep_task];
my_schedule_instance->dag_description.status_array[task_index].n_dep_satisfied++;
}
/* return */
return OMPI_SUCCESS;
}
/* collective task completion processing -
* "task" may be removed from list in this routine.
* Thread safety is assumed to be handled outside this routine.
*/
static inline __opal_attribute_always_inline__ int mca_coll_ml_task_completion_processing(
mca_coll_ml_task_status_t **task_status_g, opal_list_t *list)
{
/* local variables */
int ret = OMPI_SUCCESS;
mca_coll_ml_task_status_t *task_status = *task_status_g;
mca_coll_ml_collective_operation_progress_t *coll_op =
task_status->ml_coll_operation;
/* Pasha: Since all our collectives so far use the root
flag, I am replacing the call to a custom callback function
with setting root_flag.
If we see that we need some custom functionality,
we will enable it later.
*/
task_status->ml_coll_operation->variable_fn_params.root_flag = true;
#if 0
/* process task completion function,
if any was defined */
if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
ret = task_status->task_comp_fn(task_status);
if (ret != OMPI_SUCCESS) {
return ret;
}
}
#endif
/* update dependencies */
ret = coll_ml_task_dependency_processing(task_status);
if (ret != OMPI_SUCCESS) {
ML_VERBOSE(3,("Failed to coll_ml_task_dependency_processing"));
return ret;
}
/* process task completion function,
if any was defined */
if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
ret = task_status->task_comp_fn(task_status);
if (ret != OMPI_SUCCESS) {
ML_VERBOSE(3,("Failed to task_comp_fn"));
return ret;
}
}
/* remove the descriptor from the incomplete list
(Pasha: if the list was provided) */
/* No need to put this on any new list - it is associated
* with the mca_coll_ml_collective_operation_progress_t
* descriptor already
*/
if (NULL != list) {
(*task_status_g) = (mca_coll_ml_task_status_t *)
opal_list_remove_item(list, (opal_list_item_t *)(task_status));
}
/* update completion counter */
coll_op->dag_description.num_tasks_completed++;
if(coll_op->dag_description.num_tasks_completed ==
coll_op->coll_schedule->n_fns)
{
/* the actual fragment descriptor is not on any list, as
* we can get at it from the task descriptors
*/
ret = coll_ml_fragment_completion_processing(coll_op);
if (OMPI_SUCCESS != ret) {
ML_VERBOSE(3,("Failed to coll_ml_fragment_completion_processing"));
return ret;
}
}
/* return */
return ret;
}
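/* Initialize the task status for every function in the schedule and append all
 * tasks to the component's pending list (under pending_tasks_mutex) without
 * attempting to start any of them. */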
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_append_to_queue(
mca_coll_ml_collective_operation_progress_t *op_prog,
mca_coll_ml_task_setup_fn_t task_setup)
{
int fn_index;
mca_coll_ml_collective_operation_description_t *op_desc =
op_prog->coll_schedule;
mca_coll_ml_compound_functions_t *func = NULL;
mca_coll_ml_task_status_t *task_status = NULL;
mca_coll_ml_component_t *cm = &mca_coll_ml_component;
ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));
/* Init all tasks, before we start them */
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
func = &op_desc->component_functions[fn_index];
task_status = &op_prog->dag_description.status_array[fn_index];
ML_VERBOSE(9, ("Processing function index %d", fn_index));
assert(NULL != func);
/* Init task status */
task_status->n_dep_satisfied = 0; /* start from zero */
task_status->bcol_fn = func->bcol_function;
/* setup run time parameters */
/* Pasha: do we need the if protection? */
if (OPAL_LIKELY(NULL != task_setup)) {
task_setup(task_status, fn_index, func);
}
/* the pointer to the operation progress is supposed to be set at
construction time. Just want to make sure that it is ok */
assert(task_status->ml_coll_operation == op_prog);
/* We assume that all function pointers are defined and there
is no reason to check for NULL */
assert(NULL != func->bcol_function->coll_fn);
/* In order to preserve ordering on all ranks we have to add it to tail */
/* TBD: Need to review the way we launch fragments */
ML_VERBOSE(9, ("The task %p dependency is %d, appending it on pending list",
(void *)task_status, func->num_dependencies));
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
}
ML_VERBOSE(9, ("Collective was launched !"));
return OMPI_SUCCESS;
}
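/* Initialize the task status for every function in the schedule, then start the
 * tasks that have no unsatisfied dependencies. Depending on the bcol return code
 * a started task is placed on the pending list, placed on the active list, or
 * completed in place; tasks with dependencies go directly to the pending list. */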
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_launcher(
mca_coll_ml_collective_operation_progress_t *op_prog,
mca_coll_ml_task_setup_fn_t task_setup)
{
int fn_index;
int rc, ret;
mca_coll_ml_collective_operation_description_t *op_desc =
op_prog->coll_schedule;
mca_coll_ml_compound_functions_t *func = NULL;
mca_coll_ml_task_status_t *task_status = NULL;
mca_coll_ml_component_t *cm = &mca_coll_ml_component;
ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));
/* Init all tasks, before we start them */
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
func = &op_desc->component_functions[fn_index];
task_status = &op_prog->dag_description.status_array[fn_index];
ML_VERBOSE(9, ("Processing function index %d", fn_index));
assert(NULL != func);
/* Init task status */
task_status->n_dep_satisfied = 0; /* start from zero */
/* task_status->my_index_in_coll_schedule = fn_index;
pasha: the value is set during init */
task_status->bcol_fn = func->bcol_function;
/* Pasha: disabling support for custom completion functions
task_status->task_comp_fn = func->task_comp_fn;
*/
/* setup run time parameters */
/* Pasha: do we need the if protection? */
if (OPAL_LIKELY(NULL != task_setup)) {
task_setup(task_status, fn_index, func);
}
/* the pointer to the operation progress is supposed to be set at
construction time. Just want to make sure that it is ok */
assert(task_status->ml_coll_operation == op_prog);
/* Task status is done */
/* launch the task and put it on corresponding list (if required) */
/* We assume that all function pointers are defined and there
is no reason to check for NULL */
assert(NULL != func->bcol_function->coll_fn);
}
/* try to start startable */
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
func = &op_desc->component_functions[fn_index];
task_status = &op_prog->dag_description.status_array[fn_index];
/* fire the collective immediately if it has no dependencies */
if (0 == task_status->rt_num_dependencies) {
rc = func->bcol_function->coll_fn(&op_prog->variable_fn_params,
/* Pasha: Need to update the prototype of the func,
right now it is an ugly hack for compilation */
(struct coll_ml_function_t *)&func->constant_group_data);
switch(rc) {
case BCOL_FN_NOT_STARTED:
/* put it on pending list */
ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_NOT_STARTED, putting the task on pending list"));
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
break;
case BCOL_FN_STARTED:
/* put it on started list */
ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_STARTED, puting the task on active list"));
OPAL_THREAD_LOCK(&(mca_coll_ml_component.active_tasks_mutex));
opal_list_append(&cm->active_tasks, (opal_list_item_t *)task_status);
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.active_tasks_mutex));
break;
case BCOL_FN_COMPLETE:
/* the task is done! let's start the relevant dependencies */
ML_VERBOSE(9, ("Call to bcol collective returned BCOL_FN_COMPLETE"));
/* the task does not belong to any list yet, so passing NULL */
ret = mca_coll_ml_task_completion_processing(&task_status, NULL);
if (OMPI_SUCCESS != ret) {
ML_VERBOSE(9, ("Failed to mca_coll_ml_task_completion_processing"));
return ret;
}
break;
default:
ML_ERROR(("Unknow exit status %d", rc));
return OMPI_ERROR;
}
} else {
/* the task depends on others, let's put it on the pending list */
ML_VERBOSE(9, ("Task %p has %d dependencies, putting it on the pending list",
(void *)task_status, func->num_dependencies));
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
}
}
ML_VERBOSE(9, ("Collective was launched !"));
return OMPI_SUCCESS;
}
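/* Allocate a collective-progress descriptor for a DAG-driven collective: block
 * on the module's free list, initialize and activate the embedded MPI request,
 * and perform the basic operation setup. */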
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_alloc_op_prog_single_frag_dag(
mca_coll_ml_module_t *ml_module,
mca_coll_ml_collective_operation_description_t *coll_schedule,
void *src, void *dst, size_t total_bytes,
size_t offset_into_user_buffer
)
{
ompi_free_list_item_t *item;
mca_coll_ml_collective_operation_progress_t *coll_op = NULL;
ompi_request_t *req;
/* Blocking call on fragment allocation (Maybe we want to make it non-blocking?) */
OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
item);
coll_op = (mca_coll_ml_collective_operation_progress_t *) item;
ML_VERBOSE(10, (">>> Allocating coll op %p", coll_op));
assert(NULL != coll_op);
assert(coll_op->dag_description.status_array[0].item.opal_list_item_refcount == 0);
req = &(coll_op->full_message.super);
OMPI_REQUEST_INIT(req, false);
/* Mark the request ACTIVE. It is critical for MPI_Test() */
req->req_state = OMPI_REQUEST_ACTIVE;
req->req_status._cancelled = 0;
req->req_status.MPI_ERROR = OMPI_SUCCESS;
MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
offset_into_user_buffer, src, dst, coll_schedule);
/* We do not set up sequential data, since this is not a sequential call */
coll_op->dag_description.num_tasks_completed = 0;
/* The release reference counter has to be zero */
assert(0 == coll_op->pending);
return coll_op;
}
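/* Allocate a progress descriptor for the next fragment of an in-flight
 * collective, inheriting the collective-operation index and the full-message
 * descriptor from the previous fragment. */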
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_duplicate_op_prog_single_frag_dag(
mca_coll_ml_module_t *ml_module,
mca_coll_ml_collective_operation_progress_t *old_op)
{
mca_coll_ml_collective_operation_progress_t *new_op = NULL;
new_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
ml_module->coll_ml_bcast_functions[old_op->fragment_data.current_coll_op],
old_op->fragment_data.message_descriptor->dest_user_addr,
old_op->fragment_data.message_descriptor->src_user_addr,
old_op->fragment_data.message_descriptor->n_bytes_total,
old_op->fragment_data.message_descriptor->n_bytes_scheduled);
new_op->fragment_data.current_coll_op = old_op->fragment_data.current_coll_op;
new_op->fragment_data.message_descriptor = old_op->fragment_data.message_descriptor;
return new_op;
}
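/* Allocate a collective-progress descriptor for a sequentially progressed
 * collective: block on the module's free list and perform the basic operation
 * setup (no MPI request initialization here). */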
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_alloc_op_prog_single_frag_seq(
mca_coll_ml_module_t *ml_module,
mca_coll_ml_collective_operation_description_t *coll_schedule,
void *src, void *dst,
size_t total_bytes,
size_t offset_into_user_buffer
)
{
ompi_free_list_item_t *item;
mca_coll_ml_collective_operation_progress_t *coll_op = NULL;
/* Blocking call on fragment allocation (Maybe we want to make it non-blocking?) */
OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
item);
coll_op = (mca_coll_ml_collective_operation_progress_t *) item;
assert(NULL != coll_op);
MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
offset_into_user_buffer, src, dst, coll_schedule);
/* set sequential data */
/* pasha - do we have something to set ? */
return coll_op;
}
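/* Compute the number of bytes to pack for the next send fragment: the full ML
 * fragment size rounded down to a whole datatype element boundary (using the
 * dummy convertor's partial_length), or whatever remains for the last fragment. */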
static inline __opal_attribute_always_inline__
void mca_coll_ml_convertor_get_send_frag_size(mca_coll_ml_module_t *ml_module,
size_t *frag_size, struct full_message_t *message_descriptor)
{
size_t ml_fragment_size = ml_module->ml_fragment_size;
opal_convertor_t *dummy_convertor = &message_descriptor->dummy_convertor;
/* The last frag needs special service */
if (ml_fragment_size >
message_descriptor->send_converter_bytes_packed) {
*frag_size = message_descriptor->send_converter_bytes_packed;
message_descriptor->send_converter_bytes_packed = 0;
return;
}
*frag_size = ml_fragment_size;
message_descriptor->dummy_conv_position += ml_fragment_size;
opal_convertor_generic_simple_position(dummy_convertor, &message_descriptor->dummy_conv_position);
*frag_size -= dummy_convertor->partial_length;
message_descriptor->send_converter_bytes_packed -= (*frag_size);
}
END_C_DECLS
#endif