Skip to content

Commit

Permalink
Add SPDI normalization without using NCBI APIs
Browse files Browse the repository at this point in the history
Co-authored-by: Bob Dolin <bdolin@elimu.io>
  • Loading branch information
mihaitodor and rhdolin committed Aug 24, 2023
1 parent 11190a7 commit 7bdafaa
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 147 deletions.
4 changes: 4 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
import flask
from flask_cors import CORS
import os
from .fasta import download_fasta


def create_app():
# First ensure we have the fasta files locally
download_fasta()

# App and API
options = {
'swagger_url': '/',
Expand Down
150 changes: 3 additions & 147 deletions app/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from threading import Lock
from uuid import uuid4
import pyliftover
import requests
from datetime import datetime
import pymongo
from flask import abort
from itertools import groupby
import re
from .input_normalization import normalize

# MongoDB Client URIs
FHIR_genomics_data_client_uri = "mongodb+srv://download:download@cluster0.8ianr.mongodb.net/FHIRGenomicsData"
Expand Down Expand Up @@ -116,8 +116,6 @@ def get_liftover(from_db, to_db):

SUPPORTED_GENOMIC_SOURCE_CLASSES = ['germline', 'somatic']

NCBI_VARIATION_SERVICES_BASE_URL = 'https://api.ncbi.nlm.nih.gov/variation/v0/'

CHROMOSOME_CSV_FILE = 'app/_Dict_Chromosome.csv'

# Utility Functions
Expand Down Expand Up @@ -163,26 +161,6 @@ def merge_ranges(ranges):
return merged_ranges


def get_hgvs_contextuals_url(hgvs):
return f"{NCBI_VARIATION_SERVICES_BASE_URL}hgvs/{hgvs}/contextuals"


def get_spdi_all_equivalent_contextual_url(contextual_SPDI):
return f'{NCBI_VARIATION_SERVICES_BASE_URL}spdi/{contextual_SPDI}/all_equivalent_contextual'


def get_spdi_canonical_representative_url(contextual_SPDI):
return f'{NCBI_VARIATION_SERVICES_BASE_URL}spdi/{contextual_SPDI}/canonical_representative'


def build_spdi(seq_id, position, deleted_sequence, inserted_sequence):
return f"{seq_id}:{position}:{deleted_sequence}:{inserted_sequence}"


def get_spdi_elements(response_object):
return (response_object['seq_id'], response_object['position'], response_object['deleted_sequence'], response_object['inserted_sequence'])


def validate_subject(patient_id):
if not patients_db.find_one({"patientID": patient_id}):
abort(400, f"Patient ({patient_id}) not found.")
Expand Down Expand Up @@ -1002,133 +980,11 @@ def get_intersected_regions(bed_id, build, chrom, start, end, intersected_region


def hgvs_2_contextual_SPDIs(hgvs):

# convert hgvs to contextualSPDI
url = get_hgvs_contextuals_url(hgvs)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_data = response['data']
raw_SPDI = raw_data['spdis'][0]

seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(raw_SPDI)

contextual_SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)

# convert contextualSPDI to build37 and build38 contextual SPDIs
url = get_spdi_all_equivalent_contextual_url(contextual_SPDI)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_SPDI_List = response['data']['spdis']

b37SPDI = None
b38SPDI = None
for item in raw_SPDI_List:
if item['seq_id'].startswith("NC_"):
temp = get_build_and_chrom_by_ref_seq(item['seq_id'])
if temp:
seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(item)

if temp['build'] == 'GRCh37':
b37SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)
elif temp['build'] == 'GRCh38':
b38SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)
else:
return False

return {"GRCh37": b37SPDI, "GRCh38": b38SPDI}


def hgvs_2_canonical_SPDI(hgvs):

# convert hgvs to contextualSPDI
url = get_hgvs_contextuals_url(hgvs)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_data = response['data']
raw_SPDI = raw_data['spdis'][0]

seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(raw_SPDI)

contextual_SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)

# convert contextualSPDI to canonical SPDI
url = get_spdi_canonical_representative_url(contextual_SPDI)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_SPDI = response['data']

seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(raw_SPDI)

canonical_SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)

return {"canonicalSPDI": canonical_SPDI}
return normalize(hgvs)


def SPDI_2_contextual_SPDIs(spdi):
url = get_spdi_all_equivalent_contextual_url(spdi)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_SPDI_List = response['data']['spdis']

b37SPDI = None
b38SPDI = None
for item in raw_SPDI_List:
if item['seq_id'].startswith("NC_"):
temp = get_build_and_chrom_by_ref_seq(item['seq_id'])
if temp:
seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(item)

if temp['build'] == 'GRCh37':
b37SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)
elif temp['build'] == 'GRCh38':
b38SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)
else:
return False

return {"GRCh37": b37SPDI, "GRCh38": b38SPDI}


def SPDI_2_canonical_SPDI(spdi):
url = get_spdi_canonical_representative_url(spdi)
headers = {'Accept': 'application/json'}

r = requests.get(url, headers=headers)
if r.status_code != 200:
return False

response = r.json()
raw_SPDI = response['data']

seq_id, position, deleted_sequence, inserted_sequence = get_spdi_elements(raw_SPDI)

canonical_SPDI = build_spdi(seq_id, position, deleted_sequence, inserted_sequence)

return {"canonicalSPDI": canonical_SPDI}
return normalize(spdi)


def query_clinvar_by_variants(normalized_variant_list, code_list, query, population=False):
Expand Down
23 changes: 23 additions & 0 deletions app/fasta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path
from pyfastx import Fasta
from urllib.request import urlretrieve


def download_fasta():
    """Download and index the GRCh37/GRCh38 reference FASTA files if missing.

    Best-effort: any failure is reported but NOT raised, so the app can still
    start (features that need the FASTA data will then fail later). Files and
    their pyfastx ``.fxi`` indexes are cached under ``FASTA/`` and only
    (re)created when absent.
    """
    try:
        # Make sure the parent folder exists
        fasta_dir = Path('FASTA')
        fasta_dir.mkdir(exist_ok=True)

        for build in ('GRCh37', 'GRCh38'):
            filename = build + '_latest_genomic.fna.gz'
            filepath = fasta_dir / filename

            # Download only when absent — the reference genomes are large
            if not filepath.is_file():
                urlretrieve(
                    'https://ftp.ncbi.nlm.nih.gov/refseq/H_sapiens/annotation/'
                    + build + '_latest/refseq_identifiers/' + filename,
                    str(filepath))

            # Build the pyfastx index once; Fasta() writes '<file>.fxi'
            if not Path(str(filepath) + '.fxi').is_file():
                Fasta(str(filepath))
    except Exception as error:
        # Deliberate best-effort: report with context instead of aborting
        # app startup, but don't hide WHICH step failed.
        print(f"download_fasta: failed to fetch/index reference FASTA: {error}")
168 changes: 168 additions & 0 deletions app/input_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import hgvs.parser
import hgvs.dataproviders.uta
import hgvs.assemblymapper
from utilities.SPDI_Normalization import get_normalized_spdi

# Module-level singletons shared by every normalization helper below.
hgvsParser = hgvs.parser.Parser()
# NOTE(review): this opens a connection to the public UTA Postgres instance
# at import time — importing this module stalls/fails without network access.
hgvsDataProvider = hgvs.dataproviders.uta.connect(
    db_url="postgresql://anonymous:anonymous@uta.biocommons.org/uta/uta_20210129")
# Mappers used to project transcript (c.) variants onto each genomic build.
b37hgvsAssemblyMapper = hgvs.assemblymapper.AssemblyMapper(
    hgvsDataProvider, assembly_name='GRCh37', alt_aln_method='splign', replace_reference=True)
b38hgvsAssemblyMapper = hgvs.assemblymapper.AssemblyMapper(
    hgvsDataProvider, assembly_name='GRCh38', alt_aln_method='splign', replace_reference=True)

# ------------- point to latest data source ------------------------
# at unix command line: export UTA_DB_URL=postgresql://anonymous:anonymous@uta.biocommons.org/uta/uta_20210129

# ------------------ PARSE -------------


def parse_variant(variant):
    """Parse an HGVS string; return it wrapped in a dict under key 'parsed'."""
    return {'parsed': hgvsParser.parse_hgvs_variant(variant)}

# ------------------ PROJECT -------------


def project_variant(parsed_variant):
    """Project a transcript-level (c.) variant onto both genomic builds.

    Returns a dict with keys 'b37projected' and 'b38projected'.
    """
    return {
        'b37projected': b37hgvsAssemblyMapper.c_to_g(parsed_variant),
        'b38projected': b38hgvsAssemblyMapper.c_to_g(parsed_variant),
    }

# ---------------- NORMALIZE to canonical SPDIs ---------------


def normalize_variant(parsed_variant, build):
    """Turn a projected genomic variant into a normalized SPDI for *build*.

    SPDI positions are 0-based, hence the initial ``- 1`` on the HGVS
    (1-based) start position.
    """
    edit = parsed_variant.posedit.edit
    position = parsed_variant.posedit.pos.start.base - 1
    if edit.ref:
        deleted = edit.ref
    else:
        # ref is blank for insertions; shift back to the insertion point
        deleted = ''
        position += 1
    if str(edit) == 'dup':
        # duplications become "insert another copy of the duplicated bases"
        inserted = deleted + deleted
    elif edit.alt:
        inserted = edit.alt
    else:
        # alt is blank for deletions
        inserted = ''
    return get_normalized_spdi(parsed_variant.ac, position, deleted, inserted, build)

# ---------------- CONVERT NM_HGVS to canonical SPDIs ---------------


def process_NM_HGVS(NM_HGVS):
    """Convert a transcript (NM_) HGVS expression to per-build canonical SPDIs.

    Returns ``{"GRCh37": <spdi>, "GRCh38": <spdi>}``.
    """
    parsed = parse_variant(NM_HGVS)['parsed']
    print(f"parsed: {parsed}")

    projected = project_variant(parsed)
    print(
        f"b37projected: {projected['b37projected']}; b38projected: {projected['b38projected']}")

    b37SPDI = normalize_variant(projected['b37projected'], 'GRCh37')
    b38SPDI = normalize_variant(projected['b38projected'], 'GRCh38')
    print(f"b37normalized: {b37SPDI}; b38normalized: {b38SPDI}")

    return {"GRCh37": b37SPDI, "GRCh38": b38SPDI}


# ---------------- CONVERT NC_HGVS to canonical SPDIs ---------------


def process_NC_HGVS(NC_HGVS):
    """Convert a genomic (NC_) HGVS expression to per-build canonical SPDIs.

    The genomic variant is first mapped to its first relevant transcript on
    GRCh38, then re-projected onto both builds and normalized.
    """
    parsed = parse_variant(NC_HGVS)['parsed']
    print(f"parsed: {parsed}")

    transcript = b38hgvsAssemblyMapper.relevant_transcripts(parsed)[0]
    var_c = b38hgvsAssemblyMapper.g_to_c(parsed, transcript)

    projected = project_variant(var_c)
    print(
        f"b37projected: {projected['b37projected']}; b38projected: {projected['b38projected']}")

    b37SPDI = normalize_variant(projected['b37projected'], 'GRCh37')
    b38SPDI = normalize_variant(projected['b38projected'], 'GRCh38')
    print(f"b37normalized: {b37SPDI}; b38normalized: {b38SPDI}")

    return {"GRCh37": b37SPDI, "GRCh38": b38SPDI}

# ---------------- CONVERT NM_SPDI to canonical SPDIs ---------------


def process_NM_SPDI(NM_SPDI):
    """Convert a transcript (NM_) SPDI to canonical SPDIs via the HGVS pipeline.

    Builds an equivalent n. HGVS expression, maps it to c. on GRCh38, and
    delegates to :func:`process_NM_HGVS`.
    """
    fields = NM_SPDI.split(":")
    refSeq = fields[0]
    pos = int(fields[1]) + 1  # SPDI is 0-based, HGVS is 1-based
    ref = fields[2]
    alt = fields[3]

    if len(ref) == 1 and len(alt) == 1:  # SNV
        expr = refSeq + ":n." + str(pos) + ref + ">" + alt
    elif not ref:  # INS (e.g. NM_007294.3:c.5533_5534insG)
        start = pos - 1
        expr = refSeq + ":n." + str(start) + "_" + str(start + 1) + 'ins' + alt
    elif not alt:  # DEL (e.g. NM_000527.5:c.1350_1355del)
        end = pos + len(ref) - 1
        expr = refSeq + ":n." + str(pos) + "_" + str(end) + 'del'
    else:  # DELINS (e.g. NM_007294.3:c.5359_5363delinsAGTGA)
        end = pos + len(ref) - 1
        expr = refSeq + ":n." + str(pos) + "_" + str(end) + 'delins' + alt

    var_n = hgvsParser.parse_hgvs_variant(expr)
    NM_HGVS = b38hgvsAssemblyMapper.n_to_c(var_n)

    return process_NM_HGVS(str(NM_HGVS))

# ---------------- CONVERT NC_SPDI to canonical SPDIs ---------------


def process_NC_SPDI(NC_SPDI):
    """Convert a genomic (NC_) SPDI to canonical SPDIs via the HGVS pipeline.

    Builds an equivalent g. HGVS expression and delegates to
    :func:`process_NC_HGVS`.
    """
    fields = NC_SPDI.split(":")
    refSeq = fields[0]
    pos = int(fields[1]) + 1  # SPDI is 0-based, HGVS is 1-based
    ref = fields[2]
    alt = fields[3]

    if len(ref) == 1 and len(alt) == 1:  # SNV
        NC_HGVS = refSeq + ":g." + str(pos) + ref + ">" + alt
    elif not ref:  # INS (e.g. NM_007294.3:c.5533_5534insG)
        start = pos - 1
        NC_HGVS = refSeq + ":g." + str(start) + "_" + str(start + 1) + 'ins' + alt
    elif not alt:  # DEL (e.g. NM_000527.5:c.1350_1355del)
        end = pos + len(ref) - 1
        NC_HGVS = refSeq + ":g." + str(pos) + "_" + str(end) + 'del'
    else:  # DELINS (e.g. NM_007294.3:c.5359_5363delinsAGTGA)
        end = pos + len(ref) - 1
        NC_HGVS = refSeq + ":g." + str(pos) + "_" + str(end) + 'delins' + alt

    return process_NC_HGVS(str(NC_HGVS))


def normalize(variant):
    """Dispatch a variant string (HGVS or SPDI; NM_ or NC_) to normalization.

    SPDI input is recognized by its three ':' separators; everything else is
    treated as HGVS. Returns ``{"GRCh37": ..., "GRCh38": ...}``.
    """
    print(f"submitted: {variant}")
    upper = variant.upper()
    is_spdi = variant.count(':') == 3
    if upper.startswith('NM'):
        return process_NM_SPDI(variant) if is_spdi else process_NM_HGVS(variant)
    if upper.startswith('NC'):
        return process_NC_SPDI(variant) if is_spdi else process_NC_HGVS(variant)
    # NOTE(review): any other accession prefix falls through and returns None
    # implicitly — callers should be prepared for that (or this should raise).
Loading

0 comments on commit 7bdafaa

Please sign in to comment.