/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004      The Ohio State University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include <string.h>

#include "ompi/class/ompi_bitmap.h"
#include "opal/util/output.h"
#include "orte/util/proc_info.h"
#include "orte/mca/ns/ns.h"
#include "ompi/mca/ptl/ptl.h"
#include "ompi/mca/ptl/base/ptl_base_header.h"
#include "ptl_gm.h"
#include "ptl_gm_proc.h"
#include "ptl_gm_priv.h"
#include "ptl_gm_peer.h"
#include "ptl_gm_sendfrag.h"
#include "ompi/proc/proc.h"

mca_ptl_gm_module_t mca_ptl_gm_module = {
    {
        &mca_ptl_gm_component.super,
        1,                               /* max size of request cache */
        sizeof(mca_ptl_gm_send_frag_t),  /* bytes required by ptl for a request */
        0,                               /* max size of first fragment */
        0,                               /* min fragment size */
        0,                               /* max fragment size */
        0,                               /* exclusivity */
        50,                              /* latency */
        0,                               /* bandwidth */
        MCA_PTL_PUT,                     /* ptl flags */
        /* collection of interfaces */
        mca_ptl_gm_add_procs,
        mca_ptl_gm_del_procs,
        mca_ptl_gm_finalize,
        mca_ptl_gm_peer_send,
        mca_ptl_gm_put,
        mca_ptl_gm_get,
        mca_ptl_gm_matched,
        mca_ptl_gm_request_init,
        mca_ptl_gm_request_fini,
        NULL,
        NULL,
        NULL
    }
};

OBJ_CLASS_INSTANCE( mca_ptl_gm_send_request_t,
                    mca_ptl_base_send_request_t,
                    NULL, NULL );

OBJ_CLASS_INSTANCE( mca_ptl_gm_peer_t,
                    opal_list_item_t,
                    NULL, NULL );

int mca_ptl_gm_add_procs( struct mca_ptl_base_module_t *ptl,
                          size_t nprocs,
                          struct ompi_proc_t **orte_procs,
                          struct mca_ptl_base_peer_t **peers,
                          ompi_bitmap_t *reachable )
{
    uint32_t i, j, num_peer_ptls = 1;
    struct ompi_proc_t *orte_proc;
    mca_ptl_gm_proc_t *ptl_proc;
    mca_ptl_gm_peer_t *ptl_peer;
    ompi_proc_t* local_proc = ompi_proc_local();

    for( i = 0; i < nprocs; i++ ) {
        orte_proc = orte_procs[i];
        /* the GM PTL does not handle loopback to the local process */
        if( orte_proc == local_proc ) continue;

        ptl_proc = mca_ptl_gm_proc_create( (mca_ptl_gm_module_t *)ptl, orte_proc );
        if( NULL == ptl_proc ) {
            opal_output( 0, "[%s:%d] cannot allocate memory for the GM module",
                         __FILE__, __LINE__ );
            continue;
        }

        OPAL_THREAD_LOCK( &ptl_proc->proc_lock );
        if( ptl_proc->proc_addr_count == ptl_proc->proc_peer_count ) {
            OPAL_THREAD_UNLOCK( &ptl_proc->proc_lock );
            opal_output( 0, "[%s:%d] modex exchange failed for GM module",
                         __FILE__, __LINE__ );
            continue;
        }

        ptl_peer = NULL;  /* force it to NULL before looping through the ptls */
        /* TODO: make this extensible to multiple NICs */
        for( j = 0; j < num_peer_ptls; j++ ) {
            ptl_peer = OBJ_NEW( mca_ptl_gm_peer_t );
            if( NULL == ptl_peer ) {
                opal_output( 0, "[%s:%d] cannot allocate memory for one of the GM PTLs",
                             __FILE__, __LINE__ );
                continue;  /* the lock is released once, after the loop */
            }
            ptl_peer->peer_ptl = (mca_ptl_gm_module_t *)ptl;
            ptl_peer->peer_proc = ptl_proc;
            ptl_peer->peer_addr.port_id = ptl_proc->proc_addrs->port_id;
#if GM_API_VERSION > 0x200
            ptl_peer->peer_addr.global_id = ptl_proc->proc_addrs->global_id;
            if( GM_SUCCESS != gm_global_id_to_node_id( ((mca_ptl_gm_module_t *)ptl)->gm_port,
                                                       ptl_proc->proc_addrs[j].global_id,
                                                       &(ptl_peer->peer_addr.local_id) ) ) {
                opal_output( 0, "[%s:%d] error in converting global to local id\n",
                             __FILE__, __LINE__ );
                OBJ_RELEASE( ptl_peer );
                assert( NULL == ptl_peer );
                continue;
            }
#else
            strncpy( ptl_peer->peer_addr.global_id,
                     ptl_proc->proc_addrs->global_id,
                     GM_MAX_HOST_NAME_LEN );
            ptl_peer->peer_addr.local_id =
                gm_host_name_to_node_id( ((mca_ptl_gm_module_t *)ptl)->gm_port,
                                         ptl_proc->proc_addrs[j].global_id );
            if( GM_NO_SUCH_NODE_ID == ptl_peer->peer_addr.local_id ) {
                opal_output( 0, "Unable to convert the remote host name (%s) to a host id",
                             ptl_proc->proc_addrs[j].global_id );
                OBJ_RELEASE( ptl_peer );
                assert( NULL == ptl_peer );
                continue;
            }
#endif  /* GM_API_VERSION > 0x200 */
            ptl_proc->peer_arr[ptl_proc->proc_peer_count] = ptl_peer;
            ptl_proc->proc_peer_count++;
            /* the bit is set once per peer PTL; duplicates are harmless */
            ompi_bitmap_set_bit( reachable, i );
        }
        OPAL_THREAD_UNLOCK( &ptl_proc->proc_lock );
        peers[i] = (struct mca_ptl_base_peer_t*)ptl_peer;
    }
    return OMPI_SUCCESS;
}
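/* The #if/#else branches above duplicate the translation from the
 * modex-published peer address to a GM node id. A minimal refactoring
 * sketch (hypothetical, not part of this file: the helper name and the
 * assumption that the address type is mca_ptl_gm_addr_t are invented for
 * illustration) that folds both GM API versions into one routine:
 */
#if 0
static int mca_ptl_gm_resolve_node_id( mca_ptl_gm_module_t* ptl,
                                       mca_ptl_gm_addr_t* addr )
{
#if GM_API_VERSION > 0x200
    /* GM 2.x and later identify nodes by a numeric global id */
    if( GM_SUCCESS != gm_global_id_to_node_id( ptl->gm_port, addr->global_id,
                                               &(addr->local_id) ) ) {
        return OMPI_ERROR;
    }
#else
    /* GM 1.x identifies nodes by host name */
    addr->local_id = gm_host_name_to_node_id( ptl->gm_port, addr->global_id );
    if( GM_NO_SUCH_NODE_ID == addr->local_id ) {
        return OMPI_ERROR;
    }
#endif  /* GM_API_VERSION > 0x200 */
    return OMPI_SUCCESS;
}
#endif  /* illustration only */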
/*
 * Release the GM peers associated with the specified processes.
 */
int mca_ptl_gm_del_procs( struct mca_ptl_base_module_t *ptl,
                          size_t nprocs,
                          struct ompi_proc_t **procs,
                          struct mca_ptl_base_peer_t **peers )
{
    size_t i;

    for( i = 0; i < nprocs; i++ ) {
        OBJ_RELEASE( peers[i] );
    }
    return OMPI_SUCCESS;
}

/*
 * Tear down a GM module: stop the progress thread, release the GM
 * resources and free the module itself.
 */
int mca_ptl_gm_finalize( struct mca_ptl_base_module_t *base_ptl )
{
    uint32_t index;
    mca_ptl_gm_module_t* ptl = (mca_ptl_gm_module_t*)base_ptl;

    for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
        if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
            mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
            break;
        }
    }
    if( index == mca_ptl_gm_component.gm_num_ptl_modules ) {
        opal_output( 0, "%p is not a GM PTL !!!\n", (void*)base_ptl );
        return OMPI_ERROR;
    }

    /* We should do the same things as in the init step, in reverse order.
     * First we shut down all threads, if there are any.
     */
#if OMPI_HAVE_POSIX_THREADS
    if( 0 != ptl->thread.t_handle ) {
        void* thread_return;

        pthread_cancel( ptl->thread.t_handle );
        opal_thread_join( &(ptl->thread), &thread_return );
    }
#endif  /* OMPI_HAVE_POSIX_THREADS */

    /* Closing each port requires several steps. As there is no way to cancel
     * already posted messages, we start by unregistering all memory and then
     * close the port. Afterward we can release all internal data.
     */
    if( ptl->gm_send_dma_memory != NULL ) {
        gm_dma_free( ptl->gm_port, ptl->gm_send_dma_memory );
        ptl->gm_send_dma_memory = NULL;
    }
    if( ptl->gm_recv_dma_memory != NULL ) {
        gm_dma_free( ptl->gm_port, ptl->gm_recv_dma_memory );
        ptl->gm_recv_dma_memory = NULL;
    }

    /* Now close the port if one is open */
    if( ptl->gm_port != NULL ) {
        gm_close( ptl->gm_port );
        ptl->gm_port = NULL;
    }

    /* And now release all internal resources. */
    OBJ_DESTRUCT( &(ptl->gm_send_frags) );
    if( ptl->gm_send_fragments != NULL ) {
        free( ptl->gm_send_fragments );
        ptl->gm_send_fragments = NULL;
    }

    OBJ_DESTRUCT( &(ptl->gm_recv_frags_free) );
    if( ptl->gm_recv_fragments != NULL ) {
        free( ptl->gm_recv_fragments );
        ptl->gm_recv_fragments = NULL;
    }

    /* These are supposed to be empty by now */
    OBJ_DESTRUCT( &(ptl->gm_send_frags_queue) );
    OBJ_DESTRUCT( &(ptl->gm_pending_acks) );
    OBJ_DESTRUCT( &(ptl->gm_recv_outstanding_queue) );

    /* And finally release the PTL itself */
    free( ptl );

    return OMPI_SUCCESS;
}
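/* Usage sketch (hypothetical, not part of this file): the component close
 * path would typically walk the module table and finalize each module
 * through the base interface. This assumes the base module member of
 * mca_ptl_gm_module_t is named `super`, mirroring the component's own
 * initializer above. Since mca_ptl_gm_finalize() NULLs the table entry,
 * the loop tolerates entries that were already torn down.
 */
#if 0
static void mca_ptl_gm_close_all_modules( void )
{
    uint32_t i;

    for( i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++ ) {
        if( NULL != mca_ptl_gm_component.gm_ptl_modules[i] ) {
            mca_ptl_gm_finalize( &(mca_ptl_gm_component.gm_ptl_modules[i]->super) );
        }
    }
}
#endif  /* illustration only */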
int mca_ptl_gm_request_init( struct mca_ptl_base_module_t *ptl,
                             struct mca_ptl_base_send_request_t *request )
{
#if 0
    mca_ptl_gm_send_frag_t *frag;
    struct mca_ptl_gm_send_request_t *req;

    frag = mca_ptl_gm_alloc_send_frag( ptl, request );
    if( NULL == frag ) {
        opal_output( 0, "[%s:%d] Unable to allocate a GM send fragment\n",
                     __FILE__, __LINE__ );
        return OMPI_ERR_OUT_OF_RESOURCE;
    } else {
        req = (mca_ptl_gm_send_request_t *)request;
        req->req_frag = frag;
        frag->status = 0;  /* MCA_PTL_GM_FRAG_CACHED */
    }
#endif
    return OMPI_SUCCESS;
}

/*
 * Return the fragment cached on the request (if any) to the free list.
 */
void mca_ptl_gm_request_fini( struct mca_ptl_base_module_t *ptl,
                              struct mca_ptl_base_send_request_t *request )
{
#if 0
    mca_ptl_gm_send_frag_t *frag;

    frag = ((mca_ptl_gm_send_request_t *)request)->req_frag;
    OMPI_FREE_LIST_RETURN( &(((mca_ptl_gm_module_t *)ptl)->gm_send_frags),
                           (opal_list_item_t *)frag );
    frag->status = 0;
#endif
    OBJ_DESTRUCT( request + 1 );
}

/*
 * Initiate a put.
 */
int mca_ptl_gm_put( struct mca_ptl_base_module_t *ptl,
                    struct mca_ptl_base_peer_t *ptl_peer,
                    struct mca_ptl_base_send_request_t *sendreq,
                    size_t offset,
                    size_t size,
                    int flags )
{
    int rc;
    mca_ptl_gm_send_frag_t *putfrag;

    rc = mca_ptl_gm_put_frag_init( &putfrag,
                                   (mca_ptl_gm_peer_t*)ptl_peer,
                                   (mca_ptl_gm_module_t*)ptl,
                                   sendreq, offset, &size, flags );
    if( OMPI_SUCCESS != rc ) {
        return rc;
    }
    rc = mca_ptl_gm_peer_send_continue( (mca_ptl_gm_peer_t *)ptl_peer, putfrag,
                                        sendreq, offset, &size, flags );
    return rc;
}

/*
 * Initiate a get. Not implemented for the GM PTL.
 */
int mca_ptl_gm_get( struct mca_ptl_base_module_t *ptl,
                    struct mca_ptl_base_peer_t *ptl_base_peer,
                    struct mca_ptl_base_recv_request_t *request,
                    size_t offset,
                    size_t size,
                    int flags )
{
    return OMPI_SUCCESS;
}

static void mca_ptl_gm_basic_ack_callback( struct gm_port* port,
                                           void* context,
                                           gm_status_t status )
{
    mca_ptl_gm_module_t* gm_ptl;
    mca_ptl_base_header_t* header;

    header = (mca_ptl_base_header_t*)context;
    gm_ptl = (mca_ptl_gm_module_t*)header->hdr_ack.hdr_dst_addr.pval;

    OMPI_GM_FREE_LIST_RETURN( &(gm_ptl->gm_send_dma_frags),
                              ((opal_list_item_t*)header) );
    /* release the send token */
    opal_atomic_add( &(gm_ptl->num_send_tokens), 1 );
}
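/* Note on send-token accounting (descriptive comment added for clarity):
 * mca_ptl_gm_matched() below consumes a token (opal_atomic_sub) whenever it
 * takes a DMA fragment from gm_send_dma_frags to carry an ack; the callback
 * above returns both the fragment and the token once GM signals completion
 * of the send. The module pointer stashed in hdr_dst_addr.pval ("local use"
 * in mca_ptl_gm_matched) is what lets this callback recover gm_ptl from the
 * context argument. When no fragment is available, the ack is parked on
 * gm_pending_acks so it can be retried later.
 */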
/* A posted receive has been matched. If required, send an ack back to the
 * peer and process the fragment.
 */
void mca_ptl_gm_matched( mca_ptl_base_module_t* ptl,
                         mca_ptl_base_recv_frag_t* frag )
{
    mca_ptl_base_recv_request_t *request;
    mca_ptl_base_header_t *hdr;
    int32_t rc;
    mca_ptl_gm_module_t *gm_ptl;
    mca_ptl_gm_recv_frag_t *recv_frag;
    mca_ptl_gm_peer_t* peer;
    struct iovec iov = { NULL, 0 };

    gm_ptl = (mca_ptl_gm_module_t *)ptl;
    request = frag->frag_request;
    recv_frag = (mca_ptl_gm_recv_frag_t *)frag;
    peer = (mca_ptl_gm_peer_t*)recv_frag->frag_recv.frag_base.frag_peer;

    if( frag->frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK ) {
        /* need to send an ack back */
        opal_list_item_t *item;

        OMPI_FREE_LIST_WAIT( &(gm_ptl->gm_send_dma_frags), item, rc );
        if( NULL == item ) {
            opal_output( 0, "[%s:%d] unable to allocate a GM fragment\n",
                         __FILE__, __LINE__ );
            OPAL_THREAD_LOCK( &mca_ptl_gm_component.gm_lock );
            opal_list_append( &mca_ptl_gm_module.gm_pending_acks,
                              (opal_list_item_t *)frag );
            OPAL_THREAD_UNLOCK( &mca_ptl_gm_component.gm_lock );
        } else {
            /* consume a send token for the ack */
            opal_atomic_sub( &(gm_ptl->num_send_tokens), 1 );
            assert( gm_ptl->num_send_tokens >= 0 );
            hdr = (mca_ptl_base_header_t*)item;

            hdr->hdr_ack.hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
            hdr->hdr_ack.hdr_common.hdr_flags =
                frag->frag_base.frag_header.hdr_common.hdr_flags;
            hdr->hdr_ack.hdr_src_ptr =
                frag->frag_base.frag_header.hdr_rndv.hdr_src_ptr;
            hdr->hdr_ack.hdr_dst_match.lval = 0L;
            hdr->hdr_ack.hdr_dst_match.pval = request;
            hdr->hdr_ack.hdr_dst_addr.lval = 0L;
            hdr->hdr_ack.hdr_dst_addr.pval = ptl;  /* local use */
            hdr->hdr_ack.hdr_dst_size = request->req_recv.req_bytes_packed;

            gm_send_with_callback( ((mca_ptl_gm_module_t*)ptl)->gm_port,
                                   hdr, GM_SIZE,
                                   sizeof(mca_ptl_base_ack_header_t),
                                   GM_LOW_PRIORITY,
                                   peer->peer_addr.local_id,
                                   peer->peer_addr.port_id,
                                   mca_ptl_gm_basic_ack_callback,
                                   (void *)hdr );
        }
    }

    if( frag->frag_base.frag_size > 0 ) {
        ompi_convertor_t* convertor;
        uint32_t out_size;
        int32_t freeAfter;
        size_t max_data;

        iov.iov_len = recv_frag->attached_data_length;
        /* Here we expect frag_addr to point to the beginning of the buffer,
         * header included */
        iov.iov_base = frag->frag_base.frag_addr;
        convertor = &(request->req_recv.req_convertor);
        out_size = 1;
        max_data = iov.iov_len;
        rc = ompi_convertor_unpack( convertor, &(iov), &out_size,
                                    &max_data, &freeAfter );
        assert( rc >= 0 );
        recv_frag->frag_bytes_processed += max_data;
    }

    /* update progress */
    ptl->ptl_recv_progress( ptl, request, iov.iov_len, iov.iov_len );

    /* Now update the status of the fragment */
    if( ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer == true ) {
        gm_release_local_buffer( ((mca_ptl_gm_recv_frag_t*)frag)->frag_recv.frag_base.frag_addr );
        ((mca_ptl_gm_recv_frag_t*)frag)->have_allocated_buffer = false;
    }

    /* We are done with this fragment. Return it to the free list */
    OMPI_FREE_LIST_RETURN( &(gm_ptl->gm_recv_frags_free),
                           (opal_list_item_t*)frag );
}
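/* Notes on the cleanup above (descriptive comments added for clarity):
 * - ptl_recv_progress() is invoked with iov.iov_len for both the
 *   bytes-received and bytes-delivered arguments; when the fragment carried
 *   no data (frag_size == 0) the iovec keeps its zero initializer and
 *   zero-byte progress is recorded for the request.
 * - have_allocated_buffer presumably marks fragments whose payload landed
 *   in a locally allocated GM buffer rather than the preregistered receive
 *   area; such buffers must be returned via gm_release_local_buffer()
 *   before the fragment goes back on the free list.
 */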