diff --git a/contrib/scaling/Makefile b/contrib/scaling/Makefile index 69fb6233d4..585f427c95 100644 --- a/contrib/scaling/Makefile +++ b/contrib/scaling/Makefile @@ -4,14 +4,14 @@ all: $(PROGS) CFLAGS = -O -orte_no_op: +orte_no_op: orte_no_op.c ortecc -o orte_no_op orte_no_op.c -mpi_no_op: +mpi_no_op: mpi_no_op.c mpicc -o mpi_no_op mpi_no_op.c -mpi_memprobe: - mpicc -o mpi_memprobe mpi_memprobe.c -lopen-pal +mpi_memprobe: mpi_memprobe.c + mpicc -o mpi_memprobe mpi_memprobe.c -lopen-pal -lopen-rte clean: rm -f $(PROGS) *~ diff --git a/contrib/scaling/mpi_memprobe.c b/contrib/scaling/mpi_memprobe.c index 5364e71570..9661a09dd8 100644 --- a/contrib/scaling/mpi_memprobe.c +++ b/contrib/scaling/mpi_memprobe.c @@ -17,7 +17,6 @@ #include "orte/mca/errmgr/errmgr.h" static int rank, size; -static volatile int active; static volatile bool wait_for_release = true; #define MEMPROBE_RELEASE 12345 @@ -27,7 +26,6 @@ static void _release_fn(int status, opal_pmix_notification_complete_fn_t cbfunc, void *cbdata) { - fprintf(stderr, "Rank %d: Release recvd\n", rank); /* must let the notifier know we are done */ if (NULL != cbfunc) { cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata); @@ -58,7 +56,6 @@ static void qcbfunc(int status, opal_list_t *results = (opal_list_t*)cbdata; opal_value_t *kv; - fprintf(stderr, "Rank %d: Query returned status %d\n", rank, status); if (NULL != info) { while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) { opal_list_append(results, &kv->super); @@ -70,17 +67,131 @@ static void qcbfunc(int status, wait_for_release = false; } +static void notifycbfunc(int status, void *cbdata) +{ + volatile int *active = (volatile int*)cbdata; + *active = status; +} + +static void sample(void) +{ + opal_value_t *kv, *ival; + opal_pmix_query_t *q; + opal_list_t query, response, *lt; + volatile int active; + char **answer = NULL, *tmp, *msg; + + OBJ_CONSTRUCT(&query, opal_list_t); + OBJ_CONSTRUCT(&response, opal_list_t); + q = OBJ_NEW(opal_pmix_query_t); + opal_list_append(&query, &q->super); + opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE); + /* qualify that we just want local avg, min/max values reported */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_ONLY); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + /* issue the request */ + wait_for_release = true; + opal_pmix.query(&query, qcbfunc, (void*)&response); + /* wait for the query to complete */ + while (wait_for_release) { + usleep(10); + } + wait_for_release = true; + /* log my own results as a single string so the output + * doesn't get garbled on the other end */ + asprintf(&tmp, "Data for node %s", orte_process_info.nodename); + opal_argv_append_nosize(&answer, tmp); + free(tmp); + OPAL_LIST_FOREACH(kv, &response, opal_value_t) { + lt = (opal_list_t*)kv->data.ptr; + OPAL_LIST_FOREACH(ival, lt, opal_value_t) { + if (0 == strcmp(ival->key, OPAL_PMIX_DAEMON_MEMORY)) { + asprintf(&tmp, "\tDaemon: %f", ival->data.fval); + opal_argv_append_nosize(&answer, tmp); + free(tmp); + } else if (0 == strcmp(ival->key, OPAL_PMIX_CLIENT_AVG_MEMORY)) { + asprintf(&tmp, "\tClient: %f", ival->data.fval); + opal_argv_append_nosize(&answer, tmp); + free(tmp); + } else { + fprintf(stderr, "\tUnknown key: %s", ival->key); + } + } + } + opal_argv_append_nosize(&answer, "\n"); + OPAL_LIST_DESTRUCT(&response); + + /* construct the log output */ + OBJ_CONSTRUCT(&response, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_LOG_STDOUT); + kv->type = OPAL_STRING; + kv->data.string = opal_argv_join(answer, '\n'); + opal_list_append(&response, &kv->super); + opal_argv_free(answer); + active = -1; + opal_pmix.log(&response, notifycbfunc, (void*)&active); + while (-1 == active) { + usleep(10); + } + OPAL_LIST_DESTRUCT(&response); + + + if (0 == rank) { + /* send the notification to release the other procs */ + wait_for_release = true; + OBJ_CONSTRUCT(&response, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&response, &kv->super); + active = -1; + if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL, + OPAL_PMIX_RANGE_GLOBAL, &response, + notifycbfunc, (void*)&active)) { + fprintf(stderr, "Notify event failed\n"); + exit(1); + } + while (-1 == active) { + usleep(10); + } + OPAL_LIST_DESTRUCT(&response); + } + + /* now wait for notification */ + while (wait_for_release) { + usleep(10); + } +} + int main(int argc, char* argv[]) { opal_list_t *codes; opal_value_t *kv; - opal_pmix_query_t *q; - opal_list_t query, response; + volatile int active; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); + if (0 == rank) { + fprintf(stderr, "Sampling memory usage after MPI_Init\n"); + } + /* everyone registers their event handler */ codes = OBJ_NEW(opal_list_t); kv = OBJ_NEW(opal_value_t); @@ -95,54 +206,11 @@ int main(int argc, char* argv[]) usleep(10); } - /* rank 0 asks for memory to be sampled, while everyone else waits */ - if (0 == rank) { - fprintf(stderr, "Sampling memory usage after MPI_Init\n"); - OBJ_CONSTRUCT(&query, opal_list_t); - OBJ_CONSTRUCT(&response, opal_list_t); - q = OBJ_NEW(opal_pmix_query_t); - opal_list_append(&query, &q->super); - opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE); - /* qualify that we just want avg, min/max values reported */ - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&q->qualifiers, &kv->super); - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&q->qualifiers, &kv->super); - /* issue the request */ - wait_for_release = true; - opal_pmix.query(&query, qcbfunc, (void*)&response); - while (wait_for_release) { - usleep(10); - } - /* output the results */ - OPAL_LIST_FOREACH(kv, &response, opal_value_t) { - fprintf(stderr, "\tResults: %s\n", kv->key); - } - OPAL_LIST_DESTRUCT(&response); - /* send the notification to release the other procs */ - wait_for_release = true; - OBJ_CONSTRUCT(&response, opal_list_t); - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&response, &kv->super); - if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL, - OPAL_PMIX_RANGE_GLOBAL, &response, - NULL, NULL)) { - fprintf(stderr, "Notify event failed\n"); - exit(1); - } - while (wait_for_release) { - usleep(10); - } - OPAL_LIST_DESTRUCT(&response); + /* if I am the local leader (i.e., local_rank=0), then I ask + * my daemon to report the local memory usage, and send it + * to rank=0 */ + if (0 == orte_process_info.my_local_rank) { + sample(); } else { /* now wait for notification */ while (wait_for_release) { @@ -157,51 +225,14 @@ int main(int argc, char* argv[]) if (0 == rank) { fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n"); - OBJ_CONSTRUCT(&query, opal_list_t); - OBJ_CONSTRUCT(&response, opal_list_t); - q = OBJ_NEW(opal_pmix_query_t); - opal_list_append(&query, &q->super); - opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE); - /* qualify that we just want avg, min/max values reported */ - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&q->qualifiers, &kv->super); - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&q->qualifiers, &kv->super); - /* issue the request */ - wait_for_release = true; - opal_pmix.query(&query, qcbfunc, (void*)&response); - while (wait_for_release) { - usleep(10); + } + + if (0 == orte_process_info.my_local_rank) { + if (0 != rank) { + /* wait a little */ + usleep(1000); } - /* output the results */ - OPAL_LIST_FOREACH(kv, &response, opal_value_t) { - fprintf(stderr, "\tResults: %s\n", kv->key); - } - OPAL_LIST_DESTRUCT(&response); - /* send the notification to release the other procs */ - wait_for_release = true; - OBJ_CONSTRUCT(&response, opal_list_t); - kv = OBJ_NEW(opal_value_t); - kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); - kv->type = OPAL_BOOL; - kv->data.flag = true; - opal_list_append(&response, &kv->super); - if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL, - OPAL_PMIX_RANGE_GLOBAL, &response, - NULL, NULL)) { - fprintf(stderr, "Notify event failed\n"); - exit(1); - } - while (wait_for_release) { - usleep(10); - } - OPAL_LIST_DESTRUCT(&response); + sample(); } else { /* wait again while memory is sampled */ while (wait_for_release) { @@ -209,8 +240,6 @@ int main(int argc, char* argv[]) } } - fprintf(stderr, "%d: FINALIZING\n", rank); - fflush(stderr); MPI_Finalize(); return 0; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/unpack.c b/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/unpack.c index d7b16a1a62..c6fe22f802 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/unpack.c +++ b/opal/mca/pmix/pmix2x/pmix/src/buffer_ops/unpack.c @@ -10,7 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 Mellanox Technologies, Inc. @@ -688,6 +688,7 @@ pmix_status_t pmix_bfrop_unpack_status(pmix_buffer_t *buffer, void *dest, return PMIX_ERR_NOMEM; } if (PMIX_SUCCESS != (ret = pmix_bfrop_unpack_buffer(buffer, val->data.darray, &m, PMIX_DATA_ARRAY))) { + PMIX_ERROR_LOG(ret); return ret; } break; @@ -1274,6 +1275,9 @@ pmix_status_t pmix_bfrop_unpack_darray(pmix_buffer_t *buffer, void *dest, case PMIX_COMPRESSED_STRING: nbytes = sizeof(pmix_byte_object_t); break; + case PMIX_INFO: + nbytes = sizeof(pmix_info_t); + break; case PMIX_PERSIST: nbytes = sizeof(pmix_persistence_t); break; diff --git a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_query.c b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_query.c index 77f86755b8..7f426203a3 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_query.c +++ b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_query.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2016 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. @@ -78,7 +78,11 @@ static void query_cbfunc(struct pmix_peer_t *peer, cnt = results->ninfo; if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, results->info, &cnt, PMIX_INFO))) { PMIX_ERROR_LOG(rc); - goto complete; + pmix_output(0, "TYPE: %d", results->info[0].value.type); + results->status = rc; + PMIX_INFO_FREE(results->info, results->ninfo); + results->info = NULL; + results->ninfo = 0; } } diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 9e1062be61..85c43063ec 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -438,6 +438,15 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) return; } +/* just a simple tracker so we know who we notified */ +typedef struct pmix_event_trkr_t { + pmix_list_item_t super; + pmix_peer_t *peer; +} pmix_event_trkr_t; +static PMIX_CLASS_INSTANCE(pmix_event_trkr_t, + pmix_list_item_t, + NULL, NULL); + static void _notify_client_event(int sd, short args, void *cbdata) { @@ -446,7 +455,9 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_regevents_info_t *reginfoptr; pmix_peer_events_info_t *pr; size_t n; - bool matched; + bool matched, notify;; + pmix_list_t recips; + pmix_event_trkr_t *trkr; pmix_output_verbose(2, pmix_globals.debug_output, "pmix_server: _notify_error notifying clients of error %s", @@ -466,6 +477,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) /* cycle across our registered events and send the message to * any client who registered for it */ + PMIX_CONSTRUCT(&recips, pmix_list_t); PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) { if ((PMIX_MAX_ERR_CONSTANT == reginfoptr->code && !cd->nondefault) || cd->status == reginfoptr->code) { @@ -494,14 +506,30 @@ static void _notify_client_event(int sd, short args, void *cbdata) continue; } } + /* if we have already notified this client, then don't do it again */ + notify = true; + PMIX_LIST_FOREACH(trkr, &recips, pmix_event_trkr_t) { + if (trkr->peer == pr->peer) { + notify = false; + break; + } + } + if (!notify) { + continue; + } + /* add this peer to the list of prior recipients */ + trkr = PMIX_NEW(pmix_event_trkr_t); + trkr->peer = pr->peer; + pmix_list_append(&recips, &trkr->super); pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server: notifying client %s:%d", - pr->peer->info->nptr->nspace, pr->peer->info->rank); + "pmix_server: notifying client %s:%d of code %d", + pr->peer->info->nptr->nspace, pr->peer->info->rank, cd->status); PMIX_RETAIN(cd->buf); PMIX_SERVER_QUEUE_REPLY(pr->peer, 0, cd->buf); } } } + PMIX_LIST_DESTRUCT(&recips); /* notify the caller */ if (NULL != cd->cbfunc) { @@ -530,8 +558,8 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, size_t n; pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server: notify client of event %s", - PMIx_Error_string(status)); + "pmix_server: notify client of event %s with %lu ninfos", + PMIx_Error_string(status), ninfo); cd = PMIX_NEW(pmix_notify_caddy_t); cd->status = status; diff --git a/opal/mca/pmix/pmix2x/pmix2x.c b/opal/mca/pmix/pmix2x/pmix2x.c index 84743c28e9..ea8e6375fe 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.c +++ b/opal/mca/pmix/pmix2x/pmix2x.c @@ -785,6 +785,9 @@ int pmix2x_value_unload(opal_value_t *kv, int rc=OPAL_SUCCESS; bool found; opal_pmix2x_jobid_trkr_t *job; + opal_list_t *lt; + opal_value_t *ival; + size_t n; switch(v->type) { case PMIX_UNDEF: @@ -927,6 +930,25 @@ int pmix2x_value_unload(opal_value_t *kv, kv->type = OPAL_PTR; kv->data.ptr = v->data.ptr; break; + case PMIX_DATA_ARRAY: + if (NULL == v->data.darray || NULL == v->data.darray->array) { + kv->data.ptr = NULL; + break; + } + lt = OBJ_NEW(opal_list_t); + kv->type = OPAL_PTR; + kv->data.ptr = (void*)lt; + for (n=0; n < v->data.darray->size; n++) { + ival = OBJ_NEW(opal_value_t); + opal_list_append(lt, &ival->super); + /* handle the various types */ + if (PMIX_INFO == v->data.darray->type) { + pmix_info_t *iptr = (pmix_info_t*)v->data.darray->array; + ival->key = strdup(iptr[n].key); + pmix2x_value_unload(ival, &iptr[n].value); + } + } + break; default: /* silence warnings */ rc = OPAL_ERROR; @@ -1138,12 +1160,109 @@ static int notify_event(int status, return OPAL_SUCCESS; } +static void relcbfunc(void *cbdata) +{ + opal_list_t *results = (opal_list_t*)cbdata; + if (NULL != results) { + OPAL_LIST_RELEASE(results); + } +} + +static void infocbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix2x_opcaddy_t *cd = (pmix2x_opcaddy_t*)cbdata; + int rc = OPAL_SUCCESS; + opal_list_t *results = NULL; + opal_value_t *iptr; + size_t n; + + /* convert the array of pmix_info_t to the list of info */ + if (NULL != info) { + results = OBJ_NEW(opal_list_t); + for (n=0; n < ninfo; n++) { + iptr = OBJ_NEW(opal_value_t); + opal_list_append(results, &iptr->super); + iptr->key = strdup(info[n].key); + if (OPAL_SUCCESS != (rc = pmix2x_value_unload(iptr, &info[n].value))) { + OPAL_LIST_RELEASE(results); + results = NULL; + break; + } + } + } + + if (NULL != release_fn) { + release_fn(release_cbdata); + } + + /* return the values to the original requestor */ + if (NULL != cd->qcbfunc) { + cd->qcbfunc(rc, results, cd->cbdata, relcbfunc, results); + } + OBJ_RELEASE(cd); +} + static void pmix2x_query(opal_list_t *queries, opal_pmix_info_cbfunc_t cbfunc, void *cbdata) { - if (NULL != cbfunc) { - cbfunc(OPAL_ERR_NOT_SUPPORTED, NULL, cbdata, NULL, NULL); + int rc; + opal_value_t *ival; + size_t n, nqueries, nq; + pmix2x_opcaddy_t *cd; + pmix_status_t prc; + opal_pmix_query_t *q; + + /* create the caddy */ + cd = OBJ_NEW(pmix2x_opcaddy_t); + + /* bozo check */ + if (NULL == queries || 0 == (nqueries = opal_list_get_size(queries))) { + rc = OPAL_ERR_BAD_PARAM; + goto CLEANUP; } + + /* setup the operation */ + cd->qcbfunc = cbfunc; + cd->cbdata = cbdata; + cd->nqueries = nqueries; + + /* convert the list to an array of query objects */ + PMIX_QUERY_CREATE(cd->queries, cd->nqueries); + n=0; + OPAL_LIST_FOREACH(q, queries, opal_pmix_query_t) { + cd->queries[n].keys = opal_argv_copy(q->keys); + cd->queries[n].nqual = opal_list_get_size(&q->qualifiers); + if (0 < cd->queries[n].nqual) { + PMIX_INFO_CREATE(cd->queries[n].qualifiers, cd->queries[n].nqual); + nq = 0; + OPAL_LIST_FOREACH(ival, &q->qualifiers, opal_value_t) { + (void)strncpy(cd->queries[n].qualifiers[nq].key, ival->key, PMIX_MAX_KEYLEN); + pmix2x_value_load(&cd->queries[n].qualifiers[nq].value, ival); + ++nq; + } + } + ++n; + } + + /* pass it down */ + if (PMIX_SUCCESS != (prc = PMIx_Query_info_nb(cd->queries, cd->nqueries, + infocbfunc, cd))) { + /* do not hang! */ + rc = pmix2x_convert_rc(prc); + goto CLEANUP; + } + + return; + + CLEANUP: + if (NULL != cbfunc) { + cbfunc(rc, NULL, cbdata, NULL, NULL); + } + OBJ_RELEASE(cd); return; } @@ -1234,6 +1353,8 @@ static void opcon(pmix2x_opcaddy_t *p) p->active = false; p->codes = NULL; p->pcodes = NULL; + p->queries = NULL; + p->nqueries = 0; p->event = NULL; p->opcbfunc = NULL; p->mdxcbfunc = NULL; @@ -1260,6 +1381,9 @@ static void opdes(pmix2x_opcaddy_t *p) if (NULL != p->pcodes) { free(p->pcodes); } + if (NULL != p->queries) { + PMIX_QUERY_FREE(p->queries, p->nqueries); + } } OBJ_CLASS_INSTANCE(pmix2x_opcaddy_t, opal_object_t, diff --git a/opal/mca/pmix/pmix2x/pmix2x.h b/opal/mca/pmix/pmix2x/pmix2x.h index 5e13732273..541978e482 100644 --- a/opal/mca/pmix/pmix2x/pmix2x.h +++ b/opal/mca/pmix/pmix2x/pmix2x.h @@ -80,6 +80,8 @@ typedef struct { opal_list_t *codes; pmix_status_t *pcodes; size_t ncodes; + pmix_query_t *queries; + size_t nqueries; opal_pmix2x_event_t *event; opal_pmix_op_cbfunc_t opcbfunc; opal_pmix_modex_cbfunc_t mdxcbfunc; @@ -87,6 +89,7 @@ typedef struct { opal_pmix_lookup_cbfunc_t lkcbfunc; opal_pmix_spawn_cbfunc_t spcbfunc; opal_pmix_evhandler_reg_cbfunc_t evregcbfunc; + opal_pmix_info_cbfunc_t qcbfunc; void *cbdata; } pmix2x_opcaddy_t; OBJ_CLASS_DECLARATION(pmix2x_opcaddy_t); diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_north.c b/opal/mca/pmix/pmix2x/pmix2x_server_north.c index 743953b509..2d29e28bf8 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_north.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_north.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Mellanox Technologies, Inc. @@ -1015,6 +1015,7 @@ static void server_log(const pmix_proc_t *proct, /* convert the data */ for (n=0; n < ndata; n++) { oinfo = OBJ_NEW(opal_value_t); + oinfo->key = strdup(data[n].key); /* we "borrow" the info field of the caddy as we and the * server function both agree on what will be there */ opal_list_append(&opalcaddy->info, &oinfo->super); diff --git a/opal/mca/pmix/pmix2x/pmix2x_server_south.c b/opal/mca/pmix/pmix2x/pmix2x_server_south.c index 06c2f3cea1..956844d9d5 100644 --- a/opal/mca/pmix/pmix2x/pmix2x_server_south.c +++ b/opal/mca/pmix/pmix2x/pmix2x_server_south.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index 40a453d2f1..392c3401e4 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -231,9 +231,10 @@ BEGIN_C_DECLS #define OPAL_PMIX_QUERY_REPORT_MINMAX "pmix.qry.minmax" // report minimum and maximum value /* log attributes */ -#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr -#define OPAL_PMIX_LOG_STDOUT "pmix.log.stdout" // (bool) log data to stdout -#define OPAL_PMIX_LOG_SYSLOG "pmix.log.syslog" // (bool) log data to syslog - defaults to ERROR priority unless +#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (char*) log string to stderr +#define OPAL_PMIX_LOG_STDOUT "pmix.log.stdout" // (char*) log string to stdout +#define OPAL_PMIX_LOG_SYSLOG "pmix.log.syslog" // (char*) log data to syslog - defaults to ERROR priority unless +#define OPAL_PMIX_LOG_MSG "pmix.log.msg" // (pmix_byte_object_t) message blob to be sent somewhere /* debugger attributes */ #define OPAL_PMIX_DEBUG_STOP_ON_EXEC "pmix.dbg.exec" // (bool) job is being spawned under debugger - instruct it to pause on start diff --git a/orte/mca/iof/base/base.h b/orte/mca/iof/base/base.h index dd5f63c2e5..a1a7d5f4d5 100644 --- a/orte/mca/iof/base/base.h +++ b/orte/mca/iof/base/base.h @@ -12,7 +12,7 @@ * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -200,8 +200,8 @@ ORTE_DECLSPEC int orte_iof_base_flush(void); ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base; /* base functions */ -ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream, - unsigned char *data, int numbytes, +ORTE_DECLSPEC int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream, + const unsigned char *data, int numbytes, orte_iof_write_event_t *channel); ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev); ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata); diff --git a/orte/mca/iof/base/iof_base_output.c b/orte/mca/iof/base/iof_base_output.c index c01573ef14..24d9176f2b 100644 --- a/orte/mca/iof/base/iof_base_output.c +++ b/orte/mca/iof/base/iof_base_output.c @@ -10,6 +10,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -43,8 +44,8 @@ #include "orte/mca/iof/base/base.h" -int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream, - unsigned char *data, int numbytes, +int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t stream, + const unsigned char *data, int numbytes, orte_iof_write_event_t *channel) { char starttag[ORTE_IOF_BASE_TAG_MAX], endtag[ORTE_IOF_BASE_TAG_MAX], *suffix; diff --git a/orte/mca/iof/hnp/iof_hnp.c b/orte/mca/iof/hnp/iof_hnp.c index e6746ee960..0a5bbf7c77 100644 --- a/orte/mca/iof/hnp/iof_hnp.c +++ b/orte/mca/iof/hnp/iof_hnp.c @@ -15,7 +15,7 @@ * reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -68,6 +68,10 @@ static int hnp_pull(const orte_process_name_t* src_name, static int hnp_close(const orte_process_name_t* peer, orte_iof_tag_t source_tag); +static int hnp_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg); + static int finalize(void); static int hnp_ft_event(int state); @@ -79,13 +83,13 @@ static int hnp_ft_event(int state); */ orte_iof_base_module_t orte_iof_hnp_module = { - init, - hnp_push, - hnp_pull, - hnp_close, - NULL, - finalize, - hnp_ft_event + .init = init, + .push = hnp_push, + .pull = hnp_pull, + .close = hnp_close, + .output = hnp_output, + .finalize = finalize, + .ft_event = hnp_ft_event }; /* Initialize the module */ @@ -599,3 +603,17 @@ CHECK: } } } + +static int hnp_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg) +{ + /* output this to our local output */ + if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) { + orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev); + } else { + orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev); + } + + return ORTE_SUCCESS; +} diff --git a/orte/mca/iof/iof.h b/orte/mca/iof/iof.h index 96f57b091e..56e4775bdf 100644 --- a/orte/mca/iof/iof.h +++ b/orte/mca/iof/iof.h @@ -13,7 +13,7 @@ * Copyright (c) 2007-2008 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -191,6 +191,13 @@ typedef int (*orte_iof_base_pull_fn_t)(const orte_process_name_t* peer, typedef int (*orte_iof_base_close_fn_t)(const orte_process_name_t* peer, orte_iof_tag_t source_tag); +/** + * Output something via the IOF subsystem + */ +typedef int (*orte_iof_base_output_fn_t)(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg); + /* Flag that a job is complete */ typedef void (*orte_iof_base_complete_fn_t)(const orte_job_t *jdata); @@ -210,6 +217,7 @@ struct orte_iof_base_module_2_0_0_t { orte_iof_base_push_fn_t push; orte_iof_base_pull_fn_t pull; orte_iof_base_close_fn_t close; + orte_iof_base_output_fn_t output; orte_iof_base_complete_fn_t complete; orte_iof_base_finalize_fn_t finalize; orte_iof_base_ft_event_fn_t ft_event; diff --git a/orte/mca/iof/orted/iof_orted.c b/orte/mca/iof/orted/iof_orted.c index 1929b4b9eb..6deafa20cc 100644 --- a/orte/mca/iof/orted/iof_orted.c +++ b/orte/mca/iof/orted/iof_orted.c @@ -12,7 +12,7 @@ * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -69,6 +69,10 @@ static int orted_pull(const orte_process_name_t* src_name, static int orted_close(const orte_process_name_t* peer, orte_iof_tag_t source_tag); +static int orted_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg); + static int finalize(void); static int orted_ft_event(int state); @@ -82,13 +86,13 @@ static int orted_ft_event(int state); */ orte_iof_base_module_t orte_iof_orted_module = { - init, - orted_push, - orted_pull, - orted_close, - NULL, - finalize, - orted_ft_event + .init = init, + .push = orted_push, + .pull = orted_pull, + .close = orted_close, + .output = orted_output, + .finalize = finalize, + .ft_event = orted_ft_event }; static int init(void) @@ -437,3 +441,46 @@ CHECK: } } } + +static int orted_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg) +{ + opal_buffer_t *buf; + int rc; + + /* prep the buffer */ + buf = OBJ_NEW(opal_buffer_t); + + /* pack the stream first - we do this so that flow control messages can + * consist solely of the tag + */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &source_tag, 1, ORTE_IOF_TAG))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* pack name of process that gave us this data */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, peer, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* pack the data - for compatibility, we have to pack this as OPAL_BYTE, + * so ensure we include the NULL string terminator */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, msg, strlen(msg)+1, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* start non-blocking RML call to forward received data */ + OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, + "%s iof:orted:output sending %d bytes to HNP", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)strlen(msg)+1)); + + orte_rml.send_buffer_nb(orte_mgmt_conduit, + ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, + orte_rml_send_callback, NULL); + + return ORTE_SUCCESS; +} diff --git a/orte/mca/iof/orted/iof_orted_read.c b/orte/mca/iof/orted/iof_orted_read.c index e6e52fa9ee..622f33171d 100644 --- a/orte/mca/iof/orted/iof_orted_read.c +++ b/orte/mca/iof/orted/iof_orted_read.c @@ -12,7 +12,7 @@ * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -43,18 +43,6 @@ #include "iof_orted.h" -/* - * Callback when non-blocking RML send completes. - */ -static void send_cb(int status, orte_process_name_t *peer, - opal_buffer_t *buf, orte_rml_tag_t tag, - void *cbdata) -{ - /* nothing to do here - just release buffer and return */ - OBJ_RELEASE(buf); -} - - void orte_iof_orted_read_handler(int fd, short event, void *cbdata) { orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata; @@ -146,7 +134,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata) orte_rml.send_buffer_nb(orte_mgmt_conduit, ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_IOF_HNP, - send_cb, NULL); + orte_rml_send_callback, NULL); /* re-add the event */ opal_event_add(rev->ev, 0); diff --git a/orte/mca/iof/tool/iof_tool.c b/orte/mca/iof/tool/iof_tool.c index c5484d14a4..49cb0936fc 100644 --- a/orte/mca/iof/tool/iof_tool.c +++ b/orte/mca/iof/tool/iof_tool.c @@ -12,7 +12,7 @@ * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -52,18 +52,22 @@ static int tool_pull(const orte_process_name_t* src_name, static int tool_close(const orte_process_name_t* peer, orte_iof_tag_t source_tag); +static int tool_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg); + static int finalize(void); static int tool_ft_event(int state); orte_iof_base_module_t orte_iof_tool_module = { - init, - tool_push, - tool_pull, - tool_close, - NULL, - finalize, - tool_ft_event + .init = init, + .push = tool_push, + .pull = tool_pull, + .close = tool_close, + .output = tool_output, + .finalize = finalize, + .ft_event = tool_ft_event }; @@ -276,6 +280,20 @@ static int finalize(void) return ORTE_SUCCESS; } +static int tool_output(const orte_process_name_t* peer, + orte_iof_tag_t source_tag, + const char *msg) +{ + /* output this to our local output */ + if (ORTE_IOF_STDOUT & source_tag || orte_xml_output) { + orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stdout->wev); + } else { + orte_iof_base_write_output(peer, source_tag, (const unsigned char*)msg, strlen(msg), orte_iof_base.iof_write_stderr->wev); + } + + return ORTE_SUCCESS; +} + /* * FT event */ diff --git a/orte/orted/pmix/pmix_server_gen.c b/orte/orted/pmix/pmix_server_gen.c index ff36dd5b3b..10534c5838 100644 --- a/orte/orted/pmix/pmix_server_gen.c +++ b/orte/orted/pmix/pmix_server_gen.c @@ -38,12 +38,13 @@ #include "opal/mca/pstat/pstat.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/iof/iof.h" #include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/state/state.h" #include "orte/util/name_fns.h" #include "orte/runtime/orte_globals.h" #include "orte/mca/rml/rml.h" - #include "orte/mca/plm/base/plm_private.h" +#include "orte/mca/plm/base/plm_private.h" #include "pmix_server_internal.h" @@ -529,6 +530,7 @@ static void _query(int sd, short args, void *cbdata) opal_argv_free(ans); ans = NULL; } else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_MEMORY_USAGE)) { + OBJ_CONSTRUCT(&targets, opal_list_t); /* check the qualifiers to find the procs they want to * know about - if qualifiers are NULL, then get it for * the daemons + all active jobs */ @@ -537,6 +539,7 @@ static void _query(int sd, short args, void *cbdata) /* xcast a request for all memory usage */ /* return success - the callback will be done * once we get the results */ + OPAL_LIST_DESTRUCT(&targets); return; } @@ -551,6 +554,8 @@ static void _query(int sd, short args, void *cbdata) } else if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) { /* save this directive on our list of targets */ nm = OBJ_NEW(orte_namelist_t); + memcpy(&nm->name, &kv->data.name, sizeof(opal_process_name_t)); + opal_list_append(&targets, &nm->super); } } @@ -601,6 +606,7 @@ static void _query(int sd, short args, void *cbdata) } } else { + opal_output(0, "NONLOCAL"); /* if they want it for remote procs, see who is hosting them * and ask directly for the info - if rank=wildcard, then * we need to xcast the request and collect the results */ @@ -772,24 +778,37 @@ void pmix_server_log_fn(opal_process_name_t *requestor, "%s logging info", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* for now, we only support logging show_help messages */ OPAL_LIST_FOREACH(val, info, opal_value_t) { - /* we ignore the key as irrelevant - we only want to - * pull out the blob */ - if (OPAL_BYTE_OBJECT != val->type) { + if (NULL == val->key) { + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); continue; } - buf = OBJ_NEW(opal_buffer_t); - opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size); - val->data.bo.bytes = NULL; - if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, - ORTE_PROC_MY_HNP, buf, - ORTE_RML_TAG_SHOW_HELP, - orte_rml_send_callback, NULL))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); + if (0 == strcmp(val->key, OPAL_PMIX_LOG_MSG)) { + /* pull out the blob */ + if (OPAL_BYTE_OBJECT != val->type) { + continue; + } + buf = OBJ_NEW(opal_buffer_t); + opal_dss.load(buf, val->data.bo.bytes, val->data.bo.size); + val->data.bo.bytes = NULL; + if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(orte_mgmt_conduit, + ORTE_PROC_MY_HNP, buf, + ORTE_RML_TAG_SHOW_HELP, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buf); + } + } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDERR)) { + if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDERR, val->data.string))) { + ORTE_ERROR_LOG(rc); + } + } else if (0 == strcmp(val->key, OPAL_PMIX_LOG_STDOUT)) { + if (ORTE_SUCCESS != (rc = orte_iof.output(requestor, ORTE_IOF_STDOUT, val->data.string))) { + ORTE_ERROR_LOG(rc); + } } } + if (NULL != cbfunc) { cbfunc(OPAL_SUCCESS, cbdata); } diff --git a/orte/util/show_help.c b/orte/util/show_help.c index 290f01cc12..4e4b799bfa 100644 --- a/orte/util/show_help.c +++ b/orte/util/show_help.c @@ -12,7 +12,7 @@ * Copyright (c) 2008-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2016 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -709,7 +709,7 @@ int orte_show_help_norender(const char *filename, const char *topic, if (NULL != opal_pmix.log) { OBJ_CONSTRUCT(&info, opal_list_t); kv = OBJ_NEW(opal_value_t), - kv->key = strdup(OPAL_PMIX_LOG_STDERR); + kv->key = strdup(OPAL_PMIX_LOG_MSG); kv->type = OPAL_BYTE_OBJECT; opal_dss.unload(buf, (void**)&kv->data.bo.bytes, &kv->data.bo.size); opal_list_append(&info, &kv->super);