Source code for matvis.wrapper

"""Simple example wrapper for basic usage of matvis."""

from __future__ import annotations

import logging
from typing import Literal

import numpy as np
from astropy import units as un
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.time import Time
from pyuvdata import UVBeam
from pyuvdata.analytic_beam import AnalyticBeam
from pyuvdata.beam_interface import BeamInterface

from . import HAVE_GPU, cpu
from .core.beams import prepare_beam_unpolarized

if HAVE_GPU:
    from . import gpu

logger = logging.getLogger(__name__)


[docs] def simulate_vis( ants: dict[int, np.ndarray], fluxes: np.ndarray, ra: np.ndarray, dec: np.ndarray, freqs: np.ndarray, times: Time, beams: list[AnalyticBeam | UVBeam | BeamInterface], telescope_loc: EarthLocation, polarized: bool = False, precision: Literal[1, 2] = 1, use_feed: Literal["x", "y"] = "x", use_gpu: bool = False, beam_spline_opts: dict | None = None, beam_idx: np.ndarray | None = None, antpairs: np.ndarray | list[tuple[int, int]] | None = None, source_buffer: float = 1.0, coord_method: Literal[ "CoordinateRotationAstropy", "CoordinateRotationERFA", "GPUCoordinateRotationERFA", ] = "CoordinateRotationAstropy", coord_method_params: dict | None = None, matprod_method: Literal[ "MatMul", "VectorLoop", "CPUMatMul", "GPUMatMul", "CPUVectorLoop", "GPUVectorLoop", ] = "MatMul", **backend_kwargs, ): """ Run a basic simulation using ``matvis``. This wrapper handles the necessary coordinate conversions etc. Parameters ---------- ants : dict Dictionary of antenna positions. The keys are the antenna names (integers) and the values are the Cartesian x,y,z positions of the antennas (in meters) relative to the array center. fluxes : array_like 2D array with the flux of each source as a function of frequency, of shape (NSRCS, NFREQS). ra, dec : array_like Arrays of source RA and Dec positions in radians. RA goes from [0, 2 pi] and Dec from [-pi/2, +pi/2]. freqs : array_like Frequency channels for the simulation, in Hz. times : astropy.Time instance Times of the observation (can be an array of times). beams : list of ``UVBeam``, ``AnalyticBeam`` or ``BeamInterface`` objects Beam objects to use for each antenna. telescope_loc An EarthLocation object representing the center of the array. polarized : bool, optional If True, use polarized beams and calculate all available linearly- polarized visibilities, e.g. V_nn, V_ne, V_en, V_ee. Default: False (only uses the 'ee' polarization). precision : int, optional Which precision setting to use for :func:`~matvis`. If set to ``1``, uses the (``np.float32``, ``np.complex64``) dtypes. If set to ``2``, uses the (``np.float64``, ``np.complex128``) dtypes. use_feed Either 'x' or 'y'. Only used if polarized is False. use_gpu : bool, optional Whether to use the GPU for simulation. beam_spline_opts : dict, optional Options to be passed to :meth:`pyuvdata.uvbeam.UVBeam.interp` as `spline_opts`. beam_idx An array of integers, of the same length as ``ants``. Each entry is for an antenna of the same index, and its value should be the index of the beam in the beam list that corresponds to the antenna. antpairs A list of antpairs (in the form of 2-tuples of integers) to actually calculate visibility for. If None, all feed-pairs are calculated. source_buffer : float, optional The fraction of the total number of sources to use when allocating memory for the sources above horizon. For large numbers of sources, a fraction of ~0.55 should be sufficient. coord_method The method to use to transform coordinates from the equatorial to horizontal frame. The default is to use Astropy coordinate transforms. A faster option, which is accurate to within 6 mas, is to use "CoordinateTransformERFA" (or its GPU version, if using GPU). coord_method_params Parameters particular to the coordinate rotation method of choice. For example, for the CoordinateRotationERFA (and GPU version of the same) method, there is the parameter ``update_bcrs_every``, which should be a time in seconds, for which larger values speed up the computation. matprod_method The method to use for the final matrix multiplication. Default is 'MatMul', which simply uses matrix multiplication over the two full matrices. Currently, the other option is `VectorLoop`, which uses a loop over the antenna pairs, computing the sum over sources as a vector dot product, which can be faster for large arrays where `antpairs` is small (possibly from high redundancy). You should run a performance test before changing this. If not CPU/GPU prefix is specified, it will be added automatically based on the value of `use_gpu`. Returns ------- vis : array_like Complex array of shape (NFREQS, NTIMES, NBLS, NFEED, NFEED) if ``polarized == True``, or (NFREQS, NTIMES, NBLS) otherwise. """ if use_gpu: if not HAVE_GPU: raise ImportError("You cannot use GPU without installing GPU-dependencies!") import cupy as cp device = cp.cuda.Device() attrs = device.attributes attrs = {str(k): v for k, v in attrs.items()} string = "\n\t".join(f"{k}: {v}" for k, v in attrs.items()) logger.debug(f""" Your GPU has the following attributes: \t{string} """) fnc = gpu.simulate if use_gpu else cpu.simulate assert fluxes.shape == ( ra.size, freqs.size, ), "The `fluxes` array must have shape (NSRCS, NFREQS)." # Determine precision complex_dtype = np.complex64 if precision == 1 else np.complex128 # Get polarization information from beams if polarized: nfeeds = getattr(beams[0], "Nfeeds", 2) # Antenna x,y,z positions antpos = np.array([ants[k] for k in ants.keys()]) nants = antpos.shape[0] skycoords = SkyCoord(ra=ra * un.rad, dec=dec * un.rad, frame="icrs") npairs = len(antpairs) if antpairs is not None else nants * nants if polarized: vis = np.zeros( (freqs.size, times.size, npairs, nfeeds, nfeeds), dtype=complex_dtype ) else: vis = np.zeros((freqs.size, times.size, npairs), dtype=complex_dtype) if matprod_method in ["MatMul", "VectorLoop"]: matprod_method = f"GPU{matprod_method}" if use_gpu else f"CPU{matprod_method}" # Loop over frequencies and call matvis_cpu/gpu for i, freq in enumerate(freqs): vis[i] = fnc( antpos=antpos, freq=freq, times=times, skycoords=skycoords, telescope_loc=telescope_loc, I_sky=fluxes[:, i], beam_list=beams, precision=precision, polarized=polarized, beam_spline_opts=beam_spline_opts, beam_idx=beam_idx, antpairs=antpairs, source_buffer=source_buffer, matprod_method=matprod_method, coord_method=coord_method, coord_method_params=coord_method_params, **backend_kwargs, ) return vis