From f355fb926defcbdf68f9bfec18afca0d1a0d4fed Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Tue, 3 Jan 2017 20:46:50 -0800 Subject: [PATCH] Continue cleanup of notifications. Resolve a race condition that can result in attempt to send a message on a closed socket Signed-off-by: Ralph Castain --- contrib/scaling/mpi_memprobe.c | 146 ++++++++++++++++-- .../pmix/src/mca/ptl/base/ptl_base_frame.c | 9 +- .../pmix/src/mca/ptl/base/ptl_base_sendrecv.c | 17 +- .../pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h | 4 +- .../pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c | 4 +- .../pmix/pmix2x/pmix/src/server/pmix_server.c | 9 +- .../pmix2x/pmix/src/server/pmix_server_ops.c | 9 +- opal/mca/pmix/pmix_types.h | 2 + 8 files changed, 177 insertions(+), 23 deletions(-) diff --git a/contrib/scaling/mpi_memprobe.c b/contrib/scaling/mpi_memprobe.c index b3458ece9f..5364e71570 100644 --- a/contrib/scaling/mpi_memprobe.c +++ b/contrib/scaling/mpi_memprobe.c @@ -16,6 +16,7 @@ #include "orte/runtime/orte_globals.h" #include "orte/mca/errmgr/errmgr.h" +static int rank, size; static volatile int active; static volatile bool wait_for_release = true; #define MEMPROBE_RELEASE 12345 @@ -26,9 +27,10 @@ static void _release_fn(int status, opal_pmix_notification_complete_fn_t cbfunc, void *cbdata) { + fprintf(stderr, "Rank %d: Release recvd\n", rank); /* must let the notifier know we are done */ if (NULL != cbfunc) { - cbfunc(0, NULL, NULL, NULL, cbdata); + cbfunc(OPAL_ERR_HANDLERS_COMPLETE, NULL, NULL, NULL, cbdata); } /* flag that the debugger is complete so we can exit */ wait_for_release = false; @@ -47,20 +49,39 @@ static void _register_fn(int status, *active = status; } +static void qcbfunc(int status, + opal_list_t *info, + void *cbdata, + opal_pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + opal_list_t *results = (opal_list_t*)cbdata; + opal_value_t *kv; + + fprintf(stderr, "Rank %d: Query returned status %d\n", rank, status); + if (NULL != info) { + while (NULL != (kv = (opal_value_t*)opal_list_remove_first(info))) { + opal_list_append(results, &kv->super); + } + } + if (NULL != release_fn) { + release_fn(release_cbdata); + } + wait_for_release = false; +} + int main(int argc, char* argv[]) { - int rank, size; opal_list_t *codes; opal_value_t *kv; + opal_pmix_query_t *q; + opal_list_t query, response; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); - if (0 == rank) { - fprintf(stderr, "Sampling memory usage after MPI_Init\n"); - } - + /* everyone registers their event handler */ codes = OBJ_NEW(opal_list_t); kv = OBJ_NEW(opal_value_t); kv->key = strdup("errorcode"); @@ -74,9 +95,59 @@ int main(int argc, char* argv[]) usleep(10); } - /* now wait for notification */ - while (wait_for_release) { - usleep(10); + /* rank 0 asks for memory to be sampled, while everyone else waits */ + if (0 == rank) { + fprintf(stderr, "Sampling memory usage after MPI_Init\n"); + OBJ_CONSTRUCT(&query, opal_list_t); + OBJ_CONSTRUCT(&response, opal_list_t); + q = OBJ_NEW(opal_pmix_query_t); + opal_list_append(&query, &q->super); + opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE); + /* qualify that we just want avg, min/max values reported */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + /* issue the request */ + wait_for_release = true; + opal_pmix.query(&query, qcbfunc, (void*)&response); + while (wait_for_release) { + usleep(10); + } + /* output the results */ + OPAL_LIST_FOREACH(kv, &response, opal_value_t) { + fprintf(stderr, "\tResults: %s\n", kv->key); + } + OPAL_LIST_DESTRUCT(&response); + /* send the notification to release the other procs */ + wait_for_release = true; + OBJ_CONSTRUCT(&response, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&response, &kv->super); + if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL, + OPAL_PMIX_RANGE_GLOBAL, &response, + NULL, NULL)) { + fprintf(stderr, "Notify event failed\n"); + exit(1); + } + while (wait_for_release) { + usleep(10); + } + OPAL_LIST_DESTRUCT(&response); + } else { + /* now wait for notification */ + while (wait_for_release) { + usleep(10); + } } wait_for_release = true; @@ -86,13 +157,60 @@ int main(int argc, char* argv[]) if (0 == rank) { fprintf(stderr, "\n\nSampling memory usage after MPI_Barrier\n"); + OBJ_CONSTRUCT(&query, opal_list_t); + OBJ_CONSTRUCT(&response, opal_list_t); + q = OBJ_NEW(opal_pmix_query_t); + opal_list_append(&query, &q->super); + opal_argv_append_nosize(&q->keys, OPAL_PMIX_QUERY_MEMORY_USAGE); + /* qualify that we just want avg, min/max values reported */ + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_AVG); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_QUERY_REPORT_MINMAX); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&q->qualifiers, &kv->super); + /* issue the request */ + wait_for_release = true; + opal_pmix.query(&query, qcbfunc, (void*)&response); + while (wait_for_release) { + usleep(10); + } + /* output the results */ + OPAL_LIST_FOREACH(kv, &response, opal_value_t) { + fprintf(stderr, "\tResults: %s\n", kv->key); + } + OPAL_LIST_DESTRUCT(&response); + /* send the notification to release the other procs */ + wait_for_release = true; + OBJ_CONSTRUCT(&response, opal_list_t); + kv = OBJ_NEW(opal_value_t); + kv->key = strdup(OPAL_PMIX_EVENT_NON_DEFAULT); + kv->type = OPAL_BOOL; + kv->data.flag = true; + opal_list_append(&response, &kv->super); + if (OPAL_SUCCESS != opal_pmix.notify_event(MEMPROBE_RELEASE, NULL, + OPAL_PMIX_RANGE_GLOBAL, &response, + NULL, NULL)) { + fprintf(stderr, "Notify event failed\n"); + exit(1); + } + while (wait_for_release) { + usleep(10); + } + OPAL_LIST_DESTRUCT(&response); + } else { + /* wait again while memory is sampled */ + while (wait_for_release) { + usleep(10); + } } - /* wait again while memory is sampled */ - while (wait_for_release) { - usleep(10); - } - + fprintf(stderr, "%d: FINALIZING\n", rank); + fflush(stderr); MPI_Finalize(); return 0; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_frame.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_frame.c index 9942bd5b34..cc9557c777 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_frame.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_frame.c @@ -11,7 +11,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -39,6 +39,7 @@ #include "src/mca/base/pmix_mca_base_var.h" #include "src/mca/base/pmix_mca_base_framework.h" #include "src/class/pmix_list.h" +#include "src/client/pmix_client_ops.h" #include "src/mca/ptl/base/base.h" /* @@ -76,6 +77,11 @@ static pmix_status_t pmix_ptl_close(void) /* ensure the listen thread has been shut down */ pmix_ptl.stop_listening(); + if (0 <= pmix_client_globals.myserver.sd) { + CLOSE_THE_SOCKET(pmix_client_globals.myserver.sd); + pmix_client_globals.myserver.sd = -1; + } + /* the components will cleanup when closed */ PMIX_DESTRUCT(&pmix_ptl_globals.actives); PMIX_LIST_DESTRUCT(&pmix_ptl_globals.posted_recvs); @@ -92,6 +98,7 @@ static pmix_status_t pmix_ptl_open(pmix_mca_base_open_flag_t flags) PMIX_CONSTRUCT(&pmix_ptl_globals.posted_recvs, pmix_list_t); pmix_ptl_globals.listen_thread_active = false; PMIX_CONSTRUCT(&pmix_ptl_globals.listeners, pmix_list_t); + pmix_client_globals.myserver.sd = -1; /* Open up all available components */ return pmix_mca_base_framework_components_open(&pmix_ptl_base_framework, flags); diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c index b3c5ce45ae..572160057f 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014 Artem Y. Polyakov . * All rights reserved. * Copyright (c) 2015-2016 Research Organization for Information Science @@ -478,10 +478,17 @@ void pmix_ptl_base_send(int sd, short args, void *cbdata) pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata; pmix_ptl_send_t *snd; pmix_output_verbose(2, pmix_globals.debug_output, - "[%s:%d] queue callback called: reply to %s:%d on tag %d", + "[%s:%d] send to %s:%d on tag %d", __FILE__, __LINE__, (queue->peer)->info->nptr->nspace, (queue->peer)->info->rank, (queue->tag)); + + if (queue->peer->sd < 0) { + /* this peer's socket has been closed */ + PMIX_RELEASE(queue); + return; + } + snd = PMIX_NEW(pmix_ptl_send_t); snd->hdr.pindex = htonl(pmix_globals.pindex); snd->hdr.tag = htonl(queue->tag); @@ -513,6 +520,12 @@ void pmix_ptl_base_send_recv(int fd, short args, void *cbdata) pmix_ptl_send_t *snd; uint32_t tag; + if (ms->peer->sd < 0) { + /* this peer's socket has been closed */ + PMIX_RELEASE(ms); + return; + } + /* set the tag */ tag = current_tag++; diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h index b397030c31..b3ca5efd91 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/ptl_types.h @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2007-2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -236,7 +236,7 @@ PMIX_CLASS_DECLARATION(pmix_listener_t); pmix_list_append(&(p)->send_queue, &snd->super); \ } \ /* ensure the send event is active */ \ - if (!(p)->send_ev_active) { \ + if (!(p)->send_ev_active && 0 <= (p)->sd) { \ event_add(&(p)->send_event, 0); \ (p)->send_ev_active = true; \ } \ diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c index 3deb3686a2..5ae066d2eb 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c @@ -13,7 +13,7 @@ * Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -270,6 +270,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, if (PMIX_SUCCESS != (rc = send_connect_ack(sd))) { PMIX_ERROR_LOG(rc); CLOSE_THE_SOCKET(sd); + pmix_client_globals.myserver.sd = -1; return rc; } @@ -277,6 +278,7 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, if (PMIX_SUCCESS != (rc = recv_connect_ack(sd))) { PMIX_ERROR_LOG(rc); CLOSE_THE_SOCKET(sd); + pmix_client_globals.myserver.sd = -1; return rc; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c index 1db60f5899..2795ffc51e 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Artem Y. Polyakov . @@ -802,7 +802,7 @@ static void _deregister_client(int sd, short args, void *cbdata) /* nothing to do */ goto cleanup; } - /* find an remove this client */ + /* find and remove this client */ PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) { if (info->rank == cd->proc.rank) { pmix_list_remove_item(&nptr->server->ranks, &info->super); @@ -1442,6 +1442,7 @@ static void op_cbfunc(pmix_status_t status, void *cbdata) PMIX_RELEASE(cd); return; } + /* the function that created the server_caddy did a * retain on the peer, so we don't have to worry about * it still being present - send a copy to the originator */ @@ -2078,6 +2079,10 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag, if (PMIX_SUCCESS != (rc = pmix_host_server.client_finalized(&proc, peer->info->server_object, op_cbfunc, cd))) { PMIX_RELEASE(cd); + } else { + /* don't reply to them ourselves - we will do so when the host + * server calls us back */ + return rc; } } /* turn off the recv event - we shouldn't hear anything diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c index d6ca188fb6..57e674dd5e 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Artem Y. Polyakov . @@ -1180,6 +1180,13 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, if (0 != strncmp(peer->info->nptr->nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) { continue; } + /* if the source of the event is the same peer just registered, then ignore it + * as the event notification system will have already locally + * processed it */ + if (0 == strncmp(peer->info->nptr->nspace, cd->source.nspace, PMIX_MAX_NSLEN) && + peer->info->rank == cd->source.rank) { + continue; + } if (PMIX_RANK_WILDCARD == cd->targets[n].rank || peer->info->rank == cd->targets[n].rank) { matched = true; diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index f6809cb8e3..8ca48dc047 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -225,6 +225,8 @@ BEGIN_C_DECLS #define OPAL_PMIX_QUERY_DEBUG_SUPPORT "pmix.qry.debug" // return a comma-delimited list of supported debug attributes #define OPAL_PMIX_QUERY_MEMORY_USAGE "pmix.qry.mem" // return info on memory usage for the procs indicated in the qualifiers #define OPAL_PMIX_QUERY_LOCAL_ONLY "pmix.qry.local" // constrain the query to local information only +#define OPAL_PMIX_QUERY_REPORT_AVG "pmix.qry.avg" // report average values +#define OPAL_PMIX_QUERY_REPORT_MINMAX "pmix.qry.minmax" // report minimum and maximum value /* log attributes */ #define OPAL_PMIX_LOG_STDERR "pmix.log.stderr" // (bool) log data to stderr