1
1

- removed unused list type from ompi_list

- added probe/cancel stubs
- changed scheduler to allow unlimited first fragment size / use max fragment size

This commit was SVN r1211.
Этот коммит содержится в:
Tim Woodall 2004-06-09 19:45:08 +00:00
родитель 35ee48d78e
Коммит 69ff3b9ecc
19 изменённых файлов: 343 добавлений и 57 удалений

Просмотреть файл

@ -38,7 +38,6 @@ OBJ_CLASS_INSTANCE(
static void ompi_list_item_construct(ompi_list_item_t *item)
{
item->ompi_list_next = item->ompi_list_prev = NULL;
item->ompi_list_type = 0;
}
static void ompi_list_item_destruct(ompi_list_item_t *item)
@ -58,7 +57,6 @@ static void ompi_list_construct(ompi_list_t *list)
list->ompi_list_head.ompi_list_next = &list->ompi_list_tail;
list->ompi_list_tail.ompi_list_prev = &list->ompi_list_head;
list->ompi_list_tail.ompi_list_next = NULL;
list->ompi_list_type = 0;
list->ompi_list_length = 0;
}

Просмотреть файл

@ -16,7 +16,6 @@
extern ompi_class_t ompi_list_item_t_class;
extern ompi_class_t ompi_list_t_class;
typedef int ompi_list_type_t;
/*
@ -28,7 +27,6 @@ typedef int ompi_list_type_t;
typedef struct ompi_list_item
{
ompi_object_t super;
ompi_list_type_t ompi_list_type;
volatile struct ompi_list_item *ompi_list_next;
volatile struct ompi_list_item *ompi_list_prev;
} ompi_list_item_t;
@ -48,10 +46,9 @@ typedef struct ompi_list_item
typedef struct ompi_list
{
ompi_object_t super;
ompi_list_item_t ompi_list_head;
ompi_list_item_t ompi_list_tail;
ompi_list_type_t ompi_list_type;
ompi_object_t super;
ompi_list_item_t ompi_list_head;
ompi_list_item_t ompi_list_tail;
volatile size_t ompi_list_length;
} ompi_list_t;
@ -60,16 +57,6 @@ typedef struct ompi_list
* Inlined accessor functions
*/
static inline ompi_list_type_t ompi_list_get_type(ompi_list_t* list)
{
return list->ompi_list_type;
}
static inline void ompi_list_set_type(ompi_list_t* list, ompi_list_type_t type)
{
list->ompi_list_type = type;
}
static inline size_t ompi_list_get_size(ompi_list_t* list)
{
return list->ompi_list_length;

Просмотреть файл

@ -21,7 +21,9 @@ extern ompi_class_t mca_pml_base_request_t_class;
typedef enum {
MCA_PML_REQUEST_NULL,
MCA_PML_REQUEST_SEND,
MCA_PML_REQUEST_RECV
MCA_PML_REQUEST_RECV,
MCA_PML_REQUEST_IPROBE,
MCA_PML_REQUEST_PROBE
} mca_pml_base_request_type_t;

Просмотреть файл

@ -434,6 +434,66 @@ typedef int (*mca_pml_base_wait_all_fn_t)(
);
/**
* Probe to poll for pending recv.
*
* @param src (IN) Source rank w/in communicator.
* @param tag (IN) User defined tag.
* @param comm (IN) Communicator.
* @param matched (OUT) Flag indicating if matching recv exists.
* @param status (OUT) Completion statuses.
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_iprobe_fn_t)(
int src,
int tag,
ompi_communicator_t* comm,
int *matched,
ompi_status_public_t *status
);
/**
* Blocking probe to wait for pending recv.
*
* @param src (IN) Source rank w/in communicator.
* @param tag (IN) User defined tag.
* @param comm (IN) Communicator.
* @param status (OUT) Completion statuses.
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_probe_fn_t)(
int src,
int tag,
ompi_communicator_t* comm,
ompi_status_public_t *status
);
/**
* Cancel pending operation.
*
* @param request (IN) Request
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_cancel_fn_t)(
ompi_request_t* request
);
/**
* Has a request been cancelled?
*
* @param request (IN) Request
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_pml_base_cancelled_fn_t)(
ompi_request_t* request,
int *flag
);
/**
* Release resources held by a persistent mode request.
*
@ -485,6 +545,10 @@ struct mca_pml_1_0_0_t {
mca_pml_base_test_all_fn_t pml_test_all;
mca_pml_base_wait_fn_t pml_wait;
mca_pml_base_wait_all_fn_t pml_wait_all;
mca_pml_base_iprobe_fn_t pml_iprobe;
mca_pml_base_probe_fn_t pml_probe;
mca_pml_base_cancel_fn_t pml_cancel;
mca_pml_base_cancelled_fn_t pml_cancelled;
mca_pml_base_free_fn_t pml_free;
mca_pml_base_null_fn_t pml_null;
};

Просмотреть файл

@ -14,7 +14,9 @@ noinst_LTLIBRARIES = libmca_pml_teg.la
libmca_pml_teg_la_SOURCES = \
pml_teg.c \
pml_teg.h \
pml_teg_cancel.c \
pml_teg_free.c \
pml_teg_iprobe.c \
pml_teg_irecv.c \
pml_teg_isend.c \
pml_teg_module.c \

Просмотреть файл

@ -37,6 +37,10 @@ mca_pml_teg_t mca_pml_teg = {
mca_pml_teg_test_all,
mca_pml_teg_wait,
mca_pml_teg_wait_all,
mca_pml_teg_iprobe,
mca_pml_teg_probe,
mca_pml_teg_cancel,
mca_pml_teg_cancelled,
mca_pml_teg_free,
mca_pml_teg_null
}

Просмотреть файл

@ -119,6 +119,31 @@ extern int mca_pml_teg_control(
extern int mca_pml_teg_progress(void);
extern int mca_pml_teg_iprobe(
int dst,
int tag,
struct ompi_communicator_t* comm,
int *matched,
ompi_status_public_t* status
);
extern int mca_pml_teg_probe(
int dst,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status
);
extern int mca_pml_teg_cancel(
ompi_request_t* request
);
extern int mca_pml_teg_cancelled(
ompi_request_t* request,
int *flag
);
extern int mca_pml_teg_isend_init(
void *buf,
size_t count,

15
src/mca/pml/teg/src/pml_teg_cancel.c Обычный файл
Просмотреть файл

@ -0,0 +1,15 @@
#include "pml_teg.h"
int mca_pml_teg_cancel(ompi_request_t* request)
{
return OMPI_SUCCESS;
}
int mca_pml_teg_cancelled(ompi_request_t* request, int* flag)
{
if(NULL != flag)
*flag = 0;
return OMPI_SUCCESS;
}

87
src/mca/pml/teg/src/pml_teg_iprobe.c Обычный файл
Просмотреть файл

@ -0,0 +1,87 @@
#include "pml_teg_recvreq.h"
int mca_pml_teg_iprobe(
int src,
int tag,
struct ompi_communicator_t* comm,
int* matched,
ompi_status_public_t* status)
{
int rc;
mca_ptl_base_recv_request_t recvreq;
OBJ_CONSTRUCT(&recvreq, mca_ptl_base_recv_request_t);
recvreq.super.req_type = MCA_PML_REQUEST_IPROBE;
MCA_PTL_BASE_RECV_REQUEST_INIT(
&recvreq,
NULL,
0,
NULL,
src,
tag,
comm,
true);
if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
OBJ_DESTRUCT(&recvreq);
return rc;
}
if((*matched = recvreq.super.req_mpi_done) == true && (NULL != status)) {
*status = recvreq.super.req_status;
}
OBJ_DESTRUCT(&recvreq);
return OMPI_SUCCESS;
}
int mca_pml_teg_probe(
int src,
int tag,
struct ompi_communicator_t* comm,
ompi_status_public_t* status)
{
int rc;
mca_ptl_base_recv_request_t recvreq;
OBJ_CONSTRUCT(&recvreq, mca_ptl_base_recv_request_t);
recvreq.super.req_type = MCA_PML_REQUEST_PROBE;
MCA_PTL_BASE_RECV_REQUEST_INIT(
&recvreq,
NULL,
0,
NULL,
src,
tag,
comm,
true);
if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
OBJ_DESTRUCT(&recvreq);
return rc;
}
if(recvreq.super.req_mpi_done == false) {
/* give up and sleep until completion */
if(ompi_using_threads()) {
ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting++;
while(recvreq.super.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
} else {
mca_pml_teg.teg_request_waiting++;
while(recvreq.super.req_mpi_done == false)
ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
mca_pml_teg.teg_request_waiting--;
}
}
if(NULL != status) {
*status = recvreq.super.req_status;
}
OBJ_DESTRUCT(&recvreq);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -48,11 +48,12 @@ void mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req)
*/
else {
bytes_to_frag = (ptl_proc->ptl_weight * bytes_remaining) / 100;
if(ptl->ptl_max_frag_size != 0 && bytes_to_frag > ptl->ptl_max_frag_size)
bytes_to_frag = ptl->ptl_max_frag_size;
}
rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, &bytes_to_frag, 0);
rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, bytes_to_frag, 0);
if(rc == OMPI_SUCCESS) {
req->req_offset += bytes_to_frag;
bytes_remaining = req->req_bytes_packed - req->req_offset;
}
}

Просмотреть файл

@ -46,7 +46,7 @@ static inline int mca_pml_teg_send_request_start(
int flags, rc;
/* start the first fragment */
if(first_fragment_size <= 0 || req->req_bytes_packed <= first_fragment_size) {
if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) {
first_fragment_size = req->req_bytes_packed;
flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ?
MCA_PTL_FLAGS_ACK_MATCHED : 0;

Просмотреть файл

@ -54,11 +54,22 @@ static inline bool mca_ptl_base_recv_frag_match(
request->super.req_peer = header->hdr_src;
request->super.req_tag = header->hdr_tag;
/* notify ptl of match */
ptl->ptl_matched(ptl, frag);
/*
* If probe - signal request is complete - but don't notify PTL
*/
if(request->super.req_type == MCA_PML_REQUEST_PROBE) {
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags);
ptl->ptl_recv_progress(request, frag);
matched = mca_ptl_base_recv_frag_match(frag, header);
} else {
/* notify ptl of match */
ptl->ptl_matched(ptl, frag);
/* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags);
};
};
return matched;
}

Просмотреть файл

@ -63,7 +63,8 @@ void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t* reque
/* We didn't find any matches. Record this irecv so we can match
* it when the message comes in.
*/
ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request);
if(request->super.req_type != MCA_PML_REQUEST_IPROBE)
ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
@ -111,7 +112,8 @@ void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
* it when the message comes in.
*/
ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request);
if(request->super.req_type != MCA_PML_REQUEST_IPROBE)
ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request);
THREAD_UNLOCK(&pml_comm->c_matching_lock);
}

Просмотреть файл

@ -47,21 +47,21 @@ typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
comm, \
persistent) \
{ \
request->req_bytes_packed = 0; \
request->req_bytes_received = 0; \
request->req_bytes_delivered = 0; \
request->super.req_sequence = 0; \
request->super.req_addr = addr; \
request->super.req_count = count; \
request->super.req_datatype = datatype; \
request->super.req_peer = src; \
request->super.req_tag = tag; \
request->super.req_comm = comm; \
request->super.req_proc = NULL; \
request->super.req_persistent = persistent; \
request->super.req_mpi_done = false; \
request->super.req_pml_done = false; \
request->super.req_free_called = false; \
(request)->req_bytes_packed = 0; \
(request)->req_bytes_received = 0; \
(request)->req_bytes_delivered = 0; \
(request)->super.req_sequence = 0; \
(request)->super.req_addr = addr; \
(request)->super.req_count = count; \
(request)->super.req_datatype = datatype; \
(request)->super.req_peer = src; \
(request)->super.req_tag = tag; \
(request)->super.req_comm = comm; \
(request)->super.req_proc = NULL; \
(request)->super.req_persistent = persistent; \
(request)->super.req_mpi_done = false; \
(request)->super.req_pml_done = false; \
(request)->super.req_free_called = false; \
}
/**

Просмотреть файл

@ -5,8 +5,10 @@
#include <stdio.h>
#include "mpi.h"
#include "runtime/runtime.h"
#include "mpi/c/bindings.h"
#include "mca/pml/pml.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Cancel = PMPI_Cancel
#endif
@ -15,6 +17,23 @@
#include "mpi/c/profile/defines.h"
#endif
int MPI_Cancel(MPI_Request *request) {
return MPI_SUCCESS;
int MPI_Cancel(MPI_Request *request)
{
int rc;
if ( MPI_PARAM_CHECK ) {
rc = MPI_SUCCESS;
if ( OMPI_MPI_INVALID_STATE ) {
rc = MPI_ERR_INTERN;
} else if (request == NULL) {
rc = MPI_ERR_REQUEST;
}
OMPI_ERRHANDLER_CHECK(rc, (ompi_communicator_t*)NULL, rc, "MPI_Cancel");
}
if (NULL == *request) {
return MPI_SUCCESS;
}
rc = mca_pml.pml_cancel(*request);
OMPI_ERRHANDLER_RETURN(rc, (ompi_communicator_t*)NULL, rc, "MPI_Cancel");
}

Просмотреть файл

@ -5,7 +5,10 @@
#include "ompi_config.h"
#include "mpi.h"
#include "runtime/runtime.h"
#include "mpi/c/bindings.h"
#include "mca/pml/pml.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Iprobe = PMPI_Iprobe
@ -15,7 +18,34 @@
#include "mpi/c/profile/defines.h"
#endif
int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag,
MPI_Status *status) {
return MPI_SUCCESS;
int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
{
int rc;
if (source == MPI_PROC_NULL) {
if (status) {
status->MPI_SOURCE = MPI_PROC_NULL;
status->MPI_TAG = MPI_ANY_TAG;
status->MPI_ERROR = MPI_SUCCESS;
status->_count = 0;
}
return MPI_SUCCESS;
}
if ( MPI_PARAM_CHECK ) {
rc = MPI_SUCCESS;
if ( OMPI_MPI_INVALID_STATE ) {
rc = MPI_ERR_INTERN;
} else if (tag < 0 || tag > MPI_TAG_UB_VALUE) {
rc = MPI_ERR_TAG;
} else if (ompi_comm_invalid(comm)) {
rc = MPI_ERR_COMM;
} else if (source != MPI_ANY_SOURCE && ompi_comm_peer_invalid(comm, source)) {
rc = MPI_ERR_RANK;
}
OMPI_ERRHANDLER_CHECK(rc, comm, rc, "MPI_Iprobe");
}
rc = mca_pml.pml_iprobe(source, tag, comm, flag, status);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, "MPI_Iprobe");
}

Просмотреть файл

@ -5,7 +5,9 @@
#include <stdio.h>
#include "mpi.h"
#include "runtime/runtime.h"
#include "mpi/c/bindings.h"
#include "mca/pml/pml.h"
#if OMPI_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Probe = PMPI_Probe
@ -15,6 +17,34 @@
#include "mpi/c/profile/defines.h"
#endif
int MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status *status) {
return MPI_SUCCESS;
int MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status *status)
{
int rc;
if (source == MPI_PROC_NULL) {
if (status) {
status->MPI_SOURCE = MPI_PROC_NULL;
status->MPI_TAG = MPI_ANY_TAG;
status->MPI_ERROR = MPI_SUCCESS;
status->_count = 0;
}
return MPI_SUCCESS;
}
if ( MPI_PARAM_CHECK ) {
rc = MPI_SUCCESS;
if ( OMPI_MPI_INVALID_STATE ) {
rc = MPI_ERR_INTERN;
} else if (tag < 0 || tag > MPI_TAG_UB_VALUE) {
rc = MPI_ERR_TAG;
} else if (ompi_comm_invalid(comm)) {
rc = MPI_ERR_COMM;
} else if (source != MPI_ANY_SOURCE && ompi_comm_peer_invalid(comm, source)) {
rc = MPI_ERR_RANK;
}
OMPI_ERRHANDLER_CHECK(rc, comm, rc, "MPI_Probe");
}
rc = mca_pml.pml_probe(source, tag, comm, status);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, "MPI_Probe");
}

Просмотреть файл

@ -12,12 +12,13 @@ ompi_class_t ompi_request_t_class = {
};
void ompi_request_construct(ompi_request_t* rq)
{
}
void ompi_request_destruct(ompi_request_t* rq)
void ompi_request_construct(ompi_request_t* req)
{
req->req_mode = OMPI_REQUEST_INVALID;
}
void ompi_request_destruct(ompi_request_t* req)
{
}

Просмотреть файл

@ -18,9 +18,17 @@ typedef enum {
OMPI_REQUEST_MAX
} ompi_request_type_t;
typedef enum {
OMPI_REQUEST_INVALID,
OMPI_REQUEST_INACTIVE,
OMPI_REQUEST_ACTIVE,
OMPI_REQUEST_CANCELLED
} ompi_request_mode_t;
struct ompi_request_t {
ompi_list_item_t super;
ompi_request_type_t req_type;
ompi_request_mode_t req_mode;
};
typedef struct ompi_request_t ompi_request_t;