
Don't call unpack when we received directly into the user buffer; the convertor doesn't handle that case properly.
Continue peeking until we don't get anything else.
Close the endpoint before closing the library.
Add a blocking send that uses mx_test.

This commit was SVN r10684.
Galen Shipman 2006-07-06 19:54:13 +00:00
parent bc7690bcb0
commit 5085061475
4 changed files with 181 additions and 64 deletions
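The blocking send added here follows a common pattern: post the message with a nonblocking MX call, then spin on the completion test until the request finishes, which also drives MX progress. Below is a minimal standalone sketch of that pattern, not the committed code. The MX calls and types (mx_isend, mx_test, mx_strerror, mx_segment_t, mx_status_t.code) mirror the ones used in the diff that follows; the helper name blocking_send_sketch, its parameters, and the "myriexpress.h" header path are illustrative assumptions.

#include <stdio.h>
#include "myriexpress.h"   /* MX API header -- path assumed */

/* Post a nonblocking send, then spin on mx_test() until it completes. */
static int
blocking_send_sketch(mx_endpoint_t endpoint,
                     mx_endpoint_addr_t dest_addr,
                     mx_segment_t *segment,
                     uint64_t match_bits)
{
    mx_request_t request;
    mx_status_t  status;
    uint32_t     completed = 0;
    mx_return_t  rc;

    /* Post the send without blocking; the request completes later. */
    rc = mx_isend(endpoint, segment, 1, dest_addr, match_bits,
                  NULL /* request context */, &request);
    if (MX_SUCCESS != rc) {
        fprintf(stderr, "mx_isend failed: %s\n", mx_strerror(rc));
        return -1;
    }

    /* Poll the completion test until the request finishes; each call
       also gives MX a chance to make progress on outstanding traffic. */
    do {
        rc = mx_test(endpoint, &request, &status, &completed);
        if (MX_SUCCESS != rc) {
            fprintf(stderr, "mx_test failed: %s\n", mx_strerror(rc));
            return -1;
        }
    } while (!completed);

    return (MX_STATUS_SUCCESS == status.code) ? 0 : -1;
}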

View file

@@ -42,6 +42,7 @@ ompi_mtl_datatype_pack(struct ompi_convertor_t *convertor,
         iov.iov_base = malloc(max_data);
         if (NULL == iov.iov_base) return OMPI_ERR_OUT_OF_RESOURCE;
         *freeAfter = true;
     } else {
         iov.iov_base = NULL;
         *freeAfter = false;
@@ -98,10 +99,10 @@ ompi_mtl_datatype_unpack(struct ompi_convertor_t *convertor,
     max_data = iov.iov_len;
+    if (max_data > 0 && ompi_convertor_need_buffers(convertor)) {
         ompi_convertor_unpack(convertor, &iov, &iov_count,
                               &max_data, &free_after);
-    if (max_data > 0 && ompi_convertor_need_buffers(convertor)) {
         free(buffer);
     }

View file

@@ -39,7 +39,7 @@ mca_mtl_mx_module_t ompi_mtl_mx = {
         ompi_mtl_mx_del_procs,
         ompi_mtl_mx_finalize,
-        NULL,
+        ompi_mtl_mx_send, /* don't use ompi_mtl_mx_send.. */
         ompi_mtl_mx_isend,
         ompi_mtl_mx_irecv,
@@ -114,6 +114,11 @@ ompi_mtl_mx_finalize(struct mca_mtl_base_module_t* mtl) {
     opal_progress_unregister(ompi_mtl_mx_progress);
     /* free resources */
+    mx_return = mx_close_endpoint(ompi_mtl_mx.mx_endpoint);
+    if(mx_return != MX_SUCCESS){
+        opal_output(0, "Error in mx_close_endpoint (error %s)\n", mx_strerror(mx_return));
+        return OMPI_ERROR;
+    }
     mx_return = mx_finalize();
     if(mx_return != MX_SUCCESS){
@@ -164,7 +169,9 @@ int ompi_mtl_mx_progress( void ) {
     mx_status_t mx_status;
     uint32_t result;
     mca_mtl_mx_request_t* mtl_mx_request;
+    int completed = 0;
+    while(1){
     mx_return = mx_ipeek(ompi_mtl_mx.mx_endpoint,
                          &mx_request,
                          &result);
@@ -173,6 +180,7 @@ int ompi_mtl_mx_progress( void ) {
         opal_output(0, "Error in mx_ipeek (error %s)\n", mx_strerror(mx_return));
     }
     if(result) {
+        completed++;
         mx_return = mx_test(ompi_mtl_mx.mx_endpoint,
                             &mx_request,
                             &mx_status,
@@ -226,6 +234,9 @@ int ompi_mtl_mx_progress( void ) {
             mtl_mx_request->super.completion_callback(&mtl_mx_request->super);
         }
+        } else {
+            return completed;
         }
-    return 1;
 }
+}

View file

@@ -49,6 +49,14 @@ extern int ompi_mtl_mx_del_procs(struct mca_mtl_base_module_t* mtl,
                                  struct ompi_proc_t** procs,
                                  struct mca_mtl_base_endpoint_t **mtl_peer_data);

+int
+ompi_mtl_mx_send(struct mca_mtl_base_module_t* mtl,
+                 struct ompi_communicator_t* comm,
+                 int dest,
+                 int tag,
+                 struct ompi_convertor_t *convertor,
+                 mca_pml_base_send_mode_t mode);
+
 extern int ompi_mtl_mx_isend(struct mca_mtl_base_module_t* mtl,
                              struct ompi_communicator_t* comm,
                              int dest,

View file

@@ -20,11 +20,108 @@
 #include "ompi/datatype/datatype.h"
 #include "ompi/communicator/communicator.h"
 #include "ompi/datatype/convertor.h"
+#include "ompi/mca/mtl/base/mtl_base_datatype.h"

 #include "mtl_mx.h"
 #include "mtl_mx_request.h"
-#include "ompi/mca/mtl/base/mtl_base_datatype.h"

+int
+ompi_mtl_mx_send(struct mca_mtl_base_module_t* mtl,
+                 struct ompi_communicator_t* comm,
+                 int dest,
+                 int tag,
+                 struct ompi_convertor_t *convertor,
+                 mca_pml_base_send_mode_t mode)
+{
+    mx_return_t mx_return;
+    uint64_t match_bits;
+    int ret;
+    mca_mtl_mx_request_t mtl_mx_request;
+    size_t length;
+    mx_status_t mx_status;
+    uint32_t result;
+    uint32_t timeout = 1000;
+    mca_mtl_mx_endpoint_t* mx_endpoint =
+        (mca_mtl_mx_endpoint_t*) comm->c_pml_procs[dest]->proc_ompi->proc_pml;
+
+    assert(mtl == &ompi_mtl_mx.super);
+
+    MX_SET_SEND_BITS(match_bits, comm->c_contextid, comm->c_my_rank, tag);
+
+    ret = ompi_mtl_datatype_pack(convertor,
+                                 &mtl_mx_request.mx_segment[0].segment_ptr,
+                                 &length,
+                                 &mtl_mx_request.free_after);
+    mtl_mx_request.mx_segment[0].segment_length = length;
+    mtl_mx_request.convertor = convertor;
+    mtl_mx_request.type = OMPI_MTL_MX_ISEND;
+
+    if (OMPI_SUCCESS != ret) return ret;
+
+    if(mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
+#if 0
+        printf("issend bits: 0x%016llx\n", match_bits);
+#endif
+        mx_return = mx_issend( ompi_mtl_mx.mx_endpoint,
+                               mtl_mx_request.mx_segment,
+                               1,
+                               mx_endpoint->mx_peer_addr,
+                               match_bits,
+                               &mtl_mx_request,
+                               &mtl_mx_request.mx_request
+                               );
+        if(mx_return != MX_SUCCESS ) {
+            char peer_name[MX_MAX_HOSTNAME_LEN];
+            if(MX_SUCCESS != mx_nic_id_to_hostname( mx_endpoint->mx_peer->nic_id, peer_name)) {
+                sprintf( peer_name, "unknown %lx nic_id", (long)mx_endpoint->mx_peer->nic_id );
+            }
+            opal_output(0, "Error in mx_issend (error %s) sending to %s\n", mx_strerror(mx_return), peer_name);
+        }
+    } else {
+#if 0
+        printf("isend bits: 0x%016llx\n", match_bits);
+#endif
+        mx_return = mx_isend( ompi_mtl_mx.mx_endpoint,
+                              mtl_mx_request.mx_segment,
+                              1,
+                              mx_endpoint->mx_peer_addr,
+                              match_bits,
+                              &mtl_mx_request,
+                              &mtl_mx_request.mx_request
+                              );
+        if(mx_return != MX_SUCCESS ) {
+            char peer_name[MX_MAX_HOSTNAME_LEN];
+            if(MX_SUCCESS != mx_nic_id_to_hostname( mx_endpoint->mx_peer->nic_id, peer_name)) {
+                sprintf( peer_name, "unknown %lx nic_id", (long)mx_endpoint->mx_peer->nic_id );
+            }
+            opal_output(0, "Error in mx_isend (error %s) sending to %s\n", mx_strerror(mx_return), peer_name);
+        }
+    }
+
+    do {
+        mx_return = mx_test(ompi_mtl_mx.mx_endpoint,
+                            &mtl_mx_request.mx_request,
+                            &mx_status,
+                            &result);
+        if(mx_return != MX_SUCCESS) {
+            opal_output(0, "Error in mx_wait (error %s)\n", mx_strerror(mx_return));
+            abort();
+        }
+        if(result && mx_status.code != MX_STATUS_SUCCESS) {
+            opal_output(0, "Error in ompi_mtl_mx_send, mx_wait returned something other than MX_STATUS_SUCCESS: mx_status(%d).\n",
+                        mx_status);
+            abort();
+        }
+    } while(!result);
+
+    return OMPI_SUCCESS;
+}
+
 int
 ompi_mtl_mx_isend(struct mca_mtl_base_module_t* mtl,