1
1
- first cut at MX support for short messages

This commit was SVN r3369.
This commit is contained in:
Tim Woodall 2004-10-27 13:52:06 +00:00
parent cddf69bebf
Commit f6ab31f38d
21 changed files with 606 additions and 65 deletions

Просмотреть файл

@ -141,10 +141,10 @@ static inline int mca_pml_teg_send_request_start(
if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) {
first_fragment_size = req->req_bytes_packed;
flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ?
MCA_PTL_FLAGS_ACK_MATCHED : 0;
MCA_PTL_FLAGS_ACK : 0;
} else {
/* require match for first fragment of a multi-fragment */
flags = MCA_PTL_FLAGS_ACK_MATCHED;
flags = MCA_PTL_FLAGS_ACK;
}
rc = ptl->ptl_send(ptl, req->req_peer, req, 0, first_fragment_size, flags);
if(rc != OMPI_SUCCESS)

Просмотреть файл

@ -4,10 +4,11 @@
/**
* @file
*/
#ifndef MCA_PML_BASE_FRAGMENT_H
#define MCA_PML_BASE_FRAGMENT_H
#ifndef MCA_PTL_BASE_FRAGMENT_H
#define MCA_PTL_BASE_FRAGMENT_H
#include "class/ompi_list.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "datatype/datatype.h"
#include "mca/ptl/base/ptl_base_header.h"
@ -15,7 +16,15 @@
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OMPI_DECLSPEC extern ompi_class_t mca_ptl_base_frag_t_class;
/**
* Type of fragment
*/
typedef enum {
MCA_PTL_FRAGMENT_SEND,
MCA_PTL_FRAGMENT_RECV
} mca_ptl_base_frag_type_t;
/**
* Base type for fragment descriptors.
@ -27,10 +36,13 @@ struct mca_ptl_base_frag_t {
struct mca_ptl_base_peer_t* frag_peer; /**< PTL specific addressing info */
void *frag_addr; /**< pointer into request buffer at fragment offset */
size_t frag_size; /**< number of bytes available in request buffer */
mca_ptl_base_frag_type_t frag_type; /**< fragment derived type */
ompi_convertor_t frag_convertor; /**< datatype convertor for fragment packing/unpacking */
};
typedef struct mca_ptl_base_frag_t mca_ptl_base_frag_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_ptl_base_frag_t);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -24,8 +24,8 @@
#define MCA_PTL_HDR_TYPE_FIN 5
#define MCA_PTL_HDR_TYPE_FIN_ACK 6
#define MCA_PTL_FLAGS_ACK_MATCHED 1
#define MCA_PTL_FLAGS_ACK_AGGREGATE 2
#define MCA_PTL_FLAGS_ACK 1 /* is an ack required */
#define MCA_PTL_FLAGS_NBO 2 /* is the header in network byte order */
/*
@ -99,16 +99,20 @@ struct mca_ptl_base_frag_header_t {
typedef struct mca_ptl_base_frag_header_t mca_ptl_base_frag_header_t;
#define MCA_PTL_BASE_FRAG_HDR_NTOH(h) \
do { \
MCA_PTL_BASE_COMMON_HDR_NTOH((h).hdr_common); \
(h).hdr_frag_length = ntohl((h).hdr_frag_length); \
(h).hdr_frag_offset = ntohl((h).hdr_frag_offset); \
(h).hdr_frag_seq = ntoh64((h).hdr_frag_seq)
(h).hdr_frag_seq = ntoh64((h).hdr_frag_seq); \
} while (0)
#define MCA_PTL_BASE_FRAG_HDR_HTON(h) \
do { \
MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \
(h).hdr_frag_length = htonl((h).hdr_frag_length); \
(h).hdr_frag_offset = htonl((h).hdr_frag_offset); \
(h).hdr_frag_seq = hton64((h).hdr_frag_seq)
(h).hdr_frag_seq = hton64((h).hdr_frag_seq); \
} while (0)
/**
* Header definition for the first fragment, contains the additional
@ -126,22 +130,26 @@ struct mca_ptl_base_match_header_t {
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
#define MCA_PTL_BASE_MATCH_HDR_NTOH(h) \
do { \
MCA_PTL_BASE_FRAG_HDR_NTOH((h).hdr_frag); \
(h).hdr_contextid = ntohl((h).hdr_contextid); \
(h).hdr_src = ntohl((h).hdr_src); \
(h).hdr_dst = ntohl((h).hdr_dst); \
(h).hdr_tag = ntohl((h).hdr_tag); \
(h).hdr_msg_length = ntohl((h).hdr_msg_length); \
(h).hdr_msg_seq = ntoh64((h).hdr_msg_seq)
(h).hdr_msg_seq = ntoh64((h).hdr_msg_seq); \
} while (0)
#define MCA_PTL_BASE_MATCH_HDR_HTON(h) \
do { \
MCA_PTL_BASE_FRAG_HDR_HTON((h).hdr_frag); \
(h).hdr_contextid = htonl((h).hdr_contextid); \
(h).hdr_src = htonl((h).hdr_src); \
(h).hdr_dst = htonl((h).hdr_dst); \
(h).hdr_tag = htonl((h).hdr_tag); \
(h).hdr_msg_length = htonl((h).hdr_msg_length); \
(h).hdr_msg_seq = hton64((h).hdr_msg_seq)
(h).hdr_msg_seq = hton64((h).hdr_msg_seq); \
} while (0)
/**
@ -158,12 +166,16 @@ struct mca_ptl_base_ack_header_t {
typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t;
#define MCA_PTL_BASE_ACK_HDR_NTOH(h) \
do { \
MCA_PTL_BASE_COMMON_HDR_NTOH(h.hdr_common); \
(h).hdr_dst_size = ntohl((h).hdr_dst_size)
(h).hdr_dst_size = ntohl((h).hdr_dst_size); \
} while (0)
#define MCA_PTL_BASE_ACK_HDR_HTON(h) \
do { \
MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \
(h).hdr_dst_size = htonl((h).hdr_dst_size)
(h).hdr_dst_size = htonl((h).hdr_dst_size); \
} while (0)
/**
* Union of defined header types.

Просмотреть файл

@ -12,16 +12,17 @@ static void mca_ptl_base_recv_frag_construct(mca_ptl_base_recv_frag_t* frag);
static void mca_ptl_base_recv_frag_destruct(mca_ptl_base_recv_frag_t* frag);
ompi_class_t mca_ptl_base_recv_frag_t_class = {
"mca_ptl_base_recv_frag_t",
OBJ_CLASS(mca_ptl_base_frag_t),
(ompi_construct_t) mca_ptl_base_recv_frag_construct,
(ompi_destruct_t) mca_ptl_base_recv_frag_destruct
};
OBJ_CLASS_INSTANCE(
mca_ptl_base_recv_frag_t,
mca_ptl_base_frag_t,
mca_ptl_base_recv_frag_construct,
mca_ptl_base_recv_frag_destruct
);
/* Constructor: tag the base fragment so generic code can distinguish
 * receive fragments from send fragments. */
void mca_ptl_base_recv_frag_construct(mca_ptl_base_recv_frag_t* frag)
{
    mca_ptl_base_frag_t* base = &frag->frag_base;
    base->frag_type = MCA_PTL_FRAGMENT_RECV;
}
void mca_ptl_base_recv_frag_destruct(mca_ptl_base_recv_frag_t* frag)

Просмотреть файл

@ -19,6 +19,7 @@ ompi_class_t mca_ptl_base_send_frag_t_class = {
/* Constructor: tag the base fragment so generic code can distinguish
 * send fragments from receive fragments. */
static void mca_ptl_base_send_frag_construct(mca_ptl_base_send_frag_t* frag)
{
    mca_ptl_base_frag_t* base = &frag->frag_base;
    base->frag_type = MCA_PTL_FRAGMENT_SEND;
}
static void mca_ptl_base_send_frag_destruct(mca_ptl_base_send_frag_t* frag)

Просмотреть файл

@ -395,7 +395,7 @@ mca_ptl_elan_matched (mca_ptl_base_module_t * ptl,
request = frag->frag_request;
recv_frag = (mca_ptl_elan_recv_frag_t * ) frag;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
int desc_type ;
/* Basic ACK scheme following TCP cases */
mca_ptl_elan_send_frag_t *desc;

Просмотреть файл

@ -184,8 +184,7 @@ mca_ptl_elan_send_desc_done (
if(NULL == req) { /* An ack descriptor */
OMPI_FREE_LIST_RETURN (&ptl->queue->tx_desc_free,
(ompi_list_item_t *) frag);
} else if (0 == (header->hdr_common.hdr_flags
& MCA_PTL_FLAGS_ACK_MATCHED)
} else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK)
|| mca_pml_base_send_request_matched(req)) {
if(ompi_atomic_fetch_and_set_int (&frag->frag_progressed, 1) == 0)
{

Просмотреть файл

@ -364,7 +364,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
A_PRINT("inside match, the matched request is %p\n", request);
gm_ptl = (mca_ptl_gm_module_t *)ptl;
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED)
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK)
{
/* need to send an ack back */
recv_frag = (mca_ptl_gm_recv_frag_t *) frag;

Просмотреть файл

@ -252,7 +252,7 @@ void send_callback(struct gm_port *port,void * context, gm_status_t status)
OMPI_FREE_LIST_RETURN(&(ptl->gm_send_frags), ((ompi_list_item_t *) frag));
} */
else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED)
else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK)
|| mca_pml_base_send_request_matched(gm_send_req))
{
A_PRINT(" send callback : match not required\n");

Просмотреть файл

@ -383,7 +383,7 @@ void mca_ptl_ib_matched(mca_ptl_base_module_t* module,
D_PRINT("Matched frag\n");
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
mca_ptl_ib_send_frag_t *send_frag;
send_frag = mca_ptl_ib_alloc_send_frag(module, NULL);

Просмотреть файл

@ -301,8 +301,7 @@ void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *module,
/* An ack descriptor ? Don't know what to do! */
OMPI_FREE_LIST_RETURN(flist,
((ompi_list_item_t *) sendfrag));
} else if (0 == (header->hdr_common.hdr_flags
& MCA_PTL_FLAGS_ACK_MATCHED)
} else if (0 == (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK)
|| mca_pml_base_send_request_matched(req)) {
module->ptl_send_progress(module,

Просмотреть файл

@ -12,6 +12,7 @@
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "ptl_mx.h"
#include "ptl_mx_peer.h"
#include "ptl_mx_sendfrag.h"
#include "ptl_mx_recvfrag.h"
@ -21,9 +22,9 @@ mca_ptl_mx_module_t mca_ptl_mx_module = {
&mca_ptl_mx_component.super,
8, /* ptl_cache_size */
sizeof(mca_ptl_mx_send_frag_t), /* ptl_cache_bytes */
0, /* ptl_frag_first_size */
(32 * 1024) - sizeof(mca_ptl_base_header_t), /* ptl_frag_first_size */
0, /* ptl_frag_min_size */
0, /* ptl_frag_max_size */
-1, /* ptl_frag_max_size */
0, /* ptl_exclusivity */
0, /* ptl_latency */
0, /* ptl_bandwidth */
@ -110,7 +111,7 @@ void mca_ptl_mx_request_fini(struct mca_ptl_base_module_t* ptl, mca_pml_base_sen
* The PTL is responsible for updating the current data offset (req_offset) in the
* request to reflect the actual number of bytes fragmented. This may be less than
* the requested size, due to resource constraints or datatype alighnment/offset. If
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK_MATCHED bit will be set in the
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK bit will be set in the
* flags parameter. In this case, the PTL should not call ptl_send_progress() function
* to indicate completion of the fragment until the ack is received. For all other
* fragments ptl_send_progress() may be called based on local completion semantics.
@ -124,7 +125,27 @@ int mca_ptl_mx_send(
size_t size,
int flags)
{
return OMPI_ERROR;
mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl;
mca_ptl_mx_send_frag_t* sendfrag;
int rc;
if (offset == 0 && sendreq->req_cached) {
sendfrag = (mca_ptl_mx_send_frag_t*)(sendreq+1);
} else {
ompi_list_item_t* item;
OMPI_FREE_LIST_GET(&mca_ptl_mx_component.mx_send_frags, item, rc);
if(NULL == (sendfrag = (mca_ptl_mx_send_frag_t*)item))
return rc;
}
rc = mca_ptl_mx_send_frag_init(sendfrag, ptl_peer, sendreq, offset, &size, flags);
if(rc != OMPI_SUCCESS)
return rc;
/* must update the offset after actual fragment size is determined
* before attempting to send the fragment
*/
sendreq->req_offset += size;
return mca_ptl_mx_send_frag_start(sendfrag, ptl_mx);
}
@ -141,7 +162,7 @@ int mca_ptl_mx_send(
* when a matching receive is posted.
*
* When this routine is called, the PTL is responsible for generating
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK_MATCHED
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK
* bit is set in the original fragment header. Additionally, the PTL
* is responsible for transferring any data associated with the fragment
* into the users buffer utilizing the datatype engine, and notifying
@ -153,6 +174,66 @@ void mca_ptl_mx_matched(
    mca_ptl_base_module_t* ptl,
    mca_ptl_base_recv_frag_t* frag)
{
    /* Called by the PML when an incoming first fragment has been matched
     * to a posted receive: optionally ack the sender, unpack any eager
     * data into the user buffer, report progress, and release the frag. */
    mca_ptl_base_header_t* hdr = &frag->frag_base.frag_header;
    mca_pml_base_recv_request_t* request = frag->frag_request;
    mca_ptl_mx_module_t* mx_ptl = (mca_ptl_mx_module_t*)ptl;
    mca_ptl_mx_recv_frag_t* mx_frag = (mca_ptl_mx_recv_frag_t*)frag;
    /* default: assume the whole fragment is delivered; the convertor may
     * reduce this below when it unpacks */
    unsigned int bytes_delivered = hdr->hdr_frag.hdr_frag_length;
    bool ack_pending = false;
    /* generate an acknowledgment if required */
    if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
        int rc;
        mca_ptl_mx_send_frag_t* ack;
        MCA_PTL_MX_SEND_FRAG_ALLOC(ack, rc);
        if(NULL == ack) {
            /* no send frag available: queue this recv frag on the pending-ack
             * list; it is NOT returned to the free list below (ack_pending) */
            OMPI_THREAD_LOCK(&mca_ptl_mx_component.mx_lock);
            ack_pending = true;
            ompi_list_append(&mca_ptl_mx_component.mx_pending_acks, (ompi_list_item_t*)frag);
            OMPI_THREAD_UNLOCK(&mca_ptl_mx_component.mx_lock);
        } else {
            /* NOTE(review): mca_ptl_mx_send_frag_init_ack is currently an
             * empty stub, so the ack header is sent uninitialized -- confirm */
            mca_ptl_mx_send_frag_init_ack(ack, mx_ptl, mx_frag);
            if(ack->frag_send.frag_base.frag_peer->peer_byte_swap) {
                MCA_PTL_BASE_ACK_HDR_HTON(ack->frag_send.frag_base.frag_header.hdr_ack);
            }
            mca_ptl_mx_send_frag_start(ack, mx_ptl);
        }
    }
    /* copy data into users buffer */
    if(hdr->hdr_frag.hdr_frag_length > 0) {
        struct iovec iov;
        unsigned int iov_count = 1;
        int free_after = 0;
        ompi_proc_t *proc = ompi_comm_peer_lookup(request->req_base.req_comm,
            request->req_base.req_ompi.req_status.MPI_SOURCE);
        ompi_convertor_t* convertor = &frag->frag_base.frag_convertor;
        /* initialize receive convertor */
        ompi_convertor_copy(proc->proc_convertor, convertor);
        ompi_convertor_init_for_recv(
            convertor,                              /* convertor */
            0,                                      /* flags */
            request->req_base.req_datatype,         /* datatype */
            request->req_base.req_count,            /* count elements */
            request->req_base.req_addr,             /* users buffer */
            hdr->hdr_frag.hdr_frag_offset,          /* offset in bytes into packed buffer */
            NULL );                                 /* not allocating memory */
        /* eager data was received into the frag's internal buffer */
        iov.iov_base = mx_frag->frag_data;
        iov.iov_len = hdr->hdr_frag.hdr_frag_length;
        ompi_convertor_unpack(convertor, &iov, &iov_count, &bytes_delivered, &free_after );
    }
    /* update request status */
    ptl->ptl_recv_progress(
        ptl,
        request,
        hdr->hdr_frag.hdr_frag_length,
        bytes_delivered);
    /* release resources -- unless the frag was parked on the pending-ack
     * queue above, in which case it is freed when the ack is finally sent */
    if(ack_pending == false)
        MCA_PTL_MX_RECV_FRAG_RETURN(mx_frag);
}

Просмотреть файл

@ -20,17 +20,19 @@
* Myricom MX PTL component.
*/
struct mca_ptl_mx_component_t {
mca_ptl_base_component_1_0_0_t super; /**< base PTL component */
int mx_free_list_num; /**< initial size of free lists */
int mx_free_list_max; /**< maximum size of free lists */
int mx_free_list_inc; /**< number of elements to growing free lists by */
uint32_t mx_filter; /**< filter assigned to application */
uint32_t mx_num_ptls; /**< number of MX NICs available to app */
struct mca_ptl_mx_module_t** mx_ptls; /**< array of available PTL moduless */
ompi_free_list_t mx_send_frags; /**< free list of mx send fragments */
ompi_free_list_t mx_recv_frags; /**< free list of mx recv fragments */
ompi_hash_table_t mx_procs; /**< hash table of procs */
ompi_mutex_t mx_lock; /**< lock for accessing module state */
mca_ptl_base_component_1_0_0_t super; /**< base PTL component */
int mx_free_list_num; /**< initial size of free lists */
int mx_free_list_max; /**< maximum size of free lists */
int mx_free_list_inc; /**< number of elements to growing free lists by */
int mx_prepost; /**< number of preposted recvs */
uint32_t mx_filter; /**< filter assigned to application */
uint32_t mx_num_ptls; /**< number of MX NICs available to app */
struct mca_ptl_mx_module_t** mx_ptls; /**< array of available PTL moduless */
ompi_free_list_t mx_send_frags; /**< free list of mx send fragments */
ompi_free_list_t mx_recv_frags; /**< free list of mx recv fragments */
ompi_hash_table_t mx_procs; /**< hash table of procs */
ompi_list_t mx_pending_acks; /**< queue of pending sends */
ompi_mutex_t mx_lock; /**< lock for accessing module state */
};
typedef struct mca_ptl_mx_component_t mca_ptl_mx_component_t;
@ -122,6 +124,10 @@ struct mca_ptl_mx_module_t {
uint32_t mx_endpoint_id; /**< endpoint ID */
mx_endpoint_t mx_endpoint; /**< endpoint */
mx_endpoint_addr_t mx_endpoint_addr; /**< endpoint address */
#if OMPI_HAVE_THREADS
ompi_thread_t mx_thread; /**< thread for progressing outstanding requests */
bool mx_thread_run; /**< flag to indicate thread status */
#endif
};
typedef struct mca_ptl_mx_module_t mca_ptl_mx_module_t;
@ -263,7 +269,7 @@ extern void mca_ptl_mx_request_fini(
* when a matching receive is posted.
*
* When this routine is called, the PTL is responsible for generating
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK_MATCHED
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK
* bit is set in the original fragment header. Additionally, the PTL
* is responsible for transferring any data associated with the fragment
* into the users buffer utilizing the datatype engine, and notifying
@ -294,7 +300,7 @@ extern void mca_ptl_mx_matched(
* The PTL is responsible for updating the current data offset (req_offset) in the
* request to reflect the actual number of bytes fragmented. This may be less than
* the requested size, due to resource constraints or datatype alighnment/offset. If
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK_MATCHED bit will be set in the
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK bit will be set in the
* flags parameter. In this case, the PTL should not call ptl_send_progress() function
* to indicate completion of the fragment until the ack is received. For all other
* fragments ptl_send_progress() may be called based on local completion semantics.

Просмотреть файл

@ -4,6 +4,7 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "util/output.h"
#include "threads/thread.h"
#include "ptl_mx.h"
#include "ptl_mx_recvfrag.h"
#include "ptl_mx_sendfrag.h"
@ -86,10 +87,14 @@ int mca_ptl_mx_component_open(void)
OBJ_CONSTRUCT(&mca_ptl_mx_component.mx_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_ptl_mx_component.mx_send_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_mx_component.mx_recv_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_mx_component.mx_procs, ompi_hash_table_t);
OBJ_CONSTRUCT(&mca_ptl_mx_component.mx_pending_acks, ompi_hash_table_t);
/* register MX module parameters */
mca_ptl_mx_module.mx_filter =
mca_ptl_mx_component.mx_filter =
(uint32_t)mca_ptl_mx_param_register_int("filter", 0xdeadbeef);
mca_ptl_mx_component.mx_prepost =
mca_ptl_mx_param_register_int("prepost", 16);
mca_ptl_mx_component.mx_free_list_num =
mca_ptl_mx_param_register_int("free_list_num", 256);
mca_ptl_mx_component.mx_free_list_max =
@ -130,6 +135,7 @@ int mca_ptl_mx_component_close(void)
OBJ_DESTRUCT(&mca_ptl_mx_component.mx_send_frags);
OBJ_DESTRUCT(&mca_ptl_mx_component.mx_recv_frags);
OBJ_DESTRUCT(&mca_ptl_mx_component.mx_lock);
OBJ_DESTRUCT(&mca_ptl_mx_component.mx_pending_acks);
return OMPI_SUCCESS;
}
@ -163,6 +169,9 @@ mca_ptl_base_module_t** mca_ptl_mx_component_init(
mca_ptl_mx_component.mx_free_list_inc,
NULL); /* use default allocator */
/* intialize process hash table */
ompi_hash_table_init(&mca_ptl_mx_component.mx_procs, 256);
/* initialize mx ptls */
if(OMPI_SUCCESS != mca_ptl_mx_module_init())
return NULL;

Просмотреть файл

@ -3,6 +3,8 @@
#include "util/output.h"
#include "ptl_mx_peer.h"
#include "ptl_mx_proc.h"
#include "ptl_mx_recvfrag.h"
#include "ptl_mx_sendfrag.h"
static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr);
@ -43,13 +45,13 @@ int mca_ptl_mx_module_init(void)
return OMPI_ERR_OUT_OF_RESOURCE;
if((status = mx_get_info(
NULL,
MX_NIC_COUNT,
MX_NIC_IDS,
nic_addrs,
size)) != MX_SUCCESS) {
free(nic_addrs);
return OMPI_ERR_INIT;
}
/* allocate an array of pointers to ptls */
mca_ptl_mx_component.mx_ptls = (mca_ptl_mx_module_t**)malloc(
sizeof(mca_ptl_mx_module_t*) * mca_ptl_mx_component.mx_num_ptls);
@ -57,7 +59,7 @@ int mca_ptl_mx_module_init(void)
ompi_output(0, "mca_ptl_mx_init: malloc() failed\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* create a ptl for each NIC */
for(i=0; i<mca_ptl_mx_component.mx_num_ptls; i++) {
mca_ptl_mx_module_t* ptl = mca_ptl_mx_create(nic_addrs[i]);
@ -84,10 +86,120 @@ int mca_ptl_mx_module_init(void)
endpoint_addrs,
mca_ptl_mx_component.mx_num_ptls * sizeof(mx_endpoint_addr_t))) != OMPI_SUCCESS)
return rc;
return OMPI_SUCCESS;
}
/**
 * Pre-post a receive buffer to the MX endpoint.
 *
 * Allocates a recv fragment from the component free list, initializes its
 * header/data segments, and posts a wildcard mx_irecv() for it.
 *
 * @param ptl  MX PTL module owning the endpoint.
 * @return     OMPI_SUCCESS, or an OMPI error code on failure.
 */
static inline int mca_ptl_mx_post(mca_ptl_mx_module_t* ptl)
{
    mca_ptl_mx_recv_frag_t* frag;
    mx_return_t status;
    int rc;

    /* post an additional recv */
    MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc);
    if(rc != OMPI_SUCCESS) {
        /* fix: message previously claimed to come from mca_ptl_mx_thread */
        ompi_output(0, "mca_ptl_mx_post: unable to allocate recv frag\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    mca_ptl_mx_recv_frag_init(frag, ptl);

    status = mx_irecv(
        ptl->mx_endpoint,
        frag->frag_segments,
        frag->frag_segment_count,
        1,
        MX_MATCH_MASK_NONE,
        frag,
        &frag->frag_request);
    if(status != MX_SUCCESS) {
        ompi_output(0, "mca_ptl_mx_post: mx_irecv() failed with status=%d\n", status);
        /* fix: return the fragment to the free list instead of leaking it */
        MCA_PTL_MX_RECV_FRAG_RETURN(frag);
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}
/**
 * Routine to process complete request(s).
 *
 * Tests the given MX request for completion and dispatches the associated
 * fragment (stashed in the MX status context) to the send or recv handler.
 * For a completed receive, a replacement buffer is re-posted to keep the
 * number of outstanding receives constant.
 */
static void mca_ptl_mx_progress(mca_ptl_mx_module_t* ptl, mx_request_t mx_request)
{
    mx_return_t mx_return;
    mx_status_t mx_status;
    uint32_t mx_result;
    mca_ptl_base_frag_t* frag;
    mx_return = mx_test(
        ptl->mx_endpoint,
        &mx_request,
        &mx_status,
        &mx_result);
    if(mx_return != MX_SUCCESS) {
        ompi_output(0, "mca_ptl_mx_progress: mx_test() failed with status=%d\n", mx_return);
        return;
    }
    /* the fragment descriptor was passed as the context when the request
     * was posted (see mca_ptl_mx_post / mca_ptl_mx_send_frag_start) */
    frag = (mca_ptl_base_frag_t*)mx_status.context;
    switch(frag->frag_type) {
        case MCA_PTL_FRAGMENT_SEND:
        {
            mca_ptl_mx_send_frag_handler((mca_ptl_mx_send_frag_t*)frag, ptl);
            break;
        }
        case MCA_PTL_FRAGMENT_RECV:
        {
            mca_ptl_mx_recv_frag_handler((mca_ptl_mx_recv_frag_t*)frag, ptl);
            /* NOTE(review): return value ignored; a failed re-post silently
             * shrinks the pool of pre-posted receives -- confirm intended */
            mca_ptl_mx_post(ptl);
            break;
        }
        default:
        {
            ompi_output(0, "mca_ptl_mx_progress: invalid request type: %d\n", frag->frag_type);
            break;
        }
    }
}
/**
 * Thread to progress outstanding requests.
 *
 * Blocks in mx_peek() waiting for any request on the endpoint to complete,
 * then hands the completed request to mca_ptl_mx_progress(). Runs until
 * ptl->mx_thread_run is cleared (see mca_ptl_mx_finalize) or an
 * unrecoverable MX error occurs.
 */
#if OMPI_HAVE_THREADS
static void* mca_ptl_mx_thread(ompi_object_t *arg)
{
    ompi_thread_t* thr = (ompi_thread_t*)arg;
    mca_ptl_mx_module_t* ptl = thr->t_arg;
    while(ptl->mx_thread_run) {
        mx_request_t mx_request;
        mx_return_t mx_return;
        uint32_t mx_result;

        /* block waiting for status */
        mx_return = mx_peek(
            ptl->mx_endpoint,
            UINT_MAX,
            &mx_request,
            &mx_result);
        if(mx_return == MX_TIMEOUT)
            continue;
        if(mx_return != MX_SUCCESS) {
            /* fix: error message previously named mx_probe(), but the call
             * above is mx_peek() */
            ompi_output(0, "mca_ptl_mx_thread: mx_peek() failed with status %d\n",
                mx_return);
            break;
        }

        /* process the pending request */
        mca_ptl_mx_progress(ptl, mx_request);
    }
    return NULL;
}
#endif
/*
* Create and intialize an MX PTL module, where each module
@ -98,11 +210,13 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
{
mca_ptl_mx_module_t* ptl = malloc(sizeof(mca_ptl_mx_module_t));
mx_return_t status;
int i;
if(NULL == ptl)
return NULL;
/* copy over default settings */
memcpy(ptl, &mca_ptl_mx_module, sizeof(mca_ptl_mx_module_t));
OBJ_CONSTRUCT(&ptl->mx_peers, ompi_list_t);
/* open local endpoint */
status = mx_open_endpoint(
@ -114,7 +228,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
&ptl->mx_endpoint);
if(status != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_init: mx_open_endpoint() failed with status=%d\n", status);
free(ptl);
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
@ -123,7 +237,7 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
ptl->mx_endpoint,
&ptl->mx_endpoint_addr)) != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_init: mx_get_endpoint_addr() failed with status=%d\n", status);
free(ptl);
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
@ -134,9 +248,30 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
&ptl->mx_endpoint_id,
&ptl->mx_filter)) != MX_SUCCESS) {
ompi_output(0, "mca_ptl_mx_init: mx_decompose_endpoint_addr() failed with status=%d\n", status);
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
/* pre-post receive buffers */
for(i=0; i<mca_ptl_mx_component.mx_prepost; i++) {
if(mca_ptl_mx_post(ptl) != OMPI_SUCCESS) {
mca_ptl_mx_finalize(&ptl->super);
return NULL;
}
}
#if OMPI_HAVE_THREADS
/* create a thread to progress requests */
OBJ_CONSTRUCT(&ptl->mx_thread, ompi_thread_t);
ptl->mx_thread.t_run = mca_ptl_mx_thread;
ptl->mx_thread.t_arg = ptl;
ptl->mx_thread_run = true;
if(ompi_thread_start(&ptl->mx_thread) != OMPI_SUCCESS) {
ompi_output(0, "mca_ptl_mx_create: unable to start progress thread.\n");
free(ptl);
return NULL;
}
#endif
return ptl;
}
@ -148,6 +283,11 @@ static mca_ptl_mx_module_t* mca_ptl_mx_create(uint64_t addr)
/* Tear down an MX PTL module: stop the progress thread (if threading is
 * enabled) and free the module structure.
 *
 * NOTE(review): the MX endpoint opened in mca_ptl_mx_create() is never
 * closed here (no mx_close_endpoint call) -- presumably a leak; confirm.
 * NOTE(review): mca_ptl_mx_create() calls this on early error paths before
 * the progress thread is constructed/started, so ompi_thread_join() may run
 * on an unstarted thread -- TODO confirm this is safe.
 */
int mca_ptl_mx_finalize(struct mca_ptl_base_module_t* ptl)
{
    mca_ptl_mx_module_t* ptl_mx = (mca_ptl_mx_module_t*)ptl;
#if OMPI_HAVE_THREADS
    /* signal the progress loop to exit, then wait for it */
    ptl_mx->mx_thread_run = false;
    ompi_thread_join(&ptl_mx->mx_thread, NULL);
#endif
    free(ptl_mx);
    return OMPI_SUCCESS;
}

Просмотреть файл

@ -16,17 +16,93 @@
*/
struct mca_ptl_mx_recv_frag_t {
    mca_ptl_base_recv_frag_t frag_recv;  /**< base receive fragment descriptor */
    mx_request_t frag_request;           /**< MX request handle for the posted mx_irecv */
    mx_segment_t frag_segments[2];       /**< [0] = fragment header, [1] = eager data
                                              (wired up by mca_ptl_mx_recv_frag_init) */
    uint32_t frag_segment_count;         /**< number of segments posted (2) */
    unsigned char frag_data[32768];      /**< inline buffer for eager data; 32KB matches
                                              the module's first-fragment sizing --
                                              TODO confirm kept in sync with
                                              ptl_frag_first_size */
};
typedef struct mca_ptl_mx_recv_frag_t mca_ptl_mx_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_ptl_mx_recv_frag_t);
#define MCA_PTL_MX_RECV_FRAG_ALLOC(frag, rc) \
#define MCA_PTL_MX_RECV_FRAG_ALLOC(recvfrag, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_ptl_mx_component.mx_recv_frags, item, rc); \
frag = (mca_ptl_mx_recv_frag_t*)item; \
recvfrag = (mca_ptl_mx_recv_frag_t*)item; \
}
#define MCA_PTL_MX_RECV_FRAG_RETURN(recvfrag) \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_recv_frags, (ompi_list_item_t*)recvfrag);
/**
 * Prepare a recv fragment for (re)posting: set ownership and describe the
 * two MX segments -- the header followed by the inline data buffer.
 */
static inline void mca_ptl_mx_recv_frag_init(
    mca_ptl_mx_recv_frag_t* frag,
    mca_ptl_mx_module_t* ptl)
{
    mca_ptl_base_frag_t* base = &frag->frag_recv.frag_base;
    mx_segment_t* segs = frag->frag_segments;

    base->frag_owner = &ptl->super;
    base->frag_peer = NULL;

    /* segment 0 lands the fragment header, segment 1 the eager payload */
    segs[0].segment_ptr = &base->frag_header;
    segs[0].segment_length = sizeof(base->frag_header);
    segs[1].segment_ptr = frag->frag_data;
    segs[1].segment_length = sizeof(frag->frag_data);
    frag->frag_segment_count = 2;
}
/**
 * Dispatch a completed receive based on the header type.
 *
 * A MATCH header is converted from network byte order if flagged and handed
 * to the PML's matching logic. FRAG and ACK headers are recognized but not
 * yet implemented in this first cut.
 */
static inline void mca_ptl_mx_recv_frag_handler(
    mca_ptl_mx_recv_frag_t* frag,
    mca_ptl_mx_module_t* ptl)
{
    mca_ptl_base_header_t* hdr = &frag->frag_recv.frag_base.frag_header;
    switch(hdr->hdr_common.hdr_type) {
        case MCA_PTL_HDR_TYPE_MATCH:
        {
            if(hdr->hdr_common.hdr_flags & MCA_PTL_FLAGS_NBO) {
                MCA_PTL_BASE_MATCH_HDR_NTOH(hdr->hdr_match);
            }
            ptl->super.ptl_match(&ptl->super, &frag->frag_recv, &hdr->hdr_match);
            break;
        }
        case MCA_PTL_HDR_TYPE_FRAG:
            /* TODO: second/subsequent fragments not yet implemented */
            break;
        case MCA_PTL_HDR_TYPE_ACK:
            /* TODO: ack handling not yet implemented */
            break;
        default:
            /* fix: previously fell through silently; report unexpected header
             * types, consistent with mca_ptl_mx_progress() */
            ompi_output(0, "mca_ptl_mx_recv_frag_handler: invalid header type: %d\n",
                hdr->hdr_common.hdr_type);
            break;
    }
}
/**
*
*/
/**
 * Report completion of a received fragment to the PML.
 *
 * NOTE(review): the "copy data into user buffer" step below is an empty
 * placeholder -- no unpack is performed here; eager-data unpacking currently
 * happens in mca_ptl_mx_matched(). Both byte counts passed to
 * ptl_recv_progress (bytes received, bytes delivered) are taken from the
 * header's fragment length.
 */
static inline void mca_ptl_mx_recv_frag_progress(
    mca_ptl_mx_recv_frag_t* frag,
    mca_ptl_mx_module_t* ptl)
{
    /* copy data into user buffer */
    /* update request status */
    ptl->super.ptl_recv_progress(
        &ptl->super,
        frag->frag_recv.frag_request,
        frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length,
        frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length);
}
#endif

Просмотреть файл

@ -2,6 +2,7 @@
* $HEADER$
*/
#include "ptl_mx.h"
#include "ptl_mx_peer.h"
#include "ptl_mx_sendfrag.h"
@ -29,3 +30,138 @@ static void mca_ptl_mx_send_frag_destruct(mca_ptl_mx_send_frag_t* frag)
{
}
/* Allocator hook handed to the convertor: plain heap allocation of the
 * requested number of bytes. Caller owns (and must free) the buffer. */
static void *mca_ptl_mx_alloc(size_t *size)
{
    void *buffer = malloc(*size);
    return buffer;
}
/*
 * Initialize a send fragment based on the current offset into the users
 * data buffer, and the indicated size.
 *
 * Builds either a MATCH header (first fragment, offset == 0) or a FRAG
 * header (subsequent fragments), packs up to *size bytes of user data via
 * the convertor, converts the header to network byte order if the peer
 * requires it, and returns the actual number of bytes packed in *size.
 *
 * @param sendfrag  fragment descriptor to initialize
 * @param ptl_peer  peer addressing / byte-order info
 * @param sendreq   owning PML send request
 * @param offset    byte offset of this fragment within the message
 * @param size      in: requested bytes; out: bytes actually packed
 * @param flags     MCA_PTL_FLAGS_* to place in the header
 * @return OMPI_SUCCESS or OMPI_ERROR (convertor pack failure)
 */
int mca_ptl_mx_send_frag_init(
    mca_ptl_mx_send_frag_t* sendfrag,
    mca_ptl_mx_peer_t* ptl_peer,
    mca_pml_base_send_request_t* sendreq,
    size_t offset,
    size_t* size,
    int flags)
{
    /* message header */
    size_t size_in = *size;
    size_t size_out;
    unsigned int iov_count, max_data;
    mca_ptl_base_header_t* hdr = &sendfrag->frag_send.frag_base.frag_header;

    /* segment 0 always carries the header */
    sendfrag->frag_segments[0].segment_ptr = hdr;
    sendfrag->frag_segments[0].segment_length = sizeof(mca_ptl_base_header_t);

    if(offset == 0) {
        /* first fragment: full match header so the receiver can match it */
        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
        hdr->hdr_common.hdr_flags = flags;
        hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
        hdr->hdr_frag.hdr_frag_offset = offset;
        hdr->hdr_frag.hdr_frag_seq = 0;
        hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
        hdr->hdr_frag.hdr_dst_ptr.lval = 0;
        hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
        hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
        hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
        hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
        hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
        hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
    } else {
        /* subsequent fragment: smaller frag header, already matched */
        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
        hdr->hdr_common.hdr_flags = flags;
        hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
        hdr->hdr_frag.hdr_frag_offset = offset;
        hdr->hdr_frag.hdr_frag_seq = 0;
        hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
        hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
    }
    sendfrag->frag_free = 0;

    /* initialize convertor */
    if(size_in > 0) {
        ompi_convertor_t *convertor;
        struct iovec iov;
        int rc;

        convertor = &sendfrag->frag_send.frag_base.frag_convertor;
        ompi_convertor_copy(&sendreq->req_convertor, convertor);
        ompi_convertor_init_for_send(
            convertor,
            0,
            sendreq->req_datatype,
            sendreq->req_count,
            sendreq->req_addr,
            offset,
            mca_ptl_mx_alloc );

        /* if data is contigous convertor will return an offset
         * into users buffer - otherwise will return an allocated buffer
         * that holds the packed data
         */
        iov.iov_base = NULL;
        iov.iov_len = size_in;
        iov_count = 1;
        max_data = size_in;
        if((rc = ompi_convertor_pack(
            convertor,
            &iov,
            &iov_count,
            &max_data,
            &(sendfrag->frag_free))) < 0) {
            return OMPI_ERROR;
        }

        /* adjust the freeAfter as the position zero is reserved for the header */
        sendfrag->frag_free <<= 1;

        /* fix: size_out must be taken from the convertor BEFORE it is used
         * as the segment length below -- it was previously read while still
         * uninitialized */
        size_out = iov.iov_len;
        sendfrag->frag_segments[1].segment_ptr = iov.iov_base;
        sendfrag->frag_segments[1].segment_length = size_out;
        sendfrag->frag_segment_count = 2;
        sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
    } else {
        /* header-only fragment */
        size_out = size_in;
        sendfrag->frag_send.frag_base.frag_addr = NULL;
        sendfrag->frag_send.frag_base.frag_size = 0;
        sendfrag->frag_segment_count = 1;
    }
    hdr->hdr_frag.hdr_frag_length = size_out;

    /* convert to network byte order if required */
    if(ptl_peer->peer_byte_swap) {
        hdr->hdr_common.hdr_flags |= MCA_PTL_FLAGS_NBO;
        if(offset == 0) {
            MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match);
        } else {
            MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag);
        }
    }

    /* fragment state */
    sendfrag->frag_send.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
    sendfrag->frag_send.frag_request = sendreq;
    sendfrag->frag_send.frag_base.frag_size = size_out;
    sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
    *size = size_out;
    return OMPI_SUCCESS;
}
/*
 * Initialize an ack fragment in response to a matched first fragment.
 *
 * NOTE(review): empty stub in this first cut -- callers (mca_ptl_mx_matched)
 * will transmit an uninitialized ack header until this is implemented.
 */
void mca_ptl_mx_send_frag_init_ack(
    mca_ptl_mx_send_frag_t* ack,
    mca_ptl_mx_module_t* ptl,
    struct mca_ptl_mx_recv_frag_t* recv_frag)
{
}

Просмотреть файл

@ -12,6 +12,7 @@
#include "mca/pml/base/pml_base_sendreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "ptl_mx.h"
#include "ptl_mx_peer.h"
/**
@ -19,16 +20,84 @@
*/
struct mca_ptl_mx_send_frag_t {
    mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
    int frag_free;                      /**< convertor "free after" flags; shifted left one
                                             bit since position zero is reserved for the
                                             header (see mca_ptl_mx_send_frag_init) */
    mx_request_t frag_request;          /**< MX request handle for the pending mx_isend */
    mx_segment_t frag_segments[2];      /**< [0] = header, [1] = packed user data */
    size_t frag_segment_count;          /**< segments in use: 1 (header only) or 2 */
};
typedef struct mca_ptl_mx_send_frag_t mca_ptl_mx_send_frag_t;
#define MCA_PTL_MX_SEND_FRAG_ALLOC(item, rc) \
OMPI_FREE_LIST_GET(&mca_ptl_tcp_component.mx_send_frags, item, rc);
#define MCA_PTL_MX_SEND_FRAG_ALLOC(sendfrag, rc) \
{ \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_ptl_mx_component.mx_send_frags, item, rc); \
sendfrag = (mca_ptl_mx_send_frag_t*)item; \
}
#define MCA_PTL_MX_SEND_FRAG_RETURN(item) \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_send_frags, item);
#define MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag) \
OMPI_FREE_LIST_RETURN(&mca_ptl_mx_component.mx_send_frags, (ompi_list_item_t*)sendfrag);
OBJ_CLASS_DECLARATION(mca_ptl_mx_send_frag_t);
/*
* Initialize the fragment based on the current offset into the users
* data buffer, and the indicated size.
*/
int mca_ptl_mx_send_frag_init(
mca_ptl_mx_send_frag_t* sendfrag,
struct mca_ptl_base_peer_t* ptl_peer,
mca_pml_base_send_request_t* sendreq,
size_t offset,
size_t* size,
int flags);
/*
 * Start the MX send for the fragment: post an asynchronous mx_isend() of
 * the fragment's header/data segments to the peer's endpoint address.
 * Returns OMPI_SUCCESS on a successfully posted send, OMPI_ERROR otherwise.
 */
static inline int mca_ptl_mx_send_frag_start(
    mca_ptl_mx_send_frag_t* sendfrag,
    mca_ptl_mx_module_t* ptl)
{
    mx_return_t rc;

    rc = mx_isend(
        ptl->mx_endpoint,
        sendfrag->frag_segments,
        sendfrag->frag_segment_count,
        sendfrag->frag_send.frag_base.frag_peer->peer_addr,
        1,
        sendfrag,
        &sendfrag->frag_request);
    if(MX_SUCCESS != rc) {
        ompi_output(0, "mca_ptl_mx_send_frag_start: mx_isend() failed with return value=%d\n", rc);
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}
/**
 * Callback on MX send completion.
 *
 * Notifies the PML of send progress and returns the fragment to the free
 * list -- except for the first fragment (offset 0), which is cached inline
 * with the send request (see mca_ptl_mx_send: sendreq+1) and therefore must
 * not be free-listed.
 *
 * NOTE(review): progress is reported unconditionally here; the PTL contract
 * says completion should not be signalled until the ack arrives when
 * MCA_PTL_FLAGS_ACK is set (other PTLs check
 * mca_pml_base_send_request_matched) -- confirm intended.
 */
static inline void mca_ptl_mx_send_frag_handler(
    mca_ptl_mx_send_frag_t* sendfrag,
    mca_ptl_mx_module_t* ptl)
{
    ptl->super.ptl_send_progress(
        &ptl->super,
        sendfrag->frag_send.frag_request,
        sendfrag->frag_send.frag_base.frag_size);
    /* only non-first fragments came from the free list */
    if(sendfrag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0)
        MCA_PTL_MX_SEND_FRAG_RETURN(sendfrag);
}
void mca_ptl_mx_send_frag_init_ack(
mca_ptl_mx_send_frag_t* ack,
mca_ptl_mx_module_t* ptl,
struct mca_ptl_mx_recv_frag_t* recv_frag);
#endif

Просмотреть файл

@ -124,7 +124,7 @@
* the request will be scheduled upon an acknowledgment from the peer
* that the request has been matched on the receive side. The PTL receiving
* the fragment is responsible for generating an acknowledgment when the
* MCA_PTL_FLAGS_ACK_MATCHED bit is set in the flags field of the fragment
* MCA_PTL_FLAGS_ACK bit is set in the flags field of the fragment
* header. The PTL receiving an ack is responsible for updating the
* the send request descriptor to point to the matched recv descriptor
* and the destination buffer address at the remote process. The address of
@ -187,7 +187,7 @@
* receives if the underlying transport supports scatter/gather operations.
*
* The ptl_matched() function should additionally generate, if required, an
* ack to the source process. An ack is required if the MCA_PTL_FLAGS_ACK_MATCHED
* ack to the source process. An ack is required if the MCA_PTL_FLAGS_ACK
* bit is set by the source in the flags field of the initial message header.
* As described above, the generated ack should contain a pointer to the matched
* receive request, along with the pointer to the destination buffer.
@ -481,7 +481,7 @@ typedef void (*mca_ptl_base_module_request_fini_fn_t)(
* The PTL is responsible for updating the current data offset (req_offset) in the
* request to reflect the actual number of bytes fragmented. This may be less than
* the requested size, due to resource constraints or datatype alighnment/offset. If
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK_MATCHED bit will be set in the
* an acknowledgment is required, the MCA_PTL_FLAGS_ACK bit will be set in the
* flags parameter. In this case, the PTL should not call ptl_send_progress() function
* to indicate completion of the fragment until the ack is received. For all other
* fragments ptl_send_progress() may be called based on local completion semantics.
@ -592,7 +592,7 @@ typedef bool (*mca_ptl_base_module_match_fn_t)(
* when a matching receive is posted.
*
* When this routine is called, the PTL is responsible for generating
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK_MATCHED
* an acknowledgment to the peer if the MCA_PTL_FLAGS_ACK
* bit is set in the original fragment header. Additionally, the PTL
* is responsible for transferring any data associated with the fragment
* into the users buffer utilizing the datatype engine, and notifying

Просмотреть файл

@ -255,7 +255,7 @@ void mca_ptl_tcp_matched(
{
/* send ack back to peer? */
mca_ptl_base_header_t* header = &frag->frag_base.frag_header;
if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
int rc;
mca_ptl_tcp_send_frag_t* ack;
mca_ptl_tcp_recv_frag_t* recv_frag = (mca_ptl_tcp_recv_frag_t*)frag;

Просмотреть файл

@ -90,7 +90,7 @@ static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag)
* received, go ahead and update the request status
*/
} else if (frag->frag_vec_cnt == 0 &&
((frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 ||
((frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) == 0 ||
mca_pml_base_send_request_matched(request))) {
/* make sure this only happens once in threaded case */