Start to cleanup the header dependencies
Allow the TCP driver to release the memory allocated by the convertor. For that I add one integer (keeping the state of the actual allocation for each send_fragment) and a saved iovec (just to be able to remember the correct starting point for each iovec). This commit was SVN r3122.
Этот коммит содержится в:
родитель
da1c28b966
Коммит
93aa0d87bf
@ -35,6 +35,7 @@ ompi_class_t mca_ptl_tcp_send_frag_t_class = {
|
|||||||
|
|
||||||
static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag)
|
static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag)
|
||||||
{
|
{
|
||||||
|
frag->free_after = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -64,7 +65,6 @@ int mca_ptl_tcp_send_frag_init(
|
|||||||
size_t size_in = *size;
|
size_t size_in = *size;
|
||||||
size_t size_out;
|
size_t size_out;
|
||||||
unsigned int iov_count, max_data;
|
unsigned int iov_count, max_data;
|
||||||
int freeAfter = 0;
|
|
||||||
|
|
||||||
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
|
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
|
||||||
if(offset == 0) {
|
if(offset == 0) {
|
||||||
@ -93,6 +93,7 @@ int mca_ptl_tcp_send_frag_init(
|
|||||||
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendfrag->free_after = 0;
|
||||||
/* initialize convertor */
|
/* initialize convertor */
|
||||||
if(size_in > 0) {
|
if(size_in > 0) {
|
||||||
ompi_convertor_t *convertor;
|
ompi_convertor_t *convertor;
|
||||||
@ -117,9 +118,12 @@ int mca_ptl_tcp_send_frag_init(
|
|||||||
sendfrag->frag_vec[1].iov_len = size_in;
|
sendfrag->frag_vec[1].iov_len = size_in;
|
||||||
iov_count = 1;
|
iov_count = 1;
|
||||||
max_data = size_in;
|
max_data = size_in;
|
||||||
if((rc = ompi_convertor_pack(convertor, &sendfrag->frag_vec[1],
|
if((rc = ompi_convertor_pack( convertor, &sendfrag->frag_vec[1],
|
||||||
&iov_count, &max_data, &freeAfter)) < 0)
|
&iov_count, &max_data, &(sendfrag->free_after) )) < 0) {
|
||||||
return OMPI_ERROR;
|
return OMPI_ERROR;
|
||||||
|
}
|
||||||
|
/* adjust the freeAfter as the position zero is reserved for the header */
|
||||||
|
sendfrag->free_after <<= 1;
|
||||||
|
|
||||||
/* adjust size and request offset to reflect actual number of bytes packed by convertor */
|
/* adjust size and request offset to reflect actual number of bytes packed by convertor */
|
||||||
size_out = sendfrag->frag_vec[1].iov_len;
|
size_out = sendfrag->frag_vec[1].iov_len;
|
||||||
@ -198,8 +202,13 @@ bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)
|
|||||||
for(i=0; i<num_vecs; i++) {
|
for(i=0; i<num_vecs; i++) {
|
||||||
if(cnt >= (int)frag->frag_vec_ptr->iov_len) {
|
if(cnt >= (int)frag->frag_vec_ptr->iov_len) {
|
||||||
cnt -= frag->frag_vec_ptr->iov_len;
|
cnt -= frag->frag_vec_ptr->iov_len;
|
||||||
|
if( frag->free_after & 1 ) {
|
||||||
|
free( frag->frag_saved_vec.iov_base );
|
||||||
|
}
|
||||||
frag->frag_vec_ptr++;
|
frag->frag_vec_ptr++;
|
||||||
frag->frag_vec_cnt--;
|
frag->frag_vec_cnt--;
|
||||||
|
frag->frag_saved_vec = *frag->frag_vec_ptr;
|
||||||
|
frag->free_after >>= 1;
|
||||||
} else {
|
} else {
|
||||||
frag->frag_vec_ptr->iov_base = (ompi_iov_base_ptr_t)
|
frag->frag_vec_ptr->iov_base = (ompi_iov_base_ptr_t)
|
||||||
(((unsigned char*)frag->frag_vec_ptr->iov_base) + cnt);
|
(((unsigned char*)frag->frag_vec_ptr->iov_base) + cnt);
|
||||||
|
@ -10,8 +10,8 @@
|
|||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include "include/sys/atomic.h"
|
|
||||||
#include "ompi_config.h"
|
#include "ompi_config.h"
|
||||||
|
#include "include/sys/atomic.h"
|
||||||
#include "mca/pml/base/pml_base_sendreq.h"
|
#include "mca/pml/base/pml_base_sendreq.h"
|
||||||
#include "mca/ptl/base/ptl_base_sendfrag.h"
|
#include "mca/ptl/base/ptl_base_sendfrag.h"
|
||||||
#include "ptl_tcp.h"
|
#include "ptl_tcp.h"
|
||||||
@ -26,9 +26,11 @@ struct mca_ptl_base_peer_t;
|
|||||||
*/
|
*/
|
||||||
struct mca_ptl_tcp_send_frag_t {
|
struct mca_ptl_tcp_send_frag_t {
|
||||||
mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
|
mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
|
||||||
|
int32_t free_after; /**< keep trace of which vectors we have to free */
|
||||||
struct iovec *frag_vec_ptr; /**< pointer into iovec array */
|
struct iovec *frag_vec_ptr; /**< pointer into iovec array */
|
||||||
size_t frag_vec_cnt; /**< number of iovec structs left to process */
|
size_t frag_vec_cnt; /**< number of iovec structs left to process */
|
||||||
struct iovec frag_vec[2]; /**< array of iovecs for send */
|
struct iovec frag_vec[2]; /**< array of iovecs for send */
|
||||||
|
struct iovec frag_saved_vec; /**< save the initial values from the current iovec */
|
||||||
volatile int frag_progressed; /**< for threaded case - has request status been updated */
|
volatile int frag_progressed; /**< for threaded case - has request status been updated */
|
||||||
};
|
};
|
||||||
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;
|
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;
|
||||||
@ -130,6 +132,7 @@ static inline void mca_ptl_tcp_send_frag_init_ack(
|
|||||||
ack->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr;
|
ack->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr;
|
||||||
ack->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
|
ack->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
|
||||||
ack->frag_vec_cnt = 1;
|
ack->frag_vec_cnt = 1;
|
||||||
|
ack->free_after = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Загрузка…
Ссылка в новой задаче
Block a user