Slightly change the coll comm query API -- pass in a **data to allow
the component to cache data that is necessary to do the query (e.g., if you need to try to allocate resources in order to determine selectability). This data is given back during unquery() and is cached on comm->c_coll_selected_data if the module is selected (before calling comm_init). This commit was SVN r4251.
Этот коммит содержится в:
родитель
b3e26dfa9f
Коммит
5e47dafca6
@ -51,14 +51,16 @@ static mca_coll_base_module_1_0_0_t null_module = {
|
||||
* Local types
|
||||
*/
|
||||
struct avail_coll_t {
|
||||
ompi_list_item_t super;
|
||||
ompi_list_item_t super;
|
||||
|
||||
int ac_priority;
|
||||
const mca_coll_base_component_1_0_0_t *ac_component;
|
||||
const mca_coll_base_module_1_0_0_t *ac_module;
|
||||
int ac_priority;
|
||||
const mca_coll_base_component_1_0_0_t *ac_component;
|
||||
const mca_coll_base_module_1_0_0_t *ac_module;
|
||||
struct mca_coll_base_comm_t *ac_data;
|
||||
};
|
||||
typedef struct avail_coll_t avail_coll_t;
|
||||
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
@ -67,19 +69,24 @@ static ompi_list_t *check_components(ompi_list_t *components,
|
||||
char **names, int num_names);
|
||||
static int check_one_component(ompi_communicator_t *comm,
|
||||
const mca_base_component_t *component,
|
||||
const mca_coll_base_module_1_0_0_t **module);
|
||||
const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
|
||||
static int query(const mca_base_component_t *component,
|
||||
ompi_communicator_t *comm, int *priority,
|
||||
const mca_coll_base_module_1_0_0_t **module);
|
||||
const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
static int query_1_0_0(const mca_coll_base_component_1_0_0_t *coll_component,
|
||||
ompi_communicator_t *comm, int *priority,
|
||||
const mca_coll_base_module_1_0_0_t **module);
|
||||
const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
|
||||
static void unquery(const mca_coll_base_component_1_0_0_t *coll_component,
|
||||
ompi_communicator_t *comm);
|
||||
ompi_communicator_t *comm,
|
||||
struct mca_coll_base_comm_t *data);
|
||||
static void unquery_1_0_0(const mca_coll_base_component_1_0_0_t *coll_component,
|
||||
ompi_communicator_t *comm);
|
||||
ompi_communicator_t *comm,
|
||||
struct mca_coll_base_comm_t *data);
|
||||
|
||||
static int module_init(const mca_coll_base_module_1_0_0_t *module,
|
||||
ompi_communicator_t *comm);
|
||||
@ -114,6 +121,7 @@ int mca_coll_base_comm_select(ompi_communicator_t *comm,
|
||||
ompi_list_item_t *item;
|
||||
const mca_coll_base_component_1_0_0_t *selected_component, *component;
|
||||
const mca_coll_base_module_1_0_0_t *selected_module;
|
||||
struct mca_coll_base_comm_t *selected_data;
|
||||
|
||||
/* Announce */
|
||||
|
||||
@ -232,11 +240,13 @@ int mca_coll_base_comm_select(ompi_communicator_t *comm,
|
||||
avail = (avail_coll_t *) item;
|
||||
selected_component = avail->ac_component;
|
||||
selected_module = avail->ac_module;
|
||||
selected_data = avail->ac_data;
|
||||
OBJ_RELEASE(avail);
|
||||
} else {
|
||||
using_basic = true;
|
||||
selected_component = mca_coll_base_basic_component;
|
||||
selected_module = comm->c_coll_basic_module;
|
||||
selected_data = comm->c_coll_basic_data;
|
||||
}
|
||||
#else
|
||||
/* JMS CONTINUE HERE */
|
||||
@ -251,7 +261,7 @@ int mca_coll_base_comm_select(ompi_communicator_t *comm,
|
||||
item = ompi_list_remove_first(selectable)) {
|
||||
avail = (avail_coll_t *) item;
|
||||
component = avail->ac_component;
|
||||
unquery(component, comm);
|
||||
unquery(component, comm, avail->ac_data);
|
||||
OBJ_RELEASE(avail);
|
||||
}
|
||||
OBJ_RELEASE(selectable);
|
||||
@ -262,6 +272,7 @@ int mca_coll_base_comm_select(ompi_communicator_t *comm,
|
||||
it. */
|
||||
|
||||
comm->c_coll_selected_component = selected_component;
|
||||
comm->c_coll_selected_data = selected_data;
|
||||
if (!using_basic) {
|
||||
comm->c_coll = *selected_module;
|
||||
replace_null_with_basic(comm);
|
||||
@ -309,6 +320,7 @@ static ompi_list_t *check_components(ompi_list_t *components,
|
||||
bool want_to_check;
|
||||
ompi_list_t *selectable;
|
||||
avail_coll_t *avail, *avail2;
|
||||
struct mca_coll_base_comm_t *data;
|
||||
|
||||
/* Make a list of the components that query successfully */
|
||||
|
||||
@ -341,7 +353,7 @@ static ompi_list_t *check_components(ompi_list_t *components,
|
||||
so */
|
||||
|
||||
if (want_to_check) {
|
||||
priority = check_one_component(comm, component, &module);
|
||||
priority = check_one_component(comm, component, &module, &data);
|
||||
if (priority > 0) {
|
||||
|
||||
/* We have a component that indicated that it wants to run by
|
||||
@ -351,6 +363,7 @@ static ompi_list_t *check_components(ompi_list_t *components,
|
||||
avail->ac_priority = priority;
|
||||
avail->ac_component = (mca_coll_base_component_1_0_0_t *) component;
|
||||
avail->ac_module = module;
|
||||
avail->ac_data = data;
|
||||
|
||||
/* Put this item on the list in priority order (highest
|
||||
priority first). Should it go first? */
|
||||
@ -398,12 +411,13 @@ static ompi_list_t *check_components(ompi_list_t *components,
|
||||
*/
|
||||
static int check_one_component(ompi_communicator_t *comm,
|
||||
const mca_base_component_t *component,
|
||||
const mca_coll_base_module_1_0_0_t **module)
|
||||
const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data)
|
||||
{
|
||||
int err;
|
||||
int priority = -1;
|
||||
|
||||
err = query(component, comm, &priority, module);
|
||||
err = query(component, comm, &priority, module, data);
|
||||
|
||||
if (OMPI_SUCCESS == err) {
|
||||
priority = (priority < 100) ? priority : 100;
|
||||
@ -432,7 +446,8 @@ static int check_one_component(ompi_communicator_t *comm,
|
||||
*/
|
||||
static int query(const mca_base_component_t *component,
|
||||
ompi_communicator_t *comm,
|
||||
int *priority, const mca_coll_base_module_1_0_0_t **module)
|
||||
int *priority, const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data)
|
||||
{
|
||||
/* coll v1.0.0 */
|
||||
|
||||
@ -443,7 +458,7 @@ static int query(const mca_base_component_t *component,
|
||||
const mca_coll_base_component_1_0_0_t *coll100 =
|
||||
(mca_coll_base_component_1_0_0_t *) component;
|
||||
|
||||
return query_1_0_0(coll100, comm, priority, module);
|
||||
return query_1_0_0(coll100, comm, priority, module, data);
|
||||
}
|
||||
|
||||
/* Unknown coll API version -- return error */
|
||||
@ -454,13 +469,14 @@ static int query(const mca_base_component_t *component,
|
||||
|
||||
static int query_1_0_0(const mca_coll_base_component_1_0_0_t *component,
|
||||
ompi_communicator_t *comm, int *priority,
|
||||
const mca_coll_base_module_1_0_0_t **module)
|
||||
const mca_coll_base_module_1_0_0_t **module,
|
||||
struct mca_coll_base_comm_t **data)
|
||||
{
|
||||
const mca_coll_base_module_1_0_0_t *ret;
|
||||
|
||||
/* There's currently no need for conversion */
|
||||
|
||||
ret = component->collm_comm_query(comm, priority);
|
||||
ret = component->collm_comm_query(comm, priority, data);
|
||||
if (NULL != ret) {
|
||||
*module = ret;
|
||||
return OMPI_SUCCESS;
|
||||
@ -475,7 +491,8 @@ static int query_1_0_0(const mca_coll_base_component_1_0_0_t *component,
|
||||
**************************************************************************/
|
||||
|
||||
static void unquery(const mca_coll_base_component_1_0_0_t *component,
|
||||
ompi_communicator_t *comm)
|
||||
ompi_communicator_t *comm,
|
||||
struct mca_coll_base_comm_t *data)
|
||||
{
|
||||
if (1 == component->collm_version.mca_major_version &&
|
||||
0 == component->collm_version.mca_minor_version &&
|
||||
@ -483,7 +500,7 @@ static void unquery(const mca_coll_base_component_1_0_0_t *component,
|
||||
const mca_coll_base_component_1_0_0_t *coll100 =
|
||||
(mca_coll_base_component_1_0_0_t *) component;
|
||||
|
||||
unquery_1_0_0(coll100, comm);
|
||||
unquery_1_0_0(coll100, comm, data);
|
||||
}
|
||||
|
||||
/* There's no way to have a version that we don't recognize here --
|
||||
@ -492,9 +509,12 @@ static void unquery(const mca_coll_base_component_1_0_0_t *component,
|
||||
|
||||
|
||||
static void unquery_1_0_0(const mca_coll_base_component_1_0_0_t *component,
|
||||
ompi_communicator_t *comm)
|
||||
ompi_communicator_t *comm,
|
||||
struct mca_coll_base_comm_t *data)
|
||||
{
|
||||
component->collm_comm_unquery(comm);
|
||||
if (NULL != component->collm_comm_unquery) {
|
||||
component->collm_comm_unquery(comm, data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -536,16 +556,18 @@ static int query_basic(ompi_communicator_t *comm)
|
||||
{
|
||||
int ret;
|
||||
int priority;
|
||||
struct mca_coll_base_comm_t *data;
|
||||
|
||||
ret = OMPI_SUCCESS;
|
||||
if (NULL == comm->c_coll_basic_module) {
|
||||
ret = query((mca_base_component_t *) mca_coll_base_basic_component, comm,
|
||||
&priority, &comm->c_coll_basic_module);
|
||||
&priority, &comm->c_coll_basic_module, &data);
|
||||
if (ret != OMPI_SUCCESS) {
|
||||
comm->c_coll_basic_module = NULL;
|
||||
return ret;
|
||||
}
|
||||
|
||||
comm->c_coll_basic_data = data;
|
||||
ret = module_init(comm->c_coll_basic_module, comm);
|
||||
}
|
||||
return ret;
|
||||
|
@ -45,8 +45,8 @@ OMPI_COMP_EXPORT extern int mca_coll_basic_priority_param;
|
||||
int mca_coll_basic_init_query(bool *allow_multi_user_threads,
|
||||
bool *have_hidden_threads);
|
||||
const struct mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority);
|
||||
int mca_coll_basic_comm_unquery(struct ompi_communicator_t *comm);
|
||||
mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
|
||||
const struct mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_basic_module_init(struct ompi_communicator_t *comm);
|
||||
|
@ -142,7 +142,8 @@ int mca_coll_basic_init_query(bool *allow_multi_user_threads,
|
||||
* priority we want to return.
|
||||
*/
|
||||
const mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority,
|
||||
struct mca_coll_base_comm_t **data)
|
||||
{
|
||||
if (OMPI_SUCCESS != mca_base_param_lookup_int(mca_coll_basic_priority_param,
|
||||
priority)) {
|
||||
@ -153,12 +154,8 @@ mca_coll_basic_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
algorithms. */
|
||||
|
||||
if (OMPI_COMM_IS_INTER(comm)) {
|
||||
/* Intercommunicators */
|
||||
return &inter_linear;
|
||||
} else {
|
||||
|
||||
/* Intracommunicators */
|
||||
|
||||
if (ompi_comm_size(comm) <= mca_coll_base_crossover) {
|
||||
return &intra_linear;
|
||||
} else {
|
||||
@ -181,19 +178,13 @@ mca_coll_basic_module_init(struct ompi_communicator_t *comm)
|
||||
|
||||
/* Allocate the data that hangs off the communicator */
|
||||
|
||||
comm->c_coll_basic_data = NULL;
|
||||
|
||||
if (OMPI_COMM_IS_INTER(comm)) {
|
||||
/* Intercommunicators */
|
||||
/* JMS Continue here */
|
||||
size = ompi_comm_remote_size(comm);
|
||||
} else {
|
||||
/* Intracommunicators */
|
||||
/* JMS Continue here */
|
||||
size = ompi_comm_size(comm);
|
||||
}
|
||||
data = malloc(sizeof(struct mca_coll_base_comm_t) +
|
||||
(sizeof(ompi_request_t *) * size * 2));
|
||||
(sizeof(ompi_request_t *) * size * 2));
|
||||
|
||||
if (NULL == data) {
|
||||
return NULL;
|
||||
@ -201,9 +192,6 @@ mca_coll_basic_module_init(struct ompi_communicator_t *comm)
|
||||
data->mccb_reqs = (ompi_request_t **) (data + 1);
|
||||
data->mccb_num_reqs = size * 2;
|
||||
|
||||
/* Initialize the communicator */
|
||||
|
||||
|
||||
/* All done */
|
||||
|
||||
comm->c_coll_basic_data = data;
|
||||
@ -228,14 +216,6 @@ int mca_coll_basic_module_finalize(struct ompi_communicator_t *comm)
|
||||
comm->c_coll_basic_data->mccb_num_reqs = 0;
|
||||
#endif
|
||||
|
||||
if (OMPI_COMM_IS_INTER(comm)) {
|
||||
/* Intercommunicators */
|
||||
/* JMS Continue here */
|
||||
} else {
|
||||
/* Intracommunicators */
|
||||
/* JMS Continue here */
|
||||
}
|
||||
|
||||
/* All done */
|
||||
|
||||
free(comm->c_coll_basic_data);
|
||||
|
@ -22,6 +22,13 @@
|
||||
#include "mca/base/base.h"
|
||||
|
||||
|
||||
/*
|
||||
* Forward declaration
|
||||
*/
|
||||
|
||||
struct mca_coll_base_comm_t;
|
||||
|
||||
|
||||
/*
|
||||
* Coll component function typedefs
|
||||
*/
|
||||
@ -30,9 +37,11 @@ typedef int (*mca_coll_base_component_init_query_fn_t)
|
||||
(bool *allow_multi_user_threads, bool *have_hidden_threads);
|
||||
typedef const struct mca_coll_base_module_1_0_0_t *
|
||||
(*mca_coll_base_component_comm_query_1_0_0_fn_t)
|
||||
(struct ompi_communicator_t *comm, int *priority);
|
||||
(struct ompi_communicator_t *comm, int *priority,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
typedef int (*mca_coll_base_component_comm_unquery_fn_t)
|
||||
(struct ompi_communicator_t *comm);
|
||||
(struct ompi_communicator_t *comm,
|
||||
struct mca_coll_base_comm_t *data);
|
||||
|
||||
/*
|
||||
* Coll module function typedefs
|
||||
|
@ -45,8 +45,8 @@ OMPI_COMP_EXPORT extern int mca_coll_self_priority_param;
|
||||
int mca_coll_self_init_query(bool *allow_multi_user_threads,
|
||||
bool *have_hidden_threads);
|
||||
const struct mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_self_comm_query(struct ompi_communicator_t *comm, int *priority);
|
||||
int mca_coll_self_comm_unquery(struct ompi_communicator_t *comm);
|
||||
mca_coll_self_comm_query(struct ompi_communicator_t *comm, int *priority,
|
||||
struct mca_coll_base_comm_t **data);
|
||||
|
||||
const struct mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_self_module_init(struct ompi_communicator_t *comm);
|
||||
|
@ -42,6 +42,7 @@ int mca_coll_self_priority_param = -1;
|
||||
*/
|
||||
static int self_open(void);
|
||||
|
||||
|
||||
/*
|
||||
* Instantiate the public struct with all of our public information
|
||||
* and pointers to our public functions in it
|
||||
@ -83,7 +84,7 @@ const mca_coll_base_component_1_0_0_t mca_coll_self_component = {
|
||||
|
||||
mca_coll_self_init_query,
|
||||
mca_coll_self_comm_query,
|
||||
NULL
|
||||
NULL,
|
||||
};
|
||||
|
||||
|
||||
@ -97,3 +98,4 @@ static int self_open(void)
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,8 @@ int mca_coll_self_init_query(bool *allow_multi_user_threads,
|
||||
* priority we want to return.
|
||||
*/
|
||||
const mca_coll_base_module_1_0_0_t *
|
||||
mca_coll_self_comm_query(struct ompi_communicator_t *comm, int *priority)
|
||||
mca_coll_self_comm_query(struct ompi_communicator_t *comm, int *priority,
|
||||
struct mca_coll_base_comm_t **data)
|
||||
{
|
||||
/* We only work on intracommunicators of size 1 */
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user