lib_mapredcorr module
Main functions for performing the mapreduce correlation through Hadoop (and, alternatively, through a local pipeline emulation).
lib_mapredcorr.create_inter_sh(filename, python_x, command, temp_log, v=0, file_log=sys.stdout)

Create a script with the python call for the mapper/reducer, devised to avoid issues with argument passing. Currently used for creating the mapper and reducer scripts passed to Hadoop.
- filename
- filename for resulting bash script with python call.
- python_x
- python executable.
- command
- python script plus arguments.
- temp_log
- path to temporary (buffer) file for system calls.
- v
- verbose if 1.
- file_log
- handler for log file.
Returns: N/A
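In essence, the generated file is a short bash wrapper around the python call, so that Hadoop streaming can invoke the mapper/reducer as a single executable. A minimal sketch of the idea (the helper below is hypothetical, and the exact contents written by create_inter_sh may differ):

    import os
    import stat

    def create_inter_sh_sketch(filename, python_x, command):
        # Hypothetical sketch: write a bash wrapper with the python call.
        with open(filename, "w") as f:
            f.write("#!/bin/bash\n")
            f.write(python_x + " " + command + "\n")
        # Make the wrapper executable so it can be invoked directly.
        os.chmod(filename, os.stat(filename).st_mode | stat.S_IEXEC)

    # Illustrative call (paths and arguments are placeholders):
    create_inter_sh_sketch("mapper.sh", "/usr/bin/python", "msvf.py -s 2")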
lib_mapredcorr.get_mapper_params_str(stations, num_pols, fft_size, accumulation_time, signal_start, signal_duration, first_frame_num, num_frames, codecs_serial, auto_stations, auto_pols, ini_stations, ini_media, ini_delays, fft_at_mapper, internal_log_mapper, ffts_per_chunk, windowing, one_baseline_per_task, phase_calibration, min_mapper_chunk, max_mapper_chunk, task_scaling_stations, single_precision)

Returns a string with all the parameters to call the mapper.
- stations
- number of stations.
- num_pols
- [not used in all-baselines-per-task mode] number of polarizations.
- fft_size
- [possibly not used if the FFT is done in the reducer] FFT length from the configuration.
- accumulation_time
- duration of the accumulation period (float) [s].
- signal_start
- start time for the experiment (float) [s].
- signal_duration
- duration of the experiment (float) [s].
- first_frame_num
- [only testing] -1 by default. Discard frames with id less than this value.
- num_frames
- [only testing] -1 by default. If >0, discard frames with id greater than this value.
- If both first_frame_num and num_frames are <0, all frames are processed.
- codecs_serial
- [only testing] “” by default. Serialized version of the codecs used for compression.
- auto_stations
- [not used in all-baselines-per-task mode] controls pairs generation (see msvf.calculate_corr_pairs()).
- auto_pols
- [not used in all-baselines-per-task mode] controls pairs generation (see msvf.calculate_corr_pairs()).
- ini_stations
- string with stations ini file name.
- ini_media
- string with media ini file name.
- ini_delays
- string with delays ini file name.
- fft_at_mapper
- [0 by default]. Initially devised to allow configuration of FFT in mapper or reducer.
- internal_log_mapper
- [unused]
- ffts_per_chunk
- [-1 by default]. If -1, all the samples in the channel go into the same line. Other values allow controlling the number of samples that go into the same line, but this feature is discontinued.
- windowing : str
- window before FFT, default value is “square”.
- one_baseline_per_task: bool
- boolean to activate one-baseline-per-task mode.
- phase_calibration
- [unused].
- min_mapper_chunk
- [-1 by default]
- max_mapper_chunk
- [-1 by default]
- task_scaling_stations
- 0 for all-baselines-per-task mode, 1 to activate linear scaling with number of stations.
- single_precision
- [unused]
Returns:
- mapper_params_str : str
- parameters to call the mapper.
TO DO:
- Automate finding the maximum number of polarizations.
- Remove all unused arguments.
- Consider adding paths for long ini files; currently this is done only for delays.ini.
- Consider removing fft_size (the FFT is always done in the reducer).
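For illustration, a plausible call is shown below; all values are placeholders, and only the argument names follow the signature above (assumes the function is imported from this module):

    from lib_mapredcorr import get_mapper_params_str

    # Illustrative only: build the mapper argument string, then embed it
    # into the mapper wrapper script via get_mr_command()/create_inter_sh().
    mapper_params = get_mapper_params_str(
        stations=2, num_pols=2, fft_size=1024, accumulation_time=1.0,
        signal_start=0.0, signal_duration=2.0,
        first_frame_num=-1, num_frames=-1, codecs_serial="",
        auto_stations=0, auto_pols=0,
        ini_stations="stations.ini", ini_media="media.ini",
        ini_delays="delays.ini",
        fft_at_mapper=0, internal_log_mapper=1, ffts_per_chunk=-1,
        windowing="square", one_baseline_per_task=True, phase_calibration=0,
        min_mapper_chunk=-1, max_mapper_chunk=-1,
        task_scaling_stations=0, single_precision=0)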
lib_mapredcorr.get_mr_command(app_dir, script, params)

Create the line with the call to the mapper/reducer and its parameters.
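Presumably this joins the pieces into a single command string; a sketch of the expected behavior (the script name and the exact whitespace handling are assumptions):

    from lib_mapredcorr import get_mr_command

    # Illustrative: the command that ends up inside the wrapper script.
    command = get_mr_command(app_dir="/home/user/correlx/src/",
                             script="msvf.py",
                             params=mapper_params)  # from the example above
    # Expected shape (assumption): "/home/user/correlx/src/msvf.py <params...>"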
lib_mapredcorr.get_options_custom_partitioner(use_nohash_partitioner=1)

Options for the custom partitioner.
lib_mapredcorr.get_options_fixed_length_records(record_size)

Options for fixed-length records as input, instead of text.
TO DO:
- This needs work.
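For reference, fixed-length input records in Hadoop are typically configured with the standard options below; whether this matches the string currently returned by this function is an assumption (the TO DO above flags it as unfinished):

    # Sketch (assumption): standard Hadoop options for fixed-length input
    # records; not necessarily the literal output of this function.
    def get_options_fixed_length_records_sketch(record_size):
        return ("-D fixedlengthinputformat.record.length=%d " % record_size
                + "-inputformat org.apache.hadoop.mapred.FixedLengthInputFormat")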
lib_mapredcorr.get_options_logging(log_properties)

Options for log4j logging.
- log_properties
- path to the log4j.properties file.
TO DO:
- Currently not working; as a workaround, “log_properties” is copied to a specific folder (see run_mapreduce_sh()).
lib_mapredcorr.get_options_partitioning(field_sep, key_fields, key_field_sep, part_opts, comp_opts)

Get partitioning options.
- field_sep
- field separator.
- key_fields
- number of key fields.
- key_field_sep
- key field separator.
- part_opts
- partitioning options.
- comp_opts
- comparator options.
Returns:
- option1 : str
- java options (relative to partitioning) for job submission.
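As an illustration, such a function typically assembles standard Hadoop streaming properties for key-field-based partitioning; the sketch below uses real property names, but whether these are exactly the options emitted here is an assumption:

    # Sketch (assumption): typical streaming options for key-based partitioning.
    def get_options_partitioning_sketch(field_sep, key_fields, key_field_sep,
                                        part_opts, comp_opts):
        return " ".join([
            "-D stream.map.output.field.separator=" + field_sep,
            "-D stream.num.map.output.key.fields=" + str(key_fields),
            "-D mapreduce.map.output.key.field.separator=" + key_field_sep,
            "-D mapreduce.partition.keypartitioner.options=" + part_opts,
            "-D mapreduce.job.output.key.comparator.class="
            "org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator",
            "-D mapreduce.partition.keycomparator.options=" + comp_opts,
        ])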
lib_mapredcorr.get_options_text_delimiter(hadoop_text_delimiter)

Options for the text delimiter.
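The standard streaming property for this is textinputformat.record.delimiter; a sketch of what the returned option presumably looks like (an assumption):

    # Sketch (assumption): record delimiter option for Hadoop streaming.
    def get_options_text_delimiter_sketch(hadoop_text_delimiter):
        return "-D textinputformat.record.delimiter=" + hadoop_text_delimiter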
lib_mapredcorr.get_reducer_params_str(codecs_serial, fft_at_mapper, internal_log_reducer, fft_size, windowing, phase_calibration, single_precision)

Returns a string with all the parameters to call the reducer.
- codecs_serial
- [only testing] “” by default. Serialized version of the codecs used for compression.
- fft_at_mapper
- the same value used to call get_mapper_params_str(), inverted here (the FFT is done either at the mapper or at the reducer, not both).
- internal_log_reducer
- [unused]
- fft_size
- fft length from the configuration.
- windowing
- window type before FFT, “square” by default.
- phase_calibration
- if 1 phase calibration tones will be extracted.
- single_precision
- boolean to control data types for unpacked samples.
Returns:
- reducer_params_str : str
- parameters to call the reducer.
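Illustrative call, paired with the get_mapper_params_str() example above (all values are placeholders):

    from lib_mapredcorr import get_reducer_params_str

    # Illustrative only: pass the same fft_at_mapper value used for the
    # mapper; the function inverts it, so the FFT runs in exactly one phase.
    reducer_params = get_reducer_params_str(
        codecs_serial="", fft_at_mapper=0, internal_log_reducer=1,
        fft_size=1024, windowing="square", phase_calibration=0,
        single_precision=0)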
lib_mapredcorr.pipeline_app(python_x, stations, input_files, app_dir, mapper, reducer, fft_size, fft_at_mapper, accumulation_time, signal_start, signal_duration, first_frame_num, num_frames, data_dir, output_dir, output_sym, auto_stations, auto_pols, num_pols, codecs_serial, v=0, file_log=sys.stdout, file_out='vt4.txt', ini_stations='none', ini_media='none', ini_delays='none', internal_log_mapper=1, internal_log_reducer=1, ffts_per_chunk=1, windowing='square', one_baseline_per_task=True, phase_calibration=0, min_mapper_chunk=-1, max_mapper_chunk=-1, task_scaling_stations=0, sort_output=1, single_precision=0, profile_map=0, profile_red=0, timestamp_str='')

Perform the correlation through pipeline execution (that is, without Hadoop). All the data is passed through the mapper, then the results are sorted and passed through the reducer (see the sketch after the parameter list below).
Parameters: see the table below for the parameters forwarded to get_mapper_params_str() and get_reducer_params_str(); the remaining parameters are described after the table.

Returns:
- list with start time, end time and duration of the execution in seconds.

Note that the environment variable map_input_file is modified for each processed file to emulate the Hadoop behavior (and thus give the mapper access to the name of the file currently being processed).

Parameter               get_mapper_params_str()  get_reducer_params_str()
stations                x
fft_size                x                        x
fft_at_mapper           x                        x
accumulation_time       x
signal_start            x
signal_duration         x
first_frame_num         x
num_frames              x
auto_stations           x
auto_pols               x
num_pols                x
codecs_serial           x                        x
ini_stations            x
ini_media               x
ini_delays              x
internal_log_mapper     x
internal_log_reducer                             x
ffts_per_chunk          x
windowing               x                        x
one_baseline_per_task   x
phase_calibration       x                        x
min_mapper_chunk        x
max_mapper_chunk        x
task_scaling_stations   x
single_precision        x                        x

Other parameters:
- python_x: str with the python executable.
- input_files: list with the filenames for the media.
- app_dir: path with the location of the .py files for the mapper, the reducer and all their dependencies.
- mapper: mapper .py filename.
- reducer: reducer .py filename.
- data_dir: path with the location of the media.
- output_dir: path for the intermediate and output files.
- output_sym: path for the symbolic link to the output file (typically a sub-path in the experiment folder).
- v: boolean to activate verbose mode.
- file_log: file for logging.
- file_out: output file.
- sort_output: [1 by default] boolean to activate the sorting of the output.
- profile_map: [0 by default] 1 to profile the mapper using pycallgraph, 2 to profile using cProfile.
- profile_red: [0 by default] 1 to profile the reducer using pycallgraph, 2 to profile using cProfile.
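The core of the pipeline emulation can be summarized as below; this is a minimal sketch under stated assumptions, not the actual implementation (parameter handling, logging, profiling and the output symlink are omitted, and the helper name is hypothetical):

    import os
    import subprocess

    def pipeline_sketch(mapper_sh, reducer_sh, input_files, output_file):
        # Minimal map -> sort -> reduce pipeline, emulating Hadoop streaming.
        mapped = []
        for fname in input_files:
            # Emulate Hadoop: the mapper learns the name of the file being
            # processed through the map_input_file environment variable.
            env = dict(os.environ, map_input_file=fname)
            with open(fname, "rb") as fin:
                result = subprocess.run([mapper_sh], stdin=fin, env=env,
                                        stdout=subprocess.PIPE)
            mapped.append(result.stdout)
        # Sort all mapper output lines by key, as Hadoop's shuffle would.
        lines = sorted(b"".join(mapped).splitlines(keepends=True))
        with open(output_file, "wb") as fout:
            subprocess.run([reducer_sh], input=b"".join(lines), stdout=fout)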
lib_mapredcorr.run_mapreduce_sh(record_size, jobsh, mappersh, reducersh, app_dir, hadoop_dir, hadoop_conf_dir, folder_deps, files_deps, add_deps, mapper, reducer, hdfs_data_dir, hdfs_output_file, output_hadoop, text_mode, hadoop_text_delimiter, output_dir, output_sym, temp_log, packets_per_hdfs_block, total_frames, total_partitions, adjust_mappers, adjust_reducers, use_nohash_partitioner=1, use_lustre_plugin=0, lustre_user_dir='r/ajva/', num_slaves=1, num_vcores=1, one_baseline_per_task=True, sort_output=1, bm_delete_output=0, bypass_reduce=0, v=0, file_log=sys.stdout)

Perform the correlation through Hadoop. Requires Hadoop to be started (see lib_hadoop_hdfs.py).
- record_size
- [0 by default] If 1, fixed-length records will be used in Hadoop (testing only).
- jobsh
- bash file to write complete call to hadoop for job submission.
- mappersh
- bash file with complete call to the mapper.
- reducersh
- bash file with complete call to the reducer.
- app_dir
- path to mapper and reducer .py files.
- hadoop_dir
- base path of the hadoop installation.
- hadoop_conf_dir
- path with hadoop configuration files.
- folder_deps
- path to the dependencies for the mapper and the reducer.
- files_deps
- list of strings with the filenames of the dependencies (.py files).
- add_deps
- list of strings with paths for additional dependencies (.ini files, if applicable).
- mapper
- filename of the mapper (.py).
- reducer
- filename of the reducer (.py).
- hdfs_data_dir
- working path in HDFS or Lustre.
- hdfs_output_file
- filename of the output file in the working path (HDFS or Lustre).
- output_hadoop
- filename of the output file in a local folder (output_dir).
- text_mode
- [1 by default] 0 for binary (testing only).
- hadoop_text_delimiter
- text delimiter for input data at mapper, see notes below.
- output_dir
- output folder (local).
- output_sym
- path (local) for the symbolic link to the output file (typically sub-path in experiment folder).
- temp_log
- filename for temporary logging.
- packets_per_hdfs_block
- number of frames per mapper.
- total_frames
- total number of frames in all the VDIF files to be processed.
- total_partitions
- maximum number of reducers.
- adjust_mappers
- force the calculated number of mappers to be multiplied by this factor.
- adjust_reducers
- force the calculated number of reducers to be multiplied by this factor.
- use_nohash_partitioner
- 0 for default partitioner, 1 for nohash partitioner (better load balancing).
- use_lustre_plugin
- boolean to allow Hadoop to work directly in Lustre instead of HDFS.
- lustre_user_dir
- absolute path of the Lustre working directory.
- num_slaves
- [unused] included to have the option to control the number of mappers/reducers based on this.
- num_vcores
- [unused] included to have the option to control the number of mappers/reducers based on this.
- one_baseline_per_task
- [True by default] boolean to activate one-baseline-per-task mode.
- sort_output
- [1 by default] boolean to activate the sorting of the output.
- bm_delete_output
- delete the output file if 1 (only for benchmarking).
- bypass_reduce
- do not run reduce phase if 1 (so that output is directly that of the mappers), use only for debugging.
- v
- 0 by default, 1 for verbose mode.
- file_log
- logging file.
Returns:
- start_time
- number of seconds when the job is launched.
- end_time
- number of seconds when the job finishes.
- elapsed_time
- duration of the execution of the job [s].
- ret_start_time
- number of seconds when the output file is requested to the working filesystem (HDFS/Lustre).
- ret_end_time
- number of seconds when the output file gets to the local output folder.
- ret_elapsed_time
- duration of the retrieval of the output file from HDFS/Lustre to the local output folder [s].
- sort_start_time
- number of seconds when the output file sort starts.
- sort_end_time
- number of seconds when the output file sort finishes.
- sort_elapsed_time
- duration of the output file sort [s].
Configuration:
- Controlling the number of mappers and reducers:
  - packets_per_hdfs_block and total_frames control the number of mappers.
  - total_partitions controls the number of reducers.
  - adjust_mappers and adjust_reducers allow modifying the number of mappers/reducers.
  - The number of reducers allows finer tuning:
    - if adjust_reducers==0, the reduce phase is bypassed.
    - if adjust_reducers<0, the number of reducers is fixed to the absolute value of the integer given.
- Configuration of the input reader: Hadoop is currently used in text mode to process binary data. Until an implementation with full binary support is available, text mode is used. In this mode Hadoop splits the input blocks wherever it finds the configured delimiter, so the delimiter has to be chosen to minimize the probability of it appearing in the data. A more elegant solution is needed.
- Configuration files: use a different hadoop_conf_dir for each node, otherwise there will be conflicts in clusters with shared filesystems.
- Filesystem: use Lustre if possible.

Dependencies:
- Lustre: Lustre support is based on the plugin at https://github.com/Seagate/lustrefs.
- Partitioner: the custom partitioner KeyFieldBasedPartitionerNH (NH for no hash) has to be used to avoid unbalanced loads on the reducers, due to the default behavior of hashing the keys.

Notes:
Note that in this case adjust_mappers==-1 or adjust_reducers==-1 does not mean that the value is computed automatically (the usual convention in this code), but rather that only 1 mapper or 1 reducer is used (see "Configuration" above for more details).

TO DO:
- Implement native binary support, instead of using text mode.
- Document sorting.
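For orientation, the streaming call written into jobsh has roughly the shape sketched below. The -D property names are standard Hadoop streaming options, but all values and file names are illustrative, and the package path of KeyFieldBasedPartitionerNH is an assumption:

    # Sketch (assumption): general shape of the generated streaming call.
    hadoop_call = " ".join([
        "hadoop jar share/hadoop/tools/lib/hadoop-streaming.jar",
        "-D mapreduce.job.reduces=4",                       # from total_partitions
        "-D textinputformat.record.delimiter='DELIM'",      # hadoop_text_delimiter
        "-files mapper.sh,reducer.sh,msvf.py,rsvf.py",      # wrappers + dependencies
        "-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitionerNH",
        "-input /user/hduser/data",                         # hdfs_data_dir
        "-output /user/hduser/output",                      # hdfs_output_file
        "-mapper mapper.sh -reducer reducer.sh",
    ])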