1
1

osc/rdma: add support for datatypes with large descriptions

This commit adds large datatype description support to the osc/rdma
component. Support is provided by an additional send/recv of the datatype
description if the description does not fit in an eager buffer. The
code is designed to require minimal new code and not for speed. We
consider this code path to be a slow path.

Refs trac:1905

cmr=v1.8:reviewer=jsquyres

This commit was SVN r31197.

The following Trac tickets were found above:
  Ticket 1905 --> https://svn.open-mpi.org/trac/ompi/ticket/1905
Этот коммит содержится в:
Nathan Hjelm 2014-03-24 18:57:29 +00:00
родитель 390645ac2a
Коммит 0ed44f2fdb
7 изменённых файлов: 524 добавлений и 226 удалений

Просмотреть файл

@ -85,10 +85,7 @@ ompi_osc_rdma_free(ompi_win_t *win)
OBJ_DESTRUCT(&module->pending_acc); OBJ_DESTRUCT(&module->pending_acc);
osc_rdma_gc_clean (module); osc_rdma_gc_clean ();
assert (0 == opal_list_get_size (&module->request_gc));
OBJ_DESTRUCT(&module->request_gc);
OBJ_DESTRUCT(&module->buffer_gc);
if (NULL != module->peers) { if (NULL != module->peers) {
free(module->peers); free(module->peers);

Просмотреть файл

@ -71,6 +71,12 @@ struct ompi_osc_rdma_component_t {
/** Is the progress function enabled? */ /** Is the progress function enabled? */
bool progress_enable; bool progress_enable;
/** List of requests that need to be freed */
opal_list_t request_gc;
/** List of buffers that need to be freed */
opal_list_t buffer_gc;
}; };
typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t; typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t;
@ -191,9 +197,6 @@ struct ompi_osc_rdma_module_t {
unsigned char *incoming_buffer; unsigned char *incoming_buffer;
ompi_request_t *frag_request; ompi_request_t *frag_request;
opal_list_t request_gc;
opal_list_t buffer_gc;
/* enforce accumulate semantics */ /* enforce accumulate semantics */
opal_atomic_lock_t accumulate_lock; opal_atomic_lock_t accumulate_lock;
@ -558,25 +561,41 @@ static inline void osc_rdma_copy_for_send (void *target, size_t target_len, void
* *
* @short Release finished PML requests and accumulate buffers. * @short Release finished PML requests and accumulate buffers.
* *
* @param[in] module - OSC RDMA module
*
* @long This function exists because it is not possible to free a PML request * @long This function exists because it is not possible to free a PML request
* or buffer from a request completion callback. We instead put requests * or buffer from a request completion callback. We instead put requests
* and buffers on the module's garbage collection lists and release them * and buffers on the module's garbage collection lists and release them
* at a later time. * at a later time.
*/ */
static inline void osc_rdma_gc_clean (ompi_osc_rdma_module_t *module) static inline void osc_rdma_gc_clean (void)
{ {
ompi_request_t *request; ompi_request_t *request;
opal_list_item_t *item; opal_list_item_t *item;
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) { OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&mca_osc_rdma_component.request_gc))) {
ompi_request_free (&request); ompi_request_free (&request);
} }
while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) { while (NULL != (item = opal_list_remove_first (&mca_osc_rdma_component.buffer_gc))) {
OBJ_RELEASE(item); OBJ_RELEASE(item);
} }
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
}
static inline void osc_rdma_gc_add_request (ompi_request_t *request)
{
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
opal_list_append (&mca_osc_rdma_component.request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
}
static inline void osc_rdma_gc_add_buffer (opal_list_item_t *buffer)
{
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
opal_list_append (&mca_osc_rdma_component.buffer_gc, buffer);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
} }
#define OSC_RDMA_FRAG_TAG 0x10000 #define OSC_RDMA_FRAG_TAG 0x10000

Просмотреть файл

@ -56,9 +56,19 @@ static int ompi_osc_rdma_req_comm_complete (ompi_request_t *request)
OPAL_THREAD_UNLOCK(&ompi_request_lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* put this request on the garbage collection list */ /* put this request on the garbage collection list */
OPAL_THREAD_LOCK(&module->lock); osc_rdma_gc_add_request (request);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS;
}
static int ompi_osc_rdma_dt_send_complete (ompi_request_t *request)
{
ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
OBJ_RELEASE(datatype);
/* put this request on the garbage collection list */
osc_rdma_gc_add_request (request);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -234,9 +244,10 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_frag_t *frag;
ompi_osc_rdma_header_put_t *header; ompi_osc_rdma_header_put_t *header;
size_t ddt_len, payload_len, frag_len; size_t ddt_len, payload_len, frag_len;
bool is_long_datatype = false;
bool is_long_msg = false; bool is_long_msg = false;
const void *packed_ddt; const void *packed_ddt;
int tag, ret; int tag = -1, ret;
char *ptr; char *ptr;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -273,9 +284,16 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
frag_len = sizeof(ompi_osc_rdma_header_put_t) + ddt_len; frag_len = sizeof(ompi_osc_rdma_header_put_t) + ddt_len;
ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) { if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock); /* allocate space for the header plus space to store ddt_len */
return OMPI_ERR_OUT_OF_RESOURCE; frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8;
ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
is_long_datatype = true;
} }
is_long_msg = true; is_long_msg = true;
@ -285,7 +303,8 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: put long protocol: %d", (int) is_long_msg)); "osc rdma: put long protocol: %d, large datatype: %d",
(int) is_long_msg, (int) is_long_datatype));
header = (ompi_osc_rdma_header_put_t *) ptr; header = (ompi_osc_rdma_header_put_t *) ptr;
header->base.flags = 0; header->base.flags = 0;
@ -294,42 +313,63 @@ static inline int ompi_osc_rdma_put_w_req (void *origin_addr, int origin_count,
header->displacement = target_disp; header->displacement = target_disp;
ptr += sizeof(ompi_osc_rdma_header_put_t); ptr += sizeof(ompi_osc_rdma_header_put_t);
ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); do {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len); ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
ptr += ddt_len; if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT;
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
origin_dt);
/* the user's buffer is no longer needed so mark the request as
* complete. */
if (request) {
ompi_osc_rdma_request_complete (request, MPI_SUCCESS);
} }
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG;
header->tag = tag; if (is_long_datatype) {
/* the datatype does not fit in an eager message. send it separately */
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE;
/* increase the outgoing signal count */ OBJ_RETAIN(target_dt);
ompi_osc_signal_outgoing (module, target, 1);
if (request) { ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
request->outstanding_requests = 1; tag, module->comm, ompi_osc_rdma_dt_send_complete,
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt, target_dt);
target, tag, module->comm, ompi_osc_rdma_req_comm_complete, if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
request); break;
}
*((uint64_t *) ptr) = ddt_len;
ptr += 8;
} else { } else {
ret = ompi_osc_rdma_component_isend (module,origin_addr, origin_count, origin_dt, target, tag, memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
module->comm); ptr += ddt_len;
} }
if (OMPI_SUCCESS != ret) goto cleanup;
}
cleanup: if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT;
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
origin_dt);
/* the user's buffer is no longer needed so mark the request as
* complete. */
if (request) {
ompi_osc_rdma_request_complete (request, MPI_SUCCESS);
}
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG;
header->tag = tag;
/* increase the outgoing signal count */
ompi_osc_signal_outgoing (module, target, 1);
if (request) {
request->outstanding_requests = 1;
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt,
target, tag, module->comm, ompi_osc_rdma_req_comm_complete,
request);
} else {
ret = ompi_osc_rdma_component_isend (module,origin_addr, origin_count, origin_dt, target, tag,
module->comm);
}
}
} while (0);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID;
} }
@ -372,11 +412,12 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
int ret; int ret;
ompi_osc_rdma_module_t *module = GET_MODULE(win); ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target); ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
bool is_long_datatype = false;
bool is_long_msg = false;
ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_frag_t *frag;
ompi_osc_rdma_header_acc_t *header; ompi_osc_rdma_header_acc_t *header;
size_t ddt_len, payload_len, frag_len; size_t ddt_len, payload_len, frag_len;
char *ptr; char *ptr;
bool is_long_msg = false;
const void *packed_ddt; const void *packed_ddt;
int tag; int tag;
@ -416,11 +457,16 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
frag_len = sizeof(ompi_osc_rdma_header_acc_t) + ddt_len; frag_len = sizeof(ompi_osc_rdma_header_acc_t) + ddt_len;
ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) { if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, /* allocate space for the header plus space to store ddt_len */
"acc: out of resource error while trying to allocate a fragment")); frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8;
OPAL_THREAD_UNLOCK(&module->lock); ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
return OMPI_ERR_OUT_OF_RESOURCE; if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
} OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
is_long_datatype = true;
}
is_long_msg = true; is_long_msg = true;
tag = get_tag (module); tag = get_tag (module);
@ -436,50 +482,71 @@ ompi_osc_rdma_accumulate_w_req (void *origin_addr, int origin_count,
header->op = op->o_f_to_c_index; header->op = op->o_f_to_c_index;
ptr += sizeof(ompi_osc_rdma_header_acc_t); ptr += sizeof(ompi_osc_rdma_header_acc_t);
ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); do {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len); ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
ptr += ddt_len; if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC;
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc,
origin_count, origin_dt);
/* the user's buffer is no longer needed so mark the request as
* complete. */
if (request) {
ompi_osc_rdma_request_complete (request, MPI_SUCCESS);
} }
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG;
header->tag = tag; if (is_long_datatype) {
/* the datatype does not fit in an eager message. send it separately */
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE;
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, OBJ_RETAIN(target_dt);
"acc: starting long accumulate with tag %d", tag));
/* increment the outgoing send count */ ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
ompi_osc_signal_outgoing (module, target, 1); tag, module->comm, ompi_osc_rdma_dt_send_complete,
target_dt);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
if (request) { *((uint64_t *) ptr) = ddt_len;
request->outstanding_requests = 1; ptr += 8;
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt,
target, tag, module->comm, ompi_osc_rdma_req_comm_complete,
request);
} else { } else {
ret = ompi_osc_rdma_component_isend (module, origin_addr, origin_count, origin_dt, target, tag, memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
module->comm); ptr += ddt_len;
} }
if (OMPI_SUCCESS != ret) { if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC;
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc,
origin_count, origin_dt);
/* the user's buffer is no longer needed so mark the request as
* complete. */
if (request) {
ompi_osc_rdma_request_complete (request, MPI_SUCCESS);
}
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG;
header->tag = tag;
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"acc: send failed with eror %d", ret)); "acc: starting long accumulate with tag %d", tag));
}
}
/* mark the fragment as valid */ /* increment the outgoing send count */
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { ompi_osc_signal_outgoing (module, target, 1);
if (request) {
request->outstanding_requests = 1;
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_dt,
target, tag, module->comm, ompi_osc_rdma_req_comm_complete,
request);
} else {
ret = ompi_osc_rdma_component_isend (module, origin_addr, origin_count, origin_dt, target, tag,
module->comm);
}
}
} while (0);
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"acc: failed with eror %d", ret));
} else {
/* mark the fragment as valid */
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID;
} }
@ -662,6 +729,7 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou
{ {
int ret, tag; int ret, tag;
ompi_osc_rdma_module_t *module = GET_MODULE(win); ompi_osc_rdma_module_t *module = GET_MODULE(win);
bool is_long_datatype = false;
ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_frag_t *frag;
ompi_osc_rdma_header_get_t *header; ompi_osc_rdma_header_get_t *header;
size_t ddt_len, frag_len; size_t ddt_len, frag_len;
@ -713,8 +781,15 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou
frag_len = sizeof(ompi_osc_rdma_header_get_t) + ddt_len; frag_len = sizeof(ompi_osc_rdma_header_get_t) + ddt_len;
ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr); ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) { if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&module->lock); /* allocate space for the header plus space to store ddt_len */
return OMPI_ERR_OUT_OF_RESOURCE; frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8;
ret = ompi_osc_rdma_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
is_long_datatype = true;
} }
tag = get_tag (module); tag = get_tag (module);
@ -733,20 +808,43 @@ static inline int ompi_osc_rdma_rget_internal (void *origin_addr, int origin_cou
header->tag = tag; header->tag = tag;
ptr += sizeof(ompi_osc_rdma_header_get_t); ptr += sizeof(ompi_osc_rdma_header_get_t);
ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt); do {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len); ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
ptr += ddt_len; if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
if (is_long_datatype) {
/* the datatype does not fit in an eager message. send it separately */
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE;
OBJ_RETAIN(target_dt);
ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
tag, module->comm, ompi_osc_rdma_dt_send_complete,
target_dt);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
*((uint64_t *) ptr) = ddt_len;
ptr += 8;
} else {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
ptr += ddt_len;
}
/* TODO -- store the request somewhere so we can cancel it on error */
rdma_request->outstanding_requests = 1;
ret = ompi_osc_rdma_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag,
module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request);
} while (0);
if (OMPI_SUCCESS == ret) { if (OMPI_SUCCESS == ret) {
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID;
*request = &rdma_request->super; *request = &rdma_request->super;
} }
/* TODO -- store the request somewhere so we can cancel it on error */
rdma_request->outstanding_requests = 1;
ret = ompi_osc_rdma_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag,
module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request);
OPAL_THREAD_LOCK(&module->lock); OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_rdma_frag_finish(module, frag); ret = ompi_osc_rdma_frag_finish(module, frag);
@ -837,11 +935,12 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count,
int ret; int ret;
ompi_osc_rdma_module_t *module = GET_MODULE(win); ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target_rank); ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target_rank);
bool is_long_datatype = false;
bool is_long_msg = false;
ompi_osc_rdma_frag_t *frag; ompi_osc_rdma_frag_t *frag;
ompi_osc_rdma_header_get_acc_t *header; ompi_osc_rdma_header_get_acc_t *header;
size_t ddt_len, payload_len, frag_len; size_t ddt_len, payload_len, frag_len;
char *ptr; char *ptr;
bool is_long_msg = false;
const void *packed_ddt; const void *packed_ddt;
int tag; int tag;
ompi_osc_rdma_request_t *rdma_request; ompi_osc_rdma_request_t *rdma_request;
@ -901,9 +1000,17 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count,
frag_len = sizeof(*header) + ddt_len; frag_len = sizeof(*header) + ddt_len;
ret = ompi_osc_rdma_frag_alloc(module, target_rank, frag_len, &frag, &ptr); ret = ompi_osc_rdma_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) { if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&module->lock); /* allocate space for the header plus space to store ddt_len */
return OMPI_ERR_OUT_OF_RESOURCE; frag_len = sizeof(ompi_osc_rdma_header_put_t) + 8;
ret = ompi_osc_rdma_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
is_long_datatype = true;
} }
is_long_msg = true; is_long_msg = true;
} }
@ -927,32 +1034,53 @@ int ompi_osc_rdma_rget_accumulate_internal (void *origin_addr, int origin_count,
header->tag = tag; header->tag = tag;
ptr = (char *)(header + 1); ptr = (char *)(header + 1);
ret = ompi_datatype_get_pack_description(target_datatype, &packed_ddt); do {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len); ret = ompi_datatype_get_pack_description(target_datatype, &packed_ddt);
ptr += ddt_len; if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
ret = ompi_osc_rdma_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag,
module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
goto cleanup;
}
if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC;
if (&ompi_mpi_op_no_op.op != op) {
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
origin_datatype);
} }
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG;
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank, if (is_long_datatype) {
tag, module->comm, ompi_osc_rdma_req_comm_complete, rdma_request); /* the datatype does not fit in an eager message. send it separately */
if (OMPI_SUCCESS != ret) goto cleanup; header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE;
}
OBJ_RETAIN(target_datatype);
ret = ompi_osc_rdma_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target_rank,
tag, module->comm, ompi_osc_rdma_dt_send_complete,
target_datatype);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
*((uint64_t *) ptr) = ddt_len;
ptr += 8;
} else {
memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
ptr += ddt_len;
}
ret = ompi_osc_rdma_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag,
module->comm, NULL, ompi_osc_rdma_req_comm_complete, rdma_request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
if (!is_long_msg) {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC;
if (&ompi_mpi_op_no_op.op != op) {
osc_rdma_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
origin_datatype);
}
} else {
header->base.type = OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG;
ret = ompi_osc_rdma_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank,
tag, module->comm, ompi_osc_rdma_req_comm_complete, rdma_request);
}
} while (0);
cleanup:
if (OMPI_SUCCESS == ret) { if (OMPI_SUCCESS == ret) {
header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID; header->base.flags |= OMPI_OSC_RDMA_HDR_FLAG_VALID;
*request = (ompi_request_t *) rdma_request; *request = (ompi_request_t *) rdma_request;

Просмотреть файл

@ -243,6 +243,8 @@ component_init(bool enable_progress_threads,
OBJ_CONSTRUCT(&mca_osc_rdma_component.lock, opal_mutex_t); OBJ_CONSTRUCT(&mca_osc_rdma_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.pending_operations, opal_list_t); OBJ_CONSTRUCT(&mca_osc_rdma_component.pending_operations, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.request_gc, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.modules, OBJ_CONSTRUCT(&mca_osc_rdma_component.modules,
opal_hash_table_t); opal_hash_table_t);
@ -300,6 +302,8 @@ component_finalize(void)
OBJ_DESTRUCT(&mca_osc_rdma_component.lock); OBJ_DESTRUCT(&mca_osc_rdma_component.lock);
OBJ_DESTRUCT(&mca_osc_rdma_component.requests); OBJ_DESTRUCT(&mca_osc_rdma_component.requests);
OBJ_DESTRUCT(&mca_osc_rdma_component.pending_operations); OBJ_DESTRUCT(&mca_osc_rdma_component.pending_operations);
OBJ_DESTRUCT(&mca_osc_rdma_component.request_gc);
OBJ_DESTRUCT(&mca_osc_rdma_component.buffer_gc);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -351,8 +355,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
OBJ_CONSTRUCT(&module->queued_frags, opal_list_t); OBJ_CONSTRUCT(&module->queued_frags, opal_list_t);
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t); OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t); OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t); OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
/* options */ /* options */

Просмотреть файл

@ -46,7 +46,7 @@
/** /**
* struct osc_rdma_accumulate_data_t: * struct osc_rdma_accumulate_data_t:
* *
* @short Data associated with an in-progress accumulation operation. * @brief Data associated with an in-progress accumulation operation.
*/ */
struct osc_rdma_accumulate_data_t { struct osc_rdma_accumulate_data_t {
opal_list_item_t super; opal_list_item_t super;
@ -93,10 +93,10 @@ OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t, opal_list_item_t, osc_rdma_accumu
/** /**
* osc_rdma_pending_acc_t: * osc_rdma_pending_acc_t:
* *
* @short Keep track of accumulate and cswap operations that are * @brief Keep track of accumulate and cswap operations that are
* waiting on the accumulate lock. * waiting on the accumulate lock.
* *
* @long Since accumulate operations may take several steps to * Since accumulate operations may take several steps to
* complete we need to lock the accumulate lock until the operation * complete we need to lock the accumulate lock until the operation
* is complete. While the lock is held it is possible that additional * is complete. While the lock is held it is possible that additional
* accumulate operations will arrive. This structure keeps track of * accumulate operations will arrive. This structure keeps track of
@ -134,10 +134,49 @@ OBJ_CLASS_INSTANCE(osc_rdma_pending_acc_t, opal_list_item_t,
osc_rdma_pending_acc_constructor, osc_rdma_pending_acc_destructor); osc_rdma_pending_acc_constructor, osc_rdma_pending_acc_destructor);
/* end ompi_osc_rdma_pending_acc_t class */ /* end ompi_osc_rdma_pending_acc_t class */
/**
* @brief Class for large datatype descriptions
*
* This class is used to keep track of buffers for large datatype descriptions
* (datatypes that do not fit in an eager fragment). The structure is designed
* to take advantage of the small datatype description code path.
*/
struct ompi_osc_rdma_ddt_buffer_t {
/** allows this class to be stored in the buffer garbage collection
* list */
opal_list_item_t super;
/** OSC RDMA module */
ompi_osc_rdma_module_t *module;
/** source of this header */
int source;
/** header + datatype data */
ompi_osc_rdma_header_t *header;
};
typedef struct ompi_osc_rdma_ddt_buffer_t ompi_osc_rdma_ddt_buffer_t;
static void ompi_osc_rdma_ddt_buffer_constructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer)
{
ddt_buffer->header = NULL;
}
static void ompi_osc_rdma_ddt_buffer_destructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer)
{
if (ddt_buffer->header) {
free (ddt_buffer->header);
ddt_buffer->header = NULL;
}
}
OBJ_CLASS_DECLARATION(ompi_osc_rdma_ddt_buffer_t);
OBJ_CLASS_INSTANCE(ompi_osc_rdma_ddt_buffer_t, opal_list_item_t, ompi_osc_rdma_ddt_buffer_constructor,
ompi_osc_rdma_ddt_buffer_destructor);
/* end ompi_osc_rdma_ddt_buffer_t class */
/** /**
* datatype_buffer_length: * datatype_buffer_length:
* *
* @short Determine the buffer size needed to hold count elements of datatype. * @brief Determine the buffer size needed to hold count elements of datatype.
* *
* @param[in] datatype - Element type * @param[in] datatype - Element type
* @param[in] count - Element count * @param[in] count - Element count
@ -162,7 +201,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
/** /**
* ompi_osc_rdma_control_send: * ompi_osc_rdma_control_send:
* *
* @short send a control message as part of a fragment * @brief send a control message as part of a fragment
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] target - Target peer's rank * @param[in] target - Target peer's rank
@ -171,7 +210,7 @@ static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
* *
* @returns error OMPI error code or OMPI_SUCCESS * @returns error OMPI error code or OMPI_SUCCESS
* *
* @long "send" a control messages. Adds it to the active fragment, so the * "send" a control messages. Adds it to the active fragment, so the
* caller will still need to explicitly flush (either to everyone or * caller will still need to explicitly flush (either to everyone or
* to a target) before this is sent. * to a target) before this is sent.
*/ */
@ -210,10 +249,8 @@ static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request)
/* free the temporary buffer */ /* free the temporary buffer */
free (ctx); free (ctx);
/* put this request on the garbage collection list */ /* put this request on the garbage collection list */
OPAL_THREAD_LOCK(&module->lock); osc_rdma_gc_add_request (request);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -221,14 +258,14 @@ static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request)
/** /**
* ompi_osc_rdma_control_send_unbuffered: * ompi_osc_rdma_control_send_unbuffered:
* *
* @short Send an unbuffered control message to a peer. * @brief Send an unbuffered control message to a peer.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] target - Target rank * @param[in] target - Target rank
* @param[in] data - Data to send * @param[in] data - Data to send
* @param[in] len - Length of data * @param[in] len - Length of data
* *
* @long Directly send a control message. This does not allocate a * Directly send a control message. This does not allocate a
* fragment, so should only be used when sending other messages would * fragment, so should only be used when sending other messages would
* be erroneous (such as complete messages, when there may be queued * be erroneous (such as complete messages, when there may be queued
* transactions from an overlapping post that has already heard back * transactions from an overlapping post that has already heard back
@ -265,7 +302,7 @@ int ompi_osc_rdma_control_send_unbuffered(ompi_osc_rdma_module_t *module,
/** /**
* datatype_create: * datatype_create:
* *
* @short Utility function that creates a new datatype from a packed * @brief Utility function that creates a new datatype from a packed
* description. * description.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
@ -317,7 +354,7 @@ static inline int datatype_create (ompi_osc_rdma_module_t *module, int peer, omp
* @param[in] source - Message source * @param[in] source - Message source
* @param[in] put_header - Message header + data * @param[in] put_header - Message header + data
* *
* @long Process a put message and copy the message data to the specified * Process a put message and copy the message data to the specified
* memory region. Note, this function does not handle any bounds * memory region. Note, this function does not handle any bounds
* checking at the moment. * checking at the moment.
*/ */
@ -391,12 +428,12 @@ static inline int process_put_long(ompi_osc_rdma_module_t* module, int source,
/** /**
* osc_rdma_incoming_req_complete: * osc_rdma_incoming_req_complete:
* *
* @short Completion callback for a receive associate with an access * @brief Completion callback for a receive associate with an access
* epoch. * epoch.
* *
* @param[in] request - PML request with an OSC RMDA module as the callback data. * @param[in] request - PML request with an OSC RMDA module as the callback data.
* *
* @long This function is called when a send or receive associated with an * This function is called when a send or receive associated with an
* access epoch completes. When fired this function will increment the * access epoch completes. When fired this function will increment the
* passive or active incoming count. * passive or active incoming count.
*/ */
@ -412,9 +449,7 @@ static int osc_rdma_incoming_req_complete (ompi_request_t *request)
mark_incoming_completion (module, rank); mark_incoming_completion (module, rank);
/* put this request on the garbage colletion list */ /* put this request on the garbage colletion list */
OPAL_THREAD_LOCK(&module->lock); osc_rdma_gc_add_request (request);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
@ -437,15 +472,13 @@ static int osc_rdma_get_post_send_cb (ompi_request_t *request)
mark_incoming_completion (module, rank); mark_incoming_completion (module, rank);
/* put this request on the garbage colletion list */ /* put this request on the garbage colletion list */
OPAL_THREAD_LOCK(&module->lock); osc_rdma_gc_add_request (request);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/** /**
* @short Post a send to match the remote receive for a get operation. * @brief Post a send to match the remote receive for a get operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source buffer * @param[in] source - Source buffer
@ -454,7 +487,7 @@ static int osc_rdma_get_post_send_cb (ompi_request_t *request)
* @param[in] peer - Remote process that has the receive posted * @param[in] peer - Remote process that has the receive posted
* @param[in] tag - Tag for the send * @param[in] tag - Tag for the send
* *
* @long This function posts a send to match the receive posted as part * This function posts a send to match the receive posted as part
* of a get operation. When this send is complete the get is considered * of a get operation. When this send is complete the get is considered
* complete at the target (this process). * complete at the target (this process).
*/ */
@ -480,7 +513,7 @@ static int osc_rdma_get_post_send (ompi_osc_rdma_module_t *module, void *source,
/** /**
* process_get: * process_get:
* *
* @short Process a get message from a remote peer * @brief Process a get message from a remote peer
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] target - Peer process * @param[in] target - Peer process
@ -516,7 +549,7 @@ static inline int process_get (ompi_osc_rdma_module_t* module, int target,
/** /**
* osc_rdma_accumulate_buffer: * osc_rdma_accumulate_buffer:
* *
* @short Accumulate data into the target buffer. * @brief Accumulate data into the target buffer.
* *
* @param[in] target - Target buffer * @param[in] target - Target buffer
* @param[in] source - Source buffer * @param[in] source - Source buffer
@ -575,7 +608,7 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t
} }
/** /**
* @short Create an accumulate data object. * @brief Create an accumulate data object.
* *
* @param[in] module - RDMA OSC module * @param[in] module - RDMA OSC module
* @param[in] target - Target for the accumulation * @param[in] target - Target for the accumulation
@ -588,9 +621,9 @@ static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t
* @param[in] request_count - Number of prerequisite requests * @param[in] request_count - Number of prerequisite requests
* @param[out] acc_data_out - New accumulation data * @param[out] acc_data_out - New accumulation data
* *
* @long This function is used to create a copy of the data needed to perform an accumulation. * This function is used to create a copy of the data needed to perform an accumulation.
* This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb * This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb
* as the ctx parameter with accumulate_cb as the cb parameter. * as the ctx parameter with accumulate_cb as the cb parameter.
*/ */
static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len, static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len,
ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op, ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
@ -622,7 +655,7 @@ static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int pee
} }
/** /**
* @short Execute the accumulate once the request counter reaches 0. * @brief Execute the accumulate once the request counter reaches 0.
* *
* @param[in] request - request * @param[in] request - request
* *
@ -658,10 +691,12 @@ static int accumulate_cb (ompi_request_t *request)
/* drop the accumulate lock */ /* drop the accumulate lock */
ompi_osc_rdma_accumulate_unlock (module); ompi_osc_rdma_accumulate_unlock (module);
opal_list_append (&module->buffer_gc, &acc_data->super); osc_rdma_gc_add_buffer (&acc_data->super);
} }
opal_list_append (&module->request_gc, (opal_list_item_t *) request); /* put this request on the garbage colletion list */
osc_rdma_gc_add_request (request);
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
return ret; return ret;
@ -731,7 +766,7 @@ static int replace_cb (ompi_request_t *request)
/** /**
* ompi_osc_rdma_acc_start: * ompi_osc_rdma_acc_start:
* *
* @short Start an accumulate with data operation. * @brief Start an accumulate with data operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source rank * @param[in] source - Source rank
@ -768,7 +803,7 @@ static int ompi_osc_rdma_acc_start (ompi_osc_rdma_module_t *module, int source,
/** /**
* ompi_osc_rdma_acc_start: * ompi_osc_rdma_acc_start:
* *
* @short Start a long accumulate operation. * @brief Start a long accumulate operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source rank * @param[in] source - Source rank
@ -838,7 +873,7 @@ static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int sou
/** /**
* ompi_osc_rdma_gacc_start: * ompi_osc_rdma_gacc_start:
* *
* @short Start a accumulate with data + get operation. * @brief Start a accumulate with data + get operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source rank * @param[in] source - Source rank
@ -889,7 +924,7 @@ static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source,
/** /**
* ompi_osc_rdma_gacc_long_start: * ompi_osc_rdma_gacc_long_start:
* *
* @short Start a long accumulate + get operation. * @brief Start a long accumulate + get operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source rank * @param[in] source - Source rank
@ -960,7 +995,7 @@ static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source,
/** /**
* ompi_osc_rdma_cswap_start: * ompi_osc_rdma_cswap_start:
* *
* @short Start a compare and swap operation * @brief Start a compare and swap operation
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* @param[in] source - Source rank * @param[in] source - Source rank
@ -1015,12 +1050,12 @@ static int ompi_osc_rdma_cswap_start (ompi_osc_rdma_module_t *module, int source
/** /**
* ompi_osc_rdma_progress_pending_acc: * ompi_osc_rdma_progress_pending_acc:
* *
* @short Progress one pending accumulation or compare and swap operation. * @brief Progress one pending accumulation or compare and swap operation.
* *
* @param[in] module - OSC RDMA module * @param[in] module - OSC RDMA module
* *
* @long If the accumulation lock can be aquired progress one pending * If the accumulation lock can be aquired progress one pending
* accumulate or compare and swap operation. * accumulate or compare and swap operation.
*/ */
int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module) int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module)
{ {
@ -1342,6 +1377,117 @@ static inline int process_unlock (ompi_osc_rdma_module_t *module, int source,
return sizeof (*unlock_header); return sizeof (*unlock_header);
} }
/* Completion callback for the receive of a large (non-eager) datatype
 * description. Once the description has arrived, dispatch the header that
 * was saved by process_large_datatype_request() to the matching
 * long-message handler, then queue the request and the description buffer
 * for garbage collection. */
static int process_large_datatype_request_cb (ompi_request_t *request)
{
    ompi_osc_rdma_ddt_buffer_t *dbuf = (ompi_osc_rdma_ddt_buffer_t *) request->req_complete_cb_data;
    ompi_osc_rdma_module_t *module = dbuf->module;
    ompi_osc_rdma_header_t *hdr = dbuf->header;
    int src = dbuf->source;

    /* dispatch on the saved header; result codes are intentionally
     * discarded, as in the eager path */
    switch (hdr->base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
        (void) process_put_long (module, src, &hdr->put);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET:
        (void) process_get (module, src, &hdr->get);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
        (void) process_acc_long (module, src, &hdr->acc);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        (void) process_get_acc_long (module, src, &hdr->get_acc);
        break;
    default:
        /* developer error: only the four types above may carry the
         * large-datatype flag */
        assert (0);
        return OMPI_ERROR;
    }

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    /* free the datatype description buffer via the gc list as well */
    osc_rdma_gc_add_buffer (&dbuf->super);

    return OMPI_SUCCESS;
}
/**
 * @brief process a request with a large datatype
 *
 * @param[in] module - OSC RDMA module
 * @param[in] source - header source
 * @param[in] header - header to process
 *
 * @returns number of bytes consumed from the fragment (the fixed header
 *          plus the embedded 64-bit description length) on success, or
 *          an OMPI error code on failure
 *
 * It is possible to construct datatypes whose description is too large
 * to fit in an OSC RDMA fragment. In this case the remote side posts
 * a send of the datatype description. This function posts the matching
 * receive and processes the header on completion
 * (see process_large_datatype_request_cb).
 */
static int process_large_datatype_request (ompi_osc_rdma_module_t *module, int source, ompi_osc_rdma_header_t *header)
{
    ompi_osc_rdma_ddt_buffer_t *ddt_buffer;
    int header_len, tag, ret;
    uint64_t ddt_len;

    /* determine the header size and receive tag */
    switch (header->base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
        header_len = sizeof (header->put);
        tag = header->put.tag;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET:
        header_len = sizeof (header->get);
        tag = header->get.tag;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
        header_len = sizeof (header->acc);
        tag = header->acc.tag;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        header_len = sizeof (header->get_acc);
        tag = header->get_acc.tag;
        break;
    default:
        /* developer error */
        opal_output (0, "Unsupported header/flag combination");
        return OMPI_ERROR;
    }

    /* the 64-bit datatype description length is packed immediately after
     * the fixed-size header */
    ddt_len = *((uint64_t *)((uintptr_t) header + header_len));

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
                         header->base.type, (unsigned long) ddt_len));

    ddt_buffer = OBJ_NEW(ompi_osc_rdma_ddt_buffer_t);
    if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    ddt_buffer->module = module;
    ddt_buffer->source = source;

    /* NOTE(review): ddt_len originates from the peer; on 32-bit targets the
     * uint64_t -> size_t conversion in malloc could truncate -- confirm the
     * sender bounds the description size */
    ddt_buffer->header = malloc (ddt_len + header_len);
    if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
        OBJ_RELEASE(ddt_buffer);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* save a copy of the header; it is dispatched once the datatype
     * description arrives */
    memcpy (ddt_buffer->header, header, header_len);

    ret = ompi_osc_rdma_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
                                    ddt_len, MPI_BYTE, source, tag, module->comm, NULL,
                                    process_large_datatype_request_cb, ddt_buffer);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* release the description buffer ourselves in case the object
         * destructor does not. NULLing the pointer first makes this safe
         * even if the destructor also frees it (free(NULL) is a no-op). */
        free (ddt_buffer->header);
        ddt_buffer->header = NULL;
        OBJ_RELEASE(ddt_buffer);
        return ret;
    }

    /* consumed: fixed header + packed 64-bit description length */
    return header_len + (int) sizeof (ddt_len);
}
/* /*
* Do all the data movement associated with a fragment * Do all the data movement associated with a fragment
*/ */
@ -1359,62 +1505,68 @@ static inline int process_frag (ompi_osc_rdma_module_t *module,
for (int i = 0 ; i < frag->num_ops ; ++i) { for (int i = 0 ; i < frag->num_ops ; ++i) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: process_frag: type 0x%x. offset = %u", header->base.type, "osc rdma: process_frag: type 0x%x. flag 0x%x. offset %u",
(unsigned) ((uintptr_t)header - (uintptr_t)frag))); header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
header->base.flags));
switch (header->base.type) { if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE))) {
case OMPI_OSC_RDMA_HDR_TYPE_PUT: switch (header->base.type) {
ret = process_put(module, frag->source, &header->put); case OMPI_OSC_RDMA_HDR_TYPE_PUT:
break; ret = process_put(module, frag->source, &header->put);
case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG: break;
ret = process_put_long(module, frag->source, &header->put); case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
break; ret = process_put_long(module, frag->source, &header->put);
break;
case OMPI_OSC_RDMA_HDR_TYPE_ACC: case OMPI_OSC_RDMA_HDR_TYPE_ACC:
ret = process_acc(module, frag->source, &header->acc); ret = process_acc(module, frag->source, &header->acc);
break; break;
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG: case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
ret = process_acc_long (module, frag->source, &header->acc); ret = process_acc_long (module, frag->source, &header->acc);
break; break;
case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ: case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ:
ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock); ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
ret = sizeof (header->lock); ret = sizeof (header->lock);
}
break;
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ:
ret = process_unlock(module, frag->source, &header->unlock);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET:
ret = process_get (module, frag->source, &header->get);
break;
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
ret = process_cswap (module, frag->source, &header->cswap);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
ret = process_get_acc (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
ret = process_get_acc_long (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ:
ret = process_flush (module, frag->source, &header->flush);
break;
case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE:
ret = process_complete (module, frag->source, &header->complete);
break;
default:
opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
abort(); /* FIX ME */
} }
break; } else {
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ: ret = process_large_datatype_request (module, frag->source, header);
ret = process_unlock(module, frag->source, &header->unlock);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET:
ret = process_get (module, frag->source, &header->get);
break;
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
ret = process_cswap (module, frag->source, &header->cswap);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
ret = process_get_acc (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
ret = process_get_acc_long (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ:
ret = process_flush (module, frag->source, &header->flush);
break;
case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE:
ret = process_complete (module, frag->source, &header->complete);
break;
default:
opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
abort(); /* FIX ME */
} }
if (ret <= 0) { if (ret <= 0) {
opal_output(0, "Error processing fragment: %d", ret); opal_output(0, "Error processing fragment: %d", ret);
abort(); /* FIX ME */ abort(); /* FIX ME */
@ -1428,7 +1580,6 @@ static inline int process_frag (ompi_osc_rdma_module_t *module,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* dispatch for callback on message completion */ /* dispatch for callback on message completion */
static int ompi_osc_rdma_callback (ompi_request_t *request) static int ompi_osc_rdma_callback (ompi_request_t *request)
{ {
@ -1477,9 +1628,10 @@ static int ompi_osc_rdma_callback (ompi_request_t *request)
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ? mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL); source : MPI_PROC_NULL);
osc_rdma_gc_clean (module); osc_rdma_gc_clean ();
opal_list_append (&module->request_gc, (opal_list_item_t *) request); /* put this request on the garbage colletion list */
osc_rdma_gc_add_request (request);
ompi_osc_rdma_frag_start_receive (module); ompi_osc_rdma_frag_start_receive (module);
OPAL_THREAD_UNLOCK(&module->lock); OPAL_THREAD_UNLOCK(&module->lock);
@ -1520,9 +1672,7 @@ isend_completion_cb(ompi_request_t *request)
mark_outgoing_completion(module); mark_outgoing_completion(module);
/* put this request on the garbage colletion list */ /* put this request on the garbage colletion list */
OPAL_THREAD_LOCK(&module->lock); osc_rdma_gc_add_request (request);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

Просмотреть файл

@ -47,8 +47,9 @@ static int frag_send_cb (ompi_request_t *request)
mark_outgoing_completion(module); mark_outgoing_completion(module);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.frags, &frag->super); OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.frags, &frag->super);
/* put this request on the garbage colletion list */ /* put this request on the garbage colletion list */
opal_list_append (&module->request_gc, (opal_list_item_t *) request); osc_rdma_gc_add_request (request);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

Просмотреть файл

@ -54,6 +54,7 @@ typedef enum ompi_osc_rdma_hdr_type_t ompi_osc_rdma_hdr_type_t;
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x01 #define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x01
#define OMPI_OSC_RDMA_HDR_FLAG_VALID 0x02 #define OMPI_OSC_RDMA_HDR_FLAG_VALID 0x02
#define OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET 0x04 #define OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET 0x04
#define OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE 0x08
struct ompi_osc_rdma_header_base_t { struct ompi_osc_rdma_header_base_t {
/** fragment type. 8 bits */ /** fragment type. 8 bits */