diff --git a/src/prefect/cli/shell.py b/src/prefect/cli/shell.py index 6ca13f50bf6c..6837e5d00950 100644 --- a/src/prefect/cli/shell.py +++ b/src/prefect/cli/shell.py @@ -8,7 +8,7 @@ import subprocess import sys import threading -from typing import List, Optional +from typing import Any, Dict, List, Optional import typer from typing_extensions import Annotated @@ -62,6 +62,7 @@ def run_shell_process( log_output: bool = True, stream_stdout: bool = False, log_stderr: bool = False, + popen_kwargs: Optional[Dict[str, Any]] = None, ): """ Asynchronously executes the specified shell command and logs its output. @@ -70,28 +71,32 @@ def run_shell_process( It handles both the execution of the command and the collection of its output for logging purposes. Args: - command (str): The shell command to execute. - log_output (bool, optional): If True, the output of the command (both stdout and stderr) is logged to Prefect. - Defaults to True - stream_stdout (bool, optional): If True, the stdout of the command is streamed to Prefect logs. Defaults to False. - log_stderr (bool, optional): If True, the stderr of the command is logged to Prefect logs. Defaults to False. - + command: The shell command to execute. + log_output: If True, the output of the command (both stdout and stderr) is logged to Prefect. + stream_stdout: If True, the stdout of the command is streamed to Prefect logs. + log_stderr: If True, the stderr of the command is logged to Prefect logs. + popen_kwargs: Additional keyword arguments to pass to the `subprocess.Popen` call. """ logger = get_run_logger() if log_output else logging.getLogger("prefect") + # Default Popen kwargs that can be overridden + kwargs = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "shell": True, + "text": True, + "bufsize": 1, + "universal_newlines": True, + } + + if popen_kwargs: + kwargs |= popen_kwargs + # Containers for log batching stdout_container, stderr_container = [], [] - with subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - text=True, - bufsize=1, - universal_newlines=True, - ) as proc: + with subprocess.Popen(command, **kwargs) as proc: # Create threads for collecting stdout and stderr if stream_stdout: stdout_logger = logger.info diff --git a/tests/cli/test_shell.py b/tests/cli/test_shell.py index c85db78a03b8..0a209ecb4cfc 100644 --- a/tests/cli/test_shell.py +++ b/tests/cli/test_shell.py @@ -1,5 +1,7 @@ +import os from unittest.mock import AsyncMock, patch +from prefect.cli.shell import run_shell_process from prefect.testing.cli import invoke_and_assert from prefect.utilities.asyncutils import run_sync_in_worker_thread @@ -145,3 +147,33 @@ async def test_shell_runner_integration(monkeypatch): ) runner_mock.assert_awaited_once_with(run_once=True) + + +class TestRunShellProcess: + def test_run_shell_process_basic(self, tmp_path): + """Test basic command execution""" + test_file = tmp_path / "test.txt" + run_shell_process(f"touch {test_file}") + assert test_file.exists() + + def test_run_shell_process_with_cwd(self, tmp_path): + """Test command execution with custom working directory""" + subdir = tmp_path / "subdir" + subdir.mkdir() + test_file = "test.txt" + + run_shell_process(f"touch {test_file}", popen_kwargs={"cwd": str(subdir)}) + + assert (subdir / test_file).exists() + + def test_run_shell_process_with_env(self, tmp_path): + """Test command execution with custom environment variables""" + custom_env = os.environ.copy() + custom_env["TEST_VAR"] = "hello" + + run_shell_process( + "echo $TEST_VAR > output.txt", + popen_kwargs={"env": custom_env, "cwd": str(tmp_path)}, + ) + + assert (tmp_path / "output.txt").read_text().strip() == "hello"