#! /usr/bin/env python
# -*- mode: python; coding: utf-8 -*-
# Copyright 2016 Peter Williams <> and collaborators.
# Licensed under the MIT License.
# XXX remove hashbang
"""Compute phase closure diagnostics from a Measurement Set
For most results to make sense, the data should be observations of a bright
point source at phase center.
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str ('Config closures').split ()
import collections
import six, numpy as np
from six.moves import range
# XXX relativize e.g. "...cli"
from pwkit.cli import check_usage, die
from pwkit.kwargv import ParseKeywords, Custom
from import util
from import sanitize_unicode as b
closures_doc = \
<closures> vis=MS [keywords...]
Phase closure triples.
Path of the MeasurementSet dataset to read. Required.
array=, baseline=, field=, observation=, polarization=, scan=,
scanintent=, spw=, taql=, time=, uvdist=
MeasurementSet selectors used to filter the input data.
Default polarization is 'RR,LL'. All polarizations are averaged
together, so mixing parallel- and cross-hand pols is almost
never what you want to do.
Name of the column to use for visibility data. Defaults to 'data'.
You might want it to be 'corrected_data'.
class Config (ParseKeywords):
vis = Custom (str, required=True)
datacol = 'data'
believeweights = False
# MeasurementSet filters
array = str
baseline = str
field = str
observation = str
polarization = 'RR,LL'
scan = str
scanintent = str
spw = str
taql = str
time = str
uvdist = str
loglevel = 'warn'
# ########################################################################
# Begin copying/emulating mirtask.util
FPOL_X = 0
FPOL_Y = 1
FPOL_R = 2
FPOL_L = 3
FPOL_I = 4
FPOL_Q = 5
FPOL_U = 6
FPOL_V = 7
fpol_names = 'XYRLIQUV'
# This table helps split a CASA pol code into a pair of fpol values. The pair
# is packed into 8 bits, the upper 3 being for the left pol and the lower 4
# being for the right.
_pol_to_fpol = np.array ([
0xFFFF, # ?, illegal
0x44, 0x55, 0x66, 0x77, # I Q U V
0x22, 0x23, 0x32, 0x33, # RR RL LR LL
0x00, 0x01, 0x10, 0x11, # XX XY YX YY
0x20, 0x21, 0x30, 0x31, # RX RY LX LY
0x02, 0x03, 0x12, 0x13, # XR XL YR YL
# not bothering with the rest because seriously
pol_is_intensity = np.array ([
False, # ?, illegal
True, True, True, True, # I Q U V
True, False, False, True, # RR RL LR LL
True, False, False, True, # XX XY YX YY
False, False, False, False, # RX RY LX LY
False, False, False, False, # XR XL YR YL
# not bothering with the rest because seriously
# This table performs the reverse mapping, with index being the two f-pol
# values packed into four bits each. A value of 0xFF indicates an illegal
# pairing. The values come from
_fpol_to_pol = np.ndarray (128, dtype=np.int8)
_fpol_to_pol.fill (0xFF)
_fpol_to_pol[0x00] = 9
_fpol_to_pol[0x01] = 10
_fpol_to_pol[0x10] = 11
_fpol_to_pol[0x11] = 12
_fpol_to_pol[0x22] = 5
_fpol_to_pol[0x23] = 5
_fpol_to_pol[0x32] = 7
_fpol_to_pol[0x33] = 8
_fpol_to_pol[0x44] = 1
_fpol_to_pol[0x55] = 2
_fpol_to_pol[0x66] = 3
_fpol_to_pol[0x77] = 4
# A "antpol" (AP) is an integer identifying an antenna/fpol pair. It
# can be decoded without any external information. We used zero-based
# integer antenna numbers so that
# AP = M << 3 + FP
def ap_format (ap, getname=str):
return '%s%c' % (getname (ap >> 3), fpol_names[ap & 0x7])
ap_ant = lambda ap: ap >> 3
ap_fpol = lambda ap: ap & 0x7
antpol_to_ap = lambda antnum, fpol: (antnum << 3) + fpol
# A "basepol" is 2-tuple of antpols.
def bp_format (bp, getname=str):
ap1, ap2 = bp
if ap1 < 0:
raise ValueError ('first antpol %d is negative' % ap1)
if ap2 < 0:
raise ValueError ('second antpol %d is negative' % ap2)
return '%s%c-%s%c' % (getname (ap1 >> 3), fpol_names[ap1 & 0x7],
getname (ap2 >> 3), fpol_names[ap2 & 0x7])
def bp_to_aap (bp):
"""Converts a basepol into a tuple of (ant1, ant2, pol)."""
ap1, ap2 = bp
if ap1 < 0:
raise ValueError ('first antpol %d is negative' % ap1)
if ap2 < 0:
raise ValueError ('second antpol %d is negative' % ap2)
pol = _fpol_to_pol[((ap1 & 0x7) << 4) + (ap2 & 0x7)]
if pol == 0xFF:
raise ValueError ('no CASA polarization code for pairing '
'%c-%c' % (fpol_names[ap1 & 0x7],
fpol_names[ap2 & 0x7]))
return ap1 >> 3, ap2 >> 3, pol
def aap_to_bp (ant1, ant2, pol):
"""Create a basepol from antenna numbers and a CASA polarization code."""
if ant1 < 0:
raise ValueError ('first antenna is below 0: %s' % ant1)
if ant2 < ant1:
raise ValueError ('second antenna is below first: %s' % ant2)
if pol < 1 or pol > 12:
raise ValueError ('illegal polarization code %s' % pol)
fps = _pol_to_fpol[pol]
ap1 = (ant1 << 3) + ((fps >> 4) & 0x07)
ap2 = (ant2 << 3) + (fps & 0x07)
return ap1, ap2
# End copying/emulating mirtask.util
# ########################################################################
class StatsCollector (object):
def __init__ (self, chunk0size=64):
self.chunk0size = chunk0size
self._keymap = collections.defaultdict (lambda: len (self._keymap))
self._m0 = None # 0th moment
self._m1 = None #
self._m2 = None # 2nd moment
def accum (self, key, value, weight=1):
index = self._keymap[key]
if self._m0 is None:
self._m0 = np.zeros ((self.chunk0size,), dtype=np.result_type (weight))
self._m1 = np.zeros ((self.chunk0size,), dtype=np.result_type (value, weight))
self._m2 = np.zeros_like (self._m1)
elif index >= self._m0.size:
self._m0 = np.concatenate ((self._m0, np.zeros_like (self._m0)))
self._m1 = np.concatenate ((self._m1, np.zeros_like (self._m1)))
self._m2 = np.concatenate ((self._m2, np.zeros_like (self._m2)))
self._m0[index] += weight
q = weight * value
self._m1[index] += q
q *= value
self._m2[index] += q
return self
def finish (self, keyset, mask=True):
"""Returns (weights, means, variances), where:
ndarray of number of samples per key
computed mean value for each key
computed variance for each key
n_us = len (self._keymap)
# By definition (for now), wt >= 1 everywhere, so we don't need to
# worry about div-by-zero.
wt_us = self._m0[:n_us]
mean_us = self._m1[:n_us] / wt_us
var_us = self._m2[:n_us] / wt_us - mean_us**2
n_them = len (keyset)
wt = np.zeros (n_them, dtype=self._m0.dtype)
mean = np.empty (n_them, dtype=self._m1.dtype)
mean.fill (np.nan)
var = np.empty_like (mean)
var.fill (np.nan)
us_idx = []
them_idx = []
for them_i, key in enumerate (keyset):
us_i = self._keymap[key]
if us_i < n_us:
them_idx.append (them_i)
us_idx.append (us_i)
# otherwise, we must not have seen that key
wt[them_idx] = wt_us[us_idx]
mean[them_idx] = mean_us[us_idx]
var[them_idx] = var_us[us_idx]
if mask:
m = ~np.isfinite (mean)
mean = (mean, m)
var = (var, m)
self._m0 = self._m1 = self._m2 = None
self._keymap.clear ()
return wt, mean, var
class StatsCollector2D (object):
def __init__ (self, chunk0size=64):
self.chunk0size = chunk0size
self._key1map = collections.defaultdict (lambda: len (self._key1map))
self._key2map = collections.defaultdict (lambda: len (self._key2map))
self._m0 = None
self._m1 = None
self._m2 = None
def accum (self, key1, key2, value, weight=1):
index1 = self._key1map[key1]
index2 = self._key2map[key2]
if self._m0 is None:
self._m0 = np.zeros ((self.chunk0size, self.chunk0size), dtype=np.result_type (weight))
self._m1 = np.zeros ((self.chunk0size, self.chunk0size), dtype=np.result_type (value, weight))
self._m2 = np.zeros_like (self._m1)
if index1 >= self._m0.shape[0]:
self._m0 = np.concatenate ((self._m0, np.zeros_like (self._m0)), axis=0)
self._m1 = np.concatenate ((self._m1, np.zeros_like (self._m1)), axis=0)
self._m2 = np.concatenate ((self._m2, np.zeros_like (self._m2)), axis=0)
if index2 >= self._m0.shape[1]:
self._m0 = np.concatenate ((self._m0, np.zeros_like (self._m0)), axis=1)
self._m1 = np.concatenate ((self._m1, np.zeros_like (self._m1)), axis=1)
self._m2 = np.concatenate ((self._m2, np.zeros_like (self._m2)), axis=1)
self._m0[index1,index2] += weight
q = weight * value
self._m1[index1,index2] += q
q *= value
self._m2[index1,index2] += q
return self
def finish (self, key1set, key2set, mask=True):
"""Returns (weights, means, variances), where:
ndarray of number of samples per key; shape (n1, n2), where n1 is the
size of key1set and n2 is the size of key2set.
computed mean value for each key
computed variance for each key
n1_us = len (self._key1map)
n2_us = len (self._key2map)
wt_us = self._m0[:n1_us,:n2_us]
badwt = (wt_us == 0) | ~np.isfinite (wt_us)
wt_us[badwt] = 1
mean_us = self._m1[:n1_us,:n2_us] / wt_us
var_us = self._m2[:n1_us,:n2_us] / wt_us - mean_us**2
wt_us[badwt] = 0
mean_us[badwt] = np.nan
var_us[badwt] = np.nan
n1_them = len (key1set)
n2_them = len (key2set)
wt = np.zeros ((n1_them, n2_them), dtype=self._m0.dtype)
mean = np.empty ((n1_them, n2_them), dtype=self._m1.dtype)
mean.fill (np.nan)
var = np.empty_like (mean)
var.fill (np.nan)
# You can't fancy-index on two axes simultaneously, so we do a manual
# loop on the first axis.
us_idx2 = []
them_idx2 = []
for them_i2, key2 in enumerate (key2set):
us_i2 = self._key2map[key2]
if us_i2 < n2_us:
them_idx2.append (them_i2)
us_idx2.append (us_i2)
# otherwise, we must not have seen that key
for them_i1, key1 in enumerate (key1set):
us_i1 = self._key1map[key1]
if us_i1 >= n1_us:
continue # don't have this key
wt[them_i1,them_idx2] = wt_us[us_i1,us_idx2]
mean[them_i1,them_idx2] = mean_us[us_i1,us_idx2]
var[them_i1,them_idx2] = var_us[us_i1,us_idx2]
if mask:
m = ~np.isfinite (mean)
mean = (mean, m)
var = (var, m)
self._m0 = self._m1 = self._m2 = None
self._key1map.clear ()
self._key2map.clear ()
return wt, mean, var
class StatsCollectorND (object):
"""This is vaguely like StatsCollector2D, but rather than having two discrete
keyed axes, we are passed one key and an N-dimensional vector of values.
def __init__ (self, chunk0size=64):
self.chunk0size = chunk0size
self._keymap = collections.defaultdict (lambda: len (self._keymap))
self._m0 = None
self._m1 = None
self._m2 = None
def accum (self, key, values, weights=1):
index = self._keymap[key]
values = np.asarray (values)
weights = np.broadcast_to (weights, values.shape)
if self._m0 is None:
self._m0 = np.zeros ((self.chunk0size,) + values.shape, dtype=weights.dtype)
self._m1 = np.zeros ((self.chunk0size,) + values.shape, dtype=np.result_type (weights, values))
self._m2 = np.zeros_like (self._m1)
elif index >= self._m0.size:
self._m0 = np.concatenate ((self._m0, np.zeros_like (self._m0)))
self._m1 = np.concatenate ((self._m1, np.zeros_like (self._m1)))
self._m2 = np.concatenate ((self._m2, np.zeros_like (self._m2)))
if values.shape != self._m0.shape[1:]:
raise ValueError ('inconsistent `values` shapes: had %r, got %r' % (
self._m0.shape[1:], values.shape))
self._m0[index] += weights
q = weights * values
self._m1[index] += q
q *= values
self._m2[index] += q
return self
def finish (self, keyset, mask=True):
"""Returns (weights, means, variances), where:
total weights per key
computed mean value for each key
computed variance for each key
The arrays all have a shape of `(nkeys,)+shape(values)`.
n_us = len (self._keymap)
wt_us = self._m0[:n_us]
badwt = (wt_us == 0) | ~np.isfinite (wt_us)
wt_us[badwt] = 1.
mean_us = self._m1[:n_us] / wt_us
var_us = self._m2[:n_us] / wt_us - mean_us**2
wt_us[badwt] = 0
mean_us[badwt] = np.nan
var_us[badwt] = np.nan
n_them = len (keyset)
wt = np.zeros ((n_them,) + mean_us.shape[1:], dtype=self._m0.dtype)
mean = np.empty ((n_them,) + mean_us.shape[1:], dtype=self._m1.dtype)
mean.fill (np.nan)
var = np.empty_like (mean)
var.fill (np.nan)
us_idx = []
them_idx = []
for them_i, key in enumerate (keyset):
us_i = self._keymap[key]
if us_i < n_us:
them_idx.append (them_i)
us_idx.append (us_i)
# otherwise, we must not have seen that key
wt[them_idx] = wt_us[us_idx]
mean[them_idx] = mean_us[us_idx]
var[them_idx] = var_us[us_idx]
if mask:
m = ~np.isfinite (mean)
mean = (mean, m)
var = (var, m)
self._m0 = self._m1 = self._m2 = None
self._keymap.clear ()
return wt, mean, var
def postproc (stats_result):
"""Simple helper to postprocess angular outputs from StatsCollectors in the
way we want.
n, mean, scat = stats_result
mean *= 180 / np.pi # rad => deg
scat /= n # variance-of-samples => variance-of-mean
scat **= 0.5 # variance => stddev
scat *= 180 / np.pi # rad => deg
return mean, scat
def postproc_mask (stats_result):
"""Simple helper to postprocess angular outputs from StatsCollectors in the
way we want.
n, mean, scat = stats_result
ok = np.isfinite (mean)
n = n[ok]
mean = mean[ok]
scat = scat[ok]
mean *= 180 / np.pi # rad => deg
scat /= n # variance-of-samples => variance-of-mean
scat **= 0.5 # variance => stddev
scat *= 180 / np.pi # rad => deg
return ok, mean, scat
def grid_bp_data (bps, items, mask=True):
"""Given a bunch of scalars associated with intensity-type basepols, place
them onto a grid. There should be only two polarizations represented (e.g.
RR, LL); baselines for one of them will be put into the upper triangle of
the grid, while baselines for the other will be put into the lower triangle.
`bps` is an iterable that yields a superset of all bps to be gridded.
`items` is an iterable that yields tuples of (bp, value). (`bps` is
therefore a bit redundant with it, but this structure makes it so that we
don't need to iterate of `items` twice, which can be convenient.)
Returns: (pol1, pol2, ants, data), where
The polarization gridded into the upper triangle
The polarization gridded into the lower triangle
An array of the antenna numbers seen in `bps`
An n-by-n grid of the values, where n is the size of `ants`. Unsampled
basepols are filled with NaNs, or masked if `mask` is True.
The basepol (ant1, ant2, pol1) is gridded into `data[i1,i2]`, where `i1`
is the index of `ant1` in `ants`, etc. The basepol (ant1, ant2, pol2) is
gridded into `data[i2,i1]`.
seen_ants = set ()
seen_pols = set ()
for ant1, ant2, pol in (bp_to_aap (bp) for bp in bps):
seen_ants.add (ant1)
seen_ants.add (ant2)
seen_pols.add (pol)
if len (seen_pols) != 2:
raise Exception ('can only work with 2 polarizations')
pol1, pol2 = sorted (seen_pols)
seen_ants = np.array (sorted (seen_ants))
ant_map = dict ((a, i) for (i, a) in enumerate (seen_ants))
data = None
n = len (seen_ants)
for bp, value in items:
if data is None:
data = np.empty ((n, n), dtype=np.result_type (value))
data.fill (np.nan)
ant1, ant2, pol = bp_to_aap (bp)
i1 = ant_map[ant1]
i2 = ant_map[ant2]
if pol == pol1:
data[i1,i2] = value
data[i2,i1] = value
if mask:
data = (data, ~np.isfinite (data))
return pol1, pol2, seen_ants, data
class ClosureCalculator (object):
def process (self, cfg):
# Initialize whole-run buffers.
self.all_aps = set ()
self.all_bps = set ()
self.all_times = set ()
self.global_stats_by_time = StatsCollector ()
self.ap_stats_by_ddid = collections.defaultdict (StatsCollector)
self.bp_stats_by_ddid = collections.defaultdict (StatsCollector)
self.ap_spec_stats_by_ddid = collections.defaultdict (StatsCollectorND)
self.ap_time_stats_by_ddid = collections.defaultdict (StatsCollector2D)
self._process_ms (cfg)
return self
def _process_ms (self, cfg):
tb = ()
ms = ()
me = ()
# Prep. (b(cfg.vis))
ms.msselect (b(dict ((n, cfg.get (n)) for n in util.msselect_keys
if cfg.get (n) is not None)))
rangeinfo = ms.range (b'data_desc_id field_id'.split ())
ddids = rangeinfo['data_desc_id']
fields = rangeinfo['field_id'] (b(cfg.vis + '/DATA_DESCRIPTION'), nomodify=True)
ddid_to_polid = tb.getcol (b'POLARIZATION_ID')
ddid_to_spwid = tb.getcol (b'SPECTRAL_WINDOW_ID')
tb.close () (b(cfg.vis + '/POLARIZATION'), nomodify=True)
polid_to_polns = tb.getcol (b'CORR_TYPE')
tb.close () (b(cfg.vis + '/ANTENNA'), nomodify=True)
self.ant_names = tb.getcol (b'NAME')
self.ant_stations = tb.getcol (b'STATION')
tb.close ()
# Read stuff in. We can't expect that weight values have their
# absolute scale set correctly, but we can still use them to set the
# relative weighting of the data points.
# datacol is (ncorr, nchan, nchunk)
# flag is (ncorr, nchan, nchunk); zero means OK data
# weight is (ncorr, nchunk) [XXX: WEIGHT_SPECTRUM?]
# uvw is (3, nchunk)
# time is (nchunk)
# Iteration order, from slowest varying to fastest varying:
# spw, time, baseline, poln, frequency
# We are encouraged to use the new iteration interface
# (ms.niterinit(), etc), but as of 4.6.0 it is fundamentally broken
# (asking for ANTENNA2 gets you ANTENNA1) so never mind.
colnames = [cfg.datacol] + 'flag weight time antenna1 antenna2'.split ()
colnames = b(colnames)
for ddid in ddids:
ms.selectinit (ddid)
ms.iterinit (maxrows=4096)
ms.iterorigin ()
self.cur_ddid = ddid
self.cur_time = -1.
self._start_timeslot ()
all_polns = polid_to_polns[:,ddid_to_polid[ddid]]
polinfo = [(i, p) for i, p in enumerate (all_polns) if pol_is_intensity[p]]
while True:
cols = ms.getdata (items=colnames)
for i in range (cols['time'].size): # all records
time = cols['time'][i]
if time != self.cur_time:
self._finish_timeslot ()
self.cur_time = time
self._start_timeslot ()
antenna1 = cols['antenna1'][i]
antenna2 = cols['antenna2'][i]
if antenna1 == antenna2:
continue # no autocorrelations
for j, poln in polinfo: # all polns
flags = cols['flag'][j,:,i]
if flags.all ():
continue # all flagged
data = cols[cfg.datacol][j,:,i]
# data and flags are now both shape (nchan,)
np.logical_not (flags, flags)
# flags=1 now indicates good data
self.all_times.add (time)
bp = aap_to_bp (antenna1, antenna2, poln)
self.all_bps.add (bp)
self.data_by_bp[bp] = (data, flags) # + weight spectrum?
# Here we exploit the fact that we're only considering
# intensity-type polarizations, so bp[0] and bp[1]
# always have the same fpol. Also that ap_by_fpol is a
# defaultdict.
for ap in bp:
self.all_aps.add (ap)
self.ap_by_fpol[ap & 0x7].add (ap)
if not ms.iternext ():
self._finish_timeslot ()
ms.close ()
return self
def _getname (self, antidx):
return '%s@%s:' % (self.ant_names[antidx], self.ant_stations[antidx])
def _start_timeslot (self):
self.data_by_bp = {}
self.ap_by_fpol = collections.defaultdict (set)
def _finish_timeslot (self):
"""We have loaded in all of the visibilities in one timeslot. We can now
compute the phase closure triples.
XXX: we should only process independent triples. Are we???
for fpol, aps in self.ap_by_fpol.iteritems ():
aps = sorted (aps)
nap = len (aps)
for i1, ap1 in enumerate (aps):
for i2 in range (i1, nap):
ap2 = aps[i2]
bp1 = (ap1, ap2)
info = self.data_by_bp.get (bp1)
if info is None:
data1, flags1 = info
for i3 in range (i2, nap):
ap3 = aps[i3]
bp2 = (ap2, ap3)
info = self.data_by_bp.get (bp2)
if info is None:
data2, flags2 = info
bp3 = (ap1, aps[i3])
info = self.data_by_bp.get (bp3)
if info is None:
data3, flags3 = info
# try to minimize allocations:
tflags = flags1 & flags2
np.logical_and (tflags, flags3, tflags)
if not tflags.any ():
triple = data3.conj ()
np.multiply (triple, data1, triple)
np.multiply (triple, data2, triple)
self._process_sample (ap1, ap2, ap3, triple, tflags)
# Reset for next timeslot
self.cur_time = -1.
self.bp_by_ap = None
self.ap_by_fpol = None
def _process_sample (self, ap1, ap2, ap3, triple, tflags):
"""We have computed one independent phase closure triple in one timeslot.
# Frequency-resolved:
np.divide (triple, np.abs (triple), triple)
phase = np.angle (triple)
self.ap_spec_stats_by_ddid[self.cur_ddid].accum (ap1, phase, tflags + 0.)
self.ap_spec_stats_by_ddid[self.cur_ddid].accum (ap2, phase, tflags + 0.)
self.ap_spec_stats_by_ddid[self.cur_ddid].accum (ap3, phase, tflags + 0.)
# Frequency-averaged:
triple = (triple, tflags) / tflags.sum ()
phase = np.angle (triple)
self.global_stats_by_time.accum (self.cur_time, phase)
self.ap_stats_by_ddid[self.cur_ddid].accum (ap1, phase)
self.ap_stats_by_ddid[self.cur_ddid].accum (ap2, phase)
self.ap_stats_by_ddid[self.cur_ddid].accum (ap3, phase)
self.bp_stats_by_ddid[self.cur_ddid].accum ((ap1, ap2), phase)
self.bp_stats_by_ddid[self.cur_ddid].accum ((ap1, ap3), phase)
self.bp_stats_by_ddid[self.cur_ddid].accum ((ap2, ap3), phase)
self.ap_time_stats_by_ddid[self.cur_ddid].accum (self.cur_time, ap1, phase)
self.ap_time_stats_by_ddid[self.cur_ddid].accum (self.cur_time, ap2, phase)
self.ap_time_stats_by_ddid[self.cur_ddid].accum (self.cur_time, ap3, phase)
def report (self, cfg):
import omega as om
import omega.gtk3
from pwkit import ndshow_gtk3
self.all_aps = np.sort (list (self.all_aps))
self.all_bps = sorted (self.all_bps)
self.all_times = np.sort (list (self.all_times))
# Antpols by DDID, time:
data = []
descs = []
for ddid, stats in self.ap_time_stats_by_ddid.iteritems ():
mean, scat = postproc (stats.finish (self.all_times, self.all_aps))
data.append (mean / scat)
descs.append ('DDID %d' % ddid)
ndshow_gtk3.cycle (data, descs, run_main=True)
# Antpols by DDID, freq:
data = []
descs = []
for ddid, stats in self.ap_spec_stats_by_ddid.iteritems ():
mean, scat = postproc (stats.finish (self.all_aps))
data.append (mean / scat)
descs.append ('DDID %d' % ddid)
ndshow_gtk3.cycle (data, descs, run_main=True)
# Antpols by DDID
p = om.RectPlot ()
for ddid, stats in self.ap_stats_by_ddid.iteritems ():
ok, mean, scat = postproc_mask (stats.finish (self.all_aps))
p.addXYErr (np.arange (len (self.all_aps))[ok], mean, scat, 'DDID %d' % ddid)
p.setBounds (-0.5, len (self.all_aps) - 0.5)
p.addHLine (0, keyText=None, zheight=-1) ()
# Basepols by DDID
data = []
descs = []
tostatuses = []
def bpgrid_status (pol1, pol2, ants, yx):
i, j = [int (_) for _ in np.floor (yx + 0.5)]
if i < 0 or j < 0 or i >= ants.size or j >= ants.size:
return ''
ni = self._getname (ants[i])
nj = self._getname (ants[j])
if i <= j:
return '%s-%s %s' % (ni, nj, util.pol_names[pol1])
return '%s-%s %s' % (nj, ni, util.pol_names[pol2])
for ddid, stats in self.bp_stats_by_ddid.iteritems ():
mean, scat = postproc (stats.finish (self.all_bps))
nmean = mean / scat
from itertools import izip
pol1, pol2, ants, grid = grid_bp_data (self.all_bps,
izip (self.all_bps, nmean))
data.append (grid)
descs.append ('DDID %d' % ddid)
tostatuses.append (lambda yx: bpgrid_status (pol1, pol2, ants, yx))
ndshow_gtk3.cycle (data, descs, tostatuses=tostatuses, run_main=True)
# Everything by time
ok, mean, scat = postproc_mask (self.global_stats_by_time.finish (self.all_times))
stimes = self.all_times[ok] / 86400
st0 = int (np.floor (stimes.min ()))
stimes -= st0
p = om.quickXYErr (stimes, mean, scat)
p.addHLine (0, keyText=None, zheight=-1)
p.setXLabel ('MJD - %d' % st0) ()
def closures_cli (argv):
check_usage (closures_doc, argv, usageifnoargs=True)
cfg = Config ().parse (argv[1:])
util.logger (cfg.loglevel)
ClosureCalculator ().process (cfg).report (cfg)
if __name__ == '__main__': # XXX TEMP
import sys
from pwkit import cli
cli.unicode_stdio ()
cli.propagate_sigint ()
closures_cli (sys.argv)
