diff --git a/orte/mca/pls/xcpu/pls_xcpu.c b/orte/mca/pls/xcpu/pls_xcpu.c
index ec75df3d4a..667d3a40d3 100644
--- a/orte/mca/pls/xcpu/pls_xcpu.c
+++ b/orte/mca/pls/xcpu/pls_xcpu.c
@@ -77,96 +77,245 @@ orte_pls_base_module_t orte_pls_xcpu_module = {
     orte_pls_xcpu_terminate_proc,
     orte_pls_xcpu_finalize
 };
-int lrx(int argc, char **argv);
-int get_argc(char **argv){
-    int i=0;
-    while(argv[i]){
-        i++;
-    }
-    return i;
-}
-void free_stack(tid_stack *s){
+/** include a prototype for the xcpu launch function */
+int lrx(int argc, char **argv);
+
+/** LOCAL SUPPORT FUNCTIONS **/
+
+/** provide a local function to release the stack of launch
+ * thread ids required by xcpu
+ */
+static void orte_pls_xcpu_free_stack(orte_pls_xcpu_tid_stack *s){
     if(s){
-        free_stack(s->next);
+        orte_pls_xcpu_free_stack(s->next);
         free(s);
     }
 }
+
+/** provide a function to setup the environment for the remote
+ * processes. We need to ensure that the remote processes know
+ * their OpenRTE name, their gpr and ns replicas, the universe
+ * to which they belong, etc. - otherwise, they may run, but they
+ * will never actually join the rest of the job. This function
+ * creates the common environment for all the processes. A small
+ * standalone sketch of the opal_argv pattern used here follows
+ * the function.
+ */
+static int orte_pls_xcpu_setup_env(char ***env)
+{
+    char *uri, *param;
+    int rc;
+
+    /** the append_nosize utility is kind enough to do whatever allocation is
+     * necessary to start the argv array if it doesn't already exist, so we can
+     * just start "appending" information to it
+     */
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, "--universe"))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, orte_universe_info.name))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    /** Since we may be doing this anywhere, we always check to see if we are
+     * the replica, or if we need to link this process to somewhere else
+     */
+    if (NULL != orte_process_info.ns_replica_uri) {
+        uri = strdup(orte_process_info.ns_replica_uri);
+    } else {
+        uri = orte_rml.get_uri();
+    }
+    asprintf(&param, "\"%s\"", uri);
+    free(uri);
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, "--nsreplica"))) {
+        ORTE_ERROR_LOG(rc);
+        free(param);
+        return rc;
+    }
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, param))) {
+        ORTE_ERROR_LOG(rc);
+        free(param);
+        return rc;
+    }
+    free(param);
+    /** do the same for the gpr */
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, "--gprreplica"))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    if (NULL != orte_process_info.gpr_replica_uri) {
+        uri = strdup(orte_process_info.gpr_replica_uri);
+    } else {
+        uri = orte_rml.get_uri();
+    }
+    asprintf(&param, "\"%s\"", uri);
+    free(uri);
+    if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(env, param))) {
+        ORTE_ERROR_LOG(rc);
+        free(param);
+        return rc;
+    }
+    free(param);
+
+    return ORTE_SUCCESS;
+}
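/* Illustration (not part of the patch): a minimal, self-contained sketch of the
 * argv-building pattern used by orte_pls_xcpu_setup_env() above. The universe
 * name and URI are made-up placeholders (at runtime they come from
 * orte_universe_info and the replica URIs), and the sketch assumes it is
 * compiled inside the Open MPI tree so that opal/util/argv.h is available.
 */
#include <stdio.h>
#include <stdlib.h>
#include "opal/util/argv.h"

int main(void)
{
    char **base_env = NULL;   /* append_nosize allocates the array on first use */
    char *cmdline;

    opal_argv_append_nosize(&base_env, "--universe");
    opal_argv_append_nosize(&base_env, "default-universe");
    opal_argv_append_nosize(&base_env, "--nsreplica");
    opal_argv_append_nosize(&base_env, "\"0.0.0;tcp://10.0.0.1:5000\"");

    cmdline = opal_argv_join(base_env, ' ');
    printf("%s\n", cmdline);  /* --universe default-universe --nsreplica ... */

    free(cmdline);
    opal_argv_free(base_env);
    return 0;
}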
+
+
+/** LAUNCH **/
+
-/* This is the main function that will launch jobs on remote compute modes
+/* This is the main function that will launch jobs on remote compute nodes
  * @param jobid the jobid of the job to launch
  * @retval ORTE_SUCCESS or error
  */
 int orte_pls_xcpu_launch(orte_jobid_t jobid){
     opal_list_t mapping;
-    char **new_argv;
-    int new_argc, nprocs=0;
+    char **base_env = NULL;
+    char *param;
+    char *header[] = {
+        "dummy",
+        NULL,
+        NULL};
+    int argc;
     int rc, i=0;
-    tid_stack *t_stack, *temp_stack;
+    orte_pls_xcpu_tid_stack *t_stack, *temp_stack;
     opal_list_item_t *item, *temp;
     orte_rmaps_base_map_t* map;
-    /* first get the list of nodes on which we are going to launch job */
-    /* OBJ_CONSTRUCT construct/initialize objects that are not dynamically allocated.
-     * see file opal/class/opal_object.h for detils
-     */
+    orte_rmaps_base_node_t *node;
+    orte_rmaps_base_proc_t *proc;
+
+    /** first get the mapping we are going to use to launch the job. The head
+     * of the list is OBJ_CONSTRUCT'd since it is not dynamically allocated. The
+     * get_map function, however, will dynamically allocate the items in the
+     * list itself - these will be released when we OBJ_DESTRUCT the list at
+     * the end
+     */
     /*fprintf(stdout, "\nxcpu launch called, job id: %d\n", jobid);*/
     OBJ_CONSTRUCT(&mapping, opal_list_t);
-    /* 1. get map from registry*/
+    /** get the mapping from the registry. This will provide a linked list, one
+     * item for each mapping. Each item contains the full context of the
+     * application that is to be executed upon that node. In particular, we need
+     * to obtain the argv array that is included in that context as this tells
+     * us the application to launch plus any "flags" to pass to it.
+     */
     if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_map(jobid, &mapping))) {
         ORTE_ERROR_LOG(rc);
         return rc;
     }
-    /* 2. use the map to launch jobs*/
-    map=(orte_rmaps_base_map_t*)opal_list_get_first(&mapping);
-    new_argc=get_argc(map->app->argv)+3;
-    new_argv=(char**)malloc(new_argc*sizeof(char*));
-    new_argv[0]=(char*)malloc(1);/*it could be anything ... doesn't matter*/
-    for(i=2; i<new_argc-1; i++){
-        new_argv[i]=map->app->argv[i-2];
-        /*fprintf(stdout, "new_argv[%d]:%s\n", i, new_argv[i]);*/
-    }
-    new_argv[i]=NULL;
-    /*printf("new_argv[%d] is nulled\n", i);*/
+
+    /** since it is possible that each node could be executing a different
+     * application, we cannot just do a mass launch - that would only be
+     * supported in the special case of all the application processes being
+     * identical. Instead, we are going to step our way through the list,
+     * launching each process individually. For each node, however, we need to
+     * have an argv array that fully describes the respective command line
+     * options -- *including* all those required by OpenRTE and Open MPI to
+     * interconnect the processes to the rest of the job.
+     *
+     * First, therefore, let's construct an argv that contains all the OpenRTE
+     * and Open MPI required information. We will later "merge" this into the
+     * argv for each application prior to launch.
+     */
+    if (ORTE_SUCCESS != (rc = orte_pls_xcpu_setup_env(&base_env))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
+    /** we have to do the following so that we can use the opal_argv utilities
+     * to properly insert the header into the app's argv
+     */
+    header[1] = strdup("dummy");
+
+    /** Now loop through all the provided maps to launch their associated apps */
     t_stack=NULL;
     for(item =  opal_list_get_first(&mapping);
         item != opal_list_get_end(&mapping);
         item =  opal_list_get_next(item)) {
         map = (orte_rmaps_base_map_t*) item;
+
+        /** augment the map's argv with our base environment */
+        argc = opal_argv_len(map->app->argv);
+        opal_argv_insert(&(map->app->argv), argc, base_env);
+
+        /** xcpu requires an argv format that has a dummy filler in the
+         * first location, followed by the node name, and then the standard
+         * argv array we've all come to know and love (i.e., the application
+         * name followed by options). We use the opal_argv utilities to
+         * prepend this header info to the application's argv - a standalone
+         * sketch of the resulting layout follows just below.
+         *
+         * Note: at this point, the header contains a dummy placeholder
+         * for the node name - we'll fill that in later.
+         */
+        opal_argv_insert(&(map->app->argv), 0, header);
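/* Illustration (not part of the patch): how the header prepend plus the
 * base-environment append shape a map's argv. The application name and options
 * here are hypothetical; the opal_argv utilities duplicate the source strings
 * on insert, so the stack-allocated header can be reused across maps. Assumes
 * compilation inside the Open MPI tree.
 */
#include <stdio.h>
#include "opal/util/argv.h"

int main(void)
{
    char **app = NULL;                            /* toy application argv from a map */
    char *header[] = { "dummy", "dummy", NULL };  /* argv[1] is replaced per node */
    int i;

    opal_argv_append_nosize(&app, "a.out");
    opal_argv_append_nosize(&app, "-v");
    opal_argv_append_nosize(&app, "--universe");  /* stand-in for the base env */
    opal_argv_append_nosize(&app, "default-universe");
    opal_argv_insert(&app, 0, header);

    /* prints: dummy dummy a.out -v --universe default-universe */
    for (i = 0; NULL != app[i]; i++) {
        printf("%s ", app[i]);
    }
    printf("\n");
    opal_argv_free(app);
    return 0;
}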
+        /** Loop through each process in the map and launch it */
         /* now here.. do we want to pass all node-names and binary as
          * arguments to xcpu_launch or do we want to launch them one
          * by one, by providing only one node-name and binary at a time?
          */
-        for(temp =  opal_list_get_first(&map->nodes);
-            temp != opal_list_get_end(&map->nodes);
+        for(temp =  opal_list_get_first(&map->procs);
+            temp != opal_list_get_end(&map->procs);
             temp =  opal_list_get_next(temp)){
-
-            new_argv[1]=((orte_rmaps_base_node_t*)temp)->node->node_name;
-            /*above should contain node name where process is to be launched*/
-            /*fprintf(stdout, "node name: %s\n", new_argv[1]);*/
-            nprocs=((orte_rmaps_base_node_t*)temp)->node_procs.opal_list_length;
-            /*fprintf(stdout, "list length: %d\n", nprocs);*/
-            for (i = 0; i<nprocs; i++) {
-                temp_stack=(tid_stack*)malloc(sizeof(tid_stack));
-                temp_stack->next=t_stack;
-                t_stack=temp_stack;
-                t_stack->tid=lrx(new_argc, new_argv);
-            }
+            proc = (orte_rmaps_base_proc_t*)temp;
+            node = proc->proc_node;
+
+            /** each proc_t entry contains the application to be executed,
+             * the node upon which it is to be executed, and its OpenRTE
+             * process name (plus a few other things). We use that
+             * info to build the launch command by inserting them into
+             * the argv array
+             */
+
+            /** start by pointing the proper location at the node name where
+             * this process is to be launched
+             */
+            if (NULL != map->app->argv[1]) free(map->app->argv[1]);
+            map->app->argv[1] = strdup(node->node->node_name);
+
+            /** now add the process name to the argv */
+            if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(&(map->app->argv), "--name"))) {
+                ORTE_ERROR_LOG(rc);
+                return rc;
+            }
+            if (ORTE_SUCCESS != (rc = orte_ns.get_proc_name_string(&param, &(proc->proc_name)))) {
+                ORTE_ERROR_LOG(rc);
+                return rc;
+            }
+            if (OPAL_SUCCESS != (rc = opal_argv_append_nosize(&(map->app->argv), param))) {
+                ORTE_ERROR_LOG(rc);
+                free(param);
+                return rc;
+            }
+            free(param);
+
+            /** the launcher wants to know how long the argv array is - get that now */
+            argc = opal_argv_len(map->app->argv);
+
+            /** add this process to the stack so we can track it (the stack's
+             * push/join pattern is sketched in the standalone example below)
+             */
+            temp_stack=(orte_pls_xcpu_tid_stack*)malloc(sizeof(orte_pls_xcpu_tid_stack));
+            temp_stack->next=t_stack;
+            t_stack=temp_stack;
+
+            /** launch the process */
+            t_stack->tid=lrx(argc, map->app->argv);
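/* Illustration (not part of the patch): the push-then-join pattern used for the
 * tid stack above, with a stub standing in for lrx(). Plain C with no OpenRTE
 * dependencies, so it can be compiled and run on its own.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct item_t {      /* same shape as orte_pls_xcpu_tid_stack */
    int tid;
    struct item_t *next;
} item_t;

static int fake_lrx(int id) { return id; }   /* pretend to launch, return a tid */

int main(void)
{
    item_t *t_stack = NULL, *item, *next;
    int i;

    /* push one entry per launched process; the newest entry sits on top */
    for (i = 0; i < 3; i++) {
        item = (item_t*)malloc(sizeof(item_t));
        item->next = t_stack;
        t_stack = item;
        t_stack->tid = fake_lrx(i);
    }

    /* walk the stack - in the launcher this is where pthread_join() waits */
    for (item = t_stack; NULL != item; item = item->next) {
        printf("would join tid %d\n", item->tid);
    }

    /* release the stack (the launcher does this recursively in
     * orte_pls_xcpu_free_stack) */
    while (NULL != t_stack) {
        next = t_stack->next;
        free(t_stack);
        t_stack = next;
    }
    return 0;
}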
+
+            /** cleanup the app's argv. Only the last two locations need to be
+             * deleted since everything else is common to all the applications
+             * (the node name location is simply overwritten, as above). The
+             * delete semantics are sketched at the end of this diff.
+             */
+            opal_argv_delete(&argc, &(map->app->argv), argc-2, 2);
         }
     }
-    /* wait for all thrads that have launched processes on remote nodes
-     * */
+    /** wait for all threads that have launched processes on remote nodes */
     temp_stack=t_stack;
     while(t_stack){
         pthread_join(t_stack->tid, NULL);
         t_stack=t_stack->next;
     }
     orte_soh.begin_monitoring_job(jobid);
-
-    free_stack(temp_stack);
-    free(new_argv[0]);
-    /*free(new_argv[1]);*/
-    free(new_argv);
+
+    /** cleanup local storage */
+    orte_pls_xcpu_free_stack(temp_stack);
+    opal_argv_free(base_env);
     OBJ_DESTRUCT(&mapping);
+
+    /** launch complete */
     /*fprintf(stdout, "launch finished\n");*/
     return ORTE_SUCCESS;
 }
@@ -175,7 +324,7 @@ int orte_pls_xcpu_terminate_job(orte_jobid_t jobid){
     return ORTE_SUCCESS;
 }
 int orte_pls_xcpu_terminate_proc(const orte_process_name_t* proc_name){
-    return  ORTE_SUCCESS;
+    return ORTE_SUCCESS;
 }
 int orte_pls_xcpu_finalize(void){
     return ORTE_SUCCESS;
diff --git a/orte/mca/pls/xcpu/pls_xcpu.h b/orte/mca/pls/xcpu/pls_xcpu.h
index 218c2bed05..8a8f10fac2 100644
--- a/orte/mca/pls/xcpu/pls_xcpu.h
+++ b/orte/mca/pls/xcpu/pls_xcpu.h
@@ -1,5 +1,5 @@
 /* -*- C -*-
- * 
+ *
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
@@ -11,17 +11,17 @@
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
- * 
+ *
 * Additional copyrights may follow
- * 
+ *
 * $HEADER$
 *
 *
 */
/**
 * @file:
- * Header file for the xcpu launcher. This will use xcpu to launch jobs on 
- * the list of nodes that it will get from RAS (resource allocation 
+ * Header file for the xcpu launcher. This will use xcpu to launch jobs on
+ * the list of nodes that it will get from RAS (resource allocation
 * system (slurm??)
 * -# pls_xcpu is called by orterun. It reads the ompi registry and launches
 * the binary on the nodes specified in the registry.
@@ -67,15 +67,15 @@ int orte_pls_xcpu_finalize(void);
 */
struct orte_pls_xcpu_component_t {
    orte_pls_base_component_t super;/*base_class this is needed others below this are not*/
-    
+
    /* most of the members below are going to get removed from this structure
     * and so are their registrations from open() function
     */
    bool done_launching;      /* Is true if we are done launching the user's app. */
    int debug;                /* If greater than 0 print debugging information */
    int num_procs;            /* The number of processes that are running */
-    int priority;             /* The priority of this component. This will be returned if 
-                               * we determine that xcpu is available and running on this node, 
+    int priority;             /* The priority of this component. This will be returned if
+                               * we determine that xcpu is available and running on this node,
                               */
    int terminate_sig;        /* The signal that gets sent to a process to kill it. */
    size_t num_daemons;       /* The number of daemons that are currently running. */
@@ -91,11 +91,11 @@ struct orte_pls_xcpu_component_t {
 */
typedef struct orte_pls_xcpu_component_t orte_pls_xcpu_component_t;

-struct tid_stack {
+struct orte_pls_xcpu_tid_stack {
    int tid;
-    struct tid_stack *next;
+    struct orte_pls_xcpu_tid_stack *next;
};
-typedef struct tid_stack tid_stack;
+typedef struct orte_pls_xcpu_tid_stack orte_pls_xcpu_tid_stack;
ORTE_DECLSPEC extern orte_pls_xcpu_component_t mca_pls_xcpu_component;
ORTE_DECLSPEC extern orte_pls_base_module_t orte_pls_xcpu_module; /* this is defined in pls_xcpu.c file */
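A final usage note on the launch-loop cleanup in pls_xcpu.c: after each lrx() call, the per-process tail ("--name" plus the stringified process name) is trimmed so the same argv can be reused for the next process. The following standalone sketch illustrates those delete semantics with made-up values, assuming compilation inside the Open MPI tree; opal_argv_count() is used here for counting, and opal_argv_delete() adjusts argc itself.

#include <stdio.h>
#include "opal/util/argv.h"

int main(void)
{
    char **argv = NULL;
    int argc;

    opal_argv_append_nosize(&argv, "dummy");    /* xcpu filler */
    opal_argv_append_nosize(&argv, "node0");    /* target node */
    opal_argv_append_nosize(&argv, "a.out");    /* application */
    opal_argv_append_nosize(&argv, "--name");   /* per-process tail... */
    opal_argv_append_nosize(&argv, "0.0.1");    /* ...and its value */

    /* trim the last two entries, as the launch loop does after lrx() */
    argc = opal_argv_count(argv);
    opal_argv_delete(&argc, &argv, argc - 2, 2);

    /* prints: 3 entries left, last is a.out */
    printf("%d entries left, last is %s\n", argc, argv[argc - 1]);
    opal_argv_free(argv);
    return 0;
}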