Skip to content

Commit

Permalink
Add ecephys.get_sorter_name()
Browse files Browse the repository at this point in the history
Closes #9
  • Loading branch information
bjhardcastle committed Sep 16, 2024
1 parent a493298 commit 03346c4
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions src/aind_session/extensions/ecephys.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import contextlib
import datetime
import itertools
import json
import logging
from typing import ClassVar, Literal

Expand Down Expand Up @@ -462,6 +465,135 @@ def current_sorting_computations(
ttl_hash=aind_session.utils.get_ttl_hash(1 * 60),
)

@staticmethod
def get_sorter_name(data_asset_id: str | codeocean.data_asset.DataAsset) -> str:
    """
    Get the name of the spike sorter (e.g. `kilosort2_5`, `kilosort4`) used to
    create the sorted data asset.

    Tries to find `sorter_name` in the following json files, in order, for any
    probe:
    - `processing.json` (in root of asset)
    - `si_folder.json` (in `spikesorted` dir)
    - `sorting.json` (in `postprocessed` dir)
    - `params.json` (in root of asset, for older assets)

    Raises `ValueError` if none of the json files exist, or if none contain the
    `sorter_name` key, either of which indicates that the asset data is
    incomplete due to the sorting pipeline failing for all probes.

    Raises `AssertionError` if a json file contains `sorter_name` but its
    layout is not one of the known ones: this signals that the lookup logic
    needs updating, rather than that the asset is incomplete.

    Examples
    --------
    - processing.json['processing_pipeline']['data_processes'][index]['parameters']['sorter_name']:

    >>> aind_session.ecephys.get_sorter_name('921a186a-d8ff-4efc-8e1a-891fde8cd394')
    'kilosort2_5'

    - processing.json['processing_pipeline']['data_processes'][index]['parameters']['sorter_name']:

    >>> aind_session.ecephys.get_sorter_name('01d9d159-96f0-43f1-9d14-29b5c2521d94')
    'kilosort4'

    - processing.json['data_processes'][index]['parameters']['sorter_name']:

    >>> aind_session.ecephys.get_sorter_name('205fc2d0-5f00-468f-a82d-47c94afcd40c')
    'kilosort2_5'

    - spikesorted/si_folder.json['annotations']['__sorting_info__']['params']['sorter_name']:

    >>> aind_session.ecephys.get_sorter_name('bd0ad804-4a33-4613-9d6c-6281e442bade')
    'kilosort2_5'

    - params.json['spikesorting']['sorter_name']

    >>> aind_session.ecephys.get_sorter_name('0eca2d35-5c8c-48bb-a921-e48cf3d871de')
    'kilosort2_5'

    - no sorter_name available:

    >>> aind_session.ecephys.get_sorter_name('b4a7757c-6826-49eb-b3dd-d6cd871c5e7c')
    Traceback (most recent call last):
    ...
    ValueError: Sorting data are incomplete for
    data_asset_id='b4a7757c-6826-49eb-b3dd-d6cd871c5e7c' (pipeline likely failed) - cannot get sorter name
    """
    source_dir = aind_session.utils.codeocean_utils.get_data_asset_source_dir(
        aind_session.utils.codeocean_utils.get_normalized_uuid(data_asset_id)
    )

    def _get_sorter_name_from_processing_json(source_dir: upath.UPath) -> str:
        """Read `sorter_name` from the 'Spike sorting' entry of `processing.json`.

        Raises `FileNotFoundError`/`KeyError` when the file or key is absent
        (the caller then tries the next source), and `AssertionError` when the
        key is present but the file layout is not recognised.
        """
        processing_path = source_dir / "processing.json"
        if not processing_path.exists():
            raise FileNotFoundError(f"No 'processing.json' found in {source_dir}")
        processing_text = processing_path.read_text()
        # Cheap substring check: skip parsing files that cannot hold the key.
        if '"sorter_name":' not in processing_text:
            raise KeyError(f"No 'sorter_name' value found in processing.json for {data_asset_id=}")
        processing: dict = json.loads(processing_text)
        # Explicit raises (not `assert`) so behaviour is the same under `python -O`.
        if "processing_pipeline" in processing:
            data_processes = processing["processing_pipeline"]["data_processes"]
        elif "data_processes" in processing:
            data_processes = processing["data_processes"]
        else:
            raise AssertionError(
                f"Fix method of getting sorter name: 'data_processes' not in processing.json for {data_asset_id=}"
            )
        # Entries are either dicts, or lists of dicts. Keep searching until a
        # 'Spike sorting' entry is found: an earlier list may lack one.
        sorting: dict | None = None
        for p in data_processes:
            candidates = p if isinstance(p, list) else [p]
            sorting = next(
                (d for d in candidates if d.get("name") == "Spike sorting"),
                None,
            )
            if sorting is not None:
                break
        if sorting is None:
            raise AssertionError(
                f"Fix method of getting sorter name: 'sorter_name' is in processing.json, but not in expected location for {data_asset_id=}"
            )
        if "parameters" not in sorting:
            raise AssertionError(
                f"Fix method of getting sorter name: 'parameters' not in 'Spike sorting' data process in processing.json for {data_asset_id=}"
            )
        if "sorter_name" not in sorting["parameters"]:
            raise KeyError("No 'sorter_name' key found in sorting parameters in processing.json")
        sorter_name: str = sorting["parameters"]["sorter_name"]
        logger.debug(f"Found sorter_name key in processing.json: {sorter_name}")
        return sorter_name

    def _get_sorter_name_from_sorted_folders(source_dir: upath.UPath) -> str:
        """Read `sorter_name` from the first usable `si_folder.json` (under
        `spikesorted`) or `sorting.json` (under `postprocessed`).

        Raises `FileNotFoundError` if no such files exist, or `KeyError` if
        none of them yields a `sorter_name`.
        """
        json_paths = []
        for json_path in itertools.chain(
            (source_dir / "spikesorted").rglob("si_folder.json"),
            (source_dir / "postprocessed").rglob("sorting.json"),
        ):
            json_paths.append(json_path)
            info = json_path.read_text()
            if '"sorter_name":' not in info:
                continue
            try:
                sorter_name = json.loads(info)["annotations"]["__sorting_info__"]["params"]["sorter_name"]
            except KeyError:
                # Key exists somewhere else in this file: try the next one
                # rather than giving up on every remaining probe.
                continue
            logger.debug(f"Found sorter_name key in {json_path.name}: {sorter_name}")
            return sorter_name
        if not json_paths:
            raise FileNotFoundError(
                f"No 'processing.json', 'si_folder.json', or 'sorting.json' files found - asset {data_asset_id} likely contains incomplete data"
            )
        checked_names = {p.name for p in json_paths}
        raise KeyError(
            f"Fix method of getting sorter name: 'sorter_name' not a value in {checked_names} for {data_asset_id=}"
        )

    def _get_sorter_name_from_params_json(source_dir: upath.UPath) -> str:
        """Read `sorter_name` from the `spikesorting` section of `params.json`.

        Raises `FileNotFoundError`/`KeyError` when the file or key is absent,
        and `AssertionError` when the key is present in an unexpected place.
        """
        params_path = source_dir / "params.json"
        if not params_path.exists():
            raise FileNotFoundError(f"No 'params.json' found in {source_dir}")
        params_text = params_path.read_text()
        if '"sorter_name":' not in params_text:
            raise KeyError(f"No 'sorter_name' key found in {params_path.name}")
        params: dict = json.loads(params_text)
        # Explicit raises (not `assert`) so behaviour is the same under `python -O`.
        if not params:
            raise AssertionError(
                f"Fix method of getting sorter name: {params=} for {data_asset_id=}"
            )
        if "spikesorting" not in params:
            raise AssertionError(
                f"Fix method of getting sorter name: 'spikesorting' not in {params_path.name} for {data_asset_id=}"
            )
        if "sorter_name" not in params["spikesorting"]:
            raise AssertionError(
                f"Fix method of getting sorter name: 'sorter_name' not in 'spikesorting' in {params_path.name} for {data_asset_id=}"
            )
        sorter_name = params["spikesorting"]["sorter_name"]
        logger.debug(f"Found sorter_name key in params.json: {sorter_name}")
        return sorter_name

    # Try each source in priority order. A missing file or key just means
    # "try the next source"; any other exception propagates to the caller.
    for getter in (
        _get_sorter_name_from_processing_json,
        _get_sorter_name_from_sorted_folders,
        _get_sorter_name_from_params_json,
    ):
        with contextlib.suppress(FileNotFoundError, KeyError):
            return getter(source_dir)
    raise ValueError(f"Sorting data are incomplete for {data_asset_id=!r} (pipeline likely failed) - cannot get sorter name")

if __name__ == "__main__":
from aind_session import testmod
Expand Down

0 comments on commit 03346c4

Please sign in to comment.