msvf module
Mapper: reads VDIF frames from standard input (sys.stdin) and generates one MapReduce line per band per frame.
Parameters
See lib_mapredcorr.get_mapper_params_str()
Returns
See msvf.get_pair_str()
msvf.adjust_frame_num_and_seconds(fs, samples_per_channel_in_frame_full, samples_per_channel_in_frame_single, second_frame, frame_num, shift_int, acc_time_str)
Get the adjusted frame number and the shift inside the frame, taking delays into account. This determines which accumulation period the data in this frame belongs to.
- fs : float
- sampling frequency [Hz].
- samples_per_channel_in_frame_full : int
- number of samples per channel per frame (full samples, i.e. R or R+jI)
- samples_per_channel_in_frame_single : int
- number of sample components (i.e. R, I).
- second_frame : float
- seconds corresponding to the first sample of the frame.
- frame_num : int
- frame number.
- shift_int : int
- shift in number of sample components.
- acc_time_str : str(float)
- accumulation period duration [s].
Returns
- frame_num_adjusted
- frame number considering shift (forced to be positive).
- frame_num_adjusted_neg
- frame number considering shift (may be negative)
- adjusted_shift_inside_frame
- first sample component inside the frame considering the shift.
TO DO:
- Remove acc_time_str.
- Check for potential issues when the second changes: acc_time_str and second_frame are not used (!).
- Old implementation that may not handle accumulation periods that are not integer multiples of one second; check (!).
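A minimal sketch of the kind of bookkeeping described for adjust_frame_num_and_seconds, assuming the integer shift (in sample components) is simply added to the absolute position of the frame's first component. The helper name adjust_frame_sketch and the sign convention are assumptions, not the msvf implementation; floor division is what keeps the in-frame offset non-negative while the frame number may become negative.

```python
def adjust_frame_sketch(frame_num, shift_int, samples_per_channel_in_frame_single):
    """Hypothetical sketch: relocate a frame by an integer shift given in sample components.

    Assumes the shift is added to the absolute position of the frame's first
    sample component; the sign convention used by msvf may differ.
    """
    # Absolute position (in sample components) of the frame's first component after shifting.
    absolute_pos = frame_num * samples_per_channel_in_frame_single + shift_int
    # divmod floors, so the offset inside the frame is always in [0, samples_single).
    frame_num_adjusted_neg, adjusted_shift_inside_frame = divmod(
        absolute_pos, samples_per_channel_in_frame_single)
    # "Forced to be positive" variant: clamp negative frame numbers to zero.
    frame_num_adjusted = max(frame_num_adjusted_neg, 0)
    return frame_num_adjusted, frame_num_adjusted_neg, adjusted_shift_inside_frame
```

For example, a shift of -10 components applied to frame 0 with 8 components per frame lands in frame -2 at offset 6 (clamped frame number 0).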
msvf.calculate_corr_pairs_one_baseline_per_task(tot_stations=3, tot_pols=1, auto_stations=0, auto_pols=1)
Create an array with identifiers for all the correlation pairs for all the stations, p<station_i><station_j>.
Only used in one-baseline-per-task mode.
- tot_stations : int
- number of stations.
- tot_pols : int
- number of polarizations
- auto_stations : int
- 0 : only different stations (default), 1 : allow same station, 2 : only same station.
- auto_pols : int
- 0 : only different polarizations, 1 : allow same polarization (default), 2 : only same polarization.
Returns
- pairs_list : list of lists [station_a, polarization_a, station_b, polarization_b]
Limitations:
- Currently assumes that all the stations have the same number of polarizations.
TO DO:
- This is only used in one-baseline-per-task mode, but it is still called from main and used in logging.
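The selection rules for auto_stations and auto_pols can be illustrated with a small hypothetical helper, corr_pairs_sketch, that enumerates unordered (station, polarization) pairs. Whether msvf emits unordered pairs, and in which order, is an assumption here.

```python
from itertools import combinations_with_replacement


def corr_pairs_sketch(tot_stations=3, tot_pols=1, auto_stations=0, auto_pols=1):
    """Hypothetical sketch of the pair-selection rules (unordered pairs assumed)."""
    def keep(a, b, mode):
        # mode 0: only different, 1: allow same, 2: only same
        if mode == 0:
            return a != b
        if mode == 2:
            return a == b
        return True

    # Every (station, polarization) input appears once.
    inputs = [(s, p) for s in range(tot_stations) for p in range(tot_pols)]
    pairs_list = []
    # combinations_with_replacement yields each unordered pair once, self-pairs included.
    for (sa, pa), (sb, pb) in combinations_with_replacement(inputs, 2):
        if keep(sa, sb, auto_stations) and keep(pa, pb, auto_pols):
            pairs_list.append([sa, pa, sb, pb])
    return pairs_list
```

With the defaults (3 stations, 1 polarization, cross-station pairs only) this yields the three baselines [0, 0, 1, 0], [0, 0, 2, 0] and [1, 0, 2, 0].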
msvf.check_time_frame(accu_block, rel_pos_frame, actual_frame_time, seconds_ref, seconds_duration)
Check whether the actual timestamp of this frame is inside the experiment time window.
- accu_block
- accumulation period id (-1 if outside scan window).
- rel_pos_frame
- relative frame position in accumulation period.
- actual_frame_time
- actual timestamp (considering delays) for the first sample of the frame.
- seconds_ref
- start of the scan [s].
- seconds_duration
- duration of the scan [s].
Returns
- process_frame : int
- 1 if frame inside scan.
- after_end_time : int
- 1 if frame after end of defined window.
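A minimal sketch of the window test, assuming a half-open scan window [seconds_ref, seconds_ref + seconds_duration) and that accu_block == -1 already flags frames outside the scan. rel_pos_frame is left out because its role in the check is not documented; check_time_frame_sketch is a hypothetical name.

```python
def check_time_frame_sketch(accu_block, actual_frame_time, seconds_ref, seconds_duration):
    """Hypothetical sketch of the scan-window test (half-open window assumed)."""
    end_time = seconds_ref + seconds_duration
    # Flag frames at or beyond the end of the window so the caller can stop reading.
    after_end_time = int(actual_frame_time >= end_time)
    inside = seconds_ref <= actual_frame_time < end_time
    # A frame is processed only if it is inside the window and has a valid accumulation period.
    process_frame = int(inside and accu_block != -1)
    return process_frame, after_end_time
```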
msvf.compute_shift_delay_samples(params_delays, vector_seconds_ref, freq_sample, seconds_frame, pair_st_so, data_type=0, front_time=None, cache_rates=[], cache_delays=[])
Compute the number of samples by which to shift the signal (always positive, since the reference station is the closest to the source).
- params_delays
- delay model ini file.
- vector_seconds_ref
- list of floats with seconds for delay information (start time polynomials).
- freq_sample
- sampling frequency [Hz].
- seconds_frame
- seconds corresponding to the frame to be processed.
- station_id
- corresponds to id number in stations ini file.
- source_id
- [default 0], see limitations.
- pair_st_so
- station/source pair identifier (see station_id and source_id above).
- data_type : int
- 0 for real, 1 for complex.
- front_time
- frontier time, that is, the time corresponding to the start of the integration period (takes priority over the seconds of the frame).
- cache_rates
- temporary information on delays to avoid reprocessing of the input files (see lib_ini_files.get_rates_delays()).
- cache_delays
- list with [seconds_fr_nearest,pair_st_so,delay] from previous computation.
Returns
- shift_int
- number of sample components to offset (integer delay).
- delay
- total delay (=freq_sample*(shift_int+fractional_sample_delay)).
- fractional_sample_delay
- fractional part of the delay, in samples.
- error_out
- 0 if success, -1 if error (e.g. accumulation period not found in ini file).
- cache_rates
- updated cache_rates (input).
Limitations:
- Currently assumes a single source (source_id is always zero).
TO DO:
- Simplify code; neither params_delays nor find_nearest() is needed.
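The split between shift_int and fractional_sample_delay can be sketched as follows, assuming the delay is expressed in seconds (so that delay * freq_sample is the delay in samples) and real data with one component per sample. split_delay_sketch is a hypothetical helper; taking the floor keeps the fractional part in [0, 1).

```python
import math


def split_delay_sketch(delay_s, freq_sample):
    """Hypothetical sketch: split a delay in seconds into integer and fractional samples.

    Assumes real data (one component per sample); for complex data the integer
    part would have to be converted to sample components.
    """
    delay_samples = delay_s * freq_sample
    shift_int = math.floor(delay_samples)          # integer part, handled by shifting samples
    fractional_sample_delay = delay_samples - shift_int  # remainder in [0, 1), handled later
    return shift_int, fractional_sample_delay
```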
msvf.encode_samples(signal_chunk_fft_out, encode_b64, apply_compression)
Encode packed samples into base64.
- signal_chunk_fft_out : 1D numpy array
- bytes containing packed sample components.
- encode_b64 : int
- use base64 encoding, 1 by default.
- apply_compression : int
- 0 by default.
Returns
- signal_chunk_fft_out : str
- signal encoded into base64.
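A minimal sketch of the base64 step with the standard library, assuming the packed samples are a uint8 NumPy array and ignoring apply_compression. Both helper names are hypothetical; the decoder shows the round trip a reducer would need.

```python
import base64

import numpy as np


def encode_samples_sketch(packed, encode_b64=1):
    """Hypothetical sketch: turn a uint8 array of packed samples into text."""
    raw = np.asarray(packed, dtype=np.uint8).tobytes()
    if encode_b64:
        # b64encode returns bytes; decode to str so it can be written on a text line.
        return base64.b64encode(raw).decode("ascii")
    return raw


def decode_samples_sketch(text):
    """Inverse of encode_samples_sketch for the base64 case."""
    return np.frombuffer(base64.b64decode(text), dtype=np.uint8)
```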
msvf.get_absolute_delay(params_delays, vector_seconds_ref, seconds_frame, pair_st_so, front_time=None, cache_rates=[])
Get all the delay information structures associated with the processed station, source and integration period.
- params_delays
- delay model ini file.
- vector_seconds_ref : list of float
- seconds for delay information (start time polynomials).
- seconds_frame
- seconds corresponding to the frame to be processed.
- station_id
- corresponds to id number in stations ini file.
- source_id
- [default 0], see limitations.
- front_time
- frontier time, that is, time corresponding to the start of the integration period (takes priority over the seconds of the frame)
- cache_rates
- temporary information on delays to avoid reprocessing of the input files (see lib_ini_files.get_rates_delays()).
Returns
- abs_delay
- initial absolute delay.
- rate_delay
- delay polynomials.
- ref_delay
- delay for the “reference” station.
- error_out
- -1 if the station, source and accumulation period are not found, 0 if success.
- cache_rates
- updated cache_rates (input).
Configuration:
- VERBOSE_INI_DELAYS: from lib_ini_delays.py.
TO DO:
- Merge code with compute_shift_delay_samples to avoid repetition.
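get_absolute_delay returns delay polynomials (rate_delay), but their coefficient order is not documented here. As an illustration only, the sketch below evaluates a polynomial with ascending coefficients relative to its start time t0 using Horner's rule; eval_delay_polynomial and the coefficient convention are assumptions.

```python
def eval_delay_polynomial(coeffs, t, t0):
    """Hypothetical sketch: evaluate delay(t) = sum_k coeffs[k] * (t - t0)**k.

    Coefficients are assumed to be in ascending order and t0 to be the start
    time (frontier) of the polynomial.
    """
    dt = t - t0
    result = 0.0
    # Horner's rule: iterate from the highest-order coefficient down.
    for c in reversed(coeffs):
        result = result * dt + c
    return result
```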
msvf.get_alloc_tasks_linear_scaling(num_pairs)
Get the allocation of stations into tasks. It computes the matrix that defines which pairs are associated with each task. Only used in station-based splitting (linear scaling with the number of stations).
- num_pairs : int
- number of pairs.
Returns
- a : binary 2D square array
- task allocation.
TO DO:
- Detail the algorithm.
msvf.get_codebook_info(codecs_serial, params_media, current_file_name, station_name, chunk_size, apply_compression)
Get information for the compression library (e.g. vector quantization), if used.
- codecs_serial : str
- serialized codecs for compression.
- params_media : str
- serialized contents of media.ini.
- current_file_name : str
- name of the file currently being processed.
- station_name : str
- name of the station.
- chunk_size : int
- number of samples per chunk.
- apply_compression : bool
- whether to apply compression or not
Returns
- encoding
- [HARDCODED TO NO COMPRESSION]
- codebook
- codebook extracted from the codecs (vector quantization).
- codebook_name
- codebook name from .ini.
- apply_compression
- updated version of input based on configuration in .ini.
TO DO:
- Hardcoded output.
msvf.get_current_filename()
Get the name of the file currently being processed from an environment variable. This variable is only provided by Hadoop, so it has to be created when running in pipeline mode (which is already done in lib_mapredcorr.py).
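A minimal sketch of reading the input file name, assuming Hadoop Streaming's convention of exporting job configuration properties as environment variables with dots replaced by underscores (mapreduce_map_input_file in current releases, map_input_file in older ones). Which variable msvf actually reads is not stated above, so both names and the helper itself are assumptions.

```python
import os


def get_current_filename_sketch(default=""):
    """Hypothetical sketch: read the input file name exported to a streaming mapper."""
    # Newer Hadoop name first, then the legacy one; the pipeline mode would
    # have to set one of them itself.
    for var in ("mapreduce_map_input_file", "map_input_file"):
        value = os.environ.get(var)
        if value:
            return value
    return default
```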
msvf.get_num_samples_per_frame(allsamples, num_channels, data_type)
Based on the number of channels and the data type (complex or real), get the number of samples per frame.
- allsamples : int
- total number of sample components in the frame.
- num_channels : int
- number of channels in the frame.
- data_type : int
- 0 for real, 1 for complex.
Returns
- tot_samples_per_channel_and_frame_full : int
- number of samples per channel per frame (full samples, i.e. R or R+jI)
- totsamples_per_channel_and_frame_single : int
- number of sample components (i.e. R, I).
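The relation between the two return values can be sketched as below, assuming allsamples counts sample components across all channels and that each complex sample has two components (R and I). get_num_samples_per_frame_sketch is a hypothetical name.

```python
def get_num_samples_per_frame_sketch(allsamples, num_channels, data_type):
    """Hypothetical sketch: per-channel counts from the total number of sample components."""
    components_per_sample = 2 if data_type == 1 else 1  # complex samples carry R and I
    single = allsamples // num_channels                 # sample components per channel per frame
    full = single // components_per_sample              # full (real or complex) samples per channel
    return full, single
```

For example, 8000 components in a 4-channel complex frame give 2000 components, i.e. 1000 complex samples, per channel.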
msvf.get_pair_linear_scaling(s0, t0)
Get the pair for linear scaling with the number of stations.
- s0 : int
- station id
- t0 : int
- polarization id
msvf.get_pair_str(char_p, pair, accu_block, mod_channel, num_channels, seconds_fr, first_sample_signal, station_id, mod_polarization_id, freq_sample, bits_per_sample, data_type_char, encoding, n_bins_pcal_val, pcal_freq, one_baseline_per_task, task_scaling_stations, id_pair=0, tot_pairs=0, tot_accu_blocks=1, num_samples=0, abs_delay=0.0, rate_delay=[], freq_channel=0.0, fractional_sample_delay=0.0, accumulation_time=0.0, shift_int=0, sideband='L')
Build the output string with the key and the first part of the value (metadata) for the map output.
- char_p : char {'x','r','y'}
- identifies the mode of operation: 'x' for all-baselines-per-task, 'r' for linear scaling with the number of stations, 'y' for one-baseline-per-task.
- pair : str
- A.A-A.A for all-baselines-per-task, station0.polarization0-station1.polarization1 for single-baseline-per-task.
- accu_block : int
- accumulation period id.
- mod_channel : int
- channel id.
- num_channels : int
- number of channels.
- seconds_fr : int
- [repeated] currently same as accu_block.
- first_sample_signal : int
- sample number for the first sample in this chunk of samples.
- station_id : int
- station identifier corresponding to this chunk of samples.
- mod_polarization_id : int
- polarization identifier corresponding to this chunk of samples.
- freq_sample : int
- sampling frequency [Hz].
- bits_per_sample : int
- number of bits per sample component.
- data_type_char : char {'r','c'}
- 'r' for real, 'c' for complex.
- encoding : str
- type of encoding compression used (C_INI_MEDIA_C_*).
- n_bins_pcal_val : int
- number of bins for the phase calibration window.
- pcal_freq : int
- phase calibration tone separation [Hz].
- one_baseline_per_task : int
- [default 0], 1 for one baseline per task (work in progress)
- task_scaling_stations : int
- [default 0], 1 for linear scaling with stations (work in progress)
- id_pair
- [only used if one_baseline_per_task==1]
- tot_pairs
- [currently unused, but should be used if one-baseline-per-task]
- tot_accu_blocks : int
- number of accumulation periods in the scan.
- num_samples : int
- number of samples in this chunk (required in reducer in case last bytes not filled with samples).
- abs_delay : float
- absolute delay for this chunk.
- rate_delay : list
- list with delay polynomials (see get_absolute_delay()).
- freq_channel : float
- edge frequency [Hz] corresponding to the samples in this chunk.
- fractional_sample_delay : float
- fractional sample delay for the first sample in this chunk.
- accumulation_time : float
- accumulation period duration [s].
- shift_int : int
- number of sample components used to offset these samples (integer delay).
- sideband : char {'L','U'}
- 'L' for lower sideband, 'U' for upper sideband.
Returns
- pair_str : str
- complete header for the current set of samples being processed.
Output formatting and conventions:
- The fields in the key below (k1, k2, ...), separated by FIELD_SEP, are referenced in const_mapred.py for the sorting configuration.
- The metadata must follow the same order as defined in const_mapred.py.
Notes:
- The full output consists of the string generated here followed by the samples packed in base64 (done outside this function).
TO DO:
- Simplify interface.
- seconds_fr: delete it and take accu_block instead.
- Define constants for the real and complex chars, and for the lower and upper sideband.
- Define a constant for the initial "p" in the line (originally for "pair").
msvf.get_pointers_samples(tot_samples_one_frame, adjusted_shift_inside_frame, accu_block, rel_pos_frame, tot_samples_sup_frame, ref_offset=0)
Compute metadata relative to the sample organization (sample ids, offsets, chunk sizes...).
- tot_samples_one_frame
- number of samples per channel per frame.
- adjusted_shift_inside_frame
- shift (number of samples) for the first sample inside the frame.
- accu_block
- accumulation block corresponding to these samples (last sample).
- rel_pos_frame
- number of frame relative to the accumulation period.
- tot_samples_sup_frame
- number of samples per frame [see notes below].
- ref_offset
- integer shift (number of samples) for the first sample of the stream
Returns
- tot_samples_v
- number of samples in this chunk.
- seconds_v
- seconds corresponding to the start of the accumulation period (reference is zero).
- offset_first_sample_iterator_v
- offset used to fetch samples from the array with the samples.
- offset_first_sample_signal_v
- offset used to compute the position of this chunk in the complete stream.
- chunk_size_v
- same as tot_samples_v [remove].
- acc_v
- same as seconds_v [remove].
TO DO:
- Superframe functionality needs debugging.
- Check ref_offset.
- Remove chunk_size_v and acc_v.
msvf.get_seconds_fr_front(front_time, vector_seconds_ref, seconds_frame)
Find the frontier seconds, computing them only if front_time is not already available.
- front_time
- frontier seconds (i.e. timestamp for start time polynomials).
- vector_seconds_ref
- list of floats with seconds for delay information (start time polynomials).
- seconds_frame
- seconds corresponding to this frame.
Returns
- seconds_fr_nearest
- frontier seconds.
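A sketch of the frontier lookup using bisect, assuming vector_seconds_ref is sorted in ascending order and that the applicable polynomial is the latest one starting at or before seconds_frame. get_seconds_fr_front_sketch is a hypothetical name.

```python
from bisect import bisect_right


def get_seconds_fr_front_sketch(front_time, vector_seconds_ref, seconds_frame):
    """Hypothetical sketch: pick the polynomial start time that applies to a frame."""
    if front_time is not None:
        return front_time  # already known, nothing to search
    # Index of the last start time <= seconds_frame.
    idx = bisect_right(vector_seconds_ref, seconds_frame) - 1
    if idx < 0:
        raise ValueError("frame starts before the first delay polynomial")
    return vector_seconds_ref[idx]
```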
msvf.pack_and_encode_samples(signal_chunk_fft, use_bitarrays, encode_b64, apply_compression, bits_per_sample)
Encode a signal chunk for the mapper output.
- signal_chunk_fft : 1D numpy array of int
- sample components (integer values).
- use_bitarrays : int
- 0 by default.
- encode_b64 : int
- use base64 encoding, 1 by default.
- apply_compression : int
- 0 by default.
- bits_per_sample : int
- number of bits per sample component.
Returns
- signal_chunk_fft_out : str
- signal encoded into base64.
TO DO:
- Place together with decode_samples_b64() for better organization.
msvf.pack_samples(signal_chunk_fft, bits_per_sample)
Pack the sample components into bytes to avoid data storage overhead.
- signal_chunk_fft : 1D numpy array
- sample components (integer values).
- bits_per_sample : int
- number of bits per sample component.
Returns
- signal_chunk_fft_out : 1D numpy array
- bytes containing packed sample components.
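A minimal NumPy sketch of bit packing, assuming unsigned components below 2**bits_per_sample (at most 8 bits) and most-significant-bit-first order. The byte layout used by msvf itself is not specified above, and pack_samples_sketch is a hypothetical name.

```python
import numpy as np


def pack_samples_sketch(signal_chunk, bits_per_sample):
    """Hypothetical sketch: pack unsigned integer sample components into bytes (MSB first)."""
    values = np.asarray(signal_chunk, dtype=np.uint8)
    # Bit positions from most to least significant, e.g. [1, 0] for 2 bits.
    shifts = np.arange(bits_per_sample - 1, -1, -1)
    # One row per component, one column per bit, flattened into a single bit stream.
    bits = ((values[:, None] >> shifts) & 1).astype(np.uint8).ravel()
    # np.packbits groups 8 bits per byte (MSB first) and zero-pads the last byte.
    return np.packbits(bits)
```

For 2-bit components [0, 1, 2, 3] the bit stream is 00 01 10 11, which packs into the single byte 27.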
msvf.read_frame(reader, show_errors, forced_frame_length=0, forced_format='VDIF', forced_version='custom')
Return the header and samples in the frame, based on the information from the media.ini file. If this information is not available, the frame is assumed to be VDIF.
- reader : file handle
- sys.stdin.
- show_errors : int
- [0 by default] 1 for verbose mode.
- forced_frame_length : int
- [0 by default] >0 to force the number of bytes per frame (if ==0 frame length is read in header)
- forced_format
- [leave default value] use only for new implementations of readers.
- forced_version
- [leave default value] use only for new implementations of readers.
Returns
- header : list
- frame header following the format:
- [seconds_frame, invalid_marker, legacy_marker, reference_epoch, frame_number, vdif_version, log_2_channels, frame_length, data_type, bits_per_sample, thread_id, station_id] where:
  - seconds_frame: integer value with seconds for this frame.
  - invalid_marker: VDIF invalid bit.
  - legacy_marker: VDIF legacy bit.
  - reference_epoch: VDIF frame epoch (float with MJD (TBC)).
  - frame_number: VDIF frame number (integer).
  - vdif_version: VDIF frame version (integer).
  - log_2_channels: VDIF base-2 logarithm of the number of channels in the frame.
  - frame_length: VDIF frame length (integer with number of bytes).
  - data_type: VDIF frame data type bit.
  - bits_per_sample: number of bits per sample.
  - thread_id: VDIF thread identifier field (integer).
  - station_id: VDIF station id field (integer).
- allsamples : 1d numpy array of int
- sample components (see below for details).
- check_size_samples : int
- 1 if read as many samples as expected from the frame length (and rest of metadata), 0 otherwise.
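For reference, the first four 32-bit words of a VDIF header carry the fields listed above. The sketch below decodes them with struct following the bit layout of the VDIF specification; it is not lib_vdif.py, returns a dict instead of a list, keeps reference_epoch as the raw 6-bit code, and converts frame_length from 8-byte units to bytes. parse_vdif_header_sketch is a hypothetical name.

```python
import struct


def parse_vdif_header_sketch(header_bytes):
    """Hypothetical sketch: decode words 0-3 of a VDIF frame header (little-endian)."""
    w0, w1, w2, w3 = struct.unpack("<4I", header_bytes[:16])
    return {
        "seconds_frame": w0 & 0x3FFFFFFF,            # seconds from reference epoch
        "invalid_marker": (w0 >> 31) & 0x1,
        "legacy_marker": (w0 >> 30) & 0x1,
        "reference_epoch": (w1 >> 24) & 0x3F,        # raw 6-bit epoch code
        "frame_number": w1 & 0xFFFFFF,               # frame number within the second
        "vdif_version": (w2 >> 29) & 0x7,
        "log_2_channels": (w2 >> 24) & 0x1F,
        "frame_length": (w2 & 0xFFFFFF) * 8,         # stored in units of 8 bytes
        "data_type": (w3 >> 31) & 0x1,               # 0 real, 1 complex
        "bits_per_sample": ((w3 >> 26) & 0x1F) + 1,  # stored as bits minus one
        "thread_id": (w3 >> 16) & 0x3FF,
        "station_id": w3 & 0xFFFF,
    }
```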
Adding new libraries:
- Each library is identified by "format" and "version", to be specified in the media.ini file.
- Add the new format and version to const_ini_media.py.
- Add the check for the new format in the if structure below (if forced_format == ...).
- See the definition of the header to be returned above.
- The samples should be in a 1D numpy array of integers, e.g. for a VDIF complex frame, [I0, Q0, I1, Q1, ...].
- The implementation is currently tied to the VDIF format, so samples corresponding to different channels will be interleaved.
- If there are multiple bands or polarizations per file, see the VDIF specification (multiple threads or multiple channels).
- In the simple case of a single band and a single polarization, log_2_channels=0 and thread_id=0 (configure media.ini accordingly).
- See lib_vdif.py for more info.
TO DO:
- Consider providing a general interface to allow easy development of new libraries.
- use_ini_info is always used; remove the option.
- forced_frame_length is not taken into account (the length is read from the VDIF frame); remove it.
- Remove the option for bitarray_structures, which is no longer used.
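Since channels are interleaved, a consumer of allsamples typically has to de-interleave them. A minimal sketch, assuming time-major order with one component (real) or an [I, Q] pair (complex) per channel at each time step; split_channels_sketch is a hypothetical helper.

```python
import numpy as np


def split_channels_sketch(allsamples, num_channels, data_type):
    """Hypothetical sketch: de-interleave samples into shape (num_channels, samples_per_channel)."""
    samples = np.asarray(allsamples)
    if data_type == 1:
        iq = samples.reshape(-1, num_channels, 2)    # (time, channel, I/Q)
        per_time = iq[..., 0] + 1j * iq[..., 1]      # (time, channel), complex
    else:
        per_time = samples.reshape(-1, num_channels)  # (time, channel), real
    # Transpose so that each row holds the time series of one channel.
    return per_time.T
```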