1
1

Merge pull request #2669 from rhc54/topic/memprobe

Complete the memprobe support.
Этот коммит содержится в:
Ralph Castain 2017-01-05 12:02:56 -08:00 коммит произвёл GitHub
родитель b4088c331a 6509f60929
Коммит b343df43a1
19 изменённых файлов: 472 добавлений и 179 удалений

Просмотреть файл

@ -4,14 +4,14 @@ all: $(PROGS)
CFLAGS = -O
orte_no_op:
orte_no_op: orte_no_op.c
ortecc -o orte_no_op orte_no_op.c
mpi_no_op:
mpi_no_op: mpi_no_op.c
mpicc -o mpi_no_op mpi_no_op.c
mpi_memprobe:
mpicc -o mpi_memprobe mpi_memprobe.c -lopen-pal
mpi_memprobe: mpi_memprobe.c
mpicc -o mpi_memprobe mpi_memprobe.c -lopen-pal -lopen-rte
clean:
rm -f $(PROGS) *~

Просмотреть файл

@ -17,7 +17,6 @@
#include "orte/mca/errmgr/errmgr.h"
static int rank, size;
static volatile int active;
static volatile bool wait_for_release = true;
#define MEMPROBE_RELEASE 12345
@ -27,7 +26,6 @@ static void _release_fn(int status,
opal_pmix_notification_complete_fn_t cbfunc,
void *cbdata)
{
fprintf(stderr, "Rank %d: Release recvd\n", rank);
/* must let the notifier know we are done */
if (NULL != cbfunc) {
cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata);
@ -58,7 +56,6 @@ static void qcbfunc(int status,
opal_list_t *results = (opal_list_t*)cbdata;
opal_value_t *kv;
fprintf(stderr, "Rank %d: Query returned status %d\n", rank, status);
if (NULL != info) {
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) {
opal_list_append(results, &kv->super);
@ -70,17 +67,131 @@ static void qcbfunc(int status,
wait_for_release = false;
}
static void notifycbfunc(int status, void *cbdata)
{
volatile int *active = (volatile int*)cbdata;
*active = status;
}
static void sample(void)
{
opal_value_t *kv, *ival;
opal_pmix_query_t *q;
opal_list_t query, response, *lt;
volatile int active;
char **answer = NULL, *tmp, *msg;
OBJ_CONSTRUCT(&query, opal_list_t);
OBJ_CONSTRUCT(&response, opal_list_t);
q = OBJ_NEW(opal_pmix_query_t);
opal_list_append(&query, &q->super);
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
/* qualify that we just want local avg, min/max values reported */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_ONLY);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
/* issue the request */
wait_for_release = true;
opal_pmix.query(&query, qcbfunc, (void*)&response);
/* wait for the query to complete */
while (wait_for_release) {
usleep(10);
}
wait_for_release = true;
/* log my own results as a single string so the output
* doesn't get garbled on the other end */
asprintf(&tmp, "Data for node %s", orte_process_info.nodename);
opal_argv_append_nosize(&answer, tmp);
free(tmp);
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
lt = (opal_list_t*)kv->data.ptr;
OPAL_LIST_FOREACH(ival, lt, opal_value_t) {
if (0 == strcmp(ival->key, OPAL_PMIX_DAEMON_MEMORY)) {
asprintf(&tmp, "\tDaemon: %f", ival->data.fval);
opal_argv_append_nosize(&answer, tmp);
free(tmp);
} else if (0 == strcmp(ival->key, OPAL_PMIX_CLIENT_AVG_MEMORY)) {
asprintf(&tmp, "\tClient: %f", ival->data.fval);
opal_argv_append_nosize(&answer, tmp);
free(tmp);
} else {
fprintf(stderr, "\tUnknown key: %s", ival->key);
}
}
}
opal_argv_append_nosize(&answer, "\n");
OPAL_LIST_DESTRUCT(&response);
/* construct the log output */
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_LOG_STDOUT);
kv->type = OPAL_STRING;
kv->data.string = opal_argv_join(answer, '\n');
opal_list_append(&response, &kv->super);
opal_argv_free(answer);
active = -1;
opal_pmix.log(&response, notifycbfunc, (void*)&active);
while (-1 == active) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
if (0 == rank) {
/* send the notification to release the other procs */
wait_for_release = true;
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&response, &kv->super);
active = -1;
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
OPAL_PMIX_RANGE_GLOBAL, &response,
notifycbfunc, (void*)&active)) {
fprintf(stderr, "Notify event failed\n");
exit(1);
}
while (-1 == active) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
}
/* now wait for notification */
while (wait_for_release) {
usleep(10);
}
}
int main(int argc, char* argv[])
{
opal_list_t *codes;
opal_value_t *kv;
opal_pmix_query_t *q;
opal_list_t query, response;
volatile int active;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
if (0 == rank) {
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
}
/* everyone registers their event handler */
codes = OBJ_NEW(opal_list_t);
kv = OBJ_NEW(opal_value_t);
@ -95,54 +206,11 @@ int main(int argc, char* argv[])
usleep(10);
}
/* rank 0 asks for memory to be sampled, while everyone else waits */
if (0 == rank) {
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
OBJ_CONSTRUCT(&query, opal_list_t);
OBJ_CONSTRUCT(&response, opal_list_t);
q = OBJ_NEW(opal_pmix_query_t);
opal_list_append(&query, &q->super);
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
/* qualify that we just want avg, min/max values reported */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
/* issue the request */
wait_for_release = true;
opal_pmix.query(&query, qcbfunc, (void*)&response);
while (wait_for_release) {
usleep(10);
}
/* output the results */
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
fprintf(stderr, "\tResults: %s\n", kv->key);
}
OPAL_LIST_DESTRUCT(&response);
/* send the notification to release the other procs */
wait_for_release = true;
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&response, &kv->super);
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
OPAL_PMIX_RANGE_GLOBAL, &response,
NULL, NULL)) {
fprintf(stderr, "Notify event failed\n");
exit(1);
}
while (wait_for_release) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
/* if I am the local leader (i.e., local_rank=0), then I ask
* my daemon to report the local memory usage, and send it
* to rank=0 */
if (0 == orte_process_info.my_local_rank) {
sample();
} else {
/* now wait for notification */
while (wait_for_release) {
@ -157,51 +225,14 @@ int main(int argc, char* argv[])
if (0 == rank) {
fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n");
OBJ_CONSTRUCT(&query, opal_list_t);
OBJ_CONSTRUCT(&response, opal_list_t);
q = OBJ_NEW(opal_pmix_query_t);
opal_list_append(&query, &q->super);
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
/* qualify that we just want avg, min/max values reported */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
/* issue the request */
wait_for_release = true;
opal_pmix.query(&query, qcbfunc, (void*)&response);
while (wait_for_release) {
usleep(10);
}
if (0 == orte_process_info.my_local_rank) {
if (0 != rank) {
/* wait a little */
usleep(1000);
}
/* output the results */
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
fprintf(stderr, "\tResults: %s\n", kv->key);
}
OPAL_LIST_DESTRUCT(&response);
/* send the notification to release the other procs */
wait_for_release = true;
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&response, &kv->super);
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
OPAL_PMIX_RANGE_GLOBAL, &response,
NULL, NULL)) {
fprintf(stderr, "Notify event failed\n");
exit(1);
}
while (wait_for_release) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
sample();
} else {
/* wait again while memory is sampled */
while (wait_for_release) {
@ -209,8 +240,6 @@ int main(int argc, char* argv[])
}
}
fprintf(stderr, "%d: FINALIZING\n", rank);
fflush(stderr);
MPI_Finalize();
return 0;
}

Просмотреть файл

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Mellanox Technologies, Inc.
@ -688,6 +688,7 @@ pmix_status_t pmix_bfrop_unpack_status(pmix_buffer_t *buffer, void *dest,
return PMIX_ERR_NOMEM;
}
if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_buffer(buffer, val->data.darray, &m, PMIX_DATA_ARRAY))) {
PMIX_ERROR_LOG(ret);
return ret;
}
break;
@ -1274,6 +1275,9 @@ pmix_status_t pmix_bfrop_unpack_darray(pmix_buffer_t *buffer, void *dest,
case PMIX_COMPRESSED_STRING:
nbytes = sizeof(pmix_byte_object_t);
break;
case PMIX_INFO:
nbytes = sizeof(pmix_info_t);
break;
case PMIX_PERSIST:
nbytes = sizeof(pmix_persistence_t);
break;

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2016 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2016 IBM Corporation. All rights reserved.
@ -78,7 +78,11 @@ static void query_cbfunc(struct pmix_peer_t *peer,
cnt = results->ninfo;
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, results->info, &cnt, PMIX_INFO))) {
PMIX_ERROR_LOG(rc);
goto complete;
pmix_output(0, "TYPE: %d", results->info[0].value.type);
results->status = rc;
PMIX_INFO_FREE(results->info, results->ninfo);
results->info = NULL;
results->ninfo = 0;
}
}

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -438,6 +438,15 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain)
return;
}
/* just a simple tracker so we know who we notified */
typedef struct pmix_event_trkr_t {
pmix_list_item_t super;
pmix_peer_t *peer;
} pmix_event_trkr_t;
static PMIX_CLASS_INSTANCE(pmix_event_trkr_t,
pmix_list_item_t,
NULL, NULL);
static void _notify_client_event(int sd, short args, void *cbdata)
{
@ -446,7 +455,9 @@ static void _notify_client_event(int sd, short args, void *cbdata)
pmix_regevents_info_t *reginfoptr;
pmix_peer_events_info_t *pr;
size_t n;
bool matched;
bool matched, notify;;
pmix_list_t recips;
pmix_event_trkr_t *trkr;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: _notify_error notifying clients of error %s",
@ -466,6 +477,7 @@ static void _notify_client_event(int sd, short args, void *cbdata)
/* cycle across our registered events and send the message to
* any client who registered for it */
PMIX_CONSTRUCT(&recips, pmix_list_t);
PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) {
if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) ||
cd->status == reginfoptr->code) {
@ -494,14 +506,30 @@ static void _notify_client_event(int sd, short args, void *cbdata)
continue;
}
}
/* if we have already notified this client, then don't do it again */
notify = true;
PMIX_LIST_FOREACH(trkr, &recips, pmix_event_trkr_t) {
if (trkr->peer == pr->peer) {
notify = false;
break;
}
}
if (!notify) {
continue;
}
/* add this peer to the list of prior recipients */
trkr = PMIX_NEW(pmix_event_trkr_t);
trkr->peer = pr->peer;
pmix_list_append(&recips, &trkr->super);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notifying client %s:%d",
pr->peer->info->nptr->nspace, pr->peer->info->rank);
"pmix_server: notifying client %s:%d of code %d",
pr->peer->info->nptr->nspace, pr->peer->info->rank, cd->status);
PMIX_RETAIN(cd->buf);
PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf);
}
}
}
PMIX_LIST_DESTRUCT(&recips);
/* notify the caller */
if (NULL != cd->cbfunc) {
@ -530,8 +558,8 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
size_t n;
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: notify client of event %s",
PMIx_Error_string(status));
"pmix_server: notify client of event %s with %lu ninfos",
PMIx_Error_string(status), ninfo);
cd = PMIX_NEW(pmix_notify_caddy_t);
cd->status = status;

Просмотреть файл

@ -785,6 +785,9 @@ int pmix2x_value_unload(opal_value_t *kv,
int rc=OPAL_SUCCESS;
bool found;
opal_pmix2x_jobid_trkr_t *job;
opal_list_t *lt;
opal_value_t *ival;
size_t n;
switch(v->type) {
case PMIX_UNDEF:
@ -927,6 +930,25 @@ int pmix2x_value_unload(opal_value_t *kv,
kv->type = OPAL_PTR;
kv->data.ptr = v->data.ptr;
break;
case PMIX_DATA_ARRAY:
if (NULL == v->data.darray || NULL == v->data.darray->array) {
kv->data.ptr = NULL;
break;
}
lt = OBJ_NEW(opal_list_t);
kv->type = OPAL_PTR;
kv->data.ptr = (void*)lt;
for (n=0; n < v->data.darray->size; n++) {
ival = OBJ_NEW(opal_value_t);
opal_list_append(lt, &ival->super);
/* handle the various types */
if (PMIX_INFO == v->data.darray->type) {
pmix_info_t *iptr = (pmix_info_t*)v->data.darray->array;
ival->key = strdup(iptr[n].key);
pmix2x_value_unload(ival, &iptr[n].value);
}
}
break;
default:
/* silence warnings */
rc = OPAL_ERROR;
@ -1138,12 +1160,109 @@ static int notify_event(int status,
return OPAL_SUCCESS;
}
static void relcbfunc(void *cbdata)
{
opal_list_t *results = (opal_list_t*)cbdata;
if (NULL != results) {
OPAL_LIST_RELEASE(results);
}
}
static void infocbfunc(pmix_status_t status,
pmix_info_t *info, size_t ninfo,
void *cbdata,
pmix_release_cbfunc_t release_fn,
void *release_cbdata)
{
pmix2x_opcaddy_t *cd = (pmix2x_opcaddy_t*)cbdata;
int rc = OPAL_SUCCESS;
opal_list_t *results = NULL;
opal_value_t *iptr;
size_t n;
/* convert the array of pmix_info_t to the list of info */
if (NULL != info) {
results = OBJ_NEW(opal_list_t);
for (n=0; n < ninfo; n++) {
iptr = OBJ_NEW(opal_value_t);
opal_list_append(results, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix2x_value_unload(iptr, &info[n].value))) {
OPAL_LIST_RELEASE(results);
results = NULL;
break;
}
}
}
if (NULL != release_fn) {
release_fn(release_cbdata);
}
/* return the values to the original requestor */
if (NULL != cd->qcbfunc) {
cd->qcbfunc(rc, results, cd->cbdata, relcbfunc, results);
}
OBJ_RELEASE(cd);
}
static void pmix2x_query(opal_list_t *queries,
opal_pmix_info_cbfunc_t cbfunc, void *cbdata)
{
if (NULL != cbfunc) {
cbfunc(OPAL_ERR_NOT_SUPPORTED, NULL, cbdata, NULL, NULL);
int rc;
opal_value_t *ival;
size_t n, nqueries, nq;
pmix2x_opcaddy_t *cd;
pmix_status_t prc;
opal_pmix_query_t *q;
/* create the caddy */
cd = OBJ_NEW(pmix2x_opcaddy_t);
/* bozo check */
if (NULL == queries || 0 == (nqueries = opal_list_get_size(queries))) {
rc = OPAL_ERR_BAD_PARAM;
goto CLEANUP;
}
/* setup the operation */
cd->qcbfunc = cbfunc;
cd->cbdata = cbdata;
cd->nqueries = nqueries;
/* convert the list to an array of query objects */
PMIX_QUERY_CREATE(cd->queries, cd->nqueries);
n=0;
OPAL_LIST_FOREACH(q, queries, opal_pmix_query_t) {
cd->queries[n].keys = opal_argv_copy(q->keys);
cd->queries[n].nqual = opal_list_get_size(&q->qualifiers);
if (0 < cd->queries[n].nqual) {
PMIX_INFO_CREATE(cd->queries[n].qualifiers, cd->queries[n].nqual);
nq = 0;
OPAL_LIST_FOREACH(ival, &q->qualifiers, opal_value_t) {
(void)strncpy(cd->queries[n].qualifiers[nq].key, ival->key, PMIX_MAX_KEYLEN);
pmix2x_value_load(&cd->queries[n].qualifiers[nq].value, ival);
++nq;
}
}
++n;
}
/* pass it down */
if (PMIX_SUCCESS != (prc = PMIx_Query_info_nb(cd->queries, cd->nqueries,
infocbfunc, cd))) {
/* do not hang! */
rc = pmix2x_convert_rc(prc);
goto CLEANUP;
}
return;
CLEANUP:
if (NULL != cbfunc) {
cbfunc(rc, NULL, cbdata, NULL, NULL);
}
OBJ_RELEASE(cd);
return;
}
@ -1234,6 +1353,8 @@ static void opcon(pmix2x_opcaddy_t *p)
p->active = false;
p->codes = NULL;
p->pcodes = NULL;
p->queries = NULL;
p->nqueries = 0;
p->event = NULL;
p->opcbfunc = NULL;
p->mdxcbfunc = NULL;
@ -1260,6 +1381,9 @@ static void opdes(pmix2x_opcaddy_t *p)
if (NULL != p->pcodes) {
free(p->pcodes);
}
if (NULL != p->queries) {
PMIX_QUERY_FREE(p->queries, p->nqueries);
}
}
OBJ_CLASS_INSTANCE(pmix2x_opcaddy_t,
opal_object_t,

Просмотреть файл

@ -80,6 +80,8 @@ typedef struct {
opal_list_t *codes;
pmix_status_t *pcodes;
size_t ncodes;
pmix_query_t *queries;
size_t nqueries;
opal_pmix2x_event_t *event;
opal_pmix_op_cbfunc_t opcbfunc;
opal_pmix_modex_cbfunc_t mdxcbfunc;
@ -87,6 +89,7 @@ typedef struct {
opal_pmix_lookup_cbfunc_t lkcbfunc;
opal_pmix_spawn_cbfunc_t spcbfunc;
opal_pmix_evhandler_reg_cbfunc_t evregcbfunc;
opal_pmix_info_cbfunc_t qcbfunc;
void *cbdata;
} pmix2x_opcaddy_t;
OBJ_CLASS_DECLARATION(pmix2x_opcaddy_t);

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Mellanox Technologies, Inc.
@ -1015,6 +1015,7 @@ static void server_log(const pmix_proc_t *proct,
/* convert the data */
for (n=0; n < ndata; n++) {
oinfo = OBJ_NEW(opal_value_t);
oinfo->key = strdup(data[n].key);
/* we "borrow" the info field of the caddy as we and the
* server function both agree on what will be there */
opal_list_append(&opalcaddy->info, &oinfo->super);

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.

Просмотреть файл

@ -231,9 +231,10 @@ BEGIN_C_DECLS
#define OPAL_PMIX_QUERY_REPORT_MINMAX "pmix.qry.minmax" // report minimum and maximum value
/* log attributes */
#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr
#define OPAL_PMIX_LOG_STDOUT "pmix.log.stdout" // (bool) log data to stdout
#define OPAL_PMIX_LOG_SYSLOG "pmix.log.syslog" // (bool) log data to syslog - defaults to ERROR priority unless
#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (char*) log string to stderr
#define OPAL_PMIX_LOG_STDOUT "pmix.log.stdout" // (char*) log string to stdout
#define OPAL_PMIX_LOG_SYSLOG "pmix.log.syslog" // (char*) log data to syslog - defaults to ERROR priority unless
#define OPAL_PMIX_LOG_MSG "pmix.log.msg" // (pmix_byte_object_t) message blob to be sent somewhere
/* debugger attributes */
#define OPAL_PMIX_DEBUG_STOP_ON_EXEC "pmix.dbg.exec" // (bool) job is being spawned under debugger - instruct it to pause on start

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -200,8 +200,8 @@ ORTE_DECLSPEC int orte_iof_base_flush(void);
ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
/* base functions */
ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream,
unsigned char *data, int numbytes,
ORTE_DECLSPEC int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
const unsigned char *data, int numbytes,
orte_iof_write_event_t *channel);
ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);

Просмотреть файл

@ -10,6 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -43,8 +44,8 @@
#include "orte/mca/iof/base/base.h"
int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream,
unsigned char *data, int numbytes,
int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream,
const unsigned char *data, int numbytes,
orte_iof_write_event_t *channel)
{
char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix;

Просмотреть файл

@ -15,7 +15,7 @@
* reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -68,6 +68,10 @@ static int hnp_pull(const orte_process_name_t* src_name,
static int hnp_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
static int hnp_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);
static int finalize(void);
static int hnp_ft_event(int state);
@ -79,13 +83,13 @@ static int hnp_ft_event(int state);
*/
orte_iof_base_module_t orte_iof_hnp_module = {
init,
hnp_push,
hnp_pull,
hnp_close,
NULL,
finalize,
hnp_ft_event
.init = init,
.push = hnp_push,
.pull = hnp_pull,
.close = hnp_close,
.output = hnp_output,
.finalize = finalize,
.ft_event = hnp_ft_event
};
/* Initialize the module */
@ -599,3 +603,17 @@ CHECK:
}
}
}
static int hnp_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg)
{
/* output this to our local output */
if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -13,7 +13,7 @@
* Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -191,6 +191,13 @@ typedef int (*orte_iof_base_pull_fn_t)(const orte_process_name_t* peer,
typedef int (*orte_iof_base_close_fn_t)(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
/**
* Output something via the IOF subsystem
*/
typedef int (*orte_iof_base_output_fn_t)(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);
/* Flag that a job is complete */
typedef void (*orte_iof_base_complete_fn_t)(const orte_job_t *jdata);
@ -210,6 +217,7 @@ struct orte_iof_base_module_2_0_0_t {
orte_iof_base_push_fn_t push;
orte_iof_base_pull_fn_t pull;
orte_iof_base_close_fn_t close;
orte_iof_base_output_fn_t output;
orte_iof_base_complete_fn_t complete;
orte_iof_base_finalize_fn_t finalize;
orte_iof_base_ft_event_fn_t ft_event;

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -69,6 +69,10 @@ static int orted_pull(const orte_process_name_t* src_name,
static int orted_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
static int orted_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);
static int finalize(void);
static int orted_ft_event(int state);
@ -82,13 +86,13 @@ static int orted_ft_event(int state);
*/
orte_iof_base_module_t orte_iof_orted_module = {
init,
orted_push,
orted_pull,
orted_close,
NULL,
finalize,
orted_ft_event
.init = init,
.push = orted_push,
.pull = orted_pull,
.close = orted_close,
.output = orted_output,
.finalize = finalize,
.ft_event = orted_ft_event
};
static int init(void)
@ -437,3 +441,46 @@ CHECK:
}
}
}
static int orted_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg)
{
opal_buffer_t *buf;
int rc;
/* prep the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* pack the stream first - we do this so that flow control messages can
* consist solely of the tag
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &source_tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack name of process that gave us this data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, peer, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the data - for compatibility, we have to pack this as OPAL_BYTE,
* so ensure we include the NULL string terminator */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, msg, strlen(msg)+1, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* start non-blocking RML call to forward received data */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:orted:output sending %d bytes to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)strlen(msg)+1));
orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
orte_rml_send_callback, NULL);
return ORTE_SUCCESS;
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -43,18 +43,6 @@
#include "iof_orted.h"
/*
* Callback when non-blocking RML send completes.
*/
static void send_cb(int status, orte_process_name_t *peer,
opal_buffer_t *buf, orte_rml_tag_t tag,
void *cbdata)
{
/* nothing to do here - just release buffer and return */
OBJ_RELEASE(buf);
}
void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
{
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
@ -146,7 +134,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP,
send_cb, NULL);
orte_rml_send_callback, NULL);
/* re-add the event */
opal_event_add(rev->ev, 0);

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -52,18 +52,22 @@ static int tool_pull(const orte_process_name_t* src_name,
static int tool_close(const orte_process_name_t* peer,
orte_iof_tag_t source_tag);
static int tool_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);
static int finalize(void);
static int tool_ft_event(int state);
orte_iof_base_module_t orte_iof_tool_module = {
init,
tool_push,
tool_pull,
tool_close,
NULL,
finalize,
tool_ft_event
.init = init,
.push = tool_push,
.pull = tool_pull,
.close = tool_close,
.output = tool_output,
.finalize = finalize,
.ft_event = tool_ft_event
};
@ -276,6 +280,20 @@ static int finalize(void)
return ORTE_SUCCESS;
}
static int tool_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg)
{
/* output this to our local output */
if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) {
orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev);
}
return ORTE_SUCCESS;
}
/*
* FT event
*/

Просмотреть файл

@ -38,12 +38,13 @@
#include "opal/mca/pstat/pstat.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/base/plm_private.h"
#include "pmix_server_internal.h"
@ -529,6 +530,7 @@ static void _query(int sd, short args, void *cbdata)
opal_argv_free(ans);
ans = NULL;
} else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_MEMORY_USAGE)) {
OBJ_CONSTRUCT(&targets, opal_list_t);
/* check the qualifiers to find the procs they want to
* know about - if qualifiers are NULL, then get it for
* the daemons + all active jobs */
@ -537,6 +539,7 @@ static void _query(int sd, short args, void *cbdata)
/* xcast a request for all memory usage */
/* return success - the callback will be done
* once we get the results */
OPAL_LIST_DESTRUCT(&targets);
return;
}
@ -551,6 +554,8 @@ static void _query(int sd, short args, void *cbdata)
} else if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
/* save this directive on our list of targets */
nm = OBJ_NEW(orte_namelist_t);
memcpy(&nm->name, &kv->data.name, sizeof(opal_process_name_t));
opal_list_append(&targets, &nm->super);
}
}
@ -601,6 +606,7 @@ static void _query(int sd, short args, void *cbdata)
}
} else {
opal_output(0, "NONLOCAL");
/* if they want it for remote procs, see who is hosting them
* and ask directly for the info - if rank=wildcard, then
* we need to xcast the request and collect the results */
@ -772,24 +778,37 @@ void pmix_server_log_fn(opal_process_name_t *requestor,
"%s logging info",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* for now, we only support logging show_help messages */
OPAL_LIST_FOREACH(val, info, opal_value_t) {
/* we ignore the key as irrelevant - we only want to
* pull out the blob */
if (OPAL_BYTE_OBJECT != val->type) {
if (NULL == val->key) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
continue;
}
buf = OBJ_NEW(opal_buffer_t);
opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size);
val->data.bo.bytes = NULL;
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_HNP, buf,
ORTE_RML_TAG_SHOW_HELP,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
if (0 == strcmp(val->key, OPAL_PMIX_LOG_MSG)) {
/* pull out the blob */
if (OPAL_BYTE_OBJECT != val->type) {
continue;
}
buf = OBJ_NEW(opal_buffer_t);
opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size);
val->data.bo.bytes = NULL;
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
ORTE_PROC_MY_HNP, buf,
ORTE_RML_TAG_SHOW_HELP,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
} else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDERR)) {
if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDERR, val->data.string))) {
ORTE_ERROR_LOG(rc);
}
} else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDOUT)) {
if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDOUT, val->data.string))) {
ORTE_ERROR_LOG(rc);
}
}
}
if (NULL != cbfunc) {
cbfunc(OPAL_SUCCESS, cbdata);
}

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2008-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2016 Intel, Inc. All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -709,7 +709,7 @@ int orte_show_help_norender(const char *filename, const char *topic,
if (NULL != opal_pmix.log) {
OBJ_CONSTRUCT(&info, opal_list_t);
kv = OBJ_NEW(opal_value_t),
kv->key = strdup(OPAL_PMIX_LOG_STDERR);
kv->key = strdup(OPAL_PMIX_LOG_MSG);
kv->type = OPAL_BYTE_OBJECT;
opal_dss.unload(buf, (void**)&kv->data.bo.bytes, &kv->data.bo.size);
opal_list_append(&info, &kv->super);