opal/common/ucx: Somplify Worker Pool context management
Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
родитель
8b7fa927ba
Коммит
6b7acdf21f
@ -389,20 +389,15 @@ opal_common_ucx_wpctx_create(opal_common_ucx_wpool_t *wpool, int comm_size,
|
||||
opal_common_ucx_ctx_t **ctx_ptr)
|
||||
{
|
||||
opal_common_ucx_ctx_t *ctx = calloc(1, sizeof(*ctx));
|
||||
pthread_rwlockattr_t attr;
|
||||
int ret = OPAL_SUCCESS;
|
||||
|
||||
ctx->ctx_id = OPAL_ATOMIC_ADD_FETCH32(&wpool->cur_ctxid, 1);
|
||||
DBG_OUT("ctx_create: ctx_id = %d\n", (int)ctx->ctx_id);
|
||||
|
||||
//OBJ_CONSTRUCT(&ctx->mutex, opal_mutex_t);
|
||||
// TODO: check return codes
|
||||
pthread_rwlockattr_init(&attr);
|
||||
pthread_rwlock_init(&ctx->rwlock, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
|
||||
OBJ_CONSTRUCT(&ctx->mutex, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&ctx->tls_workers, opal_list_t);
|
||||
ctx->released = 0;
|
||||
ctx->refcntr = 1; /* application holding the context */
|
||||
ctx->wpool = wpool;
|
||||
ctx->comm_size = comm_size;
|
||||
|
||||
@ -430,39 +425,26 @@ error:
|
||||
OPAL_DECLSPEC void
|
||||
opal_common_ucx_wpctx_release(opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
_ctx_record_list_item_t *item = NULL;
|
||||
int can_free = 0;
|
||||
int my_refcntr = -1;
|
||||
|
||||
DBG_OUT("opal_common_ucx_ctx_release: ctx = %p\n", (void *)ctx);
|
||||
|
||||
/* Mark all the contexts and workers as being released
|
||||
* Threads involved in this context will reconize this
|
||||
* through the "released" flag and will perform cleanup
|
||||
* in a deferred manner.
|
||||
* If Worker pool will be destroyed earlier - wpool filalize
|
||||
* will take care of this.
|
||||
*/
|
||||
/* Application is expected to guarantee that no operation
|
||||
* is performed on the context that is being released */
|
||||
|
||||
//opal_mutex_lock(&ctx->mutex);
|
||||
pthread_rwlock_rdlock(&ctx->rwlock);
|
||||
|
||||
/* Mark that this function has been called */
|
||||
/* Mark that this context was released by application
|
||||
* Threads will use this flag to perform deferred cleanup */
|
||||
ctx->released = 1;
|
||||
/* Go over all TLS subscribed to this context and mark
|
||||
* that this handler is no longer in use */
|
||||
OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
|
||||
item->ptr->released = 1;
|
||||
}
|
||||
|
||||
if (0 == opal_list_get_size(&ctx->tls_workers)) {
|
||||
can_free = 1;
|
||||
}
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
|
||||
|
||||
//opal_mutex_unlock(&ctx->mutex);
|
||||
pthread_rwlock_unlock(&ctx->rwlock);
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_mb();
|
||||
|
||||
|
||||
if (can_free) {
|
||||
/* If there is no more references to this handler
|
||||
* we can release it */
|
||||
if (0 == my_refcntr) {
|
||||
_common_ucx_wpctx_free(ctx);
|
||||
}
|
||||
}
|
||||
@ -474,7 +456,7 @@ _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
free(ctx->recv_worker_addrs);
|
||||
free(ctx->recv_worker_displs);
|
||||
//OBJ_DESTRUCT(&ctx->mutex);
|
||||
OBJ_DESTRUCT(&ctx->mutex);
|
||||
OBJ_DESTRUCT(&ctx->tls_workers);
|
||||
DBG_OUT("_common_ucx_ctx_free: ctx = %p\n", (void *)ctx);
|
||||
free(ctx);
|
||||
@ -482,56 +464,63 @@ _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx)
|
||||
|
||||
/* Subscribe a new TLS to this context */
|
||||
static int
|
||||
_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx, _tlocal_ctx_t *ctx_rec)
|
||||
_common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo)
|
||||
{
|
||||
_ctx_record_list_item_t *item = OBJ_NEW(_ctx_record_list_item_t);
|
||||
if (NULL == item) {
|
||||
return OPAL_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
item->ptr = ctx_rec;
|
||||
/* Increment the reference counter */
|
||||
OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, 1);
|
||||
|
||||
//opal_mutex_lock(&ctx->mutex);
|
||||
pthread_rwlock_wrlock(&ctx->rwlock);
|
||||
/* Add new worker to the context */
|
||||
item->ptr = winfo;
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
opal_list_append(&ctx->tls_workers, &item->super);
|
||||
//opal_mutex_unlock(&ctx->mutex);
|
||||
pthread_rwlock_unlock(&ctx->rwlock);
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
DBG_OUT("_common_ucx_ctx_append: ctx = %p, ctx_rec = %p\n",
|
||||
(void *)ctx, (void *)ctx_rec);
|
||||
DBG_OUT("_common_ucx_wpctx_append: ctx = %p, winfo = %p\n",
|
||||
(void *)ctx, (void *)winfo);
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
|
||||
/* Unsubscribe a particular TLS to this context */
|
||||
static void
|
||||
_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx, _tlocal_ctx_t *ctx_rec)
|
||||
_common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
opal_common_ucx_winfo_t *winfo)
|
||||
{
|
||||
int can_free = 0;
|
||||
_ctx_record_list_item_t *item = NULL, *next;
|
||||
int my_refcntr = -1;
|
||||
|
||||
// opal_mutex_lock(&ctx->mutex);
|
||||
pthread_rwlock_wrlock(&ctx->rwlock);
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
|
||||
OPAL_LIST_FOREACH_SAFE(item, next, &ctx->tls_workers,
|
||||
_ctx_record_list_item_t) {
|
||||
if (ctx_rec == item->ptr) {
|
||||
if (winfo == item->ptr) {
|
||||
opal_list_remove_item(&ctx->tls_workers, &item->super);
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
winfo->released = 1;
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
OBJ_RELEASE(item);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (0 == opal_list_get_size(&ctx->tls_workers) && ctx->released) {
|
||||
can_free = 1;
|
||||
}
|
||||
//opal_mutex_unlock(&ctx->mutex);
|
||||
pthread_rwlock_unlock(&ctx->rwlock);
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
if (can_free) {
|
||||
/* All references to this data structure are removed
|
||||
* we can safely release communication context structure */
|
||||
/* Make sure that all the loads/stores are complete */
|
||||
opal_atomic_mb();
|
||||
|
||||
/* Decrement the reference counter */
|
||||
my_refcntr = OPAL_ATOMIC_ADD_FETCH32(&ctx->refcntr, -1);
|
||||
|
||||
if (0 == my_refcntr) {
|
||||
/* All references to this data structure were removed
|
||||
* We can safely release communication context structure */
|
||||
_common_ucx_wpctx_free(ctx);
|
||||
}
|
||||
DBG_OUT("_common_ucx_ctx_remove: ctx = %p, ctx_rec = %p\n",
|
||||
(void *)ctx, (void *)ctx_rec);
|
||||
DBG_OUT("_common_ucx_wpctx_remove: ctx = %p, ctx_rec = %p\n",
|
||||
(void *)ctx, (void *)winfo);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -911,20 +900,13 @@ _tlocal_ctx_search(_tlocal_table_t *tls, int ctx_id)
|
||||
static int
|
||||
_tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
|
||||
{
|
||||
opal_common_ucx_winfo_t *winfo = ctx_rec->winfo;
|
||||
if (0 == ctx_rec->ctx_id) {
|
||||
return OPAL_SUCCESS;
|
||||
}
|
||||
/* Remove myself from the communication context structure
|
||||
* This may result in context release as we are using
|
||||
* delayed cleanup */
|
||||
_common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec);
|
||||
|
||||
/* Mark this winfo as free and it will be garbage-collected by
|
||||
* progress or flush function */
|
||||
opal_mutex_lock(&winfo->mutex);
|
||||
winfo->released = 1;
|
||||
opal_mutex_unlock(&winfo->mutex);
|
||||
_common_ucx_wpctx_remove(ctx_rec->gctx, ctx_rec->winfo);
|
||||
|
||||
/* Erase the record so it can be reused */
|
||||
memset(ctx_rec, 0, sizeof(*ctx_rec));
|
||||
@ -936,25 +918,25 @@ _tlocal_ctx_record_cleanup(_tlocal_ctx_t *ctx_rec)
|
||||
static _tlocal_ctx_t *
|
||||
_tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
|
||||
{
|
||||
size_t i;
|
||||
size_t i, free_idx = -1;
|
||||
int rc;
|
||||
|
||||
/* Try to find available record in the TLS table */
|
||||
/* Try to find available record in the TLS table
|
||||
* In parallel perform deferred cleanups */
|
||||
for (i=0; i<tls->ctx_tbl_size; i++) {
|
||||
if (0 == tls->ctx_tbl[i]->ctx_id) {
|
||||
/* Found clean record */
|
||||
break;
|
||||
}
|
||||
if (tls->ctx_tbl[i]->released ) {
|
||||
if (tls->ctx_tbl[i]->gctx->released ) {
|
||||
/* Found dirty record, need to clean first */
|
||||
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
|
||||
break;
|
||||
}
|
||||
if ((0 == tls->ctx_tbl[i]->ctx_id) && (0 > free_idx)) {
|
||||
/* Found clean record */
|
||||
free_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
/* if needed - extend the table */
|
||||
if( i >= tls->ctx_tbl_size ){
|
||||
i = tls->ctx_tbl_size;
|
||||
if (0 > free_idx) {
|
||||
free_idx = tls->ctx_tbl_size;
|
||||
rc = _tlocal_tls_ctxtbl_extend(tls, 4);
|
||||
if (rc) {
|
||||
//TODO: error out
|
||||
@ -962,10 +944,10 @@ _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
|
||||
}
|
||||
}
|
||||
|
||||
tls->ctx_tbl[i]->ctx_id = ctx->ctx_id;
|
||||
tls->ctx_tbl[i]->gctx = ctx;
|
||||
tls->ctx_tbl[i]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
|
||||
if (NULL == tls->ctx_tbl[i]->winfo) {
|
||||
tls->ctx_tbl[free_idx]->ctx_id = ctx->ctx_id;
|
||||
tls->ctx_tbl[free_idx]->gctx = ctx;
|
||||
tls->ctx_tbl[free_idx]->winfo = _wpool_get_idle(tls->wpool, ctx->comm_size);
|
||||
if (NULL == tls->ctx_tbl[free_idx]->winfo) {
|
||||
MCA_COMMON_UCX_ERROR("Failed to allocate new worker");
|
||||
return NULL;
|
||||
}
|
||||
@ -978,21 +960,21 @@ _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
|
||||
opal_atomic_wmb();
|
||||
|
||||
/* Add this worker to the active worker list */
|
||||
_wpool_add_active(tls->wpool, tls->ctx_tbl[i]->winfo);
|
||||
_wpool_add_active(tls->wpool, tls->ctx_tbl[free_idx]->winfo);
|
||||
|
||||
/* add this worker into the context list */
|
||||
rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[i]);
|
||||
rc = _common_ucx_wpctx_append(ctx, tls->ctx_tbl[free_idx]->winfo);
|
||||
if (rc) {
|
||||
//TODO: error out
|
||||
return NULL;
|
||||
}
|
||||
|
||||
DBG_OUT("_tlocal_add_ctx: tls = %p, ctx_rec = %p, winfo = %p\n",
|
||||
(void *)tls, (void *)&tls->ctx_tbl[i],
|
||||
(void *)tls->ctx_tbl[i]->winfo);
|
||||
(void *)tls, (void *)&tls->ctx_tbl[free_idx],
|
||||
(void *)tls->ctx_tbl[free_idx]->winfo);
|
||||
|
||||
/* All good - return the record */
|
||||
return tls->ctx_tbl[i];
|
||||
return tls->ctx_tbl[free_idx];
|
||||
}
|
||||
|
||||
static int _tlocal_ctx_connect(_tlocal_ctx_t *ctx_rec, int target)
|
||||
@ -1037,7 +1019,6 @@ _tlocal_search_mem(_tlocal_table_t *tls, int mem_id)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
_tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
|
||||
{
|
||||
@ -1244,41 +1225,38 @@ opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
|
||||
|
||||
DBG_OUT("opal_common_ucx_mem_flush: mem = %p, target = %d\n", (void *)mem, target);
|
||||
|
||||
// TODO: make this as a read lock
|
||||
//opal_mutex_lock(&ctx->mutex);
|
||||
pthread_rwlock_rdlock(&ctx->rwlock);
|
||||
opal_mutex_lock(&ctx->mutex);
|
||||
|
||||
OPAL_LIST_FOREACH(item, &ctx->tls_workers, _ctx_record_list_item_t) {
|
||||
switch (scope) {
|
||||
case OPAL_COMMON_UCX_SCOPE_WORKER:
|
||||
opal_mutex_lock(&item->ptr->winfo->mutex);
|
||||
rc = opal_common_ucx_worker_flush(item->ptr->winfo->worker);
|
||||
opal_mutex_lock(&item->ptr->mutex);
|
||||
rc = opal_common_ucx_worker_flush(item->ptr->worker);
|
||||
if (rc != OPAL_SUCCESS) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_worker_flush failed: %d", rc);
|
||||
rc = OPAL_ERROR;
|
||||
}
|
||||
DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): worker = %p\n",
|
||||
(void *)item->ptr->winfo->worker);
|
||||
opal_mutex_unlock(&item->ptr->winfo->mutex);
|
||||
(void *)item->ptr->worker);
|
||||
opal_mutex_unlock(&item->ptr->mutex);
|
||||
break;
|
||||
case OPAL_COMMON_UCX_SCOPE_EP:
|
||||
if (NULL != item->ptr->winfo->endpoints[target] ) {
|
||||
opal_mutex_lock(&item->ptr->winfo->mutex);
|
||||
rc = opal_common_ucx_ep_flush(item->ptr->winfo->endpoints[target],
|
||||
item->ptr->winfo->worker);
|
||||
if (NULL != item->ptr->endpoints[target] ) {
|
||||
opal_mutex_lock(&item->ptr->mutex);
|
||||
rc = opal_common_ucx_ep_flush(item->ptr->endpoints[target],
|
||||
item->ptr->worker);
|
||||
if (rc != OPAL_SUCCESS) {
|
||||
MCA_COMMON_UCX_VERBOSE(1, "opal_common_ucx_ep_flush failed: %d", rc);
|
||||
rc = OPAL_ERROR;
|
||||
}
|
||||
DBG_OUT("opal_common_ucx_mem_flush(after opal_common_ucx_worker_flush): ep = %p worker = %p\n",
|
||||
(void *)item->ptr->winfo->endpoints[target],
|
||||
(void *)item->ptr->winfo->worker);
|
||||
opal_mutex_unlock(&item->ptr->winfo->mutex);
|
||||
(void *)item->ptr->endpoints[target],
|
||||
(void *)item->ptr->worker);
|
||||
opal_mutex_unlock(&item->ptr->mutex);
|
||||
}
|
||||
}
|
||||
}
|
||||
//opal_mutex_unlock(&ctx->mutex);
|
||||
pthread_rwlock_unlock(&ctx->rwlock);
|
||||
opal_mutex_unlock(&ctx->mutex);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ typedef struct {
|
||||
/* Ref counting & locking*/
|
||||
int refcnt;
|
||||
opal_mutex_t mutex;
|
||||
//pthread_rwlock_t rwlock;
|
||||
|
||||
/* UCX data */
|
||||
ucp_context_h ucp_ctx;
|
||||
@ -44,8 +43,8 @@ typedef struct {
|
||||
|
||||
typedef struct {
|
||||
int ctx_id;
|
||||
//opal_mutex_t mutex;
|
||||
pthread_rwlock_t rwlock;
|
||||
opal_mutex_t mutex;
|
||||
opal_atomic_int32_t refcntr;
|
||||
|
||||
/* the reference to a Worker pool this context belongs to*/
|
||||
opal_common_ucx_wpool_t *wpool;
|
||||
|
@ -7,8 +7,6 @@
|
||||
|
||||
typedef struct {
|
||||
int ctx_id;
|
||||
// TODO: make sure that this is being set by external thread
|
||||
volatile int released;
|
||||
opal_common_ucx_ctx_t *gctx;
|
||||
opal_common_ucx_winfo_t *winfo;
|
||||
} _tlocal_ctx_t;
|
||||
@ -35,7 +33,7 @@ OBJ_CLASS_DECLARATION(_winfo_list_item_t);
|
||||
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
_tlocal_ctx_t *ptr;
|
||||
opal_common_ucx_winfo_t *ptr;
|
||||
} _ctx_record_list_item_t;
|
||||
OBJ_CLASS_DECLARATION(_ctx_record_list_item_t);
|
||||
|
||||
@ -99,9 +97,9 @@ static int _wpool_add_active(opal_common_ucx_wpool_t *wpool,
|
||||
/* Internal Worker Pool Context management */
|
||||
static void _common_ucx_wpctx_free(opal_common_ucx_ctx_t *ctx);
|
||||
static int _common_ucx_wpctx_append(opal_common_ucx_ctx_t *ctx,
|
||||
_tlocal_ctx_t *ctx_rec);
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
static void _common_ucx_wpctx_remove(opal_common_ucx_ctx_t *ctx,
|
||||
_tlocal_ctx_t *ctx_rec);
|
||||
opal_common_ucx_winfo_t *winfo);
|
||||
|
||||
/* Internal Worker Pool Memeory management */
|
||||
static int _comm_ucx_wpmem_map(opal_common_ucx_wpool_t *wpool,
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user