/*
 * Source listing: openmpi/ompi/mca/coll/ml/coll_ml_bcast.c
 * (840 lines, 36 KiB, C)
 */

/*
* Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved.
* Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file */
#include "ompi_config.h"
#include <unistd.h>
#include <sys/uio.h>
#include "opal/threads/mutex.h"
#include "opal/sys/atomic.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/bcol/bcol.h"
#include "coll_ml.h"
#include "coll_ml_inlines.h"
#include "coll_ml_colls.h"
#include "coll_ml_allocation.h"
/* Poll mca_coll_ml_alloc_buffer(ml) until it yields a non-NULL descriptor,
 * calling opal_progress() after every failed attempt.  The descriptor is
 * stored in `buffer`, which must be an assignable pointer expression. */
#define ML_BUFFER_ALLOC_WAIT(ml, buffer)                                \
do {                                                                    \
    while (NULL == ((buffer) = mca_coll_ml_alloc_buffer(ml))) {         \
        opal_progress();                                                \
    }                                                                   \
} while (0)
/* Link fragment `op` into the per-message ordering chain: remember `prev`
 * as its predecessor, then publish `last` as the most recently started
 * fragment of the message. */
#define COLL_ML_SETUP_ORDERING_INFO(op, last, prev)                           \
do {                                                                          \
    /* Keep this order: callers pass the current last_started_frag as */      \
    /* `prev`, so it must be read before it is overwritten below.     */      \
    (op)->prev_frag = (prev);                                                 \
    (op)->fragment_data.message_descriptor->last_started_frag = (last);       \
} while (0)
/* Allocate the collective descriptor for a contiguous bcast and prepare its
 * first fragment.
 *
 *   ml_module     ML module (pointer); its comm decides root vs. non-root
 *   op            lvalue receiving the new collective descriptor
 *   coll_index    index into ml_module->coll_ml_bcast_functions
 *   root          rank of the broadcast root
 *   total_len     total message length in bytes
 *   frag_len      length in bytes of the first fragment
 *   buf           user buffer (used as both source and destination)
 *   ml_buff_desc  ML payload buffer descriptor for the first fragment
 *
 * On the root the first frag_len bytes of buf are copied into the payload
 * buffer and no unpack callback is installed; on other ranks the small
 * unpack callback is installed.  Every argument is parenthesized so that
 * expression arguments keep their meaning; arguments are evaluated more
 * than once and therefore must be free of side effects. */
#define ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, op, coll_index, root,      \
                                            total_len, frag_len, buf, ml_buff_desc) \
do {                                                                              \
    (op) = mca_coll_ml_alloc_op_prog_single_frag_dag((ml_module),                 \
               (ml_module)->coll_ml_bcast_functions[(coll_index)],                \
               (buf), (buf),                                                      \
               (total_len),                                                       \
               0 /* offset for first pack */);                                    \
    if (OPAL_LIKELY((frag_len) > 0)) {                                            \
        if (ompi_comm_rank((ml_module)->comm) == (root)) {                        \
            /* single frag, pack the data */                                      \
            memcpy((void *)(uintptr_t)(ml_buff_desc)->data_addr,                  \
                   (buf), (frag_len));                                            \
            /* No unpack for root */                                              \
            (op)->process_fn = NULL;                                              \
        } else {                                                                  \
            (op)->process_fn = mca_coll_ml_bcast_small_unpack_data;               \
        }                                                                         \
    }                                                                             \
    (op)->full_message.n_bytes_scheduled = (frag_len);                            \
} while (0)
/* Slots of ml_module->bcast_fn_index_table used in this file: SMALL_BCAST
 * selects the schedule for the small-message and non-contiguous paths,
 * LARGE_BCAST the schedule for the fragmented and zero-copy paths. */
#define SMALL_BCAST 0
#define LARGE_BCAST (SMALL_BCAST + 1)
/**
 * Unpack a received bcast fragment (and any fragments queued behind it)
 * through the message's receive convertor.
 *
 * Fragments are unpacked in payload-buffer-index order.  If this fragment is
 * not the next expected one it is chained onto its predecessor
 * (prev_frag->next_to_process_frag), flagged REQ_OUT_OF_ORDER and left for a
 * later call.  Otherwise it is unpacked, and the chain of deferred fragments
 * reachable through next_to_process_frag is drained in the same call.
 *
 * @param coll_op  fragment descriptor whose payload buffer holds packed data
 * @return OMPI_SUCCESS when the fragment (and its chain) was unpacked;
 *         ORTE_ERR_NO_MATCH_YET when the fragment arrived out of order and
 *         was deferred; OMPI_ERROR when recycling the buffer of a deferred
 *         fragment failed.
 */
static int mca_coll_ml_bcast_converter_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op)
{
struct iovec iov;
uint32_t iov_count = 1;
size_t max_data = 0;
mca_coll_ml_collective_operation_progress_t *next_op;
mca_coll_ml_module_t *ml_module =
(mca_coll_ml_module_t *) coll_op->coll_module;
/* Total number of payload buffers (banks x buffers per bank);
 * next_expected_index wraps back to 0 when it reaches this value. */
size_t max_index =
ml_module->payload_block->num_banks * ml_module->payload_block->num_buffers_per_bank;
bool is_first = true;
int ret;
/* Check if the fragment was delivered in order */
if (coll_op->fragment_data.buffer_desc->buffer_index !=
coll_op->fragment_data.message_descriptor->next_expected_index) {
mca_coll_ml_collective_operation_progress_t *prev_coll_op = coll_op->prev_frag;
assert(NULL == prev_coll_op->next_to_process_frag);
/* make sure that the previous fragment holds a pointer to this
out-of-order fragment, so it is picked up when the previous one
is unpacked */
prev_coll_op->next_to_process_frag = coll_op;
assert(!(coll_op->pending & REQ_OUT_OF_ORDER));
coll_op->pending |= REQ_OUT_OF_ORDER;
/* we will unpack it later */
ML_VERBOSE(10, ("Get %d expecting %d previous %d",
coll_op->fragment_data.buffer_desc->buffer_index,
coll_op->fragment_data.message_descriptor->next_expected_index,
prev_coll_op->fragment_data.buffer_desc->buffer_index));
return ORTE_ERR_NO_MATCH_YET;
}
/* In-order fragment: unpack it, then walk the chain of fragments that were
 * deferred behind it. */
do {
iov.iov_len = coll_op->fragment_data.fragment_size;
iov.iov_base = (void *)((uintptr_t) coll_op->fragment_data.buffer_desc->data_addr);
ML_VERBOSE(10, ("Data unpack with convertern index %d",
coll_op->fragment_data.buffer_desc->buffer_index));
/* NOTE(review): the return value of opal_convertor_unpack() is ignored and
 * iov_count / max_data are not reset between iterations - confirm this is
 * intended. */
opal_convertor_unpack(&coll_op->fragment_data.message_descriptor->recv_convertor,
&iov, &iov_count, &max_data);
/* update next index, wrapping around at max_index */
++coll_op->fragment_data.message_descriptor->next_expected_index;
if (coll_op->fragment_data.message_descriptor->next_expected_index >= max_index) {
coll_op->fragment_data.message_descriptor->next_expected_index = 0;
}
/* Return the fragment to the queue if it is done; the exception is the
first fragment handled by this call, which is released later by the
caller's path.
*/
next_op = coll_op->next_to_process_frag;
coll_op->next_to_process_frag = NULL;
/* Only deferred fragments (not the first one of this call) with a non-zero
 * offset into the user buffer are recycled here. */
if ((!is_first) &&
(0 != coll_op->fragment_data.offset_into_user_buffer)) {
assert(coll_op->pending & REQ_OUT_OF_ORDER);
coll_op->pending ^= REQ_OUT_OF_ORDER;
/* Pasha: On one hand - I'm not sure that conceptually this is the right place to call buffer recycling. Potentially,
coll_ml_fragment_completion_processing() sounds like the right place for out-of-order unpack/sync handling.
* On the other hand - non-contiguous data is not super common and we would like to minimize the effect on the critical path
* for non-contiguous data types. */
ret = mca_coll_ml_buffer_recycling(coll_op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERROR;
}
CHECK_AND_RECYCLE(coll_op);
}
coll_op = next_op;
is_first = false;
} while (NULL != coll_op);
return OMPI_SUCCESS;
}
/**
 * Unpack callback for single-fragment contiguous broadcasts: copies the
 * fragment from the ML payload buffer into the user buffer, at the offset
 * given by the number of bytes already delivered for the message.
 *
 * @param coll_op  fragment descriptor to unpack
 * @return OMPI_SUCCESS always
 */
static int mca_coll_ml_bcast_small_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op)
{
    const uintptr_t user_base = (uintptr_t) coll_op->full_message.dest_user_addr;
    const uintptr_t delivered = (uintptr_t) coll_op->full_message.n_bytes_delivered;
    const uintptr_t payload = (uintptr_t) coll_op->fragment_data.buffer_desc->data_addr;

    memcpy((void *) (user_base + delivered), (void *) payload,
           coll_op->fragment_data.fragment_size);

    return OMPI_SUCCESS;
}
/**
 * Unpack callback for fragmented contiguous broadcasts: copies the fragment
 * from the ML payload buffer into the user buffer of the owning message, at
 * this fragment's own offset into that buffer.
 *
 * @param coll_op  fragment descriptor to unpack
 * @return OMPI_SUCCESS always
 */
static int mca_coll_ml_bcast_large_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op)
{
    const uintptr_t user_base =
        (uintptr_t) coll_op->fragment_data.message_descriptor->dest_user_addr;
    const uintptr_t frag_offset =
        (uintptr_t) coll_op->fragment_data.offset_into_user_buffer;
    const uintptr_t payload =
        (uintptr_t) coll_op->fragment_data.buffer_desc->data_addr;

    memcpy((void *) (user_base + frag_offset), (void *) payload,
           coll_op->fragment_data.fragment_size);

    return OMPI_SUCCESS;
}
/**
 * Fragment launcher for convertor-based (non-contiguous datatype) broadcasts.
 *
 * Launches new fragments while fewer than
 * mca_coll_ml_component.pipeline_depth fragments of the message are active
 * and bytes remain to be scheduled.  On the root each new fragment is packed
 * from the message's send convertor; on other ranks the convertor unpack
 * callback is installed and the fragment size is obtained from
 * mca_coll_ml_convertor_get_send_frag_size().
 *
 * @param coll_op  an existing fragment of the message being pipelined
 * @return OMPI_SUCCESS when the pipeline is full, the message is fully
 *         scheduled, or no ML buffer is free while other fragments are
 *         still active;
 *         OMPI_ERR_TEMP_OUT_OF_RESOURCE when no ML buffer is free and no
 *         fragment is active (coll_op is parked on the module's
 *         waiting_for_memory_list unless it is already flagged);
 *         otherwise the error returned when launching a fragment failed.
 */
static int mca_coll_ml_bcast_frag_converter_progress(mca_coll_ml_collective_operation_progress_t *coll_op)
{
    /* local variables */
    int ret, frag_len;
    size_t max_data = 0;
    ml_payload_buffer_desc_t *src_buffer_desc = NULL;
    mca_coll_ml_collective_operation_progress_t *new_op = NULL;
    mca_coll_ml_task_setup_fn_t task_setup = NULL;
    mca_coll_ml_module_t *ml_module = OP_ML_MODULE(coll_op);

    /* Keep the pipeline filled with fragments */
    while (coll_op->fragment_data.message_descriptor->n_active <
           mca_coll_ml_component.pipeline_depth) {
        /* If an active fragment happens to have completed the collective during
         * a hop into the progress engine, then don't launch a new fragment,
         * instead break and return.
         */
        if (coll_op->fragment_data.message_descriptor->n_bytes_scheduled
            == coll_op->fragment_data.message_descriptor->n_bytes_total) {
            break;
        }

        /* Get an ml buffer */
        src_buffer_desc = mca_coll_ml_alloc_buffer(ml_module);
        if (NULL == src_buffer_desc) {
            /* If there exist outstanding fragments, then break out
             * and let an active fragment deal with this later,
             * there are no buffers available.
             */
            if (0 < coll_op->fragment_data.message_descriptor->n_active) {
                return OMPI_SUCCESS;
            }

            /* It is useless to call progress from here, since
             * ml progress can't be executed as result ml memsync
             * call will not be completed and no memory will be
             * recycled. So we put the element on the list, and we will
             * progress it later when memsync will recycle some memory */

            /* The fragment is already on the list and we still have
             * no ml resources: return busy */
            if (coll_op->pending & REQ_OUT_OF_MEMORY) {
                return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
            }

            coll_op->pending |= REQ_OUT_OF_MEMORY;
            opal_list_append(&ml_module->waiting_for_memory_list,
                             (opal_list_item_t *)coll_op);
            return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        }

        /* Get a new collective descriptor and initialize it */
        new_op = mca_coll_ml_duplicate_op_prog_single_frag_dag
            (ml_module, coll_op);

        /* Fragment length handed to the bcol parameters below */
        frag_len = ML_GET_FRAG_SIZE(coll_op, BCOL_BCAST);

        /* Decide based on global flag, not variable one */
        if (coll_op->fragment_data.message_descriptor->root) {
            struct iovec iov;
            uint32_t iov_count = 1;

            /* OBJ_RETAIN(new_op->variable_fn_params.dtype); */
            iov.iov_base = (IOVBASE_TYPE*) src_buffer_desc->data_addr;
            iov.iov_len = ml_module->ml_fragment_size;
            assert(0 != iov.iov_len);

            /* Root: pack the next piece of the message into the ML buffer;
             * max_data receives the number of bytes packed */
            opal_convertor_pack(&new_op->fragment_data.message_descriptor->send_convertor,
                                &iov, &iov_count, &max_data);

            /* No unpack for root */
            new_op->process_fn = NULL;
            new_op->variable_fn_params.root_flag = true;
            new_op->variable_fn_params.root_route = NULL;
            task_setup = OP_ML_MODULE(new_op)->
                coll_ml_bcast_functions[new_op->fragment_data.current_coll_op]->
                task_setup_fn[COLL_ML_ROOT_TASK_FN];
        } else {
            new_op->process_fn = mca_coll_ml_bcast_converter_unpack_data;
            new_op->variable_fn_params.root_flag = false;
            new_op->variable_fn_params.root_route = coll_op->variable_fn_params.root_route;
            task_setup = OP_ML_MODULE(new_op)->
                coll_ml_bcast_functions[new_op->fragment_data.current_coll_op]->
                task_setup_fn[COLL_ML_GENERAL_TASK_FN];

            /* Non-root: max_data receives the size of the fragment to expect */
            mca_coll_ml_convertor_get_send_frag_size(
                ml_module, &max_data,
                new_op->fragment_data.message_descriptor);
        }

        new_op->fragment_data.message_descriptor->n_bytes_scheduled += max_data;
        new_op->fragment_data.fragment_size = max_data;
        new_op->fragment_data.buffer_desc = src_buffer_desc;

        /* Setup fragment specific data */
        ++(new_op->fragment_data.message_descriptor->n_active);

        /* Chain the new fragment behind the last started one so that
         * out-of-order arrivals can be unpacked in sequence */
        COLL_ML_SETUP_ORDERING_INFO(new_op, new_op,
                new_op->fragment_data.message_descriptor->last_started_frag);
        ML_VERBOSE(10, ("Start more, My index %d my prev %d",
                        new_op->fragment_data.buffer_desc->buffer_index,
                        new_op->prev_frag->fragment_data.buffer_desc->buffer_index));

        ML_SET_VARIABLE_PARAMS_BCAST(
                new_op,
                OP_ML_MODULE(new_op),
                frag_len,
                MPI_BYTE,
                src_buffer_desc,
                0,
                0,
                frag_len,
                src_buffer_desc->data_addr);

        /* TBD: remove buffer_size */
        new_op->variable_fn_params.buffer_size = coll_op->variable_fn_params.buffer_size;
        new_op->variable_fn_params.hier_factor = coll_op->variable_fn_params.hier_factor;

        /* Set order info for new frag if there is a bcol needs ordering */
        MCA_COLL_ML_SET_NEW_FRAG_ORDER_INFO(new_op);

        /* Launch this collective !! */
        ret = mca_coll_ml_generic_collectives_append_to_queue(new_op, task_setup);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            ML_ERROR(("Failed to launch"));
            return ret;
        }
    }

    return OMPI_SUCCESS;
}
/**
 * Fragment launcher for fragmented broadcasts of contiguous data.
 *
 * Launches new fragments while fewer than the message's pipeline_depth
 * fragments are active and bytes remain to be scheduled.  On the root each
 * new fragment is copied from the user buffer (at offset n_bytes_scheduled)
 * into a fresh ML payload buffer; on other ranks the large unpack callback
 * is installed instead.
 *
 * @param coll_op  an existing fragment of the message being pipelined
 * @return OMPI_SUCCESS when the pipeline is full, the message is fully
 *         scheduled, or no ML buffer is free while other fragments are
 *         still active;
 *         OMPI_ERR_TEMP_OUT_OF_RESOURCE when no ML buffer is free and no
 *         fragment is active (coll_op is parked on the module's
 *         waiting_for_memory_list unless it is already flagged);
 *         otherwise the error returned when launching a fragment failed.
 */
static int mca_coll_ml_bcast_frag_progress(mca_coll_ml_collective_operation_progress_t *coll_op)
{
    /* local variables */
    int ret;
    int frag_len, current_coll_op = coll_op->fragment_data.current_coll_op;
    void *buf;
    ml_payload_buffer_desc_t *src_buffer_desc = NULL;
    mca_coll_ml_collective_operation_progress_t *new_op = NULL;
    mca_coll_ml_task_setup_fn_t task_setup = NULL;

    /* Keep the pipeline filled with fragments */
    while (coll_op->fragment_data.message_descriptor->n_active <
           coll_op->fragment_data.message_descriptor->pipeline_depth) {
        /* If an active fragment happens to have completed the collective during
         * a hop into the progress engine, then don't launch a new fragment,
         * instead break and return.
         */
        if (coll_op->fragment_data.message_descriptor->n_bytes_scheduled
            == coll_op->fragment_data.message_descriptor->n_bytes_total) {
            break;
        }

        /* Get an ml buffer */
        src_buffer_desc = mca_coll_ml_alloc_buffer(OP_ML_MODULE(coll_op));
        if (NULL == src_buffer_desc) {
            /* If there exist outstanding fragments, then break out
             * and let an active fragment deal with this later,
             * there are no buffers available.
             */
            if (0 < coll_op->fragment_data.message_descriptor->n_active) {
                return OMPI_SUCCESS;
            }

            /* It is useless to call progress from here, since
             * ml progress can't be executed as result ml memsync
             * call will not be completed and no memory will be
             * recycled. So we put the element on the list, and we will
             * progress it later when memsync will recycle some memory */

            /* The fragment is already on the list and we still have
             * no ml resources: return busy */
            if (coll_op->pending & REQ_OUT_OF_MEMORY) {
                ML_VERBOSE(10,("Out of resources %p", coll_op));
                return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
            }

            coll_op->pending |= REQ_OUT_OF_MEMORY;
            opal_list_append(&((OP_ML_MODULE(coll_op))->waiting_for_memory_list),
                             (opal_list_item_t *) coll_op);
            ML_VERBOSE(10,("Out of resources %p adding to pending queue", coll_op));
            return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        }

        /* Get a new collective descriptor and initialize it */
        new_op = mca_coll_ml_duplicate_op_prog_single_frag_dag
            (OP_ML_MODULE(coll_op), coll_op);

        /* We need this address for pointer arithmetic in memcpy */
        buf = coll_op->fragment_data.message_descriptor->dest_user_addr;
        frag_len = ML_GET_FRAG_SIZE(coll_op, BCOL_BCAST);

        /* Decide based on global flag, not variable one */
        if (coll_op->fragment_data.message_descriptor->root) {
            /* Root: stage the next frag_len bytes of the user buffer,
             * starting where the previously scheduled fragments ended */
            memcpy((void *)(uintptr_t)src_buffer_desc->data_addr,
                   (void *) ((uintptr_t) buf + (uintptr_t) coll_op->
                             fragment_data.message_descriptor->n_bytes_scheduled) , frag_len);

            /* No unpack for root */
            new_op->process_fn = NULL;
            new_op->variable_fn_params.root_flag = true;
            new_op->variable_fn_params.root_route = NULL;
            task_setup = OP_ML_MODULE(new_op)->coll_ml_bcast_functions[current_coll_op]->
                task_setup_fn[COLL_ML_ROOT_TASK_FN];
        } else {
            new_op->process_fn = mca_coll_ml_bcast_large_unpack_data;
            new_op->variable_fn_params.root_flag = false;
            new_op->variable_fn_params.root_route = coll_op->variable_fn_params.root_route;
            task_setup = OP_ML_MODULE(new_op)->coll_ml_bcast_functions[current_coll_op]->
                task_setup_fn[COLL_ML_GENERAL_TASK_FN];
        }

        /* Setup fragment specific data */
        new_op->fragment_data.message_descriptor->n_bytes_scheduled += frag_len;
        new_op->fragment_data.buffer_desc = src_buffer_desc;
        new_op->fragment_data.fragment_size = frag_len;
        new_op->fragment_data.message_descriptor->n_active++;

        ML_SET_VARIABLE_PARAMS_BCAST(
                new_op,
                OP_ML_MODULE(new_op),
                frag_len,
                MPI_BYTE,
                src_buffer_desc,
                0,
                0,
                frag_len,
                src_buffer_desc->data_addr);

        /* Fill in bcast specific arguments */
        /* TBD: remove buffer_size */
        new_op->variable_fn_params.buffer_size = coll_op->variable_fn_params.buffer_size;
        new_op->variable_fn_params.hier_factor = coll_op->variable_fn_params.hier_factor;

        /* Set order info for new frag if there is a bcol needs ordering */
        MCA_COLL_ML_SET_NEW_FRAG_ORDER_INFO(new_op);

        ML_VERBOSE(10, ("FFFF Contig + fragmentation [0-sk, 1-lk, 3-su, 4-lu] %d %d %d\n",
                        new_op->variable_fn_params.buffer_size ,
                        new_op->fragment_data.fragment_size,
                        new_op->fragment_data.message_descriptor->n_bytes_scheduled));

        /* Launch this collective !! */
        ret = mca_coll_ml_generic_collectives_append_to_queue(new_op, task_setup);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            ML_VERBOSE(10, ("Failed to launch"));
            return ret;
        }
    }

    return OMPI_SUCCESS;
}
/* True when the schedule index stored in the module's LARGE_BCAST slot is
 * below ML_BCAST_LARGE_DATA_KNOWN; parallel_bcast_start() then fragments
 * large contiguous messages instead of using the zero-copy path.
 * `module` is a pointer expression and is parenthesized so that arguments
 * such as `&m` expand correctly. */
#define BCAST_FRAGMENTATION_IS_ENABLED(module) \
    ((module)->bcast_fn_index_table[LARGE_BCAST] < ML_BCAST_LARGE_DATA_KNOWN)
/**
 * Common start routine shared by the blocking and non-blocking ML broadcast.
 *
 * Waits for an ML payload buffer and then prepares the collective descriptor
 * along one of four paths:
 *   - contiguous data, pack_len <= small_message_thresholds[BCOL_BCAST]:
 *     a single fragment staged through the ML buffer (SMALL_BCAST schedule);
 *   - contiguous data, larger, fragmentation enabled: the first fragment is
 *     prepared here and mca_coll_ml_bcast_frag_progress is installed as the
 *     fragment launcher (LARGE_BCAST schedule);
 *   - contiguous data, larger, fragmentation disabled ("zero copy"): the
 *     user buffer itself is handed to the schedule (LARGE_BCAST schedule);
 *   - non-contiguous data: data goes through opal convertors and
 *     mca_coll_ml_bcast_frag_converter_progress is installed as the fragment
 *     launcher (SMALL_BCAST schedule).
 * Finally the root / non-root task setup function is selected and the
 * collective is launched.
 *
 * @param buf     user buffer (source on the root, destination elsewhere)
 * @param count   number of dtype elements
 * @param dtype   datatype of the elements
 * @param root    rank of the broadcast root in the module's communicator
 * @param module  ML module, cast to mca_coll_ml_module_t
 * @param req     out: set to the request embedded in the collective descriptor
 * @return the result of mca_coll_ml_generic_collectives_launcher()
 */
static inline __opal_attribute_always_inline__
int parallel_bcast_start(void *buf, int count, struct ompi_datatype_t *dtype,
int root, mca_coll_base_module_t *module, ompi_request_t **req)
{
size_t pack_len = 0;
size_t dt_size = 0;
bool contig = false;
int bcast_index, n_fragments = 1;
mca_coll_ml_collective_operation_progress_t * coll_op = NULL;
mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *) module;
ml_payload_buffer_desc_t *src_buffer_desc = NULL;
mca_coll_ml_task_setup_fn_t task_setup;
ML_VERBOSE(10, ("Starting bcast, mca_coll_ml_bcast_uknown_root"));
/* Message size in bytes: count elements of dt_size bytes each */
ompi_datatype_type_size(dtype, &dt_size);
pack_len = count * dt_size;
/* Setup data buffer: spins (driving opal_progress) until a buffer is free.
 * NOTE(review): the buffer is acquired on every path, including the
 * zero-copy path below that passes the user buffer to the schedule. */
ML_BUFFER_ALLOC_WAIT(ml_module, src_buffer_desc);
/* Get information about memory layout */
contig = opal_datatype_is_contiguous_memory_layout((opal_datatype_t *)dtype, count);
/* Allocate collective schedule and pack message */
if (contig) {
if (pack_len <= (size_t) ml_module->small_message_thresholds[BCOL_BCAST]) {
/* --- contiguous, small: one fragment through the ML buffer --- */
assert(pack_len <= ml_module->payload_block->size_buffer);
bcast_index = ml_module->bcast_fn_index_table[SMALL_BCAST];
ML_VERBOSE(10, ("Contig + small message %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index));
ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, coll_op,
bcast_index, root,
pack_len, pack_len, buf, src_buffer_desc);
ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, count, dtype,
src_buffer_desc, 0, 0, ml_module->payload_block->size_buffer,
(src_buffer_desc->data_addr));
} else if (BCAST_FRAGMENTATION_IS_ENABLED(ml_module)) {
/* --- contiguous, large, fragmented --- */
/* We moved the fragmentation decision from communicator creation time to
runtime, since for large messages the latency of the `if` is not so critical */
size_t n_dts_per_frag;
int frag_len, pipeline_depth = mca_coll_ml_component.pipeline_depth;
bcast_index = ml_module->bcast_fn_index_table[LARGE_BCAST];
ML_VERBOSE(10, ("Contig + fragmentation %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index));
/* Calculate the number of fragments required for this message.
 * In this branch pack_len is already larger than the threshold, so the
 * expression below evaluates to the threshold. */
frag_len = (pack_len < (size_t) ml_module->small_message_thresholds[BCOL_BCAST] ?
pack_len : (size_t) ml_module->small_message_thresholds[BCOL_BCAST]);
/* NOTE(review): if dt_size is larger than frag_len, n_dts_per_frag is 0 and
 * the n_fragments computation divides by zero - confirm the threshold is
 * always at least one element wide. */
n_dts_per_frag = frag_len/dt_size;
n_fragments = (pack_len + dt_size*n_dts_per_frag - 1)/(dt_size*n_dts_per_frag);
/* Never keep more fragments in flight than the message has */
pipeline_depth = (n_fragments < pipeline_depth ? n_fragments : pipeline_depth);
ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, coll_op,
bcast_index, root,
pack_len, frag_len, buf, src_buffer_desc);
ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, (frag_len/dt_size), dtype,
src_buffer_desc, 0, 0, frag_len, (src_buffer_desc->data_addr));
/* Remaining fragments are launched by this callback */
coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_progress;
coll_op->full_message.pipeline_depth = pipeline_depth;
/* Initialize fragment specific information */
coll_op->fragment_data.current_coll_op = bcast_index;
coll_op->fragment_data.buffer_desc = src_buffer_desc;
/* coll_op->fragment_data.message_descriptor->n_bytes_scheduled += frag_len; */
coll_op->fragment_data.fragment_size = frag_len;
coll_op->fragment_data.message_descriptor->n_active++;
/* should be removed */
coll_op->variable_fn_params.buffer_size = frag_len;
ML_VERBOSE(10, ("Contig + fragmentation [0-sk, 1-lk, 3-su, 4-lu] %d %d\n",
coll_op->variable_fn_params.buffer_size,
coll_op->fragment_data.fragment_size));
} else {
/* --- contiguous, large, zero copy: schedule works on the user buffer --- */
bcast_index = ml_module->bcast_fn_index_table[LARGE_BCAST];
ML_VERBOSE(10, ("Contig + zero copy %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index));
coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
ml_module->coll_ml_bcast_functions[bcast_index],
buf, buf,
pack_len,
0 /* offset for first pack */);
/* For large messages (bcast) this points to userbuf */
/* Pasha: temporary work around for basesmuma, userbuf should
be removed */
coll_op->variable_fn_params.userbuf = buf;
/* Nothing to unpack: no process callback is installed on this path */
coll_op->process_fn = NULL;
coll_op->full_message.n_bytes_scheduled = pack_len;
ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, count, dtype,
src_buffer_desc, 0, 0,
ml_module->payload_block->size_buffer, buf);
}
} else {
/* --- Non contiguous data type: pack/unpack through opal convertors --- */
bcast_index = ml_module->bcast_fn_index_table[SMALL_BCAST];
ML_VERBOSE(10, ("NON Contig + fragmentation %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index));
coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
ml_module->coll_ml_bcast_functions[bcast_index],
buf, buf,
pack_len,
0 /* offset for first pack */);
if (OPAL_LIKELY(pack_len > 0)) {
size_t max_data = 0;
if (ompi_comm_rank(ml_module->comm) == root) {
/* Root: prepare the send convertor and pack the first fragment
 * (at most ml_fragment_size bytes) into the ML buffer */
struct iovec iov;
uint32_t iov_count = 1;
opal_convertor_copy_and_prepare_for_send(
ompi_mpi_local_convertor,
&dtype->super, count, buf, 0,
&coll_op->full_message.send_convertor);
opal_convertor_get_packed_size(&coll_op->full_message.send_convertor,
&coll_op->full_message.send_converter_bytes_packed);
coll_op->full_message.n_bytes_total =
coll_op->full_message.send_converter_bytes_packed;
iov.iov_base = (IOVBASE_TYPE*) src_buffer_desc->data_addr;
iov.iov_len = ml_module->ml_fragment_size;
opal_convertor_pack(&coll_op->full_message.send_convertor,
&iov, &iov_count, &max_data);
coll_op->process_fn = NULL;
coll_op->full_message.n_bytes_scheduled = max_data;
/* We need to prepare the data for future pipelined communication */
coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_converter_progress;
coll_op->full_message.pipeline_depth = mca_coll_ml_component.pipeline_depth;
coll_op->full_message.root = true;
} else {
/* Non-root: a dummy send convertor (no user buffer) is prepared alongside
 * the real receive convertor */
opal_convertor_copy_and_prepare_for_send(
ompi_mpi_local_convertor,
&dtype->super, count, NULL, 0,
&coll_op->full_message.dummy_convertor);
/* In non-root case we use it for #bytes remaining to receive */
opal_convertor_get_packed_size(&coll_op->full_message.dummy_convertor,
&coll_op->full_message.send_converter_bytes_packed);
opal_convertor_copy_and_prepare_for_recv(
ompi_mpi_local_convertor,
&dtype->super, count, buf, 0,
&coll_op->full_message.recv_convertor);
opal_convertor_get_unpacked_size(&coll_op->full_message.recv_convertor,
&coll_op->full_message.recv_converter_bytes_packed);
coll_op->full_message.root = false;
coll_op->full_message.n_bytes_total =
coll_op->full_message.recv_converter_bytes_packed;
coll_op->process_fn = mca_coll_ml_bcast_converter_unpack_data;
coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_converter_progress;
coll_op->full_message.pipeline_depth = mca_coll_ml_component.pipeline_depth;
coll_op->full_message.dummy_conv_position = 0;
/* max_data receives the size of the first fragment to expect */
mca_coll_ml_convertor_get_send_frag_size(
ml_module, &max_data,
&coll_op->full_message);
coll_op->full_message.n_bytes_scheduled = max_data;
}
}
coll_op->fragment_data.current_coll_op = bcast_index;
coll_op->fragment_data.message_descriptor->n_active++;
coll_op->fragment_data.fragment_size = coll_op->full_message.n_bytes_scheduled;
/* Set initial index: the first fragment's buffer is the first one expected
 * by the in-order unpack logic */
coll_op->full_message.next_expected_index = src_buffer_desc->buffer_index;
/* Prepare linking information for future frags; the first fragment has no
 * predecessor */
COLL_ML_SETUP_ORDERING_INFO(coll_op, coll_op, NULL);
/* Since the data is already packed we will use MPI_BYTE and byte count as datatype */
ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, coll_op->full_message.n_bytes_scheduled, MPI_BYTE,
src_buffer_desc, 0, 0, ml_module->payload_block->size_buffer,(src_buffer_desc->data_addr));
/* Number of fragments: n_bytes_total divided by ml_fragment_size, rounded up */
n_fragments = (coll_op->full_message.n_bytes_total +
ml_module->ml_fragment_size - 1) / ml_module->ml_fragment_size;
}
/* Settings common to all four paths */
coll_op->variable_fn_params.hier_factor = 1;
coll_op->fragment_data.buffer_desc = src_buffer_desc;
/* Set order info if there is a bcol needs ordering */
MCA_COLL_ML_SET_ORDER_INFO(coll_op, n_fragments);
/* Select the root or the general task setup function of the schedule */
if (ompi_comm_rank(ml_module->comm) == root) {
coll_op->full_message.root =
coll_op->variable_fn_params.root_flag = true;
coll_op->variable_fn_params.root_route = NULL;
task_setup = ml_module->coll_ml_bcast_functions[bcast_index]->
task_setup_fn[COLL_ML_ROOT_TASK_FN];
} else {
coll_op->full_message.root =
coll_op->variable_fn_params.root_flag = false;
/* Route towards the root, or NULL when the topology has no route vector */
coll_op->variable_fn_params.root_route =
(NULL == coll_op->coll_schedule->topo_info->route_vector ?
NULL : &coll_op->coll_schedule->topo_info->route_vector[root]);
task_setup = ml_module->coll_ml_bcast_functions[bcast_index]->
task_setup_fn[COLL_ML_GENERAL_TASK_FN];
}
/* Hand the embedded request back to the caller before launching */
*req = &coll_op->full_message.super;
return mca_coll_ml_generic_collectives_launcher(coll_op, task_setup);
}
/**
 * Blocking ML broadcast: starts the collective through
 * parallel_bcast_start(), waits for the returned request to complete and
 * frees it.
 *
 * @param buf     user buffer (source on the root, destination elsewhere)
 * @param count   number of dtype elements
 * @param dtype   datatype of the elements
 * @param root    rank of the broadcast root
 * @param comm    communicator (not used here; the module carries it)
 * @param module  ML collective module
 * @return OMPI_SUCCESS, or the error returned by parallel_bcast_start()
 */
int mca_coll_ml_parallel_bcast(void *buf, int count, struct ompi_datatype_t *dtype,
                               int root, struct ompi_communicator_t *comm,
                               mca_coll_base_module_t *module)
{
    ompi_request_t *request;
    const int rc = parallel_bcast_start(buf, count, dtype, root, module, &request);

    if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
        /* Blocking bcast: wait for completion, then release the request */
        ompi_request_wait_completion(request);
        ompi_request_free(&request);

        ML_VERBOSE(10, ("Bcast is done mca_coll_ml_bcast_known"));
        return OMPI_SUCCESS;
    }

    ML_VERBOSE(10, ("Failed to launch"));
    return rc;
}
/**
 * Non-blocking ML broadcast: starts the collective through
 * parallel_bcast_start() and returns without waiting; the caller owns the
 * request stored in *req.
 *
 * @param buf     user buffer (source on the root, destination elsewhere)
 * @param count   number of dtype elements
 * @param dtype   datatype of the elements
 * @param root    rank of the broadcast root
 * @param comm    communicator (not used here; the module carries it)
 * @param req     out: request to wait on / test for completion
 * @param module  ML collective module
 * @return OMPI_SUCCESS, or the error returned by parallel_bcast_start()
 */
int mca_coll_ml_parallel_bcast_nb(void *buf, int count, struct ompi_datatype_t *dtype,
                                  int root, struct ompi_communicator_t *comm,
                                  ompi_request_t **req,
                                  mca_coll_base_module_t *module)
{
    const int rc = parallel_bcast_start(buf, count, dtype, root, module, req);

    if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
        /* Only the launch has succeeded at this point; the message text is
         * kept as it was. */
        ML_VERBOSE(10, ("Bcast is done mca_coll_ml_bcast_known"));
        return OMPI_SUCCESS;
    }

    ML_VERBOSE(10, ("Failed to launch"));
    return rc;
}
/**
 * Blocking broadcast that runs a fixed, sequential list of bcol functions
 * selected by the hierarchy level of the root's route.
 *
 * Messages of at most small_message_thresholds[BCOL_BCAST] bytes are staged
 * through an ML payload buffer (the root copies its data in, other ranks
 * unpack from it); larger messages hand the user buffer directly to the
 * bcol functions.  The bcol functions are called one after another; if one
 * of them does not complete immediately the whole operation is put on the
 * component's sequential_collectives list.  The routine then waits for the
 * request to complete and frees it.
 *
 * Only contiguous data is supported at this stage.
 *
 * @param buf     user buffer (source on the root, destination elsewhere)
 * @param count   number of dtype elements
 * @param dtype   datatype of the elements (must not be NULL)
 * @param root    rank of the broadcast root in comm
 * @param comm    communicator
 * @param module  ML collective module
 * @return OMPI_SUCCESS
 */
int mca_coll_ml_bcast_sequential_root(void *buf, int count, struct ompi_datatype_t *dtype,
                                      int root, struct ompi_communicator_t *comm,
                                      mca_coll_base_module_t *module)
{
    /* local variables */
    int ret, fn_idx;
    bool is_root;
    size_t pack_len = 0;
    size_t dt_size = 0;
    mca_coll_ml_collective_operation_progress_t *coll_op = NULL;
    mca_coll_ml_compound_functions_t *fixed_schedule;
    mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *) module;
    ml_payload_buffer_desc_t *src_buffer_desc = NULL;
    mca_bcol_base_coll_fn_desc_t *func;
    ompi_request_t *req;

    ML_VERBOSE(10, ("Starting static bcast, small messages"));

    assert(NULL != dtype);

    /* Calculate size of the data,
     * on this stage only contiguous data is supported */
    ompi_datatype_type_size(dtype, &dt_size);
    pack_len = count * dt_size;

    /* Setup data buffer: same wait loop as the parallel bcast path */
    ML_BUFFER_ALLOC_WAIT(ml_module, src_buffer_desc);

    is_root = (ompi_comm_rank(comm) == root);

    /* Allocate collective schedule and pack message */
    if (pack_len <= (size_t) ml_module->small_message_thresholds[BCOL_BCAST]) {
        /* The len of the message can not be larger than ML buffer size */
        assert(pack_len <= ml_module->payload_block->size_buffer);

        coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
                      ml_module->coll_ml_bcast_functions[ML_BCAST_SMALL_DATA_SEQUENTIAL],
                      buf, buf,
                      pack_len,
                      0 /* offset for first pack */);
        if (is_root) {
            /* single frag, pack the data */
            memcpy((void *)(uintptr_t)src_buffer_desc->data_addr,
                   buf, pack_len);
            /* No unpack for root */
            coll_op->process_fn = NULL;
        } else {
            coll_op->process_fn = mca_coll_ml_bcast_small_unpack_data;
        }

        coll_op->variable_fn_params.sbuf =
            src_buffer_desc->data_addr;
    } else {
        ML_VERBOSE(10, ("ML_BCAST_LARGE_DATA_KNOWN case."));
        coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module,
                      ml_module->coll_ml_bcast_functions[ML_BCAST_LARGE_DATA_SEQUENTIAL],
                      buf, buf,
                      pack_len,
                      0 /* offset for first pack */);
        /* For large messages (bcast) this points to userbuf */
        /* Pasha: temporary work around for basesmuma, userbuf should
           be removed */
        coll_op->variable_fn_params.userbuf =
            coll_op->variable_fn_params.sbuf = buf;

        coll_op->process_fn = NULL;
    }

    /* Fill in the function arguments */
    coll_op->variable_fn_params.sequence_num =
        OPAL_THREAD_ADD64(&(ml_module->collective_sequence_num), 1);
    coll_op->variable_fn_params.count = count;
    coll_op->variable_fn_params.dtype = dtype;
    coll_op->variable_fn_params.buffer_index = src_buffer_desc->buffer_index;
    coll_op->variable_fn_params.src_desc = src_buffer_desc;
    coll_op->variable_fn_params.sbuf_offset = 0;
    coll_op->variable_fn_params.rbuf_offset = 0;

    /* pasha - why we duplicate it ? */
    coll_op->fragment_data.buffer_desc = src_buffer_desc;

    /* Root and non-root both use the route towards the root; only the root
     * records the scheduled byte count - NOTE: assume no fragmentation at
     * this stage */
    coll_op->variable_fn_params.root_route =
        &coll_op->coll_schedule->topo_info->route_vector[root];
    if (is_root) {
        coll_op->variable_fn_params.root_flag = true;
        coll_op->full_message.n_bytes_scheduled = pack_len;
    } else {
        coll_op->variable_fn_params.root_flag = false;
    }

    /* seems like we should fix a schedule here and now: pick the function
     * list that matches the hierarchy level of the root's route */
    fixed_schedule = coll_op->coll_schedule->
        comp_fn_arr[coll_op->variable_fn_params.root_route->level];

    /* now we set this schedule as the compound function list */
    coll_op->coll_schedule->component_functions = fixed_schedule;

    coll_op->sequential_routine.current_active_bcol_fn = 0;

    while (true) {
        /* ready, aim, fire collective(s)!! */
        fn_idx = coll_op->sequential_routine.current_active_bcol_fn;

        func = fixed_schedule[fn_idx].bcol_function;
        ret = func->coll_fn(&coll_op->variable_fn_params,
                            (struct coll_ml_function_t *) &fixed_schedule[fn_idx].constant_group_data);

        if (BCOL_FN_COMPLETE == ret) {
            /* done with this routine, bump the active counter */
            coll_op->sequential_routine.current_active_bcol_fn++;
            coll_op->variable_fn_params.root_flag = true;

            /* check for collective completion */
            if (coll_op->sequential_routine.current_active_bcol_fn ==
                coll_op->coll_schedule->n_fns) {
                /* handle fragment completion */
                ret = coll_ml_fragment_completion_processing(coll_op);
                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                    mca_coll_ml_abort_ml("Failed to run coll_ml_fragment_completion_processing");
                }

                /* break out of while loop */
                break;
            }
        } else {
            /* put entire collective operation onto sequential queue */
            opal_list_append(&mca_coll_ml_component.sequential_collectives,
                             (opal_list_item_t *) coll_op);
            break;
        }
    }

    /* Blocking bcast: wait on, then free, the request embedded in the
     * descriptor.  A properly typed request pointer is used instead of
     * casting the address of coll_op. */
    req = &coll_op->full_message.super;
    ompi_request_wait_completion(req);
    ompi_request_free(&req);

    ML_VERBOSE(10, ("Bcast is done"));

    return OMPI_SUCCESS;
}