1
1

Merge pull request #1315 from alex-mikheev/topic/oshmem_ucx_atomic

OSHMEM/UCX: implements atomic support
Этот коммит содержится в:
Mike Dubman 2016-01-22 20:46:44 +02:00
родитель 243d973cfe f627608e42
Коммит 89c7fea492
8 изменённых файлов: 139 добавлений и 89 удалений

Просмотреть файл

@ -27,32 +27,25 @@ int mca_atomic_basic_fadd(void *target,
struct oshmem_op_t *op)
{
int rc = OSHMEM_SUCCESS;
long long temp_value = 0;
if (!target || !value) {
rc = OSHMEM_ERROR;
}
atomic_basic_lock(pe);
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
if (prev)
memcpy(prev, (void*) &temp_value, nlong);
op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);
if (rc == OSHMEM_SUCCESS) {
long long temp_value = 0;
atomic_basic_lock(pe);
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
if (prev)
memcpy(prev, (void*) &temp_value, nlong);
op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);
if (rc == OSHMEM_SUCCESS) {
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}
atomic_basic_unlock(pe);
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}
atomic_basic_unlock(pe);
return rc;
}

Просмотреть файл

@ -41,18 +41,6 @@ int mca_atomic_mxm_cswap(void *target,
ptl_id = -1;
mxm_err = MXM_OK;
if (!prev || !target || !value) {
ATOMIC_ERROR("[#%d] Whether target, value or prev are not defined",
my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
switch (nlong) {
case 1:
nlong_order = 0;

Просмотреть файл

@ -43,18 +43,6 @@ int mca_atomic_mxm_fadd(void *target,
ptl_id = -1;
mxm_err = MXM_OK;
if (!target || !value) {
ATOMIC_ERROR("[#%d] target or value are not defined", my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
switch (nlong) {
case 1:
nlong_order = 0;

Просмотреть файл

@ -97,7 +97,7 @@ static int ucx_open(void)
*/
if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) {
ATOMIC_VERBOSE(5,
"Can not use atomic/ucx because spml ikrit component disabled");
"Can not use atomic/ucx because spml ucx component disabled");
return OSHMEM_ERR_NOT_AVAILABLE;
}
mca_spml_self = (mca_spml_ucx_t *)mca_spml.self;

Просмотреть файл

@ -13,11 +13,8 @@
#include <stdlib.h>
#include "oshmem/constants.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_ucx.h"
@ -29,6 +26,45 @@ int mca_atomic_ucx_cswap(void *target,
size_t nlong,
int pe)
{
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
if (NULL == cond) {
switch (nlong) {
case 4:
status = ucp_atomic_swap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_swap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_cswap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)cond, *(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_cswap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)cond, *(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}
return ucx_status_to_oshmem(status);
err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}

Просмотреть файл

@ -13,13 +13,8 @@
#include <stdlib.h>
#include "oshmem/constants.h"
#include "oshmem/op/op.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"
#include "atomic_ucx.h"
@ -30,6 +25,44 @@ int mca_atomic_ucx_fadd(void *target,
int pe,
struct oshmem_op_t *op)
{
/* TODO: actual code */
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;
ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
if (NULL == prev) {
switch (nlong) {
case 4:
status = ucp_atomic_add32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey);
break;
case 8:
status = ucp_atomic_add64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_fadd32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_fadd64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}
return ucx_status_to_oshmem(status);
err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}

Просмотреть файл

@ -53,7 +53,7 @@ mca_spml_ucx_t mca_spml_ucx = {
mca_spml_ucx_deregister,
mca_spml_base_oob_get_mkeys,
mca_spml_ucx_put,
NULL, //mca_spml_ucx_put_nb,
NULL, /* todo: mca_spml_ucx_put_nb, */
mca_spml_ucx_get,
mca_spml_ucx_recv,
mca_spml_ucx_send,
@ -174,6 +174,8 @@ static void dump_address(int pe, char *addr, size_t len)
#endif
}
static char spml_ucx_transport_ids[1] = { 0 };
int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
{
size_t i, n;
@ -208,7 +210,6 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
/* Get the EP connection requests for all the processes from modex */
for (n = 0; n < nprocs; ++n) {
i = (my_rank + n) % nprocs;
//if (i == my_rank) continue;
dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]);
err = ucp_ep_create(mca_spml_ucx.ucp_worker,
(ucp_address_t *)(wk_raddrs + wk_roffs[i]),
@ -217,6 +218,8 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
SPML_ERROR("ucp_ep_create failed!!!\n");
goto error2;
}
procs[i]->num_transports = 1;
procs[i]->transport_ids = spml_ucx_transport_ids;
}
ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
@ -377,46 +380,27 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}
ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva);
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}
int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}
ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva);
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}
int mca_spml_ucx_fence(void)

Просмотреть файл

@ -16,12 +16,17 @@
#include "oshmem_config.h"
#include "oshmem/request/request.h"
#include "oshmem/mca/spml/base/base.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/util/oshmem_util.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/spml_base_request.h"
#include "oshmem/mca/spml/base/spml_base_getreq.h"
#include "oshmem/runtime/runtime.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "orte/runtime/orte_globals.h"
@ -98,6 +103,29 @@ extern int mca_spml_ucx_fence(void);
extern int mca_spml_ucx_quiet(void);
extern int spml_ucx_progress(void);
static inline spml_ucx_mkey_t *
mca_spml_ucx_get_mkey(int pe, void *va, void **rva)
{
sshmem_mkey_t *r_mkey;
r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of symmetric variable",
pe, va);
oshmem_shmem_abort(-1);
return NULL;
}
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
}
static inline ucx_status_to_oshmem(ucs_status_t status)
{
return OPAL_LIKELY(UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}
END_C_DECLS
#endif