
add ability to buffer put/accumulate messages during an epoch

This commit was SVN r15295.
This commit is contained in:
Brian Barrett 2007-07-05 21:40:06 +00:00
parent 5bbee1482e
commit 25e52238ab
7 changed files with 670 additions and 474 deletions
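
Editor's note: the coalescing added here is off by default and is enabled either through the new osc_rdma "use_buffers" MCA parameter registered below or through a window info key of the same name (the parameter's help text says the info key overrides the MCA value). The following is a minimal, hedged sketch of the per-window opt-in; the window arguments are illustrative placeholders, not part of this commit.

#include <mpi.h>

/* Hedged sketch: create a window with message coalescing requested via
 * the "use_buffers" info key (overrides the osc_rdma_use_buffers MCA
 * parameter for this window only). */
static MPI_Win create_buffered_window(void *base, MPI_Aint size)
{
    MPI_Info info;
    MPI_Win  win;

    MPI_Info_create(&info);
    MPI_Info_set(info, "use_buffers", "1");
    MPI_Win_create(base, size, 1 /* disp_unit, in bytes */, info,
                   MPI_COMM_WORLD, &win);
    MPI_Info_free(&info);
    return win;
}

The same setting should typically be applicable globally with --mca osc_rdma_use_buffers 1 on the mpirun command line.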

View file

@@ -31,6 +31,13 @@
BEGIN_C_DECLS
struct ompi_osc_rdma_buffer_t {
mca_btl_base_descriptor_t* descriptor;
size_t remain_len;
mca_bml_base_btl_t *bml_btl;
};
typedef struct ompi_osc_rdma_buffer_t ompi_osc_rdma_buffer_t;
struct ompi_osc_rdma_component_t {
/** Extend the basic osc component interface */
ompi_osc_base_component_t super;
@@ -182,6 +189,10 @@ struct ompi_osc_rdma_module_t {
ompi_osc_rdma_peer_info_t *m_peer_info;
int32_t m_rdma_num_pending;
/*** buffering ***/
bool m_use_buffers;
ompi_osc_rdma_buffer_t *m_pending_buffers;
/* ********************* FENCE data ************************ */
/* an array of <sizeof(m_comm)> ints, each containing the value
1. */

View file

@@ -144,6 +144,13 @@ component_open(void)
"Info key of same name overrides this value.",
false, false, 1, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"use_buffers",
"Coalesce messages during an epoch to reduce "
"network utilization. Info key of same name "
"overrides this value.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"use_rdma",
"Use real RDMA operations to transfer data. "
@@ -355,6 +362,13 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
module->m_setup_info = NULL;
module->m_peer_info = NULL;
/* buffer setup */
module->m_use_buffers = check_config_value_bool("use_buffers", info);
module->m_pending_buffers = malloc(sizeof(ompi_osc_rdma_buffer_t) *
ompi_comm_size(module->m_comm));
memset(module->m_pending_buffers, 0,
sizeof(ompi_osc_rdma_buffer_t) * ompi_comm_size(module->m_comm));
/* fence data */
module->m_fence_coll_counts = (int*)
malloc(sizeof(int) * ompi_comm_size(module->m_comm));
@@ -484,410 +498,419 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
/* dispatch for callback on message completion */
static void
component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata)
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata)
{
int ret;
ompi_osc_rdma_module_t *module;
void *payload;
uint8_t hdr_type;
bool done = false;
ompi_osc_rdma_base_header_t *base_header =
(ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval;
assert(descriptor->des_dst[0].seg_len >=
sizeof(ompi_osc_rdma_base_header_t));
hdr_type = ((ompi_osc_rdma_base_header_t*)
descriptor->des_dst[0].seg_addr.pval)->hdr_type;
/* handle message */
switch (hdr_type) {
case OMPI_OSC_RDMA_HDR_PUT:
{
ompi_osc_rdma_send_header_t *header;
while (!done) {
switch (base_header->hdr_type) {
case OMPI_OSC_RDMA_HDR_PUT:
{
ompi_osc_rdma_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*) base_header;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
}
ret = ompi_osc_rdma_sendreq_recv_put(module, header, payload);
}
break;
case OMPI_OSC_RDMA_HDR_ACC:
{
ompi_osc_rdma_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
}
/* receive into temporary buffer */
ret = ompi_osc_rdma_sendreq_recv_accum(module, header, payload);
}
break;
case OMPI_OSC_RDMA_HDR_GET:
{
ompi_datatype_t *datatype;
ompi_osc_rdma_send_header_t *header;
ompi_osc_rdma_replyreq_t *replyreq;
ompi_proc_t *proc;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
}
/* create or get a pointer to our datatype */
proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
datatype = ompi_osc_rdma_datatype_create(proc, &payload);
if (NULL == datatype) {
opal_output(ompi_osc_base_output,
"Error recreating datatype. Aborting.");
ompi_mpi_abort(module->m_comm, 1, false);
}
/* create replyreq sendreq */
ret = ompi_osc_rdma_replyreq_alloc_init(module,
header->hdr_origin,
header->hdr_origin_sendreq,
header->hdr_target_disp,
header->hdr_target_count,
datatype,
&replyreq);
/* send replyreq */
ompi_osc_rdma_replyreq_send(module, replyreq);
/* sendreq does the right retain, so we can release safely */
OBJ_RELEASE(datatype);
}
break;
case OMPI_OSC_RDMA_HDR_REPLY:
{
ompi_osc_rdma_reply_header_t *header;
ompi_osc_rdma_sendreq_t *sendreq;
/* get our header and payload */
header = (ompi_osc_rdma_reply_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
}
#endif
/* get original sendreq pointer */
sendreq = (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval;
module = sendreq->req_module;
/* receive data */
ompi_osc_rdma_replyreq_recv(module, sendreq, header, payload);
}
break;
case OMPI_OSC_RDMA_HDR_POST:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_post_msgs -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) {
module->m_eager_send_active = module->m_eager_send_ok;
while (module->m_eager_send_active &&
opal_list_get_size(&module->m_pending_sendreqs)) {
ompi_osc_rdma_sendreq_t *sendreq;
OPAL_THREAD_LOCK(&module->m_lock);
sendreq = (ompi_osc_rdma_sendreq_t*)
opal_list_remove_first(&module->m_pending_sendreqs);
if (NULL == sendreq) {
OPAL_THREAD_UNLOCK(&module->m_lock);
break;
}
sendreq->req_module->m_num_pending_out += 1;
OPAL_THREAD_UNLOCK(&module->m_lock);
ret = ompi_osc_rdma_sendreq_send(module, sendreq);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out -= 1;
opal_list_append(&(module->m_pending_sendreqs),
(opal_list_item_t*) sendreq);
OPAL_THREAD_UNLOCK(&module->m_lock);
break;
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
}
opal_condition_broadcast(&module->m_cond);
ret = ompi_osc_rdma_sendreq_recv_put(module, header, &payload);
}
}
break;
case OMPI_OSC_RDMA_HDR_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
break;
case OMPI_OSC_RDMA_HDR_ACC:
{
ompi_osc_rdma_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*) base_header;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* we've heard from one more place, and have value reqs to
process */
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_complete_msgs -= 1);
count += (module->m_num_pending_in += header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
}
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
/* receive into temporary buffer */
ret = ompi_osc_rdma_sendreq_recv_accum(module, header, &payload);
}
break;
case OMPI_OSC_RDMA_HDR_LOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
case OMPI_OSC_RDMA_HDR_GET:
{
ompi_datatype_t *datatype;
ompi_osc_rdma_send_header_t *header;
ompi_osc_rdma_replyreq_t *replyreq;
ompi_proc_t *proc;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*) base_header;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
}
/* create or get a pointer to our datatype */
proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
datatype = ompi_osc_rdma_datatype_create(proc, &payload);
if (NULL == datatype) {
opal_output(ompi_osc_base_output,
"Error recreating datatype. Aborting.");
ompi_mpi_abort(module->m_comm, 1, false);
}
/* create replyreq sendreq */
ret = ompi_osc_rdma_replyreq_alloc_init(module,
header->hdr_origin,
header->hdr_origin_sendreq,
header->hdr_target_disp,
header->hdr_target_count,
datatype,
&replyreq);
/* send replyreq */
ompi_osc_rdma_replyreq_send(module, replyreq);
/* sendreq does the right retain, so we can release safely */
OBJ_RELEASE(datatype);
}
break;
case OMPI_OSC_RDMA_HDR_REPLY:
{
ompi_osc_rdma_reply_header_t *header;
ompi_osc_rdma_sendreq_t *sendreq;
/* get our header and payload */
header = (ompi_osc_rdma_reply_header_t*) base_header;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
}
#endif
/* get original sendreq pointer */
sendreq = (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval;
module = sendreq->req_module;
/* receive data */
ompi_osc_rdma_replyreq_recv(module, sendreq, header, &payload);
}
break;
case OMPI_OSC_RDMA_HDR_POST:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
int32_t count;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (header->hdr_value[1] > 0) {
ompi_osc_rdma_passive_lock(module, header->hdr_value[0],
header->hdr_value[1]);
} else {
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_lock_received_ack += 1);
count = (module->m_num_post_msgs -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) {
module->m_eager_send_active = module->m_eager_send_ok;
while (module->m_eager_send_active &&
opal_list_get_size(&module->m_pending_sendreqs)) {
ompi_osc_rdma_sendreq_t *sendreq;
OPAL_THREAD_LOCK(&module->m_lock);
sendreq = (ompi_osc_rdma_sendreq_t*)
opal_list_remove_first(&module->m_pending_sendreqs);
if (NULL == sendreq) {
OPAL_THREAD_UNLOCK(&module->m_lock);
break;
}
sendreq->req_module->m_num_pending_out += 1;
OPAL_THREAD_UNLOCK(&module->m_lock);
ret = ompi_osc_rdma_sendreq_send(module, sendreq);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out -= 1;
opal_list_append(&(module->m_pending_sendreqs),
(opal_list_item_t*) sendreq);
OPAL_THREAD_UNLOCK(&module->m_lock);
break;
}
}
opal_condition_broadcast(&module->m_cond);
}
}
break;
case OMPI_OSC_RDMA_HDR_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
int32_t count;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* we've heard from one more place, and have value reqs to
process */
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_complete_msgs -= 1);
count += (module->m_num_pending_in += header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count != 0) opal_condition_broadcast(&module->m_cond);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
}
break;
break;
case OMPI_OSC_RDMA_HDR_UNLOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
case OMPI_OSC_RDMA_HDR_LOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
int32_t count;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
ompi_osc_rdma_passive_unlock(module, header->hdr_value[0],
header->hdr_value[1]);
}
break;
if (header->hdr_value[1] > 0) {
ompi_osc_rdma_passive_lock(module, header->hdr_value[0],
header->hdr_value[1]);
} else {
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_lock_received_ack += 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
case OMPI_OSC_RDMA_HDR_UNLOCK_REPLY:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
if (count != 0) opal_condition_broadcast(&module->m_cond);
}
}
break;
case OMPI_OSC_RDMA_HDR_UNLOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
ompi_osc_rdma_passive_unlock(module, header->hdr_value[0],
header->hdr_value[1]);
}
break;
case OMPI_OSC_RDMA_HDR_RDMA_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
case OMPI_OSC_RDMA_HDR_UNLOCK_REPLY:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
int32_t count;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_in -= header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
case OMPI_OSC_RDMA_HDR_RDMA_INFO:
{
ompi_osc_rdma_rdma_info_header_t *header =
(ompi_osc_rdma_rdma_info_header_t*)
descriptor->des_dst[0].seg_addr.pval;
ompi_proc_t *proc = NULL;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl;
ompi_osc_rdma_btl_t *rdma_btl;
int origin, index;
case OMPI_OSC_RDMA_HDR_RDMA_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*) base_header;
int32_t count;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_RDMA_INFO_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
origin = header->hdr_origin;
/* find the bml_btl */
proc = ompi_comm_peer_lookup(module->m_comm, origin);
endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml;
bml_btl = mca_bml_base_btl_array_find(&endpoint->btl_rdma, btl);
if (NULL == bml_btl) {
opal_output(ompi_osc_base_output,
"received rdma info for unknown btl from rank %d",
origin);
return;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_in -= header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
OPAL_THREAD_LOCK(&module->m_lock);
index = module->m_peer_info[origin].peer_num_btls++;
rdma_btl = &(module->m_peer_info[origin].peer_btls[index]);
case OMPI_OSC_RDMA_HDR_RDMA_INFO:
{
ompi_osc_rdma_rdma_info_header_t *header =
(ompi_osc_rdma_rdma_info_header_t*) base_header;
ompi_proc_t *proc = NULL;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl;
ompi_osc_rdma_btl_t *rdma_btl;
int origin, index;
payload = (void*) (header + 1);
rdma_btl->peer_seg_key = header->hdr_segkey;
rdma_btl->bml_btl = bml_btl;
rdma_btl->rdma_order = MCA_BTL_NO_ORDER;
rdma_btl->num_sent = 0;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_RDMA_INFO_HDR_NTOH(*header);
}
#endif
module->m_setup_info->num_btls_callin++;
OPAL_THREAD_UNLOCK(&module->m_lock);
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
origin = header->hdr_origin;
/* find the bml_btl */
proc = ompi_comm_peer_lookup(module->m_comm, origin);
endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml;
bml_btl = mca_bml_base_btl_array_find(&endpoint->btl_rdma, btl);
if (NULL == bml_btl) {
opal_output(ompi_osc_base_output,
"received rdma info for unknown btl from rank %d",
origin);
return;
}
OPAL_THREAD_LOCK(&module->m_lock);
index = module->m_peer_info[origin].peer_num_btls++;
rdma_btl = &(module->m_peer_info[origin].peer_btls[index]);
rdma_btl->peer_seg_key = header->hdr_segkey;
rdma_btl->bml_btl = bml_btl;
rdma_btl->rdma_order = MCA_BTL_NO_ORDER;
rdma_btl->num_sent = 0;
module->m_setup_info->num_btls_callin++;
OPAL_THREAD_UNLOCK(&module->m_lock);
opal_condition_broadcast(&module->m_cond);
}
break;
opal_condition_broadcast(&module->m_cond);
}
break;
default:
/* BWB - FIX ME - this sucks */
opal_output(ompi_osc_base_output,
"received packet for Window with unknown type");
}
case OMPI_OSC_RDMA_HDR_MULTI_END:
payload = base_header;
done = true;
break;
default:
/* BWB - FIX ME - this sucks */
opal_output(ompi_osc_base_output,
"received packet for Window with unknown type");
}
if ((base_header->hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_MULTI) != 0) {
base_header = (ompi_osc_rdma_base_header_t*) payload;
} else {
done = true;
}
}
}
int
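
Editor's note: the reworked component_fragment_cb above is the receive-side half of the coalescing. Instead of handling exactly one message per BTL callback, it now loops, handling messages back-to-back until it reaches an OMPI_OSC_RDMA_HDR_MULTI_END marker or a header whose OMPI_OSC_RDMA_HDR_FLAG_MULTI flag is clear. A simplified, self-contained sketch of that walk follows; the types, constants, and handle_one() helper are stand-ins for the OMPI definitions, not the actual implementation.

#include <stdbool.h>
#include <stdint.h>

#define HDR_MULTI_END  0x0B   /* stand-in for OMPI_OSC_RDMA_HDR_MULTI_END */
#define HDR_FLAG_MULTI 0x02   /* stand-in for OMPI_OSC_RDMA_HDR_FLAG_MULTI */

typedef struct {
    uint8_t hdr_type;
    uint8_t hdr_flags;
} base_header_t;              /* stand-in for ompi_osc_rdma_base_header_t */

/* Hypothetical per-message handler: processes the message that starts at
 * hdr and returns a pointer just past its payload, i.e. where the next
 * header in a coalesced fragment would begin. */
void *handle_one(base_header_t *hdr);

static void walk_coalesced_fragment(base_header_t *hdr)
{
    bool done = false;

    while (!done) {
        if (HDR_MULTI_END == hdr->hdr_type) {
            break;                            /* explicit terminator */
        }
        void *next = handle_one(hdr);
        if (0 == (hdr->hdr_flags & HDR_FLAG_MULTI)) {
            done = true;                      /* stand-alone message */
        } else {
            hdr = (base_header_t*) next;      /* step to the next header */
        }
    }
}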

View file

@@ -75,7 +75,55 @@ inmsg_mark_complete(ompi_osc_rdma_module_t *module)
}
}
/**********************************************************************
*
* Multi-buffer support
*
**********************************************************************/
static int
send_multi_buffer(ompi_osc_rdma_module_t *module, int rank)
{
ompi_osc_rdma_base_header_t *header = (ompi_osc_rdma_base_header_t*)
((char*) module->m_pending_buffers[rank].descriptor->des_src[0].seg_addr.pval +
module->m_pending_buffers[rank].descriptor->des_src[0].seg_len);
header->hdr_type = OMPI_OSC_RDMA_HDR_MULTI_END;
header->hdr_flags = 0;
module->m_pending_buffers[rank].descriptor->des_src[0].seg_len +=
sizeof(ompi_osc_rdma_base_header_t);
mca_bml_base_send(module->m_pending_buffers[rank].bml_btl,
module->m_pending_buffers[rank].descriptor,
MCA_BTL_TAG_OSC_RDMA);
module->m_pending_buffers[rank].descriptor = NULL;
module->m_pending_buffers[rank].bml_btl = NULL;
module->m_pending_buffers[rank].remain_len = 0;
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_flush(ompi_osc_rdma_module_t *module)
{
int i;
for (i = 0 ; i < ompi_comm_size(module->m_comm) ; ++i) {
if (module->m_pending_buffers[i].descriptor != NULL) {
send_multi_buffer(module, i);
}
}
return OMPI_SUCCESS;
}
/**********************************************************************
*
* RDMA data transfers (put / get)
*
**********************************************************************/
static void
rdma_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* endpoint,
@@ -240,12 +288,12 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) descriptor->des_cbdata;
ompi_osc_rdma_send_header_t *header =
(ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
ompi_osc_rdma_module_t *module = sendreq->req_module;
ompi_osc_rdma_sendreq_t *sendreq = NULL;
ompi_osc_rdma_module_t *module = NULL;
int32_t count;
bool done = false;
if (OMPI_SUCCESS != status) {
/* requeue and return */
@@ -254,53 +302,80 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
return;
}
/* have to look at header, and not the sendreq because in the case
of get, it's possible that the sendreq has been freed already
(if the remote side replies before we get our send completion
callback) and already allocated to another request. We don't
wait for this completion before exiting a synchronization point
in the case of get, as we really don't care when it completes -
only when the data arrives. */
if (OMPI_OSC_RDMA_HDR_GET != header->hdr_base.hdr_type) {
if (header->hdr_base.hdr_type == OMPI_OSC_RDMA_HDR_MULTI_END) {
done = true;
}
while (!done) {
sendreq = (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval;
module = sendreq->req_module;
/* have to look at header, and not the sendreq because in the
case of get, it's possible that the sendreq has been freed
already (if the remote side replies before we get our send
completion callback) and already allocated to another
request. We don't wait for this completion before exiting
a synchronization point in the case of get, as we really
don't care when it completes - only when the data
arrives. */
if (OMPI_OSC_RDMA_HDR_GET != header->hdr_base.hdr_type) {
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_LOCK(&sendreq->req_module->m_lock);
count = sendreq->req_module->m_num_pending_out -= 1;
OPAL_THREAD_UNLOCK(&sendreq->req_module->m_lock);
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_send_long_cb;
longreq->cbdata = sendreq;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank,
header->hdr_origin_tag));
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_LOCK(&sendreq->req_module->m_lock);
count = sendreq->req_module->m_num_pending_out -= 1;
OPAL_THREAD_UNLOCK(&sendreq->req_module->m_lock);
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) {
opal_condition_broadcast(&sendreq->req_module->m_cond);
}
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_send_long_cb;
longreq->cbdata = sendreq;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank,
header->hdr_origin_tag));
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->m_comm,
&(longreq->request));
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
} else {
ompi_osc_rdma_sendreq_free(sendreq);
}
if (0 == (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_MULTI)) {
done = true;
} else {
header = (ompi_osc_rdma_send_header_t*)
(((char*) header) +
sizeof(ompi_osc_rdma_send_header_t) +
ompi_ddt_pack_description_length(sendreq->req_target_datatype) +
header->hdr_msg_length);
if (header->hdr_base.hdr_type == OMPI_OSC_RDMA_HDR_MULTI_END) {
done = true;
}
}
}
@@ -341,7 +416,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
size_t written_data = 0;
size_t needed_len = sizeof(ompi_osc_rdma_send_header_t);
const void *packed_ddt;
size_t packed_ddt_len;
size_t packed_ddt_len, remain;
if ((module->m_eager_send_active) &&
(module->m_use_rdma) &&
@@ -353,39 +428,60 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) return ret;
}
packed_ddt_len = ompi_ddt_pack_description_length(sendreq->req_target_datatype);
/* we always need to send the ddt */
packed_ddt_len = ompi_ddt_pack_description_length(sendreq->req_target_datatype);
needed_len += packed_ddt_len;
if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
needed_len += sendreq->req_origin_bytes_packed;
}
/* Get a BTL so we have the eager limit */
endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
MCA_BTL_NO_ORDER,
needed_len < bml_btl->btl_eager_limit ? needed_len :
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* see if we already have a buffer */
if ((module->m_pending_buffers[sendreq->req_target_rank].remain_len >=
sizeof(ompi_osc_rdma_send_header_t) + sendreq->req_origin_bytes_packed) ||
(0 < module->m_pending_buffers[sendreq->req_target_rank].remain_len &&
sendreq->req_origin_bytes_packed > 2048)) {
bml_btl = module->m_pending_buffers[sendreq->req_target_rank].bml_btl;
descriptor = module->m_pending_buffers[sendreq->req_target_rank].descriptor;
remain = module->m_pending_buffers[sendreq->req_target_rank].remain_len;
} else {
/* send the existing buffer */
if (module->m_pending_buffers[sendreq->req_target_rank].descriptor) {
send_multi_buffer(module, sendreq->req_target_rank);
}
assert(OMPI_SUCCESS == ret);
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_send_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* get a buffer... */
endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
MCA_BTL_NO_ORDER,
module->m_use_buffers ? bml_btl->btl_eager_limit : needed_len < bml_btl->btl_eager_limit ? needed_len :
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_rdma_sendreq_send_cb;
descriptor->des_cbdata = (void*) sendreq;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_send_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_rdma_sendreq_send_cb;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
module->m_pending_buffers[sendreq->req_target_rank].bml_btl = bml_btl;
module->m_pending_buffers[sendreq->req_target_rank].descriptor = descriptor;
module->m_pending_buffers[sendreq->req_target_rank].remain_len = descriptor->des_src[0].seg_len - sizeof(ompi_osc_rdma_base_header_t);
remain = module->m_pending_buffers[sendreq->req_target_rank].remain_len;
descriptor->des_src[0].seg_len = 0;
}
/* pack header */
header = (ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
header = (ompi_osc_rdma_send_header_t*)
((char*) descriptor->des_src[0].seg_addr.pval + descriptor->des_src[0].seg_len);
written_data += sizeof(ompi_osc_rdma_send_header_t);
header->hdr_base.hdr_flags = 0;
header->hdr_windx = sendreq->req_module->m_comm->c_contextid;
@@ -413,26 +509,26 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
sendreq->req_refcount++;
break;
}
/* Set datatype id and / or pack datatype */
ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt);
if (OMPI_SUCCESS != ret) goto cleanup;
memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data,
memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + descriptor->des_src[0].seg_len + written_data,
packed_ddt, packed_ddt_len);
written_data += packed_ddt_len;
if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
/* if sending data and it fits, pack payload */
if (descriptor->des_src[0].seg_len >=
written_data + sendreq->req_origin_bytes_packed) {
if (remain >= written_data + sendreq->req_origin_bytes_packed) {
struct iovec iov;
uint32_t iov_count = 1;
size_t max_data = sendreq->req_origin_bytes_packed;
iov.iov_len = max_data;
iov.iov_base = (IOVBASE_TYPE*)((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data);
iov.iov_base = (IOVBASE_TYPE*)((unsigned char*) descriptor->des_src[0].seg_addr.pval + descriptor->des_src[0].seg_len + written_data);
ret = ompi_convertor_pack(&sendreq->req_origin_convertor, &iov, &iov_count,
&max_data );
@@ -441,37 +537,66 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
goto cleanup;
}
assert(max_data == sendreq->req_origin_bytes_packed);
written_data += max_data;
descriptor->des_src[0].seg_len = written_data;
descriptor->des_src[0].seg_len += written_data;
header->hdr_msg_length = sendreq->req_origin_bytes_packed;
} else {
descriptor->des_src[0].seg_len += written_data;
header->hdr_msg_length = 0;
header->hdr_origin_tag = create_send_tag(module);
}
} else {
descriptor->des_src[0].seg_len = written_data;
descriptor->des_src[0].seg_len += written_data;
header->hdr_msg_length = 0;
}
module->m_pending_buffers[sendreq->req_target_rank].remain_len -= written_data;
if (module->m_use_buffers) {
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_MULTI;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_SEND_HDR_HTON(*header);
}
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_SEND_HDR_HTON(*header);
}
#endif
/* send fragment */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending sendreq to %d",
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank));
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
goto done;
if (module->m_pending_buffers[sendreq->req_target_rank].remain_len <
sizeof(ompi_osc_rdma_send_header_t) + 128) {
/* not enough space left - send now */
ret = send_multi_buffer(module, sendreq->req_target_rank);
} else {
ret = OMPI_SUCCESS;
}
goto done;
} else {
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_SEND_HDR_HTON(*header);
}
#endif
/* send fragment */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending sendreq to %d",
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank));
module->m_pending_buffers[sendreq->req_target_rank].bml_btl = NULL;
module->m_pending_buffers[sendreq->req_target_rank].descriptor = NULL;
module->m_pending_buffers[sendreq->req_target_rank].remain_len = 0;
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
goto done;
}
cleanup:
if (descriptor != NULL) {
@@ -659,6 +784,10 @@ ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_rdma_longreq_free(longreq);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d finished receiving long put message",
longreq->req_module->m_comm->c_my_rank));
inmsg_mark_complete(longreq->req_module);
}
@@ -667,14 +796,14 @@ ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq)
int
ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *inbuf)
void **inbuf)
{
int ret = OMPI_SUCCESS;
void *target = (unsigned char*) module->m_win->w_baseptr +
(header->hdr_target_disp * module->m_win->w_disp_unit);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &inbuf);
ompi_osc_rdma_datatype_create(proc, inbuf);
if (NULL == datatype) {
opal_output(ompi_osc_base_output,
@@ -701,7 +830,7 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
0,
&convertor);
iov.iov_len = header->hdr_msg_length;
iov.iov_base = (IOVBASE_TYPE*)inbuf;
iov.iov_base = (IOVBASE_TYPE*)*inbuf;
max_data = iov.iov_len;
ompi_convertor_unpack(&convertor,
&iov,
@@ -710,28 +839,41 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
OBJ_DESTRUCT(&convertor);
OBJ_RELEASE(datatype);
inmsg_mark_complete(module);
*inbuf = ((char*) *inbuf) + header->hdr_msg_length;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d received put message from %d",
module->m_comm->c_my_rank,
header->hdr_origin));
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
ret = mca_pml.pml_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request));
ret = mca_pml.pml_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d started long recv put message from %d (%d)",
module->m_comm->c_my_rank,
header->hdr_origin,
header->hdr_origin_tag));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
@@ -788,13 +930,13 @@ ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq)
int
ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload)
void **payload)
{
int ret = OMPI_SUCCESS;
struct ompi_op_t *op = ompi_osc_rdma_op_create(header->hdr_target_op);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &payload);
ompi_osc_rdma_datatype_create(proc, payload);
if (NULL == datatype) {
opal_output(ompi_osc_base_output,
@@ -807,7 +949,7 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
OPAL_THREAD_LOCK(&module->m_acc_lock);
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_rdma_process_op(module, header, datatype, op, payload,
ret = ompi_osc_rdma_process_op(module, header, datatype, op, *payload,
header->hdr_msg_length);
/* unlock the window for accumulates */
@@ -823,6 +965,8 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
"%d received accum message from %d",
module->m_comm->c_my_rank,
header->hdr_origin));
*payload = ((char*) *payload) + header->hdr_msg_length;
} else {
ompi_osc_rdma_longreq_t *longreq;
ptrdiff_t lb, extent, true_lb, true_extent;
@@ -902,7 +1046,7 @@ int
ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_reply_header_t *header,
void *payload)
void **payload)
{
int ret = OMPI_SUCCESS;
@@ -916,7 +1060,7 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
int32_t count;
iov.iov_len = header->hdr_msg_length;
iov.iov_base = (IOVBASE_TYPE*)payload;
iov.iov_base = (IOVBASE_TYPE*)*payload;
max_data = iov.iov_len;
ompi_convertor_unpack(&sendreq->req_origin_convertor,
&iov,
@@ -925,6 +1069,8 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
count = sendreq->req_module->m_num_pending_out -= 1;
ompi_osc_rdma_sendreq_free(sendreq);
*payload = ((char*) *payload) + header->hdr_msg_length;
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
} else {
ompi_osc_rdma_longreq_t *longreq;
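
Editor's note on the signature change visible throughout this file: the receive helpers (ompi_osc_rdma_sendreq_recv_put, ompi_osc_rdma_sendreq_recv_accum, ompi_osc_rdma_replyreq_recv) now take void **payload rather than void *payload. Each consumes its datatype description and inline data and then advances the caller's cursor past them, so the dispatch loop can locate the next header in a coalesced fragment. A minimal, hedged sketch of that in/out convention, with msg_len standing in for header->hdr_msg_length:

#include <stddef.h>

/* Hedged sketch of the new payload convention: the callee reads its data
 * at *payload and moves *payload past the bytes it consumed before
 * returning, leaving the cursor at the next (possible) header. */
static void consume_message(void **payload, size_t msg_len)
{
    /* ... unpack msg_len bytes starting at *payload ... */
    *payload = (char *) *payload + msg_len;
}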

View file

@@ -23,37 +23,40 @@
/* send a sendreq (the request from the origin for a Put, Get, or
Accumulate, including the payload for Put and Accumulate) */
int ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq);
ompi_osc_rdma_sendreq_t *sendreq);
/* send a replyreq (the request from the target of a Get, with the
payload for the origin) */
int ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_replyreq_t *replyreq);
ompi_osc_rdma_replyreq_t *replyreq);
/* receive the target side of a sendreq for a put, directly into the user's window */
int ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload);
ompi_osc_rdma_send_header_t *header,
void **payload);
/* receive the target side of a sendreq for an accumulate, possibly
using a temporary buffer, then calling the reduction functions */
int ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload);
ompi_osc_rdma_send_header_t *header,
void **payload);
/* receive the origin side of a replyreq (the reply part of an
MPI_Get), directly into the user's window */
int ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_reply_header_t *header,
void *payload);
ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_reply_header_t *header,
void **payload);
int ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
ompi_proc_t *proc,
uint8_t type, int32_t value0, int32_t value1);
ompi_proc_t *proc,
uint8_t type,
int32_t value0, int32_t value1);
int ompi_osc_rdma_rdma_ack_send(ompi_osc_rdma_module_t *module,
ompi_proc_t *proc,
ompi_osc_rdma_btl_t *rdma_btl);
int ompi_osc_rdma_flush(ompi_osc_rdma_module_t *module);
#endif

View file

@@ -25,7 +25,7 @@
#include "opal/types.h"
/* Note -- 0x05 to 0x0A are of control_hdr type */
/* Note -- 0x05 to 0x0C are of control_hdr type */
#define OMPI_OSC_RDMA_HDR_PUT 0x01
#define OMPI_OSC_RDMA_HDR_ACC 0x02
#define OMPI_OSC_RDMA_HDR_GET 0x03
@@ -36,9 +36,11 @@
#define OMPI_OSC_RDMA_HDR_UNLOCK_REQ 0x08
#define OMPI_OSC_RDMA_HDR_UNLOCK_REPLY 0x09
#define OMPI_OSC_RDMA_HDR_RDMA_COMPLETE 0x0A
#define OMPI_OSC_RDMA_HDR_RDMA_INFO 0x0B
#define OMPI_OSC_RDMA_HDR_MULTI_END 0x0B
#define OMPI_OSC_RDMA_HDR_RDMA_INFO 0x0C
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x01
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x01
#define OMPI_OSC_RDMA_HDR_FLAG_MULTI 0x02
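
Editor's note: taken together, the two new constants describe the wire format of a coalesced fragment. Every embedded message sets OMPI_OSC_RDMA_HDR_FLAG_MULTI in its header, and the fragment is closed with a bare OMPI_OSC_RDMA_HDR_MULTI_END base header before the descriptor is handed to the BTL (compare send_multi_buffer earlier in this commit). A hedged, self-contained sketch of writing that terminator; the struct and constant below are stand-ins for the OMPI definitions:

#include <stddef.h>
#include <stdint.h>

#define HDR_MULTI_END 0x0B    /* stand-in for OMPI_OSC_RDMA_HDR_MULTI_END */

typedef struct {
    uint8_t hdr_type;
    uint8_t hdr_flags;
} base_header_t;              /* stand-in for ompi_osc_rdma_base_header_t */

/* Append the end-of-fragment marker after 'used' bytes of coalesced
 * messages and return the new fragment length. */
static size_t close_coalesced_fragment(char *buf, size_t used)
{
    base_header_t *end = (base_header_t *) (buf + used);
    end->hdr_type  = HDR_MULTI_END;
    end->hdr_flags = 0;
    return used + sizeof(base_header_t);
}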
struct ompi_osc_rdma_base_header_t {
uint8_t hdr_type;

View file

@@ -41,6 +41,8 @@ typedef enum {
struct ompi_osc_rdma_sendreq_t {
ompi_request_t super;
int req_refcount;
/** type of sendreq (from ompi_osc_rdma_req_type_t) */
ompi_osc_rdma_req_type_t req_type;
/** pointer to the module that created the sendreq */
@@ -106,6 +108,7 @@ ompi_osc_rdma_sendreq_alloc(ompi_osc_rdma_module_t *module,
(*sendreq)->req_module = module;
(*sendreq)->req_target_rank = target_rank;
(*sendreq)->req_target_proc = proc;
(*sendreq)->req_refcount = 1;
return OMPI_SUCCESS;
}
@@ -165,14 +168,16 @@ ompi_osc_rdma_sendreq_init_target(ompi_osc_rdma_sendreq_t *sendreq,
static inline int
ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq)
{
ompi_convertor_cleanup(&sendreq->req_origin_convertor);
if (0 == (--sendreq->req_refcount)) {
ompi_convertor_cleanup(&sendreq->req_origin_convertor);
OBJ_RELEASE(sendreq->req_target_datatype);
OBJ_RELEASE(sendreq->req_origin_datatype);
OBJ_RELEASE(sendreq->req_target_datatype);
OBJ_RELEASE(sendreq->req_origin_datatype);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.c_sendreqs,
(opal_list_item_t*) sendreq);
}
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.c_sendreqs,
(opal_list_item_t*) sendreq);
return OMPI_SUCCESS;
}
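
Editor's note: the req_refcount added above exists because, for a Get, the same sendreq can be referenced both by the local send-completion path and by the reply that carries the data. Allocation now starts the count at one, the send path takes an extra reference when the request must outlive the local send (the req_refcount++ shown earlier in this commit), and ompi_osc_rdma_sendreq_free only returns the request to the free list once the last reference is dropped. A generic, hedged sketch of that pattern with illustrative types, not the OMPI code itself:

/* Hedged sketch of the reference-counting scheme: the request's
 * resources are released exactly once, by whichever path drops the
 * final reference. */
typedef struct {
    int refcount;
    /* ... request state ... */
} request_t;

static void request_init(request_t *req)   { req->refcount = 1; }
static void request_retain(request_t *req) { req->refcount++; }

static void request_release_resources(request_t *req)
{
    (void) req;   /* convertor cleanup, OBJ_RELEASE of datatypes, ... */
}

static void request_release(request_t *req)
{
    if (0 == --req->refcount) {
        request_release_resources(req);   /* last holder frees */
    }
}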

View file

@@ -150,6 +150,8 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
}
}
ompi_osc_rdma_flush(module);
OPAL_THREAD_LOCK(&module->m_lock);
/* if some requests couldn't be started, push into the
"queued" list, where we will try to restart them later. */
@@ -338,6 +340,8 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
}
}
ompi_osc_rdma_flush(module);
OPAL_THREAD_LOCK(&module->m_lock);
/* if some requests couldn't be started, push into the
"queued" list, where we will try to restart them later. */
@@ -558,6 +562,8 @@ ompi_osc_rdma_module_unlock(int target,
}
}
ompi_osc_rdma_flush(module);
OPAL_THREAD_LOCK(&module->m_lock);
/* if some requests couldn't be started, push into the
"queued" list, where we will try to restart them later. */