msvf module

Mapper: reads VDIF frames from standard input (stdin) and generates one MapReduce line per band per frame.

Parameters

See lib_mapredcorr.get_mapper_params_str()

Returns

See msvf.get_pair_str()

Notes


Reader:

See msvf.read_frame()


Configuration:

See const_mapred.py


Debugging:

See const_debug.py
msvf.adjust_frame_num_and_seconds(fs, samples_per_channel_in_frame_full, samples_per_channel_in_frame_single, second_frame, frame_num, shift_int, acc_time_str)
Get the adjusted frame number and the shift inside the frame, taking the delays into account. This determines the accumulation period to which the data in this frame should be assigned.
Parameters:
fs : float
sampling frequency [Hz].
samples_per_channel_in_frame_full : int
number of samples per channel per frame (full samples, i.e. R or R+jI)
samples_per_channel_in_frame_single : int
number of sample components (i.e. R, I).
second_frame : float
seconds corresponding to the first sample of the frame.
frame_num : int
frame number.
shift_int : int
shift in number of sample components.
acc_time_str : str(float)
accumulation period duration [s].
Returns:
frame_num_adjusted
frame number considering the shift (forced to be positive).
frame_num_adjusted_neg
frame number considering the shift (may be negative).
adjusted_shift_inside_frame
first sample component inside the frame considering the shift.

TO DO:

Remove acc_time_str.
Check for potential issues when the second changes: acc_time_str and second_frame are not used!
(!) Old implementation: it may not handle accumulation periods that are not integer multiples of one second; check (!)
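One way to picture this adjustment is to split the integer shift (in sample components) into whole frames plus a remainder inside the frame. The sketch below is an assumption, not the msvf implementation: the helper name adjust_frame_num_sketch is hypothetical, and the direction of the frame correction (a positive shift moving the data to an earlier frame index) is a guess. It ignores second_frame and acc_time_str, which the TO DO list notes are unused anyway.

```python
def adjust_frame_num_sketch(frame_num, shift_int, samples_per_channel_in_frame_single):
    """Hypothetical sketch: split an integer shift (sample components) into
    whole frames plus a remainder inside the frame."""
    # Number of complete frames covered by the shift, and the leftover components.
    frames_shift, adjusted_shift_inside_frame = divmod(
        shift_int, samples_per_channel_in_frame_single)
    # Assumption: a positive shift moves the data to an earlier frame index.
    frame_num_adjusted_neg = frame_num - frames_shift
    # "Forced to be positive" variant: clamp at zero.
    frame_num_adjusted = max(frame_num_adjusted_neg, 0)
    return frame_num_adjusted, frame_num_adjusted_neg, adjusted_shift_inside_frame
```

For example, with 1000 components per frame, a shift of 2500 components applied to frame 10 gives (8, 8, 500), while the same shift applied to frame 1 gives (0, -1, 500).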
msvf.calculate_corr_pairs_one_baseline_per_task(tot_stations=3, tot_pols=1, auto_stations=0, auto_pols=1)

Create an array with identifiers for all the correlation pairs between all the stations, p<station_i><station_j>.

Only used in one-baseline-per-task mode.

Parameters:
tot_stations : int
number of stations.
tot_pols : int
number of polarizations
auto_stations : int
0 : only different stations (default)
1 : allow same station
2 : only same station
auto_pols : int
0 : only different polarizations
1 : allow same polarization (default)
2 : only same polarization
Returns:
pairs_list : list of lists [station_a,polarization_a,station_b,polarization_b]

Limitations:

Currently assuming that all the stations have the same number of polarizations.


TO DO:

This is only used in one-baseline-per-task mode, but still called from main and used in logging.
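The auto_stations and auto_pols options can be illustrated with a small enumeration. This is a sketch under stated assumptions: the name corr_pairs_sketch is hypothetical, each station pair is listed once with station_a <= station_b, and within the same station each polarization pair is listed once. The real function may order or deduplicate pairs differently.

```python
from itertools import combinations_with_replacement, product


def corr_pairs_sketch(tot_stations=3, tot_pols=1, auto_stations=0, auto_pols=1):
    """Hypothetical sketch of pair enumeration; returns
    [station_a, polarization_a, station_b, polarization_b] entries."""
    pairs_list = []
    # Assumption: each station pair appears once, with station_a <= station_b.
    for st_a, st_b in combinations_with_replacement(range(tot_stations), 2):
        same_station = st_a == st_b
        if auto_stations == 0 and same_station:
            continue  # 0: only different stations
        if auto_stations == 2 and not same_station:
            continue  # 2: only same station
        for pol_a, pol_b in product(range(tot_pols), repeat=2):
            if same_station and pol_b < pol_a:
                continue  # assumption: skip mirrored cross-pols within one station
            same_pol = pol_a == pol_b
            if auto_pols == 0 and same_pol:
                continue  # 0: only different polarizations
            if auto_pols == 2 and not same_pol:
                continue  # 2: only same polarization
            pairs_list.append([st_a, pol_a, st_b, pol_b])
    return pairs_list
```

With the defaults (three stations, one polarization, cross-correlations only) the sketch yields the three baselines [[0, 0, 1, 0], [0, 0, 2, 0], [1, 0, 2, 0]].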
msvf.check_time_frame(accu_block, rel_pos_frame, actual_frame_time, seconds_ref, seconds_duration)

Check whether the actual timestamp of this frame is inside the experiment time window.

Parameters:
accu_block
accumulation period id (-1 if outside scan window).
rel_pos_frame
relative frame position in accumulation period.
actual_frame_time
actual timestamp (considering delays) for the first sample of the frame.
seconds_ref
start of the scan [s].
seconds_duration
duration of the scan [s].
Returns:
process_frame : int
1 if frame inside scan.
after_end_time : int
1 if frame after end of defined window.
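A minimal sketch of the window test, assuming a half-open scan window [seconds_ref, seconds_ref + seconds_duration) and treating accu_block == -1 as outside the window. The helper name is hypothetical; rel_pos_frame is omitted because its role in the check is not described here.

```python
def check_time_frame_sketch(accu_block, actual_frame_time, seconds_ref, seconds_duration):
    """Hypothetical sketch returning (process_frame, after_end_time) as ints."""
    # Assumption: half-open scan window [seconds_ref, seconds_ref + seconds_duration).
    end_time = seconds_ref + seconds_duration
    after_end_time = int(actual_frame_time >= end_time)
    inside = seconds_ref <= actual_frame_time < end_time
    # accu_block == -1 marks data outside the scan window.
    process_frame = int(accu_block >= 0 and inside)
    return process_frame, after_end_time
```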
msvf.compute_shift_delay_samples(params_delays, vector_seconds_ref, freq_sample, seconds_frame, pair_st_so, data_type=0, front_time=None, cache_rates=[], cache_delays=[])

Compute the number of samples by which the signal must be shifted (always positive, since the reference station is the closest to the source).

Parameters:
params_delays
delay model ini file.
vector_seconds_ref
list of floats with seconds for delay information (start time polynomials).
freq_sample
sampling frequency [Hz].
seconds_frame
seconds corresponding to the frame to be processed.
pair_st_so
pair with the station and source identifiers:
station_id: corresponds to id number in stations ini file.
source_id: [default 0], see limitations.
data_type
0 for real, 1 for complex.
front_time
frontier time, that is, time corresponding to the start of the integration period (takes priority over the seconds of the frame)
cache_rates
temporary information on delays to avoid reprocessing of the input files (see lib_ini_files.get_rates_delays()).
cache_delays
list with [seconds_fr_nearest,pair_st_so,delay] from previous computation.
Returns:
shift_int
number of sample components to offset (integer delay).
delay
total delay (freq_sample*delay = shift_int+fractional_sample_delay).
fractional_sample_delay
fractional sample delay.
error_out
0 if success, -1 if error (e.g. accumulation period not found in ini file).
cache_rates
updated cache_rates (input).

Limitations:

Currently assuming a single source (source_id always zero).


TO DO:

Simplify code, no need for params_delays nor find_nearest().
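The split of a delay into an integer and a fractional sample shift can be sketched as follows. Assumptions: the delay is given in seconds, the integer part is taken with floor (rounding is an equally valid convention), and the helper name is hypothetical. The conversion from samples to sample components (shift_int) is left out, since the description above does not spell it out.

```python
import math


def split_delay_sketch(delay, freq_sample):
    """Hypothetical sketch: split a delay [s] into an integer number of samples
    and a fractional remainder, so that freq_sample*delay = shift + fraction."""
    delay_samples = delay * freq_sample
    # Assumption: floor for the integer part (rounding is another common choice).
    shift_samples = math.floor(delay_samples)
    fractional_sample_delay = delay_samples - shift_samples
    return shift_samples, fractional_sample_delay
```

For example, a delay of 0.3125 s at 8 Hz corresponds to 2.5 samples, split into a shift of 2 samples and a fractional delay of 0.5.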
msvf.encode_samples(signal_chunk_fft_out, encode_b64, apply_compression)

Encode packed samples into base64.

Parameters:
signal_chunk_fft_out : 1D numpy array
bytes containing packed sample components.
encode_b64 : int
use base64 encoding, 1 by default.
apply_compression : int
0 by default.
Returns:
signal_chunk_fft_out : str
signal encoded into base64.
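A minimal sketch of the base64 step using Python's standard base64 module, assuming the packed samples arrive as a uint8 NumPy array and compression is disabled (the default). The helper name is hypothetical.

```python
import base64

import numpy as np


def encode_samples_sketch(packed, encode_b64=1):
    """Hypothetical sketch: base64-encode packed sample bytes (no compression)."""
    raw = np.asarray(packed, dtype=np.uint8).tobytes()
    if not encode_b64:
        return raw  # leave the bytes untouched when encoding is disabled
    # b64encode returns bytes; decode to str for a text-based map output line.
    return base64.b64encode(raw).decode("ascii")
```

Base64 grows the payload by about one third, which is the price paid for carrying binary samples inside a text line.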
msvf.get_absolute_delay(params_delays, vector_seconds_ref, seconds_frame, pair_st_so, front_time=None, cache_rates=[])

Get all the delay information structures associated with the processed station, source and integration period.

Parameters:
params_delays
delay model ini file.
vector_seconds_ref : list of float
seconds for delay information (start time polynomials).
seconds_frame
seconds corresponding to the frame to be processed.
pair_st_so
pair with the station and source identifiers:
station_id: corresponds to id number in stations ini file.
source_id: [default 0], see limitations.
front_time
frontier time, that is, time corresponding to the start of the integration period (takes priority over the seconds of the frame)
cache_rates
temporary information on delays to avoid reprocessing of the input files (see lib_ini_files.get_rates_delays()).
Returns:
abs_delay
initial absolute delay.
rate_delay
delay polynomials.
ref_delay
delay for the “reference” station.
error_out
-1 if station, source and accumulation period not found, 0 if success.
cache_rates
updated cache_rates (input).

Configuration:

VERBOSE_INI_DELAYS: from lib_ini_delays.py.


TO DO:

Merge code with compute_shift_delay_samples to avoid repetition.
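The delay polynomials returned here would typically be evaluated at a time offset from the polynomial start. A minimal sketch, assuming rate_delay holds coefficients in increasing order of power and that the polynomial variable is the time elapsed since front_time in seconds; both conventions are assumptions, and delay_at_sketch is a hypothetical name. It uses numpy.polynomial.polynomial.polyval.

```python
from numpy.polynomial import polynomial as P


def delay_at_sketch(rate_delay, front_time, t):
    """Hypothetical sketch: evaluate delay polynomial coefficients at time t [s]."""
    # Assumption: coefficients in increasing order of power (c0 + c1*x + c2*x**2 ...),
    # with x the time elapsed since the polynomial start front_time.
    return float(P.polyval(t - front_time, rate_delay))
```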
msvf.get_alloc_tasks_linear_scaling(num_pairs)

Get the allocation of stations into tasks. It computes the matrix that defines which pairs are associated with each task. Only used in station-based splitting (linear scaling with the number of stations).

Parameters:
num_pairs : int
number of pairs.
Returns:
a : binary 2D square array
task allocation.

TODO:

Detail the algorithm.
msvf.get_codebook_info(codecs_serial, params_media, current_file_name, station_name, chunk_size, apply_compression)

Get information for compression library (e.g. vector quantization) if used.

Parameters:
codecs_serial : str
serialized codecs for compression.
params_media : str
serialized media.ini.
current_file_name : str
name of the file currently being processed.
station_name : str
name of the station.
chunk_size : int
number of samples per chunk.
apply_compression : bool
whether to apply compression or not
Returns:
encoding
[HARDCODED TO NO COMPRESSION]
codebook
codebook extracted from the codecs (vector quantization).
codebook_name
codebook name from .ini.
apply_compression
updated version of input based on configuration in .ini.

TO DO:

Hardcoded output.
msvf.get_current_filename()

Get the name of the file currently being processed from the environment variable. Note that this variable is only set by Hadoop, so it has to be created when running in pipeline mode (which is already done in lib_mapredcorr.py).
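A sketch of reading the current input file name under Hadoop streaming, which exports job configuration properties as environment variables with dots replaced by underscores (mapreduce.map.input.file becomes mapreduce_map_input_file; older releases use map_input_file). Which variable msvf actually reads is not stated above, so the variable list and the helper name are assumptions.

```python
import os


def get_current_filename_sketch(default=""):
    """Hypothetical sketch: return the input file name exported by Hadoop streaming."""
    # Assumption: try the newer property name first, then the legacy one.
    for var in ("mapreduce_map_input_file", "map_input_file"):
        value = os.environ.get(var)
        if value:
            return value
    # Outside Hadoop (e.g. pipeline mode) the caller must set the variable itself.
    return default
```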
msvf.get_num_samples_per_frame(allsamples, num_channels, data_type)

Get the number of samples per frame, based on the number of channels and the data type (complex or real).

Parameters:
allsamples : int
total number of sample components in the frame.
num_channels : int
number of channels in the frame.
data_type : int
0 for real, 1 for complex.
Returns:
tot_samples_per_channel_and_frame_full : int
number of samples per channel per frame (full samples, i.e. R or R+jI)
totsamples_per_channel_and_frame_single : int
number of sample components (i.e. R, I).
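The relationship between the two counts can be sketched directly from the definitions above: components are split evenly across channels, and a complex sample (R+jI) spans two components. The helper name is hypothetical, and integer division assumes allsamples divides evenly.

```python
def get_num_samples_per_frame_sketch(allsamples, num_channels, data_type):
    """Hypothetical sketch returning (full samples, sample components) per channel."""
    # Sample components (R, or I and Q) belonging to each channel.
    samples_single = allsamples // num_channels
    # A complex sample (R+jI) uses two components; a real sample uses one.
    samples_full = samples_single // 2 if data_type == 1 else samples_single
    return samples_full, samples_single
```

For example, a complex single-channel frame with 8000 components gives 4000 full samples and 8000 components per channel.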
msvf.get_pair_all_baselines_per_task()

Get the pair for all-baselines-per-task mode.

msvf.get_pair_linear_scaling(s0, t0)

Get the pair for linear scaling with the number of stations.

Parameters:
s0 : int
station id
t0 : int
polarization id
msvf.get_pair_str(char_p, pair, accu_block, mod_channel, num_channels, seconds_fr, first_sample_signal, station_id, mod_polarization_id, freq_sample, bits_per_sample, data_type_char, encoding, n_bins_pcal_val, pcal_freq, one_baseline_per_task, task_scaling_stations, id_pair=0, tot_pairs=0, tot_accu_blocks=1, num_samples=0, abs_delay=0.0, rate_delay=[], freq_channel=0.0, fractional_sample_delay=0.0, accumulation_time=0.0, shift_int=0, sideband='L')

Build the output string with the key and the first part of the value (metadata) for the map output.

Parameters:
char_p : char {‘x’,‘r’,‘y’}
identifies the mode of operation:
‘x’ for all-baselines-per-task,
‘r’ for linear scaling with the number of stations,
‘y’ for one-baseline-per-task.
pair : str
A.A-A.A for all-baselines-per-task, station0.polarization0-station1.polarization1 for one-baseline-per-task.
accu_block : int
accumulation period id.
mod_channel : int
channel id.
num_channels : int
number of channels.
seconds_fr : int
[repeated] currently same as accu_block.
first_sample_signal : int
sample number for the first sample in this chunk of samples.
station_id : int
station identifier corresponding to this chunk of samples.
mod_polarization_id : int
polarization identifier corresponding to this chunk of samples.
freq_sample : int
sampling frequency [Hz].
bits_per_sample : int
number of bits per sample component.
data_type_char : char {‘r’,‘c’}
‘r’ for real, ‘c’ for complex.
encoding : str
type of encoding compression used (C_INI_MEDIA_C_*).
n_bins_pcal_val : int
number of bins for the phase calibration window.
pcal_freq : int
phase calibration tone separation [Hz].
one_baseline_per_task : int
[default 0], 1 for one baseline per task (work in progress)
task_scaling_stations : int
[default 0], 1 for linear scaling with stations (work in progress)
id_pair
[only used if one_baseline_per_task==1]
tot_pairs
[currently unused, but should be used if one-baseline-per-task]
tot_accu_blocks : int
number of accumulation periods in the scan.
num_samples : int
number of samples in this chunk (required in reducer in case last bytes not filled with samples).
abs_delay : float
absolute delay for this chunk.
rate_delay : list
list with delay polynomials (see get_absolute_delay()).
freq_channel : float
edge frequency [Hz] corresponding to the samples in this chunk.
fractional_sample_delay : float
fractional sample delay for the first sample in this chunk.
accumulation_time : float
accumulation period duration [s].
shift_int : int
number of sample components used to offset these samples (integer delay).
sideband : char {‘L’,‘U’}
‘L’ for lower sideband, ‘U’ for upper sideband.
Returns:
pair_str : str
complete header for the current set of samples being processed.

Output formatting and conventions:

The fields in the key below (k1,k2,...) separated by FIELD_SEP are referenced in const_mapred.py for the sorting configuration.
The metadata must follow the same order as defined in const_mapred.py.


Notes:

The full output corresponds to the output string generated here and the samples packed in base64 (done outside).


TO DO:

Simplify interface.
Delete seconds_fr and take accu_block instead.
Define constants for real and complex chars, and for lower and upper sideband.
Define constant for initial “p” in line, initially for pair.
msvf.get_pointers_samples(tot_samples_one_frame, adjusted_shift_inside_frame, accu_block, rel_pos_frame, tot_samples_sup_frame, ref_offset=0)

Compute metadata relative to sample organization (sample ids, offsets, chunk sizes...).

Parameters:
tot_samples_one_frame
number of samples per channel per frame.
adjusted_shift_inside_frame
shift (number of samples) for the first sample inside the frame.
accu_block
accumulation block corresponding to these samples (last sample).
rel_pos_frame
number of frame relative to the accumulation period.
tot_samples_sup_frame
number of samples per frame [see notes below].
ref_offset
integer shift (number of samples) for the first sample of the stream
Returns:
tot_samples_v
number of samples in this chunk.
seconds_v
seconds corresponding to the start of the accumulation period (reference is zero).
offset_first_sample_iterator_v
offset used to fetch samples from the array with the samples.
offset_first_sample_signal_v
offset used to compute the position of this chunk in the complete stream.
chunk_size_v
same as tot_samples_v [remove].
acc_v
same as seconds_v [remove].

TO DO:

Superframe functionality needs debugging.
Check ref_offset.
Remove chunk_size_v and acc_v.
msvf.get_seconds_fr_front(front_time, vector_seconds_ref, seconds_frame)

Find the frontier seconds from vector_seconds_ref, only if they are not already available in front_time.

Parameters:
front_time
frontier seconds (i.e. timestamp for start time polynomials).
vector_seconds_ref
list of floats with seconds for delay information (start time polynomials).
seconds_frame
seconds corresponding to this frame.
Returns:
seconds_fr_nearest
frontier seconds.
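A sketch of the frontier lookup, assuming vector_seconds_ref is sorted in ascending order and that the applicable polynomial is the latest one starting at or before seconds_frame. The name seconds_fr_nearest hints that the original may select the nearest entry instead (for example via find_nearest()), so treat the bisect-based choice here as an assumption; the helper name is hypothetical.

```python
import bisect


def get_seconds_fr_front_sketch(front_time, vector_seconds_ref, seconds_frame):
    """Hypothetical sketch of the frontier lookup."""
    if front_time is not None:
        return front_time  # already available: nothing to search
    # Assumption: vector_seconds_ref is sorted ascending, and the applicable
    # polynomial is the latest one starting at or before seconds_frame.
    idx = bisect.bisect_right(vector_seconds_ref, seconds_frame) - 1
    if idx < 0:
        raise ValueError("seconds_frame precedes the first delay polynomial")
    return vector_seconds_ref[idx]
```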
msvf.main()
msvf.pack_and_encode_samples(signal_chunk_fft, use_bitarrays, encode_b64, apply_compression, bits_per_sample)

Encode a signal chunk for the mapper output.

Parameters:
signal_chunk_fft : 1D numpy array of int
sample components (integer values).
use_bitarrays : int
0 by default.
encode_b64 : int
use base64 encoding, 1 by default.
apply_compression : int
0 by default.
bits_per_sample : int
number of bits per sample component.
Returns:
signal_chunk_fft_out : str
signal encoded into base64.

TO DO:

Place together with decode_samples_b64() for better organization.
msvf.pack_samples(signal_chunk_fft, bits_per_sample)

Pack the sample components into bytes to avoid data storage overhead.

Parameters:
signal_chunk_fft : 1D numpy array
sample components (integer values).
bits_per_sample : int
number of bits per sample component.
Returns:
signal_chunk_fft_out : 1D numpy array
bytes containing packed sample components.
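A minimal bit-packing sketch, assuming the sample components are non-negative integers below 2**bits_per_sample and that bits are written most-significant-first. The bit order used by msvf is not stated here, so this is an illustration only; the helper name is hypothetical. It relies on numpy.packbits, which zero-pads the final byte.

```python
import numpy as np


def pack_samples_sketch(signal_chunk_fft, bits_per_sample):
    """Hypothetical sketch: pack integer sample components into bytes, MSB first."""
    values = np.asarray(signal_chunk_fft, dtype=np.int64)
    if values.size and (values.min() < 0 or values.max() >= (1 << bits_per_sample)):
        raise ValueError("sample component does not fit in bits_per_sample bits")
    # Bit positions from most to least significant, e.g. [1, 0] for 2 bits.
    shifts = np.arange(bits_per_sample - 1, -1, -1)
    # One row of bits per component, flattened into a single bit stream.
    bits = ((values[:, None] >> shifts) & 1).astype(np.uint8).ravel()
    # packbits groups 8 bits per byte and zero-pads the last byte.
    return np.packbits(bits)
```

With 2 bits per component, the components [3, 0, 1, 2] form the bit stream 11 00 01 10 and pack into the single byte 198, a 4x saving over one byte per component.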
msvf.read_frame(reader, show_errors, forced_frame_length=0, forced_format='VDIF', forced_version='custom')

Return the header and samples of the frame, based on the information from the media.ini file. If this information is not available, the frame is assumed to be VDIF.

Parameters:
reader : file handle
sys.stdin.
show_errors : int
[0 by default] 1 for verbose mode.
forced_frame_length : int
[0 by default] >0 to force the number of bytes per frame (if ==0 frame length is read in header)
forced_format
[leave default value] use only for new implementations of readers.
forced_version
[leave default value] use only for new implementations of readers.
Returns:
header : list
frame header following the format:
[seconds_frame,invalid_marker,legacy_marker,reference_epoch,frame_number,
vdif_version,log_2_channels,frame_length,data_type,bits_per_sample,thread_id,station_id] where:
seconds_frame: integer value with seconds for this frame.
invalid_marker: VDIF invalid bit.
legacy_marker: VDIF legacy bit.
reference_epoch: VDIF frame epoch (float with MJD (TBC)).
frame_number: VDIF frame number (integer).
vdif_version: VDIF frame version (integer).
log_2_channels: VDIF logarithm in base 2 of the number of channels in the frame.
frame_length: VDIF frame length (integer with number of bytes).
data_type: VDIF frame data type bit.
bits_per_sample: number of bits per sample.
thread_id: VDIF thread identifier field (integer).
station_id: VDIF station id field (integer).
allsamples : 1d numpy array of int
sample components (see below for details).
check_size_samples : int
1 if as many samples as expected (from the frame length and the rest of the metadata) were read, 0 otherwise.

Adding new libraries:

- Each library will be identified by “format” and “version”, to be specified in the media.ini file.
- Add the new format and version to const_ini_media.py.
- Add the check for the new format in the if structure below (if forced_format==...).
- See the definition of the header to be returned above.
- The samples should be in a 1D numpy array of integers, e.g. for a VDIF complex frame, [I0, Q0, I1, Q1, ...].
- The implementation is currently tied to the VDIF format, so samples corresponding to different channels will be interleaved.
- If there are multiple bands or polarizations per file, see the VDIF specification (multiple threads or multiple channels).
- In the simple case of a single band and a single polarization, log_2_channels=0 and thread_id=0 (configure media.ini accordingly).
- See lib_vdif.py for more info.


TO DO:

Consider providing a general interface to allow easy development of new libraries.
use_ini_info always used, remove option.
forced_frame_length is not taken into account, remove it (length is read in the VDIF frame).
Remove option for bitarray_structures, no longer used.
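The header list returned by read_frame() mirrors the VDIF header fields, which live in the first four 32-bit little-endian words of the frame and can be decoded with struct. This sketch follows the VDIF 1.0 bit layout; the helper name is hypothetical, and reference_epoch is returned as the raw 6-bit count of six-month periods since 2000 rather than the MJD value mentioned in the header description.

```python
import struct


def parse_vdif_header_sketch(raw):
    """Hypothetical sketch: decode a VDIF header into the read_frame() header list."""
    if len(raw) < 16:
        raise ValueError("VDIF header needs at least 16 bytes")
    w0, w1, w2, w3 = struct.unpack("<4I", raw[:16])
    seconds_frame = w0 & 0x3FFFFFFF          # seconds since reference epoch
    legacy_marker = (w0 >> 30) & 0x1         # 1 = 16-byte legacy header
    invalid_marker = (w0 >> 31) & 0x1        # 1 = invalid data
    frame_number = w1 & 0xFFFFFF             # frame number within the second
    reference_epoch = (w1 >> 24) & 0x3F      # six-month periods since 2000
    frame_length = (w2 & 0xFFFFFF) * 8       # field counts 8-byte units, header included
    log_2_channels = (w2 >> 24) & 0x1F
    vdif_version = (w2 >> 29) & 0x7
    station_id = w3 & 0xFFFF
    thread_id = (w3 >> 16) & 0x3FF
    bits_per_sample = ((w3 >> 26) & 0x1F) + 1  # stored as bits-per-sample minus one
    data_type = (w3 >> 31) & 0x1             # 0 = real, 1 = complex
    return [seconds_frame, invalid_marker, legacy_marker, reference_epoch,
            frame_number, vdif_version, log_2_channels, frame_length,
            data_type, bits_per_sample, thread_id, station_id]
```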