Skip to content

Commit

Permalink
Merge pull request #233 from niaid/hpc_fixes
Browse files Browse the repository at this point in the history
Hpc fixes
  • Loading branch information
philipmac authored May 25, 2023
2 parents 47b2c20 + ad0ea0b commit 71d8ea1
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 31 deletions.
27 changes: 18 additions & 9 deletions em_workflows/brt/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import os
from em_workflows.file_path import FilePath
import subprocess
import re
import math

from pathlib import Path
Expand Down Expand Up @@ -127,10 +126,10 @@ def gen_dimension_command(file_path: FilePath, ali_or_rec: str) -> str:
stderr = sp.stderr.decode("utf-8")
msg = f"Command ok : {stderr} -- {stdout}"
utils.log(msg)
xyz_dim = re.split(" +(\d+)", stdout)
z_dim = xyz_dim[5]
xyz_dim = [int(x) for x in stdout.split()]
z_dim = xyz_dim[2]
utils.log(f"z_dim: {z_dim:}")
return z_dim
return str(z_dim)


@task
Expand Down Expand Up @@ -531,12 +530,12 @@ def cleanup_files(file_path: FilePath, pattern=str):
tilt_movie_assets = gen_tilt_movie.map(
file_path=fps, upstream_tasks=[keyimg_assets]
)
cleanup_files.map(
clean_align_mrc = cleanup_files.map(
file_path=fps,
pattern=unmapped("*_align_*.mrc"),
upstream_tasks=[tilt_movie_assets, thumb_assets, keyimg_assets],
)
cleanup_files.map(
clean_ali_jpg = cleanup_files.map(
file_path=fps,
pattern=unmapped("*ali*.jpg"),
upstream_tasks=[tilt_movie_assets, thumb_assets, keyimg_assets],
Expand All @@ -557,12 +556,12 @@ def cleanup_files(file_path: FilePath, pattern=str):
file_path=fps, upstream_tasks=[averagedVolume_assets]
)
recon_movie_assets = gen_recon_movie.map(file_path=fps, upstream_tasks=[ave_jpgs])
cleanup_files.map(
clean_mp4 = cleanup_files.map(
file_path=fps,
pattern=unmapped("*_mp4.*.jpg"),
upstream_tasks=[recon_movie_assets, ave_jpgs],
)
cleanup_files.map(
clean_ave_mrc = cleanup_files.map(
file_path=fps,
pattern=unmapped("*_ave*.mrc"),
upstream_tasks=[recon_movie_assets, ave_jpgs],
Expand Down Expand Up @@ -615,13 +614,23 @@ def cleanup_files(file_path: FilePath, pattern=str):
cp_wd_to_assets = utils.copy_workdirs.map(
fps, upstream_tasks=[callback_with_tilt_mov]
)
rm_workdirs = utils.cleanup_workdir.map(fps, upstream_tasks=[cp_wd_to_assets])
# finally filter error states, and convert to JSON and send.
filtered_callback = utils.filter_results(callback_with_tilt_mov)

cb = utils.send_callback_body(
token=token, callback_url=callback_url, files_elts=filtered_callback
)
rm_workdirs = utils.cleanup_workdir(
fps,
upstream_tasks=[
cb,
cp_wd_to_assets,
clean_align_mrc,
clean_ali_jpg,
clean_ave_mrc,
clean_mp4,
],
)

# the other tasks might be always run or something,
# this is far enough along to get an idea of success.
Expand Down
5 changes: 3 additions & 2 deletions em_workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ def SLURM_exec():
name="dask-worker",
cores=60,
memory="32G",
# processes=1,
processes=1,
death_timeout=121,
local_directory="/gs1/home/macmenaminpe/tmp/",
queue="gpu",
walltime="4:00:00",
job_extra_directives=["--gres=gpu:1"],
)
cluster.adapt(minimum=1, maximum=6)
cluster.scale(1)
# cluster.adapt(minimum=1, maximum=6)
logging = prefect.context.get("logger")
logging.debug("Dask cluster started")
logging.debug(f"see dashboard {cluster.dashboard_link}")
Expand Down
4 changes: 1 addition & 3 deletions em_workflows/dm_conversion/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,7 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]:
prim_fp=callback_with_thumbs, asset=keyimg_assets
)
# finally filter error states, and convert to JSON and send.
rm_workdirs = utils.cleanup_workdir.map(
fp=fps, upstream_tasks=[callback_with_keyimgs]
)
rm_workdirs = utils.cleanup_workdir(fps, upstream_tasks=[callback_with_keyimgs])
filtered_callback = utils.filter_results(callback_with_keyimgs)

callback_sent = utils.send_callback_body(
Expand Down
13 changes: 4 additions & 9 deletions em_workflows/file_path.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import glob
import shutil
import os
from typing import List, Dict
Expand Down Expand Up @@ -142,10 +141,10 @@ def copy_to_assets_dir(self, fp_to_cp: Path) -> Path:
if fp_to_cp.is_dir():
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(fp_to_cp, dest)
d = shutil.copytree(fp_to_cp, dest)
else:
shutil.copyfile(fp_to_cp, dest)
return dest
d = shutil.copyfile(fp_to_cp, dest)
return Path(d)

# def add_assets_entry(
# self, asset_path: Path, asset_type: str, metadata: Dict[str, str] = None
Expand Down Expand Up @@ -261,10 +260,6 @@ def copy_workdir_to_assets(self) -> Path:
dest = Path(
f"{self.assets_dir.as_posix()}/{dir_name_as_date}/{self.fp_in.stem}"
)
existing_workdirs = glob.glob(f"{self.assets_dir.as_posix()}/work_dir_*")
for _dir in existing_workdirs:
log(f"Trying to remove old workdir {_dir}")
shutil.rmtree(_dir)
if dest.exists():
log(f"Output assets directory already exists! removing: {dest}")
shutil.rmtree(dest)
Expand All @@ -274,7 +269,7 @@ def copy_workdir_to_assets(self) -> Path:
def rm_workdir(self):
"""Removes the entire working directory"""
log(f"Removing working dir: {self.working_dir}")
shutil.rmtree(self.working_dir)
shutil.rmtree(self.working_dir, ignore_errors=True)

@staticmethod
def run(cmd: List[str], log_file: str) -> int:
Expand Down
4 changes: 1 addition & 3 deletions em_workflows/sem_tomo/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,7 @@ def gen_keyimg_small(fp_in: FilePath) -> Dict:
callback_with_corr_movies = utils.add_asset.map(
prim_fp=callback_with_corr_mrcs, asset=corrected_movie_assets
)
rm_workdirs = utils.cleanup_workdir.map(
fp=fps, upstream_tasks=[callback_with_corr_movies]
)
rm_workdirs = utils.cleanup_workdir(fps, upstream_tasks=[callback_with_corr_movies])

# finally filter error states, and convert to JSON and send.
filtered_callback = utils.filter_results(callback_with_corr_movies)
Expand Down
11 changes: 6 additions & 5 deletions em_workflows/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,20 @@ def add_asset(prim_fp: dict, asset: dict) -> dict:


@task(max_retries=3, retry_delay=datetime.timedelta(seconds=10), trigger=always_run)
def cleanup_workdir(fp: FilePath):
def cleanup_workdir(fps: List[FilePath]):
"""
:param fps: a list of FilePaths, each of which has a working_dir to be removed
| working_dir isn't needed after run, so rm.
| task wrapper on the FilePath rm_workdir method.
"""
if context.parameters["keep_workdir"] is not True:
log(f"Trying to remove {fp.working_dir}")
fp.rm_workdir()
else:
if prefect.context.parameters.get("keep_workdir") is True:
log("keep_workdir is set to True, skipping removal.")
else:
for fp in fps:
log(f"Trying to remove {fp.working_dir}")
fp.rm_workdir()


# @task
Expand Down

0 comments on commit 71d8ea1

Please sign in to comment.