lib_fx_stack module

CorrelX FX correlation and samples-stack routines.

lib_fx_stack.compute_f_all(F1, fft_size, windowing, dtype_complex, F_frac=[], F_fs=[], F_refs=[], freq_channel=0, F_first_sample=[], F_rates=[], F_pcal_fix=[], F_side=[], F_ind=[], F_lti=[])[source]

Compute FFTs for all stations (all-baselines-per-task mode), and correct for fractional sample correction (linear phase).

F1
list of stored samples (corresponding actually to F1_partial). Each element of the list is a numpy array
with the complex samples in the time domain, with a number of samples that is a multiply of the FFT length.
fft_size : int
number of coefficients in the FFT.
windowing : str
shape of the window before FFT, currently ‘square’ by default.

dtype_complex: type of data for initialization of the rotators. F_frac

fractional and integer offsets applied at the mapper (acces via F_refs).
F_fs
sampling frequency for each stream in F1.
F_refs
indices to acces F_frac etc based on F_ind, i.e. from stored to new.
freq_channel
sky frequency.
F_first_sample
first sample number (actually last sample number plus one, it has to be corrected by subtracting the number of samples in F1.
F_rates
delay information for each of the streams (access via F_refs).
F_pcal_fix
offset for pcal accumulation results (due to the initial offset applied in the mapper). Later the pcal
signals will be realigned as if no delay was applied to them.
F_side
list of single side band side for each stream, ‘l’ LSB or ‘u’ USB (access via F_refs).
F_ind
list of station-polarization identifiers corresponding to the streams in F1 (this actually corresponds
to F1_ind_partial.
F1_fft
list of array of arrays with FFTs with rotations applied.
None
[unused] previously outputing the conjugate of F1_fft, removed for efficiency.
F_adj_shift_partial_out
[unused] previously used to keep track of the number of samples to
add/drop due to fractional sample overflows, superseded for F_frac_over.
F_adj_shift_pcal_out
[unused] previously used to keep track of the number of samples to
roll the phase calibration results prior to FFT them, superseded for F_pcal_fix_out.
F_pcal_fix_out
list with number of samples to roll the pcal streams prior to FFT them.
F_first_sample_out
first sample for each stream (actually last sample number plus one).

Procedure:

For each element in F1:
1. Create an array of arrays with the FFTs of the samples grouped into arrays of fft_size samples.
2. Create a frequency scale of fft_size (linear from 0 to (n-1)/n).
3a. If the computations have already been done for the same station, take the results.
3b. Otherwise:
Compute delay for the first sample, then fractional part of this delay, then scale frequency scale, then exponential.
Rotate the FFT using the previous rotator.


References:

[Th04] p363


TO DO:

Detail where in the FFT the fractional sample for the rotator is evaluated.

Check correction to phase in p363.
lib_fx_stack.compute_fx_for_all(F1_partial, F_ind_partial, F1, fft_size, windowing, acc_mat, count_acc, normalize_after_compute=False, F_ind=None, last_F_ind=None, n_sp=0, failed_acc_count=0, dismissed_acc_count=0, scaling_pair='A.A', dtype_complex=<class 'complex'>, acc_pcal=None, pre_pcal=None, n_bins_pcal=0, count_acc_pcal=0, phase_calibration=None, bypass_fx=0, F_delays=[], F_rates=[], F_fs=[], freq_channel=0.0, F_first_sample=[], F_first_sample_partial=[], F_frac=[], block_time=0.0, F_adj_shift_partial=[], F_stack_shift=[], F_adj_shift_pcal=[], F_stack_shift_pcal=[], F_pcal_fix=[], F_side=[], F_lti=[])[source]
Fringe rotation, FFTs, fractional sample correction for all station-polarizations, and
cross multiplication and accumulation for all baseline (all-baselines-per-task-mode)
F1_partial
list of previously stored samples.
F_ind_partial
station-polarization identifiers for the elements of F1_partial.
F1
new samples.
fft_size
number of coefficients fo the FFT.
windowing
shape of window prior to FFT (square by default).
acc_mat
accumulation matrix with provisional results (see compute_x_all for acc_mat description).
count_acc
number of accumulations performed so far.
normalize_after_compute
whether or not to normalize the results (to be activated in the last call in the integration period).
F_ind
station-polarization identifiers for the elements of F1.
last_F_ind
saved value of F_ind in the previous iteration.
n_sp
number of station-polarizations.
failed_acc_count
counter with number of failed accumulations.
dismissed_acc_count
counter with number of dismissed accumulations.
scaling_pair
mode of operation: “A.A” for all-baselines-per-task (default).
dtype_complex
complex type used for initialization of numpy arrays.
acc_pcal
provisional results for accumulated phase calibration.
pre_pcal
stored samples to be used in accumulation of phase calibration.
n_bins_pcal
number of samples for the windows to be accumulated for the pcal signal.
count_acc
number of accumulations performed previously during this integration period.
phase_calibration
whether or not to extract phase calibration tones.
bypass_fx
if 1, it will not do computations for cross-multiplication and accumulation.
F_delays
[unused] absolute delays for each of the streams in F1.
F_rates
delay information for each of the streams in F1.
F_fs
sampling frequency for each of the streams in F1.
freq_channel
sky frequency.
F_first_sample
first sample for each of the streams in F1.
F_first_sample_partial
first sample for each of the streams in the stored samples.
F_frac
fractional and integer sample delay for each of the streams in F1.
block_time
time for the current accumulation period.
F_adj_shift_partial
[unused] previously used to keep track of the samples to add/drop due to fractional sample
correction overflow (now using F_frac_over).
F_stack_shift
[unused] previously used to keep track of the samples added/droped due to fractional sample
correction (now using F_frac_over).
F_adj_shift_pcal
[unused] previously used to keep track of the samples to roll in the pcal signal.
F_stack_shift_pcal
[unused] equivalent to F_stack_shift for pcal.
F_pcal_fix
number of samples to roll in the pcal accumuled signal.
F_side
single side band side corresponding to the streams in F1.
F_lti
list of last, total, invalid samples for each stream
All output variables are udpated versions of those use as input, see procedure below for details. | acc_mat | count_acc | count_sub_acc | n_sp | last_F_ind | failed_acc_count | dismissed_acc_count | F1_partial | F_ind_partial | acc_pcal | pre_pcal | count_acc_pcal | F_first_sample_partial | F_adj_shift_partial | F_stack_shift | F_adj_shift_pcal | F_stack_shift_pcal | F_pcal_fix_out | F_lti

Procedure:

All-stations-per-tasks:
1. Concatenate samples (list of np arrays) for phase calibration computations.
2. Check for overflow in fractional sample correction in new samples.
3. Correct overflow in fractional sample correction (adding/dropping the required samples) in new samples.
4. Concatenate new samples into stored samples.
5. If possible separate the stored samples into two parts: the first part with a number of samples that is a multiple
of the fft size, and the remained in the second part.
6. Correct fringe rotation, FFT, correct fractional sample delay and cross-multiply, results go to accumulation matrix.
7. Leave unprocessed samples in F1_partial (stored samples).


Conventions:

failed accumulation: an accumulation is considered failed if the elements for the new samples do not match those stored.
dismissed accumulation: an accumulation is considered dismissed if it is not computed becaused there are not enough samples.
variables ending in *_partial correspond to stored samples.
F_refs is used to point to new samples when iterating through stored samples. Note that F1, F_frac, so e.g.
F_ind[F_refs[i]]==F_ind_partial[i] for i in range(F_ind_partial).


Configuration for trade-off performance vs. memory requirements:

Use the variable COMPUTE_FOR_SUB_ACC_PERIOD in const_mapred.py to configure the number of times the new samples are simply
stored without further computation. E.g. if =100, computations will be by-passed 99 out 100 times before the end
of the integration period. This allows to take advantage of increased efficiency with long numpy arrays, and also
to avoid some repeated computations in fringe_rotation() and compute_f_all().


Limitations:

Curently number of bins for pcal has to be the same for all stations.


TO DO:

Add support for different numbers of bins?
Check if not(not_enough_data):, may be problematic with samples left out.
lib_fx_stack.compute_x_all(F1_fft, F2_fft, count_acc, acc_mat, index_scaling_pair=-1, dtype_complex=<class 'complex'>)[source]

Compute multiply-accumulate for all baselines (all-baselines-per-task-mode)

F1_fft : list of np arrays of arrays
FFTs (left term).
F2_fft : list of np arrays of arrays with
FFTs (right term)If None, then it will take the conjugate of F1_fft.
count_acc
number of accumulations performed previously during this integration period.
acc_mat
accumulation matrix with results accumulated during this integration period.
index_scaling_pair
-1 for all-baselines-per-task mode, positive integer for other modes.
dtype_complex
type to initialize accumulation matrix.
acc_mat
accumulation matrix with the new results accumulated (3D numpy array with one row (and one column)
per station-polarization, and fft_size layers/pages with the results of the accumulation
for each pair). Note that the first two dimensions of the matrix are upper triangular, with the auto-correlations in the main diagonal.
count_acc
total number of accumulation performed including those in this call.
count_sub_acc
number of accumulation performed only in this call.
n_sp
number of station-polarizations (that is also the number of rows/columns of the matrix.

TO DO:

Add counters for invalid data.
lib_fx_stack.cut_remainder_fft_size_multiple(F_partial, fft_size_multiple)[source]

Function to migrate functionality from np arrays to list of arrays (for delay correction...). It divides an array into two, one of them with rows of length fft_size_multiple, the other with the remainder.

F_partial : list of numpy arrays
samples in the time domain, with no rotation applied yet, and wherein each of its elements may have a different length.
fft_size_multiple : int
maximum integer multiple of fft_size that is lesser than the length of the shortest element of F_partial.
F_partial_out
F_partial (input) truncated to the ff_size_multiple first elements.
F_partial_rem
Remainder of removing F_partial_out from F_partial.
lib_fx_stack.fix_frac_over(F1, F_frac_over_ind, F_ind, F_first_sample)[source]

Add or drop samples for overflow in fractional sample correction based on info from get_frac_over_ind().

F1
list of vectors with new data.
F_frac_over
structure obtained in get_frac_over_ind() with locations of samples to be added/dropped.
F_ind
list of stpols corresponding to F1 (only for logging).
F_first_sample
list of first sample number to F1 (only for logging).
F1
modified list of vectors with new data.
lib_fx_stack.fringe_rotation(F1, F_first_sample, F_rates, freq_channel, F_fs, F_delays, F_refs, block_time, F_frac, F_adj_shift_partial, F_side, F_ind, F_lti)[source]

Fringe rotation correction (previously doppler_correction()).

F1 : list
list with stored samples to be processed, which number of samples is an integer multiple of the fft length.
F1_first_sample : list
list with first sample (integer) corresponding to the samples in F1.
F_rates : list
list with the delay information for the latest data received. Note that if follows a different ordering as F1 here,
and thus its elements has to be accessed through F_refs.
freq_channel : float
sky frequency [Hz].
F_fs : list
list with the sampling frequency corresponding to the samples in F1 (access through F_refs).
F_delays
[unused] absolute delays of the streams in F1 (access through F_refs).
F_refs : list
list of indices for accessing F_rates, F_fs, etc (those filled in update_stored_samples()).
block_time
time corresponding to this accumulation period (unused).
F_frac
fractional and integer delay corresponding to F1 (access through F_refs).
F_adj_shift_partial
[unused] previously, list with information on added/dropped samples based on fractional sample correction
overflows, currently unused.
F_side : list
list with sideband for each of the streams in F1 (‘l’ for LSB, ‘u’ for USB) (access through F_refs).
F_ind : list
list of station identifiers in the format used in the key (e.g. 0.0, 0.1)
F1
F1 (input) with applied rotations if required.
F_first_sample
list with updated first samples (added number of samples in each element of F1).

Configuration for trade-off performance vs. precision:

There are three main modes, configuration in const_performance.py:
FULL_TIMESCALE=0 -> Min. precision, Max. performance: only a phase rotation is applied, based on the delay computed for
the first sample.
FULL_TIMESCALE=1 (default) -> Max. precision, Min. performance: a frequency shift is applied with a complete rotator, that is,
delays are computed for all the samples.
FULL_TIMESCALE=2 -> Trade-off solution: delay is computed for the first and last sample, and a linear interpolation is
done for obtaining the rest of delays.


Approximations:

For modes FULL_TIMESCALE 1 and 2, the time scale array is computed only once, assuming the same sampling frequency for
all the streams.

The processing currently performed in a loop iterating over the streams in F1, that should be sorted. Given this, for each
stream, it is checked if the previous processed stream corresponded to the same station, and if so, the previous rotators
are used.


Debugging:

Activate DEBUG_DELAYS for tabulated output of the delay computations.


References:

[Th04] p172-173


TO DO:

F_fs: assuming single sampling frequency
Bring configuration constants to configuration files.
Delete unused arguments.

Check if necessary to apply fractional offsets.
lib_fx_stack.fringe_rotation_work(clock_diff, poly_diff, seconds_ref_clock, delay_rate_ref, timescale, seconds_offset, n_samples, sideband, last_n_samples, str_st, last_str_st, data_type, last_data_type, first_iteration, freq_channel, fs, F1_i, nr, rotation)[source]

Worker for fringe rotation, see fringe_rotation() for more details.

lib_fx_stack.fringe_rotation_wrap(args)[source]
lib_fx_stack.get_exp(x)[source]

Get exponential based on fractional part of input, see Output below for details.

x : numpy array of float.
y : 1D numpy array of complex
complex rotation (exponential of 2*j*pi*fractional_part(x))
nr : bool
do not rotate (1 if all elements in y are 1, 0 otherwise)

Precision:

Integer part is removed to avoid problems with precision, rotation (j2pi).


Approximations:

-numpy.exp is not called in the trivial cases.
-IMPORTANT!: For arrays of more than 1 element, if the first, second and last elements are equal to zero, then
all the elements are assumed to be zero too. Need to replace this by some check on the polynomial.
lib_fx_stack.get_frac_over_ind(F_first_sample, F1, F_rates, F_fs, F_ind, F_side)[source]

Get locations of samples to be added/dropped due to fractional bit shift.

F_first_sample
list with the first sample number for each vector of data.
F1
list of vectors with new data for each stpol (only lengths are read).
F_rates
model/clock delay information for computing delays.
F_fs
sampling frequency for each vector with new data.
F_ind
list with identifiers for stations (for logging).
F_side
list with sidebands
F_frac_over_ind
list of [number_of_samples_to_be_added_or_dropped,[list_of_locations_for_these_changes]] for each element in F1.

Algorithm:

Compute integer+fractional sample delay at both extremes of the vector with data, then find intersection
with changes in fractional delay (given a fractional sample correction between 0 and 1, it checks for crossing at 0.5).


Debugging:

Activate DEBUG_FRAC_OVER for tabulated output of the fractional overflow corrections.


TO DO:

Add support for multiple samples.
lib_fx_stack.get_rotator(x)[source]

Get a single complex rotator from a list of rotators.

x : list of complex.
rotator
product of the elements in x.
TO DO: consider removing, devised to apply many rotators, but no longer needed.
lib_fx_stack.get_val_for_fringe_exp(sideband, data_type, freq_channel, fs, r_recalc)[source]

Compute vector ‘x’ with values that will go into e^j(2.pi.x) to be used in fringe rotation.

sideband : char {‘L’,’U’}
‘L’ for LSB, ‘U’ for USB.
datatype : char {‘c’,’r’}
‘c’ for complex samples, ‘r’ for real samples.
freq_channel : float
channel frequency [Hz] as in configuration file.
fs : float
sampling frequency [Hz].
r_recalc : 1D np.array of float
delay values.
val : 1D np.array of float
results.

Configuration:

Enable USE_NE_FRINGE for using numexpr, and if so, configure THREADS_NE for multithreading.


TO DO:

This should be checked.
Case USB complex not tested yet.
lib_fx_stack.hstack_new_samples(F1_partial, F_ind_partial, F_ind, F1, F_adj_shift_partial, F_stack_shift, F_lti_in, F_first_sample, mode_str='', F_frac_over_ind=[])[source]

Concatenation of new samples with previously saved.

F1_partial
previously saved samples.
F_ind_partial
identifiers for each row of saved samples.
F_ind
identifiers for rows of new samples.
F1
new samples.
F_adj_shift_partial
[unused] previously used for keeping track of the number of samples to add/drop, superseded by F_frac_over.
F_refs
indices for F1 based on F1_partial, used for other lists obtained in update_stored_samples(): (F_delay,F_rates,etc).
F_stack_shift
[unused] previously used to keep track of added/dropped samples.
F_lti
list with last sample (l), total number of samples processed (t), invalid samples (i), and adjuted samples for each stream.
F_adj_shift_partial
number of samples that have to be adjusted
F_first_sample
list with first sample corresponding to the streams in F1.
mode_str
“f” for normal operation model, “pcal” for saving the samples for phase calibration.
F_frac_over
list with the number of positions of the samples added/dropped due to fractional sample correction overflow,
see get_frac_over_ind() and fix_frac_over() for more details.
F1_partial_out
F1_partial with samples from F1 added.
F_ind_partial_out
station-polarization identifiers for the streams in F1_partial_out (same format as in keys).
F_refs_out
indices to F_frac etc structures based on F1_partial.
F_stack_shift_out
[unused] previously used to keep track of added/dropped samples.
F_lti_out
F_lti updated.
F1_out
Emtpy list of arrays.

Procedure:

If there are stored samples, iterate over list of stored samples:
1. [currently disabled] Add zero padding if first sample does not match the expected first sample number.
2. Concatenate new samples (accesed at F1 via F_ref) with stored samples.
Otherwise initialize structures and store new samples.


Approximations:

-Zero-padding currently disabled, new samples are simply stored. This due to the need to debug yet the offsets in the
update of added/dropped samples from fix_frac_over(). Although these numbers are obtained in the mapper, there may
be invalid frames that thus leave a gap.
-It is not checked if the first sample number is lesser than the last sample stored, but this should not be necessary.


Debugging:

Activate DEBUG_HSTACK for tabulated output of the concatenations.


TO DO:

Group repeated code, and create functions for managing F_lti.
lib_fx_stack.multiply_accumulate(accu_prod, v1fft, v2fft)[source]

[Only used in one-baseline-per-task mode.] Multiply and accumulate two ffts (one for each station).

lib_fx_stack.normalize_mat(acc_mat, count_acc, using_autocorrs=1)[source]

Normalice results in accumulation matrix.

acc_mat
accumulation matrix (see compute_x_all for the definition of the matrix).
count_acc
number of accumulations.
using_autocorrs
1 by default, normalization using auto-correlations.
acc_mat_out
normalized accumulation matrix.

Known issues:

Need to check scaling.

TO DO:

Take into account valid samples!
lib_fx_stack.shortest_row_F(F1)[source]

Get the minimum and maximum length of the elements of F1.

F1 : list of numpy 1D arrays.
shortest_row
length of the shortest element in F1.
longest_row
length of the longest element in F1.
lib_fx_stack.window_and_fft(v1_dequant, fft_size, windowing, flatten_chunks=1, dtype_complex=<class 'complex'>, rfft_data_type='c')[source]

Apply window and do FFT of set of samples, to be grouped into chunks of FFT size.

v1_dequant
numpy arrays with complex samples in the time domain (fringe rotation already applied to them).
fft_size
number of coefficients in the FFT.
windowing
shape of the window to be applied prior to FFT, square by default.
flatten_chunks
1 if one-baseline-per-task mode, otherwise all-baselines-per-task.
dtype_complex
complex type to be used in initialization of arrays.
rfft_data_type
[unused] initially devised to use rfft, currently not used.
v1_fft
numpy array of arrays, with as many rows as the ratio between number of samples and fft_size (forced to
be integer), and as many columns of fft_size.

Performance:

Using scipy fft, which yielded the highest performance on preliminary benchmarking with single thread reducer.
PyFFTW implemented, but needs to be tested and benchmarked.


**TO DO: **

Add a more elegant implementation for windowing, and add more windows.
Currently no windowing by default, may need to migrate functionality from this to windowing modes.