Skip to content

Commit

Permalink
improved tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pravirkr committed May 26, 2024
1 parent 9d6601f commit c606326
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 268 deletions.
50 changes: 45 additions & 5 deletions kalman_detector/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import numpy as np
from numba import jit

from kalman_detector.svm import kalman_binary_compress


@jit(nopython=True)
def kalman_filter(
Expand All @@ -18,23 +20,23 @@ def kalman_filter(
Parameters
----------
spec : np.ndarray
1d spectra
1D array of spectrum values.
spec_std : np.ndarray
1d spectra noise std
1D array of spectrum (noise) standard deviations.
sig_eta : float
State transition std or Process noise. Sets the smoothness scale of
model change.
e0 : float
initial guess of model expectation value in first channel, by default 0
Initial guess of model expectation value in first channel, by default 0
v0 : float, optional
initial guess of model std in first channel, by default None
Initial guess of model std in first channel, by default None
chan_mask : np.ndarray, optional
mask of channels to ignore, by default None
Returns
-------
float
score, which is the likelihood of presence of signal.
score, which is the Log likelihood ratio of the NP hypothesis test.
Notes
-----
Expand Down Expand Up @@ -71,3 +73,41 @@ def kalman_filter(
) - 0.5 * np.log(2 * np.pi * spec_std[ichan] * spec_std[ichan])

return log_l_h1 - log_l_h0


def kalman_filter_binary(
spec: np.ndarray,
spec_std: np.ndarray,
sig_eta: float,
e0: float = 0,
v0: float | None = None,
) -> float:
"""Kalman score estimator for input 1d spectrum data in binary search tree.
Parameters
----------
spec : np.ndarray
1D array of spectrum values.
spec_std : np.ndarray
1D array of spectrum (noise) standard deviations.
sig_eta : float
State transition std or Process noise.
e0 : float
Initial guess of the expected value of the first hidden state A0, by default 0.
v0 : float, optional
Initial guess of the variance of the first hidden state A0, by default None.
Returns
-------
float
score, Log likelihood ratio of the NP hypothesis test.
"""
if v0 is None:
v0 = np.median(spec_std) ** 2

state = kalman_binary_compress(spec, spec_std, sig_eta, e0, v0)
log_l_h1 = state.log_s + np.log(np.sqrt((2 * np.pi) ** 2 / np.linalg.det(state.m)))
log_l_h0 = np.sum(
0.5 * np.log(1 / spec_std**2 / 2 / np.pi) - spec**2 / (2 * spec_std**2),
)
return log_l_h1 - log_l_h0
48 changes: 5 additions & 43 deletions kalman_detector/svm.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ def kalman_binary_compress(
spec: np.ndarray,
spec_std: np.ndarray,
sig_t: float,
e0: float = 0,
v0: float | None = None,
e0: float,
v0: float,
) -> State:
"""Kalman binary compression for a spectrum.
Expand All @@ -497,18 +497,15 @@ def kalman_binary_compress(
sig_t : float
Standard deviation of the tranition between states.
e0 : float
Initial guess of the expected value of the first hidden state A0, by default 0.
v0 : float, optional
Initial guess of the variance of the first hidden state A0, by default None.
Initial guess of the expected value of the first hidden state A0.
v0 : float
Initial guess of the variance of the first hidden state A0.
Returns
-------
State
Final state for the whole spectrum.
"""
if v0 is None:
v0 = np.median(spec_std) ** 2

var_d = spec_std**2
var_t = sig_t**2
states = [
Expand All @@ -520,38 +517,3 @@ def kalman_binary_compress(
while len(states) > 1:
states = [states[i] + states[i + 1] for i in range(0, len(states), 2)]
return states[0]


def kalman_binary_hypothesis(
spec: np.ndarray,
spec_std: np.ndarray,
sig_t: float,
e0: float,
v0: float,
) -> float:
"""Calculate the log likelihood ratio of the NP hypothesis test using a binary tree.
Parameters
----------
spec : np.ndarray
1D array of spectrum values.
spec_std : np.ndarray
1D array of spectrum standard deviations.
sig_t : float
Standard deviation of the tranition between states.
e0 : float
Expected value of the first hidden state A0.
v0 : float
Variance of the first hidden state A0.
Returns
-------
float
Log likelihood ratio of the NP hypothesis test.
"""
state = kalman_binary_compress(spec, spec_std, sig_t, e0, v0)
log_l_h1 = state.log_s + np.log(np.sqrt((2 * np.pi) ** 2 / np.linalg.det(state.m)))
log_l_h0 = np.sum(
0.5 * np.log(1 / spec_std**2 / 2 / np.pi) - spec**2 / (2 * spec_std**2),
)
return log_l_h1 - log_l_h0
205 changes: 53 additions & 152 deletions tests/test_kalman.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
from pathlib import Path

import numpy as np
import pytest
from numpy.polynomial import Polynomial

from kalman_detector import utils
from kalman_detector.core import kalman_filter
from kalman_detector.main import (
KalmanDetector,
KalmanDistribution,
secondary_spectrum_cumulative_chi2_score,
)
from kalman_detector.core import kalman_filter, kalman_filter_binary


class TestKalmanFilter:
Expand All @@ -20,156 +11,66 @@ def test_kalman_filter(self) -> None:
rng = np.random.default_rng()
spec_std = rng.normal(1, 0.1, size=nchans)
spec = rng.normal(target, spec_std)
score = kalman_filter(spec, spec_std, 0.1)
score = kalman_filter.py_func(spec, spec_std, 0.1)
mask = np.zeros(nchans, dtype=bool)
mask[rng.choice(nchans, 2, replace=False)] = True
score_masked = kalman_filter(spec, spec_std, 0.1, chan_mask=mask)
score_masked = kalman_filter.py_func(spec, spec_std, 0.1, chan_mask=mask)
np.testing.assert_allclose(score, score_masked, rtol=1e-1)


class TestKalmanDetector:
def test_q_par_float(self) -> None:
q_par = 0.1
std_vec = np.ones(336)
kalman = KalmanDetector(std_vec, q_par**2)
np.testing.assert_array_equal(kalman.transit_sigmas, np.array([q_par]))

def test_q_par_list(self) -> None:
q_par = [0.1, 0.2]
q_par_sq = [qq**2 for qq in q_par]
std_vec = np.ones(336)
kalman = KalmanDetector(std_vec, q_par_sq)
np.testing.assert_array_equal(kalman.transit_sigmas, np.array(q_par))

def test_initialization_fail(self) -> None:
std_vec = np.zeros(1024)
with pytest.raises(ValueError):
KalmanDetector(std_vec)

def test_prepare_fits(self) -> None:
q_par = [0.1, 0.2]
std_vec = np.arange(0.1, 1, 0.01)
kalman = KalmanDetector(std_vec, q_par)
kalman.prepare_fits(ntrials=1000)
assert isinstance(kalman.distributions[0], KalmanDistribution)
np.testing.assert_equal(len(kalman.distributions), len(q_par))

def test_get_significance(self) -> None:
nchans = 128
target = 5
class TestKalmanDetectorBinary:
@pytest.mark.parametrize("v0", [0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000])
@pytest.mark.parametrize("nchans", [2, 128, 4096])
def test_consistency_v0(self, v0: float, nchans: int) -> None:
rng = np.random.default_rng()
std_vec = rng.normal(1, 0.1, size=nchans)
kalman = KalmanDetector(std_vec)
kalman.prepare_fits(ntrials=1000)
spectrum = rng.normal(target, std_vec)
sigs, scores = kalman.get_significance(spectrum)
np.testing.assert_equal(len(sigs), len(scores))
np.testing.assert_array_less(sigs, 0.1)

def test_get_significance_fail(self) -> None:
nchans = 128
spec = rng.lognormal(0, 0.3, nchans)
spec_std = rng.lognormal(0, 0.3, nchans)
sig_t = rng.lognormal(0, 0.3)
e0 = rng.lognormal(0, 0.3)

kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
np.testing.assert_almost_equal(kalman1d, kalman2d, decimal=10)

@pytest.mark.parametrize("e0", [-10, -1, -0.1, -0.01, 0, 0.01, 0.1, 1, 10])
@pytest.mark.parametrize("nchans", [2, 128, 4096])
def test_consistency_e0(self, e0: float, nchans: int) -> None:
rng = np.random.default_rng()
std_vec = rng.normal(1, 0.1, size=nchans)
spectrum = np.ones(nchans + 1)
kalman = KalmanDetector(std_vec)
kalman.prepare_fits(ntrials=1000)
with pytest.raises(ValueError):
kalman.get_significance(spectrum)

def test_get_best_significance(self) -> None:
nchans = 336
corr_len = 300
ntrials = 1000
target_snr = 20

scores_arr = []
template = utils.simulate_gaussian_signal(
nchans,
corr_len,
complex_process=True,
)
spec_mean = np.zeros_like(template)
spec_std = np.ones_like(template)
kalman = KalmanDetector(spec_std)
kalman.prepare_fits(ntrials=1000)
spec = rng.lognormal(0, 0.3, nchans)
spec_std = rng.lognormal(0, 0.3, nchans)
sig_t = rng.lognormal(0, 0.3)
v0 = rng.lognormal(0, 0.3)

kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
# Test without v0
kalman1d_v0 = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0)
kalman2d_v0 = kalman_filter_binary(spec, spec_std, sig_t, e0=e0)
np.testing.assert_almost_equal(kalman1d_v0, kalman2d_v0, decimal=10)

@pytest.mark.parametrize("sig_t", [0.01, 0.1, 1, 10, 100, 1000])
@pytest.mark.parametrize("nchans", [2, 128, 1024])
def test_consistency_eta(self, sig_t: float, nchans: int) -> None:
rng = np.random.default_rng()
for _ in range(ntrials):
spec = target_snr * template + rng.normal(
spec_mean,
spec_std,
len(template),
)
best_sig = kalman.get_best_significance(spec)
scores_arr.append(best_sig)
score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)


class TestKalmanDistribution:
def test_initialization(self) -> None:
sigma_arr = np.arange(0.1, 1, 0.01)
sig_eta = 0.1
mask_tol = 0.1
ntrials = 1000
kdist = KalmanDistribution(
sigma_arr,
sig_eta,
ntrials=ntrials,
mask_tol=mask_tol,
)
np.testing.assert_equal(kdist.mask_tol, mask_tol)
np.testing.assert_equal(len(kdist.mask), len(sigma_arr))
np.testing.assert_equal(kdist.sig_eta, sig_eta)
np.testing.assert_equal(kdist.ntrials, ntrials)

def test_initialization_fail(self) -> None:
sigma_arr = np.zeros(1024)
with pytest.raises(ValueError):
KalmanDistribution(sigma_arr, 0.1)

def test_plot_diagnostic(self, tmpfile: str) -> None:
outfile_path = Path(f"{tmpfile}.pdf")
sigma_arr = np.arange(0.1, 1, 0.01)
kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
kdist.plot_diagnostic(logy=True, outfile=outfile_path.as_posix())
assert outfile_path.is_file()
outfile_path.unlink()

def test_polyfit(self) -> None:
sigma_arr = np.arange(0.1, 1, 0.01)
kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
assert isinstance(kdist.polyfit, Polynomial)

def test_str(self) -> None:
sigma_arr = np.arange(0.1, 1, 0.01)
kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
assert str(kdist).startswith("KalmanDistribution")
assert repr(kdist) == str(kdist)

spec = rng.lognormal(0, 0.3, nchans)
spec_std = rng.lognormal(0, 0.3, nchans)
e0 = rng.lognormal(0, 0.3)
v0 = rng.lognormal(0, 0.3)

class TestSecondarySpectrym:
def test_secondary_spectrum_cumulative_chi2_score(self) -> None:
nchans = 336
corr_len = 300
ntrials = 1000
target_snr = 20
kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)

scores_arr = []
template = utils.simulate_gaussian_signal(
nchans,
corr_len,
complex_process=True,
)
spec_mean = np.zeros_like(template)
spec_std = np.ones_like(template)
@pytest.mark.parametrize("nchans", [2, 4, 16, 64, 256, 1024, 4096, 8192])
def test_consistency_nchans(self, nchans: int) -> None:
rng = np.random.default_rng()
for _ in range(ntrials):
spec = target_snr * template + rng.normal(
spec_mean,
spec_std,
len(template),
)
score = secondary_spectrum_cumulative_chi2_score(spec, spec_std)
scores_arr.append(score)
score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)
spec = rng.lognormal(0, 0.3, nchans)
spec_std = rng.lognormal(0, 0.3, nchans)
sig_t = rng.lognormal(0, 0.3)
e0 = rng.lognormal(0, 0.3)
v0 = rng.lognormal(0, 0.3)

kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
Loading

0 comments on commit c606326

Please sign in to comment.