1
1

- exchange procs arch/byte ordering at startup

- added tcp ptl support for heterogeneous architectures

This commit was SVN r3075.
Этот коммит содержится в:
Tim Woodall 2004-10-12 21:50:25 +00:00
родитель 3c352a3ca4
Коммит ce03565c4f
11 изменённых файлов: 169 добавлений и 21 удалений

Просмотреть файл

@ -176,6 +176,19 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
if(OMPI_SUCCESS != rc)
return rc;
/* iterate through each of the procs and set the peers architecture */
for(p=0; p<nprocs; p++) {
uint32_t proc_arch;
size_t size = sizeof(proc_arch);
rc = mca_base_modex_recv(&mca_pml_teg_component.pmlm_version, procs[p],
(void**)&proc_arch, &size);
if(rc != OMPI_SUCCESS)
return rc;
if(size != sizeof(proc_arch))
return OMPI_ERROR;
procs[p]->proc_arch = ntohl(proc_arch);
}
/* attempt to add all procs to each ptl */
ptl_peers = malloc(nprocs * sizeof(struct mca_ptl_base_peer_t*));
for(p_index = 0; p_index < mca_pml_teg.teg_num_ptl_modules; p_index++) {

Просмотреть файл

@ -54,7 +54,6 @@ struct mca_pml_teg_t {
long teg_irecvs;
long teg_sends;
long teg_recvs;
long teg_waits;
#endif
};
typedef struct mca_pml_teg_t mca_pml_teg_t;

Просмотреть файл

@ -67,7 +67,6 @@ int mca_pml_teg_component_open(void)
OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t);
#if MCA_PML_TEG_STATISTICS
mca_pml_teg.teg_waits = 0;
mca_pml_teg.teg_sends = 0;
mca_pml_teg.teg_recvs = 0;
mca_pml_teg.teg_isends = 0;
@ -89,8 +88,6 @@ int mca_pml_teg_component_open(void)
int mca_pml_teg_component_close(void)
{
#if MCA_PML_TEG_STATISTICS && OMPI_ENABLE_DEBUG
ompi_output(0, "mca_pml_teg.teg_waits = %d\n",
mca_pml_teg.teg_waits);
ompi_output(0, "mca_pml_teg.teg_sends = %d\n",
mca_pml_teg.teg_sends);
ompi_output(0, "mca_pml_teg.teg_recvs = %d\n",
@ -123,6 +120,8 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads)
{
uint32_t proc_arch;
int rc;
*priority = 0;
*have_hidden_threads = false;
@ -146,6 +145,14 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority,
ompi_output(0, "mca_pml_teg_component_init: mca_pml_bsend_init failed\n");
return NULL;
}
/* post this processes datatype */
proc_arch = ompi_proc_local()->proc_arch;
proc_arch = htonl(proc_arch);
rc = mca_base_modex_send(&mca_pml_teg_component.pmlm_version, &proc_arch, sizeof(proc_arch));
if(rc != OMPI_SUCCESS)
return NULL;
*allow_multi_user_threads &= true;
return &mca_pml_teg.super;
}

Просмотреть файл

@ -7,7 +7,10 @@
#ifndef MCA_PTL_BASE_HEADER_H
#define MCA_PTL_BASE_HEADER_H
#include "ompi_config.h"
#include "mca/ptl/ptl.h"
#include <sys/types.h>
#include <netinet/in.h>
#define MCA_PTL_HDR_TYPE_MATCH 0
@ -22,6 +25,46 @@
#define MCA_PTL_FLAGS_ACK_AGGREGATE 2
/*
 * Convert a 64 bit value to network byte order.
 */
static inline uint64_t hton64(uint64_t val)
{
    union {
        uint64_t u64;
        uint32_t u32[2];
    } src, dst;

    /* big-endian hosts are already in network byte order - nothing to do */
    if (htonl(1) == 1L) {
        return val;
    }

    /* swap the two 32 bit halves, converting each to network byte order */
    src.u64 = val;
    dst.u32[0] = htonl(src.u32[1]);
    dst.u32[1] = htonl(src.u32[0]);
    return dst.u64;
}
/*
 * Convert a 64 bit value from network to host byte order.
 */
static inline uint64_t ntoh64(uint64_t val)
{
    union {
        uint64_t u64;
        uint32_t u32[2];
    } src, dst;

    /* big-endian hosts are already in network byte order - nothing to do */
    if (htonl(1) == 1L) {
        return val;
    }

    /* swap the two 32 bit halves, converting each to host byte order */
    src.u64 = val;
    dst.u32[0] = ntohl(src.u32[1]);
    dst.u32[1] = ntohl(src.u32[0]);
    return dst.u64;
}
/**
* Common header attributes - must be first element in each header type
*/
@ -32,6 +75,12 @@ struct mca_ptl_base_common_header_t {
};
typedef struct mca_ptl_base_common_header_t mca_ptl_base_common_header_t;
/* Convert the common header from network to host byte order.
 * Only hdr_size is converted (16 bits, via ntohs); the remaining fields
 * are presumably single bytes and need no conversion - see the recv path
 * which reads hdr_type before any conversion.
 * Wrapped in do { } while (0) so the macro behaves as a single statement
 * even in unbraced if/else bodies. */
#define MCA_PTL_BASE_COMMON_HDR_NTOH(h) \
    do { \
        (h).hdr_size = ntohs((h).hdr_size); \
    } while (0)

/* Convert the common header from host to network byte order. */
#define MCA_PTL_BASE_COMMON_HDR_HTON(h) \
    do { \
        (h).hdr_size = htons((h).hdr_size); \
    } while (0)
/**
* Basic header for all fragments.
@ -46,6 +95,17 @@ struct mca_ptl_base_frag_header_t {
};
typedef struct mca_ptl_base_frag_header_t mca_ptl_base_frag_header_t;
/* Convert a fragment header from network to host byte order.
 * Converts the embedded common header, then the 32 bit length/offset
 * and the 64 bit sequence number.
 * Wrapped in do { } while (0): the original multi-statement expansion
 * would silently run the trailing statements unconditionally when
 * invoked in an unbraced if body. */
#define MCA_PTL_BASE_FRAG_HDR_NTOH(h) \
    do { \
        MCA_PTL_BASE_COMMON_HDR_NTOH((h).hdr_common); \
        (h).hdr_frag_length = ntohl((h).hdr_frag_length); \
        (h).hdr_frag_offset = ntohl((h).hdr_frag_offset); \
        (h).hdr_frag_seq = ntoh64((h).hdr_frag_seq); \
    } while (0)

/* Convert a fragment header from host to network byte order. */
#define MCA_PTL_BASE_FRAG_HDR_HTON(h) \
    do { \
        MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \
        (h).hdr_frag_length = htonl((h).hdr_frag_length); \
        (h).hdr_frag_offset = htonl((h).hdr_frag_offset); \
        (h).hdr_frag_seq = hton64((h).hdr_frag_seq); \
    } while (0)
/**
* Header definition for the first fragment, contains the additional
@ -62,6 +122,24 @@ struct mca_ptl_base_match_header_t {
};
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;
/* Convert a match header from network to host byte order.
 * Converts the embedded fragment header, then the matching fields
 * (context id, src/dst rank, tag, message length) and the 64 bit
 * message sequence number.
 * Wrapped in do { } while (0): the original multi-statement expansion
 * would silently run the trailing statements unconditionally when
 * invoked in an unbraced if body. */
#define MCA_PTL_BASE_MATCH_HDR_NTOH(h) \
    do { \
        MCA_PTL_BASE_FRAG_HDR_NTOH((h).hdr_frag); \
        (h).hdr_contextid = ntohl((h).hdr_contextid); \
        (h).hdr_src = ntohl((h).hdr_src); \
        (h).hdr_dst = ntohl((h).hdr_dst); \
        (h).hdr_tag = ntohl((h).hdr_tag); \
        (h).hdr_msg_length = ntohl((h).hdr_msg_length); \
        (h).hdr_msg_seq = ntoh64((h).hdr_msg_seq); \
    } while (0)

/* Convert a match header from host to network byte order. */
#define MCA_PTL_BASE_MATCH_HDR_HTON(h) \
    do { \
        MCA_PTL_BASE_FRAG_HDR_HTON((h).hdr_frag); \
        (h).hdr_contextid = htonl((h).hdr_contextid); \
        (h).hdr_src = htonl((h).hdr_src); \
        (h).hdr_dst = htonl((h).hdr_dst); \
        (h).hdr_tag = htonl((h).hdr_tag); \
        (h).hdr_msg_length = htonl((h).hdr_msg_length); \
        (h).hdr_msg_seq = hton64((h).hdr_msg_seq); \
    } while (0)
/**
* Header used to acknowledgment outstanding fragment(s).
@ -76,6 +154,13 @@ struct mca_ptl_base_ack_header_t {
};
typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t;
/* Convert an ack header from network to host byte order.
 * Fix: the macro argument is now parenthesized as (h).hdr_common - the
 * original used bare h.hdr_common, which breaks for expression arguments
 * such as *p, and was inconsistent with every sibling macro.
 * Wrapped in do { } while (0): the original multi-statement expansion
 * would silently run the trailing statement unconditionally when
 * invoked in an unbraced if body. */
#define MCA_PTL_BASE_ACK_HDR_NTOH(h) \
    do { \
        MCA_PTL_BASE_COMMON_HDR_NTOH((h).hdr_common); \
        (h).hdr_dst_size = ntohl((h).hdr_dst_size); \
    } while (0)

/* Convert an ack header from host to network byte order. */
#define MCA_PTL_BASE_ACK_HDR_HTON(h) \
    do { \
        MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \
        (h).hdr_dst_size = htonl((h).hdr_dst_size); \
    } while (0)
/**
* Union of defined header types.

Просмотреть файл

@ -107,6 +107,10 @@ int mca_ptl_tcp_add_procs(
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
return rc;
}
/* do we need to convert to/from network byte order */
if(ompi_proc->proc_arch != proc_self->proc_arch)
ptl_peer->peer_byte_swap = true;
ompi_bitmap_set_bit(reachable, i);
OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock);
peers[i] = ptl_peer;
@ -193,6 +197,9 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_base_module_t* ptl, struct mca_
}
OMPI_THREAD_UNLOCK(&mca_ptl_tcp_component.tcp_lock);
mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->frag_recv.frag_base.frag_peer, pending);
if(frag->frag_send.frag_base.frag_peer->peer_byte_swap) {
MCA_PTL_BASE_ACK_HDR_HTON(frag->frag_send.frag_base.frag_header.hdr_ack);
}
mca_ptl_tcp_peer_send(pending->frag_recv.frag_base.frag_peer, frag, 0);
mca_ptl_tcp_recv_frag_return(ptl, pending);
} else {
@ -262,6 +269,9 @@ void mca_ptl_tcp_matched(
OMPI_THREAD_UNLOCK(&mca_ptl_tcp_component.tcp_lock);
} else {
mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->frag_recv.frag_base.frag_peer, recv_frag);
if(ack->frag_send.frag_base.frag_peer->peer_byte_swap) {
MCA_PTL_BASE_ACK_HDR_HTON(ack->frag_send.frag_base.frag_header.hdr_ack);
}
mca_ptl_tcp_peer_send(ack->frag_send.frag_base.frag_peer, ack, 0);
}
}

Просмотреть файл

@ -54,6 +54,7 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
ptl_peer->peer_recv_event.ev_flags = 0;
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
ptl_peer->peer_retries = 0;
ptl_peer->peer_byte_swap = false;
OBJ_CONSTRUCT(&ptl_peer->peer_frags, ompi_list_t);
OBJ_CONSTRUCT(&ptl_peer->peer_send_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&ptl_peer->peer_recv_lock, ompi_mutex_t);

Просмотреть файл

@ -39,20 +39,21 @@ typedef enum {
*/
struct mca_ptl_base_peer_t {
ompi_list_item_t super;
ompi_list_item_t super;
struct mca_ptl_tcp_module_t* peer_ptl; /**< PTL instance that created this connection */
struct mca_ptl_tcp_proc_t* peer_proc; /**< proc structure corresponding to peer */
struct mca_ptl_tcp_addr_t* peer_addr; /**< address of peer */
int peer_sd; /**< socket connection to peer */
mca_ptl_tcp_send_frag_t* peer_send_frag; /**< current send frag being processed */
mca_ptl_tcp_recv_frag_t* peer_recv_frag; /**< current recv frag being processed */
mca_ptl_tcp_state_t peer_state; /**< current state of the connection */
size_t peer_retries; /**< number of connection retries attempted */
ompi_list_t peer_frags; /**< list of pending frags to send */
ompi_mutex_t peer_send_lock; /**< lock for concurrent access to peer state */
ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */
ompi_event_t peer_send_event; /**< event for async processing of send frags */
ompi_event_t peer_recv_event; /**< event for async processing of recv frags */
struct mca_ptl_tcp_proc_t* peer_proc; /**< proc structure corresponding to peer */
struct mca_ptl_tcp_addr_t* peer_addr; /**< address of peer */
int peer_sd; /**< socket connection to peer */
mca_ptl_tcp_send_frag_t* peer_send_frag; /**< current send frag being processed */
mca_ptl_tcp_recv_frag_t* peer_recv_frag; /**< current recv frag being processed */
mca_ptl_tcp_state_t peer_state; /**< current state of the connection */
size_t peer_retries; /**< number of connection retries attempted */
ompi_list_t peer_frags; /**< list of pending frags to send */
ompi_mutex_t peer_send_lock; /**< lock for concurrent access to peer state */
ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */
ompi_event_t peer_send_event; /**< event for async processing of send frags */
ompi_event_t peer_recv_event; /**< event for async processing of recv frags */
bool peer_byte_swap; /**< is peer a different byte ordering? */
};
typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;

Просмотреть файл

@ -74,10 +74,33 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
/* read common header */
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t)) {
mca_ptl_tcp_peer_t* ptl_peer;
if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false)
return false;
/* convert this to host byte order if required */
if(frag->frag_recv.frag_base.frag_peer->peer_byte_swap) {
/* note this field is only a byte - so doesn't matter what the byte ordering is */
switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
MCA_PTL_BASE_MATCH_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_match);
break;
case MCA_PTL_HDR_TYPE_FRAG:
MCA_PTL_BASE_FRAG_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_frag);
break;
case MCA_PTL_HDR_TYPE_ACK:
case MCA_PTL_HDR_TYPE_NACK:
MCA_PTL_BASE_FRAG_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_frag);
break;
default:
ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
*(unsigned long*)&frag->frag_recv.frag_base.frag_header);
return true;
}
}
}
switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) {
case MCA_PTL_HDR_TYPE_MATCH:
return mca_ptl_tcp_recv_frag_match(frag, sd);
@ -89,7 +112,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
default:
ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
*(unsigned long*)&frag->frag_recv.frag_base.frag_header);
return false;
return true;
}
}

Просмотреть файл

@ -128,6 +128,15 @@ int mca_ptl_tcp_send_frag_init(
}
hdr->hdr_frag.hdr_frag_length = size_out;
/* convert to network byte order if required */
if(ptl_peer->peer_byte_swap) {
if(offset == 0) {
MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match);
} else {
MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag);
}
}
/* fragment state */
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
sendfrag->frag_send.frag_request = sendreq;

Просмотреть файл

@ -43,11 +43,11 @@ void ompi_proc_construct(ompi_proc_t* proc)
proc->proc_pml = NULL;
proc->proc_modex = NULL;
proc->proc_arch = 0;
OBJ_CONSTRUCT(&proc->proc_lock, ompi_mutex_t);
/* FIX - need to determine remote process architecture */
proc->proc_convertor = ompi_convertor_create(0, 0);
proc->proc_arch = 0;
OMPI_THREAD_LOCK(&ompi_proc_lock);
ompi_list_append(&ompi_proc_list, (ompi_list_item_t*)proc);

Просмотреть файл

@ -20,7 +20,7 @@ struct ompi_proc_t {
ompi_process_name_t proc_name;
struct mca_pml_proc_t* proc_pml; /* PML specific proc data */
struct mca_base_modex_t* proc_modex; /* MCA module exchange data */
int proc_arch;
uint32_t proc_arch;
ompi_convertor_t* proc_convertor;
ompi_mutex_t proc_lock;