From 184ccc8e91c78feba09d515ffc1b0a75ad2ff3be Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 25 Jan 2017 06:55:11 -0800 Subject: [PATCH] Cleanup some code so it is clear that it is executing in an event. Ensure that peer event base is properly set on incoming connections Signed-off-by: Ralph Castain --- orte/mca/oob/tcp/oob_tcp_connection.c | 3 + orte/mca/rml/base/base.h | 3 +- orte/mca/rml/base/rml_base_msg_handlers.c | 100 +++++++++++----------- 3 files changed, 52 insertions(+), 54 deletions(-) diff --git a/orte/mca/oob/tcp/oob_tcp_connection.c b/orte/mca/oob/tcp/oob_tcp_connection.c index aa95968689..505badecd7 100644 --- a/orte/mca/oob/tcp/oob_tcp_connection.c +++ b/orte/mca/oob/tcp/oob_tcp_connection.c @@ -501,6 +501,9 @@ static void tcp_peer_event_init(mca_oob_tcp_peer_t* peer) { if (peer->sd >= 0) { assert(!peer->send_ev_active && !peer->recv_ev_active); + if (NULL == peer->ev_base) { + ORTE_OOB_TCP_NEXT_BASE(peer); + } opal_event_set(peer->ev_base, &peer->recv_event, peer->sd, diff --git a/orte/mca/rml/base/base.h b/orte/mca/rml/base/base.h index ef4795cacb..b96e9fb441 100644 --- a/orte/mca/rml/base/base.h +++ b/orte/mca/rml/base/base.h @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -259,7 +259,6 @@ OBJ_CLASS_DECLARATION(orte_self_send_xfer_t); /* common implementations */ ORTE_DECLSPEC void orte_rml_base_post_recv(int sd, short args, void *cbdata); ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata); -ORTE_DECLSPEC void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg); /* Stub API interfaces to cycle through active plugins */ diff --git a/orte/mca/rml/base/rml_base_msg_handlers.c b/orte/mca/rml/base/rml_base_msg_handlers.c index 62b52ad6cd..897fc10712 100644 --- a/orte/mca/rml/base/rml_base_msg_handlers.c +++ b/orte/mca/rml/base/rml_base_msg_handlers.c @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2007-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -117,12 +117,57 @@ void orte_rml_base_post_recv(int sd, short args, void *cbdata) OBJ_RELEASE(req); } -void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg) +static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all) { + opal_list_item_t *item, *next; + orte_rml_recv_t *msg; + orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD; + + /* scan thru the list of unmatched recvd messages and + * see if any matches this spec - if so, push the first + * into the recvd msg queue and look no further + */ + item = opal_list_get_first(&orte_rml_base.unmatched_msgs); + while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) { + next = opal_list_get_next(item); + msg = (orte_rml_recv_t*)item; + opal_output_verbose(5, orte_rml_base_framework.framework_output, + "%s checking recv for %s against unmatched msg from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&rcv->peer), + ORTE_NAME_PRINT(&msg->sender)); + + /* since names could include wildcards, must use + * the more generalized comparison function + */ + if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) && + msg->tag == rcv->tag) { + ORTE_RML_ACTIVATE_MESSAGE(msg); + opal_list_remove_item(&orte_rml_base.unmatched_msgs, item); + if (!get_all) { + break; + } + } + item = next; + } +} + +void orte_rml_base_process_msg(int fd, short flags, void *cbdata) +{ + orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata; orte_rml_posted_recv_t *post; orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD; opal_buffer_t buf; - orte_rml_recv_t *msg = *recv_msg; + + OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output, + "%s message received from %s for tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&msg->sender), + msg->tag)); + + OPAL_TIMING_EVENT((&tm_rml,"from %s %d bytes", + ORTE_NAME_PRINT(&msg->sender), msg->iov.iov_len)); + /* see if we have a waiting recv for this message */ OPAL_LIST_FOREACH(post, &orte_rml_base.posted_recvs, orte_rml_posted_recv_t) { /* since names could include wildcards, must use @@ -184,52 +229,3 @@ void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg) msg->tag)); opal_list_append(&orte_rml_base.unmatched_msgs, &msg->super); } - -static void msg_match_recv(orte_rml_posted_recv_t *rcv, bool get_all) -{ - opal_list_item_t *item, *next; - orte_rml_recv_t *msg; - orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL | ORTE_NS_CMP_WILD; - - /* scan thru the list of unmatched recvd messages and - * see if any matches this spec - if so, push the first - * into the recvd msg queue and look no further - */ - item = opal_list_get_first(&orte_rml_base.unmatched_msgs); - while (item != opal_list_get_end(&orte_rml_base.unmatched_msgs)) { - next = opal_list_get_next(item); - msg = (orte_rml_recv_t*)item; - opal_output_verbose(5, orte_rml_base_framework.framework_output, - "%s checking recv for %s against unmatched msg from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&rcv->peer), - ORTE_NAME_PRINT(&msg->sender)); - - /* since names could include wildcards, must use - * the more generalized comparison function - */ - if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &msg->sender, &rcv->peer) && - msg->tag == rcv->tag) { - ORTE_RML_ACTIVATE_MESSAGE(msg); - opal_list_remove_item(&orte_rml_base.unmatched_msgs, item); - if (!get_all) { - break; - } - } - item = next; - } -} - -void orte_rml_base_process_msg(int fd, short flags, void *cbdata) -{ - orte_rml_recv_t *msg = (orte_rml_recv_t*)cbdata; - OPAL_OUTPUT_VERBOSE((5, orte_rml_base_framework.framework_output, - "%s message received from %s for tag %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&msg->sender), - msg->tag)); - - OPAL_TIMING_EVENT((&tm_rml,"from %s %d bytes", - ORTE_NAME_PRINT(&msg->sender), msg->iov.iov_len)); - orte_rml_base_complete_recv_msg(&msg); -}