1
1

Process pending put/get frags on endpoint connection establishment.

This commit was SVN r16785.
Этот коммит содержится в:
Gleb Natapov 2007-11-28 07:16:52 +00:00
родитель e502402470
Коммит 923666b75c
6 изменённых файлов: 119 добавлений и 106 удалений

Просмотреть файл

@@ -913,7 +913,7 @@ int mca_btl_openib_send(
*/ */
int mca_btl_openib_put( mca_btl_base_module_t* btl, int mca_btl_openib_put( mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint, mca_btl_base_endpoint_t* ep,
mca_btl_base_descriptor_t* descriptor) mca_btl_base_descriptor_t* descriptor)
{ {
struct ibv_send_wr* bad_wr; struct ibv_send_wr* bad_wr;
@@ -925,20 +925,31 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND_USER || assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND_USER ||
openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND); openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_SEND);
if(ep->endpoint_state != MCA_BTL_IB_CONNECTED) {
int rc;
OPAL_THREAD_LOCK(&ep->endpoint_lock);
rc = check_endpoint_state(ep, descriptor, &ep->pending_put_frags);
OPAL_THREAD_UNLOCK(&ep->ep_lock);
if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc)
return OMPI_SUCCESS;
if(OMPI_SUCCESS != rc)
return rc;
}
if(MCA_BTL_NO_ORDER == qp) if(MCA_BTL_NO_ORDER == qp)
qp = mca_btl_openib_component.rdma_qp; qp = mca_btl_openib_component.rdma_qp;
/* check for a send wqe */ /* check for a send wqe */
if (qp_get_wqe(endpoint, qp) < 0) { if (qp_get_wqe(ep, qp) < 0) {
qp_put_wqe(endpoint, qp); qp_put_wqe(ep, qp);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
opal_list_append(&endpoint->pending_put_frags, (opal_list_item_t*)frag); opal_list_append(&ep->pending_put_frags, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* post descriptor */ /* post descriptor */
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if((endpoint->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) if((ep->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN)
!= (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) { != (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) {
rem_addr = opal_swap_bytes8(rem_addr); rem_addr = opal_swap_bytes8(rem_addr);
rkey = opal_swap_bytes4(rkey); rkey = opal_swap_bytes4(rkey);
@@ -950,13 +961,13 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
to_com_frag(frag)->sg_entry.addr = to_com_frag(frag)->sg_entry.addr =
(uint64_t)descriptor->des_src->seg_addr.pval; (uint64_t)descriptor->des_src->seg_addr.pval;
to_com_frag(frag)->sg_entry.length = descriptor->des_src->seg_len; to_com_frag(frag)->sg_entry.length = descriptor->des_src->seg_len;
to_com_frag(frag)->endpoint = endpoint; to_com_frag(frag)->endpoint = ep;
descriptor->order = qp; descriptor->order = qp;
/* Setting opcode on a frag constructor isn't enough since prepare_src /* Setting opcode on a frag constructor isn't enough since prepare_src
* may return send_frag instead of put_frag */ * may return send_frag instead of put_frag */
frag->sr_desc.opcode = IBV_WR_RDMA_WRITE; frag->sr_desc.opcode = IBV_WR_RDMA_WRITE;
if(ibv_post_send(endpoint->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) if(ibv_post_send(ep->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr))
return OMPI_ERROR; return OMPI_ERROR;
return OMPI_SUCCESS; return OMPI_SUCCESS;
@@ -967,8 +978,8 @@ int mca_btl_openib_put( mca_btl_base_module_t* btl,
* RDMA READ remote buffer to local buffer address. * RDMA READ remote buffer to local buffer address.
*/ */
int mca_btl_openib_get( mca_btl_base_module_t* btl, int mca_btl_openib_get(mca_btl_base_module_t* btl,
mca_btl_base_endpoint_t* endpoint, mca_btl_base_endpoint_t* ep,
mca_btl_base_descriptor_t* descriptor) mca_btl_base_descriptor_t* descriptor)
{ {
struct ibv_send_wr* bad_wr; struct ibv_send_wr* bad_wr;
@@ -979,30 +990,41 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_RECV_USER); assert(openib_frag_type(frag) == MCA_BTL_OPENIB_FRAG_RECV_USER);
if(ep->endpoint_state != MCA_BTL_IB_CONNECTED) {
int rc;
OPAL_THREAD_LOCK(&ep->endpoint_lock);
rc = check_endpoint_state(ep, descriptor, &ep->pending_get_frags);
OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc)
return OMPI_SUCCESS;
if(OMPI_SUCCESS != rc)
return rc;
}
if(MCA_BTL_NO_ORDER == qp) if(MCA_BTL_NO_ORDER == qp)
qp = mca_btl_openib_component.rdma_qp; qp = mca_btl_openib_component.rdma_qp;
/* check for a send wqe */ /* check for a send wqe */
if (qp_get_wqe(endpoint, qp) < 0) { if (qp_get_wqe(ep, qp) < 0) {
qp_put_wqe(endpoint, qp); qp_put_wqe(ep, qp);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
opal_list_append(&endpoint->pending_get_frags, (opal_list_item_t*)frag); opal_list_append(&ep->pending_get_frags, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
/* check for a get token */ /* check for a get token */
if(OPAL_THREAD_ADD32(&endpoint->get_tokens,-1) < 0) { if(OPAL_THREAD_ADD32(&ep->get_tokens,-1) < 0) {
qp_put_wqe(endpoint, qp); qp_put_wqe(ep, qp);
OPAL_THREAD_ADD32(&endpoint->get_tokens,1); OPAL_THREAD_ADD32(&ep->get_tokens,1);
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
opal_list_append(&endpoint->pending_get_frags, (opal_list_item_t*)frag); opal_list_append(&ep->pending_get_frags, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if((endpoint->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN) if((ep->endpoint_proc->proc_ompi->proc_arch & OMPI_ARCH_ISBIGENDIAN)
!= (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) { != (ompi_proc_local()->proc_arch & OMPI_ARCH_ISBIGENDIAN)) {
rem_addr = opal_swap_bytes8(rem_addr); rem_addr = opal_swap_bytes8(rem_addr);
rkey = opal_swap_bytes4(rkey); rkey = opal_swap_bytes4(rkey);
@@ -1014,10 +1036,10 @@ int mca_btl_openib_get( mca_btl_base_module_t* btl,
to_com_frag(frag)->sg_entry.addr = to_com_frag(frag)->sg_entry.addr =
(uint64_t)descriptor->des_dst->seg_addr.pval; (uint64_t)descriptor->des_dst->seg_addr.pval;
to_com_frag(frag)->sg_entry.length = descriptor->des_dst->seg_len; to_com_frag(frag)->sg_entry.length = descriptor->des_dst->seg_len;
to_com_frag(frag)->endpoint = endpoint; to_com_frag(frag)->endpoint = ep;
descriptor->order = qp; descriptor->order = qp;
if(ibv_post_send(endpoint->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr)) if(ibv_post_send(ep->qps[qp].qp->lcl_qp, &frag->sr_desc, &bad_wr))
return OMPI_ERROR; return OMPI_ERROR;
return OMPI_SUCCESS; return OMPI_SUCCESS;

Просмотреть файл

@@ -501,6 +501,9 @@ extern mca_btl_base_descriptor_t* mca_btl_openib_prepare_dst(
size_t reserve, size_t reserve,
size_t* size); size_t* size);
extern void mca_btl_openib_frag_progress_pending_put_get(
struct mca_btl_base_endpoint_t*, const int);
/** /**
* Fault Tolerance Event Notification Function * Fault Tolerance Event Notification Function
* *

Просмотреть файл

@@ -86,9 +86,6 @@ static int btl_openib_module_progress(mca_btl_openib_hca_t *hca);
static void progress_pending_frags_pp(mca_btl_base_endpoint_t *, const int); static void progress_pending_frags_pp(mca_btl_base_endpoint_t *, const int);
static void progress_pending_frags_srq( mca_btl_openib_module_t* , const int); static void progress_pending_frags_srq( mca_btl_openib_module_t* , const int);
static void progress_pending_eager_rdma(mca_btl_base_endpoint_t*); static void progress_pending_eager_rdma(mca_btl_base_endpoint_t*);
static void btl_openib_frag_progress_pending_put_get(
mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint,
const int qp);
static int openib_reg_mr(void *reg_data, void *base, size_t size, static int openib_reg_mr(void *reg_data, void *base, size_t size,
mca_mpool_base_registration_t *reg); mca_mpool_base_registration_t *reg);
@@ -1314,32 +1311,33 @@ static void progress_pending_frags_pp(mca_btl_base_endpoint_t *ep, const int qp)
OPAL_THREAD_UNLOCK(&ep->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
} }
static void btl_openib_frag_progress_pending_put_get( void mca_btl_openib_frag_progress_pending_put_get(mca_btl_base_endpoint_t *ep,
mca_btl_openib_module_t* openib_btl, mca_btl_base_endpoint_t *endpoint, const int qp)
const int qp) { {
mca_btl_openib_module_t* openib_btl = ep->endpoint_btl;
opal_list_item_t *frag; opal_list_item_t *frag;
size_t i, len = opal_list_get_size(&endpoint->pending_get_frags); size_t i, len = opal_list_get_size(&ep->pending_get_frags);
for(i = 0; i < len && endpoint->qps[qp].qp->sd_wqe > 0 && for(i = 0; i < len && ep->qps[qp].qp->sd_wqe > 0 && ep->get_tokens > 0; i++)
endpoint->get_tokens > 0; i++) { {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
frag = opal_list_remove_first(&(endpoint->pending_get_frags)); frag = opal_list_remove_first(&(ep->pending_get_frags));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
if(NULL == frag) if(NULL == frag)
break; break;
if(mca_btl_openib_get((mca_btl_base_module_t *)openib_btl, endpoint, if(mca_btl_openib_get((mca_btl_base_module_t *)openib_btl, ep,
&to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE) &to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE)
break; break;
} }
len = opal_list_get_size(&endpoint->pending_put_frags); len = opal_list_get_size(&ep->pending_put_frags);
for(i = 0; i < len && endpoint->qps[qp].qp->sd_wqe > 0; i++) { for(i = 0; i < len && ep->qps[qp].qp->sd_wqe > 0; i++) {
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
frag = opal_list_remove_first(&(endpoint->pending_put_frags)); frag = opal_list_remove_first(&(ep->pending_put_frags));
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
if(NULL == frag) if(NULL == frag)
break; break;
if(mca_btl_openib_put((mca_btl_base_module_t*)openib_btl, endpoint, if(mca_btl_openib_put((mca_btl_base_module_t*)openib_btl, ep,
&to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE) &to_base_frag(frag)->base) == OMPI_ERR_OUT_OF_RESOURCE)
break; break;
} }
@@ -1561,7 +1559,7 @@ static int btl_openib_module_progress(mca_btl_openib_hca_t* hca)
} }
/* new wqe or/and get token available. Try to progress pending frags */ /* new wqe or/and get token available. Try to progress pending frags */
progress_pending_frags_wqe(endpoint->qps[qp].qp); progress_pending_frags_wqe(endpoint->qps[qp].qp);
btl_openib_frag_progress_pending_put_get(openib_btl, endpoint, qp); mca_btl_openib_frag_progress_pending_put_get(endpoint, qp);
count++; count++;
break; break;

Просмотреть файл

@@ -42,7 +42,6 @@
#include "btl_openib_endpoint.h" #include "btl_openib_endpoint.h"
#include "btl_openib_proc.h" #include "btl_openib_proc.h"
#include "btl_openib_frag.h" #include "btl_openib_frag.h"
#include "connect/base.h"
static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint); static void mca_btl_openib_endpoint_construct(mca_btl_base_endpoint_t* endpoint);
static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint); static void mca_btl_openib_endpoint_destruct(mca_btl_base_endpoint_t* endpoint);
@@ -498,75 +497,35 @@ void mca_btl_openib_endpoint_connected(mca_btl_openib_endpoint_t *endpoint)
if(OMPI_ERROR == mca_btl_openib_endpoint_post_send(endpoint, frag)) if(OMPI_ERROR == mca_btl_openib_endpoint_post_send(endpoint, frag))
BTL_ERROR(("Error posting send")); BTL_ERROR(("Error posting send"));
} }
/* if upper layer called put or get before connection moved to connected
* state then we restart them here */
mca_btl_openib_frag_progress_pending_put_get(endpoint,
mca_btl_openib_component.rdma_qp);
} }
/* /*
* Attempt to send a fragment using a given endpoint. If the endpoint is not * Attempt to send a fragment using a given endpoint. If the endpoint is not
* connected, queue the fragment and start the connection as required. * connected, queue the fragment and start the connection as required.
*/ */
int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* endpoint, int mca_btl_openib_endpoint_send(mca_btl_base_endpoint_t* ep,
mca_btl_openib_send_frag_t* frag) mca_btl_openib_send_frag_t* frag)
{ {
int rc; int rc;
bool call_progress = false;
OPAL_THREAD_LOCK(&endpoint->endpoint_lock); OPAL_THREAD_LOCK(&ep->endpoint_lock);
switch(endpoint->endpoint_state) { rc = check_endpoint_state(ep, &to_base_frag(frag)->base,
case MCA_BTL_IB_CONNECTING: &ep->pending_lazy_frags);
BTL_VERBOSE(("Queing because state is connecting")); if(OPAL_LIKELY(rc == OMPI_SUCCESS)) {
rc = mca_btl_openib_endpoint_post_send(ep, frag);
opal_list_append(&endpoint->pending_lazy_frags, if(OMPI_ERR_OUT_OF_RESOURCE == rc)
(opal_list_item_t *)frag);
call_progress = true;
rc = OMPI_SUCCESS; rc = OMPI_SUCCESS;
break; } else if(OMPI_ERR_TEMP_OUT_OF_RESOURCE == rc) {
rc = OMPI_SUCCESS;
case MCA_BTL_IB_CONNECT_ACK:
case MCA_BTL_IB_WAITING_ACK:
BTL_VERBOSE(("Queuing because waiting for ack"));
opal_list_append(&endpoint->pending_lazy_frags,
(opal_list_item_t *)frag);
call_progress = true;
rc = OMPI_SUCCESS;
break;
case MCA_BTL_IB_CLOSED:
BTL_VERBOSE(("Connection to endpoint closed ... connecting ..."));
opal_list_append(&endpoint->pending_lazy_frags,
(opal_list_item_t *)frag);
rc = ompi_btl_openib_connect.bcf_start_connect(endpoint);
/*
* As long as we expect a message from the peer (in order
* to setup the connection) let the event engine pool the
* OOB events. Note: we increment it once peer active
* connection.
*/
opal_progress_event_users_increment();
call_progress = true;
break;
case MCA_BTL_IB_FAILED:
rc = OMPI_ERR_UNREACH;
break;
case MCA_BTL_IB_CONNECTED:
BTL_VERBOSE(("Send to : %d, len : %lu, frag : %p",
endpoint->endpoint_proc->proc_guid.vpid,
frag->sg_entry.length, frag));
rc = mca_btl_openib_endpoint_post_send(endpoint, frag);
if(rc == OMPI_ERR_OUT_OF_RESOURCE )
rc = OMPI_SUCCESS;
break;
default:
rc = OMPI_ERR_UNREACH;
break;
} }
OPAL_THREAD_UNLOCK(&endpoint->endpoint_lock); OPAL_THREAD_UNLOCK(&ep->endpoint_lock);
if(call_progress) opal_progress();
return rc; return rc;
} }

Просмотреть файл

@@ -33,6 +33,7 @@
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include "ompi/mca/btl/base/btl_base_error.h" #include "ompi/mca/btl/base/btl_base_error.h"
#include "connect/base.h"
BEGIN_C_DECLS BEGIN_C_DECLS
@@ -325,6 +326,38 @@ try_send:
mca_btl_openib_endpoint_send_credits(ep, qp); mca_btl_openib_endpoint_send_credits(ep, qp);
} }
static inline int check_endpoint_state(mca_btl_openib_endpoint_t *ep,
mca_btl_base_descriptor_t *des, opal_list_t *pending_list)
{
int rc = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
switch(ep->endpoint_state) {
case MCA_BTL_IB_CLOSED:
rc = ompi_btl_openib_connect.bcf_start_connect(ep);
if(rc == OMPI_SUCCESS)
rc = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/*
* As long as we expect a message from the peer (in order
* to setup the connection) let the event engine pool the
* OOB events. Note: we increment it once peer active
* connection.
*/
opal_progress_event_users_increment();
/* fall through */
default:
opal_list_append(pending_list, (opal_list_item_t *)des);
break;
case MCA_BTL_IB_FAILED:
rc = OMPI_ERR_UNREACH;
break;
case MCA_BTL_IB_CONNECTED:
rc = OMPI_SUCCESS;
break;
}
return rc;
}
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@@ -66,8 +66,6 @@
#ifndef BTL_OPENIB_CONNECT_H #ifndef BTL_OPENIB_CONNECT_H
#define BTL_OPENIB_CONNECT_H #define BTL_OPENIB_CONNECT_H
#include "btl_openib_endpoint.h"
BEGIN_C_DECLS BEGIN_C_DECLS
/** /**
@@ -85,7 +83,7 @@ typedef int (*ompi_btl_openib_connect_base_func_init_t)(void);
* Function to initiate a connection to a remote process * Function to initiate a connection to a remote process
*/ */
typedef int (*ompi_btl_openib_connect_base_func_start_connect_t) typedef int (*ompi_btl_openib_connect_base_func_start_connect_t)
(mca_btl_base_endpoint_t *e); (struct mca_btl_base_endpoint_t *e);
/** /**
* Function to finalize the connection functions * Function to finalize the connection functions