From ce03565c4f0c3e4a6769c9914417a0673a4ec179 Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Tue, 12 Oct 2004 21:50:25 +0000 Subject: [PATCH] - exchange procs arch/byte ordering at startup - added tcp ptl support for heterogenous architectures This commit was SVN r3075. --- src/mca/pml/teg/src/pml_teg.c | 13 ++++ src/mca/pml/teg/src/pml_teg.h | 1 - src/mca/pml/teg/src/pml_teg_component.c | 13 +++- src/mca/ptl/base/ptl_base_header.h | 85 +++++++++++++++++++++++++ src/mca/ptl/tcp/src/ptl_tcp.c | 10 +++ src/mca/ptl/tcp/src/ptl_tcp_peer.c | 1 + src/mca/ptl/tcp/src/ptl_tcp_peer.h | 27 ++++---- src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c | 27 +++++++- src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c | 9 +++ src/proc/proc.c | 2 +- src/proc/proc.h | 2 +- 11 files changed, 169 insertions(+), 21 deletions(-) diff --git a/src/mca/pml/teg/src/pml_teg.c b/src/mca/pml/teg/src/pml_teg.c index 7421a111e7..10d01c3ee9 100644 --- a/src/mca/pml/teg/src/pml_teg.c +++ b/src/mca/pml/teg/src/pml_teg.c @@ -176,6 +176,19 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs) if(OMPI_SUCCESS != rc) return rc; + /* iterate through each of the procs and set the peers architecture */ + for(p=0; pproc_arch = ntohl(proc_arch); + } + /* attempt to add all procs to each ptl */ ptl_peers = malloc(nprocs * sizeof(struct mca_ptl_base_peer_t*)); for(p_index = 0; p_index < mca_pml_teg.teg_num_ptl_modules; p_index++) { diff --git a/src/mca/pml/teg/src/pml_teg.h b/src/mca/pml/teg/src/pml_teg.h index 9b471e7293..f2b925901f 100644 --- a/src/mca/pml/teg/src/pml_teg.h +++ b/src/mca/pml/teg/src/pml_teg.h @@ -54,7 +54,6 @@ struct mca_pml_teg_t { long teg_irecvs; long teg_sends; long teg_recvs; - long teg_waits; #endif }; typedef struct mca_pml_teg_t mca_pml_teg_t; diff --git a/src/mca/pml/teg/src/pml_teg_component.c b/src/mca/pml/teg/src/pml_teg_component.c index d18c2445fe..dff565d6be 100644 --- a/src/mca/pml/teg/src/pml_teg_component.c +++ b/src/mca/pml/teg/src/pml_teg_component.c @@ -67,7 +67,6 @@ int 
mca_pml_teg_component_open(void) OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t); #if MCA_PML_TEG_STATISTICS - mca_pml_teg.teg_waits = 0; mca_pml_teg.teg_sends = 0; mca_pml_teg.teg_recvs = 0; mca_pml_teg.teg_isends = 0; @@ -89,8 +88,6 @@ int mca_pml_teg_component_open(void) int mca_pml_teg_component_close(void) { #if MCA_PML_TEG_STATISTICS && OMPI_ENABLE_DEBUG - ompi_output(0, "mca_pml_teg.teg_waits = %d\n", - mca_pml_teg.teg_waits); ompi_output(0, "mca_pml_teg.teg_sends = %d\n", mca_pml_teg.teg_sends); ompi_output(0, "mca_pml_teg.teg_recvs = %d\n", @@ -123,6 +120,8 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads) { + uint32_t proc_arch; + int rc; *priority = 0; *have_hidden_threads = false; @@ -146,6 +145,14 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority, ompi_output(0, "mca_pml_teg_component_init: mca_pml_bsend_init failed\n"); return NULL; } + + /* post this processes datatype */ + proc_arch = ompi_proc_local()->proc_arch; + proc_arch = htonl(proc_arch); + rc = mca_base_modex_send(&mca_pml_teg_component.pmlm_version, &proc_arch, sizeof(proc_arch)); + if(rc != OMPI_SUCCESS) + return NULL; + *allow_multi_user_threads &= true; return &mca_pml_teg.super; } diff --git a/src/mca/ptl/base/ptl_base_header.h b/src/mca/ptl/base/ptl_base_header.h index a1016605b8..35e4a8289b 100644 --- a/src/mca/ptl/base/ptl_base_header.h +++ b/src/mca/ptl/base/ptl_base_header.h @@ -7,7 +7,10 @@ #ifndef MCA_PTL_BASE_HEADER_H #define MCA_PTL_BASE_HEADER_H +#include "ompi_config.h" #include "mca/ptl/ptl.h" +#include +#include #define MCA_PTL_HDR_TYPE_MATCH 0 @@ -22,6 +25,46 @@ #define MCA_PTL_FLAGS_ACK_AGGREGATE 2 +/* + * Convert a 64 bit value to network byte order. + */ + +static inline uint64_t hton64(uint64_t val) +{ + union { uint64_t ll; + uint32_t l[2]; + } w, r; + + /* platform already in network byte order? 
*/ + if(htonl(1) == 1L) + return val; + w.ll = val; + r.l[0] = htonl(w.l[1]); + r.l[1] = htonl(w.l[0]); + return r.ll; +} + + +/* + * Convert a 64 bit value from network to host byte order. + */ + +static inline uint64_t ntoh64(uint64_t val) +{ + union { uint64_t ll; + uint32_t l[2]; + } w, r; + + /* platform already in network byte order? */ + if(htonl(1) == 1L) + return val; + w.ll = val; + r.l[0] = ntohl(w.l[1]); + r.l[1] = ntohl(w.l[0]); + return r.ll; +} + + /** * Common header attributes - must be first element in each header type */ @@ -32,6 +75,12 @@ struct mca_ptl_base_common_header_t { }; typedef struct mca_ptl_base_common_header_t mca_ptl_base_common_header_t; +#define MCA_PTL_BASE_COMMON_HDR_NTOH(h) \ + (h).hdr_size = ntohs((h).hdr_size) + +#define MCA_PTL_BASE_COMMON_HDR_HTON(h) \ + (h).hdr_size = htons((h).hdr_size) + /** * Basic header for all fragments. @@ -46,6 +95,17 @@ struct mca_ptl_base_frag_header_t { }; typedef struct mca_ptl_base_frag_header_t mca_ptl_base_frag_header_t; +#define MCA_PTL_BASE_FRAG_HDR_NTOH(h) \ + MCA_PTL_BASE_COMMON_HDR_NTOH((h).hdr_common); \ + (h).hdr_frag_length = ntohl((h).hdr_frag_length); \ + (h).hdr_frag_offset = ntohl((h).hdr_frag_offset); \ + (h).hdr_frag_seq = ntoh64((h).hdr_frag_seq) + +#define MCA_PTL_BASE_FRAG_HDR_HTON(h) \ + MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \ + (h).hdr_frag_length = htonl((h).hdr_frag_length); \ + (h).hdr_frag_offset = htonl((h).hdr_frag_offset); \ + (h).hdr_frag_seq = hton64((h).hdr_frag_seq) /** * Header definition for the first fragment, contains the additional @@ -62,6 +122,24 @@ struct mca_ptl_base_match_header_t { }; typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t; +#define MCA_PTL_BASE_MATCH_HDR_NTOH(h) \ + MCA_PTL_BASE_FRAG_HDR_NTOH((h).hdr_frag); \ + (h).hdr_contextid = ntohl((h).hdr_contextid); \ + (h).hdr_src = ntohl((h).hdr_src); \ + (h).hdr_dst = ntohl((h).hdr_dst); \ + (h).hdr_tag = ntohl((h).hdr_tag); \ + (h).hdr_msg_length = 
ntohl((h).hdr_msg_length); \ + (h).hdr_msg_seq = ntoh64((h).hdr_msg_seq) + +#define MCA_PTL_BASE_MATCH_HDR_HTON(h) \ + MCA_PTL_BASE_FRAG_HDR_HTON((h).hdr_frag); \ + (h).hdr_contextid = htonl((h).hdr_contextid); \ + (h).hdr_src = htonl((h).hdr_src); \ + (h).hdr_dst = htonl((h).hdr_dst); \ + (h).hdr_tag = htonl((h).hdr_tag); \ + (h).hdr_msg_length = htonl((h).hdr_msg_length); \ + (h).hdr_msg_seq = hton64((h).hdr_msg_seq) + /** * Header used to acknowledgment outstanding fragment(s). @@ -76,6 +154,13 @@ struct mca_ptl_base_ack_header_t { }; typedef struct mca_ptl_base_ack_header_t mca_ptl_base_ack_header_t; +#define MCA_PTL_BASE_ACK_HDR_NTOH(h) \ + MCA_PTL_BASE_COMMON_HDR_NTOH((h).hdr_common); /* (h) parenthesized so expression arguments such as *p expand correctly */ \ + (h).hdr_dst_size = ntohl((h).hdr_dst_size) + +#define MCA_PTL_BASE_ACK_HDR_HTON(h) \ + MCA_PTL_BASE_COMMON_HDR_HTON((h).hdr_common); \ + (h).hdr_dst_size = htonl((h).hdr_dst_size) /** * Union of defined header types. diff --git a/src/mca/ptl/tcp/src/ptl_tcp.c b/src/mca/ptl/tcp/src/ptl_tcp.c index 8986d13934..47f4668e24 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp.c +++ b/src/mca/ptl/tcp/src/ptl_tcp.c @@ -107,6 +107,10 @@ int mca_ptl_tcp_add_procs( OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock); return rc; } + /* do we need to convert to/from network byte order */ + if(ompi_proc->proc_arch != proc_self->proc_arch) + ptl_peer->peer_byte_swap = true; + ompi_bitmap_set_bit(reachable, i); OMPI_THREAD_UNLOCK(&ptl_proc->proc_lock); peers[i] = ptl_peer; @@ -193,6 +197,9 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_base_module_t* ptl, struct mca_ } OMPI_THREAD_UNLOCK(&mca_ptl_tcp_component.tcp_lock); mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->frag_recv.frag_base.frag_peer, pending); + if(frag->frag_send.frag_base.frag_peer->peer_byte_swap) { + MCA_PTL_BASE_ACK_HDR_HTON(frag->frag_send.frag_base.frag_header.hdr_ack); + } mca_ptl_tcp_peer_send(pending->frag_recv.frag_base.frag_peer, frag, 0); mca_ptl_tcp_recv_frag_return(ptl, pending); } else { @@ -262,6 +269,9 @@ void 
mca_ptl_tcp_matched( OMPI_THREAD_UNLOCK(&mca_ptl_tcp_component.tcp_lock); } else { mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->frag_recv.frag_base.frag_peer, recv_frag); + if(ack->frag_send.frag_base.frag_peer->peer_byte_swap) { + MCA_PTL_BASE_ACK_HDR_HTON(ack->frag_send.frag_base.frag_header.hdr_ack); + } mca_ptl_tcp_peer_send(ack->frag_send.frag_base.frag_peer, ack, 0); } } diff --git a/src/mca/ptl/tcp/src/ptl_tcp_peer.c b/src/mca/ptl/tcp/src/ptl_tcp_peer.c index a3c0b228b2..12906f3e1a 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_peer.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_peer.c @@ -54,6 +54,7 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer) ptl_peer->peer_recv_event.ev_flags = 0; ptl_peer->peer_state = MCA_PTL_TCP_CLOSED; ptl_peer->peer_retries = 0; + ptl_peer->peer_byte_swap = false; OBJ_CONSTRUCT(&ptl_peer->peer_frags, ompi_list_t); OBJ_CONSTRUCT(&ptl_peer->peer_send_lock, ompi_mutex_t); OBJ_CONSTRUCT(&ptl_peer->peer_recv_lock, ompi_mutex_t); diff --git a/src/mca/ptl/tcp/src/ptl_tcp_peer.h b/src/mca/ptl/tcp/src/ptl_tcp_peer.h index 3b83668817..f6ae0e3f31 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_peer.h +++ b/src/mca/ptl/tcp/src/ptl_tcp_peer.h @@ -39,20 +39,21 @@ typedef enum { */ struct mca_ptl_base_peer_t { - ompi_list_item_t super; + ompi_list_item_t super; struct mca_ptl_tcp_module_t* peer_ptl; /**< PTL instance that created this connection */ - struct mca_ptl_tcp_proc_t* peer_proc; /**< proc structure corresponding to peer */ - struct mca_ptl_tcp_addr_t* peer_addr; /**< address of peer */ - int peer_sd; /**< socket connection to peer */ - mca_ptl_tcp_send_frag_t* peer_send_frag; /**< current send frag being processed */ - mca_ptl_tcp_recv_frag_t* peer_recv_frag; /**< current recv frag being processed */ - mca_ptl_tcp_state_t peer_state; /**< current state of the connection */ - size_t peer_retries; /**< number of connection retries attempted */ - ompi_list_t peer_frags; /**< list of pending frags to send */ - ompi_mutex_t 
peer_send_lock; /**< lock for concurrent access to peer state */ - ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */ - ompi_event_t peer_send_event; /**< event for async processing of send frags */ - ompi_event_t peer_recv_event; /**< event for async processing of recv frags */ + struct mca_ptl_tcp_proc_t* peer_proc; /**< proc structure corresponding to peer */ + struct mca_ptl_tcp_addr_t* peer_addr; /**< address of peer */ + int peer_sd; /**< socket connection to peer */ + mca_ptl_tcp_send_frag_t* peer_send_frag; /**< current send frag being processed */ + mca_ptl_tcp_recv_frag_t* peer_recv_frag; /**< current recv frag being processed */ + mca_ptl_tcp_state_t peer_state; /**< current state of the connection */ + size_t peer_retries; /**< number of connection retries attempted */ + ompi_list_t peer_frags; /**< list of pending frags to send */ + ompi_mutex_t peer_send_lock; /**< lock for concurrent access to peer state */ + ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */ + ompi_event_t peer_send_event; /**< event for async processing of send frags */ + ompi_event_t peer_recv_event; /**< event for async processing of recv frags */ + bool peer_byte_swap; /**< is peer a different byte ordering? 
*/ }; typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t; diff --git a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c index d76528bd11..ed1c4502ff 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_recvfrag.c @@ -74,10 +74,33 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) { /* read common header */ - if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t)) + if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t)) { + mca_ptl_tcp_peer_t* ptl_peer; if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false) return false; + /* convert this to host byte order if required */ + if(frag->frag_recv.frag_base.frag_peer->peer_byte_swap) { + /* note this field is only a byte - so doesn't matter what the byte ordering is */ + switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) { + case MCA_PTL_HDR_TYPE_MATCH: + MCA_PTL_BASE_MATCH_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_match); + break; + case MCA_PTL_HDR_TYPE_FRAG: + MCA_PTL_BASE_FRAG_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_frag); + break; + case MCA_PTL_HDR_TYPE_ACK: + case MCA_PTL_HDR_TYPE_NACK: + MCA_PTL_BASE_ACK_HDR_NTOH(frag->frag_recv.frag_base.frag_header.hdr_ack); /* acks are sent through MCA_PTL_BASE_ACK_HDR_HTON, so decode with the matching ack macro */ + break; + default: + ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X", + *(unsigned long*)&frag->frag_recv.frag_base.frag_header); + return true; + } + } + } + switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) { case MCA_PTL_HDR_TYPE_MATCH: return mca_ptl_tcp_recv_frag_match(frag, sd); @@ -89,7 +112,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd) default: ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X", *(unsigned long*)&frag->frag_recv.frag_base.frag_header); - return false; + return true; } } diff --git 
a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c index d1578c6021..c858f9e732 100644 --- a/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c +++ b/src/mca/ptl/tcp/src/ptl_tcp_sendfrag.c @@ -128,6 +128,15 @@ int mca_ptl_tcp_send_frag_init( } hdr->hdr_frag.hdr_frag_length = size_out; + /* convert to network byte order if required */ + if(ptl_peer->peer_byte_swap) { + if(offset == 0) { + MCA_PTL_BASE_MATCH_HDR_HTON(hdr->hdr_match); + } else { + MCA_PTL_BASE_FRAG_HDR_HTON(hdr->hdr_frag); + } + } + /* fragment state */ sendfrag->frag_owner = &ptl_peer->peer_ptl->super; sendfrag->frag_send.frag_request = sendreq; diff --git a/src/proc/proc.c b/src/proc/proc.c index 187a131ffd..c1d1c8d514 100644 --- a/src/proc/proc.c +++ b/src/proc/proc.c @@ -43,11 +43,11 @@ void ompi_proc_construct(ompi_proc_t* proc) proc->proc_pml = NULL; proc->proc_modex = NULL; - proc->proc_arch = 0; OBJ_CONSTRUCT(&proc->proc_lock, ompi_mutex_t); /* FIX - need to determine remote process architecture */ proc->proc_convertor = ompi_convertor_create(0, 0); + proc->proc_arch = 0; OMPI_THREAD_LOCK(&ompi_proc_lock); ompi_list_append(&ompi_proc_list, (ompi_list_item_t*)proc); diff --git a/src/proc/proc.h b/src/proc/proc.h index dee0a8efb8..bc6a4e8527 100644 --- a/src/proc/proc.h +++ b/src/proc/proc.h @@ -20,7 +20,7 @@ struct ompi_proc_t { ompi_process_name_t proc_name; struct mca_pml_proc_t* proc_pml; /* PML specific proc data */ struct mca_base_modex_t* proc_modex; /* MCA module exchange data */ - int proc_arch; + uint32_t proc_arch; ompi_convertor_t* proc_convertor; ompi_mutex_t proc_lock;