/* * Copyright (c) 2009-2012 Oak Ridge National Laboratory. All rights reserved. * Copyright (c) 2009-2012 Mellanox Technologies. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /** @file */ #include "ompi_config.h" #include #include #include "opal/threads/mutex.h" #include "opal/sys/atomic.h" #include "ompi/constants.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/coll/coll.h" #include "ompi/mca/bcol/bcol.h" #include "coll_ml.h" #include "coll_ml_inlines.h" #include "coll_ml_colls.h" #include "coll_ml_allocation.h" #define ML_BUFFER_ALLOC_WAIT(ml, buffer) \ do { \ buffer = mca_coll_ml_alloc_buffer(ml); \ while (NULL == buffer) { \ opal_progress(); \ buffer = mca_coll_ml_alloc_buffer(ml); \ } \ } while (0) #define COLL_ML_SETUP_ORDERING_INFO(op, last, prev) \ do { \ /* Don't change order of commands !!!! */ \ (op)->prev_frag = prev; \ (op)->fragment_data.message_descriptor->last_started_frag = last; \ /* op->next_to_process_frag = NULL; */ \ } while (0) #define ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, op, coll_index, root, \ total_len, frag_len, buf, ml_buff_desc) \ do { \ op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module, \ ml_module->coll_ml_bcast_functions[coll_index], \ buf, buf, \ total_len, \ 0 /* offset for first pack */); \ if (OPAL_LIKELY(frag_len > 0)) { \ if (ompi_comm_rank(ml_module->comm) == root) { \ /* single frag, pack the data */ \ memcpy((void *)(uintptr_t)(ml_buff_desc)->data_addr, \ buf, frag_len); \ /* No unpack for root */ \ op->process_fn = NULL; \ } else { \ op->process_fn = mca_coll_ml_bcast_small_unpack_data; \ } \ } \ op->full_message.n_bytes_scheduled = frag_len; \ } while (0) #define SMALL_BCAST 0 #define LARGE_BCAST (SMALL_BCAST + 1) /* bcast data unpack */ static int mca_coll_ml_bcast_converter_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op) { struct iovec iov; uint32_t iov_count = 1; size_t max_data = 0; mca_coll_ml_collective_operation_progress_t *next_op; mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *) coll_op->coll_module; size_t max_index = ml_module->payload_block->num_banks * ml_module->payload_block->num_buffers_per_bank; bool is_first = true; int ret; /* Check if the fragment delivered in order */ if (coll_op->fragment_data.buffer_desc->buffer_index != coll_op->fragment_data.message_descriptor->next_expected_index) { mca_coll_ml_collective_operation_progress_t *prev_coll_op = coll_op->prev_frag; assert(NULL == prev_coll_op->next_to_process_frag); /* make sure that previous process will have pointer to the out of order process */ prev_coll_op->next_to_process_frag = coll_op; assert(!(coll_op->pending & REQ_OUT_OF_ORDER)); coll_op->pending |= REQ_OUT_OF_ORDER; /* we will unpack it later */ ML_VERBOSE(10, ("Get %d expecting %d previous %d", coll_op->fragment_data.buffer_desc->buffer_index, coll_op->fragment_data.message_descriptor->next_expected_index, prev_coll_op->fragment_data.buffer_desc->buffer_index)); return ORTE_ERR_NO_MATCH_YET; } do { iov.iov_len = coll_op->fragment_data.fragment_size; iov.iov_base = (void *)((uintptr_t) coll_op->fragment_data.buffer_desc->data_addr); ML_VERBOSE(10, ("Data unpack with convertern index %d", coll_op->fragment_data.buffer_desc->buffer_index)); opal_convertor_unpack(&coll_op->fragment_data.message_descriptor->recv_convertor, &iov, &iov_count, &max_data); /* update next index */ ++coll_op->fragment_data.message_descriptor->next_expected_index; if (coll_op->fragment_data.message_descriptor->next_expected_index >= max_index) { coll_op->fragment_data.message_descriptor->next_expected_index = 0; } /* Return to queue if the packet is done, the exeption is first packet, we release it later. */ next_op = coll_op->next_to_process_frag; coll_op->next_to_process_frag = NULL; if ((!is_first) && (0 != coll_op->fragment_data.offset_into_user_buffer)) { assert(coll_op->pending & REQ_OUT_OF_ORDER); coll_op->pending ^= REQ_OUT_OF_ORDER; /* Pasha: On one hand - I'm not sure that conceptually it is right place to call buffer recycling. Potentially, coll_ml_fragment_completion_processing() sounds like right place for out of order unpack/sync handling. * On the other hand - non contiguous data is not supper common and we would like to minimize effect on critical pass * for non contiguous data types. */ ret = mca_coll_ml_buffer_recycling(coll_op); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return OMPI_ERROR; } CHECK_AND_RECYCLE(coll_op); } coll_op = next_op; is_first = false; } while (NULL != coll_op); return OMPI_SUCCESS; } static int mca_coll_ml_bcast_small_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op) { void * dest = (void *)((uintptr_t) coll_op->full_message.dest_user_addr + (uintptr_t) coll_op->full_message.n_bytes_delivered); void * src = (void *)((uintptr_t) coll_op->fragment_data.buffer_desc->data_addr); memcpy(dest, src, coll_op->fragment_data.fragment_size); return OMPI_SUCCESS; } static int mca_coll_ml_bcast_large_unpack_data(mca_coll_ml_collective_operation_progress_t *coll_op) { void * dest = (void *)((uintptr_t) coll_op->fragment_data.message_descriptor->dest_user_addr + (uintptr_t) coll_op->fragment_data.offset_into_user_buffer); void * src = (void *)((uintptr_t) coll_op->fragment_data.buffer_desc->data_addr); memcpy(dest, src, coll_op->fragment_data.fragment_size); return OMPI_SUCCESS; } static int mca_coll_ml_bcast_frag_converter_progress(mca_coll_ml_collective_operation_progress_t *coll_op) { /* local variables */ int ret, frag_len; size_t max_data = 0; ml_payload_buffer_desc_t *src_buffer_desc = NULL; mca_coll_ml_collective_operation_progress_t *new_op = NULL; mca_coll_ml_task_setup_fn_t task_setup = NULL; mca_coll_ml_module_t *ml_module = OP_ML_MODULE(coll_op); /* Keep the pipeline filled with fragments */ while (coll_op->fragment_data.message_descriptor->n_active < mca_coll_ml_component.pipeline_depth) { /* If an active fragment happens to have completed the collective during * a hop into the progress engine, then don't launch a new fragment, * instead break and return. */ if (coll_op->fragment_data.message_descriptor->n_bytes_scheduled == coll_op->fragment_data.message_descriptor->n_bytes_total) { break; } /* Get an ml buffer */ src_buffer_desc = mca_coll_ml_alloc_buffer(ml_module); if (NULL == src_buffer_desc) { /* If there exist outstanding fragments, then break out * and let an active fragment deal with this later, * there are no buffers available. */ if (0 < coll_op->fragment_data.message_descriptor->n_active) { return OMPI_SUCCESS; } else { /* It is useless to call progress from here, since * ml progress can't be executed as result ml memsync * call will not be completed and no memory will be * recycled. So we put the element on the list, and we will * progress it later when memsync will recycle some memory*/ if (NULL == src_buffer_desc) { /* The fragment is already on list and * the we still have no ml resources * Return busy */ if (coll_op->pending & REQ_OUT_OF_MEMORY) { return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } coll_op->pending |= REQ_OUT_OF_MEMORY; opal_list_append(&ml_module->waiting_for_memory_list, (opal_list_item_t *)coll_op); return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } } } /* Get a new collective descriptor and initialize it */ new_op = mca_coll_ml_duplicate_op_prog_single_frag_dag (ml_module, coll_op); /* We need this address for pointer arithmetic in memcpy */ frag_len = ML_GET_FRAG_SIZE(coll_op, BCOL_BCAST); /* Decide based on global flag, not variable one */ if (coll_op->fragment_data.message_descriptor->root) { struct iovec iov; uint32_t iov_count = 1; /* OBJ_RETAIN(new_op->variable_fn_params.dtype); */ iov.iov_base = (IOVBASE_TYPE*) src_buffer_desc->data_addr; iov.iov_len = ml_module->ml_fragment_size; assert(0 != iov.iov_len); opal_convertor_pack(&new_op->fragment_data.message_descriptor->send_convertor, &iov, &iov_count, &max_data); new_op->process_fn = NULL; new_op->variable_fn_params.root_flag = true; new_op->variable_fn_params.root_route = NULL; task_setup = OP_ML_MODULE(new_op)-> coll_ml_bcast_functions[new_op->fragment_data.current_coll_op]-> task_setup_fn[COLL_ML_ROOT_TASK_FN]; } else { new_op->process_fn = mca_coll_ml_bcast_converter_unpack_data; new_op->variable_fn_params.root_flag = false; new_op->variable_fn_params.root_route = coll_op->variable_fn_params.root_route; task_setup = OP_ML_MODULE(new_op)-> coll_ml_bcast_functions[new_op->fragment_data.current_coll_op]-> task_setup_fn[COLL_ML_GENERAL_TASK_FN]; mca_coll_ml_convertor_get_send_frag_size( ml_module, &max_data, new_op->fragment_data.message_descriptor); } new_op->fragment_data.message_descriptor->n_bytes_scheduled += max_data; new_op->fragment_data.fragment_size = max_data; new_op->fragment_data.buffer_desc = src_buffer_desc; /* Setup fragment specific data */ ++(new_op->fragment_data.message_descriptor->n_active); COLL_ML_SETUP_ORDERING_INFO(new_op, new_op, new_op->fragment_data.message_descriptor->last_started_frag); ML_VERBOSE(10, ("Start more, My index %d my prev %d", new_op->fragment_data.buffer_desc->buffer_index, new_op->prev_frag->fragment_data.buffer_desc->buffer_index)); ML_SET_VARIABLE_PARAMS_BCAST( new_op, OP_ML_MODULE(new_op), frag_len, MPI_BYTE, src_buffer_desc, 0, 0, frag_len, src_buffer_desc->data_addr); /* TBD: remove buffer_size */ new_op->variable_fn_params.buffer_size = coll_op->variable_fn_params.buffer_size; new_op->variable_fn_params.hier_factor = coll_op->variable_fn_params.hier_factor; /* Set order info for new frag if there is a bcol needs ordering */ MCA_COLL_ML_SET_NEW_FRAG_ORDER_INFO(new_op); /* Launch this collective !! */ ret = mca_coll_ml_generic_collectives_append_to_queue(new_op, task_setup); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { ML_ERROR(("Failed to launch")); return ret; } } return OMPI_SUCCESS; } static int mca_coll_ml_bcast_frag_progress(mca_coll_ml_collective_operation_progress_t *coll_op) { /* local variables */ int ret; int frag_len, current_coll_op = coll_op->fragment_data.current_coll_op; size_t dt_size; void *buf; ml_payload_buffer_desc_t *src_buffer_desc = NULL; mca_coll_ml_collective_operation_progress_t *new_op = NULL; mca_coll_ml_task_setup_fn_t task_setup = NULL; ompi_datatype_type_size(coll_op->variable_fn_params.dtype, &dt_size); /* Keep the pipeline filled with fragments */ while (coll_op->fragment_data.message_descriptor->n_active < coll_op->fragment_data.message_descriptor->pipeline_depth) { /* If an active fragment happens to have completed the collective during * a hop into the progress engine, then don't launch a new fragment, * instead break and return. */ if (coll_op->fragment_data.message_descriptor->n_bytes_scheduled == coll_op->fragment_data.message_descriptor->n_bytes_total) { break; } /* Get an ml buffer */ src_buffer_desc = mca_coll_ml_alloc_buffer(OP_ML_MODULE(coll_op)); if (NULL == src_buffer_desc) { /* If there exist outstanding fragments, then break out * and let an active fragment deal with this later, * there are no buffers available. */ if (0 < coll_op->fragment_data.message_descriptor->n_active) { return OMPI_SUCCESS; } else { /* It is useless to call progress from here, since * ml progress can't be executed as result ml memsync * call will not be completed and no memory will be * recycled. So we put the element on the list, and we will * progress it later when memsync will recycle some memory*/ /* The fragment is already on list and * the we still have no ml resources * Return busy */ if (coll_op->pending & REQ_OUT_OF_MEMORY) { ML_VERBOSE(10,("Out of resources %p", coll_op)); return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } coll_op->pending |= REQ_OUT_OF_MEMORY; opal_list_append(&((OP_ML_MODULE(coll_op))->waiting_for_memory_list), (opal_list_item_t *) coll_op); ML_VERBOSE(10,("Out of resources %p adding to pending queue", coll_op)); return OMPI_ERR_TEMP_OUT_OF_RESOURCE; } } /* Get a new collective descriptor and initialize it */ new_op = mca_coll_ml_duplicate_op_prog_single_frag_dag (OP_ML_MODULE(coll_op), coll_op); /* We need this address for pointer arithmetic in memcpy */ buf = coll_op->fragment_data.message_descriptor->dest_user_addr; frag_len = ML_GET_FRAG_SIZE(coll_op, BCOL_BCAST); /* Decide based on global flag, not variable one */ if (coll_op->fragment_data.message_descriptor->root) { memcpy((void *)(uintptr_t)src_buffer_desc->data_addr, (void *) ((uintptr_t) buf + (uintptr_t) coll_op-> fragment_data.message_descriptor->n_bytes_scheduled) , frag_len); /* No unpack for root */ new_op->process_fn = NULL; new_op->variable_fn_params.root_flag = true; new_op->variable_fn_params.root_route = NULL; task_setup = OP_ML_MODULE(new_op)->coll_ml_bcast_functions[current_coll_op]-> task_setup_fn[COLL_ML_ROOT_TASK_FN]; } else { new_op->process_fn = mca_coll_ml_bcast_large_unpack_data; new_op->variable_fn_params.root_flag = false; new_op->variable_fn_params.root_route = coll_op->variable_fn_params.root_route; task_setup = OP_ML_MODULE(new_op)->coll_ml_bcast_functions[current_coll_op]-> task_setup_fn[COLL_ML_GENERAL_TASK_FN]; } /* Setup fragment specific data */ new_op->fragment_data.message_descriptor->n_bytes_scheduled += frag_len; new_op->fragment_data.buffer_desc = src_buffer_desc; new_op->fragment_data.fragment_size = frag_len; new_op->fragment_data.message_descriptor->n_active++; ML_SET_VARIABLE_PARAMS_BCAST( new_op, OP_ML_MODULE(new_op), frag_len, MPI_BYTE, src_buffer_desc, 0, 0, frag_len, src_buffer_desc->data_addr); /* Fill in bcast specific arguments */ /* TBD: remove buffer_size */ new_op->variable_fn_params.buffer_size = coll_op->variable_fn_params.buffer_size; new_op->variable_fn_params.hier_factor = coll_op->variable_fn_params.hier_factor; /* Set order info for new frag if there is a bcol needs ordering */ MCA_COLL_ML_SET_NEW_FRAG_ORDER_INFO(new_op); ML_VERBOSE(10, ("FFFF Contig + fragmentation [0-sk, 1-lk, 3-su, 4-lu] %d %d %d\n", new_op->variable_fn_params.buffer_size , new_op->fragment_data.fragment_size, new_op->fragment_data.message_descriptor->n_bytes_scheduled)); /* Launch this collective !! */ ret = mca_coll_ml_generic_collectives_append_to_queue(new_op, task_setup); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { ML_VERBOSE(10, ("Failed to launch")); return ret; } } return OMPI_SUCCESS; } #define BCAST_FRAGMENTATION_IS_ENABLED(module) \ (module->bcast_fn_index_table[LARGE_BCAST] < ML_BCAST_LARGE_DATA_KNOWN) static inline __opal_attribute_always_inline__ int parallel_bcast_start(void *buf, int count, struct ompi_datatype_t *dtype, int root, mca_coll_base_module_t *module, ompi_request_t **req) { size_t pack_len = 0; size_t dt_size = 0; bool contig = false; int bcast_index, n_fragments = 1; mca_coll_ml_collective_operation_progress_t * coll_op = NULL; mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *) module; ml_payload_buffer_desc_t *src_buffer_desc = NULL; mca_coll_ml_task_setup_fn_t task_setup; ML_VERBOSE(10, ("Starting bcast, mca_coll_ml_bcast_uknown_root")); ompi_datatype_type_size(dtype, &dt_size); pack_len = count * dt_size; /* Setup data buffer */ ML_BUFFER_ALLOC_WAIT(ml_module, src_buffer_desc); /* Get information about memory layout */ contig = opal_datatype_is_contiguous_memory_layout((opal_datatype_t *)dtype, count); /* Allocate collective schedule and pack message */ if (contig) { if (pack_len <= (size_t) ml_module->small_message_thresholds[BCOL_BCAST]) { assert(pack_len <= ml_module->payload_block->size_buffer); bcast_index = ml_module->bcast_fn_index_table[SMALL_BCAST]; ML_VERBOSE(10, ("Contig + small message %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index)); ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, coll_op, bcast_index, root, pack_len, pack_len, buf, src_buffer_desc); ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, count, dtype, src_buffer_desc, 0, 0, ml_module->payload_block->size_buffer, (src_buffer_desc->data_addr)); } else if (BCAST_FRAGMENTATION_IS_ENABLED(ml_module)) { /* We moved the fragmentation decision from communication creation time to runtime, since for large messages the if latency is not so critical */ size_t n_dts_per_frag; int frag_len, pipeline_depth = mca_coll_ml_component.pipeline_depth; bcast_index = ml_module->bcast_fn_index_table[LARGE_BCAST]; ML_VERBOSE(10, ("Contig + fragmentation %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index)); /* Calculate the number of fragments required for this message */ frag_len = (pack_len < (size_t) ml_module->small_message_thresholds[BCOL_BCAST] ? pack_len : (size_t) ml_module->small_message_thresholds[BCOL_BCAST]); n_dts_per_frag = frag_len/dt_size; n_fragments = (pack_len + dt_size*n_dts_per_frag - 1)/(dt_size*n_dts_per_frag); pipeline_depth = (n_fragments < pipeline_depth ? n_fragments : pipeline_depth); ALLOCATE_AND_PACK_CONTIG_BCAST_FRAG(ml_module, coll_op, bcast_index, root, pack_len, frag_len, buf, src_buffer_desc); ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, (frag_len/dt_size), dtype, src_buffer_desc, 0, 0, frag_len, (src_buffer_desc->data_addr)); coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_progress; coll_op->full_message.pipeline_depth = pipeline_depth; /* Initialize fragment specific information */ coll_op->fragment_data.current_coll_op = bcast_index; coll_op->fragment_data.buffer_desc = src_buffer_desc; /* coll_op->fragment_data.message_descriptor->n_bytes_scheduled += frag_len; */ coll_op->fragment_data.fragment_size = frag_len; coll_op->fragment_data.message_descriptor->n_active++; /* should be removed */ coll_op->variable_fn_params.buffer_size = frag_len; ML_VERBOSE(10, ("Contig + fragmentation [0-sk, 1-lk, 3-su, 4-lu] %d %d\n", coll_op->variable_fn_params.buffer_size, coll_op->fragment_data.fragment_size)); } else { bcast_index = ml_module->bcast_fn_index_table[LARGE_BCAST]; ML_VERBOSE(10, ("Contig + zero copy %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index)); coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module, ml_module->coll_ml_bcast_functions[bcast_index], buf, buf, pack_len, 0 /* offset for first pack */); /* For large messages (bcast) this points to userbuf */ /* Pasha: temporary work around for basesmuma, userbuf should be removed */ coll_op->variable_fn_params.userbuf = buf; coll_op->process_fn = NULL; coll_op->full_message.n_bytes_scheduled = pack_len; ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, count, dtype, src_buffer_desc, 0, 0, ml_module->payload_block->size_buffer, buf); } } else { /* Non contiguous data type */ bcast_index = ml_module->bcast_fn_index_table[SMALL_BCAST]; ML_VERBOSE(10, ("NON Contig + fragmentation %d [0-sk, 1-lk, 3-su, 4-lu]\n", bcast_index)); coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module, ml_module->coll_ml_bcast_functions[bcast_index], buf, buf, pack_len, 0 /* offset for first pack */); if (OPAL_LIKELY(pack_len > 0)) { size_t max_data = 0; if (ompi_comm_rank(ml_module->comm) == root) { struct iovec iov; uint32_t iov_count = 1; opal_convertor_copy_and_prepare_for_send( ompi_mpi_local_convertor, &dtype->super, count, buf, 0, &coll_op->full_message.send_convertor); opal_convertor_get_packed_size(&coll_op->full_message.send_convertor, &coll_op->full_message.send_converter_bytes_packed); coll_op->full_message.n_bytes_total = coll_op->full_message.send_converter_bytes_packed; iov.iov_base = (IOVBASE_TYPE*) src_buffer_desc->data_addr; iov.iov_len = ml_module->ml_fragment_size; opal_convertor_pack(&coll_op->full_message.send_convertor, &iov, &iov_count, &max_data); coll_op->process_fn = NULL; coll_op->full_message.n_bytes_scheduled = max_data; /* We need prepare the data for future pipe line comunication */ coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_converter_progress; coll_op->full_message.pipeline_depth = mca_coll_ml_component.pipeline_depth; coll_op->full_message.root = true; } else { opal_convertor_copy_and_prepare_for_send( ompi_mpi_local_convertor, &dtype->super, count, NULL, 0, &coll_op->full_message.dummy_convertor); /* In non-root case we use it for #bytes remaining to receive */ opal_convertor_get_packed_size(&coll_op->full_message.dummy_convertor, &coll_op->full_message.send_converter_bytes_packed); opal_convertor_copy_and_prepare_for_recv( ompi_mpi_local_convertor, &dtype->super, count, buf, 0, &coll_op->full_message.recv_convertor); opal_convertor_get_unpacked_size(&coll_op->full_message.recv_convertor, &coll_op->full_message.recv_converter_bytes_packed); coll_op->full_message.root = false; coll_op->full_message.n_bytes_total = coll_op->full_message.recv_converter_bytes_packed; coll_op->process_fn = mca_coll_ml_bcast_converter_unpack_data; coll_op->full_message.fragment_launcher = mca_coll_ml_bcast_frag_converter_progress; coll_op->full_message.pipeline_depth = mca_coll_ml_component.pipeline_depth; coll_op->full_message.dummy_conv_position = 0; mca_coll_ml_convertor_get_send_frag_size( ml_module, &max_data, &coll_op->full_message); coll_op->full_message.n_bytes_scheduled = max_data; } } coll_op->fragment_data.current_coll_op = bcast_index; coll_op->fragment_data.message_descriptor->n_active++; coll_op->fragment_data.fragment_size = coll_op->full_message.n_bytes_scheduled; /* Set initial index */ coll_op->full_message.next_expected_index = src_buffer_desc->buffer_index; /* Prepare linking information for future frags */ COLL_ML_SETUP_ORDERING_INFO(coll_op, coll_op, NULL); /* Since the data is already packed we will use MPI_BYTE and byte count as datatype */ ML_SET_VARIABLE_PARAMS_BCAST(coll_op, ml_module, coll_op->full_message.n_bytes_scheduled, MPI_BYTE, src_buffer_desc, 0, 0, ml_module->payload_block->size_buffer,(src_buffer_desc->data_addr)); n_fragments = (coll_op->full_message.n_bytes_total + ml_module->ml_fragment_size - 1) / ml_module->ml_fragment_size; } coll_op->variable_fn_params.hier_factor = 1; coll_op->fragment_data.buffer_desc = src_buffer_desc; /* Set order info if there is a bcol needs ordering */ MCA_COLL_ML_SET_ORDER_INFO(coll_op, n_fragments); if (ompi_comm_rank(ml_module->comm) == root) { coll_op->full_message.root = coll_op->variable_fn_params.root_flag = true; coll_op->variable_fn_params.root_route = NULL; task_setup = ml_module->coll_ml_bcast_functions[bcast_index]-> task_setup_fn[COLL_ML_ROOT_TASK_FN]; } else { coll_op->full_message.root = coll_op->variable_fn_params.root_flag = false; coll_op->variable_fn_params.root_route = (NULL == coll_op->coll_schedule->topo_info->route_vector ? NULL : &coll_op->coll_schedule->topo_info->route_vector[root]); task_setup = ml_module->coll_ml_bcast_functions[bcast_index]-> task_setup_fn[COLL_ML_GENERAL_TASK_FN]; } *req = &coll_op->full_message.super; return mca_coll_ml_generic_collectives_launcher(coll_op, task_setup); } int mca_coll_ml_parallel_bcast(void *buf, int count, struct ompi_datatype_t *dtype, int root, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { int ret; ompi_request_t *req; ret = parallel_bcast_start(buf, count, dtype, root, module, &req); if (OPAL_UNLIKELY(ret != OMPI_SUCCESS)) { ML_VERBOSE(10, ("Failed to launch")); return ret; } /* Blocking bcast */ ompi_request_wait_completion(req); ompi_request_free(&req); ML_VERBOSE(10, ("Bcast is done mca_coll_ml_bcast_known")); return OMPI_SUCCESS; } int mca_coll_ml_parallel_bcast_nb(void *buf, int count, struct ompi_datatype_t *dtype, int root, struct ompi_communicator_t *comm, ompi_request_t **req, mca_coll_base_module_t *module) { int ret; ret = parallel_bcast_start(buf, count, dtype, root, module, req); if (OPAL_UNLIKELY(ret != OMPI_SUCCESS)) { ML_VERBOSE(10, ("Failed to launch")); return ret; } ML_VERBOSE(10, ("Bcast is done mca_coll_ml_bcast_known")); return OMPI_SUCCESS; } int mca_coll_ml_bcast_sequential_root(void *buf, int count, struct ompi_datatype_t *dtype, int root, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { /* local variables */ int ret, fn_idx; size_t pack_len = 0; size_t dt_size = 0; mca_coll_ml_collective_operation_progress_t * coll_op = NULL; mca_coll_ml_compound_functions_t *fixed_schedule; mca_coll_ml_module_t *ml_module = (mca_coll_ml_module_t *) module; ml_payload_buffer_desc_t *src_buffer_desc = NULL; mca_bcol_base_coll_fn_desc_t *func; ML_VERBOSE(10, ("Starting static bcast, small messages")); assert(NULL != dtype); /* Calculate size of the data, * on this stage only contiguous data is supported */ ompi_datatype_type_size(dtype, &dt_size); pack_len = count * dt_size; /* Setup data buffer */ src_buffer_desc = mca_coll_ml_alloc_buffer(ml_module); while (NULL == src_buffer_desc) { opal_progress(); src_buffer_desc = mca_coll_ml_alloc_buffer(ml_module); } /* Allocate collective schedule and pack message */ if (pack_len <= (size_t) ml_module->small_message_thresholds[BCOL_BCAST]) { /* The len of the message can not be larger than ML buffer size */ assert(pack_len <= ml_module->payload_block->size_buffer); coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module, ml_module->coll_ml_bcast_functions[ML_BCAST_SMALL_DATA_SEQUENTIAL], buf, buf, pack_len, 0 /* offset for first pack */); if (ompi_comm_rank(comm) == root) { /* single frag, pack the data */ memcpy((void *)(uintptr_t)src_buffer_desc->data_addr, buf, pack_len); /* No unpack for root */ coll_op->process_fn = NULL; } else { coll_op->process_fn = mca_coll_ml_bcast_small_unpack_data; } coll_op->variable_fn_params.sbuf = src_buffer_desc->data_addr; } else { ML_VERBOSE(10, ("ML_BCAST_LARGE_DATA_KNOWN case.")); coll_op = mca_coll_ml_alloc_op_prog_single_frag_dag(ml_module, ml_module->coll_ml_bcast_functions[ML_BCAST_LARGE_DATA_SEQUENTIAL], buf, buf, pack_len, 0 /* offset for first pack */); /* For large messages (bcast) this points to userbuf */ /* Pasha: temporary work around for basesmuma, userbuf should be removed */ coll_op->variable_fn_params.userbuf = coll_op->variable_fn_params.sbuf = buf; coll_op->process_fn = NULL; } /* Fill in the function arguments */ coll_op->variable_fn_params.sequence_num = OPAL_THREAD_ADD64(&(ml_module->collective_sequence_num), 1); coll_op->variable_fn_params.count = count; coll_op->variable_fn_params.dtype = dtype; coll_op->variable_fn_params.buffer_index = src_buffer_desc->buffer_index; coll_op->variable_fn_params.src_desc = src_buffer_desc; coll_op->variable_fn_params.sbuf_offset = 0; coll_op->variable_fn_params.rbuf_offset = 0; /* pasha - why we duplicate it ? */ coll_op->fragment_data.buffer_desc = src_buffer_desc; /* pack data into payload buffer - NOTE: assume no fragmenation at this stage */ if (ompi_comm_rank(comm) == root) { coll_op->variable_fn_params.root_flag = true; coll_op->variable_fn_params.root_route = &coll_op->coll_schedule->topo_info->route_vector[root]; coll_op->full_message.n_bytes_scheduled = pack_len; } else { coll_op->variable_fn_params.root_flag = false; coll_op->variable_fn_params.root_route = &coll_op->coll_schedule->topo_info->route_vector[root]; } /* seems like we should fix a schedule here and now */ fixed_schedule = coll_op->coll_schedule-> comp_fn_arr[coll_op->variable_fn_params.root_route->level]; /* now we set this schedule as the compound function list */ coll_op->coll_schedule->component_functions = fixed_schedule; coll_op->sequential_routine.current_active_bcol_fn = 0; while (true) { /* ready, aim, fire collective(s)!! */ fn_idx = coll_op->sequential_routine.current_active_bcol_fn; func = fixed_schedule[fn_idx].bcol_function; ret = func->coll_fn(&coll_op->variable_fn_params, (struct coll_ml_function_t *) &fixed_schedule[fn_idx].constant_group_data); /* set the coll_fn_started flag to true */ if (BCOL_FN_COMPLETE == ret) { /* done with this routine, bump the active counter */ coll_op->sequential_routine.current_active_bcol_fn++; coll_op->variable_fn_params.root_flag = true; /* check for collective completion */ if (coll_op->sequential_routine.current_active_bcol_fn == coll_op->coll_schedule->n_fns) { /* handle fragment completion */ ret = coll_ml_fragment_completion_processing(coll_op); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { mca_coll_ml_abort_ml("Failed to run coll_ml_fragment_completion_processing"); } /* break out of while loop */ break; } } else { /* put entire collective opperation onto sequential queue */ opal_list_append(&mca_coll_ml_component.sequential_collectives, (opal_list_item_t *) coll_op); break; } } /* Blocking bcast */ ompi_request_wait_completion(&coll_op->full_message.super); ompi_request_free((ompi_request_t **) &coll_op); ML_VERBOSE(10, ("Bcast is done")); return OMPI_SUCCESS; }