1
1

Update to pmix v2.0.0rc1, including thread safety fixes

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-06-06 15:16:34 -07:00
родитель 21fba8b7f3
Коммит c3e6dc2022
23 изменённых файлов: 178 добавлений и 36 удалений

Просмотреть файл

@ -30,7 +30,7 @@ greek=
# command, or with the date (if "git describe" fails) in the form of
# "date<date>".
repo_rev=git707f8cf
repo_rev=git071ebc3
# If tarball_version is not empty, it is used as the version string in
# the tarball filename, regardless of all other versions listed in

Просмотреть файл

@ -63,6 +63,8 @@ static inline void pmix_atomic_wmb(void)
}
#define PMIXMB() pmix_atomic_mb()
#define PMIXRMB() pmix_atomic_rmb()
#define PMIXWMB() pmix_atomic_wmb()
/**********************************************************************
*

Просмотреть файл

@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2010-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
@ -30,10 +30,8 @@
#define PMIXMB() __asm__ __volatile__ ("sync" : : : "memory")
#define PMIXRMB() __asm__ __volatile__ ("lwsync" : : : "memory")
#define PMIXWMB() __asm__ __volatile__ ("eieio" : : : "memory")
#define PMIXWMB() __asm__ __volatile__ ("lwsync" : : : "memory")
#define PMIXISYNC() __asm__ __volatile__ ("isync" : : : "memory")
#define PMIXSMP_SYNC "sync \n\t"
#define PMIXSMP_ISYNC "\n\tisync"
/**********************************************************************

Просмотреть файл

@ -88,6 +88,7 @@ static const char pmix_version_string[] = PMIX_VERSION;
static void _notify_complete(pmix_status_t status, void *cbdata)
{
pmix_event_chain_t *chain = (pmix_event_chain_t*)cbdata;
PMIX_ACQUIRE_OBJECT(chain);
PMIX_RELEASE(chain);
}
@ -178,7 +179,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client wait_cbfunc received");
PMIX_POST_OBJECT(active);
*active = false;
}
@ -197,6 +198,7 @@ static void job_data(struct pmix_peer_t *pr,
if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &nspace, &cnt, PMIX_STRING))) {
PMIX_ERROR_LOG(rc);
cb->status = PMIX_ERROR;
PMIX_POST_OBJECT(cb);
cb->active = false;
return;
}
@ -208,6 +210,7 @@ static void job_data(struct pmix_peer_t *pr,
pmix_job_data_htable_store(pmix_globals.myid.nspace, buf);
#endif
cb->status = PMIX_SUCCESS;
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -235,6 +238,7 @@ static void evhandler_reg_callbk(pmix_status_t status,
void *cbdata)
{
volatile int *active = (volatile int*)cbdata;
PMIX_POST_OBJECT(active);
*active = status;
}
@ -680,6 +684,9 @@ static void _putfn(int sd, short args, void *cbdata)
uint8_t *tmp;
size_t len;
/* need to acquire the cb object from its originating thread */
PMIX_ACQUIRE_OBJECT(cb);
/* no need to push info that starts with "pmix" as that is
* info we would have been provided at startup */
if (0 == strncmp(cb->key, "pmix", 4)) {
@ -757,6 +764,8 @@ static void _putfn(int sd, short args, void *cbdata)
PMIX_RELEASE(kv); // maintain accounting
}
cb->pstatus = rc;
/* post the data so the receiving thread can acquire it */
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -802,6 +811,9 @@ static void _commitfn(int sd, short args, void *cbdata)
pmix_buffer_t *msgout;
pmix_cmd_t cmd=PMIX_COMMIT_CMD;
/* need to acquire the cb object from its originating thread */
PMIX_ACQUIRE_OBJECT(cb);
msgout = PMIX_NEW(pmix_buffer_t);
/* pack the cmd */
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(msgout, &cmd, 1, PMIX_CMD))) {
@ -850,6 +862,8 @@ static void _commitfn(int sd, short args, void *cbdata)
done:
cb->pstatus = rc;
/* post the data so the receiving thread can acquire it */
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -901,6 +915,9 @@ static void _peersfn(int sd, short args, void *cbdata)
#endif
size_t i;
/* need to acquire the cb object from its originating thread */
PMIX_ACQUIRE_OBJECT(cb);
/* cycle across our known nspaces */
tmp = NULL;
#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)
@ -955,6 +972,8 @@ static void _peersfn(int sd, short args, void *cbdata)
done:
cb->pstatus = rc;
/* post the data so the receiving thread can acquire it */
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -1004,6 +1023,9 @@ static void _nodesfn(int sd, short args, void *cbdata)
pmix_nspace_t *nsptr;
pmix_nrec_t *nptr;
/* need to acquire the cb object from its originating thread */
PMIX_ACQUIRE_OBJECT(cb);
/* cycle across our known nspaces */
tmp = NULL;
PMIX_LIST_FOREACH(nsptr, &pmix_globals.nspaces, pmix_nspace_t) {
@ -1023,6 +1045,8 @@ static void _nodesfn(int sd, short args, void *cbdata)
}
cb->pstatus = rc;
/* post the data so the receiving thread can acquire it */
PMIX_POST_OBJECT(cb);
cb->active = false;
}

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
@ -51,6 +51,8 @@
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/output.h"
#include "src/threads/threads.h"
#include "src/mca/ptl/ptl.h"
#include "pmix_client_ops.h"
@ -344,5 +346,6 @@ static void op_cbfunc(pmix_status_t status, void *cbdata)
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
cb->status = status;
PMIX_POST_OBJECT(cb);
cb->active = false;
}

Просмотреть файл

@ -53,6 +53,7 @@
#include "src/class/pmix_list.h"
#include "src/buffer_ops/buffer_ops.h"
#include "src/threads/threads.h"
#include "src/util/argv.h"
#include "src/util/compress.h"
#include "src/util/error.h"
@ -186,12 +187,14 @@ static void _value_cbfunc(pmix_status_t status, pmix_value_t *kv, void *cbdata)
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
pmix_status_t rc;
PMIX_ACQUIRE_OBJECT(cb);
cb->status = status;
if (PMIX_SUCCESS == status) {
if (PMIX_SUCCESS != (rc = pmix_bfrop.copy((void**)&cb->value, kv, PMIX_VALUE))) {
PMIX_ERROR_LOG(rc);
}
}
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -238,12 +241,12 @@ static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank,
return msg;
}
/* this callback is coming from the usock recv, and thus
/* this callback is coming from the ptl recv, and thus
* is occurring inside of our progress thread - hence, no
* need to thread shift */
static void _getnb_cbfunc(struct pmix_peer_t *pr,
pmix_ptl_hdr_t *hdr,
pmix_buffer_t *buf, void *cbdata)
pmix_buffer_t *buf, void *cbdata)
{
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
pmix_cb_t *cb2;
@ -486,6 +489,9 @@ static void _getnbfn(int fd, short flags, void *cbdata)
char *tmp;
bool my_nspace = false, my_rank = false;
/* cb was passed to us from another thread - acquire it */
PMIX_ACQUIRE_OBJECT(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: getnbfn value for proc %s:%d key %s",
cb->nspace, cb->rank,
@ -739,11 +745,12 @@ request:
rc = PMIX_ERROR;
goto respond;
}
/* we made a lot of changes to cb, so ensure they get
* written out before we return */
PMIX_POST_OBJECT(cb);
return;
respond:
respond:
/* if a callback was provided, execute it */
if (NULL != cb->value_cbfunc) {
if (NULL != val) {

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
@ -48,6 +48,7 @@
#include "src/class/pmix_list.h"
#include "src/buffer_ops/buffer_ops.h"
#include "src/threads/threads.h"
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/output.h"
@ -304,7 +305,8 @@ PMIX_EXPORT pmix_status_t PMIx_Lookup_nb(char **keys,
}
PMIX_EXPORT pmix_status_t PMIx_Unpublish(char **keys,
const pmix_info_t info[], size_t ninfo)
const pmix_info_t info[],
size_t ninfo)
{
pmix_status_t rc;
pmix_cb_t *cb;
@ -417,6 +419,8 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
int ret;
int32_t cnt;
PMIX_ACQUIRE_OBJECT(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client recv callback activated with %d bytes",
(NULL == buf) ? -1 : (int)buf->bytes_used);
@ -437,6 +441,7 @@ static void op_cbfunc(pmix_status_t status, void *cbdata)
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
cb->status = status;
PMIX_POST_OBJECT(cb);
cb->active = false;
}
@ -450,6 +455,8 @@ static void wait_lookup_cbfunc(struct pmix_peer_t *pr,
pmix_pdata_t *pdata;
size_t ndata;
PMIX_ACQUIRE_OBJECT(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client recv callback activated with %d bytes",
(NULL == buf) ? -1 : (int)buf->bytes_used);
@ -514,6 +521,7 @@ static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t nda
pmix_pdata_t *tgt = (pmix_pdata_t*)cb->cbdata;
size_t i, j;
PMIX_ACQUIRE_OBJECT(cb);
cb->status = status;
if (PMIX_SUCCESS == status) {
/* find the matching key in the provided info array - error if not found */
@ -530,6 +538,6 @@ static void lookup_cbfunc(pmix_status_t status, pmix_pdata_t pdata[], size_t nda
}
}
}
PMIX_POST_OBJECT(cb);
cb->active = false;
}

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2014 Artem Y. Polyakov <artpol84@gmail.com>.
@ -48,6 +48,7 @@
#include "src/class/pmix_list.h"
#include "src/buffer_ops/buffer_ops.h"
#include "src/threads/threads.h"
#include "src/util/argv.h"
#include "src/util/error.h"
#include "src/util/output.h"
@ -189,6 +190,8 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
pmix_status_t rc, ret;
int32_t cnt;
PMIX_ACQUIRE_OBJECT(cb);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:client recv callback activated with %d bytes",
(NULL == buf) ? -1 : (int)buf->bytes_used);
@ -233,9 +236,11 @@ static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata)
{
pmix_cb_t *cb = (pmix_cb_t*)cbdata;
PMIX_ACQUIRE_OBJECT(cb);
cb->status = status;
if (NULL != nspace) {
(void)strncpy(cb->nspace, nspace, PMIX_MAX_NSLEN);
}
PMIX_POST_OBJECT(cb);
cb->active = false;
}

Просмотреть файл

@ -254,6 +254,9 @@ static void progress_local_event_hdlr(pmix_status_t status,
pmix_op_cbfunc_t cbfunc, void *thiscbdata,
void *notification_cbdata)
{
/* this may be in the host's thread, so we need to threadshift it
* before accessing our internal data */
pmix_event_chain_t *chain = (pmix_event_chain_t*)notification_cbdata;
size_t n, nsave, cnt;
pmix_info_t *newinfo;
@ -768,6 +771,9 @@ static void _notify_client_event(int sd, short args, void *cbdata)
size_t n;
bool matched, holdcd;
/* need to acquire the object from its originating thread */
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix_server: _notify_error notifying clients of error %s",
PMIx_Error_string(cd->status));
@ -1056,6 +1062,9 @@ void pmix_event_timeout_cb(int fd, short flags, void *arg)
{
pmix_event_chain_t *ch = (pmix_event_chain_t*)arg;
/* need to acquire the object from its originating thread */
PMIX_ACQUIRE_OBJECT(ch);
ch->timer_active = false;
/* remove it from the list */

Просмотреть файл

@ -344,6 +344,9 @@ static void reg_event_hdlr(int sd, short args, void *cbdata)
pmix_notify_caddy_t *ncd;
pmix_event_chain_t *chain;
/* need to acquire the object from its originating thread */
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix: register event_hdlr with %d infos", (int)cd->ninfo);
@ -775,6 +778,9 @@ static void dereg_event_hdlr(int sd, short args, void *cbdata)
size_t n;
pmix_active_code_t *active;
/* need to acquire the object from its originating thread */
PMIX_ACQUIRE_OBJECT(cd);
/* if I am not the server, then I need to notify the server
* to remove my registration */
if (PMIX_PROC_SERVER != pmix_globals.proc_type) {

Просмотреть файл

@ -38,7 +38,7 @@
#include "src/class/pmix_list.h"
#include "src/class/pmix_ring_buffer.h"
#include "src/event/pmix_event.h"
#include "src/threads/threads.h"
#include "src/mca/psec/psec.h"
#include "src/mca/ptl/ptl.h"
@ -343,6 +343,7 @@ PMIX_CLASS_DECLARATION(pmix_info_caddy_t);
(r)->active = true; \
pmix_event_assign(&((r)->ev), pmix_globals.evbase, \
-1, EV_WRITE, (c), (r)); \
PMIX_POST_OBJECT((r)); \
pmix_event_active(&((r)->ev), EV_WRITE, 1); \
} while (0)
@ -352,6 +353,7 @@ PMIX_CLASS_DECLARATION(pmix_info_caddy_t);
while ((a)) { \
usleep(10); \
} \
PMIX_ACQUIRE_OBJECT((a)); \
} while (0)

Просмотреть файл

@ -158,6 +158,8 @@ static void add_tracker(int sd, short flags, void *cbdata)
{
file_tracker_t *ft = (file_tracker_t*)cbdata;
PMIX_ACQUIRE_OBJECT(fd);
/* add the tracker to our list */
pmix_list_append(&mca_psensor_file_component.trackers, &ft->super);
@ -221,6 +223,7 @@ static pmix_status_t start(pmix_peer_t *requestor, pmix_status_t error,
/* need to push into our event base to add this to our trackers */
pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1,
EV_WRITE, add_tracker, ft);
PMIX_POST_OBJECT(ft);
pmix_event_active(&ft->cdev, EV_WRITE, 1);
return PMIX_SUCCESS;
@ -232,6 +235,8 @@ static void del_tracker(int sd, short flags, void *cbdata)
file_caddy_t *cd = (file_caddy_t*)cbdata;
file_tracker_t *ft, *ftnext;
PMIX_ACQUIRE_OBJECT(cd);
/* remove the tracker from our list */
PMIX_LIST_FOREACH_SAFE(ft, ftnext, &mca_psensor_file_component.trackers, file_tracker_t) {
if (ft->requestor != cd->requestor) {
@ -258,6 +263,7 @@ static pmix_status_t stop(pmix_peer_t *requestor, char *id)
/* need to push into our event base to add this to our trackers */
pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
EV_WRITE, del_tracker, cd);
PMIX_POST_OBJECT(cd);
pmix_event_active(&cd->ev, EV_WRITE, 1);
return PMIX_SUCCESS;
@ -277,6 +283,8 @@ static void file_sample(int sd, short args, void *cbdata)
pmix_status_t rc;
pmix_proc_t source;
PMIX_ACQUIRE_OBJECT(ft);
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sampling file %s",
pmix_globals.myid.nspace, pmix_globals.myid.rank,

Просмотреть файл

@ -150,6 +150,8 @@ static void add_tracker(int sd, short flags, void *cbdata)
{
pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata;
PMIX_ACQUIRE_OBJECT(ft);
/* add the tracker to our list */
pmix_list_append(&mca_psensor_heartbeat_component.trackers, &ft->super);
@ -203,6 +205,7 @@ static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error
/* need to push into our event base to add this to our trackers */
pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1,
EV_WRITE, add_tracker, ft);
PMIX_POST_OBJECT(ft);
pmix_event_active(&ft->cdev, EV_WRITE, 1);
return PMIX_SUCCESS;
@ -213,6 +216,8 @@ static void del_tracker(int sd, short flags, void *cbdata)
heartbeat_caddy_t *cd = (heartbeat_caddy_t*)cbdata;
pmix_heartbeat_trkr_t *ft, *ftnext;
PMIX_ACQUIRE_OBJECT(cd);
/* remove the tracker from our list */
PMIX_LIST_FOREACH_SAFE(ft, ftnext, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
if (ft->requestor != cd->requestor) {
@ -239,6 +244,7 @@ static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id)
/* need to push into our event base to add this to our trackers */
pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1,
EV_WRITE, del_tracker, cd);
PMIX_POST_OBJECT(cd);
pmix_event_active(&cd->ev, EV_WRITE, 1);
return PMIX_SUCCESS;
@ -261,6 +267,8 @@ static void check_heartbeat(int fd, short dummy, void *cbdata)
pmix_status_t rc;
pmix_proc_t source;
PMIX_ACQUIRE_OBJECT(ft);
PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output,
"[%s:%d] sensor:check_heartbeat for proc %s:%d",
pmix_globals.myid.nspace, pmix_globals.myid.rank,
@ -301,6 +309,8 @@ static void add_beat(int sd, short args, void *cbdata)
pmix_psensor_beat_t *b = (pmix_psensor_beat_t*)cbdata;
pmix_heartbeat_trkr_t *ft;
PMIX_ACQUIRE_OBJECT(b);
/* find this peer in our trackers */
PMIX_LIST_FOREACH(ft, &mca_psensor_heartbeat_component.trackers, pmix_heartbeat_trkr_t) {
if (ft->requestor == b->peer) {
@ -326,5 +336,6 @@ void pmix_psensor_heartbeat_recv_beats(struct pmix_peer_t *peer,
/* shift this to our thread for processing */
pmix_event_assign(&b->ev, pmix_psensor_base.evbase, -1,
EV_WRITE, add_beat, b);
PMIX_POST_OBJECT(b);
pmix_event_active(&b->ev, EV_WRITE, 1);
}

Просмотреть файл

@ -284,6 +284,8 @@ static void* listen_thread(void *obj)
pmix_output_verbose(8, pmix_globals.debug_output,
"listen_thread: new connection: (%d, %d)",
pending_connection->sd, pmix_socket_errno);
/* post the object */
PMIX_POST_OBJECT(pending_connection);
/* activate the event */
pmix_event_active(&pending_connection->ev, EV_WRITE, 1);
accepted_connections++;

Просмотреть файл

@ -312,6 +312,9 @@ void pmix_ptl_base_send_handler(int sd, short flags, void *cbdata)
pmix_ptl_send_t *msg = peer->send_msg;
pmix_status_t rc;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(peer);
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:send_handler SENDING TO PEER %s:%d tag %u with %s msg",
peer->info->nptr->nspace, peer->info->rank,
@ -374,6 +377,9 @@ void pmix_ptl_base_recv_handler(int sd, short flags, void *cbdata)
size_t nbytes;
char *ptr;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(peer);
pmix_output_verbose(2, pmix_globals.debug_output,
"ptl:base:recv:handler called with peer %s:%d",
(NULL == peer) ? "NULL" : peer->info->nptr->nspace,
@ -502,6 +508,9 @@ void pmix_ptl_base_send(int sd, short args, void *cbdata)
pmix_ptl_queue_t *queue = (pmix_ptl_queue_t*)cbdata;
pmix_ptl_send_t *snd;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(queue);
if (NULL == queue->peer || queue->peer->sd < 0 ||
NULL == queue->peer->info || NULL == queue->peer->info->nptr) {
/* this peer has lost connection */
@ -546,6 +555,9 @@ void pmix_ptl_base_send_recv(int fd, short args, void *cbdata)
pmix_ptl_send_t *snd;
uint32_t tag;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(ms);
if (ms->peer->sd < 0) {
/* this peer's socket has been closed */
PMIX_RELEASE(ms);
@ -607,6 +619,9 @@ void pmix_ptl_base_process_msg(int fd, short flags, void *cbdata)
pmix_ptl_posted_recv_t *rcv;
pmix_buffer_t buf;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(msg);
pmix_output_verbose(5, pmix_globals.debug_output,
"message received %d bytes for tag %u on socket %d",
(int)msg->hdr.nbytes, msg->hdr.tag, msg->sd);

Просмотреть файл

@ -145,6 +145,7 @@ PMIX_CLASS_DECLARATION(pmix_ptl_sr_t);
typedef struct {
pmix_object_t super;
volatile bool active;
pmix_event_t ev;
struct pmix_peer_t *peer;
pmix_buffer_t *buf;
@ -205,6 +206,7 @@ PMIX_CLASS_DECLARATION(pmix_listener_t);
__FILE__, __LINE__); \
pmix_event_assign(&((ms)->ev), pmix_globals.evbase, -1, \
EV_WRITE, pmix_ptl_base_process_msg, (ms)); \
PMIX_POST_OBJECT(ms); \
pmix_event_active(&((ms)->ev), EV_WRITE, 1); \
} while (0)
@ -245,6 +247,7 @@ PMIX_CLASS_DECLARATION(pmix_listener_t);
/* add it to the queue */ \
pmix_list_append(&(p)->send_queue, &snd->super); \
} \
PMIX_POST_OBJECT(snd); \
/* ensure the send event is active */ \
if (!(p)->send_ev_active && 0 <= (p)->sd) { \
pmix_event_add(&(p)->send_event, 0); \

Просмотреть файл

@ -347,9 +347,7 @@ static pmix_status_t send_recv(struct pmix_peer_t *peer,
ms->bfr = bfr;
ms->cbfunc = cbfunc;
ms->cbdata = cbdata;
pmix_event_assign(&ms->ev, pmix_globals.evbase, -1,
EV_WRITE, pmix_ptl_base_send_recv, ms);
pmix_event_active(&ms->ev, EV_WRITE, 1);
PMIX_THREADSHIFT(ms, pmix_ptl_base_send_recv);
return PMIX_SUCCESS;
}
@ -368,10 +366,7 @@ static pmix_status_t send_oneway(struct pmix_peer_t *peer,
q->peer = pr;
q->buf = bfr;
q->tag = tag;
pmix_event_assign(&q->ev, pmix_globals.evbase, -1,
EV_WRITE, pmix_ptl_base_send, q);
pmix_event_active(&q->ev, EV_WRITE, 1);
PMIX_THREADSHIFT(q, pmix_ptl_base_send);
return PMIX_SUCCESS;
}

Просмотреть файл

@ -687,6 +687,9 @@ static void connection_handler(int sd, short args, void *cbdata)
pmix_rank_info_t *info;
pmix_proc_t proc;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(pnd);
pmix_output_verbose(8, pmix_ptl_base_framework.framework_output,
"ptl:tcp:connection_handler: new connection: %d",
pnd->sd);
@ -717,7 +720,7 @@ static void connection_handler(int sd, short args, void *cbdata)
PMIX_RELEASE(pnd);
return;
}
if (PMIX_SUCCESS != pmix_ptl_base_recv_blocking(pnd->sd, msg, hdr.nbytes)) {
if (PMIX_SUCCESS != (rc = pmix_ptl_base_recv_blocking(pnd->sd, msg, hdr.nbytes))) {
/* unable to complete the recv */
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"ptl:tcp:connection_handler unable to complete recv of connect-ack with client ON SOCKET %d",
@ -972,7 +975,7 @@ static void connection_handler(int sd, short args, void *cbdata)
/* tell the client all is good */
u32 = htonl(PMIX_SUCCESS);
if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t))) {
if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {
PMIX_ERROR_LOG(rc);
info->proc_cnt--;
PMIX_RELEASE(info);
@ -1024,7 +1027,8 @@ static void connection_handler(int sd, short args, void *cbdata)
error:
/* send an error reply to the client */
if (PMIX_SUCCESS != pmix_ptl_base_send_blocking(pnd->sd, (char*)&rc, sizeof(int))) {
u32 = htonl(rc);
if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(int)))) {
PMIX_ERROR_LOG(rc);
CLOSE_THE_SOCKET(pnd->sd);
}
@ -1042,6 +1046,9 @@ static void process_cbfunc(int sd, short args, void *cbdata)
int rc;
uint32_t u32;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(cd);
/* send this status so they don't hang */
u32 = ntohl(cd->status);
if (PMIX_SUCCESS != (rc = pmix_ptl_base_send_blocking(pnd->sd, (char*)&u32, sizeof(uint32_t)))) {

Просмотреть файл

@ -199,9 +199,7 @@ static pmix_status_t send_recv(struct pmix_peer_t *peer,
ms->bfr = bfr;
ms->cbfunc = cbfunc;
ms->cbdata = cbdata;
pmix_event_assign(&ms->ev, pmix_globals.evbase, -1,
EV_WRITE, pmix_ptl_base_send_recv, ms);
pmix_event_active(&ms->ev, EV_WRITE, 1);
PMIX_THREADSHIFT(ms, pmix_ptl_base_send_recv);
return PMIX_SUCCESS;
}
@ -220,9 +218,7 @@ static pmix_status_t send_oneway(struct pmix_peer_t *peer,
q->peer = peer;
q->buf = bfr;
q->tag = tag;
pmix_event_assign(&q->ev, pmix_globals.evbase, -1,
EV_WRITE, pmix_ptl_base_send, q);
pmix_event_active(&q->ev, EV_WRITE, 1);
PMIX_THREADSHIFT(q, pmix_ptl_base_send);
return PMIX_SUCCESS;
}

Просмотреть файл

@ -412,6 +412,9 @@ static void connection_handler(int sd, short args, void *cbdata)
pmix_proc_t proc;
size_t len;
/* acquire the object */
PMIX_ACQUIRE_OBJECT(pnd);
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
"USOCK CONNECTION FROM PEER ON SOCKET %d", pnd->sd);

Просмотреть файл

@ -298,6 +298,8 @@ static void _register_nspace(int sd, short args, void *cbdata)
int32_t cnt;
#endif
PMIX_ACQUIRE_OBJECT(caddy);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server _register_nspace %s", cd->proc.nspace);
@ -521,6 +523,8 @@ static void _deregister_nspace(int sd, short args, void *cbdata)
pmix_nspace_t *tmp;
pmix_status_t rc = PMIX_SUCCESS;
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server _deregister_nspace %s",
cd->proc.nspace);
@ -578,6 +582,8 @@ void pmix_server_execute_collective(int sd, short args, void *cbdata)
pmix_rank_info_t *info;
pmix_value_t *val;
PMIX_ACQUIRE_OBJECT(tcd);
/* we don't need to check for non-NULL APIs here as
* that was already done when the tracker was created */
if (PMIX_FENCENB_CMD == trk->type) {
@ -659,6 +665,8 @@ static void _register_client(int sd, short args, void *cbdata)
bool all_def;
size_t i;
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server _register_client for nspace %s rank %d",
cd->proc.nspace, cd->proc.rank);
@ -797,6 +805,8 @@ static void _deregister_client(int sd, short args, void *cbdata)
pmix_rank_info_t *info;
pmix_nspace_t *nptr, *tmp;
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"pmix:server _deregister_client for nspace %s rank %d",
cd->proc.nspace, cd->proc.rank);
@ -910,6 +920,8 @@ static void _dmodex_req(int sd, short args, void *cbdata)
pmix_dmdx_remote_t *dcd;
pmix_status_t rc;
PMIX_ACQUIRE_OBJECT(cd);
pmix_output_verbose(2, pmix_globals.debug_output,
"DMODX LOOKING FOR %s:%d",
cd->proc.nspace, cd->proc.rank);
@ -1038,6 +1050,8 @@ static void _store_internal(int sd, short args, void *cbdata)
pmix_shift_caddy_t *cd = (pmix_shift_caddy_t*)cbdata;
pmix_nspace_t *ns, *nsptr;
PMIX_ACQUIRE_OBJECT(cd);
ns = NULL;
PMIX_LIST_FOREACH(nsptr, &pmix_globals.nspaces, pmix_nspace_t) {
if (0 == strncmp(cd->nspace, nsptr->nspace, PMIX_MAX_NSLEN)) {
@ -1453,6 +1467,8 @@ static void _setup_app(int sd, short args, void *cbdata)
pmix_kval_t *kv;
size_t n;
PMIX_ACQUIRE_OBJECT(cd);
PMIX_CONSTRUCT(&ilist, pmix_list_t);
/* pass to the network libraries */
@ -1529,6 +1545,8 @@ static void _setup_local_support(int sd, short args, void *cbdata)
pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata;
pmix_status_t rc;
PMIX_ACQUIRE_OBJECT(cd);
/* pass to the network libraries */
rc = pmix_pnet.setup_local_network(cd->nspace, cd->info, cd->ninfo);
@ -1611,6 +1629,8 @@ static void _spcb(int sd, short args, void *cbdata)
pmix_status_t rc;
char *msg;
PMIX_ACQUIRE_OBJECT(cd);
/* setup the reply with the returned status */
reply = PMIX_NEW(pmix_buffer_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(reply, &cd->status, 1, PMIX_STATUS))) {
@ -1715,6 +1735,8 @@ static void _mdxcbfunc(int sd, short argc, void *cbdata)
int32_t cnt = 1;
char byte;
PMIX_ACQUIRE_OBJECT(scd);
/* pass the blobs being returned */
PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
@ -1978,6 +2000,8 @@ static void _cnct(int sd, short args, void *cbdata)
pmix_nspace_t *nptr;
pmix_buffer_t *job_info_ptr;
PMIX_ACQUIRE_OBJECT(scd);
/* setup the reply, starting with the returned status */
reply = PMIX_NEW(pmix_buffer_t);
if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(reply, &scd->status, 1, PMIX_STATUS))) {

Просмотреть файл

@ -63,6 +63,7 @@ extern pmix_server_module_t pmix_host_server;
typedef struct {
pmix_object_t super;
pmix_event_t ev;
volatile bool active;
pmix_status_t status;
const char *data;
size_t ndata;
@ -597,6 +598,8 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata)
pmix_nspace_t *ns, *nptr;
pmix_status_t rc;
PMIX_ACQUIRE_OBJECT(caddy);
pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] process dmdx reply from %s:%u",
__FILE__, __LINE__,
@ -709,7 +712,5 @@ static void dmdx_cbfunc(pmix_status_t status,
"[%s:%d] queue dmdx reply for %s:%u",
__FILE__, __LINE__,
caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
pmix_event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
_process_dmdx_reply, caddy);
pmix_event_active(&caddy->ev, EV_WRITE, 1);
PMIX_THREADSHIFT(caddy, _process_dmdx_reply);
}

Просмотреть файл

@ -116,6 +116,19 @@ PMIX_EXPORT PMIX_CLASS_DECLARATION(pmix_thread_t);
} while(0);
/* provide a macro for forward-proofing the shifting
* of objects between threads - at some point, we
* may revamp our threading model */
/* post an object to another thread - for now, we
* only have a memory barrier */
#define PMIX_POST_OBJECT(o) pmix_atomic_wmb()
/* acquire an object from another thread - for now,
* we only have a memory barrier */
#define PMIX_ACQUIRE_OBJECT(o) pmix_atomic_rmb()
PMIX_EXPORT int pmix_thread_start(pmix_thread_t *);
PMIX_EXPORT int pmix_thread_join(pmix_thread_t *, void **thread_return);
PMIX_EXPORT bool pmix_thread_self_compare(pmix_thread_t*);