1
1

Implement the backend support for process-generated event notification

Этот коммит содержится в:
Ralph Castain 2016-10-08 09:24:28 -07:00
родитель 315a622723
Коммит 5b1484a836
6 изменённых файлов: 135 добавлений и 4 удалений

0
contrib/update-my-copyright.pl Обычный файл → Исполняемый файл
Просмотреть файл

Просмотреть файл

@ -765,7 +765,51 @@ static pmix_status_t server_notify_event(pmix_status_t code,
pmix_info_t info[], size_t ninfo,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
return PMIX_ERR_NOT_SUPPORTED;
pmix3x_opalcaddy_t *opalcaddy;
opal_process_name_t src;
size_t n;
opal_value_t *oinfo;
int rc, status;
if (NULL == host_module || NULL == host_module->notify_event) {
return PMIX_ERR_NOT_SUPPORTED;
}
/* setup the caddy */
opalcaddy = OBJ_NEW(pmix3x_opalcaddy_t);
opalcaddy->opcbfunc = cbfunc;
opalcaddy->cbdata = cbdata;
/* convert the code */
status = pmix3x_convert_rc(code);
/* convert the source */
if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&src.jobid, source->nspace))) {
opal_output(0, "FILE: %s LINE %d", __FILE__, __LINE__);
OBJ_RELEASE(opalcaddy);
return pmix3x_convert_opalrc(rc);
}
src.vpid = pmix3x_convert_rank(source->rank);
/* ignore the range for now */
/* convert the info */
for (n=0; n < ninfo; n++) {
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&opalcaddy->info, &oinfo->super);
oinfo->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix3x_value_unload(oinfo, &info[n].value))) {
OBJ_RELEASE(opalcaddy);
return pmix3x_convert_opalrc(rc);
}
}
/* send it upstairs */
if (OPAL_SUCCESS != (rc = host_module->notify_event(status, &src, &opalcaddy->info,
opal_opcbfunc, opalcaddy))) {
OBJ_RELEASE(opalcaddy);
}
return pmix3x_convert_opalrc(rc);
}
static void _info_rel(void *cbdata)

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -180,7 +180,8 @@ typedef int (*opal_pmix_server_disconnect_fn_t)(opal_list_t *procs, opal_list_t
* the PMIx server itself, or by one of its local clients. The RTE
* is requested to pass the notification to each PMIx server that
* hosts one or more of the specified processes */
typedef int (*opal_pmix_server_notify_fn_t)(int code, opal_list_t *procs, opal_list_t *info,
typedef int (*opal_pmix_server_notify_fn_t)(int code, opal_process_name_t *source,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
/* Query the RTE for information - the list is composed of opal_pmix_query_t items */

Просмотреть файл

@ -99,6 +99,7 @@ static opal_pmix_server_module_t pmix_server = {
.disconnect = pmix_server_disconnect_fn,
.register_events = pmix_server_register_events_fn,
.deregister_events = pmix_server_deregister_events_fn,
.notify_event = pmix_server_notify_event,
.query = pmix_server_query_fn,
.tool_connected = pmix_tool_connected_fn,
.log = pmix_server_log_fn

Просмотреть файл

@ -342,6 +342,88 @@ void pmix_server_notify(int status, orte_process_name_t* sender,
}
}
int pmix_server_notify_event(int code, opal_process_name_t *source,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
opal_buffer_t *buf;
int rc, ninfo;
opal_value_t *val;
orte_grpcomm_signature_t *sig;
/* a local process has generated an event - we need to xcast it
* to all the daemons so it can be passed down to their local
* procs */
buf = OBJ_NEW(opal_buffer_t);
if (NULL == buf) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
/* pack the status code */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &code, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
/* pack the source */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, source, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
/* pack the number of infos */
if (NULL == info) {
ninfo = 0;
} else {
ninfo = opal_list_get_size(info);
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &ninfo, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
if (0 < ninfo) {
OPAL_LIST_FOREACH(val, info, opal_value_t) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, val, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return rc;
}
}
}
/* goes to all daemons */
sig = OBJ_NEW(orte_grpcomm_signature_t);
if (NULL == sig) {
OBJ_RELEASE(buf);
return ORTE_ERR_OUT_OF_RESOURCE;
}
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
if (NULL == sig->signature) {
OBJ_RELEASE(buf);
OBJ_RELEASE(sig);
return ORTE_ERR_OUT_OF_RESOURCE;
}
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
sig->sz = 1;
if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_NOTIFICATION, buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
OBJ_RELEASE(sig);
return rc;
}
OBJ_RELEASE(buf);
/* maintain accounting */
OBJ_RELEASE(sig);
/* execute the callback */
if (NULL != cbfunc) {
cbfunc(ORTE_SUCCESS, cbdata);
}
return ORTE_SUCCESS;
}
static void qrel(void *cbdata)
{
opal_list_t *l = (opal_list_t*)cbdata;

Просмотреть файл

@ -12,7 +12,7 @@
* Copyright (c) 2006-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
@ -189,6 +189,9 @@ extern int pmix_server_register_events_fn(opal_list_t *info,
extern int pmix_server_deregister_events_fn(opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc,
void *cbdata);
extern int pmix_server_notify_event(int code, opal_process_name_t *source,
opal_list_t *info,
opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
extern int pmix_server_query_fn(opal_process_name_t *requestor,
opal_list_t *queries,
opal_pmix_info_cbfunc_t cbfunc, void *cbdata);