1
1

Add error callback to the btl interface, this allows error to be delivered to

the upperlayer assynchronously although there are some issues with this.. such
as there are multiple consumers of the btl's.. who get's the

This commit was SVN r11232.
Этот коммит содержится в:
Galen Shipman 2006-08-16 20:21:38 +00:00
родитель 0f47949703
Коммит 3b49953ce2
25 изменённых файлов: 260 добавлений и 48 удалений

Просмотреть файл

@ -594,6 +594,22 @@ typedef int (*mca_bml_base_module_register_fn_t)(
/**
* Register a callback function that is called of error.
*
* @param bml (IN) BML module
* @return Status indicating if cleanup was successful
*
*/
typedef int (*mca_bml_base_module_register_error_cb_fn_t)(
mca_btl_base_module_error_cb_fn_t cbfunc
);
/**
* BML module interface functions and attributes.
*/
@ -613,6 +629,8 @@ struct mca_bml_base_module_t {
mca_bml_base_module_del_btl_fn_t bml_del_btl;
mca_bml_base_module_del_proc_btl_fn_t bml_del_proc_btl;
mca_bml_base_module_register_fn_t bml_register;
mca_bml_base_module_register_error_cb_fn_t bml_register_error;
mca_bml_base_module_finalize_fn_t bml_finalize;
mca_bml_base_module_progress_fn_t bml_progress;

Просмотреть файл

@ -49,6 +49,7 @@ mca_bml_r2_module_t mca_bml_r2 = {
mca_bml_r2_del_btl,
mca_bml_r2_del_proc_btl,
mca_bml_r2_register,
mca_bml_r2_register_error,
mca_bml_r2_finalize,
mca_bml_r2_progress
}
@ -718,6 +719,32 @@ int mca_bml_r2_register(
}
/*
* Register an error handler with/ all active btls
* if they support error handlers..
*/
int mca_bml_r2_register_error(
mca_btl_base_module_error_cb_fn_t cbfunc
)
{
uint32_t i;
int rc;
mca_btl_base_module_t *btl;
for(i = 0; i < mca_bml_r2.num_btl_modules; i++) {
btl = mca_bml_r2.btl_modules[i];
if(btl->btl_register_error) {
rc = btl->btl_register_error(btl, cbfunc);
if(OMPI_SUCCESS != rc) {
return rc;
}
}
}
return OMPI_SUCCESS;
}
int mca_bml_r2_component_fini(void)
{
/* FIX */

Просмотреть файл

@ -96,6 +96,7 @@ int mca_bml_r2_register(
void* data
);
int mca_bml_r2_register_error( mca_btl_base_module_error_cb_fn_t cbfunc );
int mca_bml_r2_finalize( void );

Просмотреть файл

@ -153,6 +153,8 @@ typedef uint8_t mca_btl_base_tag_t;
#define MCA_BTL_EXCLUSIVITY_DEFAULT 1024 /* GM/IB/etc. */
#define MCA_BTL_EXCLUSIVITY_LOW 0 /* TCP used as a last resort */
/* error callback flags */
#define MCA_BTL_ERROR_FLAGS_FATAL 0x1
/**
* Asynchronous callback function on completion of an operation.
@ -390,6 +392,32 @@ typedef int (*mca_btl_base_module_register_fn_t)(
);
/**
* Callback function that is called asynchronously on receipt
* of an error from the transport layer
*
*/
typedef void (*mca_btl_base_module_error_cb_fn_t)(
struct mca_btl_base_module_t* btl,
int32_t flags
);
/**
* Register a callback function that is called on receipt
* of an error.
*
* @param btl (IN) BTL module
* @return Status indicating if cleanup was successful
*
*/
typedef int (*mca_btl_base_module_register_error_fn_t)(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc
);
/**
* Allocate a descriptor with a segment of the requested size.
* Note that the BTL layer may choose to return a smaller size
@ -514,10 +542,11 @@ struct mca_btl_base_module_t {
uint32_t btl_flags; /**< flags (put/get...) */
/* BTL function table */
mca_btl_base_module_add_procs_fn_t btl_add_procs;
mca_btl_base_module_del_procs_fn_t btl_del_procs;
mca_btl_base_module_register_fn_t btl_register;
mca_btl_base_module_finalize_fn_t btl_finalize;
mca_btl_base_module_add_procs_fn_t btl_add_procs;
mca_btl_base_module_del_procs_fn_t btl_del_procs;
mca_btl_base_module_register_fn_t btl_register;
mca_btl_base_module_register_error_fn_t btl_register_error;
mca_btl_base_module_finalize_fn_t btl_finalize;
mca_btl_base_module_alloc_fn_t btl_alloc;
mca_btl_base_module_free_fn_t btl_free;
@ -527,7 +556,7 @@ struct mca_btl_base_module_t {
mca_btl_base_module_put_fn_t btl_put;
mca_btl_base_module_get_fn_t btl_get;
mca_btl_base_module_dump_fn_t btl_dump; /* diagnostics */
/* the mpool associated with this btl (optional) */
mca_mpool_base_module_t* btl_mpool;
};

Просмотреть файл

@ -70,6 +70,7 @@ mca_btl_gm_module_t mca_btl_gm_module = {
mca_btl_gm_add_procs,
mca_btl_gm_del_procs,
mca_btl_gm_register,
mca_btl_gm_register_error_cb,
mca_btl_gm_finalize,
mca_btl_gm_alloc,
mca_btl_gm_free,
@ -175,6 +176,19 @@ int mca_btl_gm_register(
}
/*
*Register callback function for error handling..
*/
int mca_btl_gm_register_error_cb(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc)
{
mca_btl_gm_module_t* gm_btl = (mca_btl_gm_module_t*) btl;
gm_btl->error_cb = cbfunc; /* stash for later */
return OMPI_SUCCESS;
}
/**
* Allocate a segment.
*

Просмотреть файл

@ -110,6 +110,7 @@ struct mca_btl_gm_module_t {
opal_thread_t gm_thread;
bool gm_progress;
#endif
mca_btl_base_module_error_cb_fn_t error_cb;
};
typedef struct mca_btl_gm_module_t mca_btl_gm_module_t;
extern mca_btl_gm_module_t mca_btl_gm_module;
@ -257,6 +258,31 @@ extern int mca_btl_gm_register(
mca_btl_base_tag_t tag,
mca_btl_base_module_recv_cb_fn_t cbfunc,
void* cbdata);
/**
* Register a callback function that is called on error..
*
* @param btl (IN) BTL module
* @return Status indicating if cleanup was successful
*/
int mca_btl_gm_register_error_cb(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc
);
/**
* Register a callback function that is called on error.
*
* @param btl (IN) BTL module
* @return Status indicating if registration was successful
*
*/
extern int mca_btl_gm_register_error_cb(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc);
/**
* Allocate a descriptor with a segment of the requested size.

Просмотреть файл

@ -224,6 +224,7 @@ mca_btl_gm_module_init (mca_btl_gm_module_t * btl)
OBJ_CONSTRUCT(&btl->gm_thread, opal_thread_t);
#endif
btl->error_cb = NULL;
/* query nic tokens */
btl->gm_num_send_tokens = gm_num_send_tokens (btl->port);
btl->gm_max_send_tokens = btl->gm_num_send_tokens;
@ -559,10 +560,16 @@ int mca_btl_gm_component_progress()
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
/* cbfunc may be null if interface goes down.. */
if(reg->cbfunc) {
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
} else {
btl->error_cb(&btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
MCA_BTL_GM_FRAG_POST(btl,frag);
count++;
break;
@ -579,12 +586,17 @@ int mca_btl_gm_component_progress()
frag->segment.seg_addr.pval = (hdr+1);
frag->segment.seg_len = gm_ntohl(event->recv.length) - sizeof(mca_btl_base_header_t);
reg = &btl->gm_reg[hdr->tag];
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
if(reg->cbfunc) {
OPAL_THREAD_UNLOCK(&mca_btl_gm_component.gm_lock);
reg->cbfunc(&btl->super, hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_gm_component.gm_lock);
MCA_BTL_GM_FRAG_POST(btl,frag);
} else {
btl->error_cb(&btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
count++;
break;
}

Просмотреть файл

@ -47,6 +47,7 @@ mca_btl_mx_module_t mca_btl_mx_module = {
mca_btl_mx_add_procs,
mca_btl_mx_del_procs,
mca_btl_mx_register,
NULL, /* register error */
mca_btl_mx_finalize,
mca_btl_mx_alloc,
mca_btl_mx_free,

Просмотреть файл

@ -51,6 +51,7 @@ mca_btl_openib_module_t mca_btl_openib_module = {
mca_btl_openib_add_procs,
mca_btl_openib_del_procs,
mca_btl_openib_register,
mca_btl_openib_register_error_cb,
mca_btl_openib_finalize,
/* we need alloc free, pack */
mca_btl_openib_alloc,
@ -228,6 +229,19 @@ int mca_btl_openib_register(
}
/*
*Register callback function for error handling..
*/
int mca_btl_openib_register_error_cb(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc)
{
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*) btl;
openib_btl->error_cb = cbfunc; /* stash for later */
return OMPI_SUCCESS;
}
/**
* Allocate a segment.
*

Просмотреть файл

@ -207,6 +207,9 @@ struct mca_btl_openib_module_t {
size_t eager_rdma_frag_size; /**< length of eager frag */
orte_pointer_array_t *eager_rdma_buffers; /**< RDMA buffers to poll */
uint32_t eager_rdma_buffers_count; /**< number of RDMA buffers */
mca_btl_base_module_error_cb_fn_t error_cb; /**< error handler */
}; typedef struct mca_btl_openib_module_t mca_btl_openib_module_t;
struct mca_btl_openib_frag_t;
@ -230,6 +233,19 @@ int mca_btl_openib_register(
mca_btl_base_module_recv_cb_fn_t cbfunc,
void* cbdata
);
/**
* Register a callback function that is called on error..
*
* @param btl (IN) BTL module
* @return Status indicating if cleanup was successful
*/
int mca_btl_openib_register_error_cb(
struct mca_btl_base_module_t* btl,
mca_btl_base_module_error_cb_fn_t cbfunc
);
/**

Просмотреть файл

@ -883,8 +883,11 @@ static int btl_openib_component_progress(void)
ret = btl_openib_handle_incoming_hp(openib_btl,
frag->endpoint, frag,
size - sizeof(mca_btl_openib_footer_t));
if (ret != MPI_SUCCESS)
return ret;
if (ret != MPI_SUCCESS) {
openib_btl->error_cb(&openib_btl->super,
MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
count++;
} else
OPAL_THREAD_UNLOCK(&endpoint->eager_rdma_local.lock);
@ -928,15 +931,16 @@ static int btl_openib_component_progress(void)
abort();
}
return OMPI_ERROR;
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
/* Handle work completions */
switch(wc.opcode) {
case IBV_WC_RECV_RDMA_WITH_IMM:
BTL_ERROR(("Got an RDMA with Immediate data Not supported!"));
return OMPI_ERROR;
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
case IBV_WC_RDMA_WRITE:
case IBV_WC_SEND :
@ -994,8 +998,10 @@ static int btl_openib_component_progress(void)
frag = (mca_btl_openib_frag_t*) (unsigned long) wc.wr_id;
ret = btl_openib_handle_incoming_hp(openib_btl,
frag->endpoint, frag, wc.byte_len);
if (ret != OMPI_SUCCESS)
return ret;
if (ret != OMPI_SUCCESS) {
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
count++;
break;
@ -1009,7 +1015,8 @@ static int btl_openib_component_progress(void)
ne=ibv_poll_cq(openib_btl->ib_cq_lp, 1, &wc );
if(ne < 0){
BTL_ERROR(("error polling LP CQ with %d errno says %s", ne, strerror(errno)));
return OMPI_ERROR;
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
else if(1 == ne) {
if(wc.status != IBV_WC_SUCCESS) {
@ -1028,7 +1035,8 @@ static int btl_openib_component_progress(void)
BTL_PEER_ERROR(remote_proc, ("error polling LP CQ with status %s status number %d for wr_id %llu opcode %d",
btl_openib_component_status_to_string(wc.status),
wc.status, wc.wr_id, wc.opcode));
return OMPI_ERROR;
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
}
/* Handle n/w completions */
@ -1036,7 +1044,8 @@ static int btl_openib_component_progress(void)
case IBV_WC_RECV_RDMA_WITH_IMM:
BTL_ERROR(("Got an RDMA with Immediate data Not supported!"));
return OMPI_ERROR;
openib_btl->error_cb(&openib_btl->super, MCA_BTL_ERROR_FLAGS_FATAL);
return 0;
case IBV_WC_SEND:

Просмотреть файл

@ -57,6 +57,7 @@ mca_btl_base_module_t mca_btl_self = {
mca_btl_self_add_procs,
mca_btl_self_del_procs,
mca_btl_self_register,
NULL,
mca_btl_self_finalize,
mca_btl_self_alloc,
mca_btl_self_free,

Просмотреть файл

@ -98,6 +98,7 @@ mca_btl_sm_t mca_btl_sm[2] = {
mca_btl_sm_add_procs_same_base_addr,
mca_btl_sm_del_procs,
mca_btl_sm_register,
NULL,
mca_btl_sm_finalize,
mca_btl_sm_alloc,
mca_btl_sm_free,
@ -124,6 +125,7 @@ mca_btl_sm_t mca_btl_sm[2] = {
mca_btl_sm_add_procs,
mca_btl_sm_del_procs,
mca_btl_sm_register,
NULL,
mca_btl_sm_finalize,
mca_btl_sm_alloc,
mca_btl_sm_free,

Просмотреть файл

@ -48,6 +48,7 @@ mca_btl_tcp_module_t mca_btl_tcp_module = {
mca_btl_tcp_add_procs,
mca_btl_tcp_del_procs,
mca_btl_tcp_register,
NULL, /* register error */
mca_btl_tcp_finalize,
mca_btl_tcp_alloc,
mca_btl_tcp_free,

Просмотреть файл

@ -221,7 +221,10 @@ int mca_btl_tcp_component_open(void)
mca_btl_tcp_module.super.btl_max_rdma_size =
mca_btl_tcp_param_register_int("max_rdma_size", INT_MAX);
mca_btl_tcp_module.super.btl_flags =
mca_btl_tcp_param_register_int("flags", MCA_BTL_FLAGS_PUT|MCA_BTL_FLAGS_SEND_INPLACE);
mca_btl_tcp_param_register_int("flags", MCA_BTL_FLAGS_PUT |
MCA_BTL_FLAGS_SEND_INPLACE |
MCA_BTL_FLAGS_NEED_CSUM |
MCA_BTL_FLAGS_NEED_ACK );
return OMPI_SUCCESS;
}

Просмотреть файл

@ -46,7 +46,8 @@ mca_btl_template_module_t mca_btl_template_module = {
0, /* flags */
mca_btl_template_add_procs,
mca_btl_template_del_procs,
mca_btl_template_register,
mca_btl_template_register,
NULL,
mca_btl_template_finalize,
mca_btl_template_alloc,
mca_btl_template_free,

Просмотреть файл

@ -56,6 +56,7 @@ mca_btl_ud_module_t mca_btl_ud_module = {
mca_btl_ud_add_procs,
mca_btl_ud_del_procs,
mca_btl_ud_register,
NULL, /* register error */
mca_btl_ud_finalize,
/* we need alloc free, pack */
mca_btl_ud_alloc,

Просмотреть файл

@ -53,6 +53,7 @@ mca_btl_udapl_module_t mca_btl_udapl_module = {
mca_btl_udapl_add_procs,
mca_btl_udapl_del_procs,
mca_btl_udapl_register,
NULL, /* register error */
mca_btl_udapl_finalize,
mca_btl_udapl_alloc,
mca_btl_udapl_free,

Просмотреть файл

@ -36,6 +36,7 @@
#include "pml_dr_recvreq.h"
#include "ompi/mca/bml/base/base.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/errmgr/errmgr.h"
mca_pml_dr_t mca_pml_dr = {
{
@ -60,6 +61,10 @@ mca_pml_dr_t mca_pml_dr = {
}
};
void mca_pml_dr_error_handler(
struct mca_btl_base_module_t* btl,
int32_t flags);
int mca_pml_dr_enable(bool enable)
{
if( false == enable ) return OMPI_SUCCESS;
@ -104,6 +109,13 @@ int mca_pml_dr_del_comm(ompi_communicator_t* comm)
}
void mca_pml_dr_error_handler(
struct mca_btl_base_module_t* btl,
int32_t flags) {
orte_errmgr.abort();
}
/*
* For each proc setup a datastructure that indicates the PTLs
* that can be used to reach the destination.
@ -139,13 +151,22 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs)
);
if(OMPI_SUCCESS != rc)
return rc;
/* register recv handler */
rc = mca_bml.bml_register(
MCA_BTL_TAG_PML,
mca_pml_dr_recv_frag_callback,
NULL);
/* initialize free list of receive buffers */
if(OMPI_SUCCESS != rc)
return rc;
/* register error handlers */
rc = mca_bml.bml_register_error(mca_pml_dr_error_handler);
if(OMPI_SUCCESS != rc)
return rc;
ompi_free_list_init(
&mca_pml_dr.buffers,
sizeof(mca_pml_dr_buffer_t) + mca_pml_dr.eager_limit,

Просмотреть файл

@ -65,7 +65,10 @@ struct mca_pml_dr_t {
/* pending lists */
opal_list_t send_pending;
opal_list_t acks_pending;
/* active lists */
opal_list_t send_active;
/* free lists */
ompi_free_list_t send_requests;
ompi_free_list_t recv_requests;

Просмотреть файл

@ -157,6 +157,7 @@ int mca_pml_dr_component_open(void)
NULL);
OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.send_active, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.endpoints, ompi_pointer_array_t);
@ -179,6 +180,7 @@ int mca_pml_dr_component_close(void)
return rc;
OBJ_DESTRUCT(&mca_pml_dr.send_pending);
OBJ_DESTRUCT(&mca_pml_dr.send_active);
OBJ_DESTRUCT(&mca_pml_dr.acks_pending);
OBJ_DESTRUCT(&mca_pml_dr.recv_requests);
OBJ_DESTRUCT(&mca_pml_dr.recv_frags);

Просмотреть файл

@ -22,24 +22,6 @@
#include "orte/mca/ns/ns.h"
static void mca_pml_dr_endpoint_copy(mca_pml_dr_endpoint_t* dst, mca_pml_dr_endpoint_t* src)
{
dst->local = src->local;
dst->src = src->src;
dst->dst = src->dst;
ompi_seq_tracker_copy(&dst->seq_sends, &src->seq_sends);
ompi_seq_tracker_copy(&dst->seq_recvs, &src->seq_recvs);
ompi_seq_tracker_copy(&dst->seq_recvs_matched, &src->seq_recvs_matched);
dst->vfrag_seq = src->vfrag_seq;
/* this won't work for comm spawn and other dynamic
processes, but will work for initial job start */
/* dst->local = dst->dst = ompi_pointer_array_add(&mca_pml_dr.endpoints, */
/* (void*) dst); */
}
static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep)
{

Просмотреть файл

@ -779,6 +779,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
);
if(des == NULL) {
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&mca_pml_dr.send_active, (opal_list_item_t*)sendreq);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
@ -825,6 +826,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&mca_pml_dr.send_active, (opal_list_item_t*) sendreq);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
@ -865,6 +867,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc);
if(NULL == vfrag) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
opal_list_remove_item(&mca_pml_dr.send_active, (opal_list_item_t*)sendreq);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
break;
@ -900,6 +903,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
);
if(des == NULL) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
opal_list_remove_item(&mca_pml_dr.send_active, (opal_list_item_t*)sendreq);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
break;
@ -946,6 +950,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_list_remove_item(&mca_pml_dr.send_active, (opal_list_item_t*)sendreq);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;

Просмотреть файл

@ -85,6 +85,8 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t);
OMPI_FREE_LIST_WAIT(&mca_pml_dr.send_requests, item, rc); \
sendreq = (mca_pml_dr_send_request_t*)item; \
sendreq->req_send.req_base.req_proc = proc; \
opal_list_append(&mca_pml_dr.send_active, \
(opal_list_item_t*) sendreq); \
} \
}
@ -241,6 +243,8 @@ do {
#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \
do { \
assert( false == sendreq->req_send.req_base.req_pml_complete ); \
opal_list_remove_item(&mca_pml_dr.send_active, \
(opal_list_item_t*) sendreq); \
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
@ -355,6 +359,8 @@ do { \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
if(NULL == sendreq) \
break; \
opal_list_append(&mca_pml_dr.send_active, \
(opal_list_item_t*) sendreq); \
mca_pml_dr_send_request_schedule(sendreq); \
} \
} while (0)

Просмотреть файл

@ -101,6 +101,21 @@ static void mca_pml_dr_vfrag_wdog_timeout(int fd, short event, void* data)
}
static void mca_pml_dr_vfrag_cleanup_active_desc(mca_bml_base_btl_t* bml_btl) {
opal_list_item_t* item;
for (item = opal_list_get_first(&mca_pml_dr.send_active) ;
item != opal_list_get_end(&mca_pml_dr.send_active) ;
item = opal_list_get_next(item)) {
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) item;
mca_btl_base_descriptor_t* des = sendreq->req_descriptor;
if( des->des_context == bml_btl) {
des->des_context = NULL;
}
}
}
/**
* The ack timer expired, better do something about it, like resend the entire vfrag?
*/
@ -118,6 +133,7 @@ static void mca_pml_dr_vfrag_ack_timeout(int fd, short event, void* data)
/* declare btl dead */
opal_output(0, "%s:%d:%s: failing BTL: %s", __FILE__, __LINE__, __func__,
vfrag->bml_btl->btl->btl_component->btl_version.mca_component_name);
mca_pml_dr_vfrag_cleanup_active_desc(vfrag->bml_btl);
mca_bml.bml_del_btl(vfrag->bml_btl->btl);
mca_pml_dr_vfrag_reset(vfrag);
}