
First stage in the move to a faster startup. Change the ORTE stage gate xcast from a linear broadcast into a binary tree broadcast. Also remove the timing report in the gpr_proxy component that printed out the number of bytes in the compound command message, since the answer was "not much" and it only cluttered the timing data.

This commit was SVN r12679.
This commit is contained in:
Ralph Castain 2006-11-28 00:06:25 +00:00
parent 652b91ee26
commit bc4e97a435
8 changed files with 140 additions and 175 deletions
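The new xcast fans the stage-gate message out along a tree keyed on the vpid bits (see the reworked mca_oob_xcast() further down): the HNP now performs a single send to vpid 0 of the job, and each process relays the message to the peers obtained by setting one bit above its own highest set bit, so the message reaches all procs in roughly log2(nprocs) relay steps. Below is a minimal, self-contained sketch of that fan-out pattern - not code from this commit; cube_dim() and hibit() are simplified stand-ins for opal_cube_dim()/opal_hibit(), and the job size of 8 is arbitrary.

#include <stdio.h>

/* smallest d such that (1 << d) >= nprocs */
static int cube_dim(int nprocs)
{
    int d = 0;
    while ((1 << d) < nprocs) ++d;
    return d;
}

/* highest bit set in value, scanning down from bit 'start'; -1 if none is set */
static int hibit(int value, int start)
{
    int mask = 1 << start;
    while (start >= 0 && !(value & mask)) {
        --start;
        mask >>= 1;
    }
    return start;
}

int main(void)
{
    const int size = 8;                 /* pretend job size */
    const int dim  = cube_dim(size);

    for (int rank = 0; rank < size; ++rank) {
        printf("vpid %d relays to:", rank);
        /* vpid 0 (seeded directly by the HNP) has no bits set, so hi == -1 and
         * it forwards to vpids 1, 2, 4, ...; every other vpid forwards only to
         * peers whose extra bit lies above its own highest set bit */
        int hi = hibit(rank, dim - 1);
        for (int i = hi + 1; i < dim; ++i) {
            int peer = rank | (1 << i);
            if (peer < size) printf(" %d", peer);
        }
        printf("\n");
    }
    return 0;
}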

View File

@@ -150,8 +150,8 @@ int ompi_mpi_finalize(void)
/*
* Wait for everyone to get here
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL,
orte_gpr.deliver_notify_msg, NULL))) {
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
return ret;
}
@@ -291,8 +291,8 @@ int ompi_mpi_finalize(void)
* the RTE while the smr is trying to do the update - which causes
* an ugly race condition
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL,
orte_gpr.deliver_notify_msg, NULL))) {
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
return ret;
}

View File

@@ -523,9 +523,13 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
(ompistop.tv_usec - ompistart.tv_usec)));
}
/* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE */
if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL,
orte_gpr.deliver_notify_msg, NULL))) {
/* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE.
* We pass a "process_first" flag of "true" to indicate that we need to
* process the STG1 message prior to sending it along the xcast chain
* as this message contains all the oob contact info we need!
*/
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
error = "ompi_mpi_init: failed to see all procs register\n";
goto error;
@@ -640,8 +644,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* Second barrier -- wait for message from
RMGR_PROC_STAGE_GATE_MGR to arrive */
if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL,
orte_gpr.deliver_notify_msg, NULL))) {
if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false,
NULL, orte_gpr.deliver_notify_msg))) {
ORTE_ERROR_LOG(ret);
error = "ompi_mpi_init: failed to see all procs register\n";
goto error;

View File

@@ -114,11 +114,6 @@ int orte_gpr_proxy_exec_compound_cmd(void)
OPAL_THREAD_LOCK(&orte_gpr_proxy_globals.wait_for_compound_mutex);
rc = ORTE_SUCCESS;
if (orte_gpr_proxy_globals.timing) {
opal_output(0, "gpr_proxy [%ld]: compound cmd size %lu", (long)ORTE_PROC_MY_NAME->vpid,
(unsigned long)orte_gpr_proxy_globals.compound_cmd->bytes_used);
}
if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, orte_gpr_proxy_globals.compound_cmd, ORTE_RML_TAG_GPR, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
rc = ORTE_ERR_COMM_FAILURE;

View File

@@ -400,12 +400,10 @@ ORTE_DECLSPEC int mca_oob_recv_packed_nb(
* continuing to forward data along the distribution tree.
*/
ORTE_DECLSPEC int mca_oob_xcast(
orte_process_name_t* root,
orte_process_name_t* peers,
orte_std_cntr_t num_peers,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc);
ORTE_DECLSPEC int mca_oob_xcast(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc);
/*
* Callback on exception condition.
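For orientation, the two ends of the reworked call - assembled from the call sites elsewhere in this diff, not new code - look roughly like this. The HNP/stage-gate side supplies the packed trigger message and no callback; application processes supply no buffer and the GPR delivery callback, with process_first controlling whether the message is processed locally before it is relayed down the tree.

/* HNP / stage-gate manager side: seed the xcast with the packed trigger message */
rc = orte_rml.xcast(job, false, &buffer, NULL);

/* Application proc at the STG1 gate: process first, since the message carries
 * the OOB contact info needed to reach the children in the tree */
rc = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true, NULL, orte_gpr.deliver_notify_msg);

/* Later gates and MPI_Finalize: relay first, then process */
rc = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false, NULL, orte_gpr.deliver_notify_msg);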

View File

@@ -26,6 +26,7 @@
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "opal/util/bit_ops.h"
#include "orte/util/proc_info.h"
#include "orte/dss/dss.h"
@@ -38,139 +39,137 @@
/**
* A "broadcast-like" function over the specified set of peers.
* @param root The process acting as the root of the broadcast.
* @param peers The list of processes receiving the broadcast (excluding root).
* A "broadcast-like" function to a job's processes.
* @param jobid The job whose processes are to receive the message
* @param process_first Whether or not to process the message locally before sending it on
* @param buffer The data to broadcast - only significant at root.
* @param cbfunc Callback function on receipt of data - not significant at root.
*
* Note that the callback function is provided so that the data can be
* received and interpreted by the application prior to the broadcast
* continuing to forward data along the distribution tree.
*/
static opal_mutex_t xcastmutex;
static int xcast_bitmap, bitmap_save;
static bool bitmap_init = false;
struct mca_oob_xcast_t {
opal_object_t super;
opal_mutex_t mutex;
opal_condition_t cond;
orte_std_cntr_t counter;
};
typedef struct mca_oob_xcast_t mca_oob_xcast_t;
static void mca_oob_xcast_construct(mca_oob_xcast_t* xcast)
{
OBJ_CONSTRUCT(&xcast->mutex, opal_mutex_t);
OBJ_CONSTRUCT(&xcast->cond, opal_condition_t);
}
static void mca_oob_xcast_destruct(mca_oob_xcast_t* xcast)
{
OBJ_DESTRUCT(&xcast->mutex);
OBJ_DESTRUCT(&xcast->cond);
}
static OBJ_CLASS_INSTANCE(
mca_oob_xcast_t,
opal_object_t,
mca_oob_xcast_construct,
mca_oob_xcast_destruct);
static void mca_oob_xcast_cb(int status, orte_process_name_t* peer, orte_buffer_t* buffer, int tag, void* cbdata)
{
mca_oob_xcast_t* xcast = (mca_oob_xcast_t*)cbdata;
OPAL_THREAD_LOCK(&xcast->mutex);
if(--xcast->counter == 0) {
opal_condition_signal(&xcast->cond);
}
OPAL_THREAD_UNLOCK(&xcast->mutex);
}
int mca_oob_xcast(
orte_process_name_t* root,
orte_process_name_t* peers,
orte_std_cntr_t num_peers,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc)
int mca_oob_xcast(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc)
{
orte_std_cntr_t i;
int rc;
int tag = ORTE_RML_TAG_XCAST;
int status;
orte_proc_state_t state;
struct timeval sendstart, sendstop;
int peer, size, rank, hibit, mask;
orte_buffer_t rbuf, sbuf;
orte_gpr_notify_message_t *msg;
orte_process_name_t target;
/* check to see if I am the root process name */
if(NULL != root && ORTE_EQUAL == orte_dss.compare(root, orte_process_info.my_name, ORTE_NAME)) {
mca_oob_xcast_t *xcast = OBJ_NEW(mca_oob_xcast_t);
xcast->counter = num_peers;
/* check for timing request - if so, we want to printout the size of the message being sent */
if (orte_oob_base_timing) {
opal_output(0, "oob_xcast: message size is %lu bytes", (unsigned long)buffer->bytes_used);
gettimeofday(&sendstart, NULL);
}
for(i=0; i<num_peers; i++) {
/* check status of peer to ensure they are alive */
if (ORTE_SUCCESS != (rc = orte_smr.get_proc_state(&state, &status, peers+i))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (state != ORTE_PROC_STATE_TERMINATED && state != ORTE_PROC_STATE_ABORTED) {
rc = mca_oob_send_packed_nb(peers+i, buffer, tag, 0, mca_oob_xcast_cb, xcast);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
}
/* check to see if there is something to send - this is only true on the HNP end.
* However, we cannot just test to see if we are the HNP since, if we are a singleton,
* we are the HNP *and* we still need to handle both ends of the xcast
*/
if (NULL != buffer) {
/* this is the HNP end, so it starts the procedure. Accordingly, it sends its
* message to the first process in the job in the peer list, which takes it from there
*/
OBJ_CONSTRUCT(&xcastmutex, opal_mutex_t);
OPAL_THREAD_LOCK(&xcastmutex);
/* wait for all non-blocking operations to complete */
OPAL_THREAD_LOCK(&xcast->mutex);
while(xcast->counter > 0) {
opal_condition_wait(&xcast->cond, &xcast->mutex);
}
/* check for timing request - if so, report comm time */
if (orte_oob_base_timing) {
gettimeofday(&sendstop, NULL);
opal_output(0, "oob_xcast: time to send was %ld usec",
(long int)((sendstop.tv_sec - sendstart.tv_sec)*1000000 +
(sendstop.tv_usec - sendstart.tv_usec)));
}
OPAL_THREAD_UNLOCK(&xcast->mutex);
OBJ_RELEASE(xcast);
} else {
orte_buffer_t rbuf;
orte_gpr_notify_message_t *msg;
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag);
if(rc < 0) {
OBJ_DESTRUCT(&rbuf);
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = job;
target.vpid = 0;
if (0 > (rc = mca_oob_send_packed(&target, buffer, tag, 0))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return rc;
}
if (cbfunc != NULL) {
msg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) {
OPAL_THREAD_UNLOCK(&xcastmutex);
OBJ_DESTRUCT(&xcastmutex);
return ORTE_SUCCESS;
}
/* this process is one of the application procs - accordingly, it will
* receive the message from its "parent" in the broadcast tree, and
* then send it along to some set of children
*/
/* compute the bitmap, if we haven't already done so */
if (!bitmap_init) {
bitmap_save = opal_cube_dim((int)orte_process_info.num_procs);
bitmap_init = true;
}
xcast_bitmap = bitmap_save;
rank = (int)(ORTE_PROC_MY_NAME->vpid);
size = (int)orte_process_info.num_procs;
hibit = opal_hibit(rank, xcast_bitmap);
--xcast_bitmap;
/* regardless of who we are, we first have to receive the message */
OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
if (0 > (rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&rbuf);
return rc;
}
msg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
i=1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msg);
return rc;
}
OBJ_DESTRUCT(&rbuf);
/* repack the message so we can send it on */
OBJ_CONSTRUCT(&sbuf, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.pack(&sbuf, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&sbuf);
return rc;
}
/* since the OOB contact info for our peers is in the STG1 message, we have to
* process it BEFORE we can relay the message to any "children"
*/
if (cbfunc != NULL && process_first) {
/* process the message */
cbfunc(msg);
}
/* send data to any children */
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = ORTE_PROC_MY_NAME->jobid;
for (i = hibit + 1, mask = 1 << i; i <= xcast_bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)peer;
if (0 > (rc = mca_oob_send_packed(&target, &sbuf, tag, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(msg);
return rc;
}
cbfunc(msg);
OBJ_RELEASE(msg);
}
OBJ_DESTRUCT(&rbuf);
}
OBJ_DESTRUCT(&sbuf);
/* if it wasn't the STG1 message, then process it here */
if (cbfunc != NULL && !process_first) {
cbfunc(msg);
}
OBJ_RELEASE(msg);
return ORTE_SUCCESS;
}

View File

@@ -196,11 +196,10 @@ typedef int (*mca_oob_base_module_fini_fn_t)(void);
/**
* xcast function for sending common messages to all processes
*/
typedef int (*mca_oob_base_module_xcast_fn_t)(orte_process_name_t* root,
orte_process_name_t* peers,
orte_std_cntr_t num_peers,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc);
typedef int (*mca_oob_base_module_xcast_fn_t)(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc);
/**
* OOB Module

View File

@@ -58,12 +58,8 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job)
int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
{
orte_buffer_t buffer;
orte_process_name_t *recipients=NULL;
orte_std_cntr_t n=0;
int rc;
orte_jobid_t job;
opal_list_t attrs;
opal_list_item_t *item;
OPAL_TRACE(1);
@@ -88,23 +84,7 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
}
OPAL_TRACE_ARG1(1, job);
/* need the list of peers for this job so we can send them the xcast.
* obtain this list from the name service's get_job_peers function
*/
OBJ_CONSTRUCT(&attrs, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attrs, ORTE_NS_USE_JOBID, ORTE_JOBID,
&job, ORTE_RMGR_ATTR_OVERRIDE))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&attrs);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_ns.get_peers(&recipients, &n, &attrs))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* set the job state to the appropriate level */
if (orte_schema.check_std_trigger_name(msg->target, ORTE_ALL_LAUNCHED_TRIGGER)) {
if (ORTE_SUCCESS != (rc = orte_smr.set_job_state(job, ORTE_JOB_STATE_LAUNCHED))) {
@@ -148,19 +128,12 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg)
}
/* send the message */
if (ORTE_SUCCESS != (rc = orte_rml.xcast(orte_process_info.my_name, recipients,
n, &buffer, NULL, NULL))) {
if (ORTE_SUCCESS != (rc = orte_rml.xcast(job, false, &buffer, NULL))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&buffer);
CLEANUP:
if (NULL != recipients) free(recipients);
while (NULL != (item = opal_list_remove_first(&attrs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&attrs);
return rc;
}

View File

@@ -326,13 +326,10 @@ typedef int (*orte_rml_module_recv_cancel_fn_t)(orte_process_name_t* peer, orte_
* xcast function for sending common messages to all processes
*/
typedef int (*orte_rml_module_xcast_fn_t)(
orte_process_name_t* root,
orte_process_name_t* peers,
size_t num_peers,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc,
void *user_tag);
typedef int (*orte_rml_module_xcast_fn_t)(orte_jobid_t job,
bool process_first,
orte_buffer_t* buffer,
orte_gpr_trigger_cb_fn_t cbfunc);
/*
* Callback on exception condition.