rsvf module

Reducer: performs VLBI correlation on a text input containing the lines for all stations,
sorted as defined in const_mapred.py according to the key and format defined in msvf.get_pair_str().
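Because the input is sorted by key, all lines sharing a key arrive consecutively and can be processed as one group. A minimal sketch of that pattern follows. It assumes Hadoop-streaming-style lines where the key is the text before the first tab; the real key layout is set by msvf.get_pair_str() and const_mapred.py. group_sorted_lines and _key_of are hypothetical helpers, not part of rsvf.

```python
from itertools import groupby


def _key_of(line):
    """Hypothetical key extraction: the text before the first tab (assumption)."""
    return line.split("\t", 1)[0]


def group_sorted_lines(lines):
    """Yield (key, [values...]) for each run of consecutive lines sharing a key.

    This relies on the input being sorted by key, as the reducer expects.
    """
    stripped = (ln.rstrip("\n") for ln in lines)
    for key, group in groupby(stripped, key=_key_of):
        # Keep only the value part (after the first tab) of every line in the run.
        values = [ln.split("\t", 1)[1] if "\t" in ln else "" for ln in group]
        yield key, values
```

itertools.groupby only merges adjacent items, which is why sorted input matters: unsorted input would split one key into several groups.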

Parameters

See lib_mapredcorr.get_reducer_params_str()

Returns

See rsvf.get_lines_out_for_all() for visibilities. See rsvf.get_lines_stats() for statistics.

Notes


Reader:

Expects lines in the format described in rsvf.split_input_line().
rsvf.decode_samples_b64(vector_split_samples, vector_split_encoding)

Decode base64-encoded samples.

vector_split_samples
string with the samples (that is a component of the list vector_split).
vector_split_encoding
compression (VQ) encoding, disabled by default.
out: 1D numpy array
samples (components if complex), packed in binary format (uint8). Samples still need to be “dequantized”.
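The decoding step can be sketched with the standard library and NumPy. This assumes no VQ encoding (the default) and that the decoded bytes are exactly the packed uint8 samples described above. decode_samples_b64_sketch is a hypothetical name, and unpacking by bits_per_sample and dequantization are deliberately left out.

```python
import base64

import numpy as np


def decode_samples_b64_sketch(vector_split_samples):
    """Decode a base64 string into a 1D uint8 array of still-packed samples.

    Assumption: no VQ encoding; dequantization happens in a later step.
    """
    raw = base64.b64decode(vector_split_samples)
    return np.frombuffer(raw, dtype=np.uint8)
```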
rsvf.extract_params_split(vector_split)

Get parameters from the line header (header of the data, not part of the key).

vector_split : str
second part of the header with all the parameters provided by the mapper,
associated to the samples to be processed.
bits_per_sample
number of bits for each component of the sample.
block_first_sample
accumulation period. TO DO: check this.
data_type
sample type, ‘r’ for real, ‘c’ for complex.
encoding
[unused] originally introduced for compressing the data (VQ).
encoding_width
[unused] also associated to compression.
n_bins_pcal
number of samples in each window accumulated for the pcal signal.
num_samples
number of samples in this line.
abs_delay
absolute delay.
rate_delay
delay information corresponding to these samples (polynomials, etc.).
fs
sampling frequency.
fs_pcal
[unused] phase calibration signal frequency spacing.
freq_channel
sky frequency.
first_sample
first sample number (integer starting at 0).
fractional_sample_delay
fractional sample delay corresponding to the sample 0 of this stream.
accumulation_time
time duration of the integration period.
shift_delay
integer number of samples offset for the sample 0 of this stream.
sideband
single-sideband side: ‘l’ for LSB, ‘u’ for USB.
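shift_delay and fractional_sample_delay together describe the delay of sample 0 in units of samples. A common way to obtain such a pair is sketched below. It assumes a delay in seconds is converted to samples with fs (in Hz) and split with a floor into integer and fractional parts. split_delay is a hypothetical helper, and the mapper's actual rounding convention may differ.

```python
import math


def split_delay(delay_s, fs_hz):
    """Split a delay in seconds into (integer_samples, fractional_samples).

    Assumption: floor-based split, so 0 <= fractional_samples < 1 even for
    negative delays.
    """
    delay_samples = delay_s * fs_hz
    shift = math.floor(delay_samples)  # integer part (shift_delay analogue)
    frac = delay_samples - shift       # remainder (fractional_sample_delay analogue)
    return shift, frac
```

With a floor split, a delay of -1.5 samples becomes a shift of -2 plus a fractional part of 0.5, keeping the fractional part in [0, 1).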

Conventions:

See const_mapred.py for constants positions and descriptions.
rsvf.get_key_all_out(char_type, F_ind_s0, F_ind_s1, acc_str)

Get key for reducer output.

char_type : char
operation mode (see split_input_line())
F_ind_s0
first station-polarization for this baseline.
F_ind_s1
second station-polarization for this baseline.
acc_str : str
multi-key for output line.
output : str
key for output line.
rsvf.get_lines_out_for_all(char_type, n_sp, F_ind, current_acc_str, count_acc, acc_mat, current_block_first_sample, current_vector_split, acc_pcal, count_acc_pcal, scaling_pair='A.A')

Get output lines for all results in accumulation matrix.

char_type
operation mode (see split_input_line()).
n_sp
number of station-polarizations.
F_ind
structure with ids for station-polarizations.
current_acc_str
multi-key
count_acc
number of accumulations for the visibilities.
acc_mat : complex 3D array
visibilities for all baselines for this acc period and band. See lib_fx_stack.compute_x_all() for more info.
current_block_first_sample
<first_sample>.<channel_index>.
current_vector_split
metadata as in the input line.
acc_pcal : complex 2D array
phase calibration results for all stations for this acc period and band. See lib_pcal.accumulate_pcal_all() for more info.
count_acc_pcal
number of accumulations for the phase calibration results.
scaling_pair
station-polarization for this task, used in linear-scaling mode (“A.A” by default, i.e. all-baselines-per-task).
lines_out
list of lines with output results (visibilities and phase calibration).
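Producing one line per baseline from the accumulation matrix can be sketched as follows. The sketch assumes, hypothetically, that acc_mat has shape (n_sp, n_sp, n_channels) with pair (i, j) stored at acc_mat[i, j] for i <= j, that visibilities are averaged by dividing by count_acc, and that lines use a made-up "pair, count, values" layout. The real output format is not reproduced here.

```python
import numpy as np


def visibility_lines_sketch(acc_mat, count_acc, labels):
    """Return one hypothetical text line per baseline, autocorrelations included.

    Assumptions: acc_mat has shape (n_sp, n_sp, n_channels) and only the
    upper triangle (i <= j) holds meaningful accumulated products.
    """
    n_sp = acc_mat.shape[0]
    lines = []
    for i in range(n_sp):
        for j in range(i, n_sp):
            # Average the accumulated cross-products over the accumulations.
            vis = acc_mat[i, j] / count_acc
            values = " ".join(f"{v.real:.6g} {v.imag:.6g}" for v in vis)
            lines.append(f"{labels[i]}-{labels[j]}\t{count_acc}\t{values}")
    return lines
```

For n_sp station-polarizations this yields n_sp * (n_sp + 1) / 2 lines, one per baseline including autocorrelations.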
rsvf.get_lines_stats(current_key_pair_accu, F_stack_shift, F_adj_shift_partial, F_lti, F_ind, failed_acc_count, current_block_first_sample, dismissed_acc_count)

Get list of lines with stats for this accumulation period, including:
- Number of dropped/added samples (for fractional sample overflows) (stack).
- Number of fractional sample overflows (shift).
- For each station-polarization: last sample, total samples, missing/invalid samples (lti).
- Number of failed accumulations (will be one if some data is uncorrelated, which may simply be due to misalignment from delays).
current_key_pair_accu
part of the key with pair (station-pol A and station-pol B) and accumulation period.
F_stack_shift
[unused?] see lib_fx_stack().
F_adj_shift_partial
[unused?] see lib_fx_stack().
F_lti
list with the last sample (l), total number of samples processed (t), invalid samples (i), and
adjusted samples for each stream.
F_ind
list with station-polarizations.
failed_acc_count
number of failed accumulations.
current_block_first_sample
<first_sample>.<channel_index>
dismissed_acc_count
number of dismissed accumulations.
lines_stats : list of str
lines with stats.

TO DO:

Remove unused.
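A per-stream stats line built from F_lti can be sketched as below. The element order [last, total, invalid, adjusted] follows the description of F_lti above but is an assumption, as is the output format. lti_stats_lines is a hypothetical helper that also reports the fraction of valid samples.

```python
def lti_stats_lines(F_ind, F_lti):
    """Format one hypothetical stats line per station-polarization.

    Assumption: each element of F_lti is [last_sample, total, invalid, adjusted],
    listed in the same order as F_ind.
    """
    lines = []
    for sp, (last, total, invalid, adjusted) in zip(F_ind, F_lti):
        # Guard against streams that processed no samples at all.
        valid_frac = (total - invalid) / total if total else 0.0
        lines.append(
            f"{sp}\tlast={last}\ttotal={total}\tinvalid={invalid}"
            f"\tadjusted={adjusted}\tvalid_frac={valid_frac:.3f}"
        )
    return lines
```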
rsvf.get_shapes_F1(F1)

Get string showing shapes of F1.

F1 : list of multidimensional np.arrays
each element has the samples for one station-polarization.
out : str
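A minimal sketch of such a shape summary, useful for debugging the stored sample arrays. get_shapes_sketch is a hypothetical name, and the bracketed, comma-separated format is an assumption rather than the exact output of get_shapes_F1().

```python
import numpy as np


def get_shapes_sketch(F1):
    """Return a string listing the shape of each array in F1 (hypothetical format)."""
    return "[" + ", ".join(str(np.shape(a)) for a in F1) + "]"
```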
rsvf.get_str_pcal_out(acc_pcal, current_n_bins_pcal, count_acc_pcal, current_key_pair_accu, current_vector_split, current_block_first_sample)

[Only used in one-baseline-per-task mode] Get output string for phase calibration.

acc_pcal : complex 1D np.array
phase calibration results for one baseline, one band and one accumulation period.
current_n_bins_pcal
number of bins (number of elements in acc_pcal).
count_acc_pcal
number of accumulations performed to get pcal results.
current_key_pair_accu
part of the key with the baseline and the accumulation multi-key.
current_vector_split
metadata as in the input line.
current_block_first_sample
<first_sample>.<channel_index>.
str_print : str
output line with phase calibration results.
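One common way to obtain accumulated phase calibration results like acc_pcal is to fold the samples into windows of n_bins_pcal samples, sum the windows, and take an FFT, so each tone lands in a single bin. This is a sketch of that generic technique, not necessarily what lib_pcal.accumulate_pcal_all() does. It assumes n_bins_pcal = fs / fs_pcal and that the tones sit at integer multiples of fs_pcal in the sampled band. Dividing by count_acc_pcal then gives the per-window average.

```python
import numpy as np


def accumulate_pcal_sketch(samples, n_bins_pcal):
    """Fold samples into windows of n_bins_pcal, sum them, and FFT the sum.

    Returns (acc_pcal, count_acc_pcal). Trailing samples that do not fill a
    whole window are dropped. By linearity, the FFT of the summed windows
    equals the sum of the per-window FFTs.
    """
    count = len(samples) // n_bins_pcal
    folded = np.asarray(samples)[: count * n_bins_pcal].reshape(count, n_bins_pcal)
    acc_pcal = np.fft.fft(folded.sum(axis=0))
    return acc_pcal, count
```

Usage: acc_pcal, count_acc_pcal = accumulate_pcal_sketch(x, n_bins_pcal), then acc_pcal / count_acc_pcal gives the averaged tone amplitudes and phases per bin.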
rsvf.get_str_pcal_out_all(sp, acc_pcal, current_n_bins_pcal, count_acc_pcal, current_key_pair_accu, current_vector_split, current_block_first_sample)

Get output string for phase calibration (all-baselines-per-task).

sp
station-polarization
acc_pcal : complex 1D np.array
phase calibration results for one baseline, one band and one accumulation period.
current_n_bins_pcal
number of bins (number of elements in acc_pcal).
count_acc_pcal
number of accumulations performed to get pcal results.
current_key_pair_accu
part of the key with the baseline and the accumulation multi-key.
current_vector_split
metadata as in the input line.
current_block_first_sample
<first_sample>.<channel_index>.
str_print : str
output line with phase calibration results.
rsvf.get_str_r_out(current_key_pair_accu, count_acc, current_vector_split, current_block_first_sample, accu_prod_div)

Get output string for reducer.

current_key_pair_accu
part of the key with the baseline and the accumulation multi-key.
count_acc
number of accumulations.
current_vector_split
list with metadata.
current_block_first_sample
<first_sample>.<channel_index>
accu_prod_div : complex 1D np.array
visibilities for one baseline, one band and one accumulation period.
str_print : str
output line with visibilities.
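Writing complex visibilities into a text line needs a reversible real/imaginary encoding. A minimal sketch, assuming a space-separated "real imag" layout; vis_to_str and str_to_vis are hypothetical helpers, and the actual field layout of get_str_r_out() is not reproduced. repr(float(...)) is used because it round-trips Python floats exactly.

```python
import numpy as np


def vis_to_str(accu_prod_div):
    """Serialize a complex 1D array as space-separated 'real imag' pairs."""
    arr = np.asarray(accu_prod_div, dtype=complex)
    return " ".join(f"{float(v.real)!r} {float(v.imag)!r}" for v in arr)


def str_to_vis(text):
    """Parse the 'real imag' pairs produced by vis_to_str back into a complex array."""
    vals = np.array([float(tok) for tok in text.split()])
    return vals[0::2] + 1j * vals[1::2]
```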
rsvf.main()
rsvf.restore_Fs(last_F_delays, last_F_rates, last_F_frac, last_F_fs, last_F_fs_pcal, last_F_side, last_F_first_sample, F_delays, F_rates, F_frac, F_fs, F_fs_pcal, F_side, F_first_sample)

Keep previous structures in case there is no data for all station-polarizations.

rsvf.split_input_line(line)

Get sub-keys from read line.

line : str
whole line read from input.
key_pair_accu : str
part of the key with pair (station-pol A and station-pol B) and accumulation period.
key_sample : str
part of the key with sample number.
key_station
station identifier
vector_split
second part of the header with information necessary for the processing of the samples.
is_autocorr
used in one-baseline-per-task mode, indicates that this pair is an autocorrelation and therefore
these samples will be correlated with themselves.
key_station_pol
station-polarization identifier.
char_type
identifies the mode of operation, as defined in the mapper:
‘x’ for all-baselines-per-task,
‘r’ for linear scaling with the number of stations,
‘y’ for one-baseline-per-task.
accu_block
integration period number.

TO DO:

Move char_type (char_p in mapper) to constants section.
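A sketch of what such a constants entry might look like, using the three mode codes listed above. The constant names and describe_char_type are hypothetical; only the codes and their meanings come from this documentation.

```python
# Hypothetical constants for the operation-mode codes (char_type / char_p).
CHAR_TYPE_ALL_BASELINES = "x"
CHAR_TYPE_LINEAR_SCALING = "r"
CHAR_TYPE_ONE_BASELINE = "y"

CHAR_TYPE_MODES = {
    CHAR_TYPE_ALL_BASELINES: "all-baselines-per-task",
    CHAR_TYPE_LINEAR_SCALING: "linear scaling with the number of stations",
    CHAR_TYPE_ONE_BASELINE: "one-baseline-per-task",
}


def describe_char_type(char_type):
    """Return the mode description, raising ValueError for unknown codes."""
    try:
        return CHAR_TYPE_MODES[char_type]
    except KeyError:
        raise ValueError(f"unknown char_type: {char_type!r}") from None
```

Keeping the codes in one shared table lets the mapper and reducer validate the mode the same way instead of repeating string literals.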
rsvf.str_list(F_list, sep_c=', ')

Get string with representation of list.
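A minimal sketch of this behaviour, assuming each element is converted with str() and joined with sep_c; str_list_sketch is a hypothetical name, not the module's implementation.

```python
def str_list_sketch(F_list, sep_c=", "):
    """Join the str() of each element of F_list with sep_c."""
    return sep_c.join(str(x) for x in F_list)
```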

rsvf.update_stored_samples(v_dequant, F1, F_ind, key_station_pol, F_delays, F_rates, F_fs, F_fs_pcal, abs_delay, rate_delay, fs, fs_pcal, F_first_sample, first_sample, data_type, F_frac, fractional_sample_delay, shift_delay, F_side, sideband, fft_size_in)

Store samples and metadata, to be processed later.

For data structures, see the output below. For metadata parameters, see extract_params_split().

v_dequant : numpy 1D array of complex
dequantized samples.
F_* : lists where each element corresponds to one read line. All these lists are related, i.e. the n-th element
of all lists corresponds to the same read line.

TO DO:

Add checks.
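The parallel-list layout of the F_* structures, together with the kind of check the TO DO asks for, can be sketched as below. The dict `store` standing in for F1, F_ind, F_delays, F_fs and F_first_sample is hypothetical, as is store_line_sketch; the real function takes the lists as separate arguments and stores more metadata.

```python
import numpy as np


def store_line_sketch(store, v_dequant, key_station_pol, abs_delay, fs, first_sample):
    """Append one read line's samples and metadata to parallel lists.

    `store` is a hypothetical dict of lists standing in for some F_* lists;
    the n-th element of every list refers to the n-th stored line.
    """
    store["F1"].append(np.asarray(v_dequant, dtype=complex))
    store["F_ind"].append(key_station_pol)
    store["F_delays"].append(abs_delay)
    store["F_fs"].append(fs)
    store["F_first_sample"].append(first_sample)
    # Consistency check: all parallel lists must stay the same length.
    lengths = {len(v) for v in store.values()}
    if len(lengths) != 1:
        raise ValueError("stored lists are out of sync")
```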