
- for non-threaded builds, set progress to be blocking for non-MPI apps

- reorg MX

This commit was SVN r3383.
This commit is contained in:
Tim Woodall 2004-10-28 15:40:46 +00:00
parent 3a5cf46856
commit 847c08fda5
14 changed files with 447 additions and 315 deletions
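
In outline (a sketch assembled from the runtime hunks below, not itself part of the diff): ompi_progress() now consults a flag set through ompi_progress_events(), and non-threaded MPI builds flip it at init/finalize, so only non-MPI apps keep the blocking OMPI_EVLOOP_ONCE default.

#if OMPI_HAVE_THREADS == 0
ompi_progress_events(OMPI_EVLOOP_NONBLOCK); /* MPI_Init: poll, don't block */
#endif
/* ... application runs; ompi_progress() drives the event loop ... */
#if OMPI_HAVE_THREADS == 0
ompi_progress_events(OMPI_EVLOOP_ONCE); /* MPI_Finalize: restore the blocking default */
#endif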

View File

@ -32,8 +32,8 @@ mca_ptl_mx_module_t mca_ptl_mx_module = {
mca_ptl_mx_add_procs,
mca_ptl_mx_del_procs,
mca_ptl_mx_finalize,
mca_ptl_mx_send, /* put */
mca_ptl_mx_send, /* put */
mca_ptl_mx_send,
mca_ptl_mx_send_continue,
NULL, /* get */
mca_ptl_mx_matched, /* matched */
mca_ptl_mx_request_init,
@ -45,6 +45,16 @@ mca_ptl_mx_module_t mca_ptl_mx_module = {
};
/**
* Allocate memory for use by the convertor.
*/
static void *mca_ptl_mx_alloc(size_t *size)
{
return malloc(*size);
}
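/* Illustration (not part of this change): the allocator above is handed
 * to ompi_convertor_init_for_send() so the convertor can obtain a scratch
 * buffer for non-contiguous data; size is passed by pointer, presumably
 * so an allocator could report how much it actually provided - this one
 * always returns exactly what was asked for.
 *
 *   size_t len = 4096;                   bytes needed for packing
 *   void* buf = mca_ptl_mx_alloc(&len);  plain malloc; len unchanged
 */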
/**
* PML->PTL Initialize a send request for use by the PTL.
*
@ -125,8 +135,10 @@ int mca_ptl_mx_send(
size_t size,
int flags)
{
mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl;
mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl;
mca_ptl_mx_send_frag_t* sendfrag;
mca_ptl_base_header_t* hdr;
mx_return_t mx_return;
int rc;
if (offset == 0 && sendreq->req_cached) {
@ -137,18 +149,234 @@ int mca_ptl_mx_send(
if(NULL == (sendfrag = (mca_ptl_mx_send_frag_t*)item))
return rc;
}
rc = mca_ptl_mx_send_frag_init(sendfrag, ptl_peer, sendreq, offset, &size, flags);
if(rc != OMPI_SUCCESS)
return rc;
/* setup message header */
hdr = &sendfrag->frag_send.frag_base.frag_header;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
/* initialize convertor */
sendfrag->frag_free = 0;
if(size > 0) {
ompi_convertor_t *convertor;
struct iovec iov;
unsigned int iov_count;
unsigned int max_data;
int rc;
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send(
convertor,
0,
sendreq->req_datatype,
sendreq->req_count,
sendreq->req_addr,
offset,
mca_ptl_mx_alloc );
/* if data is contiguous the convertor will return an offset
* into the user's buffer - otherwise it will return an allocated buffer
* that holds the packed data
*/
iov.iov_base = NULL;
iov.iov_len = size;
iov_count = 1;
max_data = size;
if((rc = ompi_convertor_pack(
convertor,
&iov,
&iov_count,
&max_data,
&(sendfrag->frag_free))) < 0) {
return OMPI_ERROR;
}
/* adjust the freeAfter as the position zero is reserved for the header */
sendfrag->frag_free <<= 1;
sendfrag->frag_segments[1].segment_ptr = iov.iov_base;
sendfrag->frag_segments[1].segment_length = iov.iov_len;
sendfrag->frag_segment_count = 2;
sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
} else {
sendfrag->frag_send.frag_base.frag_addr = NULL;
sendfrag->frag_send.frag_base.frag_size = 0;
sendfrag->frag_segment_count = 1;
}
hdr->hdr_frag.hdr_frag_length = size;
/* convert header to network byte order if required */
if(ptl_peer->peer_byte_swap) {
hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO;
MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match);
}
/* fragment state */
sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_size = size;
sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
/* must update the offset after actual fragment size is determined
* before attempting to send the fragment
*/
sendreq->req_offset += size;
return mca_ptl_mx_send_frag_start(sendfrag, ptl_mx);
/* start the fragment */
mx_return = mx_isend(
mx_ptl->mx_endpoint,
sendfrag->frag_segments,
sendfrag->frag_segment_count,
ptl_peer->peer_addr,
1,
sendfrag,
&sendfrag->frag_request);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_send: mx_isend() failed with return value=%d\n", mx_return);
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/**
* PML->PTL Continue sending fragments of a large message to the peer.
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param request (IN) Send request
* @param offset (IN) Current offset into packed/contiguous buffer.
* @param size (IN) Number of bytes PML is requesting PTL to deliver.
* @param flags (IN) Flags that should be passed to the peer via the message header.
* @return OMPI_SUCCESS if the PTL was able to queue one or more fragments
*/
int mca_ptl_mx_send_continue(
struct mca_ptl_base_module_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t size,
int flags)
{
mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl;
mca_ptl_mx_send_frag_t* sendfrag;
mca_ptl_base_header_t* hdr;
mx_return_t mx_return;
int rc;
/* allocate fragment */
MCA_PTL_MX_SEND_FRAG_ALLOC(sendfrag, rc);
if(rc != OMPI_SUCCESS) {
return rc;
}
/* setup message header */
hdr = &sendfrag->frag_send.frag_base.frag_header;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
/* initialize convertor */
sendfrag->frag_free = 0;
if(size > 0) {
ompi_convertor_t *convertor;
struct iovec iov;
unsigned int iov_count;
unsigned int max_data;
int rc;
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send(
convertor,
0,
sendreq->req_datatype,
sendreq->req_count,
sendreq->req_addr,
offset,
mca_ptl_mx_alloc );
/* if data is contiguous the convertor will return an offset
* into the user's buffer - otherwise it will return an allocated buffer
* that holds the packed data
*/
iov.iov_base = NULL;
iov.iov_len = size;
iov_count = 1;
max_data = size;
if((rc = ompi_convertor_pack(
convertor,
&iov,
&iov_count,
&max_data,
&(sendfrag->frag_free))) < 0) {
return OMPI_ERROR;
}
/* adjust the freeAfter as the position zero is reserved for the header */
sendfrag->frag_free <<= 1;
sendfrag->frag_segments[1].segment_ptr = iov.iov_base;
sendfrag->frag_segments[1].segment_length = iov.iov_len;
sendfrag->frag_segment_count = 2;
sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
} else {
sendfrag->frag_send.frag_base.frag_addr = NULL;
sendfrag->frag_send.frag_base.frag_size = 0;
sendfrag->frag_segment_count = 1;
}
hdr->hdr_frag.hdr_frag_length = size;
/* fragment state */
sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_size = size;
sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
/* convert header to network byte order if required */
if(ptl_peer->peer_byte_swap) {
hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO;
MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag);
}
/* must update the offset after actual fragment size is determined
* before attempting to send the fragment
*/
sendreq->req_offset += size;
/* start the fragment */
mx_return = mx_isend(
mx_ptl->mx_endpoint,
sendfrag->frag_segments,
sendfrag->frag_segment_count,
sendfrag->frag_send.frag_base.frag_peer->peer_addr,
1,
sendfrag,
&sendfrag->frag_request);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_send: mx_isend() failed with return value=%d\n", mx_return);
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/**
* PML->PTL Notification from the PML to the PTL that a receive
* has been posted and matched against the indicated fragment.
@ -192,11 +420,28 @@ void mca_ptl_mx_matched(
ompi_list_append(&mca_ptl_mx_component.mx_pending_acks, (ompi_list_item_t*)frag);
OMPI_THREAD_UNLOCK(&mca_ptl_mx_component.mx_lock);
} else {
mx_return_t mx_return;
mca_ptl_mx_send_frag_init_ack(ack, mx_ptl, mx_frag);
if(ack->frag_send.frag_base.frag_peer->peer_byte_swap) {
MCA_PTL_BASE_ACK_HDR_HTON(ack->frag_send.frag_base.frag_header.hdr_ack);
}
mca_ptl_mx_send_frag_start(ack, mx_ptl);
/* start the fragment */
mx_return = mx_isend(
mx_ptl->mx_endpoint,
ack->frag_segments,
ack->frag_segment_count,
ack->frag_send.frag_base.frag_peer->peer_addr,
1,
ack,
&ack->frag_request);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_matched: mx_isend() failed with return value=%d\n", mx_return);
OMPI_THREAD_LOCK(&mca_ptl_mx_component.mx_lock);
ack_pending = true;
ompi_list_append(&mca_ptl_mx_component.mx_pending_acks, (ompi_list_item_t*)frag);
OMPI_THREAD_UNLOCK(&mca_ptl_mx_component.mx_lock);
}
}
}

View File

@ -25,6 +25,7 @@ struct mca_ptl_mx_component_t {
int mx_free_list_max; /**< maximum size of free lists */
int mx_free_list_inc; /**< number of elements to grow free lists by */
int mx_prepost; /**< number of preposted recvs */
int mx_debug; /**< debug level */
uint32_t mx_filter; /**< filter assigned to application */
uint32_t mx_num_ptls; /**< number of MX NICs available to app */
struct mca_ptl_mx_module_t** mx_ptls; /**< array of available PTL modules */
@ -316,5 +317,28 @@ extern int mca_ptl_mx_send(
);
/**
* PML->PTL Continue sending fragments of a large message.
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param request (IN) Send request
* @param offset (IN) Current offset into packed/contiguous buffer.
* @param size (IN) Number of bytes PML is requesting PTL to deliver.
* @param flags (IN) Flags that should be passed to the peer via the message header.
* @return OMPI_SUCCESS if the PTL was able to queue one or more fragments
*
*/
extern int mca_ptl_mx_send_continue(
struct mca_ptl_base_module_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t size,
int flags
);
#endif
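/* Hypothetical caller (a sketch of the contract documented above, ignoring
 * the ack handshake a real PML performs before continuing a message;
 * first_frag_size and frag_size are illustrative names, not from this
 * commit):
 *
 *   int rc = mca_ptl_mx_send(ptl, ptl_peer, sendreq, 0, first_frag_size, flags);
 *   while(rc == OMPI_SUCCESS &&
 *         sendreq->req_offset < sendreq->req_bytes_packed) {
 *       rc = mca_ptl_mx_send_continue(ptl, ptl_peer, sendreq,
 *           sendreq->req_offset, frag_size, flags);
 *   }
 *
 * Both entry points advance sendreq->req_offset themselves.
 */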

View File

@ -91,9 +91,11 @@ int mca_ptl_mx_component_open(void)
/* register MX module parameters */
mca_ptl_mx_component.mx_filter =
(uint32_t)mca_ptl_mx_param_register_int("filter", 0xdeadbeef);
(uint32_t)mca_ptl_mx_param_register_int("filter", 0x12345);
mca_ptl_mx_component.mx_prepost =
mca_ptl_mx_param_register_int("prepost", 16);
mca_ptl_mx_param_register_int("prepost", 1);
mca_ptl_mx_component.mx_debug =
mca_ptl_mx_param_register_int("debug", 0);
mca_ptl_mx_component.mx_free_list_num =
mca_ptl_mx_param_register_int("free_list_num", 256);
mca_ptl_mx_component.mx_free_list_max =
@ -117,6 +119,7 @@ int mca_ptl_mx_component_open(void)
int mca_ptl_mx_component_close(void)
{
mx_finalize();
if (mca_ptl_mx_component.mx_send_frags.fl_num_allocated !=
mca_ptl_mx_component.mx_send_frags.super.ompi_list_length) {
ompi_output(0, "mx send frags: %d allocated %d returned\n",
@ -232,7 +235,7 @@ int mca_ptl_mx_component_progress(mca_ptl_tstamp_t tstamp)
return OMPI_ERROR;
}
if(mx_result > 0) {
mca_ptl_mx_progress(ptl, mx_request);
MCA_PTL_MX_PROGRESS(ptl, mx_request);
}
}
return OMPI_SUCCESS;

View File

@ -112,14 +112,16 @@ static void* mca_ptl_mx_thread(ompi_object_t *arg)
&mx_result);
if(mx_return == MX_TIMEOUT)
continue;
if(mx_return != MX_SUCCESS) {
else if(ptl->mx_thread_run == false)
break;
else if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_thread: mx_probe() failed with status %d\n",
mx_return);
break;
}
/* process the pending request */
mca_ptl_mx_progress(ptl, mx_request);
MCA_PTL_MX_PROGRESS(ptl, mx_request);
}
return NULL;
}
@ -146,7 +148,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
/* open local endpoint */
status = mx_open_endpoint(
addr,
MX_ANY_ENDPOINT,
1,
mca_ptl_mx_component.mx_filter,
NULL,
0,
@ -165,7 +167,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
/* break up the endpoint address */
if((status = mx_decompose_endpoint_addr(
ptl->mx_endpoint_addr,
@ -177,9 +179,19 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
return NULL;
}
if(mca_ptl_mx_component.mx_debug) {
ompi_output(0, "mca_ptl_mx_create: opened %08X:%08X:%08X:%08X\n",
(uint32_t)(ptl->mx_nic_addr >> 32),
(uint32_t)ptl->mx_nic_addr,
ptl->mx_endpoint_id,
ptl->mx_filter);
}
/* pre-post receive buffers */
for(i=0; i<mca_ptl_mx_component.mx_prepost; i++) {
if(mca_ptl_mx_post(ptl) != OMPI_SUCCESS) {
int rc;
MCA_PTL_MX_POST(ptl, rc);
if(rc != OMPI_SUCCESS) {
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
@ -208,10 +220,12 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
int mca_ptl_mx_finalize(struct mca_ptl_base_module_t* ptl)
{
mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl;
mx_wakeup(ptl_mx->mx_endpoint);
#if OMPI_HAVE_THREADS
ptl_mx->mx_thread_run = false;
ompi_thread_join(&ptl_mx->mx_thread, NULL);
#endif
mx_close_endpoint(ptl_mx->mx_endpoint);
free(ptl_mx);
return OMPI_SUCCESS;
}

View File

@ -18,74 +18,109 @@
* Prepost recv buffers
*/
static inline int mca_ptl_mx_post(mca_ptl_mx_module_t* ptl)
{
mca_ptl_mx_recv_frag_t* frag;
mx_return_t status;
int rc;
/* post an additional recv */
MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_ptl_mx_thread: unable to allocate recv frag\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_ptl_mx_recv_frag_init(frag, ptl);
status = mx_irecv(
ptl->mx_endpoint,
frag->frag_segments,
frag->frag_segment_count,
1,
MX_MATCH_MASK_NONE,
frag,
&frag->frag_request);
if(status != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%d\n", status);
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
#define MCA_PTL_MX_POST(ptl, rc) \
do { \
mca_ptl_mx_recv_frag_t* frag; \
mx_return_t mx_return; \
/* post an additional recv */ \
MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc); \
if(rc != OMPI_SUCCESS) { \
ompi_output(0, "mca_ptl_mx_post: unable to allocate recv fragn"); \
rc = OMPI_ERR_OUT_OF_RESOURCE; \
break; \
} \
frag->frag_recv.frag_base.frag_owner = &ptl->super; \
frag->frag_recv.frag_base.frag_peer = NULL; \
frag->frag_segment_count = 2; \
frag->frag_segments[1].segment_ptr = frag->frag_data; \
frag->frag_segments[1].segment_length = sizeof(frag->frag_data); \
\
mx_return = mx_irecv( \
ptl->mx_endpoint, \
frag->frag_segments, \
frag->frag_segment_count, \
1, \
MX_MATCH_MASK_NONE, \
frag, \
&frag->frag_request); \
if(mx_return != MX_SUCCESS) { \
ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%d\n", \
mx_return); \
rc = OMPI_ERROR; \
break; \
} \
rc = OMPI_SUCCESS; \
} while(0)
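/* The do { ... } while(0) wrapper makes the macro act as a single statement
 * while still allowing the early exits via break. Usage follows the pattern
 * already visible in mca_ptl_mx_create() (sketch):
 *
 *   int rc;
 *   MCA_PTL_MX_POST(ptl, rc);       expands in place and sets rc
 *   if(rc != OMPI_SUCCESS) {
 *       unable to prepost a receive - tear down the endpoint
 *   }
 */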
/**
* Routine to process complete request(s).
*/
static inline void mca_ptl_mx_progress(mca_ptl_mx_module_t* ptl, mx_request_t mx_request)
{
mx_return_t mx_return;
mx_status_t mx_status;
uint32_t mx_result;
mca_ptl_base_frag_t* frag;
mx_return = mx_test(
ptl->mx_endpoint,
&mx_request,
&mx_status,
&mx_result);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%d\n", mx_return);
return;
}
frag = (mca_ptl_base_frag_t*)mx_status.context;
switch(frag->frag_type) {
case MCA_PTL_FRAGMENT_SEND:
{
mca_ptl_mx_send_frag_handler((mca_ptl_mx_send_frag_t*)frag, ptl);
break;
}
case MCA_PTL_FRAGMENT_RECV:
{
mca_ptl_mx_recv_frag_handler((mca_ptl_mx_recv_frag_t*)frag, ptl);
mca_ptl_mx_post(ptl);
break;
}
default:
{
ompi_output(0, "mca_ptl_mx_progress: invalid request type: %d\n", frag->frag_type);
break;
}
}
static inline void MCA_PTL_MX_PROGRESS(mca_ptl_mx_module_t* ptl, mx_request_t mx_request)
{
mx_return_t mx_return;
mx_status_t mx_status;
uint32_t mx_result;
mca_ptl_base_frag_t* frag;
mx_return = mx_test(
ptl->mx_endpoint,
&mx_request,
&mx_status,
&mx_result);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%dn",
mx_return);
return;
}
frag = (mca_ptl_base_frag_t*)mx_status.context;
switch(frag->frag_type) {
case MCA_PTL_FRAGMENT_SEND:
{
mca_ptl_mx_send_frag_t* sendfrag = (mca_ptl_mx_send_frag_t*)frag;
mca_pml_base_send_request_t* sendreq =
sendfrag->frag_send.frag_request;
bool req_cached = sendreq->req_cached;
ptl->super.ptl_send_progress(
&ptl->super,
sendreq,
sendfrag->frag_send.frag_base.frag_size);
if(req_cached == false)
MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag);
break;
}
case MCA_PTL_FRAGMENT_RECV:
{
mca_ptl_mx_recv_frag_t* recvfrag = (mca_ptl_mx_recv_frag_t*)frag;
mca_ptl_base_header_t* hdr =
&recvfrag->frag_recv.frag_base.frag_header;
int rc;
switch(hdr->hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
{
if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) {
MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match);
}
ptl->super.ptl_match(&ptl->super, &recvfrag->frag_recv,
&hdr->hdr_match);
break;
}
case MCA_PTL_HDR_TYPE_FRAG:
break;
case MCA_PTL_HDR_TYPE_ACK:
break;
}
MCA_PTL_MX_POST(ptl, rc);
break;
}
default:
{
ompi_output(0, "mca_ptl_mx_progress: invalid request type: %dn",
frag->frag_type);
break;
}
}
}

View File

@ -141,10 +141,30 @@ mca_ptl_mx_proc_t* mca_ptl_mx_proc_lookup(const ompi_process_name_t *name)
int mca_ptl_mx_proc_insert(mca_ptl_mx_proc_t* ptl_proc, mca_ptl_base_peer_t* ptl_peer)
{
/* insert into peer array */
mx_endpoint_addr_t addr;
uint64_t mx_nic_addr;
uint32_t mx_endpoint_id;
uint32_t mx_filter;
ptl_peer->peer_proc = ptl_proc;
ptl_peer->peer_addr = ptl_proc->proc_addrs[ptl_proc->proc_peer_count];
ptl_proc->proc_peers[ptl_proc->proc_peer_count] = ptl_peer;
ptl_proc->proc_peer_count++;
/* break up the endpoint address and reconstruct it - otherwise it doesn't
* appear to be initialized correctly for this proc
*/
mx_decompose_endpoint_addr(
ptl_peer->peer_addr,
&mx_nic_addr,
&mx_endpoint_id,
&mx_filter);
memset(&ptl_peer->peer_addr, 0, sizeof(ptl_peer->peer_addr));
mx_compose_endpoint_addr(
mx_nic_addr,
mx_endpoint_id,
mx_filter,
&ptl_peer->peer_addr);
return OMPI_SUCCESS;
}

View File

@ -22,6 +22,9 @@ OBJ_CLASS_INSTANCE(
static void mca_ptl_mx_recv_frag_construct(mca_ptl_mx_recv_frag_t* frag)
{
/* one time initialization */
frag->frag_segments[0].segment_ptr = &frag->frag_recv.frag_base.frag_header;
frag->frag_segments[0].segment_length = sizeof(frag->frag_recv.frag_base.frag_header);
}

View File

@ -36,68 +36,5 @@ OBJ_CLASS_DECLARATION(mca_ptl_mx_recv_frag_t);
#define MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag) \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)recvfrag);
/**
*
*/
static inline void mca_ptl_mx_recv_frag_init(
mca_ptl_mx_recv_frag_t* frag,
mca_ptl_mx_module_t* ptl)
{
frag->frag_recv.frag_base.frag_owner = &ptl->super;
frag->frag_recv.frag_base.frag_peer = NULL;
frag->frag_segment_count = 2;
frag->frag_segments[0].segment_ptr = &frag->frag_recv.frag_base.frag_header;
frag->frag_segments[0].segment_length = sizeof(frag->frag_recv.frag_base.frag_header);
frag->frag_segments[1].segment_ptr = frag->frag_data;
frag->frag_segments[1].segment_length = sizeof(frag->frag_data);
}
/**
*
*/
static inline void mca_ptl_mx_recv_frag_handler(
mca_ptl_mx_recv_frag_t* frag,
mca_ptl_mx_module_t* ptl)
{
mca_ptl_base_header_t* hdr = &frag->frag_recv.frag_base.frag_header;
switch(hdr->hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
{
if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) {
MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match);
}
ptl->super.ptl_match(&ptl->super, &frag->frag_recv, &hdr->hdr_match);
break;
}
case MCA_PTL_HDR_TYPE_FRAG:
break;
case MCA_PTL_HDR_TYPE_ACK:
break;
}
}
/**
*
*/
static inline void mca_ptl_mx_recv_frag_progress(
mca_ptl_mx_recv_frag_t* frag,
mca_ptl_mx_module_t* ptl)
{
/* copy data into user buffer */
/* update request status */
ptl->super.ptl_recv_progress(
&ptl->super,
frag->frag_recv.frag_request,
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length,
frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length);
}
#endif

View File

@ -23,6 +23,9 @@ OBJ_CLASS_INSTANCE(
static void mca_ptl_mx_send_frag_construct(mca_ptl_mx_send_frag_t* frag)
{
/* one time initialization */
frag->frag_segments[0].segment_ptr = &frag->frag_send.frag_base.frag_header;
frag->frag_segments[0].segment_length = sizeof(mca_ptl_base_header_t);
}
@ -36,125 +39,6 @@ static void *mca_ptl_mx_alloc(size_t *size)
return malloc(*size);
}
/*
* Initialize the fragment based on the current offset into the users
* data buffer, and the indicated size.
*/
int mca_ptl_mx_send_frag_init(
mca_ptl_mx_send_frag_t* sendfrag,
mca_ptl_mx_peer_t* ptl_peer,
mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t* size,
int flags)
{
/* message header */
size_t size_in = *size;
size_t size_out;
unsigned int iov_count, max_data;
mca_ptl_base_header_t* hdr = &sendfrag->frag_send.frag_base.frag_header;
sendfrag->frag_segments[0].segment_ptr = hdr;
sendfrag->frag_segments[0].segment_length = sizeof(mca_ptl_base_header_t);
if(offset == 0) {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
hdr->hdr_frag.hdr_frag_offset = offset;
hdr->hdr_frag.hdr_frag_seq = 0;
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
}
sendfrag->frag_free = 0;
/* initialize convertor */
if(size_in > 0) {
ompi_convertor_t *convertor;
struct iovec iov;
int rc;
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
ompi_convertor_copy(&sendreq->req_convertor, convertor);
ompi_convertor_init_for_send(
convertor,
0,
sendreq->req_datatype,
sendreq->req_count,
sendreq->req_addr,
offset,
mca_ptl_mx_alloc );
/* if data is contiguous the convertor will return an offset
* into the user's buffer - otherwise it will return an allocated buffer
* that holds the packed data
*/
iov.iov_base = NULL;
iov.iov_len = size_in;
iov_count = 1;
max_data = size_in;
if((rc = ompi_convertor_pack(
convertor,
&iov,
&iov_count,
&max_data,
&(sendfrag->frag_free))) < 0) {
return OMPI_ERROR;
}
/* adjust the freeAfter as the position zero is reserved for the header */
sendfrag->frag_free <<= 1;
sendfrag->frag_segments[1].segment_ptr = iov.iov_base;
sendfrag->frag_segments[1].segment_length = size_out;
sendfrag->frag_segment_count = 2;
sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
/* adjust size and request offset to reflect actual number of bytes packed by convertor */
size_out = iov.iov_len;
} else {
size_out = size_in;
sendfrag->frag_send.frag_base.frag_addr = NULL;
sendfrag->frag_send.frag_base.frag_size = 0;
sendfrag->frag_segment_count = 1;
}
hdr->hdr_frag.hdr_frag_length = size_out;
/* convert to network byte order if required */
if(ptl_peer->peer_byte_swap) {
hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO;
if(offset == 0) {
MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match);
} else {
MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag);
}
}
/* fragment state */
sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_size = size_out;
sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
*size = size_out;
return OMPI_SUCCESS;
}
void mca_ptl_mx_send_frag_init_ack(
mca_ptl_mx_send_frag_t* ack,

View File

@ -50,59 +50,6 @@ typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t;
OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t);
/*
* Initialize the fragment based on the current offset into the users
* data buffer, and the indicated size.
*/
int mca_ptl_mx_send_frag_init(
mca_ptl_mx_send_frag_t* sendfrag,
struct mca_ptl_base_peer_t* ptl_peer,
mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t* size,
int flags);
/*
* Start the MX send for the fragment.
*/
static inline int mca_ptl_mx_send_frag_start(
mca_ptl_mx_send_frag_t* sendfrag,
mca_ptl_mx_module_t* ptl)
{
mx_return_t mx_return = mx_isend(
ptl->mx_endpoint,
sendfrag->frag_segments,
sendfrag->frag_segment_count,
sendfrag->frag_send.frag_base.frag_peer->peer_addr,
1,
sendfrag,
&sendfrag->frag_request);
if(mx_return != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_send_frag_start: mx_isend() failed with return value=%d\n", mx_return);
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/**
* Callback on MX send completion.
*/
static inline void mca_ptl_mx_send_frag_handler(
mca_ptl_mx_send_frag_t* sendfrag,
mca_ptl_mx_module_t* ptl)
{
ptl->super.ptl_send_progress(
&ptl->super,
sendfrag->frag_send.frag_request,
sendfrag->frag_send.frag_base.frag_size);
if(sendfrag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0)
MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag);
}
void mca_ptl_mx_send_frag_init_ack(
mca_ptl_mx_send_frag_t* ack,

View File

@ -20,6 +20,7 @@
#include "info/info.h"
#include "util/proc_info.h"
#include "runtime/runtime.h"
#include "runtime/ompi_progress.h"
#include "runtime/ompi_rte_wait.h"
#include "mca/base/base.h"
@ -43,6 +44,9 @@ int ompi_mpi_finalize(void)
int ret;
ompi_mpi_finalized = true;
#if OMPI_HAVE_THREADS == 0
ompi_progress_events(OMPI_EVLOOP_ONCE);
#endif
/* unregister process */
if (OMPI_SUCCESS != (ret = ompi_registry.rte_unregister(

View File

@ -308,6 +308,10 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
#if OMPI_HAVE_THREADS == 0
ompi_progress_events(OMPI_EVLOOP_NONBLOCK);
#endif
error:
if (ret != OMPI_SUCCESS) {
ompi_show_help("help-mpi-runtime",

View File

@ -4,9 +4,17 @@
#include "runtime/ompi_progress.h"
static int ompi_progress_event_flag = OMPI_EVLOOP_ONCE;
void ompi_progress_events(int flag)
{
ompi_progress_event_flag = flag;
}
void ompi_progress(void)
{
ompi_event_loop(OMPI_EVLOOP_NONBLOCK);
if(ompi_progress_event_flag != 0)
ompi_event_loop(ompi_progress_event_flag);
mca_pml.pml_progress();
}
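/* A side effect of the flag test above (illustrative; no caller in this
 * commit does this): passing 0 disables the event loop entirely, so
 * progress polls only the PML.
 *
 *   ompi_progress_events(0);   skip ompi_event_loop() altogether
 *   ompi_progress();           now runs only mca_pml.pml_progress()
 */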

View File

@ -3,7 +3,11 @@
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OMPI_DECLSPEC extern void ompi_progress_events(int);
OMPI_DECLSPEC extern void ompi_progress(void);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif