1
1

A little bit further.....transferring to Tim's box for further work.

This commit was SVN r3763.
Этот коммит содержится в:
Ralph Castain 2004-12-09 17:04:59 +00:00
родитель 02d5381c25
Коммит 3c29621a4f
6 изменённых файлов: 57 добавлений и 28 удалений

Просмотреть файл

@ -230,7 +230,9 @@ static void mca_base_modex_registry_callback(
* Lookup the process.
*/
ompi_unpack(buffer, &proc_name, 1, OMPI_NAME);
ompi_output(0, "[%d,%d,%d] yahoo! modex processing data for proc [%d,%d,%d]",
OMPI_NAME_ARGS(*ompi_rte_get_self()), OMPI_NAME_ARGS(proc_name));
proc = ompi_proc_find_and_add(&proc_name, &isnew);
if(NULL == proc)
@ -337,9 +339,6 @@ static int mca_base_modex_subscribe(ompi_process_name_t* name)
}
OMPI_UNLOCK(&mca_base_modex_lock);
ompi_output(0, "[%d,%d,%d] modex_subscribe for [%d,%d,%d]",
OMPI_NAME_ARGS(*ompi_rte_get_self()), OMPI_NAME_ARGS(*name));
/* otherwise - subscribe */
asprintf(&segment, "%s-%s", OMPI_RTE_MODEX_SEGMENT, mca_ns_base_get_jobid_string(name));
rctag = ompi_registry.subscribe(
@ -455,8 +454,6 @@ int mca_base_modex_recv(
return OMPI_ERR_OUT_OF_RESOURCE;
}
ompi_output(0, "[%d,%d,%d] modex_recv: waiting for data", OMPI_NAME_ARGS(*ompi_rte_get_self()));
/* wait until data is available */
while(modex_module->module_data_avail == false) {
ompi_condition_wait(&modex_module->module_data_cond, &proc->proc_lock);

Просмотреть файл

@ -30,12 +30,13 @@ mca_gpr_base_unpack_get_startup_msg(ompi_buffer_t buffer,
ompi_list_t *recipients)
{
mca_gpr_cmd_flag_t command;
int32_t num_recipients, i;
int32_t num_objects, num_recipients, i;
ompi_process_name_t proc;
ompi_name_server_namelist_t *peer;
ompi_buffer_t msg;
void *addr;
int size;
char *segment=NULL;
ompi_registry_object_t *data_object;
ompi_registry_object_size_t data_obj_size;
if ((OMPI_SUCCESS != ompi_unpack(buffer, &command, 1, MCA_GPR_OOB_PACK_CMD))
|| (MCA_GPR_GET_STARTUP_MSG_CMD != command)) {
@ -65,9 +66,23 @@ mca_gpr_base_unpack_get_startup_msg(ompi_buffer_t buffer,
return NULL;
}
ompi_buffer_get(buffer, &addr, &size);
if (0 < size) {
ompi_pack(msg, addr, size, OMPI_BYTE);
while (0 < ompi_unpack_string(buffer, &segment)) {
ompi_output(0, "\transferring data for segment %s", segment);
ompi_pack_string(msg, segment);
ompi_unpack(buffer, &num_objects, 1, OMPI_INT32); /* unpack #data objects */
ompi_pack(msg, &num_objects, 1, OMPI_INT32);
if (0 < num_objects) {
for (i=0; i < num_objects; i++) {
ompi_unpack(buffer, &data_obj_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE);
data_object = (ompi_registry_object_t)malloc(data_obj_size);
ompi_unpack(buffer, data_object, data_obj_size, OMPI_BYTE);
ompi_pack(msg, &data_obj_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE);
ompi_pack(msg, data_object, data_obj_size, OMPI_BYTE);
free(data_object);
}
}
free(segment);
}
return msg;

Просмотреть файл

@ -1141,8 +1141,10 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
ompi_list_t *recipients=NULL;
ompi_buffer_t msg;
ompi_name_server_namelist_t *recip=NULL;
void *addr=NULL;
int32_t size=0, num_recipients=0, i=0;
int32_t num_objects=0, num_recipients=0, i=0;
char *segment=NULL;
ompi_registry_object_t *data_object;
ompi_registry_object_size_t data_obj_size;
if (OMPI_SUCCESS != ompi_unpack(buffer, &jobid, 1, OMPI_JOBID)) {
return;
@ -1171,11 +1173,25 @@ static void mca_gpr_replica_recv_get_startup_msg_cmd(ompi_buffer_t buffer, ompi_
OBJ_RELEASE(recip);
}
ompi_buffer_get(msg, &addr, &size);
ompi_pack(answer, &size, 1, OMPI_INT32);
ompi_pack(answer, &addr, size, OMPI_BYTE);
while (0 < ompi_unpack_string(msg, &segment)) {
ompi_output(0, "replica_recv_proxy: transferring startup data for segment %s", segment);
ompi_pack_string(answer, segment);
ompi_unpack(msg, &num_objects, 1, OMPI_INT32); /* unpack #data objects */
ompi_pack(answer, &num_objects, 1, OMPI_INT32);
if (0 < num_objects) {
for (i=0; i < num_objects; i++) {
ompi_unpack(msg, &data_obj_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE);
data_object = (ompi_registry_object_t)malloc(data_obj_size);
ompi_unpack(msg, data_object, data_obj_size, OMPI_BYTE);
ompi_pack(answer, &data_obj_size, 1, MCA_GPR_OOB_PACK_OBJECT_SIZE);
ompi_pack(answer, data_object, data_obj_size, OMPI_BYTE);
free(data_object);
}
}
free(segment);
}
ompi_buffer_free(msg);
}

Просмотреть файл

@ -44,11 +44,9 @@ int mca_oob_xcast(
{
ompi_name_server_namelist_t *ptr;
int rc;
size_t buf_size;
int tag = MCA_OOB_TAG_XCAST;
ompi_buffer_size(buffer, &buf_size);
ompi_buffer_t rbuf;
/* check to see if I am the root process name */
if(NULL != root &&
0 == ompi_name_server.compare(OMPI_NS_CMP_ALL, root, ompi_rte_get_self())) {
@ -61,8 +59,7 @@ int mca_oob_xcast(
}
}
} else {
ompi_buffer_t rbuf;
int rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, &tag);
rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, &tag);
if(rc < 0) {
return rc;
}

Просмотреть файл

@ -37,6 +37,7 @@ int ompi_rte_job_startup(mca_ns_base_jobid_t jobid)
ompi_name_server_namelist_t *ptr;
ompi_rte_process_status_t *proc_status;
int num_procs;
size_t buf_size;
if (ompi_rte_debug_flag) {
ompi_output(0, "[%d,%d,%d] entered rte_job_startup for job %d",
@ -49,8 +50,9 @@ int ompi_rte_job_startup(mca_ns_base_jobid_t jobid)
ompi_registry.triggers_active(jobid);
if (ompi_rte_debug_flag) {
ompi_output(0, "[%d,%d,%d] rte_job_startup: sending startup message to %d recipients",
OMPI_NAME_ARGS(*ompi_rte_get_self()),
ompi_buffer_size(startup_msg, &buf_size);
ompi_output(0, "[%d,%d,%d] rte_job_startup: sending startup message of size %d to %d recipients",
OMPI_NAME_ARGS(*ompi_rte_get_self()), (int)buf_size,
ompi_list_get_size(recipients));
}

Просмотреть файл

@ -59,10 +59,12 @@ ompi_rte_decode_startup_msg(int status, ompi_process_name_t *peer,
ompi_registry_object_t *data_object;
ompi_registry_object_size_t data_obj_size;
int32_t num_objects, i;
size_t buf_size;
if (ompi_rte_debug_flag) {
ompi_output(0, "[%d,%d,%d] decoding startup msg",
OMPI_NAME_ARGS(*ompi_rte_get_self()));
ompi_buffer_size(msg, &buf_size);
ompi_output(0, "[%d,%d,%d] decoding startup msg of length %d",
OMPI_NAME_ARGS(*ompi_rte_get_self()), (int)buf_size);
}
while (0 < ompi_unpack_string(msg, &segment)) {