6b22641669
I know it does not make much sense but one can play around with the performance. Numbers are available at http://www.unixer.de/research/nbcoll/perf/. This is the first step towards collv2. The next step includes the addition of non-blocking functions to the MPI layer and the collv1 interface. It implements all MPI-1 collective algorithms in a non-blocking manner. However, the collv1 interface does not allow non-blocking collectives, so all collectives are used in a blocking fashion by the ompi-glue layer. I wanted to add LibNBC as a separate subdirectory, but I could not convince the build system (and did not have the time). So the component looks pretty messy. It would be great if somebody could explain to me how to move all nbc*{c,h}, and {hb,dict}*{c,h} to a separate subdirectory. It's .ompi_ignored because I have not tested it exhaustively yet. This commit was SVN r11401.
1143 строки
44 KiB
C
1143 строки
44 KiB
C
/* Defined before ib.h is included; presumably the header keys off it -- TODO confirm. */
#define IB
|
|
#include "ib.h"
|
|
|
|
/* File-global double with external linkage. In the visible code it is only
 * referenced by a commented-out MPI_Wtime() timing line. */
double t;
|
|
|
|
/* is IB initialized? */
/* Set to 1 at the end of a successful IB_Init(); checked by IB_Comm_init(). */
|
|
static int IB_Ginitialized=0;
|
|
/* the keyval (global) */
/* Created once in IB_Init() via MPI_Keyval_create(); the per-communicator
 * IB_Comminfo is attached to communicators under this key. */
|
|
static int IB_Gkeyval=MPI_KEYVAL_INVALID;
|
|
|
|
/* Process-wide adapter state, filled in by IB_Init(). */
static struct {
|
|
/* handle of the first HCA returned by EVAPI_list_hcas() */
VAPI_hca_hndl_t hca_hndl;
|
|
/* protection domain allocated on that HCA; used for every memory registration in this file */
VAPI_pd_hndl_t pd_hndl;
|
|
|
|
/* search tree of IB_Memlistel entries; IB_Register_mem() looks buffers up here before registering them again */
hb_tree *memlist; /* this is the libdict structure to hang off the search tree */
|
|
} IB_Hca_info;
|
|
|
|
/* function definitions */
/* Forward declarations of the comparison/deletion callbacks that are passed
 * to hb_tree_new(): the IB_Memlist_* set for the registered-memory tree
 * created in IB_Init(), the IB_Taglist_* set for the per-peer tag trees
 * created in IB_Comm_init(). Their bodies are not part of this section. */
|
|
static __inline__ void IB_Memlist_memlist_delete(IB_Memlistel *entry);
|
|
static __inline__ void IB_Memlist_delete_key(IB_Memlistel *k);
|
|
static __inline__ int IB_Memlist_compare_entries(IB_Memlistel *a, IB_Memlistel *b, void *param);
|
|
static __inline__ void IB_Taglist_delete(IB_Taglstel *entry);
|
|
static __inline__ void IB_Taglist_delete_key(IB_Taglstel *k);
|
|
static __inline__ int IB_Taglist_compare_entries(IB_Taglstel *a, IB_Taglstel *b, void *param);
|
|
|
|
#if 0
|
|
/*
 * Legacy linked-list tag list (compiled out by the surrounding #if 0).
 *
 * Appends a new (tag, peer) -> req element to the tail of the list hanging
 * off comminfo (taglisthead / taglistend).
 *
 * Returns IB_OK on success, IB_OOR if the list element cannot be allocated
 * (the list is left untouched in that case).
 */
static __inline__ int IB_Addtotaglst(IB_Comminfo *comminfo, int tag, IB_Req *req, int peer) {
  IB_Taglstel *entry;

  entry = malloc(sizeof(IB_Taglstel));
  if(entry == NULL) { printf("Error in malloc()\n"); return IB_OOR; }

  entry->tag = tag;
  entry->peer = peer;
  entry->req = req;
  entry->next = NULL;

  if(comminfo->taglistend == NULL) {
    /* first element in list */
    comminfo->taglisthead = entry;
  } else {
    comminfo->taglistend->next = entry;
  }
  /* the new element is always the new tail */
  comminfo->taglistend = entry;

  return IB_OK;
}
|
|
|
|
/*
 * Legacy linked-list tag list (compiled out by the surrounding #if 0).
 *
 * Looks for the first list element whose tag and peer both match, unlinks
 * and frees it, and hands back the request stored in it. taglisthead and
 * taglistend are kept consistent when the first or the last element is
 * removed. Returns NULL if the list is empty or holds no matching element.
 */
static __inline__ IB_Req *IB_Gettagreq(IB_Comminfo *comminfo, int tag, int peer) {
  IB_Taglstel *before = NULL;   /* element preceding `cur`, NULL while at the head */
  IB_Taglstel *cur;
  IB_Req *found;

  for(cur = comminfo->taglisthead; cur != NULL; before = cur, cur = cur->next) {
    if((cur->tag != tag) || (cur->peer != peer)) continue;

    /* unlink: either advance the head or bridge over the element */
    if(before == NULL) {
      comminfo->taglisthead = cur->next;
    } else {
      before->next = cur->next;
    }
    /* removed element was the tail: the predecessor (or nothing) is the new tail */
    if(cur->next == NULL) comminfo->taglistend = before;

    found = cur->req;
    free(cur);
    return found;
  }

  /* we did not find it */
  return NULL;
}
|
|
#endif
|
|
|
|
/* Forward declaration; the body is not part of this section. IB_Comm_init()
 * calls it once per remote rank with that peer's send/receive completion
 * queues and treats any non-zero return value as an error. */
static int IB_Create_qp( int rank, int remote, VAPI_hca_hndl_t *hca_hndl_p, VAPI_cq_hndl_t *sr_cq_hndl_p, VAPI_cq_hndl_t *rr_cq_hndl_p, VAPI_qp_hndl_t *qp_hndl_p, VAPI_pd_hndl_t *pd_hndp_p, MPI_Comm comm);
|
|
|
|
/*
 * Attribute copy callback registered with MPI_Keyval_create() in IB_Init().
 *
 * The IB state is never propagated to a new communicator: clearing *flag
 * leaves the attribute unset there, and IB_Comm_init() builds a fresh
 * IB_Comminfo the first time that communicator is used. All other
 * arguments are ignored. Always returns MPI_SUCCESS.
 */
static int IB_Key_copy(MPI_Comm oldcomm, int keyval, void *extra_state, void *attribute_val_in, void *attribute_val_out, int *flag) {
  flag[0] = 0;   /* 0 = do not copy the attribute */

  return MPI_SUCCESS;
}
|
|
|
|
/*
 * Attribute delete callback registered with MPI_Keyval_create() in IB_Init().
 *
 * Frees the IB_Comminfo attached under IB_Gkeyval. A call for any other
 * keyval is only reported on stdout. Always returns MPI_SUCCESS.
 *
 * NOTE(review): only the top-level structure is released here; the arrays,
 * trees, queue pairs and memory registrations that IB_Comm_init() hangs off
 * it are not torn down in this function.
 */
static int IB_Key_delete(MPI_Comm comm, int keyval, void *attribute_val, void *extra_state) {
  if(keyval != IB_Gkeyval) {
    printf("Got wrong keyval!(%i)\n", keyval);
    return MPI_SUCCESS;
  }

  free((IB_Comminfo*)attribute_val);
  return MPI_SUCCESS;
}
|
|
|
|
/*
 * Fatal-error check for VAPI return codes.
 *
 * Does nothing when ret is VAPI_OK. Otherwise prints the caller-supplied
 * context string together with the VAPI error text and symbol, prefixed by
 * this process' rank in MPI_COMM_WORLD, then calls MPI_Finalize() and
 * terminates the process with exit status 1. It therefore never returns on
 * failure.
 */
static __inline__ void IB_stat( VAPI_ret_t ret, char *string ) {
  int my_rank;

  if(ret == VAPI_OK) return;

  MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
  printf("[%u]: *** [ERROR] *** %s -> %s (%s)\n", my_rank, string, VAPI_strerror(ret), VAPI_strerror_sym(ret));
  MPI_Finalize();
  exit(1);
}
|
|
|
|
/*
 * Fatal-error check for a work completion descriptor.
 *
 * Does nothing when ret.status is VAPI_SUCCESS. Otherwise prints the
 * caller-supplied context string with the completion status symbol and the
 * opcode symbol, each prefixed by this process' rank in MPI_COMM_WORLD,
 * then calls MPI_Finalize() and terminates the process with exit status 1.
 */
static __inline__ void IB_CQ_stat( VAPI_wc_desc_t ret, char *string ) {
  int my_rank;

  if(ret.status == VAPI_SUCCESS) return;

  MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
  printf("[%u]: *** [ERROR] *** %s -> %s\n", my_rank, string, VAPI_wc_status_sym(ret.status));
  printf("[%u]: *** [ERROR] *** opcode -> %s\n", my_rank, VAPI_cqe_opcode_sym(ret.opcode));
  MPI_Finalize();
  exit(1);
}
|
|
|
|
static int IB_Init() {
|
|
u_int32_t num_of_hcas; /* actual number of hcas */
|
|
VAPI_hca_id_t *hca_id_buf_p; /* HCA result buffer */
|
|
int ret, res;
|
|
|
|
/* keyval is not initialized yet, we have to init it */
|
|
if(MPI_KEYVAL_INVALID == IB_Gkeyval) {
|
|
res = MPI_Keyval_create(IB_Key_copy, IB_Key_delete, &(IB_Gkeyval), NULL);
|
|
if((MPI_SUCCESS != res)) { printf("Error in MPI_Keyval_create() (%i)\n", res); return IB_OOR; }
|
|
}
|
|
|
|
hca_id_buf_p = malloc(sizeof(VAPI_hca_id_t) * 2);
|
|
|
|
/* get all HCAs */
|
|
ret = EVAPI_list_hcas( (u_int32_t)2,
|
|
&num_of_hcas,
|
|
hca_id_buf_p );
|
|
if( ret == VAPI_OK) {
|
|
fprintf(stderr, "[INFO] found %d adapter(s), first-name: %s\n", num_of_hcas, (char *)hca_id_buf_p);
|
|
} else {
|
|
IB_stat( ret, "EVAPI_list_hcas()" );
|
|
}
|
|
|
|
/* get handle of first HCA */
|
|
ret = EVAPI_get_hca_hndl( *hca_id_buf_p, &IB_Hca_info.hca_hndl );
|
|
IB_stat( ret, "EVAPI_get_hca_hndl()" );
|
|
|
|
// allocate PD
|
|
ret = VAPI_alloc_pd( IB_Hca_info.hca_hndl,
|
|
&IB_Hca_info.pd_hndl );
|
|
IB_stat( ret, "VAPI_alloc_pd()" );
|
|
|
|
IB_Hca_info.memlist = hb_tree_new((dict_cmp_func)IB_Memlist_compare_entries, (void *) IB_Memlist_delete_key, (void *)IB_Memlist_memlist_delete);
|
|
if(IB_Hca_info.memlist == NULL) { printf("error in hb_dict_new()\n"); return IB_OOR; }
|
|
|
|
IB_Ginitialized = 1;
|
|
|
|
return IB_OK;
|
|
}
|
|
|
|
static __inline__ IB_Comminfo *IB_Comm_init(MPI_Comm comm) {
|
|
IB_Comminfo *comminfo;
|
|
int res, flag;
|
|
|
|
if(!IB_Ginitialized) IB_Init();
|
|
|
|
res = MPI_Attr_get(comm, IB_Gkeyval, &comminfo, &flag);
|
|
if((MPI_SUCCESS != res)) { printf("Error in MPI_Attr_get() (%i)\n", res); return NULL; }
|
|
if (!flag) {
|
|
VAPI_cqe_num_t num_of_entries_p; /* # CQ entries */
|
|
VAPI_mrw_t req_mrw_p;
|
|
VAPI_mrw_t rep_mrw_p; /* responded memory region */
|
|
IB_Peer_info *a2abuf1, *a2abuf2, *a2abuf3;
|
|
int p, i, j, rank;
|
|
|
|
res = MPI_Comm_size(comm, &p);
|
|
res = MPI_Comm_rank(comm, &rank);
|
|
|
|
/* we have to create a new one */
|
|
comminfo = malloc(sizeof(IB_Comminfo));
|
|
if(comminfo == NULL) { printf("Error in malloc()\n"); return NULL; }
|
|
|
|
comminfo->taglisthead = NULL;
|
|
comminfo->taglistend = NULL;
|
|
|
|
//printf("[%i] build up %i connections in comm %p \n", rank, p-1, comm);
|
|
/* allocate QPs */
|
|
comminfo->qp_hndl_arr = malloc(p*sizeof(VAPI_qp_hndl_t));
|
|
if(comminfo->qp_hndl_arr == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate SR CQs */
|
|
comminfo->sr_cq_hndl_arr = malloc(p*sizeof(VAPI_cq_hndl_t));
|
|
if(comminfo->sr_cq_hndl_arr == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate RR CQs */
|
|
comminfo->rr_cq_hndl_arr = malloc(p*sizeof(VAPI_cq_hndl_t));
|
|
if(comminfo->rr_cq_hndl_arr == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate a tag list for each host */
|
|
comminfo->taglist=malloc(p*sizeof(hb_tree*));
|
|
if(comminfo->taglist == NULL) { printf("malloc() error\n"); return NULL; }
|
|
for(i=0; i<p;i++) {
|
|
comminfo->taglist[i] = hb_tree_new((dict_cmp_func)IB_Taglist_compare_entries, (void *) IB_Taglist_delete_key, (void *)IB_Taglist_delete);
|
|
if(comminfo->taglist[i] == NULL) { printf("hb_tree_new() error\n"); return NULL; }
|
|
}
|
|
|
|
/* allocate rtr send queue */
|
|
comminfo->rtr_send=malloc(IB_RTR_SIZE*sizeof(IB_Peer_info_tag));
|
|
if(comminfo->rtr_send == NULL) { printf("malloc() error\n"); return NULL; }
|
|
for(i=0; i<IB_RTR_SIZE;i++) comminfo->rtr_send[i].tag=-1;
|
|
/* allocate rtr queue */
|
|
comminfo->rtr=malloc(p*sizeof(IB_Peer_info_tag*));
|
|
if(comminfo->rtr== NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate rtr free queue */
|
|
comminfo->rtr_peer_free=malloc(p*sizeof(int*));
|
|
if(comminfo->rtr_peer_free == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate rtr info */
|
|
comminfo->rtr_info=malloc(p*sizeof(IB_Peer_info));
|
|
if(comminfo->rtr_info == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate rtr l_key */
|
|
comminfo->rtr_l_key=malloc(p*sizeof(VAPI_lkey_t));
|
|
if(comminfo->rtr_l_key == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate rtr memory region handle */
|
|
comminfo->rtr_mr_hndl_p=malloc(p*sizeof(VAPI_mr_hndl_t));
|
|
if(comminfo->rtr_mr_hndl_p == NULL) { printf("malloc() error\n"); return NULL; }
|
|
|
|
/* allocate eager send queue */
|
|
comminfo->eager_send=malloc(IB_EAGER_SIZE*sizeof(IB_Eager_data));
|
|
if(comminfo->eager_send == NULL) { printf("malloc() error\n"); return NULL; }
|
|
for(i=0; i<IB_EAGER_SIZE;i++) comminfo->eager_send[i].tag=-1;
|
|
/* allocate eager queue */
|
|
comminfo->eager=malloc(p*sizeof(IB_Eager_data*));
|
|
if(comminfo->eager== NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate eager free queue */
|
|
comminfo->eager_peer_free=malloc(p*sizeof(int*));
|
|
if(comminfo->eager_peer_free == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate eager info */
|
|
comminfo->eager_info=malloc(p*sizeof(IB_Peer_info));
|
|
if(comminfo->eager_info == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate eager l_key */
|
|
comminfo->eager_l_key=malloc(p*sizeof(VAPI_lkey_t));
|
|
if(comminfo->eager_l_key == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate eager memory region handle */
|
|
comminfo->eager_mr_hndl_p=malloc(p*sizeof(VAPI_mr_hndl_t));
|
|
if(comminfo->eager_mr_hndl_p == NULL) { printf("malloc() error\n"); return NULL; }
|
|
|
|
/* allocate eager free info */
|
|
comminfo->eager_free_info=malloc(p*sizeof(IB_Peer_info));
|
|
if(comminfo->eager_free_info == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate eager free memory region handle */
|
|
comminfo->eager_free_mr_hndl_p=malloc(p*sizeof(VAPI_mr_hndl_t));
|
|
if(comminfo->eager_free_mr_hndl_p == NULL) { printf("malloc() error\n"); return NULL; }
|
|
|
|
/* allocate a2abuf1 */
|
|
a2abuf1=malloc(p*sizeof(IB_Peer_info));
|
|
if(a2abuf1 == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate a2abuf2 */
|
|
a2abuf2=malloc(p*sizeof(IB_Peer_info));
|
|
if(a2abuf2 == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* allocate a2abuf3 */
|
|
a2abuf3=malloc(p*sizeof(IB_Peer_info));
|
|
if(a2abuf3 == NULL) { printf("malloc() error\n"); return NULL; }
|
|
|
|
for(i = 0; i < p; i++) {
|
|
if(i == rank) continue;
|
|
res = VAPI_create_cq(IB_Hca_info.hca_hndl, 100000, &comminfo->sr_cq_hndl_arr[i], &num_of_entries_p );
|
|
IB_stat( res, "VAPI_create_cq()" );
|
|
res = VAPI_create_cq(IB_Hca_info.hca_hndl, 100000, &comminfo->rr_cq_hndl_arr[i], &num_of_entries_p );
|
|
IB_stat( res, "VAPI_create_cq()" );
|
|
|
|
res = IB_Create_qp(rank, i, &IB_Hca_info.hca_hndl, &(comminfo->sr_cq_hndl_arr[i]), &(comminfo->rr_cq_hndl_arr[i]), &(comminfo->qp_hndl_arr[i]), &IB_Hca_info.pd_hndl, comm );
|
|
//printf("[%i] rank %i has sr_cq: %i and rr_cq: %i and qp_hndl: %i\n", rank, i, (int)comminfo->sr_cq_hndl_arr[i], (int)comminfo->rr_cq_hndl_arr[i], (int)comminfo->qp_hndl_arr[i]);
|
|
if(res != 0) { printf("Error in IB_Create_qp (%i)\n", res); return NULL; }
|
|
|
|
/* allocate rtr element */
|
|
comminfo->rtr[i] = malloc(sizeof(IB_Peer_info_tag)*IB_RTR_SIZE);
|
|
if(comminfo->rtr[i] == NULL) { printf("malloc() error\n"); return NULL; }
|
|
for(j=0; j<IB_RTR_SIZE; j++) comminfo->rtr[i][j].tag = -1;
|
|
/* allocate rtr free queue */
|
|
comminfo->rtr_peer_free[i]=malloc(sizeof(int)*IB_RTR_SIZE);
|
|
if(comminfo->rtr_peer_free[i] == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* set free rtr queue to free :) */
|
|
for(j=0; j<IB_RTR_SIZE; j++) comminfo->rtr_peer_free[i][j] = -1;
|
|
|
|
/* allocate eager element */
|
|
comminfo->eager[i] = malloc(sizeof(IB_Eager_data)*IB_EAGER_SIZE);
|
|
if(comminfo->eager[i] == NULL) { printf("malloc() error\n"); return NULL; }
|
|
for(j=0; j<IB_EAGER_SIZE; j++) { memset((void*)(&comminfo->eager[i][j]), 0, sizeof(IB_Eager_data)); }
|
|
/* allocate free queue */
|
|
comminfo->eager_peer_free[i]=malloc(sizeof(int)*IB_EAGER_SIZE);
|
|
if(comminfo->eager_peer_free[i] == NULL) { printf("malloc() error\n"); return NULL; }
|
|
/* set free queue to free :) */
|
|
for(j=0; j<IB_EAGER_SIZE; j++) comminfo->eager_peer_free[i][j] = -1;
|
|
}
|
|
|
|
for(i=0; i<p;i++) {
|
|
if (rank == i) continue;
|
|
/* register rtr buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(comminfo->rtr[i]);
|
|
req_mrw_p.size = sizeof(IB_Peer_info_tag)*IB_RTR_SIZE;
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->rtr_mr_hndl_p[i],
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr() for RTR buffers" );
|
|
|
|
comminfo->rtr_l_key[i] = rep_mrw_p.l_key;
|
|
|
|
a2abuf1[i].r_key = (unsigned long)rep_mrw_p.r_key;
|
|
a2abuf1[i].addr = (unsigned long)comminfo->rtr[i];
|
|
//printf("[%i] my info - r_key: %lu addr for rank %i: %lu\n", rank, a2abuf[i].r_key, i, a2abuf[i].addr);
|
|
|
|
/* register eager buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(comminfo->eager[i]);
|
|
req_mrw_p.size = sizeof(IB_Eager_data)*IB_EAGER_SIZE;
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->eager_mr_hndl_p[i],
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr() for eager buffers" );
|
|
|
|
comminfo->eager_l_key[i] = rep_mrw_p.l_key;
|
|
|
|
a2abuf2[i].r_key = (unsigned long)rep_mrw_p.r_key;
|
|
a2abuf2[i].addr = (unsigned long)comminfo->eager[i];
|
|
|
|
/* register eager free buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(comminfo->eager_peer_free[i]);
|
|
req_mrw_p.size = sizeof(int)*IB_EAGER_SIZE;
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->eager_free_mr_hndl_p[i],
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr() for eager free buffer" );
|
|
|
|
a2abuf3[i].r_key = (unsigned long)rep_mrw_p.r_key;
|
|
a2abuf3[i].addr = (unsigned long)comminfo->eager_peer_free[i];
|
|
//printf("[%i] my info - r_key: %lu addr for rank %i: %lu\n", rank, a2abuf1[i].r_key, i, a2abuf1[i].addr);
|
|
//printf("[%i] my info - r_key: %lu addr for rank %i: %lu\n", rank, a2abuf2[i].r_key, i, a2abuf2[i].addr);
|
|
//printf("[%i] my info - r_key: %lu addr for rank %i: %lu\n", rank, a2abuf3[i].r_key, i, a2abuf3[i].addr);
|
|
}
|
|
MPI_Alltoall(a2abuf1, 2, MPI_UNSIGNED_LONG, comminfo->rtr_info, 2, MPI_UNSIGNED_LONG, comm);
|
|
free(a2abuf1);
|
|
MPI_Alltoall(a2abuf2, 2, MPI_UNSIGNED_LONG, comminfo->eager_info, 2, MPI_UNSIGNED_LONG, comm);
|
|
free(a2abuf2);
|
|
MPI_Alltoall(a2abuf3, 2, MPI_UNSIGNED_LONG, comminfo->eager_free_info, 2, MPI_UNSIGNED_LONG, comm);
|
|
free(a2abuf3);
|
|
for(i=0; i<p;i++) {
|
|
if (rank == i) continue;
|
|
//printf("[%i] rtr rem info - r_key: %lu addr for me at node %i: %lu\n", rank, comminfo->rtr_info[i].r_key, i, comminfo->rtr_info[i].addr);
|
|
//printf("[%i] eager rem info - r_key: %lu addr for me at node %i: %lu\n", rank, comminfo->eager_info[i].r_key, i, comminfo->eager_info[i].addr);
|
|
//printf("[%i] eager_free rem info - r_key: %lu addr for me at node %i: %lu\n", rank, comminfo->eager_free_info[i].r_key, i, comminfo->eager_free_info[i].addr);
|
|
}
|
|
|
|
/* register rtr send buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(comminfo->rtr_send);
|
|
req_mrw_p.size = sizeof(IB_Peer_info_tag)*IB_RTR_SIZE;
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->rtr_send_mr_hndl_p,
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr()" );
|
|
|
|
comminfo->rtr_send_l_key = rep_mrw_p.l_key;
|
|
|
|
/* register eager send buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(comminfo->eager_send);
|
|
req_mrw_p.size = sizeof(IB_Eager_data)*IB_EAGER_SIZE;
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->eager_send_mr_hndl_p,
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr()" );
|
|
|
|
comminfo->eager_send_l_key = rep_mrw_p.l_key;
|
|
|
|
/* register empty send buffer */
|
|
memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
|
|
req_mrw_p.type = VAPI_MR;
|
|
req_mrw_p.start = (VAPI_virt_addr_t)(&comminfo->empty);
|
|
req_mrw_p.size = sizeof(int32_t);
|
|
req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
|
|
req_mrw_p.acl = VAPI_EN_LOCAL_WRITE |
|
|
VAPI_EN_REMOTE_WRITE |
|
|
VAPI_EN_REMOTE_READ;
|
|
|
|
res = VAPI_register_mr( IB_Hca_info.hca_hndl,
|
|
&req_mrw_p,
|
|
&comminfo->eager_send_mr_hndl_p,
|
|
&rep_mrw_p );
|
|
IB_stat( res, "VAPI_register_mr()" );
|
|
|
|
comminfo->empty_l_key = rep_mrw_p.l_key;
|
|
|
|
comminfo->empty = -1;
|
|
|
|
/* put the new attribute to the comm */
|
|
res = MPI_Attr_put(comm, IB_Gkeyval, comminfo);
|
|
if((MPI_SUCCESS != res)) { printf("Error in MPI_Attr_put() (%i)\n", res); return NULL; }
|
|
}
|
|
|
|
return comminfo;
|
|
}
|
|
|
|
/*
 * Make sure [buf, buf+size) is registered with the adapter and return its
 * keys.
 *
 * The global memlist tree is searched first; on a hit the cached keys are
 * returned and nothing is registered. Otherwise the region is registered
 * (local write, remote write, remote read), the handle is written to *mr,
 * and a new element describing it is inserted into the tree.
 *
 * r_key / l_key may be NULL if the caller does not need that key.
 *
 * Always returns IB_OK: a VAPI failure terminates the process via
 * IB_stat(), and a failure to *cache* the registration is only reported,
 * because the registration itself is valid and its keys are returned.
 *
 * NOTE(review): on a cache hit *mr is left untouched. The cache element
 * stores the caller's `mr` pointer itself, not a copy of the handle --
 * confirm that this pointer outlives the tree entry.
 */
static __inline__ int IB_Register_mem(void *buf, int size, VAPI_mr_hndl_t *mr, VAPI_rkey_t *r_key, VAPI_lkey_t *l_key) {
  VAPI_mrw_t req_mrw_p;
  VAPI_mrw_t rep_mrw_p;  /* responded memory region */
  int res, rank;
  IB_Memlistel *memel, *newel, keyel;

  /* is the region already known? */
  keyel.buf = buf;
  keyel.size = size;
  memel = hb_tree_search(IB_Hca_info.memlist, &keyel);
  if(memel != NULL) {
    if(r_key != NULL) *r_key = memel->r_key;
    if(l_key != NULL) *l_key = memel->l_key;
    return IB_OK;
  }

  memset(&req_mrw_p, 0, sizeof(VAPI_mrw_t));
  req_mrw_p.type = VAPI_MR;
  req_mrw_p.start = (VAPI_virt_addr_t)buf;
  req_mrw_p.size = size;
  req_mrw_p.pd_hndl = IB_Hca_info.pd_hndl;
  req_mrw_p.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE | VAPI_EN_REMOTE_READ;

  res = VAPI_register_mr( IB_Hca_info.hca_hndl, &req_mrw_p, mr, &rep_mrw_p );
  IB_stat( res, "VAPI_register_mr()" );

  /* the registration is valid from here on, so hand the keys out first */
  if(r_key != NULL) *r_key = rep_mrw_p.r_key;
  if(l_key != NULL) *l_key = rep_mrw_p.l_key;

  /* TODO: we should react to "Resources temporary unavailable" (EAGAIN)
   * and free some MRs ... to continue :) */
  newel = malloc(sizeof(IB_Memlistel));
  if(newel == NULL) {
    /* cannot remember the region; it will simply be registered again */
    printf("Error in malloc()\n");
    return IB_OK;
  }
  newel->buf = buf;
  newel->size = size;
  newel->mr = mr;
  newel->r_key = rep_mrw_p.r_key;
  newel->l_key = rep_mrw_p.l_key;

  res = hb_tree_insert (IB_Hca_info.memlist, newel, newel, 0);
  if(res != 0) {
    /* the rank is only needed for this message */
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    printf("[%i] error in dict_insert() (%i) while inserting region from %lu to %lu\n", rank, res, (unsigned long)buf, (unsigned long)((char*)buf+size));
    /* the tree did not take the element */
    free(newel);
  }

  return IB_OK;
}
|
|
|
|
/*
 * Post the send request that was prepared in req->sr_desc on the queue pair
 * belonging to req->peer, then advance the request to SEND_POSTED_SR.
 * A VAPI failure terminates the process via IB_stat(); otherwise IB_OK is
 * returned.
 */
static __inline__ int IB_Do_send(IB_Req *req) {
  int rc = VAPI_post_sr( IB_Hca_info.hca_hndl, req->comminfo->qp_hndl_arr[req->peer], &req->sr_desc );

  IB_stat( rc, "VAPI_post_sr()" );
  req->status = SEND_POSTED_SR;
  return IB_OK;
}
|
|
|
|
/*
 * Start a non-blocking send of `count` elements of `type` from `buf` to
 * rank `dst` in `comm`, matching on `tag`.
 *
 * A request object is allocated and returned through *request.
 *
 *  - count == 0: the request is immediately SEND_DONE.
 *  - count*extent <= IB_EAGER_LIMIT: the payload is copied into a free slot
 *    of the local eager send queue, a free slot in the peer's eager array is
 *    picked from the local eager_peer_free table, and the slot is
 *    RDMA-written to the peer (status SEND_SENT_EAGER).
 *  - otherwise: the buffer is registered and the send descriptor is
 *    prepared; the remote key and address are filled in later, once the
 *    peer's RTR has been seen (status SEND_WAITING_RTR).
 *
 * Returns IB_OK, or IB_OOR if the request cannot be allocated or the
 * communicator state cannot be set up. If the local eager send queue or the
 * peer's eager slot table is full the process exits (no retry is
 * implemented). VAPI failures terminate the process via IB_stat().
 */
int IB_Isend(void *buf, int count, MPI_Datatype type, int dst, int tag, MPI_Comm comm, IB_Request *request) {
  int res, sendentry, i;
  MPI_Aint ext;
  IB_Req *req;
  IB_Eager_data *slot;
  char *flag;
  VAPI_sg_lst_entry_t sr_sg_lst;  /* the IB SG list */
  VAPI_sr_desc_t sr_desc;         /* the IB SR descr. */

  *request = malloc(sizeof(IB_Req));
  req = *request;
  if(req == NULL) { printf("Error in malloc()\n"); return IB_OOR; }

  if(!count) {
    req->status = SEND_DONE;
    return IB_OK;
  }

  MPI_Type_extent(type, &ext);

  /* fill in the bookkeeping fields first: the eager path below reports
   * req->rank in its error message */
  MPI_Comm_rank(comm, &req->rank);
  MPI_Comm_size(comm, &req->p);
  req->tag = tag;
  req->peer = dst;

  req->comminfo = IB_Comm_init(comm);
  if(req->comminfo == NULL) { printf("Error in IB_Comm_init()\n"); return IB_OOR; }

  if(count*ext <= IB_EAGER_LIMIT) {
    /* we send this as eager message */

    /* find free eager send buffer */
    for(sendentry=0; sendentry<IB_EAGER_SIZE; sendentry++) {
      if(req->comminfo->eager_send[sendentry].tag == -1) break;
    }
    if(sendentry==IB_EAGER_SIZE) { printf("*** eager send list full - we should retry later but crash\n"); exit(1); }
    req->sendel = sendentry;
    slot = &req->comminfo->eager_send[sendentry];

    /* copy data into buffer */
    memcpy(&slot->buf, buf, count*ext);
    /* set header */
    slot->size = count*ext;
    slot->tag = tag;
    /* set next byte after message to '1'; the receiver polls it to know
     * that the whole message has arrived */
    flag = (char*)&slot->buf + slot->size;
    *flag = (char)1;

    /* get offset in the receiver's eager array */
    for(i=0; i<IB_EAGER_SIZE; i++) {
      if(req->comminfo->eager_peer_free[dst][i] == -1) break;
    }
    if(i==IB_EAGER_SIZE) {
      /* no free slot: stop here instead of indexing past the table */
      printf("******************* [%i] eager list on peer %i full - we should retry later but crash\n", req->rank, dst);
      exit(1);
    }
    req->comminfo->eager_peer_free[dst][i] = tag;
    slot->index = i;

    /* prepare eager send request; zero the descriptors so that fields not
     * set below do not carry stack garbage */
    memset(&sr_sg_lst, 0, sizeof(sr_sg_lst));
    memset(&sr_desc, 0, sizeof(sr_desc));
    sr_sg_lst.addr = (VAPI_virt_addr_t)slot;
    sr_sg_lst.len = sizeof(int32_t)+2*sizeof(int16_t)+sizeof(char)+slot->size; /* TODO: dangerous - datatype */
    sr_sg_lst.lkey = req->comminfo->eager_send_l_key;
    sr_desc.id = (u_int64_t)req;
    sr_desc.opcode = VAPI_RDMA_WRITE;
    sr_desc.set_se = 0;
    sr_desc.comp_type = VAPI_SIGNALED;
    sr_desc.sg_lst_p = &sr_sg_lst;
    sr_desc.sg_lst_len = 1;
    sr_desc.imm_data = (u_int64_t)0; /* not used */
    sr_desc.fence = 0;
    sr_desc.compare_add = 0;
    sr_desc.r_key = (VAPI_rkey_t)(req->comminfo->eager_info[dst].r_key);
    sr_desc.remote_addr = req->comminfo->eager_info[dst].addr+i*sizeof(IB_Eager_data); /* TODO: 64 Bit */

    res = VAPI_post_sr( IB_Hca_info.hca_hndl, req->comminfo->qp_hndl_arr[dst], &sr_desc );
    IB_stat( res, "VAPI_post_sr()" );

    req->status = SEND_SENT_EAGER;
  } else {
    /* we send this as rendezvous */

    /* register memory region for send */
    res = IB_Register_mem(buf, count*ext, &req->mr_hndl_p, NULL, &req->sr_sg_lst.lkey); /* TODO: count*ext Danger for datatypes ... */
    if(res != IB_OK) { printf("Error in IB_Register_mem() (%i)\n", res); return res; }

    /* initialize sr_desc as far as we can (remote r_key and addr are
     * missing; they are set after we received RTR) */
    memset(&req->sr_desc, 0, sizeof(req->sr_desc));
    req->sr_sg_lst.addr = (VAPI_virt_addr_t)buf;
    req->sr_sg_lst.len = count*ext; /* TODO: count*ext Danger for datatypes ... */
    req->sr_desc.id = (u_int64_t)req;
    req->sr_desc.opcode = VAPI_SEND_WITH_IMM;
    req->sr_desc.set_se = 0;
    req->sr_desc.comp_type = VAPI_SIGNALED;
    req->sr_desc.sg_lst_p = &req->sr_sg_lst;
    req->sr_desc.sg_lst_len = 1;
    req->sr_desc.imm_data = (u_int64_t)tag;
    req->sr_desc.fence = 0;
    req->sr_desc.compare_add = 0;

    req->status = SEND_WAITING_RTR;
  }

  return IB_OK;
}
|
|
|
|
/*
 * Start a non-blocking receive of `count` elements of `type` into `buf`
 * from rank `src` in `comm`, matching on `tag`.
 *
 * A request object is allocated and returned through *request.
 *
 *  - count == 0: the request is immediately RECV_DONE.
 *  - count*extent <= IB_EAGER_LIMIT: nothing is posted; the request waits
 *    for the sender's eager message (status RECV_WAITING_EAGER).
 *  - otherwise (rendezvous): the buffer is registered, a receive request is
 *    posted, the request is entered into the per-peer tag tree, and an RTR
 *    record (tag, address, r_key) is RDMA-written into a free slot of the
 *    sender's RTR array (status RECV_SENDING_RTR).
 *
 * Returns IB_OK, or IB_OOR if an allocation fails or the communicator state
 * cannot be set up. If the local RTR send queue or the peer's RTR slot table
 * is full the process exits (no retry is implemented). VAPI failures
 * terminate the process via IB_stat().
 */
int IB_Irecv(void *buf, int count, MPI_Datatype type, int src, int tag, MPI_Comm comm, IB_Request *request) {
  int res, i, sendentry;
  MPI_Aint ext;
  VAPI_sg_lst_entry_t sr_sg_lst;  /* the IB SG list */
  VAPI_sr_desc_t sr_desc;         /* the IB SR descr. */
  VAPI_sg_lst_entry_t rr_sg_lst;
  VAPI_rr_desc_t rr_desc;
  IB_Req *req;
  IB_Taglstel *newel;

  *request = malloc(sizeof(IB_Req));
  req = *request;
  if(req == NULL) { printf("Error in malloc()\n"); return IB_OOR; }

  if(count == 0) {
    req->status = RECV_DONE;
    return IB_OK;
  }

  MPI_Type_extent(type, &ext);
  MPI_Comm_size(comm, &req->p);
  MPI_Comm_rank(comm, &req->rank);
  req->tag = tag;
  req->peer = src;
  req->buf = buf;

  req->comminfo = IB_Comm_init(comm);
  if(req->comminfo == NULL) { printf("Error in IB_Comm_init()\n"); return IB_OOR; }

  if(count*ext <= IB_EAGER_LIMIT) {
    /* do nothing, wait for eager message */
    req->status = RECV_WAITING_EAGER;
    return IB_OK;
  }

  /* zero the stack descriptors so that fields not set below do not carry
   * garbage (must happen before IB_Register_mem fills in rr_sg_lst.lkey) */
  memset(&sr_sg_lst, 0, sizeof(sr_sg_lst));
  memset(&sr_desc, 0, sizeof(sr_desc));
  memset(&rr_sg_lst, 0, sizeof(rr_sg_lst));
  memset(&rr_desc, 0, sizeof(rr_desc));

  /* find a new empty sendentry in the comminfo->rtr_send array which is
   * pre-registered to send RTR messages from */
  for(sendentry=0; sendentry<IB_RTR_SIZE; sendentry++) {
    if(req->comminfo->rtr_send[sendentry].tag == -1) break;
  }
  if(sendentry==IB_RTR_SIZE) { printf("*** rtr send list full - we should retry later but crash\n"); exit(1); }
  /* fill selected send entry */
  req->comminfo->rtr_send[sendentry].tag = tag;
  req->comminfo->rtr_send[sendentry].addr = (unsigned long)buf;
  /* remember index in sendlist to free it fast after sending */
  req->sendel = sendentry;

  /* register memory region for recv; the r_key goes straight into the RTR record */
  res = IB_Register_mem(buf, count*ext, &req->mr_hndl_p, (VAPI_rkey_t*)&req->comminfo->rtr_send[sendentry].r_key, &rr_sg_lst.lkey); /* TODO: count*ext Danger for datatypes ... */
  if(res != IB_OK) { printf("Error in IB_Register_mem() (%i)\n", res); return res; }

  /* prepare data receive request */
  rr_sg_lst.addr = (VAPI_virt_addr_t)buf;
  rr_sg_lst.len = count*ext;
  rr_desc.sg_lst_p = &rr_sg_lst;
  rr_desc.sg_lst_len = 1;
  rr_desc.id = (u_int64_t)req;
  rr_desc.opcode = VAPI_RECEIVE;
  rr_desc.comp_type = VAPI_SIGNALED;

  res = VAPI_post_rr( IB_Hca_info.hca_hndl, req->comminfo->qp_hndl_arr[src], &rr_desc );
  IB_stat( res, "VAPI_post_rr()" );

  /* add the tag and the request to the taglist of this peer */
  newel = malloc(sizeof(IB_Taglstel));
  if(newel == NULL) { printf("Error in malloc()\n"); return IB_OOR; }
  newel->tag = tag;
  newel->req = req;
  res = hb_tree_insert(req->comminfo->taglist[src], newel, newel, 0);
  if(res != 0) {
    /* the tree did not take the element */
    printf("[%i] error in hb_tree_insert() (%i) for tag %i from peer %i\n", req->rank, res, tag, src);
    free(newel);
  }

  /* get offset in the peer's RTR array */
  for(i=0; i<IB_RTR_SIZE; i++) {
    if(req->comminfo->rtr_peer_free[src][i] == -1) break;
  }
  if(i==IB_RTR_SIZE) {
    /* no free slot: stop here instead of indexing past the table */
    printf("*** unexpected list full - we should retry later but crash\n");
    exit(1);
  }
  req->comminfo->rtr_peer_free[src][i] = tag;

  /* prepare RTR send request */
  sr_sg_lst.addr = (VAPI_virt_addr_t)(&req->comminfo->rtr_send[sendentry]);
  sr_sg_lst.len = sizeof(IB_Peer_info_tag);
  sr_sg_lst.lkey = req->comminfo->rtr_send_l_key;
  sr_desc.id = (u_int64_t)req;
  sr_desc.opcode = VAPI_RDMA_WRITE;
  sr_desc.set_se = 0;
  sr_desc.comp_type = VAPI_SIGNALED;
  sr_desc.sg_lst_p = &sr_sg_lst;
  sr_desc.sg_lst_len = 1;
  sr_desc.imm_data = (u_int64_t)0; /* not used */
  sr_desc.fence = 0;
  sr_desc.compare_add = 0;
  sr_desc.r_key = (VAPI_rkey_t)(req->comminfo->rtr_info[src].r_key);
  sr_desc.remote_addr = req->comminfo->rtr_info[src].addr+i*sizeof(IB_Peer_info_tag); /* TODO: 64 Bit */

  /* post RTR request */
  res = VAPI_post_sr( IB_Hca_info.hca_hndl, req->comminfo->qp_hndl_arr[req->peer], &sr_desc );
  IB_stat( res, "VAPI_post_sr()" );

  req->status = RECV_SENDING_RTR;

  return IB_OK;
}
|
|
|
|
/*
 * IB_Test - make progress on a single request and report whether it is done.
 *
 * request: pointer to the request handle to test.
 *
 * Returns IB_OK if the request's status is SEND_DONE, RECV_DONE or
 * RECV_EAGER_DONE (on entry or after the progress made here), IB_CONTINUE
 * otherwise. IB_CONTINUE is also returned when a receive completion
 * carries a tag that is not present in the peer's tag list.
 *
 * Progress is made in three steps:
 *   1. a send waiting for an RTR scans the peer's rtr slots for its tag and,
 *      if found, posts the data send via IB_Do_send();
 *   2. a receive waiting for an eager message scans the peer's eager slots,
 *      copies the payload out and RDMA-writes the "slot free" notification;
 *   3. one entry is polled from the send CQ and one from the receive CQ of
 *      the peer of this request. The completions found there may belong to
 *      *other* requests, whose status is updated as a side effect.
 */
int IB_Test(IB_Request *request) {
  int i, j, res;
  VAPI_wc_desc_t comp_desc_p; /* work completion descriptor */
  IB_Req *tmpreq, *req;
  IB_Taglstel *memel, keyel;
  /* polled in a busy loop while the eager payload may still be arriving;
   * must be volatile, otherwise the compiler may hoist the load out of
   * the loop and spin forever */
  volatile char *flag;

  req = *request;

  if((req->status == SEND_DONE) || (req->status == RECV_DONE) || (req->status == RECV_EAGER_DONE))
    return IB_OK;

  /* if I wait for RTR - search rtr array for my tag ... */
  if(req->status == SEND_WAITING_RTR) {
    for(i=0; i<IB_RTR_SIZE; i++) {
      if(req->comminfo->rtr[req->peer][i].tag == req->tag) {
        /* take the remote buffer description out of the RTR slot */
        req->sr_desc.r_key = (VAPI_rkey_t)req->comminfo->rtr[req->peer][i].r_key;
        req->sr_desc.remote_addr = req->comminfo->rtr[req->peer][i].addr; /* TODO: 64 Bit */
        /* 'free' rtr element */
        req->comminfo->rtr[req->peer][i].tag = -1;
        IB_Do_send(req);
        break;
      }
    }
  }

  /* I wait for an eager message */
  if(req->status == RECV_WAITING_EAGER) {
    /* ok, poll all eager slots we have from the peer we wait for */
    for(i=0; i<IB_EAGER_SIZE; i++) {
      if(req->comminfo->eager[req->peer][i].tag == req->tag) {
        VAPI_sg_lst_entry_t sr_sg_lst; /* the IB SG list */
        VAPI_sr_desc_t sr_desc; /* the IB SR descr. */
        int index;

        /* poll last byte until we can be sure that we have the *full*
         * message in the buffer */
        flag = (volatile char*)&req->comminfo->eager[req->peer][i].buf + req->comminfo->eager[req->peer][i].size;
        while(*flag != (char)1);

        /* copy message to recv buffer */
        memcpy(req->buf, &req->comminfo->eager[req->peer][i].buf, req->comminfo->eager[req->peer][i].size);
        /* remember the slot index before the slot is wiped below */
        index = req->comminfo->eager[req->peer][i].index;
        /* set the buffer to '0' to make flag-polling useful */
        memset((void*)(&req->comminfo->eager[req->peer][i]), 0, sizeof(int32_t)+2*sizeof(int16_t)+sizeof(char)+req->comminfo->eager[req->peer][i].size);

        /* RDMA into the free-buffer on the sender to indicate that my
         * buffer can be reused */

        /* prepare EAGER_RECVD send request */
        sr_sg_lst.addr = (VAPI_virt_addr_t)(&req->comminfo->empty);
        sr_sg_lst.len = sizeof(int);
        sr_sg_lst.lkey = req->comminfo->empty_l_key;
        /* id 0 marks this completion as "nobody waits for it" - see the
         * send queue handling below */
        sr_desc.id = (u_int64_t)0;
        sr_desc.opcode = VAPI_RDMA_WRITE;
        sr_desc.set_se = 0;
        sr_desc.comp_type = VAPI_SIGNALED;
        sr_desc.sg_lst_p = &sr_sg_lst;
        sr_desc.sg_lst_len = 1;
        sr_desc.imm_data = (u_int64_t)0; /* not used */
        sr_desc.fence = 0;
        sr_desc.compare_add = 0;
        sr_desc.r_key = (VAPI_rkey_t)(req->comminfo->eager_free_info[req->peer].r_key);
        sr_desc.remote_addr = req->comminfo->eager_free_info[req->peer].addr+sizeof(int)*index; /* TODO: 64 Bit */

        /* post EAGER_RECVD request */
        res = VAPI_post_sr( IB_Hca_info.hca_hndl, req->comminfo->qp_hndl_arr[req->peer], &sr_desc );
        IB_stat( res, "VAPI_post_sr()" );
        /* mark receive as done */
        req->status = RECV_EAGER_DONE;
        /* leave loop - no early return, we still need to poll the CQs */
        break;
      }
    }
  }

  /* only the CQs of the peer of this request are polled (we should
   * probably only poll those where we wait for something) */
  i=req->peer;

  /************************************** SEND QUEUE handling ************************************/
  res = VAPI_poll_cq( IB_Hca_info.hca_hndl, req->comminfo->sr_cq_hndl_arr[i], &comp_desc_p );
  if((res != VAPI_EBUSY) && (res != VAPI_CQ_EMPTY)) {
    IB_CQ_stat( comp_desc_p, "VAPI_poll_cq(SR)" );
    /* id == 0 for eager recvd messages - we do not need to wait for them ... */
    if(comp_desc_p.id != 0) {
      /* the work request id carries the address of the owning request */
      tmpreq = (IB_Req*)(comp_desc_p.id);
      if(tmpreq->status == SEND_POSTED_SR) {
        /* we sent the message and are ready */
        /* TODO: free *all* request resources here */
        tmpreq->status = SEND_DONE;
      } else if (tmpreq->status == SEND_SENT_EAGER) {
        /* set eager sendlist element to free */
        tmpreq->comminfo->eager_send[tmpreq->sendel].tag = -1;
        tmpreq->status = SEND_DONE;
      } else if ((tmpreq->status == RECV_SENDING_RTR) || (tmpreq->status == RECV_DONE)) {
        /* set rtr sendlist element to free */
        tmpreq->comminfo->rtr_send[tmpreq->sendel].tag = -1;
        /* do not change DONE requests back :o) */
        if(tmpreq->status != RECV_DONE) tmpreq->status = RECV_SENT_RTR;
      } else {
        printf("[%i] req %lu unexpected status (%i) for send to %i (tag: %i) after poll sr_cq \n", tmpreq->rank, (unsigned long)tmpreq, tmpreq->status, tmpreq->peer, tmpreq->tag);
      }
    }
  }

  /************************************** RECEIVE QUEUE handling ************************************/
  res = VAPI_poll_cq( IB_Hca_info.hca_hndl, req->comminfo->rr_cq_hndl_arr[i], &comp_desc_p );
  if((res != VAPI_EBUSY) && (res != VAPI_CQ_EMPTY)) {
    IB_CQ_stat( comp_desc_p, "VAPI_poll_cq(RR)" );

    /* we received real data - match it (tag, peer)*/
    keyel.tag=comp_desc_p.imm_data;
    memel = hb_tree_search(req->comminfo->taglist[i], &keyel);
    if(memel == NULL) {
      printf("[%i] got unexpected packet with tag: %i, peer: %i\n", req->rank, comp_desc_p.imm_data, i);
      printf(" this CANNOT happen!!!\n");
      return IB_CONTINUE;
    }
    /* delete element - TODO: this should actually be done together with
     * the find ... we should extend libdict */
    res = hb_tree_remove(req->comminfo->taglist[i], &keyel, 0);
    if(res != 0) { printf("error deleting tag element in hb_tree_remove()\n"); }
    tmpreq = memel->req;
    if((tmpreq->status == RECV_SENDING_RTR) || (tmpreq->status == RECV_SENT_RTR)) {
      /* TODO: free *all* req resources here */

      /* get offset in the receiver's array */
      for(j=0; j<IB_RTR_SIZE; j++) {
        if(tmpreq->comminfo->rtr_peer_free[tmpreq->peer][j] == tmpreq->tag) break;
      }
      if(j==IB_RTR_SIZE) {
        /* nothing to release: j is one past the scanned range, so it must
         * not be used as an index */
        printf("[%i] we did not find tag %i to delete - should not HAPPEN!!!\n", tmpreq->rank, tmpreq->tag);
      } else {
        tmpreq->comminfo->rtr_peer_free[tmpreq->peer][j] = -1;
      }

      tmpreq->status = RECV_DONE;
    } else {
      printf("[%i] req %lu (tag: %i) unexpected status (%i) after poll rr_cq \n", tmpreq->rank, (unsigned long)tmpreq, tmpreq->tag, tmpreq->status);
    }
  }

  if((req->status == SEND_DONE) || (req->status == RECV_DONE) || (req->status == RECV_EAGER_DONE))
    return IB_OK;
  else
    return IB_CONTINUE;
}
|
|
|
|
/*
 * IB_Wait - block until a single request has finished.
 *
 * Calls IB_Test() on the request over and over until it reports IB_OK.
 * Always returns IB_OK.
 */
int IB_Wait(IB_Request *request) {
  for(;;) {
    if(IB_Test(request) == IB_OK) break;
  }

  return IB_OK;
}
|
|
|
|
/*
 * IB_Testall - test a set of requests once.
 *
 * count:    number of entries in requests
 * requests: array of request handles
 * flag:     set to 1 if IB_Test() returned IB_OK for every request,
 *           0 otherwise (1 for an empty set)
 *
 * Every request is tested, even after an unfinished one has been seen.
 * Always returns IB_OK.
 */
int IB_Testall(int count, IB_Request *requests, int *flag) {
  int idx = 0;

  *flag = 1;
  while(idx < count) {
    if(IB_Test(requests + idx) != IB_OK) {
      *flag = 0;
    }
    ++idx;
  }

  return IB_OK;
}
|
|
|
|
/*
 * IB_Waitall - block until all requests of a set have finished.
 *
 * count:    number of entries in requests
 * requests: array of request handles
 *
 * Sweeps over the array, calling IB_Test() on every request that is not
 * yet in a DONE state, until a full sweep leaves nothing unfinished.
 *
 * Returns IB_OK once all requests are done. If IB_Test() returns anything
 * other than IB_OK or IB_CONTINUE, that value is returned immediately
 * instead of being retried forever or masked by IB_OK.
 */
int IB_Waitall(int count, IB_Request *requests) {
  int i, res, done;

  do {
    done = IB_OK;
    for(i=0; i<count; i++) {
      /* we shouldn't test ready requests ... */
      if((requests[i]->status != SEND_DONE) && (requests[i]->status != RECV_DONE) && (requests[i]->status != RECV_EAGER_DONE))
      {
        res = IB_Test(&requests[i]);
        /* in case of error: hand it to the caller */
        if((res != IB_OK) && (res != IB_CONTINUE)) return res;
        /* we have at least one unfinished request ... */
        if(res == IB_CONTINUE) done = IB_CONTINUE;
      }
    }
  } while(done == IB_CONTINUE);

  return done;
}
|
|
|
|
static int IB_Create_qp( int rank, int remote,
|
|
VAPI_hca_hndl_t *hca_hndl_p,
|
|
VAPI_cq_hndl_t *sr_cq_hndl_p,
|
|
VAPI_cq_hndl_t *rr_cq_hndl_p,
|
|
VAPI_qp_hndl_t *qp_hndl_p,
|
|
VAPI_pd_hndl_t *pd_hndp_p,
|
|
MPI_Comm comm) {
|
|
// variables
|
|
VAPI_qp_init_attr_t qp_init_attr_p; // QP init attribs
|
|
VAPI_ret_t ret; // VAPI return value
|
|
VAPI_qp_prop_t qp_prop_p, rem_qp_prop_p; // QP properties
|
|
VAPI_hca_port_t hca_port_p, rem_hca_port_p; // port properties
|
|
VAPI_qp_attr_mask_t qp_attr_mask_p; // QP attribute mask
|
|
VAPI_qp_attr_t qp_attr_p; // QP attributes
|
|
VAPI_qp_cap_t qp_cap_p; // QP capabilities
|
|
|
|
MPI_Status stat; // mpi status
|
|
|
|
// get my LID
|
|
ret = VAPI_query_hca_port_prop( *hca_hndl_p,
|
|
(IB_port_t)1,
|
|
&hca_port_p );
|
|
IB_stat( ret, "EVAPI_get_hca_hndl()" );
|
|
|
|
|
|
// set QP Properties ...
|
|
qp_init_attr_p.sq_cq_hndl = *sr_cq_hndl_p; // cq associated with sr's
|
|
qp_init_attr_p.rq_cq_hndl = *rr_cq_hndl_p; // cq associated with rr's
|
|
qp_init_attr_p.cap.max_oust_wr_sq = 1000; // TODO guessed
|
|
qp_init_attr_p.cap.max_oust_wr_rq = 1000; // TODO guessed
|
|
qp_init_attr_p.cap.max_sg_size_sq = 1; // TODO guessed
|
|
qp_init_attr_p.cap.max_sg_size_rq = 1; // TODO guessed
|
|
qp_init_attr_p.cap.max_inline_data_sq = 1; // TODO guessed
|
|
// qp_init_attr_p.rdd_hndl = no rdd (Reliable Datagram Domain) used
|
|
qp_init_attr_p.sq_sig_type = VAPI_SIGNAL_ALL_WR; // signal all submitted WR's
|
|
qp_init_attr_p.rq_sig_type = VAPI_SIGNAL_ALL_WR; // signal all submitted WR's
|
|
qp_init_attr_p.pd_hndl = *pd_hndp_p; // the PD
|
|
// QP Transport Service Type
|
|
qp_init_attr_p.ts_type = VAPI_TS_RC;
|
|
|
|
// create QP
|
|
ret = VAPI_create_qp( *hca_hndl_p,
|
|
&qp_init_attr_p,
|
|
qp_hndl_p,
|
|
&qp_prop_p );
|
|
IB_stat( ret, "VAPI_create_qp()" );
|
|
|
|
MPI_Send(&hca_port_p.lid, 1, MPI_INT, remote, 0, comm);
|
|
MPI_Recv(&rem_hca_port_p.lid, 1, MPI_INT, remote, 0, comm, &stat);
|
|
MPI_Send(&qp_prop_p.qp_num, 1, MPI_INT, remote, 0, comm);
|
|
MPI_Recv(&rem_qp_prop_p.qp_num, 1, MPI_INT, remote, 0, comm, &stat);
|
|
|
|
//fprintf(stderr, "[INFO] (lid:qp) - %x:%u -> %x:%u\n", hca_port_p.lid, qp_prop_p.qp_num, rem_hca_port_p.lid, rem_qp_prop_p.qp_num);
|
|
|
|
// see page 222 for details ...
|
|
/* Transition RST to INIT */
|
|
QP_ATTR_MASK_CLR_ALL(qp_attr_mask_p);
|
|
// QP State
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_QP_STATE);
|
|
qp_attr_p.qp_state = VAPI_INIT;
|
|
|
|
// partition key index (see page 319)
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_PKEY_IX);
|
|
qp_attr_p.pkey_ix = 0; // first partition key
|
|
|
|
// queue key -> only for datagram (RD, UD) TODO set this!
|
|
// QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_QKEY);
|
|
// qp_attr_p.qkey = 0;
|
|
|
|
// enable/disable RDMA R / Atomic -> allow all :)
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_REMOTE_ATOMIC_FLAGS);
|
|
qp_attr_p.remote_atomic_flags = VAPI_EN_REM_WRITE;
|
|
|
|
// physical port
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_PORT);
|
|
qp_attr_p.port = 1; // TODO should not be static!
|
|
|
|
ret = VAPI_modify_qp( *hca_hndl_p,
|
|
*qp_hndl_p,
|
|
&qp_attr_p,
|
|
&qp_attr_mask_p,
|
|
&qp_cap_p );
|
|
IB_stat( ret, "VAPI_modify_qp() (RST->INIT)" );
|
|
|
|
// Transition INIT to RTR
|
|
QP_ATTR_MASK_CLR_ALL(qp_attr_mask_p);
|
|
|
|
// new state
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_QP_STATE);
|
|
qp_attr_p.qp_state = VAPI_RTR;
|
|
|
|
// remote node address vector
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_AV);
|
|
qp_attr_p.av.sl = 0; // TODO guessed
|
|
qp_attr_p.av.dlid = rem_hca_port_p.lid; // partners lid
|
|
qp_attr_p.av.src_path_bits = 0; // TODO guessed
|
|
qp_attr_p.av.static_rate = 0; // TODO guessed
|
|
qp_attr_p.av.grh_flag = 0; // non set ...
|
|
qp_attr_p.av.traffic_class = 0; // TODO guessed
|
|
qp_attr_p.av.hop_limit = 255; // should be ok - only global routing
|
|
qp_attr_p.av.flow_label = 0; // only global routing
|
|
qp_attr_p.av.sgid_index = 0; // only global routing
|
|
qp_attr_p.av.port = 1; // TODO guessed
|
|
|
|
// packet sequence number
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_RQ_PSN);
|
|
qp_attr_p.rq_psn = 0;
|
|
|
|
// number of responder resources for RDMA R + Atomic
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_QP_OUS_RD_ATOM);
|
|
qp_attr_p.qp_ous_rd_atom = 5; //TODO tune here?
|
|
|
|
// minimum rnr nak timer
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_MIN_RNR_TIMER);
|
|
qp_attr_p.min_rnr_timer = 1; // TODO guessed
|
|
|
|
// dest QP number
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_DEST_QP_NUM);
|
|
qp_attr_p.dest_qp_num = rem_qp_prop_p.qp_num; // partners qp num
|
|
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_PATH_MTU);
|
|
qp_attr_p.path_mtu = MTU1024; // TODO tune here
|
|
|
|
ret = VAPI_modify_qp( *hca_hndl_p,
|
|
*qp_hndl_p,
|
|
&qp_attr_p,
|
|
&qp_attr_mask_p,
|
|
&qp_cap_p );
|
|
IB_stat( ret, "VAPI_modify_qp() (INIT->RTR)" );
|
|
|
|
|
|
// Transition RTR to RTS
|
|
QP_ATTR_MASK_CLR_ALL(qp_attr_mask_p);
|
|
// new state
|
|
QP_ATTR_MASK_SET( qp_attr_mask_p, QP_ATTR_QP_STATE);
|
|
qp_attr_p.qp_state = VAPI_RTS;
|
|
|
|
// SQ sequence number
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_SQ_PSN);
|
|
qp_attr_p.sq_psn = 0;
|
|
|
|
// local ACK timeout
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_TIMEOUT);
|
|
qp_attr_p.timeout = 10; // TODO tune here
|
|
|
|
// retry count
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_RNR_RETRY);
|
|
qp_attr_p.rnr_retry = 255; // increased due to VAPI_RETRY_EXC_ERR if 30 nodes fire 0.5MB at one :) -> maximum :)
|
|
|
|
// number of outstanding RDMA R / atomic at destination
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_OUS_DST_RD_ATOM);
|
|
qp_attr_p.ous_dst_rd_atom = 10; // TODO tune here
|
|
|
|
// retry count
|
|
QP_ATTR_MASK_SET(qp_attr_mask_p, QP_ATTR_RETRY_COUNT);
|
|
qp_attr_p.retry_count = 255; // guessed
|
|
|
|
ret = VAPI_modify_qp( *hca_hndl_p,
|
|
*qp_hndl_p,
|
|
&qp_attr_p,
|
|
&qp_attr_mask_p,
|
|
&qp_cap_p );
|
|
IB_stat( ret, "VAPI_modify_qp() (RTR->RTS)" );
|
|
|
|
return IB_OK;
|
|
}
|
|
|
|
/*
 * Comparison callback for memory list entries.
 *
 * Entries are ordered by start address first and by size second; two
 * entries are equal only if both buf and size match. Overlapping regions
 * are NOT treated as equal.
 *
 * The size tie-break is required to keep the order consistent: without it
 * two entries with the same buf but different sizes would each compare
 * "greater" than the other, which is not a valid ordering for a search tree.
 *
 * param is not used.
 * Returns 0 if equal, -1 if a sorts before b, +1 otherwise.
 */
static __inline__ int IB_Memlist_compare_entries(IB_Memlistel *a, IB_Memlistel *b,void *param) {
  if(a->buf < b->buf) return -1;
  if(a->buf > b->buf) return +1;

  /* same start address - order by size */
  if(a->size < b->size) return -1;
  if(a->size > b->size) return +1;

  return 0;
}
|
|
|
|
/*
 * Key destructor callback for memory list entries.
 *
 * Intentionally a no-op: key and data element are one and the same object,
 * whose release is left to IB_Memlist_memlist_delete().
 */
static __inline__ void IB_Memlist_delete_key(IB_Memlistel *k) {
  (void)k;
}
|
|
|
|
/*
 * Data destructor callback for memory list entries.
 *
 * Currently a stub that does nothing.
 * TODO: free the entry and deregister its memory region here.
 */
static __inline__ void IB_Memlist_memlist_delete(IB_Memlistel *entry) {
  (void)entry;
}
|
|
|
|
/*
 * Comparison callback for tag list entries: orders entries by their tag.
 *
 * param is not used.
 * Returns 0 for equal tags, -1 if a's tag is smaller, +1 if it is larger.
 */
static __inline__ int IB_Taglist_compare_entries(IB_Taglstel *a, IB_Taglstel *b,void *param) {
  (void)param;

  /* (greater) - (smaller) yields exactly -1, 0 or +1 */
  return (a->tag > b->tag) - (a->tag < b->tag);
}
|
|
|
|
/*
 * Key destructor callback for tag list entries.
 *
 * Intentionally a no-op: key and data element are one and the same object,
 * which is released by IB_Taglist_delete().
 */
static __inline__ void IB_Taglist_delete_key(IB_Taglstel *k) {
  (void)k;
}
|
|
|
|
/*
 * Data destructor callback for tag list entries: releases the entry
 * with free().
 */
static __inline__ void IB_Taglist_delete(IB_Taglstel *entry) {
  free(entry);
}
|