openmpi/ompi/mca/osc/rdma/osc_rdma_comm.c
Brian Barrett 1a9f48c89d Some much-needed cleanup of the rdma one-sided component, similar to
r14703 for the point-to-point component.

  * Associate the list of long message requests to poll with the
    component, not the individual modules.
  * Add a progress thread that sits on the OMPI request structure
    and wakes up at the appropriate time to poll the message
    list and move long messages asynchronously.
  * Instead of calling opal_progress() all over the place, move
    to using condition variables like the rest of the project
    (sketched below). This also moves the component slightly
    further along toward thread safety.
  * Fix a problem with the passive side of unlock where it could
    go recursive and cause all kinds of problems, especially
    when progress threads are used. Instead, split passive
    unlock into two parts -- one to start the unlock, and another
    to complete the unlock and send the ack back. The data
    movement code triggers the second part at the right time.
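
    A minimal sketch of the condition variable pattern described above
    (assuming the module's m_lock/m_cond fields and the
    m_num_pending_out counter this component already uses for
    completion counting):

        /* wait for all outgoing requests to complete; the progress
           engine signals m_cond as requests finish */
        OPAL_THREAD_LOCK(&module->m_lock);
        while (0 != module->m_num_pending_out) {
            opal_condition_wait(&module->m_cond, &module->m_lock);
        }
        OPAL_THREAD_UNLOCK(&module->m_lock);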

This commit was SVN r14751.

The following SVN revision numbers were found above:
  r14703 --> open-mpi/ompi@2b4b754925
2007-05-24 15:41:24 +00:00


/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "ompi_config.h"
#include "mpi.h"
#include <stdio.h>
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
static int
enqueue_sendreq(ompi_osc_rdma_module_t *module,
                ompi_osc_rdma_sendreq_t *sendreq)
{
    OPAL_THREAD_LOCK(&(module->m_lock));
    opal_list_append(&(module->m_pending_sendreqs),
                     (opal_list_item_t*) sendreq);
    module->m_num_pending_sendreqs[sendreq->req_target_rank]++;
    OPAL_THREAD_UNLOCK(&(module->m_lock));

    return OMPI_SUCCESS;
}

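
/* MPI_Accumulate implementation: validate the synchronization state,
   reject reductions on non-predefined target datatypes, then queue a
   send request for the progress engine.  Unlike get/put below,
   accumulate is always queued rather than sent eagerly. */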
int
ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
                                struct ompi_datatype_t *origin_dt,
                                int target, int target_disp, int target_count,
                                struct ompi_datatype_t *target_dt,
                                struct ompi_op_t *op, ompi_win_t *win)
{
    int ret;
    ompi_osc_rdma_sendreq_t *sendreq;

    if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
        (!GET_MODULE(win)->m_sc_remote_active_ranks[target])) {
        return MPI_ERR_RMA_SYNC;
    }

    if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
        /* well, we're definitely in an access epoch now */
        ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
                          OMPI_WIN_EXPOSE_EPOCH);
    }

    if (op != &ompi_mpi_op_replace &&
        !ompi_ddt_is_predefined(target_dt)) {
        fprintf(stderr, "MPI_Accumulate currently does not support reductions\n");
        fprintf(stderr, "with any user-defined types.  This will be rectified\n");
        fprintf(stderr, "in a future release.\n");
        return MPI_ERR_UNSUPPORTED_OPERATION;
    }

    /* shortcut 0 count case */
    if (0 == origin_count || 0 == target_count) {
        return OMPI_SUCCESS;
    }

    /* create sendreq */
    ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_ACC,
                                           origin_addr,
                                           origin_count,
                                           origin_dt,
                                           target,
                                           target_disp,
                                           target_count,
                                           target_dt,
                                           GET_MODULE(win),
                                           &sendreq);
    if (OMPI_SUCCESS != ret) return ret;

    sendreq->req_op_id = op->o_f_to_c_index;

    /* enqueue sendreq */
    ret = enqueue_sendreq(GET_MODULE(win), sendreq);

    return ret;
}

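
/* MPI_Get implementation: validate the synchronization state, then
   either send the request eagerly (when fence synchronization and
   eager sends are enabled) or queue it for the progress engine. */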
int
ompi_osc_rdma_module_get(void *origin_addr,
                         int origin_count,
                         struct ompi_datatype_t *origin_dt,
                         int target,
                         int target_disp,
                         int target_count,
                         struct ompi_datatype_t *target_dt,
                         ompi_win_t *win)
{
    int ret;
    ompi_osc_rdma_sendreq_t *sendreq;
    ompi_osc_rdma_module_t *module = GET_MODULE(win);

    if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
        (!module->m_sc_remote_active_ranks[target])) {
        return MPI_ERR_RMA_SYNC;
    }

    if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
        /* well, we're definitely in an access epoch now */
        ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
                          OMPI_WIN_EXPOSE_EPOCH);
    }

    /* shortcut 0 count case */
    if (0 == origin_count || 0 == target_count) {
        return OMPI_SUCCESS;
    }

    /* create sendreq */
    ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_GET,
                                           origin_addr,
                                           origin_count,
                                           origin_dt,
                                           target,
                                           target_disp,
                                           target_count,
                                           target_dt,
                                           module,
                                           &sendreq);
    if (OMPI_SUCCESS != ret) return ret;

    /* if we're doing fence synchronization, try to actively send
       right now */
    if (module->m_eager_send &&
        (OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
        OPAL_THREAD_LOCK(&module->m_lock);
        sendreq->req_module->m_num_pending_out += 1;
        module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
        OPAL_THREAD_UNLOCK(&module->m_lock);

        ret = ompi_osc_rdma_sendreq_send(module, sendreq);

        if (OMPI_SUCCESS != ret) {
            /* the eager send failed; undo the pending count and fall
               back to queueing the request for the progress engine */
            OPAL_THREAD_LOCK(&module->m_lock);
            sendreq->req_module->m_num_pending_out -= 1;
            opal_list_append(&(module->m_pending_sendreqs),
                             (opal_list_item_t*) sendreq);
            OPAL_THREAD_UNLOCK(&module->m_lock);
            ret = OMPI_SUCCESS;
        }
    } else {
        /* enqueue sendreq */
        ret = enqueue_sendreq(module, sendreq);
    }

    return ret;
}

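
/* MPI_Put implementation: identical control flow to MPI_Get above,
   but the send request moves data from the origin to the target. */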
int
ompi_osc_rdma_module_put(void *origin_addr, int origin_count,
                         struct ompi_datatype_t *origin_dt,
                         int target, int target_disp, int target_count,
                         struct ompi_datatype_t *target_dt, ompi_win_t *win)
{
    int ret;
    ompi_osc_rdma_sendreq_t *sendreq;
    ompi_osc_rdma_module_t *module = GET_MODULE(win);

    if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
        (!module->m_sc_remote_active_ranks[target])) {
        return MPI_ERR_RMA_SYNC;
    }

    if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
        /* well, we're definitely in an access epoch now */
        ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
                          OMPI_WIN_EXPOSE_EPOCH);
    }

    /* shortcut 0 count case */
    if (0 == origin_count || 0 == target_count) {
        return OMPI_SUCCESS;
    }

    /* create sendreq */
    ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_PUT,
                                           origin_addr,
                                           origin_count,
                                           origin_dt,
                                           target,
                                           target_disp,
                                           target_count,
                                           target_dt,
                                           module,
                                           &sendreq);
    if (OMPI_SUCCESS != ret) return ret;

    /* if we're doing fence synchronization, try to actively send
       right now */
    if (module->m_eager_send &&
        (OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
        OPAL_THREAD_LOCK(&module->m_lock);
        sendreq->req_module->m_num_pending_out += 1;
        module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
        OPAL_THREAD_UNLOCK(&module->m_lock);

        ret = ompi_osc_rdma_sendreq_send(module, sendreq);

        if (OMPI_SUCCESS != ret) {
            /* the eager send failed; undo the pending count and fall
               back to queueing the request for the progress engine */
            OPAL_THREAD_LOCK(&module->m_lock);
            sendreq->req_module->m_num_pending_out -= 1;
            opal_list_append(&(module->m_pending_sendreqs),
                             (opal_list_item_t*) sendreq);
            OPAL_THREAD_UNLOCK(&module->m_lock);
            ret = OMPI_SUCCESS;
        }
    } else {
        /* enqueue sendreq */
        ret = enqueue_sendreq(module, sendreq);
    }

    return ret;
}