1
1

PML/UCX: blocked calls optimizations

- refactoring of opal/UCX progress calls
- added UCX progress priority

Signed-off-by: Sergey Oblomov <sergeyo@mellanox.com>
(cherry picked from commit b0f87f22358914ae9f8fc382daa4052b31ed2aeb)
Этот коммит содержится в:
Sergey Oblomov 2018-08-21 13:22:18 +03:00
родитель ea4d30b16f
Коммит 9215eb9a3b
2 изменённых файлов: 61 добавлений и 60 удалений

Просмотреть файл

@ -538,13 +538,12 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src
mca_pml_ucx_get_datatype(datatype), mca_pml_ucx_get_datatype(datatype),
ucp_tag, ucp_tag_mask, req); ucp_tag, ucp_tag_mask, req);
for (;;) { MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) {
status = ucp_request_test(req, &info); status = ucp_request_test(req, &info);
if (status != UCS_INPROGRESS) { if (status != UCS_INPROGRESS) {
mca_pml_ucx_set_recv_status_safe(mpi_status, status, &info); mca_pml_ucx_set_recv_status_safe(mpi_status, status, &info);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
opal_progress();
} }
} }
@ -731,16 +730,12 @@ mca_pml_ucx_send_nb(ucp_ep_h ep, const void *buf, size_t count,
req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype, req = (ompi_request_t*)mca_pml_ucx_common_send(ep, buf, count, datatype,
mca_pml_ucx_get_datatype(datatype), mca_pml_ucx_get_datatype(datatype),
tag, mode, tag, mode, cb);
mca_pml_ucx_send_completion);
if (OPAL_LIKELY(req == NULL)) { if (OPAL_LIKELY(req == NULL)) {
return OMPI_SUCCESS; return OMPI_SUCCESS;
} else if (!UCS_PTR_IS_ERR(req)) { } else if (!UCS_PTR_IS_ERR(req)) {
PML_UCX_VERBOSE(8, "got request %p", (void*)req); PML_UCX_VERBOSE(8, "got request %p", (void*)req);
ucp_worker_progress(ompi_pml_ucx.ucp_worker); MCA_COMMON_UCX_WAIT_LOOP(req, ompi_pml_ucx.ucp_worker, "ucx send", ompi_request_free(&req));
ompi_request_wait(&req, MPI_STATUS_IGNORE);
return OMPI_SUCCESS;
} else { } else {
PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req))); PML_UCX_ERROR("ucx send failed: %s", ucs_status_string(UCS_PTR_STATUS(req)));
return OMPI_ERROR; return OMPI_ERROR;
@ -753,7 +748,7 @@ mca_pml_ucx_send_nbr(ucp_ep_h ep, const void *buf, size_t count,
ucp_datatype_t ucx_datatype, ucp_tag_t tag) ucp_datatype_t ucx_datatype, ucp_tag_t tag)
{ {
void *req; ucs_status_ptr_t req;
ucs_status_t status; ucs_status_t status;
/* coverity[bad_alloc_arithmetic] */ /* coverity[bad_alloc_arithmetic] */
@ -763,12 +758,7 @@ mca_pml_ucx_send_nbr(ucp_ep_h ep, const void *buf, size_t count,
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
ucp_worker_progress(ompi_pml_ucx.ucp_worker); MCA_COMMON_UCX_WAIT_LOOP(req, ompi_pml_ucx.ucp_worker, "ucx send", (void)0);
while ((status = ucp_request_check_status(req)) == UCS_INPROGRESS) {
opal_progress();
}
return OPAL_LIKELY(UCS_OK == status) ? OMPI_SUCCESS : OMPI_ERROR;
} }
#endif #endif
@ -804,6 +794,8 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i
int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm, int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
int *matched, ompi_status_public_t* mpi_status) int *matched, ompi_status_public_t* mpi_status)
{ {
static unsigned progress_count = 0;
ucp_tag_t ucp_tag, ucp_tag_mask; ucp_tag_t ucp_tag, ucp_tag_mask;
ucp_tag_recv_info_t info; ucp_tag_recv_info_t info;
ucp_tag_message_h ucp_msg; ucp_tag_message_h ucp_msg;
@ -816,8 +808,9 @@ int mca_pml_ucx_iprobe(int src, int tag, struct ompi_communicator_t* comm,
if (ucp_msg != NULL) { if (ucp_msg != NULL) {
*matched = 1; *matched = 1;
mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
} else { } else {
opal_progress(); (++progress_count % opal_common_ucx.progress_iterations) ?
(void)ucp_worker_progress(ompi_pml_ucx.ucp_worker) : opal_progress();
*matched = 0; *matched = 0;
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -833,22 +826,23 @@ int mca_pml_ucx_probe(int src, int tag, struct ompi_communicator_t* comm,
PML_UCX_TRACE_PROBE("probe", src, tag, comm); PML_UCX_TRACE_PROBE("probe", src, tag, comm);
PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
for (;;) {
ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) {
0, &info); ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag,
ucp_tag_mask, 0, &info);
if (ucp_msg != NULL) { if (ucp_msg != NULL) {
mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
opal_progress();
} }
} }
int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm, int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
int *matched, struct ompi_message_t **message, int *matched, struct ompi_message_t **message,
ompi_status_public_t* mpi_status) ompi_status_public_t* mpi_status)
{ {
static unsigned progress_count = 0;
ucp_tag_t ucp_tag, ucp_tag_mask; ucp_tag_t ucp_tag, ucp_tag_mask;
ucp_tag_recv_info_t info; ucp_tag_recv_info_t info;
ucp_tag_message_h ucp_msg; ucp_tag_message_h ucp_msg;
@ -864,7 +858,8 @@ int mca_pml_ucx_improbe(int src, int tag, struct ompi_communicator_t* comm,
*matched = 1; *matched = 1;
mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
} else { } else {
opal_progress(); (++progress_count % opal_common_ucx.progress_iterations) ?
(void)ucp_worker_progress(ompi_pml_ucx.ucp_worker) : opal_progress();
*matched = 0; *matched = 0;
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
@ -881,7 +876,7 @@ int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
PML_UCX_TRACE_PROBE("mprobe", src, tag, comm); PML_UCX_TRACE_PROBE("mprobe", src, tag, comm);
PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm); PML_UCX_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask, tag, src, comm);
for (;;) { MCA_COMMON_UCX_PROGRESS_LOOP(ompi_pml_ucx.ucp_worker) {
ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask, ucp_msg = ucp_tag_probe_nb(ompi_pml_ucx.ucp_worker, ucp_tag, ucp_tag_mask,
1, &info); 1, &info);
if (ucp_msg != NULL) { if (ucp_msg != NULL) {
@ -890,8 +885,6 @@ int mca_pml_ucx_mprobe(int src, int tag, struct ompi_communicator_t* comm,
mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info); mca_pml_ucx_set_recv_status_safe(mpi_status, UCS_OK, &info);
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
opal_progress();
} }
} }

Просмотреть файл

@ -52,6 +52,33 @@ BEGIN_C_DECLS
__VA_ARGS__); \ __VA_ARGS__); \
} }
/* progress loop to allow call UCX/opal progress */
/* used C99 for-statement variable initialization */
#define MCA_COMMON_UCX_PROGRESS_LOOP(_worker) \
for (unsigned iter = 0;; (++iter % opal_common_ucx.progress_iterations) ? \
(void)ucp_worker_progress(_worker) : opal_progress())
#define MCA_COMMON_UCX_WAIT_LOOP(_request, _worker, _msg, _completed) \
do { \
ucs_status_t status; \
/* call UCX progress */ \
MCA_COMMON_UCX_PROGRESS_LOOP(_worker) { \
status = opal_common_ucx_request_status(_request); \
if (UCS_INPROGRESS != status) { \
_completed; \
if (OPAL_LIKELY(UCS_OK == status)) { \
return OPAL_SUCCESS; \
} else { \
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", \
(_msg) ? (_msg) : __FUNCTION__, \
UCS_PTR_STATUS(_request), \
ucs_status_string(UCS_PTR_STATUS(_request))); \
return OPAL_ERROR; \
} \
} \
} \
} while (0)
typedef struct opal_common_ucx_module { typedef struct opal_common_ucx_module {
int output; int output;
int verbose; int verbose;
@ -67,16 +94,22 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status); OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker); OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
static inline
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)
{
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
ucp_tag_recv_info_t info;
return ucp_request_test(request, &info);
#else
return ucp_request_check_status(request);
#endif
}
static inline static inline
int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker, int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
const char *msg) const char *msg)
{ {
ucs_status_t status;
int i;
#if !HAVE_DECL_UCP_REQUEST_CHECK_STATUS
ucp_tag_recv_info_t info;
#endif
/* check for request completed or failed */ /* check for request completed or failed */
if (OPAL_LIKELY(UCS_OK == request)) { if (OPAL_LIKELY(UCS_OK == request)) {
return OPAL_SUCCESS; return OPAL_SUCCESS;
@ -87,32 +120,7 @@ int opal_common_ucx_wait_request(ucs_status_ptr_t request, ucp_worker_h worker,
return OPAL_ERROR; return OPAL_ERROR;
} }
while (1) { MCA_COMMON_UCX_WAIT_LOOP(request, worker, msg, ucp_request_free(request));
/* call UCX progress */
for (i = 0; i < opal_common_ucx.progress_iterations; i++) {
if (UCS_INPROGRESS != (status =
#if HAVE_DECL_UCP_REQUEST_CHECK_STATUS
ucp_request_check_status(request)
#else
ucp_request_test(request, &info)
#endif
)) {
ucp_request_free(request);
if (OPAL_LIKELY(UCS_OK == status)) {
return OPAL_SUCCESS;
} else {
MCA_COMMON_UCX_VERBOSE(1, "%s failed: %d, %s", msg ? msg : __FUNCTION__,
UCS_PTR_STATUS(request),
ucs_status_string(UCS_PTR_STATUS(request)));
return OPAL_ERROR;
}
}
ucp_worker_progress(worker);
}
/* call OPAL progress on every opal_common_ucx_progress_iterations
* calls to UCX progress */
opal_progress();
}
} }
static inline static inline