1
1
This commit was SVN r6423.
Этот коммит содержится в:
Brian Barrett 2005-07-11 21:00:08 +00:00
родитель 49dbd29034
Коммит aca3abac5d
12 изменённых файлов: 452 добавлений и 86 удалений

Просмотреть файл

@ -33,7 +33,7 @@ AC_DEFUN([MCA_btl_portals_CONFIG_VAL], [
AC_MSG_ERROR([--without-btl-portals-$1 is invalid argument])
;;
*)
$2="$with_m4_bpatsubst([btl-portals-$1], -, _)"
$2="[$with_]m4_bpatsubst([btl-portals-$1], -, _)"
AC_MSG_RESULT([[$]$2])
;;
esac

Просмотреть файл

@ -60,7 +60,7 @@ mca_btl_portals_module_t mca_btl_portals_module = {
mca_btl_portals_prepare_dst,
mca_btl_portals_send,
mca_btl_portals_put,
mca_btl_portals_get,
mca_btl_portals_get
},
};
@ -125,14 +125,30 @@ mca_btl_portals_add_procs(struct mca_btl_base_module_t* btl,
if (NULL != portals_procs) free(portals_procs);
OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock);
if (need_recv_setup) {
/* create eqs */
int i;
for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQAlloc(ptl_btl->portals_ni_h,
ptl_btl->portals_eq_sizes[i],
PTL_EQ_HANDLER_NONE,
&(ptl_btl->portals_eq_handles[i]));
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error creating EQ %d: %d", i, ptl_ret);
OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock);
/* BWB - better error code? */
return OMPI_ERROR;
}
}
ret = mca_btl_portals_recv_enable(ptl_btl);
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock);
return ret;
}
@ -149,6 +165,9 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl,
int ret = OMPI_SUCCESS;
bool need_recv_shutdown = false;
opal_output_verbose(100, mca_btl_portals_component.portals_output,
"del_procs called for %ld procs", (long) nprocs);
OPAL_THREAD_LOCK(&ptl_btl->portals_lock);
for (i = 0 ; i < nprocs ; ++i) {
@ -161,14 +180,26 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl,
need_recv_shutdown = true;
}
OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock);
if (need_recv_shutdown) {
int i;
ret = mca_btl_portals_recv_disable(ptl_btl);
/* destroy eqs */
for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQFree(ptl_btl->portals_eq_handles[i]);
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error freeing EQ %d: %d", i, ptl_ret);
}
}
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&ptl_btl->portals_lock);
return ret;
}
@ -208,6 +239,7 @@ mca_btl_portals_alloc(struct mca_btl_base_module_t* btl,
}
frag->base.des_flags = 0;
frag->type = MCA_BTL_PORTALS_FRAG_SEND;
return (mca_btl_base_descriptor_t*) frag;
}
@ -217,16 +249,22 @@ int
mca_btl_portals_free(struct mca_btl_base_module_t* btl,
mca_btl_base_descriptor_t* des)
{
mca_btl_portals_module_t* portals_btl = (mca_btl_portals_module_t*) btl;
mca_btl_portals_frag_t* frag = (mca_btl_portals_frag_t*) des;
if (frag->size == 0) {
MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag);
} else if (frag->size == btl->btl_eager_limit){
MCA_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag);
} else if (frag->size == btl->btl_max_send_size) {
MCA_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag);
} else {
return OMPI_ERR_BAD_PARAM;
if (frag->type == MCA_BTL_PORTALS_FRAG_SEND) {
if (frag->size == 0) {
MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag);
} else if (frag->size == btl->btl_eager_limit){
MCA_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag);
} else if (frag->size == btl->btl_max_send_size) {
MCA_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag);
} else {
return OMPI_ERR_BAD_PARAM;
}
} else {
mca_btl_portals_return_chunk_part(portals_btl, frag);
MCA_BTL_PORTALS_FRAG_RETURN_USER(btl, frag);
}
return OMPI_SUCCESS;
@ -238,7 +276,39 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
{
struct mca_btl_portals_module_t *btl =
(struct mca_btl_portals_module_t *) btl_base;
int ret;
int ret, i;
opal_list_item_t *item;
OPAL_THREAD_LOCK(&ptl_btl->portals_lock);
if (0 != opal_list_get_size(&btl->portals_endpoint_list)) {
OPAL_THREAD_LOCK(&ptl_btl->portals_lock);
while (NULL !=
(item = opal_list_remove_first(&btl->portals_endpoint_list))) {
OBJ_RELEASE(item);
}
/* Only do this if there was something in the endpoint list.
Otherwise, it has already been done. */
/* shut down recv queues */
ret = mca_btl_portals_recv_disable(btl);
/* destroy eqs */
for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQFree(btl->portals_eq_handles[i]);
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error freeing EQ %d: %d", i, ptl_ret);
}
}
}
OBJ_DESTRUCT(&btl->portals_endpoint_list);
OBJ_DESTRUCT(&btl->portals_recv_chunks);
OPAL_THREAD_UNLOCK(&btl->portals_lock);
if (PTL_INVALID_HANDLE != btl->portals_ni_h) {
ret = PtlNIFini(btl->portals_ni_h);
@ -248,6 +318,9 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
return OMPI_ERROR;
}
}
OBJ_DESTRUCT(&btl->portals_lock);
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"successfully finalized module");

Просмотреть файл

@ -99,12 +99,20 @@ struct mca_btl_portals_module_t {
int portals_recv_mds_num;
/* size of each md for first frags */
int portals_recv_mds_size;
/* list of recv chunks */
opal_list_t portals_recv_chunks;
/* size for event queue */
int portals_eq_sizes[MCA_BTL_PORTALS_EQ_SIZE];
/* frag receive event queue */
ptl_handle_eq_t portals_eq_handles[MCA_BTL_PORTALS_EQ_SIZE];
/* "reject" entry for recv match list */
ptl_handle_me_t portals_recv_reject_me_h;
/* number outstanding sends */
volatile int32_t portals_outstanding_sends;
/* our portals network interface */
ptl_handle_ni_t portals_ni_h;
/* the limits returned from PtlNIInit for interface */
@ -137,7 +145,7 @@ int mca_btl_portals_component_progress(void);
* Not part of the BTL interface. Need to be implemented for every
* version of Portals
*/
int mca_btl_portals_init(mca_btl_portals_component_t *comp);
int mca_btl_portals_init_compat(mca_btl_portals_component_t *comp);
/* 4th argument is a ptl_peers array, as that's what we'll get back
from many of the access functions... */

Просмотреть файл

@ -40,7 +40,7 @@ FILE* utcp_api_out;
FILE* utcp_lib_out;
int
mca_btl_portals_init(mca_btl_portals_component_t *comp)
mca_btl_portals_init_compat(mca_btl_portals_component_t *comp)
{
ptl_process_id_t info;
int ret;

Просмотреть файл

@ -123,10 +123,6 @@ mca_btl_portals_component_open(void)
mca_btl_portals_component.portals_ifname =
param_register_string("ifname", "eth0");
#endif
portals_output_stream.lds_verbose_level =
param_register_int("debug_level",
BTL_PORTALS_DEFAULT_DEBUG_LEVEL);
mca_btl_portals_component.portals_free_list_init_num =
param_register_int("free_list_init_num",
BTL_PORTALS_DEFAULT_FREE_LIST_INIT_NUM);
@ -138,24 +134,15 @@ mca_btl_portals_component_open(void)
BTL_PORTALS_DEFAULT_FREE_LIST_INC_NUM);
/* start up debugging output */
portals_output_stream.lds_verbose_level =
param_register_int("debug_level",
BTL_PORTALS_DEFAULT_DEBUG_LEVEL);
asprintf(&(portals_output_stream.lds_prefix),
"btl: portals (%5d): ", getpid());
mca_btl_portals_component.portals_output =
opal_output_open(&portals_output_stream);
/* fill default module state */
mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_SEND;
for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) {
mca_btl_portals_module.portals_eq_sizes[i] = 0;
mca_btl_portals_module.portals_eq_handles[i] = PTL_EQ_NONE;
}
mca_btl_portals_module.portals_ni_h = PTL_INVALID_HANDLE;
mca_btl_portals_module.portals_sr_dropped = 0;
/* get configured state for default module */
mca_btl_portals_module.super.btl_eager_limit =
param_register_int("eager_limit",
BTL_PORTALS_DEFAULT_EAGER_LIMIT);
@ -178,6 +165,36 @@ mca_btl_portals_component_open(void)
mca_btl_portals_module.super.btl_bandwidth =
param_register_int("bandwidth", 1000);
mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_SEND;
bzero(&(mca_btl_portals_module.portals_reg),
sizeof(mca_btl_portals_module.portals_reg));
for (i = 0 ; i < MCA_BTL_PORTALS_EQ_SIZE ; ++i) {
mca_btl_portals_module.portals_eq_sizes[i] = 0;
mca_btl_portals_module.portals_eq_handles[i] = PTL_EQ_NONE;
}
/* eq handles will be created when the module is instantiated.
Set sizes here */
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RECV] =
param_register_int("eq_recv_size", BTL_PORTALS_DEFAULT_RECV_QUEUE_SIZE);
/* sends_pending * 3 for start, end, ack */
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_SEND] =
param_register_int("eq_send_max_pending", BTL_PORTALS_MAX_SENDS_PENDING) * 3;
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RDMA] =
param_register_int("eq_rdma_size", 512); /* BWB - FIXME - make param */
mca_btl_portals_module.portals_recv_reject_me_h = PTL_INVALID_HANDLE;
mca_btl_portals_module.portals_recv_mds_num =
param_register_int("recv_md_num", 3); /* BWB - FIXME - make param */
mca_btl_portals_module.portals_recv_mds_size =
param_register_int("recv_md_size", 524288); /* BWB - FIXME - make param */
mca_btl_portals_module.portals_ni_h = PTL_INVALID_HANDLE;
mca_btl_portals_module.portals_sr_dropped = 0;
mca_btl_portals_module.portals_outstanding_sends = 0;
return OMPI_SUCCESS;
}
@ -200,6 +217,7 @@ mca_btl_portals_component_close(void)
free(portals_output_stream.lds_prefix);
}
/* close debugging stream */
opal_output_close(mca_btl_portals_component.portals_output);
mca_btl_portals_component.portals_output = -1;
@ -217,15 +235,15 @@ mca_btl_portals_component_init(int *num_btls,
*num_btls = 0;
if (enable_progress_threads) {
if (enable_progress_threads || enable_mpi_threads) {
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"disabled because progress threads enabled");
"disabled because threads enabled");
return NULL;
}
/* initialize portals btl. note that this is in the compat code because
it's fairly non-portable between implementations */
if (OMPI_SUCCESS != mca_btl_portals_init(&mca_btl_portals_component)) {
if (OMPI_SUCCESS != mca_btl_portals_init_compat(&mca_btl_portals_component)) {
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"disabled because compatibility init failed");
return NULL;
@ -275,6 +293,15 @@ mca_btl_portals_component_init(int *num_btls,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* endpoint list */
OBJ_CONSTRUCT(&(ptl_btl->portals_endpoint_list), opal_list_t);
/* receive chunk list */
OBJ_CONSTRUCT(&(ptl_btl->portals_recv_chunks), opal_list_t);
/* lock */
OBJ_CONSTRUCT(&(ptl_btl->portals_lock), opal_mutex_t);
}
*num_btls = mca_btl_portals_component.portals_num_modules;
@ -304,7 +331,8 @@ mca_btl_portals_component_progress(void)
PTL_EQ_NONE) continue; /* they are all initialized at once */
#if OMPI_ENABLE_DEBUG
/* BWB - this is going to kill performance */
/* check for dropped packets. In theory, our protocol covers
this, but it can't hurt to check while we're debugging */
PtlNIStatus(module->portals_ni_h,
PTL_SR_DROP_COUNT,
&numdropped);
@ -318,7 +346,7 @@ mca_btl_portals_component_progress(void)
ret = PtlEQPoll(module->portals_eq_handles,
MCA_BTL_PORTALS_EQ_SIZE, /* number of eq handles */
10,
10, /* poll time */
&ev,
&which);
if (PTL_EQ_EMPTY == ret) {
@ -334,17 +362,6 @@ mca_btl_portals_component_progress(void)
"*** Event queue entries were dropped");
}
#if BTL_PORTALS_HAVE_EVENT_UNLINK && OMPI_ENABLE_DEBUG
/* not everyone has UNLINK. Use it only to print the event,
so we can make sure we properly re-initialize the ones that
need to be re-initialized */
if (PTL_EVENT_UNLINK == ev.type) {
OPAL_OUTPUT_VERBOSE((100, mca_btl_portals_component.portals_output,
"unlink event occurred"));
continue;
}
#endif
switch (which) {
case MCA_BTL_PORTALS_EQ_RECV:
mca_btl_portals_process_recv(module, &ev);

Просмотреть файл

@ -38,7 +38,7 @@ static void mca_btl_portals_endpoint_construct(mca_btl_base_endpoint_t* endpoint
OBJ_CLASS_INSTANCE(
mca_btl_portals_endpoint_t,
opal_object_t,
opal_list_item_t,
mca_btl_portals_endpoint_construct,
NULL);

Просмотреть файл

@ -21,7 +21,7 @@
static void
mca_btl_portals_frag_common_constructor(mca_btl_portals_frag_t* frag)
mca_btl_portals_frag_common_send_constructor(mca_btl_portals_frag_t* frag)
{
frag->base.des_dst = 0;
frag->base.des_dst_cnt = 0;
@ -31,6 +31,8 @@ mca_btl_portals_frag_common_constructor(mca_btl_portals_frag_t* frag)
frag->segment.seg_addr.pval = frag + sizeof(mca_btl_portals_frag_t);
frag->segment.seg_len = frag->size;
frag->segment.seg_key.key64 = 0;
frag->type = MCA_BTL_PORTALS_FRAG_SEND;
}
@ -38,7 +40,7 @@ static void
mca_btl_portals_frag_eager_constructor(mca_btl_portals_frag_t* frag)
{
frag->size = mca_btl_portals_module.super.btl_eager_limit;
mca_btl_portals_frag_common_constructor(frag);
mca_btl_portals_frag_common_send_constructor(frag);
}
@ -46,7 +48,7 @@ static void
mca_btl_portals_frag_max_constructor(mca_btl_portals_frag_t* frag)
{
frag->size = mca_btl_portals_module.super.btl_max_send_size;
mca_btl_portals_frag_common_constructor(frag);
mca_btl_portals_frag_common_send_constructor(frag);
}

Просмотреть файл

@ -22,6 +22,22 @@ extern "C" {
#endif
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t);
/* Kind of fragment.  mca_btl_portals_free() branches on this value to
   decide how a fragment is handed back. */
typedef enum {
/* send fragment: the value assigned by the send-frag constructor and
   by mca_btl_portals_alloc() */
MCA_BTL_PORTALS_FRAG_SEND,
/* receive fragment: assigned by mca_btl_portals_process_recv() to a
   fragment that points into a receive chunk */
MCA_BTL_PORTALS_FRAG_RECV
} mca_btl_portals_frag_type_t;
/**
 * Send-side bookkeeping carried by a fragment.  All three members are
 * filled in by mca_btl_portals_send() before the fragment is handed
 * to mca_btl_portals_send_frag().
 */
typedef struct mca_btl_portals_send_frag_t {
    /** module the fragment is being sent through */
    struct mca_btl_portals_module_t *btl;
    /** peer the fragment is addressed to */
    struct mca_btl_base_endpoint_t *endpoint;
    /** header; its tag is passed to PtlPut() as hdr_data */
    mca_btl_base_header_t hdr;
} mca_btl_portals_send_frag_t;
/**
 * Receive-side bookkeeping carried by a fragment.
 */
typedef struct mca_btl_portals_recv_frag_t {
    /** receive chunk whose memory this fragment's segment points
        into; set by mca_btl_portals_process_recv() and read back by
        mca_btl_portals_return_chunk_part() */
    struct mca_btl_portals_recv_chunk_t *chunk;
} mca_btl_portals_recv_frag_t;
/**
@ -30,10 +46,13 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t);
struct mca_btl_portals_frag_t {
mca_btl_base_descriptor_t base;
mca_btl_base_segment_t segment;
struct mca_btl_portals_module_t *btl;
struct mca_btl_base_endpoint_t *endpoint;
mca_btl_base_header_t hdr;
mca_btl_portals_frag_type_t type;
size_t size;
union {
mca_btl_portals_send_frag_t send_frag;
mca_btl_portals_recv_frag_t recv_frag;
} u;
};
typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t);

Просмотреть файл

@ -21,11 +21,69 @@
#include "btl_portals.h"
#include "btl_portals_recv.h"
#include "btl_portals_frag.h"
OBJ_CLASS_INSTANCE(mca_btl_portals_recv_chunk_t,
opal_list_item_t,
NULL, NULL);
/**
 * Set up the receive side of a module.
 *
 * Attaches a catch-all "reject" match entry (any process, match bits
 * 0) with a zero-length, truncating memory descriptor to the send
 * portal table, then creates module->portals_recv_mds_num receive
 * chunks, appends each to module->portals_recv_chunks and activates
 * it.
 *
 * If anything fails after the reject match entry exists, whatever
 * was set up so far is torn down with mca_btl_portals_recv_disable().
 *
 * @param module  module to enable receives on
 *
 * @return OMPI_SUCCESS when the reject entry and every chunk are in
 *         place, OMPI_ERROR otherwise
 */
int
mca_btl_portals_recv_enable(mca_btl_portals_module_t *module)
{
    ptl_md_t md;
    ptl_handle_md_t md_h;
    ptl_process_id_t any_proc = {PTL_NID_ANY, PTL_PID_ANY};
    int ret;
    int i;

    /* create the reject entry: no buffer, never exhausted, no events */
    md.start = NULL;
    md.length = 0;
    md.threshold = PTL_MD_THRESH_INF;
    md.max_size = 0;
    md.options = PTL_MD_TRUNCATE;
    md.user_ptr = NULL;
    md.eq_handle = PTL_EQ_NONE;

    ret = PtlMEAttach(module->portals_ni_h,
                      BTL_PORTALS_SEND_TABLE_ID,
                      any_proc,
                      0, /* match */
                      0, /* ignore */
                      PTL_RETAIN,
                      PTL_INS_AFTER,
                      &(module->portals_recv_reject_me_h));
    if (PTL_OK != ret) {
        opal_output(mca_btl_portals_component.portals_output,
                    "Error creating recv reject ME: %d", ret);
        return OMPI_ERROR;
    }

    ret = PtlMDAttach(module->portals_recv_reject_me_h,
                      md,
                      PTL_RETAIN,
                      &md_h);
    if (PTL_OK != ret) {
        opal_output(mca_btl_portals_component.portals_output,
                    "Error attaching recv reject MD: %d", ret);
        mca_btl_portals_recv_disable(module);
        return OMPI_ERROR;
    }

    /* create the recv chunks */
    for (i = 0 ; i < module->portals_recv_mds_num ; ++i) {
        mca_btl_portals_recv_chunk_t *chunk =
            mca_btl_portals_recv_chunk_init(module);
        if (NULL == chunk) {
            mca_btl_portals_recv_disable(module);
            return OMPI_ERROR;
        }

        /* the chunk goes on the list first so that recv_disable()
           will find and free it if activation fails */
        opal_list_append(&(module->portals_recv_chunks),
                         (opal_list_item_t*) chunk);

        /* a chunk that could not be activated can never receive
           data, so do not report success for it */
        ret = mca_btl_portals_activate_chunk(chunk);
        if (OMPI_SUCCESS != ret) {
            opal_output(mca_btl_portals_component.portals_output,
                        "Error activating recv chunk %d: %d", i, ret);
            mca_btl_portals_recv_disable(module);
            return OMPI_ERROR;
        }
    }

    return OMPI_SUCCESS;
}
@ -33,14 +91,23 @@ mca_btl_portals_recv_enable(mca_btl_portals_module_t *module)
int
mca_btl_portals_recv_disable(mca_btl_portals_module_t *module)
{
return OMPI_SUCCESS;
}
opal_list_item_t *item;
if (opal_list_get_size(&module->portals_recv_chunks) > 0) {
while (NULL !=
(item = opal_list_remove_first(&module->portals_recv_chunks))) {
mca_btl_portals_recv_chunk_t *chunk =
(mca_btl_portals_recv_chunk_t*) item;
mca_btl_portals_recv_chunk_free(chunk);
}
}
if (PTL_INVALID_HANDLE != module->portals_recv_reject_me_h) {
/* destroy the reject entry */
PtlMEUnlink(module->portals_recv_reject_me_h);
module->portals_recv_reject_me_h = PTL_INVALID_HANDLE;
}
int
mca_btl_portals_process_recv(mca_btl_portals_module_t *module,
ptl_event_t *ev)
{
return OMPI_SUCCESS;
}
@ -69,12 +136,7 @@ mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *module)
int
mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk)
{
if (PTL_INVALID_HANDLE != chunk->me_h) {
PtlMEUnlink(chunk->me_h);
chunk->me_h = PTL_INVALID_HANDLE;
}
/* need to clear out the md */
while (chunk->pending != 0) {
mca_btl_portals_component_progress();
}
@ -95,6 +157,78 @@ mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk)
}
OBJ_CLASS_INSTANCE(mca_btl_portals_recv_chunk_t,
opal_list_item_t,
NULL, NULL);
/**
 * Handle one event taken from the receive event queue.
 *
 *  - PTL_EVENT_PUT_START: a message has started to land in a receive
 *    chunk.  On success a reference is taken on the chunk
 *    (chunk->pending) so it is not re-activated while a fragment
 *    still points into it.
 *  - PTL_EVENT_PUT_END: the message is complete.  A user fragment is
 *    allocated, pointed at the received bytes inside the chunk, and
 *    passed to the callback registered for the message's tag.
 *  - any other event type is ignored.
 *
 * @param module  module the event queue belongs to
 * @param ev      the event; ev->md.user_ptr is the receive chunk and
 *                ev->hdr_data carries the tag
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR if the PUT_END event reports a
 *         network failure or no fragment could be allocated
 */
int
mca_btl_portals_process_recv(mca_btl_portals_module_t *module,
                             ptl_event_t *ev)
{
    mca_btl_portals_frag_t *frag = NULL;
    mca_btl_portals_recv_chunk_t *chunk = ev->md.user_ptr;
    mca_btl_base_tag_t tag = (mca_btl_base_tag_t) ev->hdr_data;
    int ret;

    switch (ev->type) {
    case PTL_EVENT_PUT_START:
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "recv: PTL_EVENT_PUT_START for tag %d, link %d",
                            tag, (int) ev->link);

        if (ev->ni_fail_type != PTL_NI_OK) {
            opal_output(mca_btl_portals_component.portals_output,
                        "Failure to start event\n");
        } else {
            /* take a reference on the chunk; it is dropped again when
               the fragment delivered for this message is freed */
            OPAL_THREAD_ADD32(&(chunk->pending), 1);
        }
        break;

    case PTL_EVENT_PUT_END:
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "recv: PTL_EVENT_PUT_END for tag %d, link %d",
                            tag, (int) ev->link);

        if (ev->ni_fail_type != PTL_NI_OK) {
            /* NOTE(review): if the matching PUT_START succeeded, the
               reference it took is not released here - confirm how
               Portals pairs failed START/END events before changing */
            opal_output(mca_btl_portals_component.portals_output,
                        "Failure to end event\n");
            return OMPI_ERROR;
        }

        /* ok, we've got data */
        opal_output_verbose(95, mca_btl_portals_component.portals_output,
                            "received data for tag %d\n", tag);

        /* it's a user, so we have to manually setup the segment */
        MCA_BTL_PORTALS_FRAG_ALLOC_USER(module, frag, ret);
        if (NULL == frag) {
            /* nothing will ever be freed for this message, so give
               back the reference taken at PUT_START */
            opal_output(mca_btl_portals_component.portals_output,
                        "Unable to allocate fragment for received data");
            OPAL_THREAD_ADD32(&(chunk->pending), -1);
            return OMPI_ERROR;
        }

        frag->type = MCA_BTL_PORTALS_FRAG_RECV;
        frag->size = ev->mlength;
        frag->base.des_dst = &frag->segment;
        frag->base.des_dst_cnt = 1;
        frag->base.des_src = NULL;
        frag->base.des_src_cnt = 0;

        /* the data stays in the chunk; the segment just points at it */
        frag->segment.seg_addr.pval = (((char*) ev->md.start) + ev->offset);
        frag->segment.seg_len = frag->size;
        frag->segment.seg_key.key64 = 0;

        frag->u.recv_frag.chunk = chunk;

        if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) {
            /* the chunk is full.  It's deactivated, automagically but we
               can't start it up again until everyone is done with it.
               The actual reactivation and all that will happen after the
               free completes the last operation... */
            chunk->full = true;
            opal_atomic_mb();
        }

        module->portals_reg[tag].cbfunc(&module->super,
                                        tag,
                                        &frag->base,
                                        module->portals_reg[tag].cbdata);
        break;

    default:
        break;
    }

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -17,6 +17,8 @@
#ifndef MCA_BTL_PORTALS_RECV_H
#define MCA_BTL_PORTALS_RECV_H
#include "btl_portals_frag.h"
struct mca_btl_portals_recv_chunk_t {
opal_list_item_t base;
@ -44,20 +46,28 @@ int mca_btl_portals_process_recv(mca_btl_portals_module_t *module,
/**
* Create a chunk of memory for receiving send messages. Must call
* activate_chunk on the returned chunk of memory before it will be
* active with the POrtals library */
* active with the Portals library
*
* Module lock must be held before calling this function
*/
mca_btl_portals_recv_chunk_t*
mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *module);
/**
* Free a chunk of memory. Will remove the match entry, then progress
* Portals until the pending count is returned to 0. Will then free
* all resources associated with chunk.
*
* Module lock must be held before calling this function
*/
int mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk);
/*
/**
* activate a chunk. Chunks that are full (have gone inactive) can be
* re-activated with this call
* re-activated with this call. There is no need to hold the lock
* before calling this function
*/
static inline int
mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
@ -92,7 +102,10 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
md.user_ptr = chunk;
md.eq_handle = chunk->btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_RECV];
chunk->pending = 0;
chunk->full = false;
/* make sure that everyone sees the update on full value */
opal_atomic_mb();
ret = PtlMDAttach(chunk->me_h,
md,
@ -109,4 +122,23 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
return OMPI_SUCCESS;
}
/**
 * Give back the part of a receive chunk that a fragment was using.
 *
 * Drops one reference on the fragment's chunk.  If the chunk has been
 * marked full and this was the last outstanding reference, the chunk
 * is re-activated so it can receive again.
 *
 * The reference is dropped whether or not the chunk is full yet;
 * otherwise fragments freed before the chunk filled up would never
 * be released and the count could not come back to 0.
 *
 * @param module  module the fragment belongs to (not used here)
 * @param frag    receive fragment being freed; frag->u.recv_frag.chunk
 *                must be the chunk it points into
 */
static inline void
mca_btl_portals_return_chunk_part(mca_btl_portals_module_t *module,
                                  mca_btl_portals_frag_t *frag)
{
    mca_btl_portals_recv_chunk_t *chunk = frag->u.recv_frag.chunk;
    int32_t remaining;
    int ret;

    /* decide on the value produced by the add itself rather than
       reading chunk->pending again afterwards */
    remaining = OPAL_THREAD_ADD32(&(chunk->pending), -1);

    if (true == chunk->full && 0 == remaining) {
        ret = mca_btl_portals_activate_chunk(chunk);
        if (OMPI_SUCCESS != ret) {
            /* BWB - now what? */
        }
    }
}
#endif /* MCA_BTL_PORTALS_RECV_H */

Просмотреть файл

@ -27,8 +27,82 @@ int
mca_btl_portals_process_send(mca_btl_portals_module_t *module,
                             ptl_event_t *ev)
{
    /*
     * Handle one event taken from the send event queue.
     *
     * ev->md.user_ptr is the fragment that was sent (set up by
     * mca_btl_portals_send_frag()).
     *
     *  - PTL_EVENT_SEND_START / PTL_EVENT_SEND_END: only acted on
     *    when the event reports a network failure, in which case the
     *    fragment's completion callback is invoked with OMPI_ERROR.
     *  - PTL_EVENT_ACK: mlength == 0 is treated as "message was
     *    dropped" and completes the fragment with OMPI_ERROR;
     *    otherwise the outstanding-send count is decremented, the MD
     *    is unlinked and the fragment completes with OMPI_SUCCESS.
     *  - anything else is only logged.
     *
     * Always returns OMPI_SUCCESS; failures are reported through the
     * fragment's callback.
     *
     * NOTE(review): none of the error paths decrement
     * portals_outstanding_sends - confirm whether that is intended.
     */
    mca_btl_portals_frag_t *frag =
        (mca_btl_portals_frag_t*) ev->md.user_ptr;

    opal_output_verbose(99, mca_btl_portals_component.portals_output,
                        "process_send");

    switch (ev->type) {
    case PTL_EVENT_SEND_START:
        /* pointers are printed with %p; passing one for %x is a
           format/argument mismatch */
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "send: PTL_EVENT_SEND_START for %p",
                            (void*) frag);

        if (ev->ni_fail_type != PTL_NI_OK) {
            opal_output(mca_btl_portals_component.portals_output,
                        "Failure to start send event\n");
            frag->base.des_cbfunc(&module->super,
                                  frag->u.send_frag.endpoint,
                                  &frag->base,
                                  OMPI_ERROR);
        }
        break;

    case PTL_EVENT_SEND_END:
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "send: PTL_EVENT_SEND_END for %p",
                            (void*) frag);

        if (ev->ni_fail_type != PTL_NI_OK) {
            opal_output(mca_btl_portals_component.portals_output,
                        "Failure to end send event\n");
            frag->base.des_cbfunc(&module->super,
                                  frag->u.send_frag.endpoint,
                                  &frag->base,
                                  OMPI_ERROR);
        }
        break;

    case PTL_EVENT_ACK:
        /* ok, this is the real work - the message has been received
           on the other side.  If mlength == 0, that means that we hit
           the reject md and we need to try to retransmit */
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "send: PTL_EVENT_ACK for %p",
                            (void*) frag);

        if (0 == ev->mlength) {
            /* other side did not receive the message */
            /* BWB - implement check for retransmit */
            opal_output(mca_btl_portals_component.portals_output,
                        "message was dropped and retransmits not implemented");
            frag->base.des_cbfunc(&module->super,
                                  frag->u.send_frag.endpoint,
                                  &frag->base,
                                  OMPI_ERROR);
        } else {
            /* the other side received the message */
            OPAL_THREAD_ADD32(&module->portals_outstanding_sends, -1);

            /* we're done with the md - return it.  Do this before
               anything else in case the PML releases resources, then
               gets more resources (ie, what's currently in this
               md) */
            PtlMDUnlink(ev->md_handle);

            /* let the PML know we're done... */
            frag->base.des_cbfunc(&module->super,
                                  frag->u.send_frag.endpoint,
                                  &frag->base,
                                  OMPI_SUCCESS);
        }
        break;

    default:
        opal_output_verbose(90, mca_btl_portals_component.portals_output,
                            "send: unexpected event %d for %p",
                            (int) ev->type, (void*) frag);
        break;
    }

    return OMPI_SUCCESS;
}
@ -42,9 +116,17 @@ mca_btl_portals_send(struct mca_btl_base_module_t* btl,
{
mca_btl_portals_module_t *ptl_btl = (mca_btl_portals_module_t*) btl;
mca_btl_portals_frag_t *frag = (mca_btl_portals_frag_t*) descriptor;
frag->endpoint = endpoint;
frag->hdr.tag = tag;
frag->btl = ptl_btl;
int32_t num_sends;
frag->u.send_frag.endpoint = endpoint;
frag->u.send_frag.hdr.tag = tag;
frag->u.send_frag.btl = ptl_btl;
num_sends = OPAL_THREAD_ADD32(&ptl_btl->portals_outstanding_sends, 1);
/* BWB - implement check for too many pending messages */
opal_output_verbose(90, mca_btl_portals_component.portals_output,
"send called for frag 0x%x", frag);
return mca_btl_portals_send_frag(frag);
}

Просмотреть файл

@ -30,7 +30,6 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag)
ptl_handle_md_t md_h;
int ret;
/* setup the send */
md.start = frag->segment.seg_addr.pval;
md.length = frag->segment.seg_len;
@ -38,10 +37,10 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag)
md.max_size = 0;
md.options = 0; /* BWB - can we optimize? */
md.user_ptr = frag; /* keep a pointer to ourselves */
md.eq_handle = frag->btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_SEND];
md.eq_handle = frag->u.send_frag.btl->portals_eq_handles[MCA_BTL_PORTALS_EQ_SEND];
/* make a free-floater */
ret = PtlMDBind(frag->btl->portals_ni_h,
ret = PtlMDBind(frag->u.send_frag.btl->portals_ni_h,
md,
PTL_UNLINK,
&md_h);
@ -53,12 +52,12 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag)
ret = PtlPut(md_h,
PTL_ACK_REQ,
frag->endpoint->endpoint_ptl_id,
frag->u.send_frag.endpoint->endpoint_ptl_id,
BTL_PORTALS_SEND_TABLE_ID,
0, /* ac_index */
0, /* match bits */
0, /* ac_index - not used*/
frag->segment.seg_key.key64, /* match bits */
0, /* remote offset - not used */
frag->hdr.tag); /* hdr_data - tag */
frag->u.send_frag.hdr.tag); /* hdr_data - tag */
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlPut failed with error %d", ret);