1
1

Continue cleanup of notifications. Resolve a race condition that can result in attempt to send a message on a closed socket

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-01-03 20:46:50 -08:00
родитель 5737a45b35
Коммит f355fb926d
8 изменённых файлов: 177 добавлений и 23 удалений

Просмотреть файл

@ -16,6 +16,7 @@
#include "orte/runtime/orte_globals.h"
#include "orte/mca/errmgr/errmgr.h"
static int rank, size;
static volatile int active;
static volatile bool wait_for_release = true;
#define MEMPROBE_RELEASE 12345
@ -26,9 +27,10 @@ static void _release_fn(int status,
opal_pmix_notification_complete_fn_t cbfunc,
void *cbdata)
{
fprintf(stderr, "Rank %d: Release recvd\n", rank);
/* must let the notifier know we are done */
if (NULL != cbfunc) {
cbfunc(0, NULL, NULL, NULL, cbdata);
cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata);
}
/* flag that the debugger is complete so we can exit */
wait_for_release = false;
@ -47,20 +49,39 @@ static void _register_fn(int status,
*active = status;
}
static void qcbfunc(int status,
opal_list_t *info,
void *cbdata,
opal_pmix_release_cbfunc_t release_fn,
void *release_cbdata)
{
opal_list_t *results = (opal_list_t*)cbdata;
opal_value_t *kv;
fprintf(stderr, "Rank %d: Query returned status %d\n", rank, status);
if (NULL != info) {
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) {
opal_list_append(results, &kv->super);
}
}
if (NULL != release_fn) {
release_fn(release_cbdata);
}
wait_for_release = false;
}
int main(int argc, char* argv[])
{
int rank, size;
opal_list_t *codes;
opal_value_t *kv;
opal_pmix_query_t *q;
opal_list_t query, response;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
if (0 == rank) {
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
}
/* everyone registers their event handler */
codes = OBJ_NEW(opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup("errorcode");
@ -74,9 +95,59 @@ int main(int argc, char* argv[])
usleep(10);
}
/* now wait for notification */
while (wait_for_release) {
usleep(10);
/* rank 0 asks for memory to be sampled, while everyone else waits */
if (0 == rank) {
fprintf(stderr, "Sampling memory usage after MPI_Init\n");
OBJ_CONSTRUCT(&query, opal_list_t);
OBJ_CONSTRUCT(&response, opal_list_t);
q = OBJ_NEW(opal_pmix_query_t);
opal_list_append(&query, &q->super);
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
/* qualify that we just want avg, min/max values reported */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
/* issue the request */
wait_for_release = true;
opal_pmix.query(&query, qcbfunc, (void*)&response);
while (wait_for_release) {
usleep(10);
}
/* output the results */
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
fprintf(stderr, "\tResults: %s\n", kv->key);
}
OPAL_LIST_DESTRUCT(&response);
/* send the notification to release the other procs */
wait_for_release = true;
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&response, &kv->super);
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
OPAL_PMIX_RANGE_GLOBAL, &response,
NULL, NULL)) {
fprintf(stderr, "Notify event failed\n");
exit(1);
}
while (wait_for_release) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
} else {
/* now wait for notification */
while (wait_for_release) {
usleep(10);
}
}
wait_for_release = true;
@ -86,13 +157,60 @@ int main(int argc, char* argv[])
if (0 == rank) {
fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n");
OBJ_CONSTRUCT(&query, opal_list_t);
OBJ_CONSTRUCT(&response, opal_list_t);
q = OBJ_NEW(opal_pmix_query_t);
opal_list_append(&query, &q->super);
opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE);
/* qualify that we just want avg, min/max values reported */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&q->qualifiers, &kv->super);
/* issue the request */
wait_for_release = true;
opal_pmix.query(&query, qcbfunc, (void*)&response);
while (wait_for_release) {
usleep(10);
}
/* output the results */
OPAL_LIST_FOREACH(kv, &response, opal_value_t) {
fprintf(stderr, "\tResults: %s\n", kv->key);
}
OPAL_LIST_DESTRUCT(&response);
/* send the notification to release the other procs */
wait_for_release = true;
OBJ_CONSTRUCT(&response, opal_list_t);
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT);
kv->type = OPAL_BOOL;
kv->data.flag = true;
opal_list_append(&response, &kv->super);
if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL,
OPAL_PMIX_RANGE_GLOBAL, &response,
NULL, NULL)) {
fprintf(stderr, "Notify event failed\n");
exit(1);
}
while (wait_for_release) {
usleep(10);
}
OPAL_LIST_DESTRUCT(&response);
} else {
/* wait again while memory is sampled */
while (wait_for_release) {
usleep(10);
}
}
/* wait again while memory is sampled */
while (wait_for_release) {
usleep(10);
}
fprintf(stderr, "%d: FINALIZING\n", rank);
fflush(stderr);
MPI_Finalize();
return 0;
}

Просмотреть файл

@ -11,7 +11,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -39,6 +39,7 @@
#include "src/mca/base/pmix_mca_base_var.h"
#include "src/mca/base/pmix_mca_base_framework.h"
#include "src/class/pmix_list.h"
#include "src/client/pmix_client_ops.h"
#include "src/mca/ptl/base/base.h"
/*
@ -76,6 +77,11 @@ static pmix_status_t pmix_ptl_close(void)
/* ensure the listen thread has been shut down */
pmix_ptl.stop_listening();
if (0 <= pmix_client_globals.myserver.sd) {
CLOSE_THE_SOCKET(pmix_client_globals.myserver.sd);
pmix_client_globals.myserver.sd = -1;
}
/* the components will cleanup when closed */
PMIX_DESTRUCT(&pmix_ptl_globals.actives);
PMIX_LIST_DESTRUCT(&pmix_ptl_globals.posted_recvs);
@ -92,6 +98,7 @@ static pmix_status_t pmix_ptl_open(pmix_mca_base_open_flag_t flags)
PMIX_CONSTRUCT(&pmix_ptl_globals.posted_recvs, pmix_list_t);
pmix_ptl_globals.listen_thread_active = false;
PMIX_CONSTRUCT(&pmix_ptl_globals.listeners, pmix_list_t);
pmix_client_globals.myserver.sd = -1;
/* Open up all available components */
return pmix_mca_base_framework_components_open(&pmix_ptl_base_framework, flags);

Просмотреть файл

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
* All rights reserved.
* Copyright (c) 2015-2016 Research Organization for Information Science
@ -478,10 +478,17 @@ void pmix_ptl_base_send(int sd, short args, void *cbdata)
pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
pmix_ptl_send_t *snd;
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] queue callback called: reply to %s:%d on tag %d",
"[%s:%d] send to %s:%d on tag %d",
__FILE__, __LINE__,
(queue->peer)->info->nptr->nspace,
(queue->peer)->info->rank, (queue->tag));
if (queue->peer->sd < 0) {
/* this peer's socket has been closed */
PMIX_RELEASE(queue);
return;
}
snd = PMIX_NEW(pmix_ptl_send_t);
snd->hdr.pindex = htonl(pmix_globals.pindex);
snd->hdr.tag = htonl(queue->tag);
@ -513,6 +520,12 @@ void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
pmix_ptl_send_t *snd;
uint32_t tag;
if (ms->peer->sd < 0) {
/* this peer's socket has been closed */
PMIX_RELEASE(ms);
return;
}
/* set the tag */
tag = current_tag++;

Просмотреть файл

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2007-2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -236,7 +236,7 @@ PMIX_CLASS_DECLARATION(pmix_listener_t);
pmix_list_append(&(p)->send_queue, &snd->super); \
} \
/* ensure the send event is active */ \
if (!(p)->send_ev_active) { \
if (!(p)->send_ev_active && 0 <= (p)->sd) { \
event_add(&(p)->send_event, 0); \
(p)->send_ev_active = true; \
} \

Просмотреть файл

@ -13,7 +13,7 @@
* Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -270,6 +270,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) {
PMIX_ERROR_LOG(rc);
CLOSE_THE_SOCKET(sd);
pmix_client_globals.myserver.sd = -1;
return rc;
}
@ -277,6 +278,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer,
if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) {
PMIX_ERROR_LOG(rc);
CLOSE_THE_SOCKET(sd);
pmix_client_globals.myserver.sd = -1;
return rc;
}

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
@ -802,7 +802,7 @@ static void _deregister_client(int sd, short args, void *cbdata)
/* nothing to do */
goto cleanup;
}
/* find an remove this client */
/* find and remove this client */
PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
if (info->rank == cd->proc.rank) {
pmix_list_remove_item(&nptr->server->ranks, &info->super);
@ -1442,6 +1442,7 @@ static void op_cbfunc(pmix_status_t status, void *cbdata)
PMIX_RELEASE(cd);
return;
}
/* the function that created the server_caddy did a
* retain on the peer, so we don't have to worry about
* it still being present - send a copy to the originator */
@ -2078,6 +2079,10 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag,
if (PMIX_SUCCESS != (rc = pmix_host_server.client_finalized(&proc, peer->info->server_object,
op_cbfunc, cd))) {
PMIX_RELEASE(cd);
} else {
/* don't reply to them ourselves - we will do so when the host
* server calls us back */
return rc;
}
}
/* turn off the recv event - we shouldn't hear anything

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014-2015 Artem Y. Polyakov <artpol84@gmail.com>.
@ -1180,6 +1180,13 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer,
if (0 != strncmp(peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) {
continue;
}
/* if the source of the event is the same peer just registered, then ignore it
* as the event notification system will have already locally
* processed it */
if (0 == strncmp(peer->info->nptr->nspace, cd->source.nspace, PMIX_MAX_NSLEN) &&
peer->info->rank == cd->source.rank) {
continue;
}
if (PMIX_RANK_WILDCARD == cd->targets[n].rank ||
peer->info->rank == cd->targets[n].rank) {
matched = true;

Просмотреть файл

@ -225,6 +225,8 @@ BEGIN_C_DECLS
#define OPAL_PMIX_QUERY_DEBUG_SUPPORT "pmix.qry.debug" // return a comma-delimited list of supported debug attributes
#define OPAL_PMIX_QUERY_MEMORY_USAGE "pmix.qry.mem" // return info on memory usage for the procs indicated in the qualifiers
#define OPAL_PMIX_QUERY_LOCAL_ONLY "pmix.qry.local" // constrain the query to local information only
#define OPAL_PMIX_QUERY_REPORT_AVG "pmix.qry.avg" // report average values
#define OPAL_PMIX_QUERY_REPORT_MINMAX "pmix.qry.minmax" // report minimum and maximum value
/* log attributes */
#define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr