Fixing ML COLL compilation issues on some SUN platforms. For more detail see following mail thread:
http://www.open-mpi.org/community/lists/devel/2012/08/11448.php A lot of thanks to Paul Hargrove for the issue analysis and patch testing. Refs trac:3243 This commit was SVN r27178. The following Trac tickets were found above: Ticket 3243 --> https://svn.open-mpi.org/trac/ompi/ticket/3243
Этот коммит содержится в:
родитель
a3b08f5800
Коммит
8cf3c95494
@ -16,6 +16,7 @@ dist_pkgdata_DATA = \
|
||||
not_used_yet =
|
||||
|
||||
sources = coll_ml.h \
|
||||
coll_ml_inlines.h \
|
||||
coll_ml_module.c \
|
||||
coll_ml_allocation.h \
|
||||
coll_ml_allocation.c \
|
||||
|
@ -905,39 +905,28 @@ int mca_coll_ml_fulltree_iboffload_only_hierarchy_discovery(mca_coll_ml_module_t
|
||||
void mca_coll_ml_allreduce_matrix_init(mca_coll_ml_module_t *ml_module,
|
||||
const mca_bcol_base_component_2_0_0_t *bcol_component);
|
||||
|
||||
static inline int mca_coll_ml_err(const char* fmt, ...)
|
||||
{
|
||||
va_list list;
|
||||
int ret;
|
||||
|
||||
va_start(list, fmt);
|
||||
ret = vfprintf(stderr, fmt, list);
|
||||
va_end(list);
|
||||
return ret;
|
||||
}
|
||||
|
||||
#define ML_ERROR(args) \
|
||||
do { \
|
||||
mca_coll_ml_err("[%s]%s[%s:%d:%s] COLL-ML ", \
|
||||
orte_process_info.nodename, \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, __func__); \
|
||||
mca_coll_ml_err args; \
|
||||
mca_coll_ml_err("\n"); \
|
||||
} while(0)
|
||||
do { \
|
||||
mca_coll_ml_err("[%s]%s[%s:%d:%s] COLL-ML ", \
|
||||
orte_process_info.nodename, \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, __func__); \
|
||||
mca_coll_ml_err args; \
|
||||
mca_coll_ml_err("\n"); \
|
||||
} while(0)
|
||||
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
#define ML_VERBOSE(level, args) \
|
||||
do { \
|
||||
if(mca_coll_ml_component.verbose >= level) { \
|
||||
mca_coll_ml_err("[%s]%s[%s:%d:%s] COLL-ML ", \
|
||||
orte_process_info.nodename, \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, __func__); \
|
||||
mca_coll_ml_err args; \
|
||||
mca_coll_ml_err("\n"); \
|
||||
} \
|
||||
} while(0)
|
||||
do { \
|
||||
if(mca_coll_ml_component.verbose >= level) { \
|
||||
mca_coll_ml_err("[%s]%s[%s:%d:%s] COLL-ML ", \
|
||||
orte_process_info.nodename, \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__, __func__); \
|
||||
mca_coll_ml_err args; \
|
||||
mca_coll_ml_err("\n"); \
|
||||
} \
|
||||
} while(0)
|
||||
#else
|
||||
#define ML_VERBOSE(level, args)
|
||||
#endif
|
||||
@ -969,435 +958,9 @@ static inline int mca_coll_ml_err(const char* fmt, ...)
|
||||
(op)->fragment_data.message_descriptor->n_bytes_scheduled : \
|
||||
(size_t) OP_ML_MODULE((op))->small_message_thresholds[coll])
|
||||
|
||||
static inline __opal_attribute_always_inline__ int ml_fls(int num)
|
||||
{
|
||||
int i = 1;
|
||||
int j = 0;
|
||||
|
||||
if (0 == num) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (i < num) {
|
||||
i *= 2;
|
||||
j++;
|
||||
}
|
||||
|
||||
if (i > num) {
|
||||
j--;
|
||||
}
|
||||
|
||||
return j;
|
||||
}
|
||||
|
||||
/* Abort mpi process in case of fatal error */
|
||||
void mca_coll_ml_abort_ml(char *message);
|
||||
|
||||
static inline __opal_attribute_always_inline__
|
||||
int mca_coll_ml_buffer_recycling(mca_coll_ml_collective_operation_progress_t *ml_request)
|
||||
{
|
||||
mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *)ml_request->coll_module;
|
||||
ml_memory_block_desc_t *ml_memblock = ml_module->payload_block;
|
||||
uint64_t bank_index = ml_request->fragment_data.buffer_desc->bank_index;
|
||||
int rc;
|
||||
|
||||
opal_atomic_add(&ml_memblock->bank_release_counters[bank_index], 1);
|
||||
|
||||
/* Check if the bank is ready for recycling */
|
||||
if (ml_memblock->bank_release_counters[bank_index] ==
|
||||
ml_memblock->num_buffers_per_bank ) {
|
||||
ml_memblock->ready_for_memsync[bank_index] = true;
|
||||
|
||||
ML_VERBOSE(10, ("Sync count %d, bank %d", ml_memblock->memsync_counter, bank_index));
|
||||
assert(ml_memblock->bank_is_busy);
|
||||
if (ml_memblock->memsync_counter == (int)bank_index) {
|
||||
while(ml_memblock->ready_for_memsync[ml_memblock->memsync_counter]) {
|
||||
ML_VERBOSE(10, ("Calling for service barrier: ml_buffer_index - %d %d %d == %d.\n",
|
||||
ml_request->fragment_data.buffer_desc->buffer_index,
|
||||
ml_memblock->memsync_counter,
|
||||
ml_memblock->bank_release_counters[ml_memblock->memsync_counter],
|
||||
ml_memblock->num_buffers_per_bank));
|
||||
/* Setting the ready flag to 0 - unready - done */
|
||||
ml_memblock->ready_for_memsync[ml_memblock->memsync_counter] = false;
|
||||
|
||||
rc = mca_coll_ml_memsync_intra(ml_module, ml_memblock->memsync_counter);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
ML_ERROR(("Failed to start memory sync !!!"));
|
||||
return rc;
|
||||
}
|
||||
|
||||
opal_atomic_add(&ml_memblock->memsync_counter, 1);
|
||||
if (ml_memblock->memsync_counter == (int)ml_memblock->num_banks) {
|
||||
ml_memblock->memsync_counter = 0;
|
||||
}
|
||||
ML_VERBOSE(10, ("After service barrier."));
|
||||
}
|
||||
} else {
|
||||
ML_VERBOSE(10, ("Out of order %d\n", ml_memblock->memsync_counter));
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int coll_ml_fragment_completion_processing(
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op)
|
||||
{
|
||||
/* local variables */
|
||||
int ret = OMPI_SUCCESS;
|
||||
size_t bytes_in_this_frag;
|
||||
struct full_message_t *full_msg_desc = coll_op->fragment_data.message_descriptor;
|
||||
bool ready_to_release = true, out_of_resource = false;
|
||||
|
||||
ML_VERBOSE(10, ("Coll_op %p processing completion", coll_op));
|
||||
/* Call unpack/pack function */
|
||||
if (OPAL_LIKELY(NULL != coll_op->process_fn)) {
|
||||
ret = coll_op->process_fn(coll_op);
|
||||
switch(ret) {
|
||||
case OMPI_SUCCESS:
|
||||
ML_VERBOSE(10, ("unpack done"));
|
||||
ready_to_release = true;
|
||||
break;
|
||||
case ORTE_ERR_NO_MATCH_YET:
|
||||
ML_VERBOSE(10, ("unexpected packet"));
|
||||
ready_to_release = false;
|
||||
break;
|
||||
default:
|
||||
ML_ERROR(("Error, unexpected error code %d", ret));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
bytes_in_this_frag = coll_op->fragment_data.fragment_size;
|
||||
|
||||
ML_VERBOSE(10, ("Delivered %d bytes in frag %d total %d",
|
||||
full_msg_desc->n_bytes_delivered,
|
||||
bytes_in_this_frag,
|
||||
full_msg_desc->n_bytes_total));
|
||||
|
||||
/* check for full message completion */
|
||||
if(full_msg_desc->n_bytes_delivered + bytes_in_this_frag ==
|
||||
full_msg_desc->n_bytes_total) {
|
||||
/* message complete - don't update number of bytes delivered, just
|
||||
* mark the message complete
|
||||
*/
|
||||
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
|
||||
|
||||
/* decrement the number of fragments */
|
||||
full_msg_desc->n_active--;
|
||||
|
||||
ML_VERBOSE(10, ("Signaling completion"));
|
||||
|
||||
/* here we need to be sure that we point to the first fragment only */
|
||||
ompi_request_complete(&(coll_op->fragment_data.message_descriptor->super), true);
|
||||
coll_op->fragment_data.message_descriptor->super.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
} else {
|
||||
assert(NULL != coll_op->fragment_data.buffer_desc);
|
||||
/* update the number of bytes delivered */
|
||||
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
|
||||
/* decrement the number of fragments */
|
||||
full_msg_desc->n_active--;
|
||||
/* here we need to start the next fragment */
|
||||
ML_VERBOSE(10, ("Launch frags for %p", coll_op));
|
||||
if (full_msg_desc->n_bytes_scheduled < full_msg_desc->n_bytes_total) {
|
||||
ret = coll_op->fragment_data.message_descriptor->fragment_launcher(coll_op);
|
||||
if (OPAL_UNLIKELY(OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret)) {
|
||||
out_of_resource = true;
|
||||
} else if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
ML_VERBOSE(10, ("Failed to launch fragment"));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ready_to_release) {
|
||||
/* Check if we have to recycle memory.
|
||||
* Note: It is safe to recycle ML buffers since the ML buffer data
|
||||
* already was unpacked to user buffer
|
||||
*/
|
||||
if (NULL != coll_op->fragment_data.buffer_desc) {
|
||||
ret = mca_coll_ml_buffer_recycling(coll_op);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
/* if this is not fragment 0, return fragment to the free list.
|
||||
* fragment 0 will be returned in mca_ml_request_free() which
|
||||
* is called from the MPI wait() and test() routines.
|
||||
* We can recover the pointer to the fragement descriptor from
|
||||
* the MPI level request object, wich is the first element
|
||||
* in the fragment descriptor.
|
||||
*/
|
||||
ML_VERBOSE(10, ("Master ? %p %d", coll_op, coll_op->fragment_data.offset_into_user_buffer));
|
||||
if (0 != coll_op->fragment_data.offset_into_user_buffer &&
|
||||
!out_of_resource) {
|
||||
/* non-zero offset ==> this is not fragment 0 */
|
||||
CHECK_AND_RECYCLE(coll_op);
|
||||
}
|
||||
}
|
||||
|
||||
/* return */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* task completion */
|
||||
static inline __opal_attribute_always_inline__ int coll_ml_task_dependency_processing(
|
||||
mca_coll_ml_task_status_t *task)
|
||||
{
|
||||
/* update dependencies */
|
||||
mca_coll_ml_collective_operation_progress_t *my_schedule_instance =
|
||||
task->ml_coll_operation;
|
||||
int n_dependent_tasks = task->rt_num_dependent_tasks;
|
||||
int dep_task;
|
||||
|
||||
for (dep_task = 0; dep_task < n_dependent_tasks; dep_task++)
|
||||
{
|
||||
int task_index;
|
||||
task_index = task->rt_dependent_task_indecies[dep_task];
|
||||
my_schedule_instance->dag_description.status_array[task_index].n_dep_satisfied++;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* collective task completion processing -
|
||||
* "task" may be removed from list in this routine.
|
||||
* Thread safety is assumed to be handled outside this routine.
|
||||
*/
|
||||
static inline __opal_attribute_always_inline__ int mca_coll_ml_task_completion_processing(
|
||||
mca_coll_ml_task_status_t **task_status_g, opal_list_t *list)
|
||||
{
|
||||
/* local variables */
|
||||
int ret = OMPI_SUCCESS;
|
||||
mca_coll_ml_task_status_t *task_status = *task_status_g;
|
||||
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op =
|
||||
task_status->ml_coll_operation;
|
||||
|
||||
/* Pasha: Since all our collectives so far use the root
|
||||
flag, I replacing the call for custom call back function
|
||||
with setting root_flag.
|
||||
If we will see that we need some custom functionality,
|
||||
we will enable it later.
|
||||
*/
|
||||
|
||||
task_status->ml_coll_operation->variable_fn_params.root_flag = true;
|
||||
|
||||
#if 0
|
||||
/* process task completion function,
|
||||
if any was defined */
|
||||
if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
|
||||
ret = task_status->task_comp_fn(task_status);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* update dependencies */
|
||||
ret = coll_ml_task_dependency_processing(task_status);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
ML_VERBOSE(3,("Failed to coll_ml_task_dependency_processing"));
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* process task completion function,
|
||||
if any was defined */
|
||||
if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
|
||||
ret = task_status->task_comp_fn(task_status);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
ML_VERBOSE(3,("Failed to task_comp_fn"));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* remove the descriptor from the incomplete list
|
||||
(Pasha: if the list was provided) */
|
||||
/* No need to put this an any new list - it is associcated
|
||||
* with the mca_coll_ml_collective_operation_progress_t
|
||||
* descriptor already
|
||||
*/
|
||||
|
||||
if (NULL != list) {
|
||||
(*task_status_g) = (mca_coll_ml_task_status_t *)
|
||||
opal_list_remove_item(list, (opal_list_item_t *)(task_status));
|
||||
}
|
||||
|
||||
/* update completion counter */
|
||||
coll_op->dag_description.num_tasks_completed++;
|
||||
|
||||
if(coll_op->dag_description.num_tasks_completed ==
|
||||
coll_op->coll_schedule->n_fns)
|
||||
{
|
||||
/* the actual fragment descriptor is not on any list, as
|
||||
* we can get at it from the task descriptors
|
||||
*/
|
||||
ret = coll_ml_fragment_completion_processing(coll_op);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
ML_VERBOSE(3,("Failed to coll_ml_fragment_completion_processing"));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* return */
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_append_to_queue(
|
||||
mca_coll_ml_collective_operation_progress_t *op_prog,
|
||||
mca_coll_ml_task_setup_fn_t task_setup)
|
||||
{
|
||||
int fn_index;
|
||||
mca_coll_ml_collective_operation_description_t *op_desc =
|
||||
op_prog->coll_schedule;
|
||||
mca_coll_ml_compound_functions_t *func = NULL;
|
||||
mca_coll_ml_task_status_t *task_status = NULL;
|
||||
mca_coll_ml_component_t *cm = &mca_coll_ml_component;
|
||||
|
||||
ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));
|
||||
|
||||
/* Init all tasks, before we start them */
|
||||
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
|
||||
func = &op_desc->component_functions[fn_index];
|
||||
task_status = &op_prog->dag_description.status_array[fn_index];
|
||||
|
||||
ML_VERBOSE(9, ("Processing function index %d", fn_index));
|
||||
|
||||
assert(NULL != func);
|
||||
|
||||
/* Init task status */
|
||||
task_status->n_dep_satisfied = 0; /* start from zero */
|
||||
task_status->bcol_fn = func->bcol_function;
|
||||
/* setup run time parametres */
|
||||
/* Pasha: do we need the if proctection ? */
|
||||
if (OPAL_LIKELY(NULL != task_setup)) {
|
||||
task_setup(task_status, fn_index, func);
|
||||
}
|
||||
|
||||
/* the pointer to operation progress supposed to be set during
|
||||
construction time. Just want to make sure that it is ok */
|
||||
assert(task_status->ml_coll_operation == op_prog);
|
||||
|
||||
/* We assume that all pointer to functions are defined and it
|
||||
is not reson to check for null */
|
||||
assert(NULL != func->bcol_function->coll_fn);
|
||||
|
||||
/* In order to preserve ordering on all ranks we have to add it to tail */
|
||||
/* TBD: Need to review the way we launch fragments */
|
||||
ML_VERBOSE(9, ("The task %p dependency is %d, appending it on pending list",
|
||||
(void *)task_status, func->num_dependencies));
|
||||
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
|
||||
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
}
|
||||
|
||||
ML_VERBOSE(9, ("Collective was launched !"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_launcher(
|
||||
mca_coll_ml_collective_operation_progress_t *op_prog,
|
||||
mca_coll_ml_task_setup_fn_t task_setup)
|
||||
{
|
||||
int fn_index;
|
||||
int rc, ret;
|
||||
mca_coll_ml_collective_operation_description_t *op_desc =
|
||||
op_prog->coll_schedule;
|
||||
mca_coll_ml_compound_functions_t *func = NULL;
|
||||
mca_coll_ml_task_status_t *task_status = NULL;
|
||||
mca_coll_ml_component_t *cm = &mca_coll_ml_component;
|
||||
|
||||
ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));
|
||||
|
||||
/* Init all tasks, before we start them */
|
||||
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
|
||||
func = &op_desc->component_functions[fn_index];
|
||||
task_status = &op_prog->dag_description.status_array[fn_index];
|
||||
|
||||
ML_VERBOSE(9, ("Processing function index %d", fn_index));
|
||||
|
||||
assert(NULL != func);
|
||||
|
||||
/* Init task status */
|
||||
task_status->n_dep_satisfied = 0; /* start from zero */
|
||||
/* task_status->my_index_in_coll_schedule = fn_index;
|
||||
pasha: the value is set during init */
|
||||
task_status->bcol_fn = func->bcol_function;
|
||||
/* Pasha: disabling support for custom complition functions
|
||||
task_status->task_comp_fn = func->task_comp_fn;
|
||||
*/
|
||||
|
||||
/* setup run time parametres */
|
||||
/* Pasha: do we need the if proctection ? */
|
||||
if (OPAL_LIKELY(NULL != task_setup)) {
|
||||
task_setup(task_status, fn_index, func);
|
||||
}
|
||||
|
||||
/* the pointer to operation progress supposed to be set during
|
||||
construction time. Just want to make sure that it is ok */
|
||||
assert(task_status->ml_coll_operation == op_prog);
|
||||
/* Task status is done */
|
||||
|
||||
/* launch the task and put it on corresponding list (if required) */
|
||||
|
||||
/* We assume that all pointer to functions are defined and it
|
||||
is not reason to check for null */
|
||||
assert(NULL != func->bcol_function->coll_fn);
|
||||
}
|
||||
|
||||
/* try to start startable */
|
||||
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
|
||||
func = &op_desc->component_functions[fn_index];
|
||||
task_status = &op_prog->dag_description.status_array[fn_index];
|
||||
/* fire the collective imidiate if it has no dependencies */
|
||||
if (0 == task_status->rt_num_dependencies) {
|
||||
rc = func->bcol_function->coll_fn(&op_prog->variable_fn_params,
|
||||
/* Pasha: Need to update the prototype of the func,
|
||||
right now it is ugly hack for compilation */
|
||||
(struct coll_ml_function_t *)&func->constant_group_data);
|
||||
switch(rc) {
|
||||
case BCOL_FN_NOT_STARTED:
|
||||
/* put it on pending list */
|
||||
ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_NOT_STARTED, putting the task on pending list"));
|
||||
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
|
||||
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
break;
|
||||
case BCOL_FN_STARTED:
|
||||
/* put it on started list */
|
||||
ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_STARTED, puting the task on active list"));
|
||||
OPAL_THREAD_LOCK(&(mca_coll_ml_component.active_tasks_mutex));
|
||||
opal_list_append(&cm->active_tasks, (opal_list_item_t *)task_status);
|
||||
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.active_tasks_mutex));
|
||||
break;
|
||||
case BCOL_FN_COMPLETE:
|
||||
/* the tast is done ! lets start relevant dependencies */
|
||||
ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_COMPLETE"));
|
||||
/* the task does not belong to any list, yes. So passing NULL */
|
||||
ret = mca_coll_ml_task_completion_processing(&task_status, NULL);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
ML_VERBOSE(9, ("Failed to mca_coll_ml_task_completion_processing"));
|
||||
return ret;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
ML_ERROR(("Unknow exit status %d", rc));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
} else {
|
||||
/* the task is depend on other, lets put it on pending list */
|
||||
ML_VERBOSE(9, ("The task %p dependency is %d, putting it on pending list",
|
||||
(void *)task_status, func->num_dependencies));
|
||||
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
|
||||
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
}
|
||||
}
|
||||
ML_VERBOSE(9, ("Collective was launched !"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
#define ML_SET_VARIABLE_PARAMS_BCAST(op, ml, cnt, datatype, b_desc, \
|
||||
s_offset, r_offset, frag_len, buf) \
|
||||
do { \
|
||||
@ -1443,99 +1006,6 @@ do { \
|
||||
op->fragment_data.current_coll_op = -1; \
|
||||
} while (0)
|
||||
|
||||
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
|
||||
mca_coll_ml_alloc_op_prog_single_frag_dag(
|
||||
mca_coll_ml_module_t *ml_module,
|
||||
mca_coll_ml_collective_operation_description_t *coll_schedule,
|
||||
void *src, void *dst, size_t total_bytes,
|
||||
size_t offset_into_user_buffer
|
||||
)
|
||||
{
|
||||
int rc;
|
||||
ompi_free_list_item_t *item;
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op = NULL;
|
||||
ompi_request_t *req;
|
||||
|
||||
/* Blocking call on fragment allocation (Maybe we want to make it non blocking ?) */
|
||||
OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
|
||||
item,
|
||||
rc);
|
||||
|
||||
coll_op = (mca_coll_ml_collective_operation_progress_t *) item;
|
||||
ML_VERBOSE(10, (">>> Allocating coll op %p", coll_op));
|
||||
assert(NULL != coll_op);
|
||||
assert(coll_op->dag_description.status_array[0].item.opal_list_item_refcount == 0);
|
||||
req = &(coll_op->full_message.super);
|
||||
|
||||
OMPI_REQUEST_INIT(req, false);
|
||||
/* Mark the request ACTIVE. It is critical for MPI_Test()*/
|
||||
req->req_state = OMPI_REQUEST_ACTIVE;
|
||||
req->req_status._cancelled = 0;
|
||||
req->req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
|
||||
MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
|
||||
offset_into_user_buffer, src, dst, coll_schedule);
|
||||
|
||||
/* We do not set sequential, since it is not sequential call */
|
||||
coll_op->dag_description.num_tasks_completed = 0;
|
||||
|
||||
/* Release reference counter have to be zero */
|
||||
assert(0 == coll_op->pending);
|
||||
|
||||
return coll_op;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
|
||||
mca_coll_ml_duplicate_op_prog_single_frag_dag(
|
||||
mca_coll_ml_module_t *ml_module,
|
||||
mca_coll_ml_collective_operation_progress_t *old_op)
|
||||
{
|
||||
mca_coll_ml_collective_operation_progress_t *new_op = NULL;
|
||||
|
||||
new_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
|
||||
ml_module->coll_ml_bcast_functions[old_op->fragment_data.current_coll_op],
|
||||
old_op->fragment_data.message_descriptor->dest_user_addr,
|
||||
old_op->fragment_data.message_descriptor->src_user_addr,
|
||||
old_op->fragment_data.message_descriptor->n_bytes_total,
|
||||
old_op->fragment_data.message_descriptor->n_bytes_scheduled);
|
||||
|
||||
new_op->fragment_data.current_coll_op = old_op->fragment_data.current_coll_op;
|
||||
new_op->fragment_data.message_descriptor = old_op->fragment_data.message_descriptor;
|
||||
|
||||
return new_op;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
|
||||
mca_coll_ml_alloc_op_prog_single_frag_seq(
|
||||
mca_coll_ml_module_t *ml_module,
|
||||
mca_coll_ml_collective_operation_description_t *coll_schedule,
|
||||
void *src, void *dst,
|
||||
size_t total_bytes,
|
||||
size_t offset_into_user_buffer
|
||||
)
|
||||
{
|
||||
int rc;
|
||||
ompi_free_list_item_t *item;
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op = NULL;
|
||||
|
||||
/* Blocking call on fragment allocation (Maybe we want to make it non blocking ?) */
|
||||
OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
|
||||
item,
|
||||
rc);
|
||||
|
||||
coll_op = (mca_coll_ml_collective_operation_progress_t *) item;
|
||||
|
||||
assert(NULL != coll_op);
|
||||
|
||||
MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
|
||||
offset_into_user_buffer, src, dst, coll_schedule);
|
||||
|
||||
/* set sequential data */
|
||||
/* pasha - do we have something to set ? */
|
||||
|
||||
return coll_op;
|
||||
}
|
||||
|
||||
/* This routine re-orders and packs user data. The assumption is that
|
||||
* there is per-process data, the amount of data is the same for all * ranks,
|
||||
* and the user data is contigous.
|
||||
@ -1550,31 +1020,6 @@ int mca_coll_ml_pack_reorder_contiguous_data(
|
||||
int mca_coll_ml_pack_reorder_noncontiguous_data(
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op);
|
||||
|
||||
static inline __opal_attribute_always_inline__
|
||||
void mca_coll_ml_convertor_get_send_frag_size(mca_coll_ml_module_t *ml_module,
|
||||
size_t *frag_size, struct full_message_t *message_descriptor)
|
||||
{
|
||||
size_t ml_fragment_size = ml_module->ml_fragment_size;
|
||||
opal_convertor_t *dummy_convertor = &message_descriptor->dummy_convertor;
|
||||
|
||||
/* The last frag needs special service */
|
||||
if (ml_fragment_size >
|
||||
message_descriptor->send_converter_bytes_packed) {
|
||||
*frag_size = message_descriptor->send_converter_bytes_packed;
|
||||
message_descriptor->send_converter_bytes_packed = 0;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
*frag_size = ml_fragment_size;
|
||||
message_descriptor->dummy_conv_position += ml_fragment_size;
|
||||
|
||||
opal_convertor_generic_simple_position(dummy_convertor, &message_descriptor->dummy_conv_position);
|
||||
*frag_size -= dummy_convertor->partial_length;
|
||||
|
||||
message_descriptor->send_converter_bytes_packed -= (*frag_size);
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
#endif /* HAVE_STDLIB_H */
|
||||
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_allocation.h"
|
||||
|
||||
long memory_buffer_index;
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
|
||||
static void mca_coll_ml_barrier_task_setup(
|
||||
mca_coll_ml_task_status_t *task_status,
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "ompi/mca/bcol/bcol.h"
|
||||
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_colls.h"
|
||||
#include "coll_ml_allocation.h"
|
||||
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
|
||||
#include "ompi/mca/common/netpatterns/common_netpatterns.h"
|
||||
#include "coll_ml_mca.h"
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "opal/mca/base/mca_base_param.h"
|
||||
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_config.h"
|
||||
#include "coll_ml_lex.h"
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "ompi/mca/bcol/bcol.h"
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_allocation.h"
|
||||
#include "coll_ml_colls.h"
|
||||
#include <unistd.h>
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "ompi/mca/bcol/bcol.h"
|
||||
#include "ompi/mca/coll/base/base.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/mca/common/commpatterns/common_coll_ops.h"
|
||||
|
||||
#include "ompi/datatype/ompi_datatype.h"
|
||||
|
@ -9,6 +9,7 @@
|
||||
*/
|
||||
#include "ompi_config.h"
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
|
||||
|
||||
static inline void mca_coll_ml_fragment_constructor(mca_coll_ml_fragment_t *frag)
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/include/ompi/constants.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_functions.h"
|
||||
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
#include "ompi/include/ompi/constants.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_allocation.h"
|
||||
|
||||
/* collective managment descriptor initialization - called right after
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/include/ompi/constants.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_functions.h"
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/include/ompi/constants.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_functions.h"
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "ompi_config.h"
|
||||
#include "ompi/mca/bcol/bcol.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_hier_algorithms_common_setup.h"
|
||||
|
||||
int mca_coll_ml_schedule_init_scratch(mca_coll_ml_topology_t *topo_info,
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/include/ompi/constants.h"
|
||||
|
||||
int ml_coll_up_and_down_hier_setup(mca_coll_ml_module_t *ml_module,
|
||||
|
576
ompi/mca/coll/ml/coll_ml_inlines.h
Обычный файл
576
ompi/mca/coll/ml/coll_ml_inlines.h
Обычный файл
@ -0,0 +1,576 @@
|
||||
/*
|
||||
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
|
||||
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
/** @file */
|
||||
|
||||
#ifndef MCA_COLL_ML_INLINES_H
|
||||
#define MCA_COLL_ML_INLINES_H
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
static inline int mca_coll_ml_err(const char* fmt, ...)
|
||||
{
|
||||
va_list list;
|
||||
int ret;
|
||||
|
||||
va_start(list, fmt);
|
||||
ret = vfprintf(stderr, fmt, list);
|
||||
va_end(list);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int ml_fls(int num)
|
||||
{
|
||||
int i = 1;
|
||||
int j = 0;
|
||||
|
||||
if (0 == num) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (i < num) {
|
||||
i *= 2;
|
||||
j++;
|
||||
}
|
||||
|
||||
if (i > num) {
|
||||
j--;
|
||||
}
|
||||
|
||||
return j;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__
|
||||
int mca_coll_ml_buffer_recycling(mca_coll_ml_collective_operation_progress_t *ml_request)
|
||||
{
|
||||
mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *)ml_request->coll_module;
|
||||
ml_memory_block_desc_t *ml_memblock = ml_module->payload_block;
|
||||
uint64_t bank_index = ml_request->fragment_data.buffer_desc->bank_index;
|
||||
int rc;
|
||||
|
||||
opal_atomic_add(&ml_memblock->bank_release_counters[bank_index], 1);
|
||||
|
||||
/* Check if the bank is ready for recycling */
|
||||
if (ml_memblock->bank_release_counters[bank_index] ==
|
||||
ml_memblock->num_buffers_per_bank ) {
|
||||
ml_memblock->ready_for_memsync[bank_index] = true;
|
||||
|
||||
ML_VERBOSE(10, ("Sync count %d, bank %d", ml_memblock->memsync_counter, bank_index));
|
||||
assert(ml_memblock->bank_is_busy);
|
||||
if (ml_memblock->memsync_counter == (int)bank_index) {
|
||||
while(ml_memblock->ready_for_memsync[ml_memblock->memsync_counter]) {
|
||||
ML_VERBOSE(10, ("Calling for service barrier: ml_buffer_index - %d %d %d == %d.\n",
|
||||
ml_request->fragment_data.buffer_desc->buffer_index,
|
||||
ml_memblock->memsync_counter,
|
||||
ml_memblock->bank_release_counters[ml_memblock->memsync_counter],
|
||||
ml_memblock->num_buffers_per_bank));
|
||||
/* Setting the ready flag to 0 - unready - done */
|
||||
ml_memblock->ready_for_memsync[ml_memblock->memsync_counter] = false;
|
||||
|
||||
rc = mca_coll_ml_memsync_intra(ml_module, ml_memblock->memsync_counter);
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
ML_ERROR(("Failed to start memory sync !!!"));
|
||||
return rc;
|
||||
}
|
||||
|
||||
opal_atomic_add(&ml_memblock->memsync_counter, 1);
|
||||
if (ml_memblock->memsync_counter == (int)ml_memblock->num_banks) {
|
||||
ml_memblock->memsync_counter = 0;
|
||||
}
|
||||
ML_VERBOSE(10, ("After service barrier."));
|
||||
}
|
||||
} else {
|
||||
ML_VERBOSE(10, ("Out of order %d\n", ml_memblock->memsync_counter));
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int coll_ml_fragment_completion_processing(
|
||||
mca_coll_ml_collective_operation_progress_t *coll_op)
|
||||
{
|
||||
/* local variables */
|
||||
int ret = OMPI_SUCCESS;
|
||||
size_t bytes_in_this_frag;
|
||||
struct full_message_t *full_msg_desc = coll_op->fragment_data.message_descriptor;
|
||||
bool ready_to_release = true, out_of_resource = false;
|
||||
|
||||
ML_VERBOSE(10, ("Coll_op %p processing completion", coll_op));
|
||||
/* Call unpack/pack function */
|
||||
if (OPAL_LIKELY(NULL != coll_op->process_fn)) {
|
||||
ret = coll_op->process_fn(coll_op);
|
||||
switch(ret) {
|
||||
case OMPI_SUCCESS:
|
||||
ML_VERBOSE(10, ("unpack done"));
|
||||
ready_to_release = true;
|
||||
break;
|
||||
case ORTE_ERR_NO_MATCH_YET:
|
||||
ML_VERBOSE(10, ("unexpected packet"));
|
||||
ready_to_release = false;
|
||||
break;
|
||||
default:
|
||||
ML_ERROR(("Error, unexpected error code %d", ret));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
bytes_in_this_frag = coll_op->fragment_data.fragment_size;
|
||||
|
||||
ML_VERBOSE(10, ("Delivered %d bytes in frag %d total %d",
|
||||
full_msg_desc->n_bytes_delivered,
|
||||
bytes_in_this_frag,
|
||||
full_msg_desc->n_bytes_total));
|
||||
|
||||
/* check for full message completion */
|
||||
if(full_msg_desc->n_bytes_delivered + bytes_in_this_frag ==
|
||||
full_msg_desc->n_bytes_total) {
|
||||
/* message complete - don't update number of bytes delivered, just
|
||||
* mark the message complete
|
||||
*/
|
||||
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
|
||||
|
||||
/* decrement the number of fragments */
|
||||
full_msg_desc->n_active--;
|
||||
|
||||
ML_VERBOSE(10, ("Signaling completion"));
|
||||
|
||||
/* here we need to be sure that we point to the first fragment only */
|
||||
ompi_request_complete(&(coll_op->fragment_data.message_descriptor->super), true);
|
||||
coll_op->fragment_data.message_descriptor->super.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
} else {
|
||||
assert(NULL != coll_op->fragment_data.buffer_desc);
|
||||
/* update the number of bytes delivered */
|
||||
full_msg_desc->n_bytes_delivered += bytes_in_this_frag;
|
||||
/* decrement the number of fragments */
|
||||
full_msg_desc->n_active--;
|
||||
/* here we need to start the next fragment */
|
||||
ML_VERBOSE(10, ("Launch frags for %p", coll_op));
|
||||
if (full_msg_desc->n_bytes_scheduled < full_msg_desc->n_bytes_total) {
|
||||
ret = coll_op->fragment_data.message_descriptor->fragment_launcher(coll_op);
|
||||
if (OPAL_UNLIKELY(OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret)) {
|
||||
out_of_resource = true;
|
||||
} else if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
ML_VERBOSE(10, ("Failed to launch fragment"));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ready_to_release) {
|
||||
/* Check if we have to recycle memory.
|
||||
* Note: It is safe to recycle ML buffers since the ML buffer data
|
||||
* already was unpacked to user buffer
|
||||
*/
|
||||
if (NULL != coll_op->fragment_data.buffer_desc) {
|
||||
ret = mca_coll_ml_buffer_recycling(coll_op);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
/* if this is not fragment 0, return fragment to the free list.
|
||||
* fragment 0 will be returned in mca_ml_request_free() which
|
||||
* is called from the MPI wait() and test() routines.
|
||||
* We can recover the pointer to the fragement descriptor from
|
||||
* the MPI level request object, wich is the first element
|
||||
* in the fragment descriptor.
|
||||
*/
|
||||
ML_VERBOSE(10, ("Master ? %p %d", coll_op, coll_op->fragment_data.offset_into_user_buffer));
|
||||
if (0 != coll_op->fragment_data.offset_into_user_buffer &&
|
||||
!out_of_resource) {
|
||||
/* non-zero offset ==> this is not fragment 0 */
|
||||
CHECK_AND_RECYCLE(coll_op);
|
||||
}
|
||||
}
|
||||
|
||||
/* return */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* task completion */
|
||||
static inline __opal_attribute_always_inline__ int coll_ml_task_dependency_processing(
|
||||
mca_coll_ml_task_status_t *task)
|
||||
{
|
||||
/* update dependencies */
|
||||
mca_coll_ml_collective_operation_progress_t *my_schedule_instance =
|
||||
task->ml_coll_operation;
|
||||
int n_dependent_tasks = task->rt_num_dependent_tasks;
|
||||
int dep_task;
|
||||
|
||||
for (dep_task = 0; dep_task < n_dependent_tasks; dep_task++)
|
||||
{
|
||||
int task_index;
|
||||
task_index = task->rt_dependent_task_indecies[dep_task];
|
||||
my_schedule_instance->dag_description.status_array[task_index].n_dep_satisfied++;
|
||||
}
|
||||
|
||||
/* return */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* collective task completion processing -
 * "task" may be removed from list in this routine.
 * Thread safety is assumed to be handled outside this routine.
 *
 * Sequence: raise the operation's root flag, credit this task's dependents,
 * run the optional per-task completion callback, remove the task from
 * "list" (when one is supplied, advancing *task_status_g to the previous
 * list item), and - when this was the last task of the schedule - run
 * fragment-level completion processing.
 */
static inline __opal_attribute_always_inline__ int mca_coll_ml_task_completion_processing(
        mca_coll_ml_task_status_t **task_status_g, opal_list_t *list)
{
    /* local variables */
    int ret = OMPI_SUCCESS;
    mca_coll_ml_task_status_t *task_status = *task_status_g;

    mca_coll_ml_collective_operation_progress_t *coll_op =
        task_status->ml_coll_operation;

    /* Pasha: Since all our collectives so far use the root
       flag, I replacing the call for custom call back function
       with setting root_flag.
       If we will see that we need some custom functionality,
       we will enable it later.
    */

    task_status->ml_coll_operation->variable_fn_params.root_flag = true;

#if 0
    /* Disabled: the completion callback used to run before dependency
       processing; it now runs after (see below). */
    if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
        ret = task_status->task_comp_fn(task_status);
        if (ret != OMPI_SUCCESS) {
            return ret;
        }
    }
#endif

    /* update dependencies */
    ret = coll_ml_task_dependency_processing(task_status);
    if (ret != OMPI_SUCCESS) {
        ML_VERBOSE(3,("Failed to coll_ml_task_dependency_processing"));
        return ret;
    }

    /* process task completion function,
       if any was defined */
    if (OPAL_LIKELY(NULL != task_status->task_comp_fn)) {
        ret = task_status->task_comp_fn(task_status);
        if (ret != OMPI_SUCCESS) {
            ML_VERBOSE(3,("Failed to task_comp_fn"));
            return ret;
        }
    }

    /* remove the descriptor from the incomplete list
       (Pasha: if the list was provided) */
    /* No need to put this on any new list - it is associated
     * with the mca_coll_ml_collective_operation_progress_t
     * descriptor already
     */

    if (NULL != list) {
        /* opal_list_remove_item() returns the previous item, so the
         * caller's cursor stays valid while iterating the list. */
        (*task_status_g) = (mca_coll_ml_task_status_t *)
            opal_list_remove_item(list, (opal_list_item_t *)(task_status));
    }

    /* update completion counter */
    coll_op->dag_description.num_tasks_completed++;

    /* Last task of the schedule => the whole fragment is complete. */
    if(coll_op->dag_description.num_tasks_completed ==
            coll_op->coll_schedule->n_fns)
    {
        /* the actual fragment descriptor is not on any list, as
         * we can get at it from the task descriptors
         */
        ret = coll_ml_fragment_completion_processing(coll_op);
        if (OMPI_SUCCESS != ret) {
            ML_VERBOSE(3,("Failed to coll_ml_fragment_completion_processing"));
            return ret;
        }
    }

    /* return */
    return ret;
}
|
||||
|
||||
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_append_to_queue(
|
||||
mca_coll_ml_collective_operation_progress_t *op_prog,
|
||||
mca_coll_ml_task_setup_fn_t task_setup)
|
||||
{
|
||||
int fn_index;
|
||||
mca_coll_ml_collective_operation_description_t *op_desc =
|
||||
op_prog->coll_schedule;
|
||||
mca_coll_ml_compound_functions_t *func = NULL;
|
||||
mca_coll_ml_task_status_t *task_status = NULL;
|
||||
mca_coll_ml_component_t *cm = &mca_coll_ml_component;
|
||||
|
||||
ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));
|
||||
|
||||
/* Init all tasks, before we start them */
|
||||
for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
|
||||
func = &op_desc->component_functions[fn_index];
|
||||
task_status = &op_prog->dag_description.status_array[fn_index];
|
||||
|
||||
ML_VERBOSE(9, ("Processing function index %d", fn_index));
|
||||
|
||||
assert(NULL != func);
|
||||
|
||||
/* Init task status */
|
||||
task_status->n_dep_satisfied = 0; /* start from zero */
|
||||
task_status->bcol_fn = func->bcol_function;
|
||||
/* setup run time parametres */
|
||||
/* Pasha: do we need the if proctection ? */
|
||||
if (OPAL_LIKELY(NULL != task_setup)) {
|
||||
task_setup(task_status, fn_index, func);
|
||||
}
|
||||
|
||||
/* the pointer to operation progress supposed to be set during
|
||||
construction time. Just want to make sure that it is ok */
|
||||
assert(task_status->ml_coll_operation == op_prog);
|
||||
|
||||
/* We assume that all pointer to functions are defined and it
|
||||
is not reson to check for null */
|
||||
assert(NULL != func->bcol_function->coll_fn);
|
||||
|
||||
/* In order to preserve ordering on all ranks we have to add it to tail */
|
||||
/* TBD: Need to review the way we launch fragments */
|
||||
ML_VERBOSE(9, ("The task %p dependency is %d, appending it on pending list",
|
||||
(void *)task_status, func->num_dependencies));
|
||||
OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
|
||||
OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
|
||||
}
|
||||
|
||||
ML_VERBOSE(9, ("Collective was launched !"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* Launch a generic ML collective.  Pass 1 initializes every task in the
 * schedule; pass 2 fires each task that has no run-time dependencies and
 * dispatches on the bcol return status (queue on pending, queue on active,
 * or run completion processing).  Tasks with outstanding dependencies go
 * straight to the pending list for the progress engine.
 *
 * @param op_prog    collective operation instance whose DAG is launched
 * @param task_setup optional per-task run-time setup hook (may be NULL)
 * @return OMPI_SUCCESS, OMPI_ERROR on an unknown bcol exit status, or the
 *         error from task completion processing
 */
static inline __opal_attribute_always_inline__ int mca_coll_ml_generic_collectives_launcher(
        mca_coll_ml_collective_operation_progress_t *op_prog,
        mca_coll_ml_task_setup_fn_t task_setup)
{
    int fn_index;
    int rc, ret;
    mca_coll_ml_collective_operation_description_t *op_desc =
        op_prog->coll_schedule;
    mca_coll_ml_compound_functions_t *func = NULL;
    mca_coll_ml_task_status_t *task_status = NULL;
    mca_coll_ml_component_t *cm = &mca_coll_ml_component;

    ML_VERBOSE(9, ("Calling mca_coll_ml_generic_collectives_launcher"));

    /* Init all tasks, before we start them */
    for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
        func = &op_desc->component_functions[fn_index];
        task_status = &op_prog->dag_description.status_array[fn_index];

        ML_VERBOSE(9, ("Processing function index %d", fn_index));

        assert(NULL != func);

        /* Init task status */
        task_status->n_dep_satisfied = 0; /* start from zero */
        /* task_status->my_index_in_coll_schedule = fn_index;
           pasha: the value is set during init */
        task_status->bcol_fn = func->bcol_function;
        /* Pasha: disabling support for custom completion functions
        task_status->task_comp_fn = func->task_comp_fn;
        */

        /* setup run time parameters */
        /* Pasha: do we need the if protection ? */
        if (OPAL_LIKELY(NULL != task_setup)) {
            task_setup(task_status, fn_index, func);
        }

        /* the pointer to operation progress is supposed to be set during
           construction time. Just want to make sure that it is ok */
        assert(task_status->ml_coll_operation == op_prog);
        /* Task status is done */

        /* launch the task and put it on corresponding list (if required) */

        /* We assume that all pointers to functions are defined, so there
           is no reason to check for null */
        assert(NULL != func->bcol_function->coll_fn);
    }

    /* Second pass: start every task that is startable right now. */
    for (fn_index = 0; fn_index < op_desc->n_fns; fn_index++) {
        func = &op_desc->component_functions[fn_index];
        task_status = &op_prog->dag_description.status_array[fn_index];
        /* fire the collective immediately if it has no dependencies */
        if (0 == task_status->rt_num_dependencies) {
            rc = func->bcol_function->coll_fn(&op_prog->variable_fn_params,
                    /* Pasha: Need to update the prototype of the func,
                       right now it is ugly hack for compilation */
                    (struct coll_ml_function_t *)&func->constant_group_data);
            switch(rc) {
                case BCOL_FN_NOT_STARTED:
                    /* bcol could not start: park the task on the pending list */
                    ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_NOT_STARTED, putting the task on pending list"));
                    OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
                    opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
                    OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
                    break;
                case BCOL_FN_STARTED:
                    /* started but not finished: track it on the active list */
                    ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_STARTED, puting the task on active list"));
                    OPAL_THREAD_LOCK(&(mca_coll_ml_component.active_tasks_mutex));
                    opal_list_append(&cm->active_tasks, (opal_list_item_t *)task_status);
                    OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.active_tasks_mutex));
                    break;
                case BCOL_FN_COMPLETE:
                    /* the task is done ! lets start relevant dependencies */
                    ML_VERBOSE(9, ("Call to bcol collecitive return BCOL_FN_COMPLETE"));
                    /* the task does not belong to any list yet, so passing NULL */
                    ret = mca_coll_ml_task_completion_processing(&task_status, NULL);
                    if (OMPI_SUCCESS != ret) {
                        ML_VERBOSE(9, ("Failed to mca_coll_ml_task_completion_processing"));
                        return ret;
                    }
                    break;
                default:
                    ML_ERROR(("Unknow exit status %d", rc));
                    return OMPI_ERROR;
            }
        } else {
            /* the task depends on others; put it on the pending list */
            ML_VERBOSE(9, ("The task %p dependency is %d, putting it on pending list",
                        (void *)task_status, func->num_dependencies));
            OPAL_THREAD_LOCK(&(mca_coll_ml_component.pending_tasks_mutex));
            opal_list_append(&cm->pending_tasks, (opal_list_item_t *)task_status);
            OPAL_THREAD_UNLOCK(&(mca_coll_ml_component.pending_tasks_mutex));
        }
    }
    ML_VERBOSE(9, ("Collective was launched !"));
    return OMPI_SUCCESS;
}
|
||||
|
||||
/* Allocate and initialize a collective operation progress descriptor for a
 * single-fragment DAG collective.  Blocks until a descriptor is available
 * and returns it with its embedded MPI request marked ACTIVE. */
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_alloc_op_prog_single_frag_dag(
        mca_coll_ml_module_t *ml_module,
        mca_coll_ml_collective_operation_description_t *coll_schedule,
        void *src, void *dst, size_t total_bytes,
        size_t offset_into_user_buffer
        )
{
    int rc;
    ompi_free_list_item_t *fl_item;
    mca_coll_ml_collective_operation_progress_t *coll_op;
    ompi_request_t *request;

    /* Blocking wait for a free descriptor
     * (maybe we want to make it non blocking?) */
    OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
                        fl_item,
                        rc);

    coll_op = (mca_coll_ml_collective_operation_progress_t *) fl_item;
    ML_VERBOSE(10, (">>> Allocating coll op %p", coll_op));
    assert(NULL != coll_op);
    assert(coll_op->dag_description.status_array[0].item.opal_list_item_refcount == 0);

    /* Prepare the embedded MPI request; ACTIVE state is critical for
     * MPI_Test() to work on it. */
    request = &(coll_op->full_message.super);
    OMPI_REQUEST_INIT(request, false);
    request->req_state = OMPI_REQUEST_ACTIVE;
    request->req_status._cancelled = 0;
    request->req_status.MPI_ERROR = OMPI_SUCCESS;

    MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
            offset_into_user_buffer, src, dst, coll_schedule);

    /* We do not set sequential, since it is not a sequential call */
    coll_op->dag_description.num_tasks_completed = 0;

    /* Release reference counter has to be zero */
    assert(0 == coll_op->pending);

    return coll_op;
}
|
||||
|
||||
/* Clone a progress descriptor for the next fragment of an in-flight
 * multi-fragment operation, carrying over the operation index and the
 * shared message descriptor from the old fragment. */
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_duplicate_op_prog_single_frag_dag(
        mca_coll_ml_module_t *ml_module,
        mca_coll_ml_collective_operation_progress_t *old_op)
{
    struct full_message_t *msg_desc = old_op->fragment_data.message_descriptor;
    mca_coll_ml_collective_operation_progress_t *new_op;

    /* NOTE(review): dest_user_addr is passed as the "src" argument and
     * src_user_addr as "dst", mirroring the original code - confirm this
     * ordering is intentional for the bcast path. */
    new_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
            ml_module->coll_ml_bcast_functions[old_op->fragment_data.current_coll_op],
            msg_desc->dest_user_addr,
            msg_desc->src_user_addr,
            msg_desc->n_bytes_total,
            msg_desc->n_bytes_scheduled);

    new_op->fragment_data.current_coll_op = old_op->fragment_data.current_coll_op;
    new_op->fragment_data.message_descriptor = msg_desc;

    return new_op;
}
|
||||
|
||||
/* Allocate and initialize a collective operation progress descriptor for a
 * single-fragment sequential collective.  Blocks until a descriptor is
 * available.  Unlike the DAG variant, no MPI request state is set up. */
static inline __opal_attribute_always_inline__ mca_coll_ml_collective_operation_progress_t *
mca_coll_ml_alloc_op_prog_single_frag_seq(
        mca_coll_ml_module_t *ml_module,
        mca_coll_ml_collective_operation_description_t *coll_schedule,
        void *src, void *dst,
        size_t total_bytes,
        size_t offset_into_user_buffer
        )
{
    int rc;
    ompi_free_list_item_t *fl_item;
    mca_coll_ml_collective_operation_progress_t *coll_op;

    /* Blocking wait for a free descriptor
     * (maybe we want to make it non blocking?) */
    OMPI_FREE_LIST_WAIT(&(ml_module->coll_ml_collective_descriptors),
                        fl_item,
                        rc);

    coll_op = (mca_coll_ml_collective_operation_progress_t *) fl_item;
    assert(NULL != coll_op);

    MCA_COLL_ML_OP_BASIC_SETUP(coll_op, total_bytes,
            offset_into_user_buffer, src, dst, coll_schedule);

    /* set sequential data */
    /* pasha - do we have something to set ? */

    return coll_op;
}
|
||||
|
||||
static inline __opal_attribute_always_inline__
|
||||
void mca_coll_ml_convertor_get_send_frag_size(mca_coll_ml_module_t *ml_module,
|
||||
size_t *frag_size, struct full_message_t *message_descriptor)
|
||||
{
|
||||
size_t ml_fragment_size = ml_module->ml_fragment_size;
|
||||
opal_convertor_t *dummy_convertor = &message_descriptor->dummy_convertor;
|
||||
|
||||
/* The last frag needs special service */
|
||||
if (ml_fragment_size >
|
||||
message_descriptor->send_converter_bytes_packed) {
|
||||
*frag_size = message_descriptor->send_converter_bytes_packed;
|
||||
message_descriptor->send_converter_bytes_packed = 0;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
*frag_size = ml_fragment_size;
|
||||
message_descriptor->dummy_conv_position += ml_fragment_size;
|
||||
|
||||
opal_convertor_generic_simple_position(dummy_convertor, &message_descriptor->dummy_conv_position);
|
||||
*frag_size -= dummy_convertor->partial_length;
|
||||
|
||||
message_descriptor->send_converter_bytes_packed -= (*frag_size);
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
@ -12,6 +12,7 @@
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_mca.h"
|
||||
#include "coll_ml_lmngr.h"
|
||||
#ifndef HAVE_POSIX_MEMALIGN
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_mca.h"
|
||||
#include "coll_ml_lmngr.h"
|
||||
#include "ompi/mca/common/netpatterns/common_netpatterns.h"
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "ompi/mca/coll/coll.h"
|
||||
#include "opal/sys/atomic.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_inlines.h"
|
||||
#include "ompi/mca/coll/ml/coll_ml_allocation.h"
|
||||
|
||||
static int mca_coll_ml_memsync_recycle_memory(mca_coll_ml_collective_operation_progress_t *coll_op)
|
||||
|
@ -44,6 +44,7 @@
|
||||
#include "opal/util/arch.h"
|
||||
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
#include "coll_ml_select.h"
|
||||
#include "coll_ml_custom_utils.h"
|
||||
#include "coll_ml_allocation.h"
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "ompi/op/op.h"
|
||||
#include "ompi/mca/bcol/bcol.h"
|
||||
#include "coll_ml.h"
|
||||
#include "coll_ml_inlines.h"
|
||||
|
||||
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user