diff --git a/kalman_detector/core.py b/kalman_detector/core.py
index d9a4942..4a44490 100644
--- a/kalman_detector/core.py
+++ b/kalman_detector/core.py
@@ -3,6 +3,8 @@
 import numpy as np
 from numba import jit
 
+from kalman_detector.svm import kalman_binary_compress
+
 
 @jit(nopython=True)
 def kalman_filter(
@@ -18,23 +20,23 @@
     Parameters
     ----------
     spec : np.ndarray
-        1d spectra
+        1D array of spectrum values.
     spec_std : np.ndarray
-        1d spectra noise std
+        1D array of spectrum (noise) standard deviations.
     sig_eta : float
         State transition std or Process noise.
         Sets the smoothness scale of model change.
     e0 : float
-        initial guess of model expectation value in first channel, by default 0
+        Initial guess of model expectation value in first channel, by default 0
     v0 : float, optional
-        initial guess of model std in first channel, by default None
+        Initial guess of model std in first channel, by default None
     chan_mask : np.ndarray, optional
         mask of channels to ignore, by default None
 
     Returns
     -------
     float
-        score, which is the likelihood of presence of signal.
+        score, which is the log-likelihood ratio of the NP hypothesis test.
 
     Notes
     -----
@@ -71,3 +73,41 @@
         ) - 0.5 * np.log(2 * np.pi * spec_std[ichan] * spec_std[ichan])
 
     return log_l_h1 - log_l_h0
+
+
+def kalman_filter_binary(
+    spec: np.ndarray,
+    spec_std: np.ndarray,
+    sig_eta: float,
+    e0: float = 0,
+    v0: float | None = None,
+) -> float:
+    """Kalman score estimator for a 1D input spectrum, using a binary search tree.
+
+    Parameters
+    ----------
+    spec : np.ndarray
+        1D array of spectrum values.
+    spec_std : np.ndarray
+        1D array of spectrum (noise) standard deviations.
+    sig_eta : float
+        State transition std or Process noise.
+    e0 : float
+        Initial guess of the expected value of the first hidden state A0, by default 0.
+    v0 : float, optional
+        Initial guess of the variance of the first hidden state A0, by default None.
+
+    Returns
+    -------
+    float
+        score, the log-likelihood ratio of the NP hypothesis test.
+    """
+    if v0 is None:
+        v0 = np.median(spec_std) ** 2
+
+    state = kalman_binary_compress(spec, spec_std, sig_eta, e0, v0)
+    log_l_h1 = state.log_s + np.log(np.sqrt((2 * np.pi) ** 2 / np.linalg.det(state.m)))
+    log_l_h0 = np.sum(
+        0.5 * np.log(1 / spec_std**2 / 2 / np.pi) - spec**2 / (2 * spec_std**2),
+    )
+    return log_l_h1 - log_l_h0
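The new kalman_filter_binary is intended as a drop-in alternative scorer to kalman_filter: both return the log-likelihood ratio of "signal present" versus "noise only" for the same spectrum, and the consistency tests further down check that the two agree. A minimal usage sketch (toy array sizes and parameter values chosen here for illustration, not part of the patch):

    import numpy as np
    from kalman_detector.core import kalman_filter, kalman_filter_binary

    rng = np.random.default_rng()
    spec_std = rng.lognormal(0, 0.3, 128)    # per-channel noise std (toy values)
    spec = rng.normal(0, spec_std)           # noise-only spectrum
    score_iter = kalman_filter(spec, spec_std, 0.1)         # channel-by-channel estimator
    score_tree = kalman_filter_binary(spec, spec_std, 0.1)  # binary-tree estimator
    # The two scores are expected to agree closely (see tests/test_kalman.py below).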
""" - if v0 is None: - v0 = np.median(spec_std) ** 2 - var_d = spec_std**2 var_t = sig_t**2 states = [ @@ -520,38 +517,3 @@ def kalman_binary_compress( while len(states) > 1: states = [states[i] + states[i + 1] for i in range(0, len(states), 2)] return states[0] - - -def kalman_binary_hypothesis( - spec: np.ndarray, - spec_std: np.ndarray, - sig_t: float, - e0: float, - v0: float, -) -> float: - """Calculate the log likelihood ratio of the NP hypothesis test using a binary tree. - - Parameters - ---------- - spec : np.ndarray - 1D array of spectrum values. - spec_std : np.ndarray - 1D array of spectrum standard deviations. - sig_t : float - Standard deviation of the tranition between states. - e0 : float - Expected value of the first hidden state A0. - v0 : float - Variance of the first hidden state A0. - - Returns - ------- - float - Log likelihood ratio of the NP hypothesis test. - """ - state = kalman_binary_compress(spec, spec_std, sig_t, e0, v0) - log_l_h1 = state.log_s + np.log(np.sqrt((2 * np.pi) ** 2 / np.linalg.det(state.m))) - log_l_h0 = np.sum( - 0.5 * np.log(1 / spec_std**2 / 2 / np.pi) - spec**2 / (2 * spec_std**2), - ) - return log_l_h1 - log_l_h0 diff --git a/tests/test_kalman.py b/tests/test_kalman.py index 126df8d..0c6d5f4 100644 --- a/tests/test_kalman.py +++ b/tests/test_kalman.py @@ -1,16 +1,7 @@ -from pathlib import Path - import numpy as np import pytest -from numpy.polynomial import Polynomial -from kalman_detector import utils -from kalman_detector.core import kalman_filter -from kalman_detector.main import ( - KalmanDetector, - KalmanDistribution, - secondary_spectrum_cumulative_chi2_score, -) +from kalman_detector.core import kalman_filter, kalman_filter_binary class TestKalmanFilter: @@ -20,156 +11,66 @@ def test_kalman_filter(self) -> None: rng = np.random.default_rng() spec_std = rng.normal(1, 0.1, size=nchans) spec = rng.normal(target, spec_std) - score = kalman_filter(spec, spec_std, 0.1) + score = kalman_filter.py_func(spec, spec_std, 0.1) mask = np.zeros(nchans, dtype=bool) mask[rng.choice(nchans, 2, replace=False)] = True - score_masked = kalman_filter(spec, spec_std, 0.1, chan_mask=mask) + score_masked = kalman_filter.py_func(spec, spec_std, 0.1, chan_mask=mask) np.testing.assert_allclose(score, score_masked, rtol=1e-1) -class TestKalmanDetector: - def test_q_par_float(self) -> None: - q_par = 0.1 - std_vec = np.ones(336) - kalman = KalmanDetector(std_vec, q_par**2) - np.testing.assert_array_equal(kalman.transit_sigmas, np.array([q_par])) - - def test_q_par_list(self) -> None: - q_par = [0.1, 0.2] - q_par_sq = [qq**2 for qq in q_par] - std_vec = np.ones(336) - kalman = KalmanDetector(std_vec, q_par_sq) - np.testing.assert_array_equal(kalman.transit_sigmas, np.array(q_par)) - - def test_initialization_fail(self) -> None: - std_vec = np.zeros(1024) - with pytest.raises(ValueError): - KalmanDetector(std_vec) - - def test_prepare_fits(self) -> None: - q_par = [0.1, 0.2] - std_vec = np.arange(0.1, 1, 0.01) - kalman = KalmanDetector(std_vec, q_par) - kalman.prepare_fits(ntrials=1000) - assert isinstance(kalman.distributions[0], KalmanDistribution) - np.testing.assert_equal(len(kalman.distributions), len(q_par)) - - def test_get_significance(self) -> None: - nchans = 128 - target = 5 +class TestKalmanDetectorBinary: + @pytest.mark.parametrize("v0", [0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000]) + @pytest.mark.parametrize("nchans", [2, 128, 4096]) + def test_consistency_v0(self, v0: float, nchans: int) -> None: rng = np.random.default_rng() - std_vec = 
diff --git a/tests/test_kalman.py b/tests/test_kalman.py
index 126df8d..0c6d5f4 100644
--- a/tests/test_kalman.py
+++ b/tests/test_kalman.py
@@ -1,16 +1,7 @@
-from pathlib import Path
-
 import numpy as np
 import pytest
-from numpy.polynomial import Polynomial
 
-from kalman_detector import utils
-from kalman_detector.core import kalman_filter
-from kalman_detector.main import (
-    KalmanDetector,
-    KalmanDistribution,
-    secondary_spectrum_cumulative_chi2_score,
-)
+from kalman_detector.core import kalman_filter, kalman_filter_binary
 
 
 class TestKalmanFilter:
@@ -20,156 +11,66 @@ def test_kalman_filter(self) -> None:
         rng = np.random.default_rng()
         spec_std = rng.normal(1, 0.1, size=nchans)
         spec = rng.normal(target, spec_std)
-        score = kalman_filter(spec, spec_std, 0.1)
+        score = kalman_filter.py_func(spec, spec_std, 0.1)
         mask = np.zeros(nchans, dtype=bool)
         mask[rng.choice(nchans, 2, replace=False)] = True
-        score_masked = kalman_filter(spec, spec_std, 0.1, chan_mask=mask)
+        score_masked = kalman_filter.py_func(spec, spec_std, 0.1, chan_mask=mask)
         np.testing.assert_allclose(score, score_masked, rtol=1e-1)
 
 
-class TestKalmanDetector:
-    def test_q_par_float(self) -> None:
-        q_par = 0.1
-        std_vec = np.ones(336)
-        kalman = KalmanDetector(std_vec, q_par**2)
-        np.testing.assert_array_equal(kalman.transit_sigmas, np.array([q_par]))
-
-    def test_q_par_list(self) -> None:
-        q_par = [0.1, 0.2]
-        q_par_sq = [qq**2 for qq in q_par]
-        std_vec = np.ones(336)
-        kalman = KalmanDetector(std_vec, q_par_sq)
-        np.testing.assert_array_equal(kalman.transit_sigmas, np.array(q_par))
-
-    def test_initialization_fail(self) -> None:
-        std_vec = np.zeros(1024)
-        with pytest.raises(ValueError):
-            KalmanDetector(std_vec)
-
-    def test_prepare_fits(self) -> None:
-        q_par = [0.1, 0.2]
-        std_vec = np.arange(0.1, 1, 0.01)
-        kalman = KalmanDetector(std_vec, q_par)
-        kalman.prepare_fits(ntrials=1000)
-        assert isinstance(kalman.distributions[0], KalmanDistribution)
-        np.testing.assert_equal(len(kalman.distributions), len(q_par))
-
-    def test_get_significance(self) -> None:
-        nchans = 128
-        target = 5
+class TestKalmanDetectorBinary:
+    @pytest.mark.parametrize("v0", [0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000])
+    @pytest.mark.parametrize("nchans", [2, 128, 4096])
+    def test_consistency_v0(self, v0: float, nchans: int) -> None:
         rng = np.random.default_rng()
-        std_vec = rng.normal(1, 0.1, size=nchans)
-        kalman = KalmanDetector(std_vec)
-        kalman.prepare_fits(ntrials=1000)
-        spectrum = rng.normal(target, std_vec)
-        sigs, scores = kalman.get_significance(spectrum)
-        np.testing.assert_equal(len(sigs), len(scores))
-        np.testing.assert_array_less(sigs, 0.1)
-
-    def test_get_significance_fail(self) -> None:
-        nchans = 128
+        spec = rng.lognormal(0, 0.3, nchans)
+        spec_std = rng.lognormal(0, 0.3, nchans)
+        sig_t = rng.lognormal(0, 0.3)
+        e0 = rng.lognormal(0, 0.3)
+
+        kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
+        kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
+        np.testing.assert_almost_equal(kalman1d, kalman2d, decimal=10)
+
+    @pytest.mark.parametrize("e0", [-10, -1, -0.1, -0.01, 0, 0.01, 0.1, 1, 10])
+    @pytest.mark.parametrize("nchans", [2, 128, 4096])
+    def test_consistency_e0(self, e0: float, nchans: int) -> None:
         rng = np.random.default_rng()
-        std_vec = rng.normal(1, 0.1, size=nchans)
-        spectrum = np.ones(nchans + 1)
-        kalman = KalmanDetector(std_vec)
-        kalman.prepare_fits(ntrials=1000)
-        with pytest.raises(ValueError):
-            kalman.get_significance(spectrum)
-
-    def test_get_best_significance(self) -> None:
-        nchans = 336
-        corr_len = 300
-        ntrials = 1000
-        target_snr = 20
-
-        scores_arr = []
-        template = utils.simulate_gaussian_signal(
-            nchans,
-            corr_len,
-            complex_process=True,
-        )
-        spec_mean = np.zeros_like(template)
-        spec_std = np.ones_like(template)
-        kalman = KalmanDetector(spec_std)
-        kalman.prepare_fits(ntrials=1000)
+        spec = rng.lognormal(0, 0.3, nchans)
+        spec_std = rng.lognormal(0, 0.3, nchans)
+        sig_t = rng.lognormal(0, 0.3)
+        v0 = rng.lognormal(0, 0.3)
+
+        kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
+        kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
+        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
+        # Test without v0
+        kalman1d_v0 = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0)
+        kalman2d_v0 = kalman_filter_binary(spec, spec_std, sig_t, e0=e0)
+        np.testing.assert_almost_equal(kalman1d_v0, kalman2d_v0, decimal=10)
+
+    @pytest.mark.parametrize("sig_t", [0.01, 0.1, 1, 10, 100, 1000])
+    @pytest.mark.parametrize("nchans", [2, 128, 1024])
+    def test_consistency_eta(self, sig_t: float, nchans: int) -> None:
         rng = np.random.default_rng()
-        for _ in range(ntrials):
-            spec = target_snr * template + rng.normal(
-                spec_mean,
-                spec_std,
-                len(template),
-            )
-            best_sig = kalman.get_best_significance(spec)
-            scores_arr.append(best_sig)
-        score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
-        np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)
-
-
-class TestKalmanDistribution:
-    def test_initialization(self) -> None:
-        sigma_arr = np.arange(0.1, 1, 0.01)
-        sig_eta = 0.1
-        mask_tol = 0.1
-        ntrials = 1000
-        kdist = KalmanDistribution(
-            sigma_arr,
-            sig_eta,
-            ntrials=ntrials,
-            mask_tol=mask_tol,
-        )
-        np.testing.assert_equal(kdist.mask_tol, mask_tol)
-        np.testing.assert_equal(len(kdist.mask), len(sigma_arr))
-        np.testing.assert_equal(kdist.sig_eta, sig_eta)
-        np.testing.assert_equal(kdist.ntrials, ntrials)
-
-    def test_initialization_fail(self) -> None:
-        sigma_arr = np.zeros(1024)
-        with pytest.raises(ValueError):
-            KalmanDistribution(sigma_arr, 0.1)
-
-    def test_plot_diagnostic(self, tmpfile: str) -> None:
-        outfile_path = Path(f"{tmpfile}.pdf")
-        sigma_arr = np.arange(0.1, 1, 0.01)
-        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
-        kdist.plot_diagnostic(logy=True, outfile=outfile_path.as_posix())
-        assert outfile_path.is_file()
-        outfile_path.unlink()
-
-    def test_polyfit(self) -> None:
-        sigma_arr = np.arange(0.1, 1, 0.01)
-        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
-        assert isinstance(kdist.polyfit, Polynomial)
-
-    def test_str(self) -> None:
-        sigma_arr = np.arange(0.1, 1, 0.01)
-        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
-        assert str(kdist).startswith("KalmanDistribution")
-        assert repr(kdist) == str(kdist)
-
+        spec = rng.lognormal(0, 0.3, nchans)
+        spec_std = rng.lognormal(0, 0.3, nchans)
+        e0 = rng.lognormal(0, 0.3)
+        v0 = rng.lognormal(0, 0.3)
 
-class TestSecondarySpectrym:
-    def test_secondary_spectrum_cumulative_chi2_score(self) -> None:
-        nchans = 336
-        corr_len = 300
-        ntrials = 1000
-        target_snr = 20
+        kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
+        kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
+        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
 
-        scores_arr = []
-        template = utils.simulate_gaussian_signal(
-            nchans,
-            corr_len,
-            complex_process=True,
-        )
-        spec_mean = np.zeros_like(template)
-        spec_std = np.ones_like(template)
+    @pytest.mark.parametrize("nchans", [2, 4, 16, 64, 256, 1024, 4096, 8192])
+    def test_consistency_nchans(self, nchans: int) -> None:
         rng = np.random.default_rng()
-        for _ in range(ntrials):
-            spec = target_snr * template + rng.normal(
-                spec_mean,
-                spec_std,
-                len(template),
-            )
-            score = secondary_spectrum_cumulative_chi2_score(spec, spec_std)
-            scores_arr.append(score)
-        score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
-        np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)
+        spec = rng.lognormal(0, 0.3, nchans)
+        spec_std = rng.lognormal(0, 0.3, nchans)
+        sig_t = rng.lognormal(0, 0.3)
+        e0 = rng.lognormal(0, 0.3)
+        v0 = rng.lognormal(0, 0.3)
+
+        kalman1d = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
+        kalman2d = kalman_filter_binary(spec, spec_std, sig_t, e0=e0, v0=v0)
+        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
diff --git a/tests/test_kalman_binary.py b/tests/test_kalman_binary.py
deleted file mode 100644
index 004282d..0000000
--- a/tests/test_kalman_binary.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import numpy as np
-import pytest
-
-from kalman_detector.core import kalman_filter
-from kalman_detector.svm import kalman_binary_hypothesis
-
-
-class TestKalmanDetector2D:
-    @pytest.mark.parametrize("v0", [0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000])
-    @pytest.mark.parametrize("nchans", [2, 128, 4096])
-    def test_consistency_v0(self, v0: float, nchans: int) -> None:
-        rng = np.random.default_rng()
-        spec = rng.lognormal(0, 0.3, nchans)
-        spec_std = rng.lognormal(0, 0.3, nchans)
-        sig_t = rng.lognormal(0, 0.3)
-        e0 = rng.lognormal(0, 0.3)
-
-        kalman1d = kalman_filter(spec, spec_std, sig_t, e0=e0, v0=v0)
-        kalman2d = kalman_binary_hypothesis(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman1d, kalman2d, decimal=10)
-        kalman1d_py = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman1d, kalman1d_py, decimal=10)
-
-    @pytest.mark.parametrize("e0", [-10, -1, -0.1, -0.01, 0, 0.01, 0.1, 1, 10])
-    @pytest.mark.parametrize("nchans", [2, 128, 4096])
-    def test_consistency_e0(self, e0: float, nchans: int) -> None:
-        rng = np.random.default_rng()
-        spec = rng.lognormal(0, 0.3, nchans)
-        spec_std = rng.lognormal(0, 0.3, nchans)
-        sig_t = rng.lognormal(0, 0.3)
-        v0 = rng.lognormal(0, 0.3)
-
-        kalman1d = kalman_filter(spec, spec_std, sig_t, e0=e0, v0=v0)
-        kalman2d = kalman_binary_hypothesis(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
-        # Test without v0
-        kalman1d_py = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0)
-        np.testing.assert_almost_equal(kalman1d, kalman1d_py, decimal=10)
-
-    @pytest.mark.parametrize("sig_t", [0.01, 0.1, 1, 10, 100, 1000])
-    @pytest.mark.parametrize("nchans", [2, 128, 1024])
-    def test_consistency_eta(self, sig_t: float, nchans: int) -> None:
-        rng = np.random.default_rng()
-        spec = rng.lognormal(0, 0.3, nchans)
-        spec_std = rng.lognormal(0, 0.3, nchans)
-        e0 = rng.lognormal(0, 0.3)
-        v0 = rng.lognormal(0, 0.3)
-
-        kalman1d = kalman_filter(spec, spec_std, sig_t, e0=e0, v0=v0)
-        kalman2d = kalman_binary_hypothesis(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
-        kalman1d_py = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman1d, kalman1d_py, decimal=10)
-
-    @pytest.mark.parametrize("nchans", [2, 4, 16, 64, 256, 1024, 4096, 8192])
-    def test_consistency_nchans(self, nchans: int) -> None:
-        rng = np.random.default_rng()
-        spec = rng.lognormal(0, 0.3, nchans)
-        spec_std = rng.lognormal(0, 0.3, nchans)
-        sig_t = rng.lognormal(0, 0.3)
-        e0 = rng.lognormal(0, 0.3)
-        v0 = rng.lognormal(0, 0.3)
-
-        kalman1d = kalman_filter(spec, spec_std, sig_t, e0=e0, v0=v0)
-        kalman2d = kalman_binary_hypothesis(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman2d, kalman1d, decimal=10)
-        kalman1d_py = kalman_filter.py_func(spec, spec_std, sig_t, e0=e0, v0=v0)
-        np.testing.assert_almost_equal(kalman1d, kalman1d_py, decimal=10)
diff --git a/tests/test_main.py b/tests/test_main.py
new file mode 100644
index 0000000..75c3999
--- /dev/null
+++ b/tests/test_main.py
@@ -0,0 +1,160 @@
+from pathlib import Path
+
+import numpy as np
+import pytest
+from numpy.polynomial import Polynomial
+
+from kalman_detector import utils
+from kalman_detector.main import (
+    KalmanDetector,
+    KalmanDistribution,
+    secondary_spectrum_cumulative_chi2_score,
+)
+
+
+class TestKalmanDetector:
+    def test_q_par_float(self) -> None:
+        q_par = 0.1
+        std_vec = np.ones(336)
+        kalman = KalmanDetector(std_vec, q_par**2)
+        np.testing.assert_array_equal(kalman.transit_sigmas, np.array([q_par]))
+
+    def test_q_par_list(self) -> None:
+        q_par = [0.1, 0.2]
+        q_par_sq = [qq**2 for qq in q_par]
+        std_vec = np.ones(336)
+        kalman = KalmanDetector(std_vec, q_par_sq)
+        np.testing.assert_array_equal(kalman.transit_sigmas, np.array(q_par))
+
+    def test_initialization_fail(self) -> None:
+        std_vec = np.zeros(1024)
+        with pytest.raises(ValueError):
+            KalmanDetector(std_vec)
+
+    def test_prepare_fits(self) -> None:
+        q_par = [0.1, 0.2]
+        std_vec = np.arange(0.1, 1, 0.01)
+        kalman = KalmanDetector(std_vec, q_par)
+        kalman.prepare_fits(ntrials=1000)
+        assert isinstance(kalman.distributions[0], KalmanDistribution)
+        np.testing.assert_equal(len(kalman.distributions), len(q_par))
+
+    def test_get_significance(self) -> None:
+        nchans = 128
+        target = 5
+        rng = np.random.default_rng()
+        std_vec = rng.normal(1, 0.1, size=nchans)
+        kalman = KalmanDetector(std_vec)
+        kalman.prepare_fits(ntrials=1000)
+        spectrum = rng.normal(target, std_vec)
+        sigs, scores = kalman.get_significance(spectrum)
+        np.testing.assert_equal(len(sigs), len(scores))
+        np.testing.assert_array_less(sigs, 0.1)
+
+    def test_get_significance_fail(self) -> None:
+        nchans = 128
+        rng = np.random.default_rng()
+        std_vec = rng.normal(1, 0.1, size=nchans)
+        spectrum = np.ones(nchans + 1)
+        kalman = KalmanDetector(std_vec)
+        kalman.prepare_fits(ntrials=1000)
+        with pytest.raises(ValueError):
+            kalman.get_significance(spectrum)
+
+    def test_get_best_significance(self) -> None:
+        nchans = 336
+        corr_len = 300
+        ntrials = 1000
+        target_snr = 20
+
+        scores_arr = []
+        template = utils.simulate_gaussian_signal(
+            nchans,
+            corr_len,
+            complex_process=True,
+        )
+        spec_mean = np.zeros_like(template)
+        spec_std = np.ones_like(template)
+        kalman = KalmanDetector(spec_std)
+        kalman.prepare_fits(ntrials=1000)
+        rng = np.random.default_rng()
+        for _ in range(ntrials):
+            spec = target_snr * template + rng.normal(
+                spec_mean,
+                spec_std,
+                len(template),
+            )
+            best_sig = kalman.get_best_significance(spec)
+            scores_arr.append(best_sig)
+        score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
+        np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)
+
+
+class TestKalmanDistribution:
+    def test_initialization(self) -> None:
+        sigma_arr = np.arange(0.1, 1, 0.01)
+        sig_eta = 0.1
+        mask_tol = 0.1
+        ntrials = 1000
+        kdist = KalmanDistribution(
+            sigma_arr,
+            sig_eta,
+            ntrials=ntrials,
+            mask_tol=mask_tol,
+        )
+        np.testing.assert_equal(kdist.mask_tol, mask_tol)
+        np.testing.assert_equal(len(kdist.mask), len(sigma_arr))
+        np.testing.assert_equal(kdist.sig_eta, sig_eta)
+        np.testing.assert_equal(kdist.ntrials, ntrials)
+
+    def test_initialization_fail(self) -> None:
+        sigma_arr = np.zeros(1024)
+        with pytest.raises(ValueError):
+            KalmanDistribution(sigma_arr, 0.1)
+
+    def test_plot_diagnostic(self, tmpfile: str) -> None:
+        outfile_path = Path(f"{tmpfile}.pdf")
+        sigma_arr = np.arange(0.1, 1, 0.01)
+        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
+        kdist.plot_diagnostic(logy=True, outfile=outfile_path.as_posix())
+        assert outfile_path.is_file()
+        outfile_path.unlink()
+
+    def test_polyfit(self) -> None:
+        sigma_arr = np.arange(0.1, 1, 0.01)
+        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
+        assert isinstance(kdist.polyfit, Polynomial)
+
+    def test_str(self) -> None:
+        sigma_arr = np.arange(0.1, 1, 0.01)
+        kdist = KalmanDistribution(sigma_arr, 0.01, ntrials=1000)
+        assert str(kdist).startswith("KalmanDistribution")
+        assert repr(kdist) == str(kdist)
+
+
+class TestSecondarySpectrym:
+    def test_secondary_spectrum_cumulative_chi2_score(self) -> None:
+        nchans = 336
+        corr_len = 300
+        ntrials = 1000
+        target_snr = 20
+
+        scores_arr = []
+        template = utils.simulate_gaussian_signal(
+            nchans,
+            corr_len,
+            complex_process=True,
+        )
+        spec_mean = np.zeros_like(template)
+        spec_std = np.ones_like(template)
+        rng = np.random.default_rng()
+        for _ in range(ntrials):
+            spec = target_snr * template + rng.normal(
+                spec_mean,
+                spec_std,
+                len(template),
+            )
+            score = secondary_spectrum_cumulative_chi2_score(spec, spec_std)
+            scores_arr.append(score)
+        score_mean = utils.get_snr_from_logsf(np.array(scores_arr).mean())
+        np.testing.assert_allclose(score_mean, target_snr, rtol=0.2)
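The detector and distribution code in kalman_detector.main is not modified by this diff; only its tests moved from tests/test_kalman.py into the new tests/test_main.py. For orientation, a condensed sketch of the workflow those moved tests exercise (toy parameter values, mirroring test_get_significance above):

    import numpy as np
    from kalman_detector.main import KalmanDetector

    rng = np.random.default_rng()
    std_vec = rng.normal(1, 0.1, size=128)   # per-channel noise std
    kalman = KalmanDetector(std_vec)
    kalman.prepare_fits(ntrials=1000)        # build the null-score distributions
    spectrum = rng.normal(5, std_vec)        # spectrum with a flat signal added
    sigs, scores = kalman.get_significance(spectrum)

Assuming the project's usual pytest setup (including the tmpfile fixture used by test_plot_diagnostic, presumably provided by a conftest), both the moved tests and the new consistency tests should run with a plain pytest invocation over the tests directory.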