
Merge pull request #2893 from rhc54/topic/sim

Clean up the ras simulator capability, and the relay route through grpcomm
This commit is contained in:
Ralph Castain 2017-02-01 16:17:40 -08:00 committed by GitHub
parent 362ac8b87e 230d15f0d9
commit 50ca9fb66b
6 changed files: 283 additions and 223 deletions

View file

@@ -874,6 +874,9 @@ static int rte_finalize(void)
if (NULL != orte_process_info.super.proc_hostname) {
free(orte_process_info.super.proc_hostname);
}
if (orte_do_not_launch) {
exit(0);
}
return ORTE_SUCCESS;
}
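
The change above makes rte_finalize leave the process outright when mpirun was run in do-not-launch mode, since there is nothing further to tear down. A minimal standalone sketch of that early-exit pattern (the flag name comes from the diff; everything else here is illustrative):

    #include <stdbool.h>
    #include <stdlib.h>
    #include <string.h>

    /* set by the --do-not-launch style option in the real code */
    static bool orte_do_not_launch = true;

    static int rte_finalize_sketch(char *proc_hostname)
    {
        /* free per-process resources first, as the diff does */
        if (NULL != proc_hostname) {
            free(proc_hostname);
        }
        /* in do-not-launch mode there is nothing left to shut down,
         * so leave immediately instead of running normal teardown */
        if (orte_do_not_launch) {
            exit(0);
        }
        return 0;  /* stands in for ORTE_SUCCESS */
    }

    int main(void)
    {
        return rte_finalize_sketch(strdup("node01"));
    }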

View file

@@ -260,7 +260,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
opal_list_item_t *item;
orte_namelist_t *nm;
int ret, cnt;
opal_buffer_t *relay, *rly;
opal_buffer_t *relay=NULL, *rly;
orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
opal_buffer_t wireup, datbuf, *data;
opal_byte_object_t *bo;
@@ -284,12 +284,17 @@ static void xcast_recv(int status, orte_process_name_t* sender,
rly = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(rly, buffer);
OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
/* setup the relay list */
OBJ_CONSTRUCT(&coll, opal_list_t);
/* unpack the flag to see if this payload is compressed */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
if (flag) {
@@ -298,6 +303,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* unpack the unpacked data size */
@@ -305,6 +313,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* allocate the space */
@@ -315,6 +326,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(ret);
free(packed_data);
ORTE_FORCED_TERMINATE(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
return;
}
/* decompress the data */
@@ -336,6 +350,8 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
ORTE_FORCED_TERMINATE(ret);
return;
}
@@ -346,17 +362,12 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&datbuf);
OBJ_DESTRUCT(&coll);
OBJ_RELEASE(rly);
ORTE_FORCED_TERMINATE(ret);
return;
}
/* setup a buffer we can pass to ourselves - this just contains
* the initial message, minus the headers inserted by xcast itself */
relay = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(relay, data);
/* setup the relay list */
OBJ_CONSTRUCT(&coll, opal_list_t);
/* get our conduit's routed module name */
rtmod = orte_rml.get_routed(orte_coll_conduit);
@@ -372,140 +383,163 @@ static void xcast_recv(int status, orte_process_name_t* sender,
if (ORTE_DAEMON_EXIT_CMD == command ||
ORTE_DAEMON_HALT_VM_CMD == command) {
orte_orteds_term_ordered = true;
/* copy the msg for relay to ourselves */
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
opal_dss.copy_payload(relay, data);
} else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
ORTE_DAEMON_DVM_NIDMAP_CMD == command) {
/* update our local nidmap, if required - the decode function
* knows what to do
*/
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast updating daemon nidmap",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
/* setup our internal relay buffer */
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (!ORTE_PROC_IS_HNP) {
/* update the routing plan - the HNP already did
* it when it computed the VM, so don't waste time
* re-doing it here */
orte_routed.update_routing_plan(rtmod);
}
/* routing is now possible */
orte_routed_base.routing_enabled = true;
/* see if we have wiring info as well */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
/* see if any daemons were launched */
cnt = 1;
if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
/* add it to our relay buffer as we will need it later */
opal_dss.pack(relay, &flag, 1, OPAL_INT8);
if (0 != flag) {
/* update our local nidmap, if required - the decode function
* knows what to do
*/
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:xcast updating daemon nidmap",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) {
OBJ_RELEASE(relay);
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 == flag) {
/* copy the remainder of the payload */
opal_dss.copy_payload(relay, data);
/* no - just return */
if (!ORTE_PROC_IS_HNP) {
/* update the routing plan - the HNP already did
* it when it computed the VM, so don't waste time
* re-doing it here */
orte_routed.update_routing_plan(rtmod);
}
/* routing is now possible */
orte_routed_base.routing_enabled = true;
/* see if we have wiring info as well */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
}
/* unpack the byte object */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
if (0 != flag) {
/* unpack the byte object */
cnt=1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 < bo->size) {
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* pass it for processing */
if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&wireup);
goto relay;
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
free(bo);
}
}
/* copy the remainder of the payload - we don't pass wiring info
* to the odls */
opal_dss.copy_payload(relay, data);
} else {
relay = OBJ_NEW(opal_buffer_t);
/* repack the command */
if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(ret);
goto relay;
}
if (0 < bo->size) {
/* load it into a buffer */
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
opal_dss.load(&wireup, bo->bytes, bo->size);
/* pass it for processing */
if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
ORTE_ERROR_LOG(ret);
OBJ_DESTRUCT(&wireup);
goto relay;
}
/* done with the wireup buffer - dump it */
OBJ_DESTRUCT(&wireup);
}
free(bo);
if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) {
/* copy the remainder of the payload */
opal_dss.copy_payload(relay, data);
}
/* copy the msg for relay to ourselves */
opal_dss.copy_payload(relay, data);
}
} else {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
} else {
/* copy the msg for relay to ourselves */
relay = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(relay, data);
}
relay:
if (!orte_do_not_launch) {
/* get the list of next recipients from the routed module */
orte_routed.get_routing_list(rtmod, &coll);
/* get the list of next recipients from the routed module */
orte_routed.get_routing_list(rtmod, &coll);
/* if list is empty, no relay is required */
if (opal_list_is_empty(&coll)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay - recipient list is empty!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
goto CLEANUP;
}
/* if list is empty, no relay is required */
if (opal_list_is_empty(&coll)) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay - recipient list is empty!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_RELEASE(rly);
goto CLEANUP;
/* send the message to each recipient on list, deconstructing it as we go */
while (NULL != (item = opal_list_remove_first(&coll))) {
nm = (orte_namelist_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
ORTE_NAME_PRINT(&nm->name)));
OBJ_RETAIN(rly);
/* check the state of the recipient - no point
* sending to someone not alive
*/
jdata = orte_get_job_data_object(nm->name.jobid);
if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_PROC_STATE_RUNNING < rec->state ||
!ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
&nm->name, rly, ORTE_RML_TAG_XCAST,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
OBJ_RELEASE(item);
}
}
/* send the message to each recipient on list, deconstructing it as we go */
while (NULL != (item = opal_list_remove_first(&coll))) {
nm = (orte_namelist_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
ORTE_NAME_PRINT(&nm->name)));
OBJ_RETAIN(rly);
/* check the state of the recipient - no point
* sending to someone not alive
*/
jdata = orte_get_job_data_object(nm->name.jobid);
if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_PROC_STATE_RUNNING < rec->state ||
!ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
&nm->name, rly, ORTE_RML_TAG_XCAST,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(rly);
OBJ_RELEASE(item);
continue;
}
OBJ_RELEASE(item);
}
OBJ_RELEASE(rly); // retain accounting
CLEANUP:
/* cleanup */
OBJ_DESTRUCT(&coll);
OPAL_LIST_DESTRUCT(&coll);
OBJ_RELEASE(rly); // retain accounting
/* now pass the relay buffer to myself for processing - don't
* inject it into the RML system via send as that will compete
@@ -517,7 +551,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
relay->base_ptr = NULL;
relay->bytes_used = 0;
}
OBJ_RELEASE(relay);
if (NULL != relay) {
OBJ_RELEASE(relay);
}
OBJ_DESTRUCT(&datbuf);
}
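
Most of the xcast_recv changes enforce one discipline: rly, datbuf, and coll are constructed once at the top, every early-return error path now releases all of them, and relay starts as NULL so the final cleanup can test it before releasing. A compressed standalone sketch of that discipline, with toy buffers standing in for opal_buffer_t and a stub standing in for the opal_dss.unpack calls:

    #include <stdlib.h>
    #include <stdio.h>

    typedef struct { char *bytes; } buf_t;          /* toy stand-in for opal_buffer_t */

    static buf_t *buf_new(void)      { return calloc(1, sizeof(buf_t)); }
    static void   buf_free(buf_t *b) { if (b) { free(b->bytes); free(b); } }
    static int    unpack(int fail)   { return fail ? -1 : 0; }  /* stub for opal_dss.unpack */

    static void xcast_recv_sketch(int fail_unpack)
    {
        buf_t *rly    = buf_new();   /* copy of the message to relay onward */
        buf_t *datbuf = buf_new();   /* (de)compressed payload */
        buf_t *relay  = NULL;        /* NULL until we know a local relay is needed */

        if (0 != unpack(fail_unpack)) {
            /* every early return releases everything built so far */
            buf_free(datbuf);
            buf_free(rly);
            return;
        }
        relay = buf_new();
        /* ... relay rly to the routing list, process relay locally ... */
        buf_free(rly);
        /* relay can still be NULL if an error path fired before it was
         * allocated, hence the guard the diff adds around OBJ_RELEASE(relay) */
        if (NULL != relay) {
            buf_free(relay);
        }
        buf_free(datbuf);
    }

    int main(void)
    {
        xcast_recv_sketch(0);
        xcast_recv_sketch(1);
        return 0;
    }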

View file

@@ -108,6 +108,8 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
opal_byte_object_t bo, *boptr;
int32_t numbytes, numjobs;
int8_t flag;
void *nptr;
uint32_t key;
/* get the job data pointer */
if (NULL == (jdata = orte_get_job_data_object(job))) {
@@ -122,72 +124,60 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
return ORTE_SUCCESS;
}
/* if this is a DVM-based launch, then don't pack all the wireup
* info as we don't need it - just pack the job itself */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FIXED_DVM, NULL, OPAL_BOOL)) {
numjobs = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* construct a nodemap of the daemons */
if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(buffer))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are not using static ports, we need to send the wireup info */
if (!orte_static_ports) {
/* pack a flag indicating wiring info is provided */
/* if we launched new daemons... */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS, NULL, OPAL_BOOL)) {
/* flag that we did */
flag = 1;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
OBJ_RELEASE(wireup);
} else {
/* pack a flag indicating no wireup data is provided */
flag = 0;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
}
/* check if this job caused daemons to be spawned - if it did,
* then we need to ensure that those daemons get a complete
* copy of all active jobs so the grpcomm collectives can
* properly work should a proc from one of the other jobs
* interact with this one */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCHED_DAEMONS, NULL, OPAL_BOOL)) {
void *nptr;
uint32_t key;
/* include a nodemap of the daemons */
if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(buffer))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* if we are not using static ports, we need to send the wireup info */
if (!orte_static_ports) {
/* pack a flag indicating wiring info is provided */
flag = 1;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* put it in a byte object for xmission */
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
/* pack the byte object - zero-byte objects are fine */
bo.size = numbytes;
boptr = &bo;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(wireup);
return rc;
}
/* release the data since it has now been copied into our buffer */
if (NULL != bo.bytes) {
free(bo.bytes);
}
OBJ_RELEASE(wireup);
} else {
/* pack a flag indicating no wireup data is provided */
flag = 0;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
}
/* we need to ensure that any new daemons get a complete
* copy of all active jobs so the grpcomm collectives can
* properly work should a proc from one of the other jobs
* interact with this one */
OBJ_CONSTRUCT(&jobdata, opal_buffer_t);
numjobs = 0;
rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jptr, &nptr);
while (OPAL_SUCCESS == rc) {
/* skip the one we are launching now */
if (NULL != jptr && jptr != jdata &&
ORTE_PROC_MY_NAME->jobid != jptr->jobid) {
/* pack the job struct */
@@ -217,14 +207,11 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
OBJ_DESTRUCT(&jobdata);
}
} else {
numjobs = 0;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &numjobs, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* include a sentinel */
flag = 0;
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
}
/* pack the job struct */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
@@ -253,70 +240,79 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
orte_app_context_t *app;
orte_node_t *node;
bool newmap = false;
int8_t flag;
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:constructing child list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
*job = ORTE_JOBID_INVALID;
/* get the daemon job object */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
/* unpack the flag to see if additional jobs are included in the data */
/* unpack the flag to see if new daemons were launched */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &n, &cnt, OPAL_INT32))) {
*job = ORTE_JOBID_INVALID;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* get the daemon job object */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
if (0 < n) {
/* unpack the buffer containing the info */
if (0 != flag) {
/* see if additional jobs are included in the data */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &n, &cnt, OPAL_INT32))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
for (k=0; k < n; k++) {
/* unpack each job and add it to the local orte_job_data array */
if (0 < n) {
/* unpack the buffer containing the info */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(bptr, &jdata, &cnt, ORTE_JOB))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* check to see if we already have this one */
if (NULL == orte_get_job_data_object(jdata->jobid)) {
/* nope - add it */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
/* connect each proc to its node object */
for (j=0; j < jdata->procs->size; j++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
continue;
}
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
}
OBJ_RETAIN(dmn->node);
pptr->node = dmn->node;
/* add proc to node - note that num_procs for the
* node was already correctly unpacked, so don't
* increment it here */
OBJ_RETAIN(pptr);
opal_pointer_array_add(dmn->node->procs, pptr);
for (k=0; k < n; k++) {
/* unpack each job and add it to the local orte_job_data array */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(bptr, &jdata, &cnt, ORTE_JOB))) {
*job = ORTE_JOBID_INVALID;
ORTE_ERROR_LOG(rc);
goto REPORT_ERROR;
}
/* check to see if we already have this one */
if (NULL == orte_get_job_data_object(jdata->jobid)) {
/* nope - add it */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
/* connect each proc to its node object */
for (j=0; j < jdata->procs->size; j++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
continue;
}
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(daemons->procs, pptr->parent))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto REPORT_ERROR;
}
OBJ_RETAIN(dmn->node);
pptr->node = dmn->node;
/* add proc to node - note that num_procs for the
* node was already correctly unpacked, so don't
* increment it here */
OBJ_RETAIN(pptr);
opal_pointer_array_add(dmn->node->procs, pptr);
}
} else {
/* yep - so we can drop this copy */
jdata->jobid = ORTE_JOBID_INVALID;
OBJ_RELEASE(jdata);
}
} else {
/* yep - so we can drop this copy */
jdata->jobid = ORTE_JOBID_INVALID;
OBJ_RELEASE(jdata);
}
/* release the buffer */
OBJ_RELEASE(bptr);
}
/* release the buffer */
OBJ_RELEASE(bptr);
}
/* unpack the job we are to launch */
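
The two odls changes are matching halves of one wire format: get_add_procs_data now leads the launch message with an int8 flag recording whether this job launched new daemons, and only then packs the nodemap, the wireup byte object, and the buffer of other active jobs; construct_child_list unpacks in exactly that order. A toy standalone sketch of such a flag-guarded optional section (the one-byte buffer routines stand in for opal_dss pack/unpack):

    #include <stdint.h>
    #include <stdio.h>

    /* toy append-only buffer standing in for opal_buffer_t */
    typedef struct { uint8_t data[64]; int used, pos; } buf_t;

    static void   pack8(buf_t *b, int8_t v) { b->data[b->used++] = (uint8_t)v; }
    static int8_t unpack8(buf_t *b)         { return (int8_t)b->data[b->pos++]; }

    static void pack_add_procs(buf_t *b, int launched_daemons)
    {
        int8_t flag = launched_daemons ? 1 : 0;
        pack8(b, flag);              /* the new leading "daemons launched" flag */
        if (flag) {
            pack8(b, 42);            /* stand-in for nodemap + wireup + other jobs */
        }
        pack8(b, 7);                 /* stand-in for the job struct, always packed */
    }

    static void construct_child_list(buf_t *b)
    {
        if (0 != unpack8(b)) {       /* mirror the sender's order exactly */
            printf("daemon update section: %d\n", unpack8(b));
        }
        printf("job to launch: %d\n", unpack8(b));
    }

    int main(void)
    {
        buf_t b = { .used = 0, .pos = 0 };
        pack_add_procs(&b, 1);
        construct_child_list(&b);
        return 0;
    }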

View file

@@ -68,6 +68,7 @@
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_locks.h"
#include "orte/runtime/orte_quit.h"
#include "orte/util/compress.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/pre_condition_transports.h"
@@ -374,14 +375,6 @@ void orte_plm_base_complete_setup(int fd, short args, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(caddy->jdata->jobid));
/* if we don't want to launch the apps, now is the time to leave */
if (orte_do_not_launch) {
orte_never_launched = true;
ORTE_FORCED_TERMINATE(0);
OBJ_RELEASE(caddy);
return;
}
/* bozo check */
if (ORTE_JOB_STATE_SYSTEM_PREP != caddy->job_state) {
ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
@@ -561,6 +554,27 @@ void orte_plm_base_launch_apps(int fd, short args, void *cbdata)
return;
}
/* if we don't want to launch the apps, now is the time to leave */
if (orte_do_not_launch) {
bool compressed;
uint8_t *cmpdata;
size_t cmplen;
/* report the size of the launch message */
compressed = orte_util_compress_block((uint8_t*)buffer->base_ptr, buffer->bytes_used,
&cmpdata, &cmplen);
if (compressed) {
opal_output(0, "LAUNCH MSG RAW SIZE: %d COMPRESSED SIZE: %d",
(int)buffer->bytes_used, (int)cmplen);
free(cmpdata);
} else {
opal_output(0, "LAUNCH MSG RAW SIZE: %d", (int)buffer->bytes_used);
}
orte_never_launched = true;
ORTE_FORCED_TERMINATE(0);
OBJ_RELEASE(caddy);
return;
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
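
The relocated do-not-launch block above also reports how large the launch message would have been, using orte_util_compress_block to obtain the compressed size. A rough standalone approximation of that size report, using plain zlib instead of the ORTE helper (build with -lz; the payload is a made-up stand-in):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <zlib.h>

    int main(void)
    {
        /* highly compressible stand-in for the packed launch message */
        unsigned char raw[4096];
        memset(raw, 'x', sizeof(raw));

        uLongf clen = compressBound(sizeof(raw));
        unsigned char *cmp = malloc(clen);
        if (NULL != cmp && Z_OK == compress2(cmp, &clen, raw, sizeof(raw), Z_BEST_SPEED)) {
            printf("LAUNCH MSG RAW SIZE: %d COMPRESSED SIZE: %d\n",
                   (int)sizeof(raw), (int)clen);
        } else {
            /* mirror the diff's fallback when compression isn't possible */
            printf("LAUNCH MSG RAW SIZE: %d\n", (int)sizeof(raw));
        }
        free(cmp);
        return 0;
    }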

View file

@@ -259,6 +259,9 @@ static void vm_ready(int fd, short args, void *cbdata)
OBJ_RELEASE(buf);
return;
}
/* flag that daemons were launched so we will update the nidmap */
flag = 1;
opal_dss.pack(buf, &flag, 1, OPAL_INT8);
/* construct a nodemap with everything in it */
if (ORTE_SUCCESS != (rc = orte_util_encode_nodemap(buf))) {
ORTE_ERROR_LOG(rc);
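
This vm_ready change is the sender-side counterpart, in the DVM startup path, of the flag-guarded format sketched earlier after the odls changes: it always packs the leading flag as 1 ahead of the nodemap, so every daemon receiving the initial broadcast refreshes its nidmap.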

View file

@@ -356,6 +356,10 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
opal_output(0, "%s orted_cmd: received exit cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
if (orte_do_not_launch) {
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
return;
}
/* kill the local procs */
orte_odls.kill_local_procs(NULL);
/* flag that orteds were ordered to terminate */
@@ -394,6 +398,10 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
opal_output(0, "%s orted_cmd: received halt_vm cmd",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
if (orte_do_not_launch) {
ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED);
return;
}
/* kill the local procs */
orte_odls.kill_local_procs(NULL);
/* flag that orteds were ordered to terminate */
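
Both orted command handlers gain the same guard: in do-not-launch mode no local procs were ever spawned, so on an exit or halt-vm command the daemon can jump straight to the terminated state instead of walking the kill/cleanup path. A minimal dispatcher sketch of that guard (the enum values and helper functions are illustrative stand-ins for the ORTE ones):

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { DAEMON_EXIT_CMD, DAEMON_HALT_VM_CMD } daemon_cmd_t;

    static bool orte_do_not_launch = true;

    /* stand-in for ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_DAEMONS_TERMINATED) */
    static void activate_daemons_terminated(void) { puts("-> DAEMONS_TERMINATED"); }
    /* stand-in for orte_odls.kill_local_procs(NULL) */
    static void kill_local_procs(void)            { puts("-> killing local procs"); }

    static void daemon_recv_sketch(daemon_cmd_t command)
    {
        switch (command) {
        case DAEMON_EXIT_CMD:
        case DAEMON_HALT_VM_CMD:
            /* nothing was launched, so skip the kill path and drive
             * the state machine straight to termination */
            if (orte_do_not_launch) {
                activate_daemons_terminated();
                return;
            }
            kill_local_procs();
            break;
        }
    }

    int main(void)
    {
        daemon_recv_sketch(DAEMON_EXIT_CMD);
        return 0;
    }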