adding redaction of cause number
nicolassaw authored and newswim committed Oct 6, 2024
1 parent 9d4c89c commit 79d2966
Showing 1 changed file with 73 additions and 34 deletions.
107 changes: 73 additions & 34 deletions src/cleaner/__init__.py
@@ -5,26 +5,37 @@
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

class Cleaner:
# List of motions identified as evidentiary.
# TODO: These should be moved to a separate JSON in resources
GOOD_MOTIONS = [
"Motion To Suppress",
"Motion to Reduce Bond",
"Motion to Reduce Bond Hearing",
"Motion for Production",
"Motion For Speedy Trial",
"Motion for Discovery",
"Motion In Limine",
]

GOOD_MOTIONS = [
"Motion To Suppress",
"Motion to Reduce Bond",
"Motion to Reduce Bond Hearing",
"Motion for Production",
"Motion For Speedy Trial",
"Motion for Discovery",
"Motion In Limine",
]

class Cleaner:
def __init__(self):
pass

def redact_cause_number(self, input_dict: dict) -> str:
# This will hash and redact the cause number and then add it to the output file.
cause_number_hash = xxhash.xxh64(str(input_dict["code"])).hexdigest()
return cause_number_hash

def get_or_create_folder_path(self, county: str, folder_type: str) -> str:
"""Returns and ensures the existence of the folder path."""
folder_path = os.path.join(os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type)
folder_path = os.path.join(
os.path.dirname(__file__), "..", "..", "data", county.lower(), folder_type
)
try:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
@@ -51,22 +62,24 @@ def load_and_map_charge_names(self, file_path: str) -> dict:
if not charge_data:
logging.error(f"Failed to load charge data from {file_path}")
raise FileNotFoundError(f"File not found or is empty: {file_path}")
# Create dictionary mapping charge names
# Create dictionary mapping charge names
try:
return {item['charge_name']: item for item in charge_data}
return {item["charge_name"]: item for item in charge_data}
except KeyError as e:
logging.error(f"Error in mapping charge names: {e}")
raise ValueError(f"Invalid data structure: {file_path}")

def process_charges(self, charges: list[dict], charge_mapping: dict) -> tuple[list[dict], str]:
def process_charges(
self, charges: list[dict], charge_mapping: dict
) -> tuple[list[dict], str]:
"""
Processes a list of charges by formatting charge details,
Processes a list of charges by formatting charge details,
mapping charges to UMich data, and finding the earliest charge date.
Args:
charges: A list of charges where each charge is a dictionary containing charge details.
charge_mapping: A dictionary mapping charge names to corresponding UMich data.
Returns:
tuple: A list of processed charges and the earliest charge date.
"""
@@ -86,7 +99,9 @@ def process_charges(self, charges: list[dict], charge_mapping: dict) -> tuple[li
try:
charge_datetime = dt.datetime.strptime(charge["date"], "%m/%d/%Y")
charge_dates.append(charge_datetime)
charge_dict["charge_date"] = dt.datetime.strftime(charge_datetime, "%Y-%m-%d")
charge_dict["charge_date"] = dt.datetime.strftime(
charge_datetime, "%Y-%m-%d"
)
except ValueError:
logging.error(f"Error parsing date for charge: {charge}")
continue
@@ -112,12 +127,18 @@ def process_charges(self, charges: list[dict], charge_mapping: dict) -> tuple[li
def contains_good_motion(self, motion: str, event: list | str) -> bool:
"""Recursively check if a motion exists in an event list or sublist."""
if isinstance(event, list):
return any(self.contains_good_motion(motion, item) for item in event)
return any(self.contains_good_motion(motion, item) for item in event)
return motion.lower() in event.lower()

def find_good_motions(self, events: list | str, good_motions: list[str]) -> list[str]:
def find_good_motions(
self, events: list | str, good_motions: list[str]
) -> list[str]:
"""Finds motions in events based on list of good motions."""
return [motion for motion in good_motions if self.contains_good_motion(motion, events)]
return [
motion
for motion in good_motions
if self.contains_good_motion(motion, events)
]

def hash_defense_attorney(self, input_dict: dict) -> str:
"""Hashes the defense attorney info to anonymize it."""
@@ -128,7 +149,6 @@ def hash_defense_attorney(self, input_dict: dict) -> str:
logging.error(f"Missing defense attorney data: {e}")
return ""


def write_json_output(self, file_path: str, data: dict) -> None:
"""Writes the given data to a JSON file at the specified file path."""
try:
@@ -138,7 +158,12 @@ def write_json_output(self, file_path: str, data: dict) -> None:
except OSError as e:
logging.error(f"Failed to write JSON output to {file_path}: {e}")

def process_single_case(self, case_json_folder_path: str, case_json_filename:str, cleaned_folder_path: str) -> None:
def process_single_case(
self,
case_json_folder_path: str,
case_json_filename: str,
cleaned_folder_path: str,
) -> None:
"""Process a single case JSON file."""
input_json_path = os.path.join(case_json_folder_path, case_json_filename)
input_dict = self.load_json_file(input_json_path)
@@ -158,23 +183,31 @@ def process_single_case(self, case_json_folder_path: str, case_json_filename:str
"motions": [],
"has_evidence_of_representation": False,
"defense_attorney": self.hash_defense_attorney(input_dict),
"parsing_date": dt.datetime.today().strftime('%Y-%m-%d')
"parsing_date": dt.datetime.today().strftime("%Y-%m-%d"),
}

# Load charge mappings
charge_name_to_umich_file = os.path.join(
os.path.dirname(__file__), "..", "..", "resources", "umich-uccs-database.json"
os.path.dirname(__file__),
"..",
"..",
"resources",
"umich-uccs-database.json",
)
charges_mapped = self.load_and_map_charge_names(charge_name_to_umich_file)

# Process charges and motions
output_json_data["charges"], output_json_data["earliest_charge_date"] = self.process_charges(
input_dict["charge information"], charges_mapped
output_json_data["charges"], output_json_data["earliest_charge_date"] = (
self.process_charges(input_dict["charge information"], charges_mapped)
)
output_json_data["motions"] = self.find_good_motions(
input_dict["other events and hearings"], self.GOOD_MOTIONS
input_dict["other events and hearings"], GOOD_MOTIONS
)
output_json_data["has_evidence_of_representation"] = (
len(output_json_data["motions"]) > 0
)
output_json_data["has_evidence_of_representation"] = len(output_json_data["motions"]) > 0

output_json_data["cause_number_redacted"] = self.redact_cause_number(input_dict)

# Write output to file
output_filepath = os.path.join(cleaned_folder_path, case_json_filename)
@@ -187,13 +220,17 @@ def process_json_files(self, county: str, case_json_folder_path: str) -> None:
except (FileNotFoundError, Exception) as e:
logging.error(f"Error reading directory {case_json_folder_path}: {e}")
return

# Ensure the case_json_cleaned folder exists
cleaned_folder_path = self.get_or_create_folder_path(county, "case_json_cleaned")
cleaned_folder_path = self.get_or_create_folder_path(
county, "case_json_cleaned"
)

for case_json_filename in list_case_json_files:
try:
self.process_single_case(case_json_folder_path, case_json_filename, cleaned_folder_path)
self.process_single_case(
case_json_folder_path, case_json_filename, cleaned_folder_path
)
except Exception as e:
logging.error(f"Error processing file {case_json_filename}. Error: {e}")

@@ -214,4 +251,6 @@ def clean(self, county: str) -> None:
self.process_json_files(county, case_json_folder_path)
logging.info(f"Completed processing for county: {county}")
except Exception as e:
logging.error(f"Error during cleaning process for county: {county}. Error: {e}")
logging.error(
f"Error during cleaning process for county: {county}. Error: {e}"
)

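The cleaned per-case record now carries the redacted value under cause_number_redacted. A partial sketch of the output shape, limited to the fields visible in this diff and using placeholder values:

    # Placeholder values for illustration only; keys are those visible in the diff.
    output_json_data = {
        "motions": ["Motion For Speedy Trial"],
        "has_evidence_of_representation": True,
        "defense_attorney": "<hashed attorney info>",
        "parsing_date": "2024-10-06",
        "charges": [],               # filled by process_charges()
        "earliest_charge_date": "",  # earliest charge date returned by process_charges()
        "cause_number_redacted": "<xxh64 hex digest>",
    }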