Cleanup some code so it is clear that it is executing in an event. Ensure that peer event base is properly set on incoming connections
Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
родитель
4e06b96701
Коммит
184ccc8e91
@ -501,6 +501,9 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer)
|
|||||||
{
|
{
|
||||||
if (peer->sd >= 0) {
|
if (peer->sd >= 0) {
|
||||||
assert(!peer->send_ev_active && !peer->recv_ev_active);
|
assert(!peer->send_ev_active && !peer->recv_ev_active);
|
||||||
|
if (NULL == peer->ev_base) {
|
||||||
|
ORTE_OOB_TCP_NEXT_BASE(peer);
|
||||||
|
}
|
||||||
opal_event_set(peer->ev_base,
|
opal_event_set(peer->ev_base,
|
||||||
&peer->recv_event,
|
&peer->recv_event,
|
||||||
peer->sd,
|
peer->sd,
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
|
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
|
||||||
* reserved.
|
* reserved.
|
||||||
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
|
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
||||||
* Copyright (c) 2016 Research Organization for Information Science
|
* Copyright (c) 2016 Research Organization for Information Science
|
||||||
* and Technology (RIST). All rights reserved.
|
* and Technology (RIST). All rights reserved.
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
@ -259,7 +259,6 @@ OBJ_CLASS_DECLARATION(orte_self_send_xfer_t);
|
|||||||
/* common implementations */
|
/* common implementations */
|
||||||
ORTE_DECLSPEC void orte_rml_base_post_recv(int sd, short args, void *cbdata);
|
ORTE_DECLSPEC void orte_rml_base_post_recv(int sd, short args, void *cbdata);
|
||||||
ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata);
|
ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata);
|
||||||
ORTE_DECLSPEC void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg);
|
|
||||||
|
|
||||||
|
|
||||||
/* Stub API interfaces to cycle through active plugins */
|
/* Stub API interfaces to cycle through active plugins */
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
* All rights reserved.
|
* All rights reserved.
|
||||||
* Copyright (c) 2007-2013 Los Alamos National Security, LLC. All rights
|
* Copyright (c) 2007-2013 Los Alamos National Security, LLC. All rights
|
||||||
* reserved.
|
* reserved.
|
||||||
* Copyright (c) 2015-2016 Intel, Inc. All rights reserved.
|
* Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
|
||||||
* $COPYRIGHT$
|
* $COPYRIGHT$
|
||||||
*
|
*
|
||||||
* Additional copyrights may follow
|
* Additional copyrights may follow
|
||||||
@ -117,12 +117,57 @@ void orte_rml_base_post_recv(int sd, short args, void *cbdata)
|
|||||||
OBJ_RELEASE(req);
|
OBJ_RELEASE(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg)
|
static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all)
|
||||||
{
|
{
|
||||||
|
opal_list_item_t *item, *next;
|
||||||
|
orte_rml_recv_t *msg;
|
||||||
|
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
|
||||||
|
|
||||||
|
/* scan thru the list of unmatched recvd messages and
|
||||||
|
* see if any matches this spec - if so, push the first
|
||||||
|
* into the recvd msg queue and look no further
|
||||||
|
*/
|
||||||
|
item = opal_list_get_first(&orte_rml_base.unmatched_msgs);
|
||||||
|
while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) {
|
||||||
|
next = opal_list_get_next(item);
|
||||||
|
msg = (orte_rml_recv_t*)item;
|
||||||
|
opal_output_verbose(5, orte_rml_base_framework.framework_output,
|
||||||
|
"%s checking recv for %s against unmatched msg from %s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&rcv->peer),
|
||||||
|
ORTE_NAME_PRINT(&msg->sender));
|
||||||
|
|
||||||
|
/* since names could include wildcards, must use
|
||||||
|
* the more generalized comparison function
|
||||||
|
*/
|
||||||
|
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) &&
|
||||||
|
msg->tag == rcv->tag) {
|
||||||
|
ORTE_RML_ACTIVATE_MESSAGE(msg);
|
||||||
|
opal_list_remove_item(&orte_rml_base.unmatched_msgs, item);
|
||||||
|
if (!get_all) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
item = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void orte_rml_base_process_msg(int fd, short flags, void *cbdata)
|
||||||
|
{
|
||||||
|
orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata;
|
||||||
orte_rml_posted_recv_t *post;
|
orte_rml_posted_recv_t *post;
|
||||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
|
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
|
||||||
opal_buffer_t buf;
|
opal_buffer_t buf;
|
||||||
orte_rml_recv_t *msg = *recv_msg;
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
|
||||||
|
"%s message received from %s for tag %d",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&msg->sender),
|
||||||
|
msg->tag));
|
||||||
|
|
||||||
|
OPAL_TIMING_EVENT((&tm_rml,"from %s %d bytes",
|
||||||
|
ORTE_NAME_PRINT(&msg->sender), msg->iov.iov_len));
|
||||||
|
|
||||||
/* see if we have a waiting recv for this message */
|
/* see if we have a waiting recv for this message */
|
||||||
OPAL_LIST_FOREACH(post, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
|
OPAL_LIST_FOREACH(post, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) {
|
||||||
/* since names could include wildcards, must use
|
/* since names could include wildcards, must use
|
||||||
@ -184,52 +229,3 @@ void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg)
|
|||||||
msg->tag));
|
msg->tag));
|
||||||
opal_list_append(&orte_rml_base.unmatched_msgs, &msg->super);
|
opal_list_append(&orte_rml_base.unmatched_msgs, &msg->super);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all)
|
|
||||||
{
|
|
||||||
opal_list_item_t *item, *next;
|
|
||||||
orte_rml_recv_t *msg;
|
|
||||||
orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD;
|
|
||||||
|
|
||||||
/* scan thru the list of unmatched recvd messages and
|
|
||||||
* see if any matches this spec - if so, push the first
|
|
||||||
* into the recvd msg queue and look no further
|
|
||||||
*/
|
|
||||||
item = opal_list_get_first(&orte_rml_base.unmatched_msgs);
|
|
||||||
while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) {
|
|
||||||
next = opal_list_get_next(item);
|
|
||||||
msg = (orte_rml_recv_t*)item;
|
|
||||||
opal_output_verbose(5, orte_rml_base_framework.framework_output,
|
|
||||||
"%s checking recv for %s against unmatched msg from %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&rcv->peer),
|
|
||||||
ORTE_NAME_PRINT(&msg->sender));
|
|
||||||
|
|
||||||
/* since names could include wildcards, must use
|
|
||||||
* the more generalized comparison function
|
|
||||||
*/
|
|
||||||
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) &&
|
|
||||||
msg->tag == rcv->tag) {
|
|
||||||
ORTE_RML_ACTIVATE_MESSAGE(msg);
|
|
||||||
opal_list_remove_item(&orte_rml_base.unmatched_msgs, item);
|
|
||||||
if (!get_all) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
item = next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void orte_rml_base_process_msg(int fd, short flags, void *cbdata)
|
|
||||||
{
|
|
||||||
orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata;
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output,
|
|
||||||
"%s message received from %s for tag %d",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&msg->sender),
|
|
||||||
msg->tag));
|
|
||||||
|
|
||||||
OPAL_TIMING_EVENT((&tm_rml,"from %s %d bytes",
|
|
||||||
ORTE_NAME_PRINT(&msg->sender), msg->iov.iov_len));
|
|
||||||
orte_rml_base_complete_recv_msg(&msg);
|
|
||||||
}
|
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user