1
1

Merge pull request #1339 from hjelmn/osc_pt2pt_fixes

osc/pt2pt: various threading fixes
Этот коммит содержится в:
Nathan Hjelm 2016-02-02 16:47:09 -07:00
родитель d812695201 519fffb65e
Коммит 615b27ca82
9 изменённых файлов: 127 добавлений и 190 удалений

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -149,20 +149,20 @@ struct ompi_osc_pt2pt_module_t {
uint32_t *epoch_outgoing_frag_count;
/** cyclic counter for a unique tag for long messages. */
unsigned int tag_counter;
unsigned int rtag_counter;
uint32_t tag_counter;
uint32_t rtag_counter;
/* Number of outgoing fragments that have completed since the
beginning of time */
uint32_t outgoing_frag_count;
volatile uint32_t outgoing_frag_count;
/* Next outgoing fragment count at which we want a signal on cond */
uint32_t outgoing_frag_signal_count;
volatile uint32_t outgoing_frag_signal_count;
/* Number of incoming fragments that have completed since the
beginning of time */
uint32_t active_incoming_frag_count;
volatile uint32_t active_incoming_frag_count;
/* Next incoming buffer count at which we want a signal on cond */
uint32_t active_incoming_frag_signal_count;
volatile uint32_t active_incoming_frag_signal_count;
/** Number of targets locked/being locked */
unsigned int passive_target_access_epoch;
@ -409,14 +409,6 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module,
int tag,
struct ompi_communicator_t *comm);
int ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module,
const void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm);
/**
* ompi_osc_pt2pt_progress_pending_acc:
*
@ -639,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
}
#define OSC_PT2PT_FRAG_TAG 0x10000
#define OSC_PT2PT_FRAG_MASK 0x0ffff
#define OSC_PT2PT_FRAG_TAG 0x80000
#define OSC_PT2PT_FRAG_MASK 0x7ffff
/**
* get_tag:
@ -658,11 +650,8 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module)
/* the LSB of the tag is used by the receiver to determine if the
message is a passive or active target (ie, where to mark
completion). */
int tmp = module->tag_counter + !!(module->passive_target_access_epoch);
module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK;
return tmp;
int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4);
return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
}
static inline int get_rtag(ompi_osc_pt2pt_module_t *module)
@ -670,11 +659,8 @@ static inline int get_rtag(ompi_osc_pt2pt_module_t *module)
/* the LSB of the tag is used by the receiver to determine if the
message is a passive or active target (ie, where to mark
completion). */
int tmp = module->rtag_counter + !!(module->passive_target_access_epoch);
module->rtag_counter = (module->rtag_counter + 4) & OSC_PT2PT_FRAG_MASK;
return tmp;
int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->rtag_counter, 4);
return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
}
/**
* ompi_osc_pt2pt_accumulate_lock:

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -211,7 +211,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
OPAL_THREAD_LOCK(&sync->lock);
OPAL_THREAD_LOCK(&module->lock);
/* check if we are already in an access epoch */
if (ompi_osc_pt2pt_access_epoch_active (module)) {

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -34,20 +34,15 @@
#include <stdio.h>
/* progress an OSC request */
static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request)
{
ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = pt2pt_request->module;
ompi_osc_pt2pt_module_t *module =
(ompi_osc_pt2pt_module_t*) request->req_complete_cb_data;
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_req_comm_complete called tag = %d",
request->req_status.MPI_TAG));
"isend_completion_cb called"));
mark_outgoing_completion (module);
if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
}
mark_outgoing_completion(module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
@ -55,6 +50,39 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
return OMPI_SUCCESS;
}
/**
 * Completion callback for a send that was issued on behalf of an OSC
 * request object.
 *
 * The callback data attached to @p request is the owning
 * ompi_osc_pt2pt_request_t. Its outstanding_requests counter is decremented
 * and, when it reaches zero, the OSC request is completed with the MPI_ERROR
 * value taken from this send's status. The common send-completion handling
 * is then delegated to ompi_osc_pt2pt_comm_complete().
 *
 * @param request  the completed send request
 * @return the value returned by ompi_osc_pt2pt_comm_complete()
 */
static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
{
    ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data;

    OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
                         "ompi_osc_pt2pt_req_comm_complete called tag = %d",
                         request->req_status.MPI_TAG));

    /* ompi_osc_pt2pt_comm_complete() casts req_complete_cb_data to the module
     * pointer, so replace the OSC request stored there with its module before
     * delegating. This is done before the completion below because
     * pt2pt_request should not be touched once it has been completed
     * (NOTE(review): presumably completion may release the object - confirm). */
    request->req_complete_cb_data = pt2pt_request->module;

    if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
        ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
    }

    return ompi_osc_pt2pt_comm_complete (request);
}
/**
 * Start a data send to @p dest and account for it as one outgoing message.
 *
 * Without an OSC request the send completes through
 * ompi_osc_pt2pt_comm_complete() with the module as callback data. With an
 * OSC request, the request's outstanding_requests counter is bumped first and
 * the send completes through ompi_osc_pt2pt_req_comm_complete() with the
 * request as callback data.
 *
 * @return the value returned by ompi_osc_pt2pt_isend_w_cb()
 */
static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t *module, const void *buf,
                                             size_t count, ompi_datatype_t *datatype, int dest,
                                             int tag, ompi_osc_pt2pt_request_t *request)
{
    /* one more outgoing send towards dest */
    ompi_osc_signal_outgoing (module, dest, 1);

    if (NULL == request) {
        /* no request object attached: plain module-level completion */
        return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm,
                                          ompi_osc_pt2pt_comm_complete, module);
    }

    /* the request has to wait for this send as well */
    ++request->outstanding_requests;

    return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm,
                                      ompi_osc_pt2pt_req_comm_complete, request);
}
static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
{
ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
@ -282,14 +310,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
payload_len = origin_dt->super.size * origin_count;
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -301,9 +329,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
tag = get_tag(module);
}
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
if (is_long_msg) {
/* wait for eager sends to be active before starting a long put */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
@ -361,18 +388,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
header->tag = tag;
osc_pt2pt_hton(header, proc);
/* increase the outgoing signal count */
ompi_osc_signal_outgoing (module, target, 1);
if (request) {
request->outstanding_requests = 1;
ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt,
target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete,
request);
} else {
ret = ompi_osc_pt2pt_component_isend (module,origin_addr, origin_count, origin_dt, target, tag,
module->comm);
}
ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag,
request);
}
} while (0);
@ -380,14 +397,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (request || is_long_msg) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target);
}
return ret;
return ompi_osc_pt2pt_frag_finish(module, frag);
}
int
@ -459,14 +469,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
payload_len = origin_dt->super.size * origin_count;
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
if (OMPI_SUCCESS != ret) {
frag_len = sizeof(*header) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -478,9 +488,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
tag = get_rtag (module);
}
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
if (is_long_msg || is_long_datatype) {
/* wait for synchronization before posting a long message */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
@ -538,18 +547,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"acc: starting long accumulate with tag %d", tag));
/* increment the outgoing send count */
ompi_osc_signal_outgoing (module, target, 1);
if (request) {
request->outstanding_requests = 1;
ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_dt,
target, tag, module->comm, ompi_osc_pt2pt_req_comm_complete,
request);
} else {
ret = ompi_osc_pt2pt_component_isend (module, origin_addr, origin_count, origin_dt, target, tag,
module->comm);
}
ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag,
request);
}
} while (0);
@ -561,14 +560,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (is_long_msg || request) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target);
}
return ret;
return ompi_osc_pt2pt_frag_finish(module, frag);
}
int
@ -639,7 +631,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
}
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
if (OMPI_SUCCESS != ret) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -787,11 +779,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
ddt_len = ompi_datatype_pack_description_length(target_dt);
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -804,9 +796,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
/* for bookkeeping the get is "outgoing" */
ompi_osc_signal_outgoing (module, target, 1);
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
if (!release_req) {
/* wait for epoch to begin before starting rget operation */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
@ -857,14 +848,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
*request = &pt2pt_request->super;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (!release_req) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target);
}
return ret;
return ompi_osc_pt2pt_frag_finish(module, frag);
}
int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
@ -1003,14 +987,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
}
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false);
if (OMPI_SUCCESS != ret) {
frag_len = sizeof(*header) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -1030,9 +1014,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
/* increment the number of outgoing fragments */
ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests);
/* flush will be called at the end of this function. make sure all post messages have
* arrived. */
if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == pt2pt_sync->type) {
if (!release_req) {
/* wait for epoch to begin before starting operation */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
@ -1100,14 +1083,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
*request = (ompi_request_t *) pt2pt_request;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (!release_req) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target_rank);
}
return ret;
return ompi_osc_pt2pt_frag_finish(module, frag);
}
int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int origin_count,

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
char *ptr;
int ret;
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr);
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
memcpy (ptr, data, len);
@ -1683,33 +1683,6 @@ int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,
osc_pt2pt_incoming_req_complete, module);
}
/*
 * Send-completion callback whose callback data is the module.
 *
 * Marks one outgoing completion on the module and hands the finished
 * request to the garbage collection list. Always returns OMPI_SUCCESS.
 */
static int
isend_completion_cb(ompi_request_t *request)
{
/* the module was registered as the callback data of this send */
ompi_osc_pt2pt_module_t *module =
(ompi_osc_pt2pt_module_t*) request->req_complete_cb_data;
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
"isend_completion_cb called"));
mark_outgoing_completion(module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
/*
 * Start a send through ompi_osc_pt2pt_isend_w_cb(), registering
 * isend_completion_cb as the completion callback and the module as its
 * callback data. Returns whatever ompi_osc_pt2pt_isend_w_cb() returns.
 *
 * NOTE(review): count is a size_t here while ompi_osc_pt2pt_isend_w_cb()
 * takes an int count, so the value is implicitly narrowed at the call.
 */
int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, const void *buf,
size_t count, struct ompi_datatype_t *datatype,
int dest, int tag, struct ompi_communicator_t *comm)
{
return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm,
isend_completion_cb, module);
}
int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
{

Просмотреть файл

@ -1,7 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@ -33,7 +33,8 @@ struct ompi_osc_pt2pt_frag_t {
char *top;
/* Number of operations which have started writing into the frag, but not yet completed doing so */
int32_t pending;
volatile int32_t pending;
int32_t pending_long_sends;
ompi_osc_pt2pt_frag_header_t *header;
ompi_osc_pt2pt_module_t *module;
};
@ -44,12 +45,24 @@ extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
/*
 * Drop one count from buffer->pending. If the decrement brings the counter
 * to zero the fragment is started with ompi_osc_pt2pt_frag_start() and that
 * call's result is returned; otherwise OMPI_SUCCESS is returned.
 */
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t* buffer)
{
/* write barrier ahead of the decrement - presumably so that data written
 * into the fragment is visible before another thread can start it
 * (TODO confirm) */
opal_atomic_wmb ();
if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
/* full barrier before the fragment is handed to frag_start */
opal_atomic_mb ();
return ompi_osc_pt2pt_frag_start(module, buffer);
}
return OMPI_SUCCESS;
}
/*
* Note: module lock must be held during this operation
*/
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
char **ptr)
char **ptr, bool long_send)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *curr;
@ -66,29 +79,21 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
OPAL_THREAD_LOCK(&module->lock);
curr = peer->active_frag;
if (NULL == curr || curr->remain_len < request_len) {
opal_free_list_item_t *item = NULL;
if (NULL != curr) {
curr->remain_len = 0;
peer->active_frag = NULL;
opal_atomic_mb ();
if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
/* If there's something pending, the pending finish will
start the buffer. Otherwise, we need to start it now. */
if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) {
ret = ompi_osc_pt2pt_frag_start(module, curr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
ret = ompi_osc_pt2pt_frag_finish (module, curr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
}
item = opal_free_list_get (&mca_osc_pt2pt_component.frags);
if (OPAL_UNLIKELY(NULL == item)) {
curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
if (OPAL_UNLIKELY(NULL == curr)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item;
curr->target = target;
@ -96,7 +101,8 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
curr->top = (char*) (curr->header + 1);
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
curr->module = module;
curr->pending = 1;
curr->pending = 2;
curr->pending_long_sends = long_send;
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
@ -104,12 +110,18 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
}
curr->header->source = ompi_comm_rank(module->comm);
curr->header->num_ops = 0;
curr->header->num_ops = 1;
if (curr->remain_len < request_len) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
peer->active_frag = curr;
} else {
OPAL_THREAD_ADD32(&curr->pending, 1);
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
curr->pending_long_sends += long_send;
}
*ptr = curr->top;
@ -117,25 +129,9 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
curr->top += request_len;
curr->remain_len -= request_len;
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_ADD32(&curr->pending, 1);
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
return OMPI_SUCCESS;
}
/*
 * Decrement buffer->pending and, if it drops to zero, start the fragment
 * with ompi_osc_pt2pt_frag_start() and return its result; otherwise return
 * OMPI_SUCCESS.
 *
 * Note: module lock must be held for this operation
 */
static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t* buffer)
{
if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
return ompi_osc_pt2pt_frag_start(module, buffer);
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -8,7 +8,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2010 IBM Corporation. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
@ -244,6 +244,8 @@ static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module
}
}
} else {
lock->eager_send_active = true;
}
return OMPI_SUCCESS;

Просмотреть файл

@ -51,6 +51,7 @@ request_construct(ompi_osc_pt2pt_request_t *request)
request->super.req_status._cancelled = 0;
request->super.req_free = request_free;
request->super.req_cancel = request_cancel;
request->outstanding_requests = 0;
}
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_request_t,

Просмотреть файл

@ -1,7 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
@ -57,6 +57,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t);
/*
 * Return an OSC request object to the component's request free list:
 * finalize the embedded request with OMPI_REQUEST_FINI, reset its
 * outstanding_requests counter to zero, then give the item back to
 * mca_osc_pt2pt_component.requests.
 */
#define OMPI_OSC_PT2PT_REQUEST_RETURN(req) \
do { \
OMPI_REQUEST_FINI(&(req)->super); \
(req)->outstanding_requests = 0; \
opal_free_list_return (&mca_osc_pt2pt_component.requests, \
(opal_free_list_item_t *) (req)); \
} while (0)

Просмотреть файл

@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@ -163,8 +163,10 @@ static inline void ompi_osc_pt2pt_sync_expected (ompi_osc_pt2pt_sync_t *sync)
{
int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
if (0 == new_value) {
OPAL_THREAD_LOCK(&sync->lock);
sync->eager_send_active = true;
opal_condition_broadcast (&sync->cond);
OPAL_THREAD_UNLOCK(&sync->lock);
}
}