1
1

Modify the new DSS xfer and copy functions so that they transfer/copy only the portion of a buffer's payload that has not yet been unpacked. This allows more rapid transfer of data during message relay without requiring any knowledge of what is in the buffer.

Begin work on restoring the binomial message distribution method.

This commit was SVN r14728.
Этот коммит содержится в:
Ralph Castain 2007-05-23 14:06:32 +00:00
родитель b2e805db61
Коммит e6ff7757ab
4 изменённых файлов: 228 добавлений и 168 удалений

Просмотреть файл

@ -41,7 +41,7 @@ int orte_dss_unload(orte_buffer_t *buffer, void **payload,
{
char *hdr_dst = NULL;
orte_dss_buffer_type_t type;
/* check that buffer is not null */
if (!buffer) {
return ORTE_ERR_BAD_PARAM;
@ -73,8 +73,8 @@ int orte_dss_unload(orte_buffer_t *buffer, void **payload,
/* okay, we have something to provide - pass it back */
*payload = buffer->base_ptr;
*bytes_used = (orte_std_cntr_t)buffer->bytes_used;
*bytes_used = buffer->bytes_used;
/* dereference everything in buffer */
buffer->base_ptr = NULL;
buffer->pack_ptr = buffer->unpack_ptr = NULL;
@ -129,10 +129,18 @@ int orte_dss_load(orte_buffer_t *buffer, void *payload,
return ORTE_SUCCESS;
}
/* Move the UNPACKED portion of a source buffer into a destination buffer
* The complete contents of the src buffer are NOT moved - only that
* portion that has not been previously unpacked. However, we must ensure
* that we don't subsequently "free" memory from inside a previously
* malloc'd block. Hence, we must obtain a new memory allocation for the
* dest buffer's storage before we move the data across. As a result, this
* looks functionally a lot more like a destructive "copy" - both for
* the source and destination buffers - than a direct transfer of data!
*/
int orte_dss_xfer_payload(orte_buffer_t *dest, orte_buffer_t *src)
{
void *payload;
orte_std_cntr_t bytes_used;
int rc;
/* ensure we have valid source and destination */
@ -141,25 +149,44 @@ int orte_dss_xfer_payload(orte_buffer_t *dest, orte_buffer_t *src)
return ORTE_ERR_BAD_PARAM;
}
/* unload the src payload */
if (ORTE_SUCCESS != (rc = orte_dss_unload(src, &payload, &bytes_used))) {
/* if the dest is already populated, release the data */
if (0 != dest->bytes_used) {
free(dest->base_ptr);
dest->base_ptr = NULL;
dest->pack_ptr = dest->unpack_ptr = NULL;
dest->bytes_allocated = dest->bytes_used = 0;
}
/* ensure the dest buffer type matches the src */
dest->type = src->type;
/* copy the src payload to the dest - this will allocate "fresh"
* memory for the unpacked payload remaining in the src buffer
*/
if (ORTE_SUCCESS != (rc = orte_dss_copy_payload(dest, src))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* load it into the dest, overwriting anything already there */
if (ORTE_SUCCESS != (rc = orte_dss_load(dest, payload, bytes_used))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* dereference everything in src */
free(src->base_ptr);
src->base_ptr = NULL;
src->pack_ptr = src->unpack_ptr = NULL;
src->bytes_allocated = src->bytes_used = 0;
return ORTE_SUCCESS;
}
/* Copy the UNPACKED portion of a source buffer into a destination buffer
* The complete contents of the src buffer are NOT copied - only that
* portion that has not been previously unpacked is copied.
*/
int orte_dss_copy_payload(orte_buffer_t *dest, orte_buffer_t *src)
{
char *dst_ptr;
orte_std_cntr_t bytes_left;
/* ensure we have valid source and destination */
if (NULL == dest || NULL == src) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
@ -181,17 +208,31 @@ int orte_dss_copy_payload(orte_buffer_t *dest, orte_buffer_t *src)
*/
dest->type = src->type;
/* compute how much of the src buffer remains unpacked
* buffer->bytes_used is the total number of bytes in the buffer that
* have been packed. However, we may have already unpacked some of
* that data. We only want to unload what remains unpacked. This
* means we have to look at how much of the buffer remains "used"
* beyond the unpack_ptr
*/
bytes_left = src->bytes_used - (src->unpack_ptr - src->base_ptr);
/* if nothing is left, then nothing to do */
if (0 == bytes_left) {
return ORTE_SUCCESS;
}
/* add room to the dest for the src buffer's payload */
if (NULL == (dst_ptr = orte_dss_buffer_extend(dest, src->bytes_used))) {
if (NULL == (dst_ptr = orte_dss_buffer_extend(dest, bytes_left))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* copy the src payload to the specified location in dest */
memcpy(dst_ptr, src->base_ptr, src->bytes_used);
memcpy(dst_ptr, src->unpack_ptr, bytes_left);
/* adjust the dest buffer's bookkeeping */
dest->bytes_used += src->bytes_used;
dest->pack_ptr = ((char*)dest->pack_ptr) + src->bytes_used;
dest->bytes_used += bytes_left;
dest->pack_ptr = ((char*)dest->pack_ptr) + bytes_left;
return ORTE_SUCCESS;
}

Просмотреть файл

@ -118,14 +118,9 @@ int mca_oob_base_open(void)
}
param = mca_base_param_reg_string_name("oob", "xcast_mode",
#if 0
"Select xcast mode (\"linear\" | \"binomial\" | \"direct [default] \")",
#endif
"Select xcast mode (\"linear\" | \"direct [default] \")",
false, false, "direct", &mode);
if (0 == strcmp(mode, "binomial")) {
opal_output(0, "oob_xcast_mode: %s option not supported at this time", mode);
return ORTE_ERROR;
orte_oob_xcast_mode = 0;
} else if (0 == strcmp(mode, "linear")) {
orte_oob_xcast_mode = 1;

Просмотреть файл

@ -205,13 +205,13 @@ static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
orte_rml_tag_t tag)
{
orte_std_cntr_t i;
int rc, ret;
int rc;
int peer, size, rank, hibit, mask;
orte_process_name_t target;
orte_buffer_t *buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
orte_daemon_cmd_flag_t mode=ORTE_DAEMON_ROUTE_BINOMIAL;
orte_vpid_t daemon_start=0, num_daemons;
orte_vpid_t num_daemons;
int bitmap;
/* this is the HNP end, so it starts the procedure. Since the HNP is always the
@ -223,19 +223,34 @@ static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
*/
buf = OBJ_NEW(orte_buffer_t);
/* ======== LOAD THE VALUES THAT ARE COMMON TO ALL NON-DIRECT MESSAGE PATHS ======== */
/* tell the daemon this is a message for its local procs */
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm is binomial so it can figure
* out who to forward the message to
* out who to forward the message down the tree
*/
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &mode, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the jobid of the procs that are to receive the message */
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the tag where the message is to be sent */
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* ======= DONE WITH COMMON VALUES ====== */
/* get the number of daemons currently in the system and tell the daemon so
* it can properly route
*/
@ -243,21 +258,11 @@ static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &daemon_start, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &num_daemons, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the jobid of the procs that are to receive the message */
if (ORTE_SUCCESS != (rc = orte_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
@ -276,37 +281,63 @@ static int mca_oob_xcast_binomial_tree(orte_jobid_t job,
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
/* if there is only one daemon, then we just send it - don't try to
* compute the binomial algorithm as it won't yield a meaningful
* result for just one
/* we have to account for all of the messages we are about to send
* because the non-blocking send can come back almost immediately - before
* we would get the chance to increment the num_active. This causes us
* to not correctly wakeup and reset the xcast_in_progress flag
*/
if (num_daemons < 2) {
target.vpid = 1;
if (0 > (ret = mca_oob_send_packed(&target, buf, ORTE_RML_TAG_PLS_ORTED, 0))) {
ORTE_ERROR_LOG(ret);
rc = ret;
OPAL_THREAD_LOCK(&orte_oob_xcast_mutex);
xcast_num_active = num_daemons;
if (orte_process_info.daemon ||
orte_process_info.seed ||
orte_process_info.singleton) {
/* we never send to ourselves,
* so we need to adjust the number of sends
* we are expecting to complete
*/
xcast_num_active--;
if (xcast_num_active <= 0) {
/* if we aren't going to send anything at all, we
* need to reset the xcast_in_progress flag so
* we don't block the entire system and return
*/
xcast_in_progress = false;
OPAL_THREAD_UNLOCK(&orte_oob_xcast_mutex);
rc = ORTE_SUCCESS;
goto CLEANUP;
}
} else {
/* compute the bitmap */
bitmap = opal_cube_dim((int)num_daemons);
rank = 0;
size = (int)num_daemons;
hibit = opal_hibit(rank, bitmap);
--bitmap;
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)(daemon_start+peer);
if (0 > (ret = mca_oob_send_packed(&target, buf, ORTE_RML_TAG_PLS_ORTED, 0))) {
ORTE_ERROR_LOG(ret);
rc = ret;
}
OPAL_THREAD_UNLOCK(&orte_oob_xcast_mutex);
/* compute the bitmap */
bitmap = opal_cube_dim((int)num_daemons);
rank = 0;
size = (int)num_daemons;
hibit = opal_hibit(rank, bitmap);
--bitmap;
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)peer;
opal_output(0, "[%ld,%ld,%ld] xcast to [%ld,%ld,%ld]", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), ORTE_NAME_ARGS(&target));
if (0 > (rc = mca_oob_send_packed_nb(&target, buf, ORTE_RML_TAG_PLS_ORTED,
0, mca_oob_xcast_send_cb, NULL))) {
if (ORTE_ERR_ADDRESSEE_UNKNOWN != rc) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
rc = ORTE_ERR_COMM_FAILURE;
OPAL_THREAD_LOCK(&orte_oob_xcast_mutex);
xcast_num_active -= (num_daemons-i);
OPAL_THREAD_UNLOCK(&orte_oob_xcast_mutex);
goto CLEANUP;
}
/* decrement the number we are waiting to see */
OPAL_THREAD_LOCK(&orte_oob_xcast_mutex);
xcast_num_active--;
OPAL_THREAD_UNLOCK(&orte_oob_xcast_mutex);
}
}
}

Просмотреть файл

@ -58,22 +58,20 @@
#include "orte/tools/orted/orted.h"
static int binomial_route_msg(orte_buffer_t *buf, orte_jobid_t job, orte_rml_tag_t target_tag);
void orte_daemon_recv_pls(int status, orte_process_name_t* sender,
orte_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_daemon_cmd_flag_t command, routing_mode;
orte_buffer_t answer, relay;
orte_buffer_t answer, *relay;
int ret;
orte_std_cntr_t n;
int32_t signal;
orte_gpr_notify_data_t *ndat;
orte_jobid_t *jobs, job;
orte_std_cntr_t num_jobs;
orte_gpr_notify_message_t *msg;
orte_std_cntr_t daemon_start, num_daemons;
int i, bitmap, peer, size, rank, hibit, mask;
orte_process_name_t target;
orte_rml_tag_t target_tag;
OPAL_TRACE(1);
@ -192,130 +190,37 @@ void orte_daemon_recv_pls(int status, orte_process_name_t* sender,
goto CLEANUP;
}
/* if the mode is BINOMIAL, then we will be routing the message elsewhere, so
* copy the message for later use and unpack routing info we'll need
*/
if (ORTE_DAEMON_ROUTE_BINOMIAL == routing_mode) {
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &daemon_start, &n, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &num_daemons, &n, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
/* unpack the jobid of the procs that are to receive the message */
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* unpack the tag where we are to deliver the message */
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &target_tag, &n, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* unpack the message */
msg = OBJ_NEW(orte_gpr_notify_message_t);
if (NULL == msg) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
goto CLEANUP;
}
n = 1;
if (ORTE_SUCCESS != (ret = orte_dss.unpack(buffer, &msg, &n, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(msg);
goto CLEANUP;
}
/* if the mode is BINOMIAL, then we have to reconstruct and route this message on */
/* if the mode is BINOMIAL, then relay it on before doing anything else */
if (ORTE_DAEMON_ROUTE_BINOMIAL == routing_mode) {
OBJ_CONSTRUCT(&relay, orte_buffer_t);
/* tell the daemon this is a message for its local procs */
command=ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &command, 1, ORTE_DAEMON_CMD))) {
if (ORTE_SUCCESS != (ret = binomial_route_msg(buffer, job, target_tag))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* tell the daemon the routing algorithm is binomial so it can figure
* out who to forward the message to
*/
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &routing_mode, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* tell the daemon the number of daemons currently in the system so
* it can properly route
*/
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &daemon_start, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &num_daemons, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* tell the daemon the jobid of the procs that are to receive the message */
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* pack the message itself */
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* compute the bitmap */
bitmap = opal_cube_dim((int)num_daemons);
rank = (int)ORTE_PROC_MY_NAME->vpid;
size = (int)num_daemons;
hibit = opal_hibit(rank, bitmap);
--bitmap;
target.cellid = ORTE_PROC_MY_NAME->cellid;
target.jobid = 0;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
target.vpid = (orte_vpid_t)peer;
if (0 > (ret = orte_rml.send_buffer(&target, &relay, ORTE_RML_TAG_PLS_ORTED, 0))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&relay);
goto CLEANUP;
}
}
}
OBJ_DESTRUCT(&relay);
}
relay = OBJ_NEW(orte_buffer_t);
orte_dss.copy_payload(relay, buffer);
/* construct a buffer for local delivery to our children */
OBJ_CONSTRUCT(&relay, orte_buffer_t);
if (ORTE_SUCCESS != (ret = orte_dss.pack(&relay, &msg, 1, ORTE_GPR_NOTIFY_MSG))) {
/* now deliver the message to our children */
if (ORTE_SUCCESS != (ret = orte_odls.deliver_message(job, relay, target_tag))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* deliver the message to the children */
if (ORTE_SUCCESS != (ret = orte_odls.deliver_message(job, &relay, target_tag))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
OBJ_DESTRUCT(&relay);
OBJ_RELEASE(msg);
OBJ_RELEASE(relay);
break;
/**** EXIT COMMAND ****/
@ -507,3 +412,91 @@ DONE:
return;
}
/*
 * Relay an xcast message on to this daemon's peers in the binomial tree.
 *
 * A fresh relay buffer is built containing, in order: the
 * ORTE_DAEMON_MESSAGE_LOCAL_PROCS command, the ORTE_DAEMON_ROUTE_BINOMIAL
 * routing flag, the target jobid, the target tag, the number of daemons,
 * and finally whatever remains unpacked in "buf". The relay buffer is then
 * sent to each computed peer on ORTE_RML_TAG_PLS_ORTED.
 *
 * Side effect on "buf": the number of daemons is unpacked from it here, so
 * on return its unpack position has advanced past that value. The rest of
 * the payload is only copied (copy_payload), not consumed.
 *
 * @param buf         incoming message, positioned at the packed daemon count
 * @param job         jobid of the procs that are to receive the message
 * @param target_tag  tag where the message is to be delivered
 *
 * @return ORTE_SUCCESS if the relay buffer was built and every send was
 *         accepted; otherwise the error code of the first step that failed.
 *         (The original always returned ORTE_SUCCESS, which hid every
 *         failure from the caller.)
 */
static int binomial_route_msg(orte_buffer_t *buf, orte_jobid_t job, orte_rml_tag_t target_tag)
{
    orte_std_cntr_t n, num_daemons;
    int i, bitmap, peer, size, rank, hibit, mask;
    orte_process_name_t target;
    orte_daemon_cmd_flag_t command;
    orte_buffer_t *relay=NULL;
    int ret;

    /* initialize the relay buffer */
    relay = OBJ_NEW(orte_buffer_t);
    if (NULL == relay) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* tell the downstream daemons this is a message for their local procs */
    command=ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
    if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* tell the downstream daemons the routing algorithm is binomial */
    command = ORTE_DAEMON_ROUTE_BINOMIAL;
    if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* tell the downstream daemons the jobid of the procs that are to receive the message */
    if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &job, 1, ORTE_JOBID))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* tell the downstream daemons the tag where the message is to be delivered */
    if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &target_tag, 1, ORTE_RML_TAG))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* unpack the current number of daemons */
    n = 1;
    if (ORTE_SUCCESS != (ret = orte_dss.unpack(buf, &num_daemons, &n, ORTE_STD_CNTR))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* pass that value to the downstream daemons */
    if (ORTE_SUCCESS != (ret = orte_dss.pack(relay, &num_daemons, 1, ORTE_STD_CNTR))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* copy the message payload to the relay buffer - this is non-destructive */
    if (ORTE_SUCCESS != (ret = orte_dss.copy_payload(relay, buf))) {
        ORTE_ERROR_LOG(ret);
        goto CLEANUP;
    }

    /* compute the bitmap; our rank in the tree is our own vpid */
    bitmap = opal_cube_dim((int)num_daemons);
    rank = (int)ORTE_PROC_MY_NAME->vpid;
    size = (int)num_daemons;
    /* NOTE(review): opal_hibit presumably yields the index of the highest
     * set bit in rank - confirm against its definition */
    hibit = opal_hibit(rank, bitmap);
    --bitmap;

    target.cellid = ORTE_PROC_MY_NAME->cellid;
    target.jobid = 0;

    /* walk the bit positions above hibit: each one names a candidate peer
     * (our rank with that bit set); skip candidates beyond the daemon count
     */
    for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
        peer = rank | mask;
        if (peer < size) {
            target.vpid = (orte_vpid_t)peer;
            opal_output(0, "[%ld,%ld,%ld] relaying to [%ld,%ld,%ld]", ORTE_NAME_ARGS(ORTE_PROC_MY_NAME), ORTE_NAME_ARGS(&target));
            if (0 > (ret = orte_rml.send_buffer(&target, relay, ORTE_RML_TAG_PLS_ORTED, 0))) {
                ORTE_ERROR_LOG(ret);
                goto CLEANUP;
            }
        }
    }

    /* only a negative send_buffer result is treated as failure above, so ret
     * may hold some other non-negative value here - normalize it so callers
     * comparing against ORTE_SUCCESS see success
     */
    ret = ORTE_SUCCESS;

CLEANUP:
    if (NULL != relay) {
        OBJ_RELEASE(relay);
    }
    return ret;
}