1
1
Signed-off-by: Ralph Castain <rhc@pmix.org>
Этот коммит содержится в:
Ralph Castain 2019-03-16 01:20:15 -07:00
родитель 35a597178d
Коммит 2794ae43b3

Просмотреть файл

@ -385,6 +385,7 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
/* add this name to the pool */ /* add this name to the pool */
nd = OBJ_NEW(orte_node_t); nd = OBJ_NEW(orte_node_t);
nd->name = strdup(names[n]); nd->name = strdup(names[n]);
nd->index = n;
opal_pointer_array_set_item(orte_node_pool, n, nd); opal_pointer_array_set_item(orte_node_pool, n, nd);
/* set the topology - always default to homogeneous /* set the topology - always default to homogeneous
* as that is the most common scenario */ * as that is the most common scenario */
@ -409,7 +410,6 @@ int orte_util_decode_nidmap(opal_buffer_t *buf)
daemons->num_procs++; daemons->num_procs++;
opal_pointer_array_set_item(daemons->procs, proc->name.vpid, proc); opal_pointer_array_set_item(daemons->procs, proc->name.vpid, proc);
} }
nd->index = proc->name.vpid;
OBJ_RETAIN(nd); OBJ_RETAIN(nd);
proc->node = nd; proc->node = nd;
OBJ_RETAIN(proc); OBJ_RETAIN(proc);
@ -945,8 +945,9 @@ int orte_util_parse_node_info(opal_buffer_t *buf)
int orte_util_generate_ppn(orte_job_t *jdata, int orte_util_generate_ppn(orte_job_t *jdata,
opal_buffer_t *buf) opal_buffer_t *buf)
{ {
uint16_t *ppn=NULL; uint16_t ppn;
size_t nbytes; uint8_t *bytes;
int32_t nbytes;
int rc = ORTE_SUCCESS; int rc = ORTE_SUCCESS;
orte_app_idx_t i; orte_app_idx_t i;
int j, k; int j, k;
@ -955,32 +956,39 @@ int orte_util_generate_ppn(orte_job_t *jdata,
orte_node_t *nptr; orte_node_t *nptr;
orte_proc_t *proc; orte_proc_t *proc;
size_t sz; size_t sz;
opal_buffer_t bucket;
/* make room for the number of procs on each node */ OBJ_CONSTRUCT(&bucket, opal_buffer_t);
nbytes = sizeof(uint16_t) * orte_node_pool->size;
ppn = (uint16_t*)malloc(nbytes);
for (i=0; i < jdata->num_apps; i++) { for (i=0; i < jdata->num_apps; i++) {
/* reset the #procs */ /* for each app_context */
memset(ppn, 0, nbytes); for (j=0; j < jdata->map->nodes->size; j++) {
/* for each app_context, compute the #procs on if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, j))) {
* each node of the allocation */
for (j=0; j < orte_node_pool->size; j++) {
if (NULL == (nptr = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, j))) {
continue; continue;
} }
if (NULL == nptr->daemon) { if (NULL == nptr->daemon) {
continue; continue;
} }
ppn = 0;
for (k=0; k < nptr->procs->size; k++) { for (k=0; k < nptr->procs->size; k++) {
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(nptr->procs, k))) { if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(nptr->procs, k))) {
if (proc->name.jobid == jdata->jobid) { if (proc->name.jobid == jdata->jobid) {
++ppn[j]; ++ppn;
} }
} }
} }
if (0 < ppn) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &nptr->index, 1, ORTE_STD_CNTR))) {
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(&bucket, &ppn, 1, OPAL_UINT16))) {
goto cleanup;
}
}
} }
if (opal_compress.compress_block((uint8_t*)ppn, nbytes, opal_dss.unload(&bucket, (void**)&bytes, &nbytes);
if (opal_compress.compress_block(bytes, (size_t)nbytes,
(uint8_t**)&bo.bytes, &sz)) { (uint8_t**)&bo.bytes, &sz)) {
/* mark that this was compressed */ /* mark that this was compressed */
compressed = true; compressed = true;
@ -988,7 +996,7 @@ int orte_util_generate_ppn(orte_job_t *jdata,
} else { } else {
/* mark that this was not compressed */ /* mark that this was not compressed */
compressed = false; compressed = false;
bo.bytes = (uint8_t*)ppn; bo.bytes = bytes;
bo.size = nbytes; bo.size = nbytes;
} }
/* indicate compression */ /* indicate compression */
@ -1015,21 +1023,31 @@ int orte_util_generate_ppn(orte_job_t *jdata,
} }
cleanup: cleanup:
free(ppn); OBJ_DESTRUCT(&bucket);
return rc; return rc;
} }
int orte_util_decode_ppn(orte_job_t *jdata, int orte_util_decode_ppn(orte_job_t *jdata,
opal_buffer_t *buf) opal_buffer_t *buf)
{ {
orte_std_cntr_t index;
orte_app_idx_t n; orte_app_idx_t n;
int m, cnt, rc; int cnt, rc;
opal_byte_object_t *boptr; opal_byte_object_t *boptr;
bool compressed; bool compressed;
uint8_t *bytes;
size_t sz; size_t sz;
uint16_t *ppn, k; uint16_t ppn, k;
orte_node_t *node; orte_node_t *node;
orte_proc_t *proc; orte_proc_t *proc;
opal_buffer_t bucket;
/* reset any flags */
for (n=0; n < orte_node_pool->size; n++) {
if (NULL != (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, n))) {
ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
}
}
for (n=0; n < jdata->num_apps; n++) { for (n=0; n < jdata->num_apps; n++) {
/* unpack the compression flag */ /* unpack the compression flag */
@ -1062,14 +1080,14 @@ int orte_util_decode_ppn(orte_job_t *jdata,
/* decompress if required */ /* decompress if required */
if (compressed) { if (compressed) {
if (!opal_compress.decompress_block((uint8_t**)&ppn, sz, if (!opal_compress.decompress_block(&bytes, sz,
boptr->bytes, boptr->size)) { boptr->bytes, boptr->size)) {
ORTE_ERROR_LOG(ORTE_ERROR); ORTE_ERROR_LOG(ORTE_ERROR);
OBJ_RELEASE(boptr); OBJ_RELEASE(boptr);
return ORTE_ERROR; return ORTE_ERROR;
} }
} else { } else {
ppn = (uint16_t*)boptr->bytes; bytes = boptr->bytes;
boptr->bytes = NULL; boptr->bytes = NULL;
boptr->size = 0; boptr->size = 0;
} }
@ -1078,38 +1096,67 @@ int orte_util_decode_ppn(orte_job_t *jdata,
} }
free(boptr); free(boptr);
/* cycle thru the node pool */ /* setup to unpack */
for (m=0; m < orte_node_pool->size; m++) { OBJ_CONSTRUCT(&bucket, opal_buffer_t);
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, m))) { opal_dss.load(&bucket, bytes, sz);
continue;
/* unpack each node and its ppn */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &index, &cnt, ORTE_STD_CNTR))) {
/* get the corresponding node object */
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, index))) {
rc = ORTE_ERR_NOT_FOUND;
ORTE_ERROR_LOG(rc);
goto error;
} }
if (0 < ppn[m]) { /* add the node to the job map if not already assigned */
if (!ORTE_FLAG_TEST(node, ORTE_NODE_FLAG_MAPPED)) { if (!ORTE_FLAG_TEST(node, ORTE_NODE_FLAG_MAPPED)) {
OBJ_RETAIN(node); OBJ_RETAIN(node);
ORTE_FLAG_SET(node, ORTE_NODE_FLAG_MAPPED); opal_pointer_array_add(jdata->map->nodes, node);
opal_pointer_array_add(jdata->map->nodes, node); ORTE_FLAG_SET(node, ORTE_NODE_FLAG_MAPPED);
}
/* create a proc object for each one */
for (k=0; k < ppn[m]; k++) {
proc = OBJ_NEW(orte_proc_t);
proc->name.jobid = jdata->jobid;
/* leave the vpid undefined as this will be determined
* later when we do the overall ranking */
proc->app_idx = n;
proc->parent = node->daemon->name.vpid;
OBJ_RETAIN(node);
proc->node = node;
/* flag the proc as ready for launch */
proc->state = ORTE_PROC_STATE_INIT;
opal_pointer_array_add(node->procs, proc);
/* we will add the proc to the jdata array when we
* compute its rank */
}
node->num_procs += ppn[m];
} }
/* get the ppn */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &ppn, &cnt, OPAL_UINT16))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* create a proc object for each one */
for (k=0; k < ppn; k++) {
proc = OBJ_NEW(orte_proc_t);
proc->name.jobid = jdata->jobid;
/* leave the vpid undefined as this will be determined
* later when we do the overall ranking */
proc->app_idx = n;
proc->parent = node->daemon->name.vpid;
OBJ_RETAIN(node);
proc->node = node;
/* flag the proc as ready for launch */
proc->state = ORTE_PROC_STATE_INIT;
opal_pointer_array_add(node->procs, proc);
/* we will add the proc to the jdata array when we
* compute its rank */
}
node->num_procs += ppn;
cnt = 1;
} }
free(ppn); OBJ_DESTRUCT(&bucket);
}
/* reset any flags */
for (n=0; n < jdata->map->nodes->size; n++) {
node = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, n);
ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
} }
return ORTE_SUCCESS; return ORTE_SUCCESS;
error:
OBJ_DESTRUCT(&bucket);
/* reset any flags */
for (n=0; n < jdata->map->nodes->size; n++) {
node = (orte_node_t*)opal_pointer_array_get_item(jdata->map->nodes, n);
ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
}
return rc;
} }