Source code for mapred_cx

# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>

# <codecell>

#!/usr/bin/env python
#
#The MIT CorrelX Correlator
#
#https://github.com/MITHaystack/CorrelX
#Contact: correlX@haystack.mit.edu
#Project leads: Victor Pankratius, Pedro Elosegui
#Project developer: A.J. Vazquez Alvarez
#
#Copyright 2017 MIT Haystack Observatory
#
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
#
#------------------------------
#------------------------------
#Project: CorrelX.
#File: mapred_cx.py.
#Author: A.J. Vazquez Alvarez (ajvazquez@haystack.mit.edu)
#Description: 
"""
Main script to run the CorrelX correlator.

Parameters
----------
|  [-c configuration_file] (optional): configuration file with the configuration for the correlation.
|  [-n nodes_list] (optional): comma-separated list of nodes on which to run.
|  [-s log_output_folder] (optional): folder for logs.
|  [-f forced_parameters] (optional): comma-separated assignments for overriding parameters (for test batching).
|  [--help-parameters] (optional): show a list of all parameters available to override the configuration file.

Returns
-------
|  Correlation results: "Output directory" in configuration file. 
|                       Symbolic link to output file added to experiment folder.
|  Log files:           Folder specified in -s option.

Notes
-----
|
| **Example:**
|
|  python mapred_cx.py -n 10.0.2.4 -c basic_files_conf/configh.conf -s exp3 -f exper=/home/hduser/basic_files_data/ini_files_eht_two,fftr=1
|
|
| **TO DO:**
|
|  More detailed documentation.

"""
#History:
#initial version: 2016.11 ajva
#MIT Haystack Observatory

from __future__ import print_function
import sys
import os
import time
import imp
import argparse
import numpy as np

import const_config
imp.reload(const_config)
from const_config import *

import const_hadoop
imp.reload(const_hadoop)
from const_hadoop import *

import lib_config
imp.reload(lib_config)
from lib_config import *

import lib_ini_exper
imp.reload(lib_ini_exper)
from lib_ini_exper import *

import lib_mapredcorr
imp.reload(lib_mapredcorr)
from lib_mapredcorr import *

import lib_hadoop_hdfs
imp.reload(lib_hadoop_hdfs)
from lib_hadoop_hdfs import *

import lib_net_stats
imp.reload(lib_net_stats)
from lib_net_stats import *

# Vector quantization                           # VQ disabled
#import lib_vq
#imp.reload(lib_vq)
#from lib_vq import *


if __name__ == '__main__':
    
    # Network stats
    NETWORK_STATS=0
    # Verbose
    v=1
    
    
    
    # Default values for configuration:
    config_file = "configh.conf"
    config_suffix = "_mod"
    output_log_folder = time.strftime("e%Y%m%d_%H%M%S")
    forced_params = ""
    nodes_list = os.uname()[1]   # Default: this host only

    # Configuration for parameter help
    const_config_file=os.path.dirname(sys.argv[0])+"/const_config.py"    # File with configuration constants
    str_help_param="C_ARG_"                                              # Prefix for constants to be displayed
    len_str_help_param=len(str_help_param)
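    # --help-parameters (handled below) scans const_config.py for lines starting with "C_ARG_",
    # splits each on double quotes and, if the token between the 3rd and 4th quotes is
    # "display_in_help", prints the first two quoted tokens. A hypothetical entry compatible
    # with that parsing could look like:
    #   C_ARG_PPB = ["ppb:  packets per HDFS block","display_in_help"]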

    timestamp_str = time.strftime("%Y%m%d_%H%M%S")




    # Argument parser configuration
    cparser = argparse.ArgumentParser(description='CorrelX')
    
    # TO DO: change option -s to -o
    # Configuration file
    cparser.add_argument('-c', action="store",\
                         dest="configuration_file",default=config_file,\
                         help="Specify a configuration file.")
     
    cparser.add_argument('-n', action="store",\
                         dest="nodes_list",default=nodes_list,\
                         help="Specify a comma-separated list of nodes.")
    
    # Suffix folder (for having log and output files of different simulations in different folders)
    cparser.add_argument('-s', action="store",\
                         dest="output_log_folder",default=output_log_folder,\
                         help="Specify a folder to store the output log files.")
    
    # Forced parameters (for overriding configuration from the configuration file)
    cparser.add_argument('-f', action="store",\
                         dest="forced_params",default=forced_params,\
                         help="Specify a comma-separated list of parameter=value to override "+\
                         "the configuration file(see --help-parameters).")
    
    cparser.add_argument('--help-parameters',action="store_true",\
                         dest="help_parameters",default=False,\
                         help="Show all parameters for option -f.")
    
    
    # Get arguments
    args = cparser.parse_args()   
    config_file = args.configuration_file
    output_log_folder = args.output_log_folder
    forced_params = args.forced_params

    nodes_list = args.nodes_list
    NODES_LIST=nodes_list.split(',')
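    # e.g. "-n node1,node2" gives NODES_LIST == ['node1', 'node2']; with the default
    # (local hostname from os.uname()), NODES_LIST has a single entry.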


    # Help for parameter overrides
    if args.help_parameters:
        # Show help and exit
        print("")
        print("Showing available configuration parameters:")
        print("")
        with open(const_config_file, 'r') as f_const:
            for line in f_const:
                if line[:len_str_help_param]==str_help_param:
                    line_split=line.strip().split('\"')
                    if line_split[3]=="display_in_help":
                        print(line_split[1]+line_split[2])
        print("")
        print("Example: python mapred_cx.py -f ppb=5000,slowstart=0.95")
        print("")
    else:
        
    
        # Timing results
        exec_times = []             #  Execution times
        output_files_list = []      #  Output files
        io_times = []               #  Times to copy files local <-> HDFS (these are estimated times!)
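        # Each entry appended to exec_times/io_times below is a row of the form
        #   [label, num_slaves, num_vcores, t_start, t_end, duration]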
        
        # Network stats
        [v_str_hadoop,v_stats_param,v_stats_values_s,v_stats_values_e,v_stats_ping,v_stats_ping] = init_net_stats()
        
        # Prepare suffix for output and log files
        suffix_host=""
        with os.popen("hostname",'r',1) as f_out:
            for line in f_out:
                suffix_host=line.strip()
                break
        suffix_log="_"+time.strftime("%Y%m%d_%H%M%S")+"_"+suffix_host
        config_suffix+=suffix_log
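        # e.g. suffix_log == "_20170915_103000_node-01" (illustrative date and hostname)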
        
        
        
        # Initialize logging
        # (only the log file setting is read from the configuration file at this point)
        [FILE_LOG,TEMP_LOG] = get_log_file(config_file=config_file,suffix=suffix_log,output_log_folder=output_log_folder)
        
        
        print_header(header="Configuration",v=v,file_log=FILE_LOG)
        
        # Get files to get list of nodes
        #NODES_EXEC_FILE = get_nodes_file(config_file)+"_"+suffix_host
        
        if NODES_LIST in ([], ['']):   # empty or missing nodes list
            if v==1:
                #print("\n[!] Error with hosts file! " + NODES_EXEC_FILE + "\n     Try running: ' ./get_nodelist.sh > " + NODES_EXEC_FILE + " ' before launching srun.",file=FILE_LOG)
                print("\n[!] Error with nodes list.",file=FILE_LOG)
        else:
            
            master_reduced=NODES_LIST[0]
    
            # Check if this node is master
            [is_master,my_name,my_ip] = is_this_node_master(master=master_reduced,temp_log=TEMP_LOG,v=1,file_log=FILE_LOG)
        
            # Substitute localhost with master ip/name (and update config_file to avoid overwriting template)
            config_file = get_config_mod_for_this_master(config_file,config_suffix,NODES_LIST[0],sys.argv[0])
            
            # Override configuration parameters
            override_configuration_parameters(forced_configuration_string=forced_params,config_file=config_file,\
                                              v=v,file_log=FILE_LOG)
        
            if v==1:
                print("\nConfig file updated: " + config_file,file=FILE_LOG)  
        
            
            
        
            # Read constants from configuration file
            [MAPPER, REDUCER, DEPENDENCIES, PACKETS_PER_HDFS_BLOCK,CHECKSUM_SIZE,\
                SRC_DIR,APP_DIR, CONF_DIR, TEMPLATES_CONF_DIR, TEMPLATES_ENV_DIR, HADOOP_DIR,HADOOP_CONF_DIR,NODES, \
                MAPPERSH,REDUCERSH,JOBSH,PYTHON_X,\
                USERNAME_MACHINES,MAX_SLAVES,SLAVES,MASTERS,MASTER_IS_SLAVE,HADOOP_TEMP_DIR,DATA_DIR,DATA_DIR_TMP,HDFS_DATA_DIR,HADOOP_START_DELAY,HADOOP_STOP_DELAY,\
                PREFIX_OUTPUT,HADOOP_TEXT_DELIMITER,OUTPUT_DIR,OUTPUT_SYM,RUN_PIPELINE,RUN_HADOOP,MAX_CPU_VCORES,\
                HDFS_REPLICATION,OVER_SLURM,HDFS_COPY_DELAY,\
                FFT_AT_MAPPER,INI_FOLDER,\
                INI_STATIONS, INI_SOURCES, INI_DELAY_MODEL, INI_DELAYS, INI_MEDIA, INI_CORRELATION,\
                INTERNAL_LOG_MAPPER,INTERNAL_LOG_REDUCER,ADJUST_MAPPERS,ADJUST_REDUCERS,FFTS_PER_CHUNK,TEXT_MODE,\
                USE_NOHASH_PARTITIONER,USE_LUSTRE_PLUGIN,LUSTRE_USER_DIR,LUSTRE_PREFIX,ONE_BASELINE_PER_TASK,\
                MIN_MAPPER_CHUNK,MAX_MAPPER_CHUNK,TASK_SCALING_STATIONS,SORT_OUTPUT,BM_AVOID_COPY,\
                BM_DELETE_OUTPUT,TIMEOUT_STOP,SINGLE_PRECISION,PROFILE_MAP,PROFILE_RED] = \
                    get_configuration(v=v,config_file=config_file,timestamp_str=timestamp_str,file_log=FILE_LOG)
    
            # Check errors in experiment .ini files
            init_success = check_errors_ini_exper(DATA_DIR,INI_FOLDER,INI_STATIONS,INI_SOURCES,INI_DELAY_MODEL,INI_MEDIA)
    
    
    
            if init_success==0:
                print("Failed initialization, exiting!")
    
            else:
    
                # Get configuration and output folders (specific for this master)
                [APP_DIR,CONF_DIR,HADOOP_CONF_DIR,HADOOP_DEFAULT_CONF_DIR,OUTPUT_DIR] = get_conf_out_dirs(master_name=my_name,\
                                                                                                hadoop_dir=HADOOP_DIR,\
                                                                                                app_dir=APP_DIR,\
                                                                                                conf_dir=CONF_DIR,\
                                                                                                suffix_conf=suffix_host,\
                                                                                                output_dir=OUTPUT_DIR,\
                                                                                                suffix_out=output_log_folder,\
                                                                                                v=v,file_log=FILE_LOG)
                                                                                                
    
                num_slaves = MAX_SLAVES
                num_vcores = MAX_CPU_VCORES        
            
            
                # Reduce node list if required by configuration
                [num_slaves,NODES_LIST] = reduce_list_nodes(num_slaves=num_slaves,nodes_list=NODES_LIST,v=v,file_log=FILE_LOG)
                
                # Overwrite nodes file
                overwrite_nodes_file(nodes_list=NODES_LIST,nodes_file=CONF_DIR+NODES,v=v,file_log=FILE_LOG)   
                    
                              
                 
                # Distribute the Hadoop configuration files.
                # All nodes need the proper configuration files: this includes all the .xml files,
                #   but also the .sh scripts with the environment setup.
                # Note that HADOOP_CONF_DIR should be a different folder for every deployment,
                #   i.e. for every different master, and thus its name should depend on the master name.
                distribute_files(simply_copy_local=OVER_SLURM,file_group="Hadoop config files",v=v,exec_permission=0,file_log=FILE_LOG,\
                                 files=["*"],source_dir=TEMPLATES_ENV_DIR,conf_dir=CONF_DIR,\
                                 destination_dir=HADOOP_CONF_DIR,nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,\
                                 force_node=','.join(NODES_LIST))
            
            
                
                # Process .ini files
                [stations_serial_str,media_serial_str,\
                 correlation_serial_str,delays_serial_str,\
                 AUTO_STATIONS, AUTO_POLS, FFT_SIZE, \
                 ACCUMULATION_TIME, STATIONS, \
                 REF_EPOCH, SIGNAL_START, \
                 SIGNAL_DURATION,INPUT_FILES,\
                 FIRST_FRAME_NUM,NUM_FRAMES,CODECS_SERIAL,\
                 max_packet_size,total_frames,\
                 total_partitions,windowing,\
                 PHASE_CALIBRATION,delay_error,error_str_v,NUM_POLS] = process_ini_files(DATA_DIR,\
                                                                INI_STATIONS,\
                                                                INI_SOURCES,\
                                                                INI_DELAY_MODEL,\
                                                                INI_DELAYS,\
                                                                INI_MEDIA,\
                                                                INI_CORRELATION, ONE_BASELINE_PER_TASK)
                # TO DO: write a library for error checking.
                if delay_error is None:
                    init_success=0
                    print("ERROR: Incomplete data in delay model file! Exiting...")
                
                
                elif error_str_v!=[]:
                    init_success=0
                    print("")
                    for i in error_str_v:
                        print(i)
                        
                
                if init_success==1:
                    
                    # Pipeline mode
    
                    print_header(header="Pipeline execution",v=v,file_log=FILE_LOG)
                    if (is_master) and (RUN_PIPELINE):
                        # Pipeline application execution
                        pipeline_output_file=PREFIX_OUTPUT + "_s" + str(0) + "_v" + str(0) + ".out"
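                        # e.g. with PREFIX_OUTPUT == "CorrOut" (illustrative) the name above is "CorrOut_s0_v0.out"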
                        [pipeline_t_s,pipeline_t_e,pipeline_d] = pipeline_app(python_x=PYTHON_X,\
                                                                     stations=STATIONS,\
                                                                     input_files=INPUT_FILES,\
                                                                     app_dir=SRC_DIR,\
                                                                     mapper=MAPPER,\
                                                                     reducer=REDUCER,\
                                                                     fft_size=FFT_SIZE,\
                                                                     fft_at_mapper=FFT_AT_MAPPER,\
                                                                     accumulation_time=ACCUMULATION_TIME,\
                                                                     signal_start=SIGNAL_START,\
                                                                     signal_duration=SIGNAL_DURATION,\
                                                                     first_frame_num=FIRST_FRAME_NUM,\
                                                                     num_frames=NUM_FRAMES,\
                                                                     data_dir=DATA_DIR,\
                                                                     output_dir=OUTPUT_DIR,\
                                                                     output_sym=OUTPUT_SYM,\
                                                                     auto_stations=AUTO_STATIONS,\
                                                                     auto_pols=AUTO_POLS,\
                                                                     num_pols=NUM_POLS,\
                                                                     codecs_serial=CODECS_SERIAL,\
                                                                     v=v,\
                                                                     file_log=FILE_LOG,\
                                                                     file_out=pipeline_output_file,\
                                                                     ini_stations=INI_STATIONS,\
                                                                     ini_media=INI_MEDIA,\
                                                                     ini_delays=INI_DELAYS,\
                                                                     internal_log_mapper=INTERNAL_LOG_MAPPER,\
                                                                     internal_log_reducer=INTERNAL_LOG_REDUCER,\
                                                                     ffts_per_chunk=FFTS_PER_CHUNK,\
                                                                     windowing=windowing,\
                                                                     one_baseline_per_task=ONE_BASELINE_PER_TASK,\
                                                                     phase_calibration=PHASE_CALIBRATION,\
                                                                     min_mapper_chunk=MIN_MAPPER_CHUNK,\
                                                                     max_mapper_chunk=MAX_MAPPER_CHUNK,\
                                                                     task_scaling_stations=TASK_SCALING_STATIONS,\
                                                                     sort_output=SORT_OUTPUT,\
                                                                     single_precision=SINGLE_PRECISION,\
                                                                     profile_map=PROFILE_MAP,\
                                                                     profile_red=PROFILE_RED,\
                                                                     timestamp_str=timestamp_str)
                        
            
                        
                        exec_times+=[["Pipeline", 0, 0, pipeline_t_s, pipeline_t_e, pipeline_d]]
                        output_files_list+=[pipeline_output_file]
                    
                    
                    else:
                        exec_times+=[["Pipeline", 0, 0, 0, 0, 0]]
                        if v==1:
                            print("\nNot executed based on configuration.",file=FILE_LOG)
                
                    
                    
                    
                    #Hadoop execution
                    
                    
                    
                    print_header(header="MapReduce",v=v,file_log=FILE_LOG)

                    if RUN_HADOOP:   
                        # Process configuration files
                        [list_configurations,pairs_config] = get_list_configuration_files(config_file)
                        configuration_files = process_hadoop_config_files(list_configurations,pairs_config,\
                                                                          templates_dir=TEMPLATES_CONF_DIR,\
                                                                          conf_dir=CONF_DIR,v=v,\
                                                                          file_log=FILE_LOG)
                    
    
                        # Distribute configuration files
                    
                        # Copy configuration files to all nodes
                        files = configuration_files
                        # This node
                        distribute_files(simply_copy_local=1,file_group="Conf - first node",v=v,exec_permission=0,file_log=FILE_LOG,\
                                     files=files,source_dir=CONF_DIR,conf_dir=CONF_DIR,destination_dir=HADOOP_CONF_DIR,\
                                     nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,force_node=','.join(NODES_LIST))
                        # Other nodes
                        distribute_files(simply_copy_local=OVER_SLURM,file_group="Conf",v=v,exec_permission=0,file_log=FILE_LOG,\
                                     files=files,source_dir=CONF_DIR,conf_dir=CONF_DIR,destination_dir=HADOOP_CONF_DIR,\
                                     nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,force_node=','.join(NODES_LIST))
            
            
            
                        # Configure application and distribute it to nodes
            
            
            
            
                        # Intermediate .sh files for recovering LD_LIBRARY_PATH and calling the mapper and reducer.
                        # Write these to CONF_DIR, otherwise they may be overwritten by tasks running on other nodes!
                        
                        

                        # Additional dependencies
                        add_deps=[INI_DELAYS,INI_MEDIA,INI_STATIONS]
                        ini_delays_dep = INI_DELAYS.split("/")[-1]
                        ini_media_dep = INI_MEDIA.split("/")[-1]
                        ini_stations_dep = INI_STATIONS.split("/")[-1]                        
                        print("Additional dependencies:")
                        print(" "+','.join(add_deps))
            
            
            
                        # Create script for mapper
                        params_mapper=get_mapper_params_str(STATIONS,NUM_POLS,FFT_SIZE,ACCUMULATION_TIME,SIGNAL_START,SIGNAL_DURATION,\
                                                   FIRST_FRAME_NUM,NUM_FRAMES,CODECS_SERIAL,\
                                                   AUTO_STATIONS,AUTO_POLS,ini_stations_dep,ini_media_dep,\
                                                   ini_delays_dep,FFT_AT_MAPPER,INTERNAL_LOG_MAPPER,FFTS_PER_CHUNK,\
                                                   windowing,\
                                                   one_baseline_per_task=ONE_BASELINE_PER_TASK,\
                                                   phase_calibration=PHASE_CALIBRATION,min_mapper_chunk=MIN_MAPPER_CHUNK,
                                                   max_mapper_chunk=MAX_MAPPER_CHUNK,task_scaling_stations=TASK_SCALING_STATIONS,\
                                                   single_precision=SINGLE_PRECISION)
                        command_map = get_mr_command(app_dir=APP_DIR,script=MAPPER,params=params_mapper)
                        create_inter_sh(CONF_DIR+MAPPERSH,PYTHON_X,command_map,temp_log=TEMP_LOG,v=v,file_log=FILE_LOG)
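                        # create_inter_sh above presumably writes a small shell wrapper equivalent to
                        #   $PYTHON_X $APP_DIR/$MAPPER <params_mapper...>
                        # with LD_LIBRARY_PATH restored first; see create_inter_sh for the exact contents.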
                        
                        
                        
                        
                        # Get script for reducer
                        params_reducer=get_reducer_params_str(CODECS_SERIAL,FFT_AT_MAPPER,INTERNAL_LOG_REDUCER,FFT_SIZE,windowing,\
                                                              PHASE_CALIBRATION,SINGLE_PRECISION)
                        command_red = get_mr_command(app_dir=APP_DIR,script=REDUCER,params=params_reducer)
                        create_inter_sh(CONF_DIR+REDUCERSH,PYTHON_X,command_red,temp_log=TEMP_LOG,v=v,file_log=FILE_LOG)
            
            
            
            
            
                        # Copy application files to all nodes (master-associated folder)
                        # TO DO: add add_deps too
                        files = DEPENDENCIES
                        files.extend([MAPPER])
                        files.extend([REDUCER])
                        distribute_files(simply_copy_local=OVER_SLURM,file_group="App",v=v,exec_permission=1,file_log=FILE_LOG,\
                                     files=files,source_dir=SRC_DIR,conf_dir=CONF_DIR,\
                                     destination_dir=APP_DIR,nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,\
                                     force_node=','.join(NODES_LIST))
                        files_sh = [MAPPERSH, REDUCERSH]
                        distribute_files(simply_copy_local=OVER_SLURM,file_group="App-sh",v=v,exec_permission=1,file_log=FILE_LOG,\
                                     files=files_sh,source_dir=CONF_DIR,conf_dir=CONF_DIR,\
                                     destination_dir=APP_DIR,nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,\
                                     force_node=','.join(NODES_LIST))
            
            
            
                        # Stop Hadoop in case it's running
    
                        nodes_to_slaves_masters(conf_dir=CONF_DIR,hadoop_conf_dir=HADOOP_CONF_DIR,file_nodes=NODES,file_slaves=SLAVES,\
                                        file_masters=MASTERS,max_slaves=MAX_SLAVES,master_is_slave=MASTER_IS_SLAVE,v=v,file_log=FILE_LOG)
                        
                        cluster_stop(wait_time=HADOOP_STOP_DELAY,hadoop_dir=HADOOP_DIR,hadoop_conf_dir=HADOOP_CONF_DIR,\
                                               timeout=TIMEOUT_STOP,v=v,file_log=FILE_LOG,temp_log=TEMP_LOG)
                        
                        # Shut down other hadoop processes (this needs to be done in another way)
                        if v==1:
                            print("\nForcing shutdown of hadoop processes (in case they were not shut down properly)...",file=FILE_LOG)
                            # TO DO?: comment line in start-dfs.sh which starts the secondary namenode 
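                        # The command below expands to something like
                        #   kill `ps -axu | grep hadoop | grep <username> | awk '{print $2}'` > <temp_log>
                        # i.e. it kills any remaining Hadoop-related process owned by USERNAME_MACHINES.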
                            
                        os.system("kill `ps -axu|grep hadoop|grep " + USERNAME_MACHINES + "|awk '{print $2}'` > " + TEMP_LOG)
                        if v==1:    
                            with open(TEMP_LOG, 'r') as f_log:
                                for line in f_log:
                                    print(" "+line.strip(),file=file_log)
                        
                    
                        # Iterate on number of slaves
                        hadoop_failed=0
                        
                        # Process the nodes file to create the slaves file
                        nodes_to_slaves_masters(conf_dir=CONF_DIR,hadoop_conf_dir=HADOOP_CONF_DIR,file_nodes=NODES,file_slaves=SLAVES,\
                                    file_masters=MASTERS,max_slaves=num_slaves,master_is_slave=MASTER_IS_SLAVE,v=v,file_log=FILE_LOG)
                    
                    
                    
                    
                        # Update replication (check that it does not exceed the number of slaves)
                        replication_forced = HDFS_REPLICATION
                        if num_slaves<=HDFS_REPLICATION:
                            replication_forced = num_slaves
                            replication_file = 'hdfs-site.xml'
                            if v==1:
                                print("\nUpdating " + replication_file + "...",file=FILE_LOG)
        
                            replication_config = [('Configuration file',replication_file),(C_H_HDFS_REPLICATION,str(num_slaves))]
                            processed_file = process_hcfile(v=v,file_log=FILE_LOG,pairs=replication_config,conf_dir=CONF_DIR,\
                                                    templates_dir=CONF_DIR)
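                            # The ('Configuration file', ...) / (C_H_HDFS_REPLICATION, ...) pairs above rewrite
                            # the replication setting in hdfs-site.xml; C_H_HDFS_REPLICATION is expected to name
                            # the corresponding property (presumably "dfs.replication", see const_hadoop.py).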
        
                            files=[replication_file]
                            # This node
                            distribute_files(simply_copy_local=1,file_group="Replication - first node",v=v,exec_permission=0,file_log=FILE_LOG,files=files,source_dir=CONF_DIR,conf_dir=CONF_DIR,\
                                 destination_dir=HADOOP_CONF_DIR,nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,\
                                 force_node=','.join(NODES_LIST))
                            # Other nodes
                            distribute_files(simply_copy_local=OVER_SLURM,file_group="Replication",v=v,exec_permission=0,file_log=FILE_LOG,files=files,source_dir=CONF_DIR,conf_dir=CONF_DIR,\
                                 destination_dir=HADOOP_CONF_DIR,nodes=NODES,username=USERNAME_MACHINES,temp_log=TEMP_LOG,\
                                 force_node=','.join(NODES_LIST))
                
                
                            
                      
                    
    
                        # Start Hadoop
                        if hadoop_failed==0:
                            t_nodes = cluster_start(wait_time=HADOOP_START_DELAY,hadoop_conf_dir=HADOOP_CONF_DIR,hadoop_dir=HADOOP_DIR,file_slaves=SLAVES,\
                                  temp_dir=HADOOP_TEMP_DIR,username=USERNAME_MACHINES,single_node=OVER_SLURM,\
                                  use_lustre_plugin=USE_LUSTRE_PLUGIN,v=v,file_log=FILE_LOG,temp_log=TEMP_LOG)
                    
    
                        if t_nodes>0:
          
            
            
                            # Copy media files into distributed filesystem (HDFS or Lustre)
    
                            # (!) Time measurements are approximate: the function includes print and wait statements.
                            [put_t_s,put_t_e,put_d] = copy_files_to_hdfs(replication=HDFS_REPLICATION,\
                                    input_files=INPUT_FILES,data_dir=DATA_DIR,data_dir_tmp=DATA_DIR_TMP,hadoop_dir=HADOOP_DIR,\
                                    hadoop_conf_dir=HADOOP_CONF_DIR,hdfs_data_dir=HDFS_DATA_DIR,\
                                    packets_per_hdfs_block=PACKETS_PER_HDFS_BLOCK,\
                                    temp_log=TEMP_LOG,copy_delay=HDFS_COPY_DELAY,checksum_size=CHECKSUM_SIZE,text_mode=TEXT_MODE,\
                                    use_lustre_plugin=USE_LUSTRE_PLUGIN,lustre_prefix=LUSTRE_PREFIX,bm_avoid_copy=BM_AVOID_COPY,\
                                    v=v,file_log=FILE_LOG) 
                            io_times+=[["HDFS-put " + str(num_slaves) + "s-" + str(num_vcores)+ "v" , num_slaves, num_vcores, put_t_s,put_t_e,put_d]]
    
    
    
    
            
                            
                            hdfs_output_file = PREFIX_OUTPUT + "_s" + str(num_slaves) + "_v" + str(num_vcores)+ suffix_log +".out"
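                            # The output name above becomes e.g. "<PREFIX_OUTPUT>_s2_v8_20170915_103000_node-01.out"
                            # for 2 slaves and 8 virtual cores (illustrative values).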
                            if NETWORK_STATS==1:
                                [stats_params_s,stats_values_s,stats_ping_s] = get_network_stats(nodes_list=NODES_LIST,over_slurm=OVER_SLURM,v=v,file_log=FILE_LOG)
                            
                            
                            
                            
                            
                            
                            # Run mapreduce
                            [hadoop_t_s,hadoop_t_e,hadoop_d,get_t_s,get_t_e,get_d,sort_t_s,sort_t_e,sort_d] = \
                              run_mapreduce_sh(record_size=max_packet_size,\
                                                jobsh=CONF_DIR+JOBSH,\
                                                mappersh=CONF_DIR+MAPPERSH,\
                                                reducersh=CONF_DIR+REDUCERSH,\
                                                app_dir=APP_DIR,\
                                                hadoop_dir=HADOOP_DIR,\
                                                hadoop_conf_dir=HADOOP_CONF_DIR,\
                                                folder_deps=APP_DIR,\
                                                files_deps=DEPENDENCIES,\
                                                add_deps=add_deps,\
                                                mapper=MAPPER,\
                                                reducer=REDUCER,\
                                                hdfs_data_dir=HDFS_DATA_DIR,\
                                                hdfs_output_file=hdfs_output_file,\
                                                output_hadoop=hdfs_output_file,\
                                                text_mode=TEXT_MODE,\
                                                hadoop_text_delimiter=HADOOP_TEXT_DELIMITER,\
                                                temp_log=TEMP_LOG,\
                                                packets_per_hdfs_block=PACKETS_PER_HDFS_BLOCK,\
                                                total_frames=total_frames,\
                                                total_partitions=total_partitions,\
                                                adjust_mappers=ADJUST_MAPPERS,\
                                                adjust_reducers=ADJUST_REDUCERS,\
                                                use_nohash_partitioner=USE_NOHASH_PARTITIONER,\
                                                use_lustre_plugin=USE_LUSTRE_PLUGIN,\
                                                lustre_user_dir=LUSTRE_USER_DIR,\
                                                num_slaves=num_slaves,\
                                                num_vcores=num_vcores,\
                                                one_baseline_per_task=ONE_BASELINE_PER_TASK,\
                                                sort_output=SORT_OUTPUT,\
                                                bm_delete_output=BM_DELETE_OUTPUT,\
                                                bypass_reduce=0,\
                                                v=v,\
                                                file_log=FILE_LOG,\
                                                output_dir=OUTPUT_DIR,\
                                                output_sym=OUTPUT_SYM)
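                            # Three (start, end, duration) triplets are returned above: the MapReduce job itself,
                            # the retrieval of the output from the distributed filesystem ("HDFS-get"), and the
                            # final sort of the output file ("File-sort"); each is logged separately below.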
                                 
                            if NETWORK_STATS==1:
                                [stats_params_e,stats_values_e,stats_ping_e] = get_network_stats(nodes_list=NODES_LIST,over_slurm=OVER_SLURM,v=v,file_log=FILE_LOG)
                                delta_stats = compute_txrx_bytes(stats_values_s,stats_values_e,v=v,file_log=FILE_LOG)
                            
                            # Logging results
                            str_hadoop = "Hadoop " + str(num_slaves) + "s-" + str(num_vcores)+ "v" + " "
                            exec_times+=[[str_hadoop , num_slaves, num_vcores, hadoop_t_s,hadoop_t_e,hadoop_d]]
                            output_files_list+=[hdfs_output_file]
                            str_hdfs_get = "HDFS-get " + str(num_slaves) + "s-" + str(num_vcores)+ "v" + " "
                            io_times+=[[str_hdfs_get , num_slaves, num_vcores, get_t_s,get_t_e,get_d]]
    
                            str_file_sort = "File-sort " + str(num_slaves) + "s-" + str(num_vcores)+ "v" + " "
                            io_times+=[[str_file_sort , num_slaves, num_vcores, sort_t_s,sort_t_e,sort_d]]
    
                            v_str_hadoop += [str_hadoop]
                            
                            if NETWORK_STATS==1:
                                v_stats_param += [stats_params_s]
                                v_stats_values_s += [stats_values_s]
                                v_stats_values_e += [stats_values_e]
                                #v_stats_ping += [stats_ping_s]
                            
                                # Get only network status after mr
                                v_stats_ping += [stats_ping_e]
    
                        else:
                            hadoop_failed=1
                            if v==1:
                                print("\nHadoop initialization failed!",file=FILE_LOG)
                                print(" Check http://localhost:8088/cluster/nodes/unhealthy",file=FILE_LOG)
                                print(" Check available storage",file=FILE_LOG)
                                print(" Check available ports",file=FILE_LOG)
                        
                        # Cluster is stopped for each iteration
                        cluster_stop(wait_time=HADOOP_STOP_DELAY,hadoop_dir=HADOOP_DIR,hadoop_conf_dir=HADOOP_CONF_DIR,\
                                               timeout=TIMEOUT_STOP,v=v,file_log=FILE_LOG,temp_log=TEMP_LOG)
        
                
                    # Show results
                    print_header(header="Results",v=v,file_log=FILE_LOG)
                    
                
                    # Print execution times to log file
                    if PROFILE_MAP:
                        if v==1:
                            print("Profiled mapper, see output folder for results.",file=FILE_LOG)
                    if PROFILE_RED:
                        if v==1:
                            print("Profiled reducer, see output folder for results.",file=FILE_LOG)
                            
                    print_execution_times(exec_times=exec_times,io_times=io_times,bypass_print=PROFILE_MAP or PROFILE_RED,v=v,file_log=FILE_LOG)
                    if NETWORK_STATS==1:
                        print_network_totals(NODES_LIST,v_str_hadoop,v_stats_param,v_stats_values_s,v_stats_values_e,v_stats_ping,v=v,file_log=FILE_LOG)
            
        # Close log file
        if FILE_LOG!=sys.stdout:
            FILE_LOG.close()
    
    
        
        # Delete temporary files
        os.system("rm " + TEMP_LOG)

# <codecell>