Skip to content

Commit

Permalink
pass through kwargs for run_shell_process (#15817)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Oct 28, 2024
1 parent 06f0683 commit f029c5b
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
37 changes: 21 additions & 16 deletions src/prefect/cli/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import subprocess
import sys
import threading
from typing import List, Optional
from typing import Any, Dict, List, Optional

import typer
from typing_extensions import Annotated
Expand Down Expand Up @@ -62,6 +62,7 @@ def run_shell_process(
log_output: bool = True,
stream_stdout: bool = False,
log_stderr: bool = False,
popen_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Asynchronously executes the specified shell command and logs its output.
Expand All @@ -70,28 +71,32 @@ def run_shell_process(
It handles both the execution of the command and the collection of its output for logging purposes.
Args:
command (str): The shell command to execute.
log_output (bool, optional): If True, the output of the command (both stdout and stderr) is logged to Prefect.
Defaults to True
stream_stdout (bool, optional): If True, the stdout of the command is streamed to Prefect logs. Defaults to False.
log_stderr (bool, optional): If True, the stderr of the command is logged to Prefect logs. Defaults to False.
command: The shell command to execute.
log_output: If True, the output of the command (both stdout and stderr) is logged to Prefect.
stream_stdout: If True, the stdout of the command is streamed to Prefect logs.
log_stderr: If True, the stderr of the command is logged to Prefect logs.
popen_kwargs: Additional keyword arguments to pass to the `subprocess.Popen` call.
"""

logger = get_run_logger() if log_output else logging.getLogger("prefect")

# Default Popen kwargs that can be overridden
kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"shell": True,
"text": True,
"bufsize": 1,
"universal_newlines": True,
}

if popen_kwargs:
kwargs |= popen_kwargs

# Containers for log batching
stdout_container, stderr_container = [], []
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
text=True,
bufsize=1,
universal_newlines=True,
) as proc:
with subprocess.Popen(command, **kwargs) as proc:
# Create threads for collecting stdout and stderr
if stream_stdout:
stdout_logger = logger.info
Expand Down
32 changes: 32 additions & 0 deletions tests/cli/test_shell.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
from unittest.mock import AsyncMock, patch

from prefect.cli.shell import run_shell_process
from prefect.testing.cli import invoke_and_assert
from prefect.utilities.asyncutils import run_sync_in_worker_thread

Expand Down Expand Up @@ -145,3 +147,33 @@ async def test_shell_runner_integration(monkeypatch):
)

runner_mock.assert_awaited_once_with(run_once=True)


class TestRunShellProcess:
def test_run_shell_process_basic(self, tmp_path):
"""Test basic command execution"""
test_file = tmp_path / "test.txt"
run_shell_process(f"touch {test_file}")
assert test_file.exists()

def test_run_shell_process_with_cwd(self, tmp_path):
"""Test command execution with custom working directory"""
subdir = tmp_path / "subdir"
subdir.mkdir()
test_file = "test.txt"

run_shell_process(f"touch {test_file}", popen_kwargs={"cwd": str(subdir)})

assert (subdir / test_file).exists()

def test_run_shell_process_with_env(self, tmp_path):
"""Test command execution with custom environment variables"""
custom_env = os.environ.copy()
custom_env["TEST_VAR"] = "hello"

run_shell_process(
"echo $TEST_VAR > output.txt",
popen_kwargs={"env": custom_env, "cwd": str(tmp_path)},
)

assert (tmp_path / "output.txt").read_text().strip() == "hello"

0 comments on commit f029c5b

Please sign in to comment.