lib_mapredcorr module

Main functions for performing the mapreduce correlation through Hadoop (and in pipeline mode).

lib_mapredcorr.create_inter_sh(filename, python_x, command, temp_log, v=0, file_log=sys.stdout)[source]

Create script with python call for mapper/reducer. Devised to avoid issues with arguments. Currently used for creating the mapper and reducer scripts passed to hadoop.

filename
filename for resulting bash script with python call.
python_x
python executable.
command
python script plus arguments.
temp_log
path to temporary (buffer) file for system calls.
v
verbose if 1.
file_log
handler for log file.
Returns: N/A
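A hedged usage sketch (the script name, command, and paths below are hypothetical):

    import sys
    from lib_mapredcorr import create_inter_sh

    # Write a small bash wrapper around the python call, so that the mapper
    # command (with all its arguments) can be handed to hadoop as a single
    # script and quoting issues with arguments are avoided.
    create_inter_sh(filename="mapper_caller.sh",
                    python_x="/usr/bin/python3",
                    command="mapper.py 2 1024",      # hypothetical mapper call
                    temp_log="/tmp/temp_calls.log",  # hypothetical buffer file
                    v=1,
                    file_log=sys.stdout)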
lib_mapredcorr.d_opt(param, value, extra_q=0)[source]

Create string “-D param=value”
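For illustration, the expected behavior (assumed from the docstring; the effect of extra_q is a guess):

    from lib_mapredcorr import d_opt

    opt = d_opt("mapreduce.job.reduces", "8")
    # opt is expected to be: -D mapreduce.job.reduces=8
    # extra_q=1 presumably adds extra quoting around the value (assumption).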

lib_mapredcorr.get_mapper_params_str(stations, num_pols, fft_size, accumulation_time, signal_start, signal_duration, first_frame_num, num_frames, codecs_serial, auto_stations, auto_pols, ini_stations, ini_media, ini_delays, fft_at_mapper, internal_log_mapper, ffts_per_chunk, windowing, one_baseline_per_task, phase_calibration, min_mapper_chunk, max_mapper_chunk, task_scaling_stations, single_precision)[source]

Returns string with all the parameters to call the mapper.

stations
number of stations.
num_pols
[not used in all-baselines-per-task mode] number of polarizations.
fft_size
[not used if fft in reducer?] fft length from the configuration.
accumulation_time
duration of the accumulation period (float) [s].
signal_start
start time for the experiment (float) [s].
signal_duration
duration of the experiment (float) [s].
first_frame_num
[only testing] -1 by default. Discard frames with id less than this value.
num_frames
[only testing] -1 by default. If >0, discard frames with id greater than this value.
If both first_frame_num and num_frames are <0, all frames are processed.
codecs_serial
[only testing] “” by default. Serialized version of the codecs used for compression.
auto_stations
[not used in all-baselines-per-task mode] controls pairs generation (see msvf.calculate_corr_pairs())
auto_pols
[not used in all-baselines-per-task mode] controls pairs generation (see msvf.calculate_corr_pairs())
ini_stations
string with stations ini file name.
ini_media
string with media ini file name.
ini_delays
string with delays ini file name.
fft_at_mapper
[0 by default]. Initially devised to allow configuration of FFT in mapper or reducer.
internal_log_mapper
[unused]
ffts_per_chunk
[-1 by default]. If -1, all the samples in the channel go into the same line. Other values control
the number of samples that go into each line, but this feature is discontinued.
windowing : str
window before FFT, default value is “square”.
one_baseline_per_task: bool
boolean to activate one-baseline-per-task mode.
phase_calibration
[unused].
min_mapper_chunk
[-1 by default]
max_mapper_chunk
[-1 by default]
task_scaling_stations
0 for all-baselines-per-task mode, 1 to activate linear scaling with number of stations.
single_precision
[unused]
Returns:
mapper_params_str : str
parameters to call the mapper.

TO DO:

Automate finding max number of polarizations.
Remove all unused arguments.
Consider adding paths for long ini files; currently this is done only for delays.ini.
Consider removing fft_size, fft always in reducer.
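For reference, a hedged example call to get_mapper_params_str() with illustrative values (the ini file names and all numeric values are hypothetical; testing-only and unused parameters are left at their documented defaults):

    from lib_mapredcorr import get_mapper_params_str

    params = get_mapper_params_str(
        stations=2, num_pols=2, fft_size=1024,
        accumulation_time=1.0, signal_start=0.0, signal_duration=2.0,
        first_frame_num=-1, num_frames=-1, codecs_serial="",
        auto_stations=0, auto_pols=1,
        ini_stations="stations.ini", ini_media="media.ini",
        ini_delays="delays.ini",
        fft_at_mapper=0, internal_log_mapper=1, ffts_per_chunk=-1,
        windowing="square", one_baseline_per_task=True,
        phase_calibration=0, min_mapper_chunk=-1, max_mapper_chunk=-1,
        task_scaling_stations=0, single_precision=0)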
lib_mapredcorr.get_mr_command(app_dir, script, params)[source]

Create the command line for calling the mapper/reducer with its parameters.
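A hedged sketch of how this combines with the params strings above (the path and script name are hypothetical, and the exact format of the returned line is an assumption):

    from lib_mapredcorr import get_mr_command

    cmd = get_mr_command(app_dir="/opt/correlx/src/",  # hypothetical path
                         script="mapper.py",           # hypothetical script
                         params="2 2 1024 1.0")        # illustrative params
    # cmd is expected to be a single line invoking the script with its
    # parameters appended (assumption).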

lib_mapredcorr.get_options_custom_partitioner(use_nohash_partitioner=1)[source]

Options for custom partitioner.

lib_mapredcorr.get_options_fixed_length_records(record_size)[source]

Options for fixed length records as input, instead of text.


TO DO:

This needs work.
lib_mapredcorr.get_options_logging(log_properties)[source]

Options for log4j logging.

log_properties
path to log4j.properties file.

TO DO:

Currently not working, and thus “log_properties” is copied to a specific folder (see run_mapreduce_sh()).
lib_mapredcorr.get_options_lustre()[source]

Options for lustre filesystem.

lib_mapredcorr.get_options_num_maps(num_maps)[source]

Total number of mappers for this job.

lib_mapredcorr.get_options_num_reduces(num_reduces)[source]

Total number of reducers for this job.

lib_mapredcorr.get_options_partitioning(field_sep, key_fields, key_field_sep, part_opts, comp_opts)[source]

Get partitioning options.

field_sep
field separator.
key_fields
number of key fields.
key_field_sep
key field separator.
part_opts
partitioning options.
comp_opts
comparator options.
Returns:
option1 : str
java options (relative to partitioning) for job submission.
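As an illustration, a sketch of a plausible composition of these options using the standard Hadoop streaming property names (it is an assumption that get_options_partitioning uses exactly these properties; d_opt() from this module is reused):

    from lib_mapredcorr import d_opt

    def partitioning_options_sketch(field_sep, key_fields, key_field_sep,
                                    part_opts, comp_opts):
        # Assemble "-D property=value" pairs for the streaming job submission.
        return " ".join([
            d_opt("stream.map.output.field.separator", field_sep),
            d_opt("stream.num.map.output.key.fields", str(key_fields)),
            d_opt("map.output.key.field.separator", key_field_sep),
            d_opt("mapreduce.partition.keypartitioner.options", part_opts),
            d_opt("mapreduce.partition.keycomparator.options", comp_opts),
        ])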
lib_mapredcorr.get_options_text_delimiter(hadoop_text_delimiter)[source]

Options for text delimiter.

lib_mapredcorr.get_reducer_params_str(codecs_serial, fft_at_mapper, internal_log_reducer, fft_size, windowing, phase_calibration, single_precision)[source]

Returns string with all the parameters to call the reducer.

codecs_serial
[only testing] “” by default. Serialized version of the codecs used for compression.
fft_at_mapper
same variable used to call get_mapper_params_str(); its value is inverted here.
internal_log_reducer
[unused]
fft_size
fft length from the configuration.
windowing
window type before FFT, “square” by default.
phase_calibration
if 1 phase calibration tones will be extracted.
single_precision
boolean to control data types for unpacked samples.
Returns:
reducer_params_str : str
parameters to call the reducer.

TO DO:

Remove unused parameters.
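For reference, a hedged example call with illustrative values (testing-only and unused parameters are left at their documented defaults); note that fft_at_mapper is the same flag passed to get_mapper_params_str() and is inverted internally:

    from lib_mapredcorr import get_reducer_params_str

    params = get_reducer_params_str(codecs_serial="",
                                    fft_at_mapper=0,
                                    internal_log_reducer=1,
                                    fft_size=1024,      # illustrative value
                                    windowing="square",
                                    phase_calibration=1,
                                    single_precision=0)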
lib_mapredcorr.pipeline_app(python_x, stations, input_files, app_dir, mapper, reducer, fft_size, fft_at_mapper, accumulation_time, signal_start, signal_duration, first_frame_num, num_frames, data_dir, output_dir, output_sym, auto_stations, auto_pols, num_pols, codecs_serial, v=0, file_log=sys.stdout, file_out='vt4.txt', ini_stations='none', ini_media='none', ini_delays='none', internal_log_mapper=1, internal_log_reducer=1, ffts_per_chunk=1, windowing='square', one_baseline_per_task=True, phase_calibration=0, min_mapper_chunk=-1, max_mapper_chunk=-1, task_scaling_stations=0, sort_output=1, single_precision=0, profile_map=0, profile_red=0, timestamp_str='')[source]

Perform correlation through pipeline execution (that is, without hadoop). All the data is passed through the mapper, then the results are sorted and passed through the reducer.

Parameters: see the table below for those that are simply passed through to get_mapper_params_str() and get_reducer_params_str(); the remaining parameters are described after the table. A conceptual sketch of the pipeline execution follows the parameter list.
Returns: list with start time, end time, and duration of the execution in seconds.
Note that the environment variable map_input_file is modified for each processed file to emulate the hadoop behavior
(and thus give the mapper access to the name of the file currently being processed).
Parameter               get_mapper_params_str()   get_reducer_params_str()
stations                x
fft_size                x                         x
fft_at_mapper           x                         x
accumulation_time       x
signal_start            x
signal_duration         x
first_frame_num         x
num_frames              x
auto_stations           x
auto_pols               x
num_pols                x
codecs_serial           x                         x
ini_stations            x
ini_media               x
ini_delays              x
internal_log_mapper     x
internal_log_reducer                              x
ffts_per_chunk          x
windowing               x                         x
one_baseline_per_task   x
phase_calibration       x                         x
min_mapper_chunk        x
max_mapper_chunk        x
task_scaling_stations   x
single_precision        x                         x
python_x
str with python executable.
input_files
list with filenames for the media.
app_dir
path with the location of the .py files for the mapper, the reducer, and all their dependencies.
mapper
mapper .py filename.
reducer
reducer .py filename.
data_dir
path with the location of the media.
output_dir
path for the intermediate and output files.
output_sym
path for the symbolic link to the output file (typically a sub-path in the experiment folder).
v
boolean to activate verbose mode.
file_log
file for logging.
file_out
output file.
sort_output
[1 by default] boolean to activate the sorting of the output.
profile_map
[0 by default] 1 to profile the mapper using pycallgraph, 2 to profile it using cProfile.
profile_red
[0 by default] 1 to profile the reducer using pycallgraph, 2 to profile it using cProfile.
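Conceptual sketch of the pipeline execution described above (a simplified illustration, not the actual implementation: mapper/reducer parameter strings, profiling, symbolic links, and intermediate file cleanup are omitted, and the temporary file names are hypothetical):

    import os
    import subprocess

    def pipeline_sketch(python_x, app_dir, mapper, reducer, data_dir,
                        input_files, file_out):
        # 1) All the data is passed through the mapper, one input file at
        #    a time.
        with open("mapper_output.tmp", "w") as m_out:
            for f in input_files:
                env = dict(os.environ)
                env["map_input_file"] = f  # emulate hadoop's behavior
                with open(os.path.join(data_dir, f), "rb") as f_in:
                    subprocess.run([python_x, os.path.join(app_dir, mapper)],
                                   stdin=f_in, stdout=m_out, env=env)
        # 2) The mapper results are sorted, as hadoop would do between phases.
        subprocess.run(["sort", "mapper_output.tmp", "-o", "mapper_sorted.tmp"])
        # 3) The sorted results are passed through the reducer.
        with open("mapper_sorted.tmp", "rb") as r_in, \
             open(file_out, "w") as r_out:
            subprocess.run([python_x, os.path.join(app_dir, reducer)],
                           stdin=r_in, stdout=r_out)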
lib_mapredcorr.run_mapreduce_sh(record_size, jobsh, mappersh, reducersh, app_dir, hadoop_dir, hadoop_conf_dir, folder_deps, files_deps, add_deps, mapper, reducer, hdfs_data_dir, hdfs_output_file, output_hadoop, text_mode, hadoop_text_delimiter, output_dir, output_sym, temp_log, packets_per_hdfs_block, total_frames, total_partitions, adjust_mappers, adjust_reducers, use_nohash_partitioner=1, use_lustre_plugin=0, lustre_user_dir='r/ajva/', num_slaves=1, num_vcores=1, one_baseline_per_task=True, sort_output=1, bm_delete_output=0, bypass_reduce=0, v=0, file_log=sys.stdout)[source]

Perform correlation through hadoop. Requires hadoop to be running (see lib_hadoop_hdfs.py).

record_size
[0 by default] If 1, fixed length records will be used in Hadoop (testing only).
jobsh
bash file to write complete call to hadoop for job submission.
mappersh
bash file with complete call to the mapper.
reducersh
bash file with complete call to the reducer.
app_dir
path to mapper and reducer .py files.
hadoop_dir
base path of the hadoop installation.
hadoop_conf_dir
path with hadoop configuration files.
folder_deps
path to the dependencies for the mapper and the reducer.
files_deps
list of strings with the filenames of the dependencies (.py files).
add_deps
list of strings with paths for additional dependencies (.ini files if applicable)
mapper
filename of the mapper (.py).
reducer
filename of the reducer (.py).
hdfs_data_dir
working path in HDFS or Lustre.
hdfs_output_file
filename of the output file in the working path (HDFS or Lustre).
output_hadoop
filename of the output file in a local folder (output_dir).
text_mode
[1 by default] 0 for binary mode (testing only).
hadoop_text_delimiter
text delimiter for input data at mapper, see notes below.
output_dir
output folder (local).
output_sym
path (local) for the symbolic link to the output file (typically sub-path in experiment folder).
temp_log
filename for temporary logging.
packets_per_hdfs_block
number of frames per mapper.
total_frames
total number of frames in all the VDIF files to be processed.
total_partitions
maximum number of reducers.
adjust_mappers
force the calculated number of mappers to be multiplied by this factor.
adjust_reducers
force the calculated number of reducers to be multiplied by this factor.
use_nohash_partitioner
0 for default partitioner, 1 for nohash partitioner (better load balancing).
use_lustre_plugin
boolean to allow Hadoop to work directly in Lustre instead of HDFS.
lustre_user_dir
absolute path for the Lustre working path.
num_slaves
[unused] included to have the option to control the number of mappers/reducers based on this.
num_vcores
[unused] included to have the option to control the number of mappers/reducers based on this.

one_baseline_per_task
[True by default] boolean to activate one-baseline-per-task mode.
sort_output
[1 by default] boolean to activate the sorting of the output.
bm_delete_output
delete output file if 1 (only for benchmarking).
bypass_reduce
do not run the reduce phase if 1 (so that the output is directly that of the mappers); use only for debugging.
v
0 by default, 1 for verbose mode.
file_log
logging file.
Returns:
start_time
number of seconds when the job is launched.
end_time
number of seconds when the job finishes.
elapsed_time
duration of the execution of the job [s].
ret_start_time
number of seconds when the output file is requested from the working filesystem (HDFS/Lustre).
ret_end_time
number of seconds when the output file gets to the local output folder.
ret_elapsed_time
duration of the retrieval of the output file from HDFS/Lustre to the local output folder [s].
sort_start_time
number of seconds when the output file sort starts.
sort_end_time
number of seconds when the output file sort ends.
sort_elapsed_time
duration of the output file sort [s].

Configuration:

-Controlling the number of mappers and reducers (see the sketch after this list):
packets_per_hdfs_block and total_frames control the number of mappers.
total_partitions controls the number of reducers.
adjust_mappers and adjust_reducers allow modifying the number of mappers/reducers.
*The number of reducers allows finer tuning:
-if adjust_reducers==0, the reduce phase is bypassed.
-if adjust_reducers<0, the number of reducers is fixed to the absolute value of the given integer.
-Configuration of the input reader:
Hadoop is currently used in text mode to process binary data (pending an implementation with full binary support).
In this mode Hadoop splits the input blocks wherever it finds the configured delimiter, so the delimiter has to
be chosen to minimize the probability of it appearing in the data. A more elegant solution is needed.
-Configuration files:
Use a different hadoop_conf_dir for each node, otherwise there will be conflicts in clusters with shared filesystems.
-Filesystem:
Use Lustre if possible.
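A hedged sketch of the counting logic described in “Controlling the number of mappers and reducers” above (the exact formula inside run_mapreduce_sh may differ):

    def approx_num_mappers(total_frames, packets_per_hdfs_block, adjust_mappers):
        # Base count from the data volume, then scaled by the adjust factor.
        base = max(1, total_frames // packets_per_hdfs_block)
        return max(1, int(base * adjust_mappers))

    def approx_num_reducers(total_partitions, adjust_reducers):
        if adjust_reducers == 0:
            return 0                              # reduce phase bypassed
        if adjust_reducers < 0:
            return abs(int(adjust_reducers))      # fixed number of reducers
        return max(1, int(total_partitions * adjust_reducers))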


Dependencies:

-Lustre:
Lustre support is based on the plugin at https://github.com/Seagate/lustrefs.
-Partitioner:
The custom partitioner KeyFieldBasedPartitionerNH (NH for no hash) has to be used to avoid unbalanced loads on the reducers,
due to the default partitioner's behavior of hashing the keys.


Notes:

Note that in this case adjust_mappers and adjust_reducers equal to -1 does not mean that they are computed
automatically (which is the usual convention in this code), but rather that only 1 mapper or 1 reducer is used
(see “Configuration” above for more details).


TO DO:

Implement native binary support, instead of using text mode.
Document sorting.