Ensure we retain the peer object until we are done with it. Also detect when the socket has been closed due to a lost connection and, in that case, cleanly release the message event.
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
родитель
ba47f73887
Коммит
201f8571ca
@ -222,6 +222,16 @@ PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_listener_t,
|
|||||||
pmix_list_item_t,
|
pmix_list_item_t,
|
||||||
lcon, ldes);
|
lcon, ldes);
|
||||||
|
|
||||||
|
/* Initializer for pmix_ptl_queue_t (passed together with qdes to
 * PMIX_CLASS_INSTANCE for this type): start with no peer attached so
 * that qdes can tell whether there is a peer reference to drop. */
static void qcon(pmix_ptl_queue_t *p)
{
    p->peer = NULL;
}
|
||||||
|
/* Cleanup routine for pmix_ptl_queue_t (passed together with qcon to
 * PMIX_CLASS_INSTANCE for this type): if a peer is attached to the
 * queue entry, release it; otherwise there is nothing to do. */
static void qdes(pmix_ptl_queue_t *p)
{
    /* no peer was ever attached - nothing to release */
    if (NULL == p->peer) {
        return;
    }
    PMIX_RELEASE(p->peer);
}
|
||||||
PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_ptl_queue_t,
|
PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_ptl_queue_t,
|
||||||
pmix_object_t,
|
pmix_object_t,
|
||||||
NULL, NULL);
|
qcon, qdes);
|
||||||
|
@ -470,18 +470,20 @@ void pmix_ptl_base_send(int sd, short args, void *cbdata)
|
|||||||
{
|
{
|
||||||
pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
|
pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
|
||||||
pmix_ptl_send_t *snd;
|
pmix_ptl_send_t *snd;
|
||||||
|
|
||||||
|
if (NULL == queue->peer || queue->peer->sd < 0 ||
|
||||||
|
NULL == queue->peer->info || NULL == queue->peer->info->nptr) {
|
||||||
|
/* this peer has lost connection */
|
||||||
|
PMIX_RELEASE(queue);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pmix_output_verbose(2, pmix_globals.debug_output,
|
pmix_output_verbose(2, pmix_globals.debug_output,
|
||||||
"[%s:%d] send to %s:%d on tag %d",
|
"[%s:%d] send to %s:%d on tag %d",
|
||||||
__FILE__, __LINE__,
|
__FILE__, __LINE__,
|
||||||
(queue->peer)->info->nptr->nspace,
|
(queue->peer)->info->nptr->nspace,
|
||||||
(queue->peer)->info->rank, (queue->tag));
|
(queue->peer)->info->rank, (queue->tag));
|
||||||
|
|
||||||
if (queue->peer->sd < 0) {
|
|
||||||
/* this peer's socket has been closed */
|
|
||||||
PMIX_RELEASE(queue);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
snd = PMIX_NEW(pmix_ptl_send_t);
|
snd = PMIX_NEW(pmix_ptl_send_t);
|
||||||
snd->hdr.pindex = htonl(pmix_globals.pindex);
|
snd->hdr.pindex = htonl(pmix_globals.pindex);
|
||||||
snd->hdr.tag = htonl(queue->tag);
|
snd->hdr.tag = htonl(queue->tag);
|
||||||
|
@ -335,9 +335,11 @@ static pmix_status_t send_recv(struct pmix_peer_t *peer,
|
|||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
pmix_ptl_sr_t *ms;
|
pmix_ptl_sr_t *ms;
|
||||||
|
|
||||||
pmix_output_verbose(5, pmix_globals.debug_output,
|
pmix_output_verbose(5, pmix_globals.debug_output,
|
||||||
"[%s:%d] post send to server",
|
"[%s:%d] post send to server",
|
||||||
__FILE__, __LINE__);
|
__FILE__, __LINE__);
|
||||||
|
|
||||||
ms = PMIX_NEW(pmix_ptl_sr_t);
|
ms = PMIX_NEW(pmix_ptl_sr_t);
|
||||||
ms->peer = peer;
|
ms->peer = peer;
|
||||||
ms->bfr = bfr;
|
ms->bfr = bfr;
|
||||||
@ -354,11 +356,13 @@ static pmix_status_t send_oneway(struct pmix_peer_t *peer,
|
|||||||
pmix_ptl_tag_t tag)
|
pmix_ptl_tag_t tag)
|
||||||
{
|
{
|
||||||
pmix_ptl_queue_t *q;
|
pmix_ptl_queue_t *q;
|
||||||
|
pmix_peer_t *pr = (pmix_peer_t*)peer;
|
||||||
|
|
||||||
/* we have to transfer this to an event for thread
|
/* we have to transfer this to an event for thread
|
||||||
* safety as we need to post this message on the
|
* safety as we need to post this message on the
|
||||||
* peer's send queue */
|
* peer's send queue */
|
||||||
q = PMIX_NEW(pmix_ptl_queue_t);
|
q = PMIX_NEW(pmix_ptl_queue_t);
|
||||||
|
OBJ_RETAIN(pr);
|
||||||
q->peer = peer;
|
q->peer = peer;
|
||||||
q->buf = bfr;
|
q->buf = bfr;
|
||||||
q->tag = tag;
|
q->tag = tag;
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
* Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved.
|
* Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved.
|
||||||
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
||||||
* reserved.
|
* reserved.
|
||||||
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
|
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
*
|
*
|
||||||
* Additional copyrights may follow
|
* Additional copyrights may follow
|
||||||
@ -189,9 +189,11 @@ static pmix_status_t send_recv(struct pmix_peer_t *peer,
|
|||||||
void *cbdata)
|
void *cbdata)
|
||||||
{
|
{
|
||||||
pmix_ptl_sr_t *ms;
|
pmix_ptl_sr_t *ms;
|
||||||
|
|
||||||
pmix_output_verbose(5, pmix_globals.debug_output,
|
pmix_output_verbose(5, pmix_globals.debug_output,
|
||||||
"[%s:%d] post send to server",
|
"[%s:%d] post send to server",
|
||||||
__FILE__, __LINE__);
|
__FILE__, __LINE__);
|
||||||
|
|
||||||
ms = PMIX_NEW(pmix_ptl_sr_t);
|
ms = PMIX_NEW(pmix_ptl_sr_t);
|
||||||
ms->peer = peer;
|
ms->peer = peer;
|
||||||
ms->bfr = bfr;
|
ms->bfr = bfr;
|
||||||
@ -208,11 +210,13 @@ static pmix_status_t send_oneway(struct pmix_peer_t *peer,
|
|||||||
pmix_ptl_tag_t tag)
|
pmix_ptl_tag_t tag)
|
||||||
{
|
{
|
||||||
pmix_ptl_queue_t *q;
|
pmix_ptl_queue_t *q;
|
||||||
|
pmix_peer_t *pr = (pmix_peer_t*)peer;
|
||||||
|
|
||||||
/* we have to transfer this to an event for thread
|
/* we have to transfer this to an event for thread
|
||||||
* safety as we need to post this message on the
|
* safety as we need to post this message on the
|
||||||
* peer's send queue */
|
* peer's send queue */
|
||||||
q = PMIX_NEW(pmix_ptl_queue_t);
|
q = PMIX_NEW(pmix_ptl_queue_t);
|
||||||
|
OBJ_RETAIN(pr);
|
||||||
q->peer = peer;
|
q->peer = peer;
|
||||||
q->buf = bfr;
|
q->buf = bfr;
|
||||||
q->tag = tag;
|
q->tag = tag;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user