resolved several threading issues
This commit was SVN r1023.
Этот коммит содержится в:
родитель
94a93ab1e1
Коммит
fa0fdba04e
@ -236,7 +236,7 @@ int mca_base_modex_exchange(void)
|
||||
rc = mca_oob.oob_send(
|
||||
proc->proc_job,
|
||||
proc->proc_vpid,
|
||||
MCA_OOB_ANY_TAG,
|
||||
0,
|
||||
self_module->module_data,
|
||||
self_module->module_data_size);
|
||||
if(rc != LAM_SUCCESS) {
|
||||
@ -255,7 +255,7 @@ int mca_base_modex_exchange(void)
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_proc_t *proc = procs[i];
|
||||
mca_base_modex_module_t* proc_module;
|
||||
int tag = MCA_OOB_ANY_TAG;
|
||||
int tag = 0;
|
||||
int rc;
|
||||
|
||||
if(proc == self)
|
||||
|
@ -75,6 +75,7 @@ mca_oob_cofs_recv(lam_job_handle_t job_handle, int vpid, int* tag,
|
||||
blocking_recv_posted = 1;
|
||||
while (ret == LAM_ERR_WOULD_BLOCK) {
|
||||
ret = do_recv(job_handle, vpid, tag, data, data_len);
|
||||
sleep(1);
|
||||
}
|
||||
blocking_recv_posted = 0;
|
||||
return ret;
|
||||
|
@ -90,22 +90,6 @@ mca_oob_cofs_init(int *priority, bool *allow_multi_user_threads,
|
||||
snprintf(mca_oob_cofs_comm_loc, LAM_PATH_MAX, "%s", tmp);
|
||||
}
|
||||
|
||||
/*
|
||||
* See if we can write in our directory...
|
||||
*/
|
||||
tmp = malloc(strlen(mca_oob_cofs_comm_loc) + 5);
|
||||
if (tmp == NULL) return NULL;
|
||||
sprintf(tmp, "%s/me", mca_oob_cofs_comm_loc);
|
||||
fp = fopen(tmp, "w");
|
||||
if (fp == NULL) {
|
||||
printf("oob_cofs can not write in communication dir\n");
|
||||
free(tmp);
|
||||
return NULL;
|
||||
}
|
||||
fclose(fp);
|
||||
unlink(tmp);
|
||||
free(tmp);
|
||||
|
||||
/*
|
||||
* BWB - fix me, make register the "right" way...
|
||||
*/
|
||||
@ -117,6 +101,22 @@ mca_oob_cofs_init(int *priority, bool *allow_multi_user_threads,
|
||||
}
|
||||
mca_oob_cofs_my_vpid = atoi(tmp);
|
||||
|
||||
/*
|
||||
* See if we can write in our directory...
|
||||
*/
|
||||
tmp = malloc(strlen(mca_oob_cofs_comm_loc) + 5);
|
||||
if (tmp == NULL) return NULL;
|
||||
sprintf(tmp, "%s/oob.%d", mca_oob_cofs_comm_loc, mca_oob_cofs_my_vpid);
|
||||
fp = fopen(tmp, "w");
|
||||
if (fp == NULL) {
|
||||
printf("oob_cofs can not write in communication dir\n");
|
||||
free(tmp);
|
||||
return NULL;
|
||||
}
|
||||
fclose(fp);
|
||||
unlink(tmp);
|
||||
free(tmp);
|
||||
|
||||
mca_oob_cofs_serial = 0;
|
||||
|
||||
return &mca_oob_cofs_1_0_0;
|
||||
|
@ -132,22 +132,6 @@ mca_pcm_cofs_init(int *priority, bool *allow_multi_user_threads,
|
||||
snprintf(mca_pcm_cofs_comm_loc, LAM_PATH_MAX, "%s", tmp);
|
||||
}
|
||||
|
||||
/*
|
||||
* See if we can write in our directory...
|
||||
*/
|
||||
tmp = malloc(strlen(mca_pcm_cofs_comm_loc) + 5);
|
||||
if (tmp == NULL) return NULL;
|
||||
sprintf(tmp, "%s/me", mca_pcm_cofs_comm_loc);
|
||||
fp = fopen(tmp, "w");
|
||||
if (fp == NULL) {
|
||||
printf("pcm_cofs can not write in communication dir\n");
|
||||
free(tmp);
|
||||
return NULL;
|
||||
}
|
||||
fclose(fp);
|
||||
unlink(tmp);
|
||||
free(tmp);
|
||||
|
||||
/*
|
||||
* BWB - fix me, make register the "right" way...
|
||||
*/
|
||||
@ -159,6 +143,22 @@ mca_pcm_cofs_init(int *priority, bool *allow_multi_user_threads,
|
||||
}
|
||||
mca_pcm_cofs_my_vpid = atoi(tmp);
|
||||
|
||||
/*
|
||||
* See if we can write in our directory...
|
||||
*/
|
||||
tmp = malloc(strlen(mca_pcm_cofs_comm_loc) + 5);
|
||||
if (tmp == NULL) return NULL;
|
||||
sprintf(tmp, "%s/pcm.%d", mca_pcm_cofs_comm_loc, mca_pcm_cofs_my_vpid);
|
||||
fp = fopen(tmp, "w");
|
||||
if (fp == NULL) {
|
||||
printf("pcm_cofs can not write in communication dir\n");
|
||||
free(tmp);
|
||||
return NULL;
|
||||
}
|
||||
fclose(fp);
|
||||
unlink(tmp);
|
||||
free(tmp);
|
||||
|
||||
mca_pcm_cofs_my_handle = getenv("MCA_common_lam_cofs_job_handle");
|
||||
|
||||
mca_pcm_cofs_procs = NULL;
|
||||
|
@ -36,7 +36,7 @@ struct mca_pml_base_request_t {
|
||||
int32_t req_tag; /**< user defined tag */
|
||||
lam_communicator_t *req_comm; /**< communicator pointer */
|
||||
lam_proc_t* req_proc; /**< peer process */
|
||||
mca_ptl_base_sequence_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
|
||||
mca_ptl_sequence_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
|
||||
lam_datatype_t *req_datatype; /**< pointer to data type */
|
||||
mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
|
||||
lam_status_public_t req_status; /**< completion status */
|
||||
|
@ -155,6 +155,29 @@ typedef int (*mca_pml_base_del_procs_fn_t)(struct lam_proc_t **procs, size_t npr
|
||||
typedef int (*mca_pml_base_add_ptls_fn_t)(lam_list_t *ptls);
|
||||
|
||||
|
||||
/**
|
||||
* Downcall from MCA layer to enable the PML/PTLs.
|
||||
*
|
||||
* @param param parameter to change
|
||||
* @param value optional value
|
||||
* @param size size of value
|
||||
* @return LAM_SUCCESS or failure status.
|
||||
*/
|
||||
typedef int (*mca_pml_base_control_fn_t)(
|
||||
int param,
|
||||
void *value,
|
||||
size_t size
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* For non-threaded case, provides MCA the opportunity to
|
||||
* progress outstanding requests on all ptls.
|
||||
*
|
||||
* @return LAM_SUCCESS or failure status.
|
||||
*/
|
||||
typedef int (*mca_pml_base_progress_fn_t)(void);
|
||||
|
||||
/**
|
||||
* MPI Interface Functions
|
||||
*/
|
||||
@ -445,6 +468,8 @@ struct mca_pml_1_0_0_t {
|
||||
mca_pml_base_add_procs_fn_t pml_add_procs;
|
||||
mca_pml_base_del_procs_fn_t pml_del_procs;
|
||||
mca_pml_base_add_ptls_fn_t pml_add_ptls;
|
||||
mca_pml_base_control_fn_t pml_control;
|
||||
mca_pml_base_progress_fn_t pml_progress;
|
||||
|
||||
/* downcalls from MPI to PML */
|
||||
mca_pml_base_add_comm_fn_t pml_add_comm;
|
||||
|
@ -22,6 +22,8 @@ mca_pml_teg_t mca_pml_teg = {
|
||||
mca_pml_teg_add_procs,
|
||||
mca_pml_teg_del_procs,
|
||||
mca_pml_teg_add_ptls,
|
||||
mca_pml_teg_control,
|
||||
mca_pml_teg_progress,
|
||||
mca_pml_teg_add_comm,
|
||||
mca_pml_teg_del_comm,
|
||||
mca_pml_teg_irecv_init,
|
||||
@ -109,6 +111,21 @@ int mca_pml_teg_add_ptls(lam_list_t *ptls)
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Pass control information through to all PTL modules.
|
||||
*/
|
||||
|
||||
int mca_pml_teg_control(int param, void* value, size_t size)
|
||||
{
|
||||
size_t i=0;
|
||||
for(i=0; i<mca_pml_teg.teg_num_ptl_modules; i++) {
|
||||
int rc = mca_pml_teg.teg_ptl_modules[i]->ptlm_control(param,value,size);
|
||||
if(rc != LAM_SUCCESS)
|
||||
return rc;
|
||||
}
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* For each proc setup a datastructure that indicates the PTLs
|
||||
* that can be used to reach the destination.
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "mca/pml/base/pml_base_request.h"
|
||||
#include "mca/ptl/ptl.h"
|
||||
|
||||
#define MCA_PML_TEG_STATISTICS 1
|
||||
|
||||
/**
|
||||
* TEG PML Interface
|
||||
@ -42,6 +43,12 @@ struct mca_pml_teg_t {
|
||||
/* free list of recv requests */
|
||||
lam_free_list_t teg_recv_requests;
|
||||
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
long teg_waits;
|
||||
long teg_condition_waits;
|
||||
long teg_condition_broadcasts;
|
||||
#endif
|
||||
|
||||
/* request completion */
|
||||
lam_mutex_t teg_request_lock;
|
||||
lam_condition_t teg_request_cond;
|
||||
@ -99,6 +106,14 @@ extern int mca_pml_teg_add_ptls(
|
||||
lam_list_t *ptls
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_control(
|
||||
int param,
|
||||
void *size,
|
||||
size_t value
|
||||
);
|
||||
|
||||
extern int mca_pml_teg_progress(void);
|
||||
|
||||
extern int mca_pml_teg_isend_init(
|
||||
void *buf,
|
||||
size_t count,
|
||||
|
@ -71,6 +71,12 @@ int mca_pml_teg_module_open(void)
|
||||
teg_null->req_status.MPI_ERROR = LAM_SUCCESS;
|
||||
teg_null->req_status._count = 0;
|
||||
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
mca_pml_teg.teg_waits = 0;
|
||||
mca_pml_teg.teg_condition_waits = 0;
|
||||
mca_pml_teg.teg_condition_broadcasts = 0;
|
||||
#endif
|
||||
|
||||
mca_pml_teg.teg_free_list_num =
|
||||
mca_pml_teg_param_register_int("free_list_num", 256);
|
||||
mca_pml_teg.teg_free_list_max =
|
||||
@ -85,6 +91,22 @@ int mca_pml_teg_module_open(void)
|
||||
|
||||
int mca_pml_teg_module_close(void)
|
||||
{
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
lam_output(0, "mca_pml_teg.teg_waits = %d\n",
|
||||
mca_pml_teg.teg_waits);
|
||||
lam_output(0, "mca_pml_teg.teg_condition_waits = %d\n",
|
||||
mca_pml_teg.teg_condition_waits);
|
||||
lam_output(0, "mca_pml_teg.teg_condition_broadcast = %d\n",
|
||||
mca_pml_teg.teg_condition_broadcasts);
|
||||
#endif
|
||||
|
||||
if (mca_pml_teg.teg_recv_requests.fl_num_allocated !=
|
||||
mca_pml_teg.teg_recv_requests.super.lam_list_length) {
|
||||
lam_output(0, "teg recv requests: %d allocated %d returned\n",
|
||||
mca_pml_teg.teg_recv_requests.fl_num_allocated,
|
||||
mca_pml_teg.teg_recv_requests.super.lam_list_length);
|
||||
}
|
||||
|
||||
if(NULL != mca_pml_teg.teg_ptl_modules)
|
||||
free(mca_pml_teg.teg_ptl_modules);
|
||||
if(NULL != mca_pml_teg.teg_ptls)
|
||||
|
@ -4,8 +4,7 @@
|
||||
|
||||
int mca_pml_teg_progress(void)
|
||||
{
|
||||
#if 0
|
||||
mca_ptl_base_tstamp_t tstamp;
|
||||
mca_ptl_tstamp_t tstamp = 0;
|
||||
size_t i;
|
||||
|
||||
/*
|
||||
@ -13,7 +12,6 @@ int mca_pml_teg_progress(void)
|
||||
*/
|
||||
for(i=0; i<mca_pml_teg.teg_num_ptl_modules; i++)
|
||||
mca_pml_teg.teg_ptl_modules[i]->ptlm_progress(tstamp);
|
||||
#endif
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -15,10 +15,13 @@ void mca_pml_teg_recv_request_progress(
|
||||
req->super.req_status.MPI_TAG = req->super.req_tag;
|
||||
req->super.req_status.MPI_ERROR = LAM_SUCCESS;
|
||||
req->super.req_status._count = req->req_bytes_delivered;
|
||||
req->super.req_mpi_done = true;
|
||||
req->super.req_pml_done = true;
|
||||
req->super.req_mpi_done = true;
|
||||
if(mca_pml_teg.teg_request_waiting) {
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
mca_pml_teg.teg_condition_broadcasts++;
|
||||
lam_condition_broadcast(&mca_pml_teg.teg_request_cond);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
|
||||
|
@ -14,6 +14,10 @@ int mca_pml_teg_wait(
|
||||
int completed = -1;
|
||||
mca_pml_base_request_t* pml_request;
|
||||
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
mca_pml_teg.teg_waits++;
|
||||
#endif
|
||||
|
||||
#if LAM_HAVE_THREADS
|
||||
/* poll for completion */
|
||||
for(c=0; completed < 0 && c < mca_pml_teg.teg_poll_iterations; c++) {
|
||||
@ -41,8 +45,12 @@ int mca_pml_teg_wait(
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(completed < 0)
|
||||
if(completed < 0) {
|
||||
#if MCA_PML_TEG_STATISTICS
|
||||
mca_pml_teg.teg_condition_waits++;
|
||||
#endif
|
||||
lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
|
||||
}
|
||||
} while(completed < 0);
|
||||
mca_pml_teg.teg_request_waiting--;
|
||||
lam_mutex_unlock(&mca_pml_teg.teg_request_lock);
|
||||
|
@ -42,16 +42,16 @@ int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size)
|
||||
size_t i;
|
||||
|
||||
/* send message sequence-number support - sender side */
|
||||
comm->c_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size);
|
||||
comm->c_msg_seq = malloc(sizeof(mca_ptl_sequence_t) * size);
|
||||
if(NULL == comm->c_msg_seq)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
memset(comm->c_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size);
|
||||
memset(comm->c_msg_seq, 0, sizeof(mca_ptl_sequence_t) * size);
|
||||
|
||||
/* send message sequence-number support - receiver side */
|
||||
comm->c_next_msg_seq = malloc(sizeof(mca_ptl_base_sequence_t) * size);
|
||||
comm->c_next_msg_seq = malloc(sizeof(mca_ptl_sequence_t) * size);
|
||||
if(NULL == comm->c_next_msg_seq)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_base_sequence_t) * size);
|
||||
memset(comm->c_next_msg_seq, 0, sizeof(mca_ptl_sequence_t) * size);
|
||||
|
||||
/* unexpected fragments queues */
|
||||
comm->c_unexpected_frags = malloc(sizeof(lam_list_t) * size);
|
||||
|
@ -20,9 +20,9 @@ extern lam_class_t mca_pml_ptl_comm_t_class;
|
||||
*/
|
||||
struct mca_pml_comm_t {
|
||||
lam_object_t super;
|
||||
mca_ptl_base_sequence_t *c_msg_seq; /**< send message sequence number - sender side */
|
||||
mca_ptl_base_sequence_t *c_next_msg_seq; /**< send message sequence number - receiver side */
|
||||
mca_ptl_base_sequence_t c_recv_seq; /**< recv request sequence number - receiver side */
|
||||
mca_ptl_sequence_t *c_msg_seq; /**< send message sequence number - sender side */
|
||||
mca_ptl_sequence_t *c_next_msg_seq; /**< send message sequence number - receiver side */
|
||||
mca_ptl_sequence_t c_recv_seq; /**< recv request sequence number - receiver side */
|
||||
lam_mutex_t c_matching_lock; /**< matching lock */
|
||||
lam_list_t *c_unexpected_frags; /**< unexpected fragment queues */
|
||||
lam_list_t *c_frags_cant_match; /**< out-of-order fragment queues */
|
||||
@ -50,9 +50,9 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size);
|
||||
* @return Next available sequence number.
|
||||
*/
|
||||
|
||||
static inline mca_ptl_base_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst)
|
||||
static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst)
|
||||
{
|
||||
mca_ptl_base_sequence_t sequence;
|
||||
mca_ptl_sequence_t sequence;
|
||||
lam_mutex_lock(&comm->c_matching_lock);
|
||||
sequence = comm->c_msg_seq[dst]++;
|
||||
lam_mutex_unlock(&comm->c_matching_lock);
|
||||
|
@ -37,7 +37,7 @@ struct mca_ptl_base_frag_header_t {
|
||||
mca_ptl_base_common_header_t hdr_common; /**< common attributes */
|
||||
uint32_t hdr_frag_length; /**< fragment length */
|
||||
uint32_t hdr_frag_offset; /**< offset into message */
|
||||
mca_ptl_base_sequence_t hdr_frag_seq; /**< fragment sequence number */
|
||||
mca_ptl_sequence_t hdr_frag_seq; /**< fragment sequence number */
|
||||
lam_ptr_t hdr_src_ptr; /**< pointer to source fragment */
|
||||
lam_ptr_t hdr_dst_ptr; /**< pointer to matched receive */
|
||||
};
|
||||
@ -55,7 +55,7 @@ struct mca_ptl_base_match_header_t {
|
||||
int32_t hdr_dst; /**< destination rank */
|
||||
int32_t hdr_tag; /**< user tag */
|
||||
uint32_t hdr_msg_length; /**< message length */
|
||||
mca_ptl_base_sequence_t hdr_msg_seq; /**< message sequence number */
|
||||
mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */
|
||||
};
|
||||
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
|
||||
|
||||
|
@ -79,7 +79,7 @@ bool mca_ptl_base_match(
|
||||
lam_list_t *additional_matches)
|
||||
{
|
||||
/* local variables */
|
||||
mca_ptl_base_sequence_t frag_msg_seq,next_msg_seq_expected;
|
||||
mca_ptl_sequence_t frag_msg_seq,next_msg_seq_expected;
|
||||
lam_communicator_t *comm_ptr;
|
||||
mca_ptl_base_recv_request_t *matched_receive;
|
||||
mca_pml_ptl_comm_t *pml_comm;
|
||||
@ -374,7 +374,7 @@ static mca_ptl_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
|
||||
{
|
||||
/* local variables */
|
||||
mca_ptl_base_recv_request_t *specific_recv, *wild_recv, *return_match;
|
||||
mca_ptl_base_sequence_t wild_recv_seq, specific_recv_seq;
|
||||
mca_ptl_sequence_t wild_recv_seq, specific_recv_seq;
|
||||
int frag_src,frag_tag, wild_recv_tag, specific_recv_tag;
|
||||
|
||||
/* initialization */
|
||||
@ -513,7 +513,7 @@ static void mca_ptl_base_check_cantmatch_for_match(lam_list_t *additional_matche
|
||||
{
|
||||
/* local parameters */
|
||||
int match_found;
|
||||
mca_ptl_base_sequence_t next_msg_seq_expected, frag_seq;
|
||||
mca_ptl_sequence_t next_msg_seq_expected, frag_seq;
|
||||
mca_ptl_base_recv_frag_t *frag_desc;
|
||||
mca_ptl_base_recv_request_t *matched_receive;
|
||||
|
||||
|
@ -144,9 +144,13 @@ struct mca_ptl_base_recv_frag_t;
|
||||
struct mca_ptl_base_send_frag_t;
|
||||
struct mca_ptl_base_match_header_t;
|
||||
|
||||
typedef uint64_t mca_ptl_base_sequence_t;
|
||||
typedef uint64_t mca_ptl_base_tstamp_t;
|
||||
typedef lam_list_t mca_ptl_base_queue_t;
|
||||
typedef uint64_t mca_ptl_sequence_t;
|
||||
typedef uint64_t mca_ptl_tstamp_t;
|
||||
typedef lam_list_t mca_ptl_queue_t;
|
||||
|
||||
typedef enum {
|
||||
MCA_PTL_ENABLE
|
||||
} mca_ptl_control_t;
|
||||
|
||||
/*
|
||||
* PTL module interface functions and datatype.
|
||||
@ -179,6 +183,33 @@ typedef struct mca_ptl_t** (*mca_ptl_base_module_init_fn_t)(
|
||||
bool *have_hidden_threads
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* MCA->PTL Called to dynamically change a module parameter.
|
||||
*
|
||||
* @param flag (IN) Parameter to change.
|
||||
* @param value (IN) Optional parameter value.
|
||||
*
|
||||
* @return LAM_SUCCESS or error code on failure.
|
||||
*/
|
||||
typedef int (*mca_ptl_base_module_control_fn_t)(
|
||||
int param,
|
||||
void* value,
|
||||
size_t size
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* MCA->PTL Called to progress outstanding requests for
|
||||
* non-threaded polling environments.
|
||||
*
|
||||
* @param tstamp Current time.
|
||||
* @return LAM_SUCCESS or error code on failure.
|
||||
*/
|
||||
typedef int (*mca_ptl_base_module_progress_fn_t)(
|
||||
mca_ptl_tstamp_t tstamp
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* PTL module descriptor. Contains module version information
|
||||
@ -189,6 +220,8 @@ struct mca_ptl_base_module_1_0_0_t {
|
||||
mca_base_module_t ptlm_version;
|
||||
mca_base_module_data_1_0_0_t ptlm_data;
|
||||
mca_ptl_base_module_init_fn_t ptlm_init;
|
||||
mca_ptl_base_module_control_fn_t ptlm_control;
|
||||
mca_ptl_base_module_progress_fn_t ptlm_progress;
|
||||
};
|
||||
typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_1_0_0_t;
|
||||
typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_t;
|
||||
|
@ -77,6 +77,22 @@ extern mca_ptl_t** mca_ptl_tcp_module_init(
|
||||
bool *have_hidden_threads
|
||||
);
|
||||
|
||||
/**
|
||||
* TCP module control.
|
||||
*/
|
||||
extern int mca_ptl_tcp_module_control(
|
||||
int param,
|
||||
void* value,
|
||||
size_t size
|
||||
);
|
||||
|
||||
/**
|
||||
* TCP module progress.
|
||||
*/
|
||||
extern int mca_ptl_tcp_module_progress(
|
||||
mca_ptl_tstamp_t tstamp
|
||||
);
|
||||
|
||||
/**
|
||||
* TCP PTL Interface
|
||||
*/
|
||||
|
@ -55,7 +55,9 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
|
||||
false
|
||||
},
|
||||
|
||||
mca_ptl_tcp_module_init /* module init */
|
||||
mca_ptl_tcp_module_init,
|
||||
mca_ptl_tcp_module_control,
|
||||
mca_ptl_tcp_module_progress,
|
||||
}
|
||||
};
|
||||
|
||||
@ -135,6 +137,25 @@ int mca_ptl_tcp_module_open(void)
|
||||
|
||||
int mca_ptl_tcp_module_close(void)
|
||||
{
|
||||
if (mca_ptl_tcp_module.tcp_send_requests.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_send_requests.super.lam_list_length) {
|
||||
lam_output(0, "tcp send requests: %d allocated %d returned\n",
|
||||
mca_ptl_tcp_module.tcp_send_requests.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_send_requests.super.lam_list_length);
|
||||
}
|
||||
if (mca_ptl_tcp_module.tcp_send_frags.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_send_frags.super.lam_list_length) {
|
||||
lam_output(0, "tcp send frags: %d allocated %d returned\n",
|
||||
mca_ptl_tcp_module.tcp_send_frags.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_send_frags.super.lam_list_length);
|
||||
}
|
||||
if (mca_ptl_tcp_module.tcp_recv_frags.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_recv_frags.super.lam_list_length) {
|
||||
lam_output(0, "tcp recv frags: %d allocated %d returned\n",
|
||||
mca_ptl_tcp_module.tcp_recv_frags.fl_num_allocated !=
|
||||
mca_ptl_tcp_module.tcp_recv_frags.super.lam_list_length);
|
||||
}
|
||||
|
||||
free(mca_ptl_tcp_module.tcp_if_include);
|
||||
free(mca_ptl_tcp_module.tcp_if_exclude);
|
||||
if (NULL != mca_ptl_tcp_module.tcp_ptls)
|
||||
@ -202,11 +223,14 @@ static int mca_ptl_tcp_module_create_instances(void)
|
||||
if(if_index < 0) {
|
||||
lam_output(0,"mca_ptl_tcp_module_init: invalid interface \"%s\"", if_name);
|
||||
} else {
|
||||
lam_output(0,"interface: %s\n", if_name);
|
||||
mca_ptl_tcp_create(if_index);
|
||||
}
|
||||
argv++;
|
||||
}
|
||||
lam_argv_free(include);
|
||||
if(mca_ptl_tcp_module.tcp_num_ptls)
|
||||
return LAM_SUCCESS;
|
||||
|
||||
/* if the interface list was not specified by the user, create
|
||||
* a PTL for each interface that was not excluded.
|
||||
@ -224,8 +248,10 @@ static int mca_ptl_tcp_module_create_instances(void)
|
||||
argv++;
|
||||
}
|
||||
/* if this interface was not found in the excluded list - create a PTL */
|
||||
if(argv == 0 || *argv == 0)
|
||||
if(argv == 0 || *argv == 0) {
|
||||
lam_output(0,"interface: %s\n", if_name);
|
||||
mca_ptl_tcp_create(if_index);
|
||||
}
|
||||
}
|
||||
lam_argv_free(exclude);
|
||||
return LAM_SUCCESS;
|
||||
@ -292,7 +318,6 @@ static int mca_ptl_tcp_module_create_listen(void)
|
||||
LAM_EV_READ|LAM_EV_PERSIST,
|
||||
mca_ptl_tcp_module_recv_handler,
|
||||
0);
|
||||
lam_event_add(&mca_ptl_tcp_module.tcp_recv_event, 0);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -387,6 +412,37 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
|
||||
return ptls;
|
||||
}
|
||||
|
||||
/*
|
||||
* TCP module control
|
||||
*/
|
||||
|
||||
int mca_ptl_tcp_module_control(int param, void* value, size_t size)
|
||||
{
|
||||
switch(param) {
|
||||
case MCA_PTL_ENABLE:
|
||||
if(*(int*)value)
|
||||
lam_event_add(&mca_ptl_tcp_module.tcp_recv_event, 0);
|
||||
else
|
||||
lam_event_del(&mca_ptl_tcp_module.tcp_recv_event);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TCP module progress.
|
||||
*/
|
||||
|
||||
int mca_ptl_tcp_module_progress(mca_ptl_tstamp_t tstamp)
|
||||
{
|
||||
lam_event_loop(LAM_EVLOOP_ONCE);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Called by mca_ptl_tcp_module_recv() when the TCP listen
|
||||
* socket has pending connection requests. Accept incoming
|
||||
|
@ -111,6 +111,7 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
|
||||
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd)) {
|
||||
mca_ptl_tcp_send_frag_progress(frag);
|
||||
} else {
|
||||
lam_output(0, "send incomplete\n");
|
||||
ptl_peer->peer_send_frag = frag;
|
||||
lam_event_add(&ptl_peer->peer_send_event, 0);
|
||||
}
|
||||
|
@ -221,8 +221,10 @@ bool mca_ptl_tcp_proc_accept(mca_ptl_tcp_proc_t* ptl_proc, struct sockaddr_in* a
|
||||
THREAD_LOCK(&ptl_proc->proc_lock);
|
||||
for(i=0; i<ptl_proc->proc_peer_count; i++) {
|
||||
mca_ptl_base_peer_t* ptl_peer = ptl_proc->proc_peers[i];
|
||||
if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd))
|
||||
if(mca_ptl_tcp_peer_accept(ptl_peer, addr, sd)) {
|
||||
THREAD_UNLOCK(&ptl_proc->proc_lock);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
THREAD_UNLOCK(&ptl_proc->proc_lock);
|
||||
return false;
|
||||
|
@ -122,8 +122,7 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
default */
|
||||
|
||||
mca_base_param_lookup_int(param, &value);
|
||||
/* lam_mpi_param_check = (bool) value; */
|
||||
lam_mpi_param_check = false;
|
||||
lam_mpi_param_check = (bool) value;
|
||||
|
||||
/* do module exchange */
|
||||
if (LAM_SUCCESS != (ret = mca_base_modex_exchange())) {
|
||||
@ -139,6 +138,11 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
}
|
||||
free(procs);
|
||||
|
||||
/* start PTL's */
|
||||
param = 1;
|
||||
if (LAM_SUCCESS != (ret = mca_pml.pml_control(MCA_PTL_ENABLE, ¶m, sizeof(param))))
|
||||
return ret;
|
||||
|
||||
/* save the resulting thread levels */
|
||||
|
||||
lam_mpi_thread_requested = requested;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user