
First commit of the multi-threaded GM driver.

- dedicated progress function for each PTL thread (see the sketch below)
- now works on all thread models
- clean up the initialization and finalization parts

This commit was SVN r3447.
This commit is contained in:
George Bosilca 2004-10-30 06:55:18 +00:00
parent 4f4f4688d9
Commit d08e56f3d1
5 changed files with 318 additions and 272 deletions
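
The change introduces one progress thread per GM PTL module: the thread blocks in gm_blocking_receive(), hands each event to mca_ptl_gm_analyze_recv_event(), and is cancelled and joined again in mca_ptl_gm_finalize(). Below is a minimal sketch of that pattern in plain pthreads; module_t, poll_blocking() and progress_thread() are hypothetical stand-ins for mca_ptl_gm_module_t, gm_blocking_receive() and mca_ptl_gm_thread_progress(), not the driver code itself.

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

typedef struct {
    pthread_t handle;          /* mirrors ptl->thread.t_handle */
    int id;
} module_t;

/* Stand-in for a blocking event source such as gm_blocking_receive(). */
static int poll_blocking(module_t* module)
{
    sleep(1);                  /* pretend to wait for a network event */
    return module->id;         /* pretend an event arrived */
}

/* Same shape as mca_ptl_gm_thread_progress(): enable cancellation, then
 * loop forever handling events; finalize cancels the thread from outside. */
static void* progress_thread(void* arg)
{
    module_t* module = (module_t*)arg;
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    while (1) {
        int event = poll_blocking(module);
        printf("module %d handled event %d\n", module->id, event);
    }
    return NULL;               /* never reached */
}

int main(void)
{
    module_t module = { .id = 0 };

    /* init step: start one progress thread per module */
    pthread_create(&module.handle, NULL, progress_thread, &module);
    sleep(3);

    /* finalize step, in reverse order of init: cancel the blocked thread,
     * then join it (the commit uses pthread_cancel + ompi_thread_join) */
    pthread_cancel(module.handle);
    pthread_join(module.handle, NULL);
    return 0;
}

Because asynchronous cancellation is enabled, the thread can be cancelled even while it is blocked in the receive call, which is what lets finalize simply cancel and join instead of signalling the loop to exit.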

View file

@@ -36,7 +36,6 @@ mca_ptl_gm_module_t mca_ptl_gm_module = {
0, /* latency */
0, /* bandwidth */
MCA_PTL_PUT, /* ptl flags */
/* collection of interfaces */
mca_ptl_gm_add_procs,
mca_ptl_gm_del_procs,
@@ -64,8 +63,7 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
struct mca_ptl_base_peer_t **peers,
ompi_bitmap_t * reachable)
{
int i,j;
int num_peer_ptls = 1;
uint32_t i, j, num_peer_ptls = 1;
struct ompi_proc_t *ompi_proc;
mca_ptl_gm_proc_t *ptl_proc;
mca_ptl_gm_peer_t *ptl_peer;
@@ -102,7 +100,7 @@ mca_ptl_gm_add_procs (struct mca_ptl_base_module_t *ptl,
ptl_peer->global_id = ptl_proc->proc_addrs->global_id;
ptl_peer->port_number = ptl_proc->proc_addrs->port_id;
if (GM_SUCCESS !=
gm_global_id_to_node_id (((mca_ptl_gm_module_t *) ptl)->my_port,
gm_global_id_to_node_id (((mca_ptl_gm_module_t *) ptl)->gm_port,
ptl_proc->proc_addrs[j].global_id,
&lid)) {
ompi_output (0,
@@ -147,9 +145,33 @@ mca_ptl_gm_del_procs (struct mca_ptl_base_module_t *ptl,
*
*/
int
mca_ptl_gm_finalize (struct mca_ptl_base_module_t *ptl)
mca_ptl_gm_finalize (struct mca_ptl_base_module_t *base_ptl)
{
free (ptl);
void* thread_return;
uint32_t index;
mca_ptl_gm_module_t* ptl = (mca_ptl_gm_module_t*)base_ptl;
/* we should do the same things as in the init step, in reverse order.
* First we shut down all threads, if there are any.
*/
if( 0 != ptl->thread.t_handle ) {
pthread_cancel( ptl->thread.t_handle );
ompi_thread_join( &(ptl->thread), &thread_return );
}
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
if( mca_ptl_gm_component.gm_ptl_modules[index] == ptl ) {
mca_ptl_gm_component.gm_ptl_modules[index] = NULL;
OMPI_OUTPUT((0, "GM ptl %p successfully finalized\n", (void*)ptl));
}
}
/* based on the fact that port 0 is reserved, we can use ZERO
* to mark a port as unused.
*/
if( 0 != ptl->gm_port ) gm_close( ptl->gm_port );
ptl->gm_port = 0;
free( ptl );
return OMPI_SUCCESS;
}
@@ -201,7 +223,6 @@ mca_ptl_gm_request_fini (struct mca_ptl_base_module_t *ptl,
frag->status = 0;
#endif
A_PRINT("entering request fini\n");
OBJ_DESTRUCT(request+1);
}
@@ -287,7 +308,7 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
/* register the user buffer */
if (offset > 0) {
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
status = gm_register_memory(gm_ptl->gm_port, buffer_ptr, bytes_reg);
if(GM_SUCCESS != status) {
ompi_output(0,"[%s:%d] Unable to register memory\n",__FILE__,__LINE__);
}
@@ -317,7 +338,6 @@ mca_ptl_gm_put (struct mca_ptl_base_module_t *ptl,
rc = mca_ptl_gm_peer_send (putfrag->peer,putfrag,sendreq,
offset,&size,flags);
assert(rc == 0);
A_PRINT("after issuing the put completion(fin) for the request");
#endif
return OMPI_SUCCESS;
@@ -387,7 +407,7 @@ mca_ptl_gm_matched( mca_ptl_base_module_t * ptl,
bytes_recv = frag->frag_base.frag_size;
bytes_reg = total_bytes - bytes_recv;
buffer_ptr += bytes_recv;
status = gm_register_memory(gm_ptl->my_port, buffer_ptr, bytes_reg);
status = gm_register_memory(gm_ptl->gm_port, buffer_ptr, bytes_reg);
recv_frag->registered_buf = buffer_ptr;
A_PRINT("Receiver: register addr: %p, bytes: %d\n",buffer_ptr,bytes_reg);

View file

@@ -1,3 +1,5 @@
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* $HEADER$
*/
@@ -20,7 +22,6 @@
#define MCA_PTL_GM_STATISTICS 0
#define GM_SIZE 30
#define THRESHOLD 16384
#define MAX_GM_PORTS 16
#define MAX_RECV_TOKENS 256
#define PTL_GM_ADMIN_SEND_TOKENS 0
#define PTL_GM_ADMIN_RECV_TOKENS 0
@@ -43,6 +44,10 @@ struct mca_ptl_gm_component_t {
int gm_free_list_num; /**< initial size of free lists */
int gm_free_list_max; /**< maximum size of free lists */
int gm_free_list_inc; /**< number of elements to alloc when growing free lists */
int gm_max_port_number; /**< maximum number of ports by board */
int gm_max_boards_number; /**< maximum number of boards on the node */
int gm_max_rdma_frag_size; /**< maximum fragment size used to transfer data over RDMA */
char* gm_port_name; /**< the name used to get the port */
struct mca_ptl_gm_proc_t* gm_local;
ompi_list_t gm_procs;
ompi_list_t gm_send_req;
@@ -60,10 +65,10 @@ extern mca_ptl_gm_component_t mca_ptl_gm_component;
*/
struct mca_ptl_gm_module_t {
mca_ptl_base_module_t super; /**< base PTL module interface */
struct gm_port *my_port;
unsigned int my_local_id;
unsigned int my_global_id;
unsigned int my_port_id;
struct gm_port *gm_port;
unsigned int local_id;
unsigned int global_id;
unsigned int port_id;
unsigned int num_send_tokens;
unsigned int num_recv_tokens;
unsigned int max_send_tokens;
@@ -74,6 +79,8 @@ struct mca_ptl_gm_module_t {
ompi_list_t gm_send_frags_queue;
ompi_list_t gm_pending_acks;
ompi_list_t gm_recv_outstanding_queue;
ompi_thread_t thread;
#if MCA_PTL_GM_STATISTICS
size_t ptl_bytes_sent;
size_t ptl_bytes_recv;

View file

@@ -63,52 +63,34 @@ static bool mca_ptl_gm_component_initialized = false;
*/
static inline char *
mca_ptl_gm_param_register_string (const char *param_name,
const char *default_value)
mca_ptl_gm_param_register_string( const char *param_name,
const char *default_value )
{
char *param_value;
int id =
mca_base_param_register_string ("ptl", "gm", param_name, NULL,
default_value);
char *param_value;
int id = mca_base_param_register_string( "ptl", "gm", param_name, NULL,
default_value) ;
mca_base_param_lookup_string (id, &param_value);
return param_value;
}
static inline int
mca_ptl_gm_param_register_int (const char *param_name, int default_value)
mca_ptl_gm_param_register_int( const char *param_name, int default_value )
{
int id =
int id =
mca_base_param_register_int ("ptl", "gm", param_name, NULL,
default_value);
int param_value = default_value;
int param_value = default_value;
mca_base_param_lookup_int (id, &param_value);
return param_value;
}
/*
*
*/
static int
ompi_mca_ptl_gm_finalize (mca_ptl_gm_module_t * gm)
{
/* add code */
return OMPI_SUCCESS;
}
/*
* Called by MCA framework to open the module, registers
* module parameters.
*/
int
mca_ptl_gm_component_open (void)
mca_ptl_gm_component_open(void)
{
/* initialize state */
mca_ptl_gm_component.gm_ptl_modules = NULL;
@@ -122,11 +104,22 @@ mca_ptl_gm_component_open (void)
/* register GM component parameters */
mca_ptl_gm_module.super.ptl_first_frag_size =
mca_ptl_gm_param_register_int ("first_frag_size",
((PTL_GM_FIRST_FRAG_SIZE) - 64));
((PTL_GM_FIRST_FRAG_SIZE) - 64));
mca_ptl_gm_module.super.ptl_min_frag_size =
mca_ptl_gm_param_register_int ("min_frag_size", 1<<16);
mca_ptl_gm_module.super.ptl_max_frag_size =
mca_ptl_gm_param_register_int ("max_frag_size", 256 * 1024);
/* Parameters setting the message limits. */
mca_ptl_gm_component.gm_port_name =
mca_ptl_gm_param_register_string( "port_name", "OMPI_GM" );
mca_ptl_gm_component.gm_max_port_number =
mca_ptl_gm_param_register_int ("max_ports_number", 16 );
mca_ptl_gm_component.gm_max_boards_number =
mca_ptl_gm_param_register_int ("max_boards_number", 4 );
mca_ptl_gm_component.gm_max_rdma_frag_size =
mca_ptl_gm_param_register_int ("max_rdma_frag_size", 512 * 1024);
mca_ptl_gm_component.gm_max_ptl_modules =
mca_ptl_gm_param_register_int( "max_ptl_modules", 1 );
mca_ptl_gm_component.gm_free_list_num =
mca_ptl_gm_param_register_int ("free_list_num", 256);
@@ -136,23 +129,11 @@ mca_ptl_gm_component_open (void)
return OMPI_SUCCESS;
}
/*
* component close
*/
int mca_ptl_gm_component_close (void)
{
#ifdef GOPAL_TODO
if (OMPI_SUCCESS != ompi_mca_ptl_gm_finalize(&mca_ptl_gm_component)) {
ompi_output(0,
"[%s:%d] error in finalizing gm state and PTL's.\n",
__FILE__, __LINE__);
return NULL;
}
#endif
if (NULL != mca_ptl_gm_component.gm_ptl_modules)
free (mca_ptl_gm_component.gm_ptl_modules);
@@ -168,30 +149,28 @@ int mca_ptl_gm_component_close (void)
*/
static int
mca_ptl_gm_create (int i)
mca_ptl_gm_create( mca_ptl_gm_module_t** pptl )
{
mca_ptl_gm_module_t *ptl;
ptl = (mca_ptl_gm_module_t *)malloc (sizeof (mca_ptl_gm_module_t));
ptl = (mca_ptl_gm_module_t *)malloc( sizeof(mca_ptl_gm_module_t) );
if (NULL == ptl) {
ompi_output (0,
" ran out of resource to allocate ptl_instance \n");
ompi_output( 0, " ran out of resource to allocate ptl_instance \n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* copy the basic information into the new PTL */
memcpy (ptl, &mca_ptl_gm_module, sizeof (mca_ptl_gm_module));
mca_ptl_gm_component.gm_ptl_modules[i] = ptl;
ptl->thread.t_handle = (pthread_t)-1;
*pptl = ptl;
return OMPI_SUCCESS;
}
/*
* Register GM component addressing information. The MCA framework
* will make this available to all peers.
*/
static int
mca_ptl_gm_module_store_data_toexchange (void)
{
@@ -209,9 +188,9 @@ mca_ptl_gm_module_store_data_toexchange (void)
for (i = 0; i < mca_ptl_gm_component.gm_num_ptl_modules; i++) {
mca_ptl_gm_module_t *ptl = mca_ptl_gm_component.gm_ptl_modules[i];
addrs[i].local_id = ptl->my_local_id;
addrs[i].global_id = ptl->my_global_id;
addrs[i].port_id = ptl->my_port_id;
addrs[i].local_id = ptl->local_id;
addrs[i].global_id = ptl->global_id;
addrs[i].port_id = ptl->port_id;
}
rc = mca_base_modex_send (&mca_ptl_gm_component.super.ptlm_version, addrs,
size);
@@ -219,178 +198,223 @@ mca_ptl_gm_module_store_data_toexchange (void)
return rc;
}
static int
ompi_mca_ptl_gm_init (mca_ptl_gm_component_t * gm)
static void*
mca_ptl_gm_thread_progress( ompi_thread_t* thread )
{
mca_ptl_gm_module_t *ptl;
unsigned int board_no, port_no;
gm_status_t status;
int i;
int maxptls = 1; /* maxptls set to 1 */
gm_recv_event_t *event;
mca_ptl_gm_module_t* ptl = thread->t_arg;
mca_ptl_gm_component.gm_max_ptl_modules = maxptls;
mca_ptl_gm_component.gm_ptl_modules = malloc (maxptls *
sizeof (mca_ptl_gm_module_t *));
if (NULL == mca_ptl_gm_component.gm_ptl_modules)
return OMPI_ERR_OUT_OF_RESOURCE;
/* This thread enters a cancel-enabled state */
pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, NULL );
for (i = 0; i < maxptls; i++) {
mca_ptl_gm_create (i);
while(1) {
event = gm_blocking_receive(ptl->gm_port);
mca_ptl_gm_analyze_recv_event( ptl, event );
}
/*Hack : we have set the gm_max_ptl_modules to 1 */
for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
ptl = mca_ptl_gm_component.gm_ptl_modules[i];
/* open the first available gm port for this board */
board_no = i;
for (port_no = 2; port_no < MAX_GM_PORTS; port_no++)
{
GM_DBG(PTL_GM_DBG_COMM,"about to call open port\n");
if (port_no == 3) continue;
/* port 0,1,3 reserved */
status = gm_open (&(ptl->my_port), board_no,
port_no, "OMPI-GM", GM_API_VERSION_2_0);
if (GM_SUCCESS == status) {
ptl->my_port_id = port_no;
mca_ptl_gm_component.gm_num_ptl_modules++;
break;
}
}
#if 1
/* Get node local Id */
if (GM_SUCCESS != gm_get_node_id (ptl->my_port, &(ptl->my_local_id))) {
ompi_output (0, " failure to get local_id \n");
return 0;
}
#endif
/* Convert local id to global id */
if (GM_SUCCESS !=
gm_node_id_to_global_id (ptl->my_port, ptl->my_local_id,
&(ptl->my_global_id))) {
ompi_output (0, " Error: Unable to get my GM global id \n");
return 0;
}
}
return OMPI_SUCCESS;
return PTHREAD_CANCELED;
}
static int
ompi_mca_ptl_gm_init_sendrecv (mca_ptl_gm_component_t * gm)
/* Scan all ports on the boards. As it's difficult to find the total number of boards
* we use a predefined maximum.
* Return the number of discovered boards where opening a port was a successful operation.
*/
static int32_t
mca_ptl_gm_discover_boards( mca_ptl_gm_module_t** pptl,
uint32_t max_ptls, uint32_t max_boards, uint32_t max_port )
{
int i;
mca_ptl_gm_module_t *ptl;
uint32_t board_no, port_no;
struct gm_port* gm_port;
uint32_t index = 0;
uint32_t local_id, global_id;
for( board_no = 0; board_no < max_boards; board_no++ ) {
/* open the first available gm port for this board */
for( port_no = 2; port_no < max_port; port_no++ ) {
if (port_no == 3) continue; /* port 0,1,3 reserved */
if( GM_SUCCESS == gm_open( &gm_port, board_no, port_no,
mca_ptl_gm_component.gm_port_name, GM_API_VERSION_2_0) )
break;
}
if( port_no == max_port ) continue;
/* Get node local Id */
if( GM_SUCCESS != gm_get_node_id( gm_port, &local_id) ) {
ompi_output (0, " failure to get local_id \n");
continue;
}
/* Convert local id to global id */
if (GM_SUCCESS != gm_node_id_to_global_id( gm_port, local_id, &global_id) ) {
ompi_output (0, " Error: Unable to get my GM global id \n");
continue;
}
/* Create the ptl. If fail return the number of already created */
if( OMPI_SUCCESS != mca_ptl_gm_create( &(pptl[index]) ) )
return index;
pptl[index]->port_id = port_no;
pptl[index]->gm_port = gm_port;
pptl[index]->local_id = local_id;
pptl[index]->global_id = global_id;
/* everything is OK, let's mark it as usable and go to the next one */
if( (++index) >= max_ptls ) break;
}
return index;
}
static inline int
mca_ptl_gm_init_sendrecv (mca_ptl_gm_module_t * ptl)
{
uint32_t i;
gm_status_t status;
void *gm_send_reg_memory , *gm_recv_reg_memory;
ompi_free_list_t *fslist, *free_rlist;
mca_ptl_gm_send_frag_t *sfragment;
mca_ptl_gm_recv_frag_t *free_rfragment;
for (i = 0; i < mca_ptl_gm_component.gm_max_ptl_modules; i++) {
ptl = mca_ptl_gm_component.gm_ptl_modules[i];
ptl->num_send_tokens = gm_num_send_tokens (ptl->gm_port);
ptl->num_send_tokens -= PTL_GM_ADMIN_SEND_TOKENS;
ptl->num_recv_tokens = gm_num_receive_tokens (ptl->gm_port);
ptl->num_recv_tokens -= PTL_GM_ADMIN_RECV_TOKENS;
ptl->num_send_tokens = gm_num_send_tokens (ptl->my_port);
ptl->num_send_tokens -= PTL_GM_ADMIN_SEND_TOKENS;
ptl->num_recv_tokens = gm_num_receive_tokens (ptl->my_port);
ptl->num_recv_tokens -= PTL_GM_ADMIN_RECV_TOKENS;
/****************SEND****************************/
/* construct a list of send fragments */
OBJ_CONSTRUCT (&(ptl->gm_send_frags), ompi_free_list_t);
OBJ_CONSTRUCT (&(ptl->gm_send_frags_queue), ompi_list_t);
fslist = &(ptl->gm_send_frags);
/****************SEND****************************/
/* construct a list of send fragments */
OBJ_CONSTRUCT (&(ptl->gm_send_frags), ompi_free_list_t);
OBJ_CONSTRUCT (&(ptl->gm_send_frags_queue), ompi_list_t);
fslist = &(ptl->gm_send_frags);
ompi_free_list_init (&(ptl->gm_send_frags),
sizeof (mca_ptl_gm_send_frag_t),
OBJ_CLASS (mca_ptl_gm_send_frag_t),
ptl->num_send_tokens, ptl->num_send_tokens, 1, NULL); /* not using mpool */
ompi_free_list_init (&(ptl->gm_send_frags),
sizeof (mca_ptl_gm_send_frag_t),
OBJ_CLASS (mca_ptl_gm_send_frag_t),
ptl->num_send_tokens,ptl->num_send_tokens, 1, NULL); /* not using mpool */
/* allocate the elements */
sfragment = (mca_ptl_gm_send_frag_t *)
malloc (sizeof(mca_ptl_gm_send_frag_t) * (ptl->num_send_tokens));
/* allocate the elements */
sfragment = (mca_ptl_gm_send_frag_t *)
malloc (sizeof(mca_ptl_gm_send_frag_t) *
(ptl->num_send_tokens));
/* allocate the registered memory */
gm_send_reg_memory = gm_dma_malloc ( ptl->my_port,
(GM_SEND_BUF_SIZE * ptl->num_send_tokens) );
if( NULL == gm_send_reg_memory ) {
ompi_output( 0, "unable to allocate registered memory\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < ptl->num_send_tokens; i++) {
ompi_list_item_t *item;
sfragment->send_buf = gm_send_reg_memory;
item = (ompi_list_item_t *) sfragment;
OMPI_FREE_LIST_RETURN( fslist, item );
gm_send_reg_memory = ((char *)gm_send_reg_memory) + GM_SEND_BUF_SIZE;
sfragment++;
}
A_PRINT("recv_tokens = %d send_tokens = %d, allocted free lis =
%d\n",ptl->num_recv_tokens,ptl->num_send_tokens,fslist->fl_num_allocated);
/* allocate the registered memory */
gm_send_reg_memory = gm_dma_malloc ( ptl->gm_port,
(GM_SEND_BUF_SIZE * ptl->num_send_tokens) );
if( NULL == gm_send_reg_memory ) {
ompi_output( 0, "unable to allocate registered memory\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < ptl->num_send_tokens; i++) {
ompi_list_item_t *item;
sfragment->send_buf = gm_send_reg_memory;
item = (ompi_list_item_t *) sfragment;
OMPI_FREE_LIST_RETURN( fslist, item );
gm_send_reg_memory = ((char *)gm_send_reg_memory) + GM_SEND_BUF_SIZE;
sfragment++;
}
A_PRINT( ("recv_tokens = %d send_tokens = %d, allocated free list = %d\n",
ptl->num_recv_tokens,ptl->num_send_tokens,fslist->fl_num_allocated) );
/*****************RECEIVE*****************************/
/*allow remote memory access */
status = gm_allow_remote_memory_access (ptl->my_port);
if (GM_SUCCESS != status) {
ompi_output (0, "unable to allow remote memory access\n");
/*****************RECEIVE*****************************/
/*allow remote memory access */
status = gm_allow_remote_memory_access (ptl->gm_port);
if (GM_SUCCESS != status) {
ompi_output (0, "unable to allow remote memory access\n");
}
}
OBJ_CONSTRUCT (&(ptl->gm_recv_outstanding_queue), ompi_list_t);
OBJ_CONSTRUCT (&(ptl->gm_recv_outstanding_queue), ompi_list_t);
/* construct the list of recv fragments free */
OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
free_rlist = &(ptl->gm_recv_frags_free);
/* construct the list of recv fragments free */
OBJ_CONSTRUCT (&(ptl->gm_recv_frags_free), ompi_free_list_t);
free_rlist = &(ptl->gm_recv_frags_free);
ompi_free_list_init (&(ptl->gm_recv_frags_free),
sizeof (mca_ptl_gm_recv_frag_t),
OBJ_CLASS (mca_ptl_gm_recv_frag_t),
ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL);
ompi_free_list_init (&(ptl->gm_recv_frags_free),
sizeof (mca_ptl_gm_recv_frag_t),
OBJ_CLASS (mca_ptl_gm_recv_frag_t),
ptl->num_recv_tokens,ptl->num_recv_tokens, 1, NULL);
/*allocate the elements */
free_rfragment = (mca_ptl_gm_recv_frag_t *)
malloc(sizeof(mca_ptl_gm_recv_frag_t) * NUM_RECV_FRAGS);
/*allocate the elements */
free_rfragment = (mca_ptl_gm_recv_frag_t *)
malloc(sizeof(mca_ptl_gm_recv_frag_t) * ptl->num_recv_tokens);
/*allocate the registered memory */
gm_recv_reg_memory =
gm_dma_malloc (ptl->gm_port, (GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
for (i = 0; i < NUM_RECV_FRAGS; i++) {
ompi_list_item_t *item;
item = (ompi_list_item_t *) free_rfragment;
OMPI_FREE_LIST_RETURN( free_rlist, item );
free_rfragment++;
}
if( NULL == gm_recv_reg_memory ) {
ompi_output( 0, "unable to allocate registered memory for receive\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
/*allocate the registered memory */
gm_recv_reg_memory =
gm_dma_malloc (ptl->my_port, (GM_RECV_BUF_SIZE * ptl->num_recv_tokens ) );
for( i = 0; i < ptl->num_recv_tokens; i++ ) {
ompi_list_item_t *item = (ompi_list_item_t *) free_rfragment;
OMPI_FREE_LIST_RETURN( free_rlist, item );
free_rfragment++;
if( NULL == gm_recv_reg_memory )
{
ompi_output( 0, "unable to allocate registered memory for receive\n" );
return OMPI_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < ptl->num_recv_tokens ; i++)
{
gm_provide_receive_buffer( ptl->my_port, gm_recv_reg_memory,
GM_SIZE, GM_LOW_PRIORITY );
gm_recv_reg_memory = ((char *)gm_recv_reg_memory) + GM_RECV_BUF_SIZE;
}
gm_provide_receive_buffer( ptl->gm_port, gm_recv_reg_memory,
GM_SIZE, GM_LOW_PRIORITY );
gm_recv_reg_memory = ((char *)gm_recv_reg_memory) + GM_RECV_BUF_SIZE;
}
return OMPI_SUCCESS;
}
static int
mca_ptl_gm_init( mca_ptl_gm_component_t * gm )
{
uint32_t index;
mca_ptl_gm_module_t* ptl;
/* let's try to find if GM is available */
if( GM_SUCCESS != gm_init() )
return OMPI_ERR_OUT_OF_RESOURCE;
/* First discover all available boards. For each board we will create a unique PTL */
mca_ptl_gm_component.gm_ptl_modules = calloc( mca_ptl_gm_component.gm_max_ptl_modules,
sizeof (mca_ptl_gm_module_t *));
if (NULL == mca_ptl_gm_component.gm_ptl_modules)
return OMPI_ERR_OUT_OF_RESOURCE;
mca_ptl_gm_component.gm_num_ptl_modules =
mca_ptl_gm_discover_boards( mca_ptl_gm_component.gm_ptl_modules,
mca_ptl_gm_component.gm_max_ptl_modules,
mca_ptl_gm_component.gm_max_boards_number,
mca_ptl_gm_component.gm_max_port_number );
/* In the case when we are in a multi-threaded environment each PTL will have its
* own thread. At this point all structures are correctly initialized; each thread
* will grab one and use it.
*/
if( ompi_using_threads() ) {
uint32_t save_counter;
for( index = 0; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
ptl = mca_ptl_gm_component.gm_ptl_modules[index];
/* Now prepost some receives and allocate some sends. After this step the PTL
* is fully initialized.
*/
if( OMPI_SUCCESS != mca_ptl_gm_init_sendrecv( ptl ) )
break;
ptl->thread.t_run = (ompi_thread_fn_t)mca_ptl_gm_thread_progress;
ptl->thread.t_arg = (void*)ptl;
if( OMPI_SUCCESS != ompi_thread_start( &(ptl->thread) ) )
break;
}
save_counter = index;
/* If we are unable to start all the required threads we update the total
* number of threads and call finalize for the other PTLs.
*/
for( ; index < mca_ptl_gm_component.gm_num_ptl_modules; index++ ) {
mca_ptl_base_module_t* ptl = (mca_ptl_base_module_t*)mca_ptl_gm_component.gm_ptl_modules[index];
ptl->ptl_finalize( ptl );
}
mca_ptl_gm_component.gm_num_ptl_modules = save_counter;
}
return (mca_ptl_gm_component.gm_num_ptl_modules > 0 ? OMPI_SUCCESS : OMPI_ERR_OUT_OF_RESOURCE);
}
/*
* Initialize the GM component,
@@ -405,20 +429,12 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
mca_ptl_base_module_t **ptls;
*num_ptl_modules = 0;
*allow_multi_user_threads = false;
*have_hidden_threads = false;
*allow_multi_user_threads = true;
*have_hidden_threads = true;
if (OMPI_SUCCESS != ompi_mca_ptl_gm_init (&mca_ptl_gm_component)) {
ompi_output (0,
"[%s:%d] error in initializing gm state and PTL's.\n",
__FILE__, __LINE__);
return NULL;
}
if (OMPI_SUCCESS != ompi_mca_ptl_gm_init_sendrecv (&mca_ptl_gm_component)) {
ompi_output (0,
"[%s:%d] error in initializing buffer resources .\n",
__FILE__, __LINE__);
if (OMPI_SUCCESS != mca_ptl_gm_init (&mca_ptl_gm_component)) {
ompi_output( 0, "[%s:%d] error in initializing gm state and PTL's.\n",
__FILE__, __LINE__ );
return NULL;
}
@@ -426,7 +442,6 @@ mca_ptl_gm_component_init (int *num_ptl_modules,
if (OMPI_SUCCESS != mca_ptl_gm_module_store_data_toexchange ())
return 0;
/* return array of PTLs */
ptls = (mca_ptl_base_module_t**) malloc (
mca_ptl_gm_component.gm_num_ptl_modules * sizeof(mca_ptl_base_module_t *));

View file

@@ -34,16 +34,15 @@ int mca_ptl_gm_peer_put(mca_ptl_gm_peer_t *ptl_peer,
void * target_buffer,
int bytes)
{
gm_put( ptl_peer->peer_ptl->my_port, fragment->registered_buf,
(gm_remote_ptr_t)target_buffer, bytes, GM_LOW_PRIORITY,
ptl_peer->local_id, ptl_peer->port_number,
put_callback, (void *)fragment );
gm_put( ptl_peer->peer_ptl->gm_port, fragment->registered_buf,
(gm_remote_ptr_t)target_buffer, bytes, GM_LOW_PRIORITY,
ptl_peer->local_id, ptl_peer->port_number,
put_callback, (void *)fragment );
fragment->send_frag.frag_base.frag_owner = &ptl_peer->peer_ptl->super;
fragment->send_frag.frag_base.frag_peer =
(struct mca_ptl_base_peer_t*)ptl_peer;
(struct mca_ptl_base_peer_t*)ptl_peer;
fragment->send_frag.frag_base.frag_addr =(void *)target_buffer;
fragment->send_frag.frag_base.frag_size = bytes;
return OMPI_SUCCESS;
@@ -125,7 +124,7 @@ int mca_ptl_gm_peer_send(mca_ptl_gm_peer_t *ptl_peer,
((mca_ptl_base_header_t *)header)->hdr_ack.hdr_src_ptr);
/* initiate the gm send */
gm_send_with_callback( ptl_peer->peer_ptl->my_port, fragment->send_buf,
gm_send_with_callback( ptl_peer->peer_ptl->gm_port, fragment->send_buf,
GM_SIZE, size_out, GM_LOW_PRIORITY, ptl_peer->local_id,
ptl_peer->port_number, send_callback, (void *)fragment );
@@ -181,7 +180,7 @@ void put_callback(struct gm_port *port,void * context, gm_status_t status)
#endif
/* deregister the user memory */
status = gm_deregister_memory(ptl->my_port, (char *)(putfrag->registered_buf), bytes2);
status = gm_deregister_memory(ptl->gm_port, (char *)(putfrag->registered_buf), bytes2);
if(GM_SUCCESS != status) {
ompi_output(0," unpinning memory failed\n");
@@ -446,7 +445,7 @@ void ptl_gm_ctrl_frag(struct mca_ptl_gm_module_t *ptl,
/* deregister the memory */
bytes = header->hdr_ack.hdr_dst_size;
reg_buf =(char *) header->hdr_ack.hdr_dst_addr.pval;
status = gm_deregister_memory(ptl->my_port, reg_buf, bytes);
status = gm_deregister_memory(ptl->gm_port, reg_buf, bytes);
if(GM_SUCCESS != status)
{
@@ -555,7 +554,6 @@ void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl)
size = ompi_list_get_size (&ptl->gm_recv_outstanding_queue);
if (size > 0)
{
frag = (mca_ptl_gm_recv_frag_t *)
@@ -581,55 +579,59 @@ void mca_ptl_gm_outstanding_recv(mca_ptl_gm_module_t *ptl)
}
int mca_ptl_gm_analyze_recv_event(mca_ptl_gm_module_t* ptl, gm_recv_event_t* event )
{
void * mesg;
mca_ptl_gm_recv_frag_t * frag;
switch (gm_ntohc(event->recv.type)) {
case GM_RECV_EVENT:
case GM_HIGH_RECV_EVENT:
case GM_PEER_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
mesg = gm_ntohp(event->recv.buffer);
frag = ptl_gm_handle_recv( ptl, event );
GM_DBG(PTL_GM_DBG_COMM,"FINISHED HANDLING INCOMING EVENT\n");
if( (frag != NULL) && !(frag->matched) ) {
/* allocate temporary buffer: temporary until the fragment is finally matched */
char* buffer = malloc( GM_SEND_BUF_SIZE );
if (NULL == buffer) {
ompi_output(0, "[%s:%d] error in allocating memory \n", __FILE__, __LINE__);
}
/* copy the data from the registered buffer to the newly allocated one */
memcpy( buffer, mesg, gm_ntohl(event->recv.length) );
/* associate the buffer with the unexpected fragment */
frag->frag_recv.frag_base.frag_addr = (void *)buffer;
/* mark the fragment as having pending buffers */
frag->have_allocated_buffer = true;
}
gm_provide_receive_buffer( ptl->gm_port, gm_ntohp(event->recv.buffer),
GM_SIZE, GM_LOW_PRIORITY );
break;
case GM_NO_RECV_EVENT:
break;
default:
gm_unknown(ptl->gm_port, event);
}
return 0;
}
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp)
{
int i;
uint32_t i;
gm_recv_event_t *event;
void * mesg;
mca_ptl_gm_module_t *ptl;
mca_ptl_gm_recv_frag_t * frag;
for( i = 0; i< gm_comp->gm_num_ptl_modules; i++) {
ptl = gm_comp->gm_ptl_modules[i];
event = gm_receive(ptl->my_port);
switch (gm_ntohc(event->recv.type)) {
case GM_RECV_EVENT:
case GM_HIGH_RECV_EVENT:
case GM_PEER_RECV_EVENT:
case GM_HIGH_PEER_RECV_EVENT:
mesg = gm_ntohp(event->recv.buffer);
frag = ptl_gm_handle_recv( ptl, event );
GM_DBG(PTL_GM_DBG_COMM,"FINISHED HANDLING INCOMING EVENT\n");
if( (frag != NULL) && !(frag->matched) ) {
/* allocate temporary buffer: temporary until the fragment will be finally matched */
char* buffer = malloc( GM_SEND_BUF_SIZE );
if (NULL == buffer)
{
ompi_output(0, "[%s:%d] error in allocating memory \n",
__FILE__, __LINE__);
}
/* copy the data from the registered buffer to the newly allocated one */
memcpy( buffer, mesg, gm_ntohl(event->recv.length) );
/* associate the buffer with the unexpected fragment */
frag->frag_recv.frag_base.frag_addr = (void *)buffer;
/* mark the fragment as having pending buffers */
frag->have_allocated_buffer = true;
}
gm_provide_receive_buffer( ptl->my_port, gm_ntohp(event->recv.buffer),
GM_SIZE, GM_LOW_PRIORITY );
break;
case GM_NO_RECV_EVENT:
break;
default:
gm_unknown(ptl->my_port, event);
}
event = gm_receive(ptl->gm_port);
mca_ptl_gm_analyze_recv_event( ptl, event );
}
return 0;
}

View file

@@ -49,6 +49,8 @@ mca_ptl_gm_recv_frag_t* ptl_gm_data_frag( struct mca_ptl_gm_module_t *ptl,
mca_ptl_gm_recv_frag_t* ptl_gm_handle_recv( mca_ptl_gm_module_t *ptl,
gm_recv_event_t* event );
int mca_ptl_gm_analyze_recv_event(mca_ptl_gm_module_t* ptl, gm_recv_event_t* event );
int mca_ptl_gm_incoming_recv (mca_ptl_gm_component_t * gm_comp);