# -*- coding: utf-8 -*-
#!/usr/bin/env python
#The MIT CorrelX Correlator
#Project leads: Victor Pankratius, Pedro Elosegui Project developer: A.J. Vazquez Alvarez
#Copyright 2017 MIT Haystack Observatory
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#Project: CorrelX.
#Author: A.J. Vazquez Alvarez (
Management of CorrelX configuration files.

#initial version: 2016.10 ajva
#MIT Haystack Observatory

from __future__ import print_function

import imp
import sys
import os

import const_config
from const_config import *

import const_hadoop
from const_hadoop import *

    import configparser
except ImportError:
    import ConfigParser as configparser

[docs]def get_log_file(config_file,suffix="",output_log_folder="e"): """ Get logging files. Parameters ---------- config_file : str path to CorrelX configuration file. suffix : str suffix (with timestamp) to be added to log filename. output_log_folder : str suffix to be added to log file path. Returns ------- file_log : handler to file handler to log file. temp_log : str path to temporary (buffer) file for system calls. """ config = configparser.ConfigParser() config.optionxform = str temp_log=config.get(C_CONF_FILES,C_CONF_FILES_TMP_LOG) temp_log+=suffix file_log_input=config.get(C_CONF_GEN,C_CONF_GEN_LOG_FILE) if file_log_input=="sys.stdout": file_log = sys.stdout else: os.system("mkdir -p "+output_log_folder) file_log_input=output_log_folder+"/log_"+suffix+"_ext" file_log = open(file_log_input,'w') return([file_log,temp_log])
[docs]def get_nodes_file(config_file): """ Get name of file with list of nodes (from config file). Parameters ---------- config_file : str path to CorrelX configuration file. Returns ------- file_read_nodes : str path to hosts file. """ config = configparser.ConfigParser() config.optionxform = str file_read_nodes=config.get(C_CONF_GEN,C_CONF_GEN_HOSTS) return(file_read_nodes)
[docs]def get_config_mod_for_this_master(config_file,config_suffix,master_node,script_arg_zero): """ Overwrite all instances of "localhost" in configuration file with master node name into new configuration file (original configuration file is used as a template), "~" with home folder for the current user, "localuser" with the current user, and "localpath" with the path of the script_arg_zero ( Parameters ---------- config_file : str path to CorrelX configuration file. config_suffix : str suffix to be added to resulting configuration file. master_node : str master node name. script_arg_zero : str path given for the main script ( Returns ------- new_config_file : str configuration file plus suffix. Notes ----- | | **TO DO:** | | Move new configuration file into folder with logs for this job. """ new_config_file = config_file + config_suffix abs_local_path = os.path.dirname(os.path.abspath(script_arg_zero)) abs_local_path = os.path.abspath(abs_local_path+C_CONF_RELATIVE_PATH_LOCALPATH) os.system("sed -e 's/"+C_CONF_RES_LOCALHOST+"/" + master_node + "/' " + config_file +" > " + new_config_file) os.system("sed -i \"s@~@$HOME@g\" " + new_config_file) os.system("sed -i \"s@"+C_CONF_RES_LOCALUSER+"@$USER@g\" " + new_config_file) os.system("sed -i \"s@"+C_CONF_RES_LOCALPATH+"@" + abs_local_path + "@g\" " + new_config_file) return(new_config_file)
[docs]def get_configuration(file_log,config_file,timestamp_str,v=0): """ Read parameters from configuration file "configh.conf". Parameters ---------- file_log : handler to file handler to log file. config_file : str path to CorrelX configuration file. timestamp_str : str suffix to be added to temporary data folder (hwere media will be split). v : int verbose if 1. Returns ------- MAPPER : str Python mapper (.py). REDUCER : str Python reducer (.py). DEPENDENCIES : str Comma separated list of Python files required for mapper and reducer (,,etc). PACKETS_PER_HDFS_BLOCK : int Number of VDIF frames per file split. CHECKSUM_SIZE : int Number of bytes for checksum. SRC_DIR : str Folder with Python sources for mapper, reducer and dependencies. APP_DIR : str Folder to place mapper, reducer and dependencies in all nodes (in master-associated folder). CONF_DIR : str Base working folder for configuration files (to be updated later for this master). TEMPLATES_CONF_DIR : str Folder with templates for core-site.xml,yarn-site.xml,mapred-site.xml,hdfs-site.xml. TEMPLATES_ENV_DIR : str Folder with templates for, etc. HADOOP_DIR : str Path to Hadoop home folder. HADOOP_CONF_DIR : str Path to Hadoop configuration folder (to be updated later for this master). NODES : str File to write list of nodes to host the cluster (one node per line). MAPPERSH : str File to write bash script for mapper (call to python script with all arguments). REDUCERSH : str File to write bash script for reducer (call to python script with all arguments). JOBSH : str File to write bash script for job request for Hadoop (call to python script with all arguments). PYTHON_X : str Path to Python executable. USERNAME_MACHINES : str Username for ssh into the cluster machines. MAX_SLAVES : int Maximum number of worker nodes (-1 no maximum). SLAVES : str Filename for Hadoop slaves file. MASTERS : str Filename for Hadoop masters file. MASTER_IS_SLAVE : bool Boolean, if 1 master is also launching a nodemanager (doing mapreduce). HADOOP_TEMP_DIR : str Folder for Hadoop temporary folders. DATA_DIR : str Path with media input files. DATA_DIR_TMP : str Path to folder to place splits of input file before moving them to the distributed filesystem. HDFS_DATA_DIR : str Path in the HDFS distributed filesystem to move input splits. HADOOP_START_DELAY : str Number of seconds to wait after every interaction with Hadoop during the cluster initialization. HADOOP_STOP_DELAY : str Number of seconds to wait after every interaction with Hadoop during the cluster termination. PREFIX_OUTPUT : str Prefix for output file. HADOOP_TEXT_DELIMITER : str Text delimiter for input splits (lib_mapredcorr.run_mapreduce_sh). OUTPUT_DIR : str Folder in local filesystem to place output file. OUTPUT_SYM : str Folder within experiment configuration folders to place symbolic link to output file. RUN_PIPELINE : bool Boolean, if 1 will run in pipeline mode. RUN_HADOOP : bool Boolean, if 1 will run Hadoop. MAX_CPU_VCORES : int Maximum number of virtual CPU cores. HDFS_REPLICATION : int Number of copies of each input split in HDFS. OVER_SLURM : bool Boolean, 1 to run in a cluster where the local filesystem is NFS (or synchronized among all nodes). HDFS_COPY_DELAY : int Number of seconds to wait after every interaction with Hadoop during file distribution to HDFS. FFT_AT_MAPPER : bool Boolean, if 0 FFT is done at reducer (default). INI_FOLDER : str Folder with experiment .ini files. INI_STATIONS : str Stations ini file name. INI_SOURCES : str Sources ini file name. INI_DELAY_MODEL : str Delay model ini file name. INI_DELAYS : str Delay polynomials ini file name. INI_MEDIA : str Media ini file name. INI_CORRELATION : str Correlation ini file name. INTERNAL_LOG_MAPPER [remove] currently default 0. INTERNAL_LOG_REDUCER [remove] currenlty default 0. ADJUST_MAPPERS : float Force number of mappers computed automatically to be multiplied by this number. ADJUST_REDUCERS : float Force number of reducers computed automatically to be multiplied by this number. FFTS_PER_CHUNK [Remove] Number of DFT windows per mapper output, -1 by default (whole frame) TEXT_MODE : bool True by default. USE_NOHASH_PARTITIONER : bool True to use NoHash partitioner. USE_LUSTRE_PLUGIN : bool True to use Lustre plugin for Hadoop. LUSTRE_USER_DIR : str Absolute path for the Lustre working path (used in mapreduce job). LUSTRE_PREFIX : str Path in Lustre to preceed HDFS_DATA_DIR if using Lustre. ONE_BASELINE_PER_TASK : int 0 by default (if 1, old implementation allowed scaling with one baseline per task in the reducers). MIN_MAPPER_CHUNK [Remove] Chunk constraints for mapper. MAX_MAPPER_CHUNK [Remove] Chunk constraints for mapper. TASK_SCALING_STATIONS: int 0 by default (if 1, old implementation allowed linear scaling per task in the reducers). SORT_OUTPUT : bool If 1 will sort lines in output file. BM_AVOID_COPY : bool If 1 will not split and copy input files if this has already been done previously (for benchmarking). BM_DELETE_OUTPUT : bool If 1 will not retrieve output file from distributed filesystem (for benchmarking). TIMEOUT_STOP : int Number of seconds to wait before terminating nodes during cluster stop routine. SINGLE_PRECISION : bool If 1 computations will be done in single precision. PROFILE_MAP: int | if 1 will generate call graphs with timing information for mapper (requires Python Call Graph package), | if 2 will use cProfile. PROFILE_RED : int | if 1 will generate call graphs with timing information for reducer (requires Python Call Graph package), | if 2 will use cProfile. Notes ----- | | **Configuration:** | | All constants taken from and | | | **TO DO:** | | OVER_SLURM: explain better, and check assumptions. | Remove INTERNAL_LOG_MAPPER and INTERNAL_LOG_REDUCER. | Remove FFTS_PER_CHUNK,MIN_MAPPER_CHUNK and MAX_MAPPER_CHUNK. | Check that SINGLE_PRECISION is followed in mapper and reducer. """ #config_file="configh.conf" config = configparser.ConfigParser() config.optionxform = str if v==1: print("\nReading configuration file...",file=file_log) # General RUN_PIPELINE = config.getboolean( C_CONF_GEN, C_CONF_GEN_RUN_PIPE) SORT_OUTPUT = config.getboolean( C_CONF_GEN, C_CONF_GEN_SORT_OUTPUT) RUN_HADOOP = config.getboolean( C_CONF_GEN, C_CONF_GEN_RUN_HADOOP) OVER_SLURM = config.getboolean( C_CONF_GEN, C_CONF_GEN_OVER_SLURM) USE_NOHASH_PARTITIONER = config.getboolean(C_CONF_GEN, C_CONF_GEN_USE_NOHASH) USE_LUSTRE_PLUGIN = config.getboolean( C_CONF_GEN, C_CONF_GEN_USE_LUSTRE) LUSTRE_USER_DIR = config.get( C_CONF_GEN, C_CONF_GEN_LUSTRE_USER_FOLDER) LUSTRE_PREFIX = config.get( C_CONF_GEN, C_CONF_GEN_LUSTRE_PREFIX) INTERNAL_LOG_MAPPER = 0 # TO DO: remove INTERNAL_LOG_REDUCER = 0 # TO DO: remove # Benchmarking BM_AVOID_COPY = config.getboolean( C_CONF_BENCHMARKING, C_CONF_BENCHMARKING_AVOID_COPY) BM_DELETE_OUTPUT = config.getboolean( C_CONF_BENCHMARKING, C_CONF_BENCHMARKING_DELETE_OUTPUT) # Profiling PROFILE_MAP = config.getboolean( C_CONF_PROFILING, C_CONF_PROFILING_MAP) PROFILE_RED = config.getboolean( C_CONF_PROFILING, C_CONF_PROFILING_RED) use_pycallgraph = config.getboolean( C_CONF_PROFILING, C_CONF_PROFILING_PYCALLGRAPH) if not(use_pycallgraph): PROFILE_MAP = 2*int(PROFILE_MAP) PROFILE_RED = 2*int(PROFILE_RED) # Files MAPPER = config.get( C_CONF_FILES, C_CONF_FILES_MAP) REDUCER = config.get( C_CONF_FILES, C_CONF_FILES_RED) DEPENDENCIES = config.get( C_CONF_FILES, C_CONF_FILES_DEP).split(',') MAPPERSH = config.get( C_CONF_FILES, C_CONF_FILES_MAP_BASH) REDUCERSH = config.get( C_CONF_FILES, C_CONF_FILES_RED_BASH) JOBSH = config.get( C_CONF_FILES, C_CONF_FILES_JOB_BASH) PYTHON_X = config.get( C_CONF_FILES, C_CONF_FILES_PYTHON) NODES = config.get( C_CONF_FILES, C_CONF_FILES_NODES) SRC_DIR = config.get( C_CONF_FILES, C_CONF_FILES_SRC_DIR) if SRC_DIR[-1]!="/": SRC_DIR+=("/") APP_DIR = config.get( C_CONF_FILES, C_CONF_FILES_APP_DIR) if APP_DIR[-1]!="/": APP_DIR+=("/") CONF_DIR = config.get( C_CONF_FILES, C_CONF_FILES_CONF_DIR) if CONF_DIR[-1]!="/": CONF_DIR+=("/") TEMPLATES_CONF_DIR = config.get( C_CONF_FILES, C_CONF_FILES_CONF_TEMPLATES) if TEMPLATES_CONF_DIR[-1]!="/": TEMPLATES_CONF_DIR+=("/") TEMPLATES_ENV_DIR = config.get( C_CONF_FILES, C_CONF_FILES_ENV_TEMPLATES) if TEMPLATES_ENV_DIR[-1]!="/": TEMPLATES_ENV_DIR+=("/") HADOOP_DIR = config.get( C_CONF_FILES, C_CONF_FILES_HADOOP_DIR) if HADOOP_DIR[-1]!="/": HADOOP_DIR+=("/") HADOOP_CONF_DIR = HADOOP_DIR+HADOOP_REL_ETC # (!) This is overriten later at get_conf_out_dirs() HADOOP_TEMP_DIR = config.get( C_CONF_FILES, C_CONF_FILES_TMP_DIR) if HADOOP_TEMP_DIR[-1]!="/": HADOOP_TEMP_DIR+=("/") DATA_DIR_TMP = config.get( C_CONF_FILES, C_CONF_FILES_TMP_DATA_DIR) if DATA_DIR_TMP[-1]!="/": DATA_DIR_TMP+=("/") OUTPUT_DIR = config.get( C_CONF_FILES, C_CONF_FILES_OUT_DIR) if OUTPUT_DIR[-1]!="/": OUTPUT_DIR+=("/") USERNAME_MACHINES = config.get( C_CONF_FILES, C_CONF_FILES_USERNAME_MACHINES) PREFIX_OUTPUT = config.get( C_CONF_FILES, C_CONF_FILES_PREFIX_OUTPUT) # HDFS HDFS_DATA_DIR = config.get( C_CONF_HDFS, C_CONF_HDFS_INPUT_DATA_DIR) HDFS_DATA_DIR_SUFFIX = config.get( C_CONF_HDFS, C_CONF_HDFS_DIR_SUFFIX) HDFS_DATA_DIR+=HDFS_DATA_DIR_SUFFIX if HDFS_DATA_DIR[-1]!="/": HDFS_DATA_DIR+=("/") PACKETS_PER_HDFS_BLOCK = config.getint( C_CONF_HDFS, C_CONF_HDFS_PACKETS_PER_BLOCK) CHECKSUM_SIZE = config.getint( C_CONF_HDFS, C_CONF_HDFS_CHECKSUM_SIZE) # Has to divide the block size (which is a multiple of 32) # PACKET SIZE is generated with "generate_vdif", next cell. # Slaves SLAVES = config.get( C_CONF_HSLAVES, C_CONF_H_ALL_CONFIG_FILE) MAX_SLAVES = config.getint( C_CONF_HSLAVES, C_CONF_HSLAVES_MAX_SLAVES) # Masters MASTERS = config.get( C_CONF_HMASTERS, C_CONF_H_ALL_CONFIG_FILE) MASTER_IS_SLAVE = config.getboolean( C_CONF_HMASTERS, C_CONF_HMASTERS_MASTER_IS_SLAVE) # Hadoop-other HADOOP_START_DELAY = config.getint( C_CONF_OTHER, C_CONF_OTHER_START_DELAY) HADOOP_STOP_DELAY = config.getint( C_CONF_OTHER, C_CONF_OTHER_STOP_DELAY) HDFS_COPY_DELAY = config.getint( C_CONF_OTHER, C_CONF_OTHER_COPY_DELAY) TEXT_MODE = config.getboolean( C_CONF_OTHER, C_CONF_OTHER_TEXT_MODE) HADOOP_TEXT_DELIMITER = config.get( C_CONF_OTHER, C_CONF_OTHER_TEXT_DELIMITER) MAX_CPU_VCORES = config.getint( C_CONF_OTHER, C_CONF_OTHER_MAX_CPU_VCORES) FFT_AT_MAPPER = config.getboolean( C_CONF_OTHER, C_CONF_OTHER_FFT_MAP) ADJUST_MAPPERS = config.getfloat( C_CONF_OTHER, C_CONF_OTHER_ADJ_MAP) ADJUST_REDUCERS = config.getfloat( C_CONF_OTHER, C_CONF_OTHER_ADJ_RED) ONE_BASELINE_PER_TASK = config.getboolean( C_CONF_OTHER, C_CONF_OTHER_ONE_BASELINE) TASK_SCALING_STATIONS = config.getboolean( C_CONF_OTHER, C_CONF_OTHER_SCALING_STATIONS) TIMEOUT_STOP = config.getint( C_CONF_OTHER, C_CONF_OTHER_TIMEOUT_STOP_NODES) SINGLE_PRECISION = config.getboolean( C_CONF_OTHER, C_CONF_OTHER_SINGLE_PRECISION) FFTS_PER_CHUNK = -1 # TO DO: remove MIN_MAPPER_CHUNK = -1 # TO DO: remove MAX_MAPPER_CHUNK = -1 # TO DO: remove # Experiment INI_FOLDER = config.get( C_CONF_EXP, C_CONF_EXP_FOLDER) INI_STATIONS = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_STATIONS) INI_SOURCES = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_SOURCES) INI_DELAYS = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_DELAYS) INI_DELAY_MODEL = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_DELAY_MODEL) INI_MEDIA = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_MEDIA) INI_CORRELATION = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_CORRELATION) DATA_DIR = INI_FOLDER + "/" + config.get( C_CONF_EXP, C_CONF_EXP_MEDIA_SUB) + "/" OUT_DIR_PREFIX = config.get( C_CONF_EXP, C_CONF_EXP_MEDIA_SUB_PREFIX) #TO DO: vcores value is forced from MAX_CPU_VCORES... maybe it has to be taken out from the configuration file section... fix this. HDFS_REPLICATION = config.getint( C_CONF_H_HDFS, C_H_HDFS_REPLICATION) ##AUTO_STATIONS = config.getint("Correlation",'Corr. same station') ##AUTO_THREADS = config.getint("Correlation",'Corr. same thread') OUTPUT_SYM=INI_FOLDER+"/"+OUT_DIR_PREFIX+"_"+timestamp_str+"/" if v==1: #if SAMPLES_PER_PACKET<FFT_SIZE: # print(" (!) The packet has fewer samples than the size of the FFT",file=file_log) print(" FFT at mapper:\t\t\t" + str(int(FFT_AT_MAPPER)),file=file_log) print(" Hadoop delays: [initialization = " + str(HADOOP_START_DELAY) + " s], [termination = " + str(HADOOP_STOP_DELAY) + " s], [HDFS = " + str(HDFS_COPY_DELAY) + " s]",file=file_log) if MAX_SLAVES>0: print(" Iterating: NODES (max=" +str(MAX_SLAVES) + ")",file=file_log) else: print(" Single run max. number of available nodes",file=file_log) if OVER_SLURM: print(" ---> SLURM - Configured for running on SLURM",file=file_log) else: print(" ---> VBox - Configured for running on VBox",file=file_log) print(" Temp dir overriden (default: /tmp)",file=file_log) return([MAPPER, REDUCER, DEPENDENCIES, PACKETS_PER_HDFS_BLOCK,CHECKSUM_SIZE,\ SRC_DIR,APP_DIR, CONF_DIR, TEMPLATES_CONF_DIR, TEMPLATES_ENV_DIR, HADOOP_DIR,HADOOP_CONF_DIR,NODES, MAPPERSH,REDUCERSH,JOBSH,PYTHON_X,\ USERNAME_MACHINES,MAX_SLAVES,SLAVES,MASTERS,MASTER_IS_SLAVE,HADOOP_TEMP_DIR,DATA_DIR,DATA_DIR_TMP,HDFS_DATA_DIR,HADOOP_START_DELAY,HADOOP_STOP_DELAY,\ PREFIX_OUTPUT,HADOOP_TEXT_DELIMITER,OUTPUT_DIR,OUTPUT_SYM,RUN_PIPELINE,RUN_HADOOP,MAX_CPU_VCORES,\ HDFS_REPLICATION,OVER_SLURM,HDFS_COPY_DELAY,\ FFT_AT_MAPPER,INI_FOLDER,\ INI_STATIONS, INI_SOURCES, INI_DELAY_MODEL, INI_DELAYS, INI_MEDIA, INI_CORRELATION,\ INTERNAL_LOG_MAPPER,INTERNAL_LOG_REDUCER,ADJUST_MAPPERS,ADJUST_REDUCERS,FFTS_PER_CHUNK,TEXT_MODE,\ USE_NOHASH_PARTITIONER,USE_LUSTRE_PLUGIN,LUSTRE_USER_DIR,LUSTRE_PREFIX,ONE_BASELINE_PER_TASK,\ MIN_MAPPER_CHUNK,MAX_MAPPER_CHUNK,TASK_SCALING_STATIONS,SORT_OUTPUT,BM_AVOID_COPY,\ BM_DELETE_OUTPUT,TIMEOUT_STOP,SINGLE_PRECISION,PROFILE_MAP,PROFILE_RED])
[docs]def update_config_param(source_file,pairs,v=0,file_log=sys.stdout): """ Updates a list of pairs [parameter,value] in a configuration file. Should be valid to any .ini file, but this is used to override the configuration on the CorrelX configuration file. Parameters ---------- source_file : str configuration file (.ini). pairs : list list of [parameter,value]. v : int verbose if 1. file_log : file handler handler for log file. Returns ------- N/A Notes ----- | | **TO DO:** | | Currently parameters that are not found are not added, this should be reported. """ if v==1: print("\nOverriding configuration file!",file=file_log) destination_file = source_file isamec = "" copy_again=1 destination_file+=".tmp" isamec = " mv " + destination_file + " " + source_file command = "" iteration=0 for pair in pairs: iteration+=1 if iteration==1: #command+="sed -e 's/" + pair[0] + ".*/" + pair[0] + "\\t\\t" + pair[1] + "/g' " + source_file command+="sed -e 's;" + pair[0] + ".*;" + pair[0] + "\\t\\t" + pair[1] + ";g' " + source_file if iteration>0: #command+="|sed -e 's/" + pair[0] + ".*/" + pair[0] + "\\t\\t" + pair[1] + "/g' " command+="|sed -e 's;" + pair[0] + ".*;" + pair[0] + "\\t\\t" + pair[1] + ";g' " command+= "> " + destination_file os.system(command) os.system(isamec) if v==1: for pair in pairs: #[1:]: print(" " + pair[0].ljust(49) + " " + pair[1],file=file_log) # Now check if any parameter was not found, and thus was not updated # Checks lowercase! with open(source_file, 'r') as f: lines = for pair in pairs: count=1 if pair[0].lower() in lines.lower(): count=0 if count==1: if v==1: print(" (!) Parameter not found: " + pair[0].ljust(49),file=file_log)
[docs]def override_configuration_parameters(forced_configuration_string,config_file,v=1,file_log=sys.stdout): """ This function takes as input the string with the parameters to the main script and overrides the corresponding parameters in the configuration files. This is to simplify batch testing. Parameters ---------- forced_configuration_string : str Comma separated list of parameter0=value0,parameter1=value1,... config_file : str Path to CorrelX configuration file. v : int Verbose if 1. file_log : file handler Handler for log file. Output: ------- N/A Notes ----- | | **Assumptions:** | | Assuming that C_H_MAPRED_RED_OPTS is higher than C_H_MAPRED_MAP_OPTS, so the first value is | taken for C_H_MAPRED_CHILD_OPTS. | | | **Notes:** | | For new parameters in configh.conf: | (1) Add constants for CLI in | (2) Check/add constants for hadoop configuration files in (if applicable). | (3) Add parameter reading in get_configuration(). | (4) Add option in if-structure below. """ # Override configuration parameters # e.g.: -f stations=3,slowstart=0.4,ppb=1 read_configuration_pairs=[i.split('=') for i in forced_configuration_string.split(',')] forced_configuration_pairs=[] unr_parameters=[] for pair in read_configuration_pairs: if len(pair)<2: unr_parameters+=[pair] else: [parameter,value] = pair if parameter==C_ARG_PPB: # Num. VDIF frames per block forced_configuration_pairs+=[[C_CONF_HDFS_PACKETS_PER_BLOCK+":",value]] elif parameter==C_ARG_SLOWSTART: # Mapreduce slowstart forced_configuration_pairs+=[[C_H_MAPRED_SLOWSTART+":",value]] elif parameter==C_ARG_REPLICATION: # HDFS replication forced_configuration_pairs+=[[C_H_HDFS_REPLICATION+":",value]] elif parameter==C_ARG_FFTM: # FFT at mapper forced_configuration_pairs+=[[C_CONF_OTHER_FFT_MAP+":",value]] elif parameter==C_ARG_FFTR: # FFT at reducer forced_configuration_pairs+=[[C_CONF_OTHER_FFT_MAP+":",str(int(not(bool(value))))]] elif parameter==C_ARG_ADJM: # Factor to multiply number of mappers forced_configuration_pairs+=[[C_CONF_OTHER_ADJ_MAP+":",value]] elif parameter==C_ARG_ADJR: # Factor to multiply number of reducers forced_configuration_pairs+=[[C_CONF_OTHER_ADJ_RED+":",value]] elif parameter==C_ARG_VCORES: # Number of cores per node forced_configuration_pairs+=[[C_H_YARN_MAX_VCORES+":",value]] forced_configuration_pairs+=[[C_H_YARN_RES_VCORES+":",value]] forced_configuration_pairs+=[[C_CONF_OTHER_MAX_CPU_VCORES+":",value]] elif parameter==C_ARG_VCORESPERMAP: # Number of cores per map task forced_configuration_pairs+=[[C_H_MAPRED_VCORES_MAP+":",value]] elif parameter==C_ARG_VCORESPERRED: # Number of cores per reduce task forced_configuration_pairs+=[[C_H_MAPRED_VCORES_RED+":",value]] elif parameter==C_ARG_MAPSPERNODE: # Max. maps per node forced_configuration_pairs+=[[C_H_MAPRED_MAP_MAX+":",value]] elif parameter==C_ARG_REDUCESPERNODE: # Max. reduces per node forced_configuration_pairs+=[[C_H_MAPRED_RED_MAX+":",value]] elif parameter==C_ARG_NODEMEM: # Memory per node (maps+reduces) forced_configuration_pairs+=[[C_H_YARN_MAX_MEM_MB+":",value]] forced_configuration_pairs+=[[C_H_YARN_RES_MEM_MB+":",value]] elif parameter==C_ARG_CONTAINERMEM_MAP: # Memory per container (mapper) forced_configuration_pairs+=[[C_H_YARN_MAP_MB+":",value]] elif parameter==C_ARG_CONTAINERMEM_RED: # Memory per container (reducer) forced_configuration_pairs+=[[C_H_YARN_RED_MB+":",value]] elif parameter==C_ARG_CONTAINERMEM_AM: # Memory per container (app manager) forced_configuration_pairs+=[[C_H_YARN_AM_RES_MB+":",value]] elif parameter==C_ARG_CONTAINERHEAP_MAP: # Memory heap per container (mapper) #forced_configuration_pairs+=[[C_H_MAPRED_MAP_OPTS+":", "-Xms"+value+"m -Xmx"+value+"m"]] # init forced_configuration_pairs+=[[C_H_MAPRED_MAP_OPTS+":", "-Xmx"+value+"m"]] # no init elif parameter==C_ARG_CONTAINERHEAP_RED: # Memory heap per container (mapper) #forced_configuration_pairs+=[[C_H_MAPRED_RED_OPTS+":", "-Xms"+value+"m -Xmx"+value+"m"]] # init #forced_configuration_pairs+=[[C_H_MAPRED_CHILD_OPTS+":", "-Xms"+value+"m -Xmx"+value+"m"]] # init forced_configuration_pairs+=[[C_H_MAPRED_RED_OPTS+":", "-Xmx"+value+"m"]] # no init forced_configuration_pairs+=[[C_H_MAPRED_CHILD_OPTS+":", "-Xmx"+value+"m"]] # no init elif parameter==C_ARG_CONTAINERHEAP_AM: # Memory heap per container (mapper) #forced_configuration_pairs+=[[C_H_MAPRED_AM_OPTS+":", "-Xms"+value+"m -Xmx"+value+"m"]] # init forced_configuration_pairs+=[[C_H_MAPRED_AM_OPTS+":", "-Xmx"+value+"m"]] # no init forced_configuration_pairs+=[[C_H_MAPRED_MAP_OPTS+":", "-Xmx"+value+"m"]] # no init forced_configuration_pairs+=[[C_H_MAPRED_RED_OPTS+":", "-Xmx"+value+"m"]] # no init elif parameter==C_ARG_SORTMEM: # Sort memory (after map) forced_configuration_pairs+=[[C_H_MAPRED_SORT_MB+":",value]] elif parameter==C_ARG_BLOCKSIZE: # HDFS block size forced_configuration_pairs+=[[C_H_HDFS_BLOCKSIZE+":",value]] elif parameter==C_ARG_MASTERWORKS: # Master node is also slave forced_configuration_pairs+=[[C_CONF_HMASTERS_MASTER_IS_SLAVE+":",value]] elif parameter==C_ARG_TASKSPERJVM: # Tasks before restaring JVM forced_configuration_pairs+=[[C_H_MAPRED_JVM_NUMTASKS+":",value]] elif parameter==C_ARG_AVOIDCOPY: # Do not re-copy input files forced_configuration_pairs+=[[C_CONF_BENCHMARKING_AVOID_COPY+":",value]] elif parameter==C_ARG_DELETEOUTPUT: # Delete output file (for benchmarking) forced_configuration_pairs+=[[C_CONF_BENCHMARKING_DELETE_OUTPUT+":",value]] elif parameter==C_ARG_SCALEST: # Linear scaling stations forced_configuration_pairs+=[[C_CONF_OTHER_SCALING_STATIONS+":",value]] elif parameter==C_ARG_MEDIASUFFIX: # Media suffix folder forced_configuration_pairs+=[[C_CONF_HDFS_DIR_SUFFIX+":",value]] elif parameter==C_ARG_SINGLEPRECISION: # Single precision in computations forced_configuration_pairs+=[[C_CONF_OTHER_SINGLE_PRECISION+":",value]] #elif parameter==C_ARG_DATA: # Data directory # list_value=value.split("/") # value='\/'.join(map(str, list_value)) # forced_configuration_pairs+=[["Data directory:",value]] elif parameter==C_ARG_EXPER: # Experiment folder forced_configuration_pairs+=[[C_CONF_EXP_FOLDER+":",value]] elif parameter==C_ARG_OUT: # Output folder forced_configuration_pairs+=[[C_CONF_FILES_OUT_DIR+":",value]] elif parameter==C_ARG_APP: # Application folder forced_configuration_pairs+=[[C_CONF_FILES_APP_DIR+":",value]] elif parameter==C_ARG_SERIAL: # Run pipeline mode forced_configuration_pairs+=[[C_CONF_GEN_RUN_PIPE+":",value]] elif parameter==C_ARG_PARALLEL: # Run Hadoop forced_configuration_pairs+=[[C_CONF_GEN_RUN_HADOOP+":",value]] elif parameter==C_ARG_NM_LOC_PORT: # Nodemanager ports forced_configuration_pairs+=[[C_H_YARN_NM_LOCALIZER_ADDRESS+":",HOSTNAME_HADOOP+":"+value]] elif parameter==C_ARG_NM_WEB_PORT: forced_configuration_pairs+=[[C_H_YARN_NM_WEBAPP_ADDRESS+":",HOSTNAME_HADOOP+":"+value]] elif parameter==C_ARG_SHUFFLE_PORT: forced_configuration_pairs+=[[C_H_MAPRED_SHUFFLE_PORT+":",value]] else: # Unknown parameter, simply report it unr_parameters+=[parameter] update_config_param(source_file=config_file,pairs=forced_configuration_pairs,v=v,file_log=file_log) if v==1: if unr_parameters!=[]: print("\n Unrecognized parameters:",file=file_log) for i in unr_parameters: print(" "+''.join(map(str,i)),file=file_log)
[docs]def get_list_configuration_files(config_file): """ Get list of Hadoop configuration files. Parameters ---------- config_file : str Path to CorrelX configuration file. Returns ------- list_configurations : list List of sections from configuration file associated to lists in "pairs_config" below. pairs_config : list List of pairs [[param0,value0],[param1,value1]...]...,[[...]...,[...]]] to update Hadoop config files later. """ config = configparser.ConfigParser() config.optionxform = str list_configurations=[i for i in config.sections() if (C_CONF_PREH in i)and(C_CONF_SUF_SLAVES not in i)and(C_CONF_SUF_OTHER not in i)] pairs_config = [[list(j) for j in config.items(i)] for i in list_configurations ] return([list_configurations,pairs_config])
[docs]def is_this_node_master(master,temp_log,v=0,file_log=sys.stdout): """ Devised in case the script was run in parallel at many nodes. Currenlty simply used to enforce that only one node is running as master. Parameters ---------- master : str master node name. temp_log : str path to temporary file for system calls (buffer). v : int verbose if 1. file_log : file handler handler for log file. Returns ------- this_is_master : int 1 if current node is the master, 0 otherwise. my_name : str current node name. my_ip : str current node IP address. Notes ----- | | **TO DO:** | | Simplify this. """ os.system("hostname > " + temp_log) os.system("hostname -I >> " + temp_log) this_is_master = 0 local_info=[] thiss=" not " with open(temp_log, 'r') as f_tmp: for line in f_tmp: local_info += line.strip().split(",") if master in line: this_is_master = 1 thiss=" " my_name = local_info[0] my_ip = local_info[1] if v==1: print("\nChecking if this node is master:",file=file_log) print(" Node " + my_name + " (" + my_ip + ") is" + thiss + "master",file=file_log) return([this_is_master,my_name,my_ip])
[docs]def overwrite_nodes_file(nodes_list,nodes_file,v=0,file_log=sys.stdout): """ Overwrite nodes file (in case less nodes are requested than are available). Parameters ---------- nodes_list : list of str names of the nodes in the allocation. nodes_file : str path to nodes file. v : int verbose if 1. file_log : file handler handler for log file. """ if v==1: print("\nCreating nodes file:",file=file_log) print(" " + nodes_file,file=file_log) with open(nodes_file, 'w') as f: for i in nodes_list: print(i,file=f) if v==1: print(" " + i,file=file_log) if v==1: print(" Nodes file overwritten!",file=file_log)
[docs]def create_directories(directories,v=0,file_log=sys.stdout): """ Create directories from a list of str with their paths. """ if v==1: print("\nCreating directories:",file=file_log) for i in directories: if not os.path.isdir(i): os.system("mkdir -p " + i) if v==1: print(" "+i,file=file_log)
[docs]def get_conf_out_dirs(master_name,hadoop_dir,app_dir,conf_dir,suffix_conf,output_dir,suffix_out,v=1,file_log=sys.stdout): """ Get paths for configuration and output folders. App and Conf directories are modified with master's name. Parameters ---------- master_name : str master node hostname. hadoop_dir : str hadoop base folder. app_dir : str base app folder. conf_dir : str base configuration folder. suffix_conf : str suffix for configuration folder. output_dir : str base output folder. suffix_out : str suffix for output folder (only the string after the last "/" is taken). v : int verbose if 1. file_log : file handler handler for log file. Returns ------- app_dir : str path to app folder (modified for this master). conf_dir : str path to configuration folder (for modified config file and bash scripts). hadoop_conf_dir : str path to hadoop configuration folder for master node. hadoop_default_conf_dir : str path to hadoop default configuration folder (to be used at slaves nodes). output_dir : str path in local filesystem for output file. Notes ----- Having a different folder associated to the master node allows to run multiple deployments in the same cluster if the local filesystem is NFS. """ if "/" in suffix_out: suffix_out=suffix_out.split('/')[-1] conf_dir+=master_name + "/" # Create conf dir for each node (still shared...) app_dir+=master_name + "/" # Create app dir for each node (still shared...) hadoop_conf_dir = conf_dir + "etc_hadoop_" + suffix_conf +"/" # Hadoop conf dir for this host output_dir = output_dir + suffix_out +"/" # Create more folders for different experiments create_directories(directories=[conf_dir,hadoop_conf_dir,output_dir],v=v,file_log=file_log) # This is for slaves, for distributing files hadoop_default_conf_dir = hadoop_dir+HADOOP_REL_ETC return([app_dir,conf_dir,hadoop_conf_dir,hadoop_default_conf_dir,output_dir])
[docs]def reduce_list_nodes(num_slaves,nodes_list,v=1,file_log=sys.stdout): """ Reduce list of nodes given a maximum number of nodes. Parameters ---------- num_slaves : int maximum number of slaves (-1 for no maximum). nodes_list : list of str names of nodes. Output: ------- num_slaves: number of nodes in updated list. nodes_list: updated nodes_list. """ if num_slaves>0 and num_slaves<len(nodes_list): nodes_list=nodes_list[:num_slaves] if v==1: print("\nForcing number of nodes: ".ljust(24) + str(num_slaves),file=file_log) else: num_slaves=len(nodes_list) return([num_slaves,nodes_list])