diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c index 799d30fe3a..bc0b588f20 100644 --- a/orte/mca/plm/base/plm_base_receive.c +++ b/orte/mca/plm/base/plm_base_receive.c @@ -77,7 +77,14 @@ int orte_plm_base_comm_start(void) processing = false; OBJ_CONSTRUCT(&lock, opal_mutex_t); OBJ_CONSTRUCT(&recvs, opal_list_t); +#ifndef __WINDOWS__ pipe(ready_fd); +#else + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, ready_fd) == -1) { + return ORTE_ERROR; + } +#endif + opal_event_set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL); opal_event_add(&ready, 0); @@ -102,7 +109,11 @@ int orte_plm_base_comm_stop(void) OBJ_DESTRUCT(&recvs); opal_event_del(&ready); +#ifndef __WINDOWS__ close(ready_fd[0]); +#else + closesocket(ready_fd[0]); +#endif processing = false; OBJ_DESTRUCT(&lock); @@ -142,7 +153,11 @@ void process_msg(int fd, short event, void *data) processing = true; /* clear the file descriptor to stop the event from refiring */ +#ifndef __WINDOWS__ read(fd, &dump, sizeof(dump)); +#else + recv(fd, (char *) &dump, sizeof(dump), 0); +#endif while (NULL != (item = opal_list_remove_first(&recvs))) { msgpkt = (orte_msg_packet_t*)item; diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index 6122731c15..dcf5e6ee3e 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -53,7 +53,7 @@ typedef struct { } orte_msg_packet_t; ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t); - +#ifndef __WINDOWS__ #define ORTE_PROCESS_MESSAGE(rlist, lck, flg, fd, crt, sndr, buf) \ do { \ orte_msg_packet_t *pkt; \ @@ -75,6 +75,29 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t); } \ OPAL_THREAD_UNLOCK((lck)); \ } while(0); +#else +#define ORTE_PROCESS_MESSAGE(rlist, lck, flg, fd, crt, sndr, buf) \ + do { \ + orte_msg_packet_t *pkt; \ + int data=1; \ + pkt = OBJ_NEW(orte_msg_packet_t); \ + pkt->sender.jobid = (sndr)->jobid; \ + pkt->sender.vpid = (sndr)->vpid; \ + if ((crt)) { \ + pkt->buffer = OBJ_NEW(opal_buffer_t); \ + opal_dss.copy_payload(pkt->buffer, *(buf)); \ + } else { \ + pkt->buffer = *(buf); \ + *(buf) = NULL; \ + } \ + OPAL_THREAD_LOCK((lck)); \ + opal_list_append((rlist), &pkt->super); \ + if (!(flg)) { \ + send((fd), (const char*) &data, sizeof(data), 0); \ + } \ + OPAL_THREAD_UNLOCK((lck)); \ + } while(0); +#endif /**