
Improve the scalability of the modex operation and fix a bug reported by Tim P

The bug was a race condition in the barrier operation that caused the barrier in MPI_Finalize to fail on very short programs.

Scalability was improved by using the daemons to aggregate modex and barrier messages before sending them to the rank=0 proc. The improvement is proportional to ppn, of course, but there really wasn't a scaling problem at low ppn anyway. This modification also paves the way for better allgather operations, since all of the data for each node now sits at the daemon level and the daemons are aware that a collective operation on the OOB is underway (so they -can- participate in a collective of their own to support it).
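
As an illustration only, here is a minimal, self-contained C sketch of the aggregation idea (the buffer type and helper names are hypothetical stand-ins, not the actual OPAL/ORTE APIs): each local proc hands its modex/barrier contribution to its daemon, the daemon appends it to a single collection bucket, and once every local child has reported it forwards one combined message toward the rank=0 proc, so rank=0 now waits on num_daemons messages rather than num_procs-1.

/*
 * Hypothetical sketch (not OMPI code): a node-local daemon aggregating the
 * contributions of its local procs into one bucket before sending a single
 * message to the rank=0 proc.  Error handling is omitted for brevity.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {          /* stand-in for an opal_buffer_t-style byte buffer */
    char   *data;
    size_t  len;
} buffer_t;

static void buffer_append(buffer_t *dst, const char *src, size_t len)
{
    dst->data = realloc(dst->data, dst->len + len);
    memcpy(dst->data + dst->len, src, len);
    dst->len += len;
}

/* Daemon side: stash each child's contribution; once all local children have
 * participated, forward the whole bucket in ONE message instead of letting
 * every child send its own message to rank 0. */
static void daemon_collect(buffer_t *bucket, int *num_recvd, int local_procs,
                           const char *payload, size_t len)
{
    buffer_append(bucket, payload, len);
    if (++(*num_recvd) == local_procs) {
        printf("daemon: all %d local procs reported; forwarding %zu bytes "
               "to rank 0 in a single message\n", local_procs, bucket->len);
    }
}

int main(void)
{
    const int local_procs = 4;            /* ppn on this node */
    buffer_t  bucket = { NULL, 0 };
    int       num_recvd = 0;

    for (int vpid = 0; vpid < local_procs; vpid++) {
        char payload[32];
        int  n = snprintf(payload, sizeof(payload), "modex-from-%d;", vpid);
        daemon_collect(&bucket, &num_recvd, local_procs, payload, (size_t)n);
    }
    free(bucket.data);
    return 0;
}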

Also added better diagnostics to map out the timing associated with MPI_Init - turned on by -mca orte_timing 1.
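
For reference, an illustrative invocation with the timing diagnostics enabled might look like the following (the executable name and process count are placeholders):

mpirun -np 4 -mca orte_timing 1 ./my_mpi_app

With this commit, only the rank=0 proc reports the MPI_Init timing breakdown (note the added "0 == ORTE_PROC_MY_NAME->vpid" checks in the ompi_mpi_init diff below).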

This commit was SVN r17988.
This commit is contained in:
Ralph Castain 2008-03-27 15:17:53 +00:00
parent cf40674369
Commit 6166278e18
15 changed files with 345 additions and 83 deletions

View file

@@ -293,7 +293,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
}
/* check for timing request - get stop time and report elapsed time if so */
if (timing) {
if (timing && 0 == ORTE_PROC_MY_NAME->vpid) {
gettimeofday(&ompistop, NULL);
opal_output(0, "ompi_mpi_init [%ld]: time from start to completion of orte_init %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
@@ -527,7 +527,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
}
/* check for timing request - get stop time and report elapsed time if so */
if (timing) {
if (timing && 0 == ORTE_PROC_MY_NAME->vpid) {
gettimeofday(&ompistop, NULL);
opal_output(0, "ompi_mpi_init[%ld]: time from completion of orte_init to modex %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
@@ -544,7 +544,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
if (timing) {
if (timing && 0 == ORTE_PROC_MY_NAME->vpid) {
gettimeofday(&ompistop, NULL);
opal_output(0, "ompi_mpi_init[%ld]: time to execute modex %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
@@ -648,7 +648,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* check for timing request - get stop time and report elapsed
time if so, then start the clock again */
if (timing) {
if (timing && 0 == ORTE_PROC_MY_NAME->vpid) {
gettimeofday(&ompistop, NULL);
opal_output(0, "ompi_mpi_init[%ld]: time from stage 2 cast to complete oob wireup %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
@@ -791,7 +791,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
ompi_wait_for_debugger();
/* check for timing request - get stop time and report elapsed time if so */
if (timing) {
if (timing && 0 == ORTE_PROC_MY_NAME->vpid) {
gettimeofday(&ompistop, NULL);
opal_output(0, "ompi_mpi_init[%ld]: time from oob wireup to complete mpi_init %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,

View file

@@ -42,9 +42,17 @@
#include "orte/mca/grpcomm/base/base.h"
static bool allgather_failed;
static bool allgather_timer;
static orte_std_cntr_t allgather_num_recvd;
static opal_buffer_t *allgather_buf;
static void allgather_timer_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
allgather_timer = true;
}
static void allgather_server_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
@@ -97,29 +105,46 @@ static void allgather_client_recv(int status, orte_process_name_t* sender,
int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
orte_process_name_t name;
int rc;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
struct timeval ompistart, ompistop;
opal_buffer_t coll;
orte_rml_tag_t target_tag=ORTE_RML_TAG_ALLGATHER_SERVER;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everything happens within my jobid */
name.jobid = ORTE_PROC_MY_NAME->jobid;
/* everyone sends data to their local daemon */
OBJ_CONSTRUCT(&coll, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* tell the daemon where it is eventually to be delivered */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &target_tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* add our data to it */
opal_dss.copy_payload(&coll, sbuf);
/* send to local daemon */
if (0 > orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&coll);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&coll);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/*** RANK != 0 ***/
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* everyone but rank=0 sends data */
name.vpid = 0;
if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the buffer that will recv the results */
allgather_buf = OBJ_NEW(opal_buffer_t);
@@ -156,6 +181,23 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
"%s allgather buffer received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
opal_buffer_t buf;
orte_std_cntr_t i=0;
orte_process_name_t name;
if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_ALLGATHER_TIMER,0);
OBJ_DESTRUCT(&buf);
}
}
return ORTE_SUCCESS;
}
@@ -170,11 +212,9 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return rc;
}
/* put my own information into the outgoing buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* my info will be included in the collected buffers as part of
* the daemon's collective operation
*/
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather collecting buffers",
@@ -195,7 +235,7 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return rc;
}
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, (orte_std_cntr_t)orte_process_info.num_procs-1);
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, orte_process_info.num_daemons);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) {
@@ -221,10 +261,6 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return ORTE_ERROR;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather xcasting collected data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* copy the received info to the caller's buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
ORTE_ERROR_LOG(rc);
@@ -233,12 +269,24 @@ int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
}
OBJ_RELEASE(allgather_buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather xcasting collected data - buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)rbuf->bytes_used));
/* xcast the results */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT);
if (orte_timing) {
/* setup a receive to hear when the rank=N proc has received the data
* release - in most xcast schemes, this will always be the final recvr
*/
allgather_timer = false;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_TIMER,
ORTE_RML_NON_PERSISTENT, allgather_timer_recv, NULL);
ORTE_PROGRESSED_WAIT(allgather_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "allgather[%ld]: time to send outbound data %ld usec",
opal_output(0, "allgather[%ld]: time to complete outbound xcast %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));

View file

@@ -45,6 +45,7 @@
static orte_std_cntr_t barrier_num_recvd;
static bool barrier_failed;
static bool barrier_timer;
static void barrier_server_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
@@ -71,11 +72,19 @@ static void barrier_recv(int status, orte_process_name_t* sender,
++barrier_num_recvd;
}
static void barrier_timer_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
barrier_timer = true;
}
int orte_grpcomm_base_barrier(void)
{
orte_process_name_t name;
orte_std_cntr_t i=0;
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_rml_tag_t target_tag=ORTE_RML_TAG_BARRIER_SERVER;
int rc;
struct timeval ompistart, ompistop;
@@ -83,27 +92,35 @@ int orte_grpcomm_base_barrier(void)
"%s grpcomm: entering barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everything happens within the same jobid */
name.jobid = ORTE_PROC_MY_NAME->jobid;
/* everyone sends barrier to local daemon */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon where it is eventually to be delivered */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &target_tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
/* send to local daemon */
if (0 > orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&buf);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/*** RANK != 0 ***/
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* All non-root send & receive near-zero-length message. */
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s sending barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER_SERVER,0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
/* now receive the release from rank=0. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
@@ -121,6 +138,20 @@ int orte_grpcomm_base_barrier(void)
"%s received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
orte_process_name_t name;
if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER_TIMER,0);
OBJ_DESTRUCT(&buf);
}
}
return ORTE_SUCCESS;
}
@@ -141,8 +172,14 @@ int orte_grpcomm_base_barrier(void)
return rc;
}
ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, (orte_std_cntr_t)orte_process_info.num_procs-1);
ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, orte_process_info.num_daemons);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (orte_timing) {
gettimeofday(&ompistop, NULL);
opal_output(0, "barrier[%ld]: time to collect inbound data %ld usec",
@@ -171,8 +208,15 @@ int orte_grpcomm_base_barrier(void)
OBJ_DESTRUCT(&buf);
if (orte_timing) {
/* setup a receive to hear when the rank=N proc has received the barrier
* release - in most xcast schemes, this will always be the final recvr
*/
barrier_timer = false;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_TIMER,
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "barrier[%ld]: time to send outbound data %ld usec",
opal_output(0, "barrier[%ld]: time to complete outbound xcast %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));

View file

@@ -115,7 +115,6 @@ static int xcast(orte_jobid_t job,
orte_rml_tag_t tag)
{
int rc = ORTE_SUCCESS;
struct timeval start, stop;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast sent to job %s tag %ld",
@@ -127,10 +126,6 @@ static int xcast(orte_jobid_t job,
return ORTE_SUCCESS;
}
if (orte_timing) {
gettimeofday(&start, NULL);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast: num_procs %ld linear xover: %ld binomial xover: %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@@ -175,12 +170,6 @@ static int xcast(orte_jobid_t job,
DONE:
if (orte_timing) {
gettimeofday(&stop, NULL);
opal_output(0, "%s grpcomm:xcast: time %ld usec", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long int)((stop.tv_sec - start.tv_sec)*1000000 +
(stop.tv_usec - start.tv_usec)));
}
return rc;
}

View file

@@ -537,6 +537,13 @@ static int odls_base_default_setup_fork(orte_app_context_t *context,
opal_setenv(param, orte_process_info.my_daemon_uri, true, environ_copy);
free(param);
/* pass the #daemons to the local proc for collective ops */
param = mca_base_param_environ_variable("orte","num","daemons");
asprintf(&param2, "%ld", (long)orte_process_info.num_procs);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
/* pass the hnp's contact info to the local proc in case it
* needs it
*/
@@ -1865,3 +1872,139 @@ CLEANUP:
return rc;
}
static bool all_children_participated(orte_jobid_t job)
{
opal_list_item_t *item;
orte_odls_child_t *child;
/* the thread is locked elsewhere - don't try to do it again here */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
/* if this child has *not* participated yet, return false */
if (!child->coll_recvd) {
return false;
}
}
}
/* if we get here, then everyone in the job has participated - cleanout
* their flags so they can do this again!
*/
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* is this child part of the specified job? */
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
/* clear flag */
child->coll_recvd = false;
}
}
return true;
}
static opal_buffer_t *collection_bucket=NULL;
static orte_rml_tag_t collective_target_tag;
int orte_odls_base_default_collect_data(orte_process_name_t *proc,
opal_buffer_t *buf)
{
opal_list_item_t *item;
orte_odls_child_t *child;
int rc;
bool found=false;
orte_process_name_t collector;
orte_std_cntr_t n;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
/* find this child */
if (OPAL_EQUAL == opal_dss.compare(proc, child->name, ORTE_NAME)) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: collecting data from child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(child->name)));
found = true;
break;
}
}
/* if it wasn't found on the list, then we need to add it - must have
* come from a singleton
*/
if (!found) {
child = OBJ_NEW(orte_odls_child_t);
if (ORTE_SUCCESS != (rc = opal_dss.copy((void**)&child->name, proc, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_append(&orte_odls_globals.children, &child->super);
/* we don't know any other info about the child, so just indicate it's
* alive
*/
child->alive = true;
}
/* unpack the target tag for this collective */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &collective_target_tag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if the collection bucket isn't initialized, do so now */
if (NULL == collection_bucket) {
collection_bucket = OBJ_NEW(opal_buffer_t);
}
/* store the provided data */
opal_dss.copy_payload(collection_bucket, buf);
/* flag this proc as having participated */
child->coll_recvd = true;
/* now check to see if everyone in this job has participated */
if (all_children_participated(proc->jobid)) {
/* once everyone participates, send the collection
* bucket to the rank=0 proc of this job
*/
collector.jobid = proc->jobid;
collector.vpid = 0;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: sending collection bucket to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&collector)));
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(&collector, collection_bucket, collective_target_tag, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection_bucket);
goto CLEANUP;
}
OBJ_RELEASE(collection_bucket);
}
CLEANUP:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return ORTE_SUCCESS;
}

View file

@@ -59,6 +59,7 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->pid = 0;
ptr->app_idx = -1;
ptr->alive = false;
ptr->coll_recvd = false;
/* set the default state to "failed to start" so
* we can correctly report should something
* go wrong during launch

View file

@@ -57,6 +57,7 @@ typedef struct orte_odls_child_t {
pid_t pid; /* local pid of the proc */
orte_std_cntr_t app_idx; /* index of the app_context for this proc */
bool alive; /* is this proc alive? */
bool coll_recvd; /* collective operation recvd */
orte_proc_state_t state; /* the state of the process */
orte_exit_code_t exit_code; /* process exit code */
unsigned long cpu_set;
@@ -147,6 +148,11 @@ ORTE_DECLSPEC int orte_odls_base_default_require_sync(orte_process_name_t *proc,
*/
ORTE_DECLSPEC int orte_odls_base_preload_files_app_context(orte_app_context_t* context);
/*
* Collect data to support collective operations across the procs
*/
ORTE_DECLSPEC int orte_odls_base_default_collect_data(orte_process_name_t *proc, opal_buffer_t *buf);
END_C_DECLS
#endif

View file

@@ -90,7 +90,8 @@ orte_odls_base_module_t orte_odls_default_module = {
orte_odls_default_kill_local_procs,
orte_odls_default_signal_local_procs,
orte_odls_base_default_deliver_message,
orte_odls_base_default_require_sync
orte_odls_base_default_require_sync,
orte_odls_base_default_collect_data
};
static bool odls_default_child_died(pid_t pid, unsigned int timeout, int *exit_status)

View file

@@ -39,12 +39,11 @@
#include "orte/mca/odls/odls_types.h"
BEGIN_C_DECLS
/*
* odls module functions
*/
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/*
* Construct a buffer for use in adding local processes
@@ -85,6 +84,12 @@ typedef int (*orte_odls_base_module_deliver_message_fn_t)(orte_jobid_t job, opal
*/
typedef int (*orte_odls_base_module_require_sync_fn_t)(orte_process_name_t *proc, opal_buffer_t *buffer);
/**
* Collect data as part of a collective operation by the procs
*/
typedef int (*orte_odls_base_module_collect_data_fn_t)(orte_process_name_t *proc, opal_buffer_t *buffer);
/**
* pls module version 1.3.0
*/
@@ -95,6 +100,7 @@ struct orte_odls_base_module_1_3_0_t {
orte_odls_base_module_signal_local_process_fn_t signal_local_procs;
orte_odls_base_module_deliver_message_fn_t deliver_message;
orte_odls_base_module_require_sync_fn_t require_sync;
orte_odls_base_module_collect_data_fn_t collect_data;
};
/** shorten orte_odls_base_module_1_3_0_t declaration */
@@ -152,8 +158,6 @@ typedef orte_odls_base_component_1_3_0_t orte_odls_base_component_t;
*/
ORTE_DECLSPEC extern orte_odls_base_module_t orte_odls; /* holds selected module's function pointers */
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
END_C_DECLS
#endif /* MCA_ODLS_H */

View file

@@ -55,6 +55,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
#define ORTE_DAEMON_TERMINATE_JOB_CMD (orte_daemon_cmd_flag_t) 19
#define ORTE_DAEMON_HALT_VM_CMD (orte_daemon_cmd_flag_t) 20
/* collective-based cmds */
#define ORTE_DAEMON_COLL_CMD (orte_daemon_cmd_flag_t) 21
END_C_DECLS
#endif

View file

@@ -390,6 +390,21 @@ static int plm_tm_launch_job(orte_job_t *jdata)
goto cleanup;
}
/* check for timing request - get stop time for launch completion and report */
if (orte_timing) {
if (0 != gettimeofday(&completionstop, NULL)) {
opal_output(0, "plm_tm: could not obtain completion stop time");
} else {
deltat = (completionstop.tv_sec - jobstart.tv_sec)*1000000 +
(completionstop.tv_usec - jobstart.tv_usec);
opal_output(0, "plm_tm: time to launch/wireup all daemons: %d usec", deltat);
}
opal_output(0, "plm_tm: Launch statistics:");
opal_output(0, "plm_tm: Average time to launch an orted: %f usec", avgtime);
opal_output(0, "plm_tm: Max time to launch an orted: %d usec at iter %d", maxtime, maxiter);
opal_output(0, "plm_tm: Min time to launch an orted: %d usec at iter %d", mintime, miniter);
}
launch_apps:
if (ORTE_SUCCESS != (rc = orte_plm_base_launch_apps(jdata->jobid))) {
OPAL_OUTPUT_VERBOSE((1, orte_plm_globals.output,
@@ -402,21 +417,6 @@ launch_apps:
/* if we get here, then everything launched okay - record that fact */
failed_launch = false;
/* check for timing request - get stop time for launch completion and report */
if (orte_timing) {
if (0 != gettimeofday(&completionstop, NULL)) {
opal_output(0, "plm_tm: could not obtain completion stop time");
} else {
deltat = (launchstop.tv_sec - launchstart.tv_sec)*1000000 +
(launchstop.tv_usec - launchstart.tv_usec);
opal_output(0, "plm_tm: launch completion required %d usec", deltat);
}
opal_output(0, "plm_tm: Launch statistics:");
opal_output(0, "plm_tm: Average time to launch an orted: %f usec", avgtime);
opal_output(0, "plm_tm: Max time to launch an orted: %d usec at iter %d", maxtime, maxiter);
opal_output(0, "plm_tm: Min time to launch an orted: %d usec at iter %d", mintime, miniter);
}
cleanup:
if (NULL != argv) {

View file

@@ -93,6 +93,11 @@ BEGIN_C_DECLS
#define ORTE_RML_TAG_DATA_SERVER 27
#define ORTE_RML_TAG_DATA_CLIENT 28
/* timing related */
#define ORTE_RML_TAG_BARRIER_TIMER 29
#define ORTE_RML_TAG_ALLGATHER_TIMER 30
#define ORTE_RML_TAG_MAX 100

View file

@@ -448,6 +448,17 @@ static int process_commands(orte_process_name_t* sender,
OBJ_RELEASE(relay_msg);
break;
/**** COLLECTIVE DATA COMMAND ****/
case ORTE_DAEMON_COLL_CMD:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received collective data cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
if (ORTE_SUCCESS != (ret = orte_odls.collect_data(sender, buffer))) {
ORTE_ERROR_LOG(ret);
}
break;
/**** EXIT COMMAND ****/
case ORTE_DAEMON_EXIT_CMD:
if (orte_process_info.hnp) {

View file

@@ -41,6 +41,7 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
/* .my_name = */ {ORTE_JOBID_INVALID, ORTE_VPID_INVALID},
/* .my_daemon = */ {ORTE_JOBID_INVALID, ORTE_VPID_INVALID},
/* .my_daemon_uri = */ NULL,
/* .num_daemons = */ 0,
/* .my_hnp = */ {0, 0},
/* .my_hnp_uri = */ NULL,
/* .hnp_pid = */ 0,
@@ -119,6 +120,11 @@ int orte_proc_info(void)
(NULL == orte_process_info.my_hnp_uri) ? "NULL" : orte_process_info.my_hnp_uri,
(NULL == orte_process_info.my_daemon_uri) ? "NULL" : orte_process_info.my_daemon_uri));
mca_base_param_reg_int_name("orte", "num_daemons",
"Number of daemons in system",
true, false, -1, &tmp);
orte_process_info.num_daemons = tmp;
mca_base_param_reg_int_name("orte", "app_num",
"Index of the app_context that defines this proc",
true, false, -1, &tmp);

View file

@@ -48,6 +48,7 @@ struct orte_proc_info_t {
orte_process_name_t my_name; /**< My official process name */
orte_process_name_t my_daemon; /**< Name of my local daemon */
char *my_daemon_uri; /**< Contact info to local daemon */
orte_std_cntr_t num_daemons; /**< number of active daemons */
orte_process_name_t my_hnp; /**< Name of my hnp */
char *my_hnp_uri; /**< Contact info for my hnp */
pid_t hnp_pid; /**< hnp pid - used if singleton */