Skip to content

Commit

Permalink
Shireenk/anderson reyes/wait for job ready flag (#263)
Browse files Browse the repository at this point in the history
* add support for waiting until streaming pipeline reaches running state
* also added tests
* fix comments and lint issues
* use fstrings, remove typings, add docs
* update tests arguments to wait_until_pipeline_running from wait_until_pipeline_complete
* Fix testing associated with @anersonreyes changes

Co-authored-by: Anderson Reyes <reyesanderson428@gmail.com>
  • Loading branch information
shireen-bean and AndersonReyes authored Mar 23, 2022
1 parent 5a86bd8 commit e625565
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 7 deletions.
2 changes: 2 additions & 0 deletions core/src/klio_core/config/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,11 @@ class KlioJobConfig(object):
version = utils.field(type=int)

# optional attributes

allow_non_klio_messages = utils.field(type=bool, default=False)
metrics = utils.field(default={})
blocking = utils.field(type=bool, default=False)
wait_for_pipeline_running = utils.field(type=bool, default=False)

def __config_post_init__(self, config_dict):
self._raw = config_dict
Expand Down
2 changes: 2 additions & 0 deletions core/tests/config/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def job_config_dict():
},
"more": "config",
"that": {"the": "user"},
"wait_for_pipeline_running": False,
"might": ["include"],
"blocking": False,
}
Expand Down Expand Up @@ -103,6 +104,7 @@ def final_job_config_dict():
},
"more": "config",
"that": {"the": "user"},
"wait_for_pipeline_running": False,
"might": ["include"],
"blocking": False,
"allow_non_klio_messages": False,
Expand Down
10 changes: 9 additions & 1 deletion docs/src/userguide/config/job_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ Klio-specific and :ref:`user-specified custom <custom-conf>` job configuration.

**Default**: ``False``

.. option:: job_config.wait_for_pipeline_running BOOL

Wait for a streaming job to reach the running state and then exit. klio polls every
minute for at most 10 minutes. If the pipeline has not reached the running status after
10 minutes, klio fails. If the pipeline reaches a terminal non-running state,
klio logs an error and exits.

**Default**: ``False``

.. _custom-conf:
.. option:: job_config.<additional_key> ANY
Expand Down Expand Up @@ -273,7 +281,7 @@ client depends on the runner:
.. caution::

When running on Dataflow, in order for the Native metrics client to be able to report metrics to Stackdriver,
When running on Dataflow, in order for the Native metrics client to be able to report metrics to Stackdriver,
the following ``experiment`` must be added to ``klio-job.yaml``:

.. code-block:: yaml
Expand Down
58 changes: 52 additions & 6 deletions exec/src/klio_exec/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import logging
import os
import re
import time

import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.runners import runner

from klio import __version__ as klio_lib_version
from klio import transforms
Expand Down Expand Up @@ -555,6 +557,39 @@ def _setup_pipeline(self, pipeline):
**output_config.to_io_kwargs()
)

def wait_for_pipeline_running(
    self,
    pipeline_result,
    timeout_sec=10 * 60,  # 10 minutes in seconds
    poll_interval_sec=60,
):
    """Poll a streaming pipeline until it reaches the RUNNING state.

    Polls ``pipeline_result.state`` every ``poll_interval_sec`` seconds
    for up to ``timeout_sec`` seconds.  Returns as soon as the pipeline
    is RUNNING, or as soon as it lands in a terminal state (in which
    case an error is logged).  Exceptions raised while reading the
    state (e.g. transient API errors) are logged and the poll retried.

    Args:
        pipeline_result: a Beam ``PipelineResult``-like object exposing
            a ``state`` attribute.
        timeout_sec (int): maximum total time to wait, in seconds.
        poll_interval_sec (int): seconds between state polls.

    Returns:
        The last observed pipeline state (RUNNING, or a terminal state).

    Raises:
        TimeoutError: if the pipeline has not reached RUNNING (or a
            terminal state) within ``timeout_sec``.
    """
    status = None
    attempts = max(1, timeout_sec // poll_interval_sec)

    for attempt in range(attempts):
        try:
            status = pipeline_result.state
            if status == runner.PipelineState.RUNNING:
                logging.info(f"Pipeline status is {status}, done waiting")
                return status
            elif runner.PipelineState.is_terminal(status):
                # A terminal state will never transition to RUNNING, so
                # there is no point in continuing to poll.
                logging.error(
                    f"Pipeline already in terminal status {status}"
                )
                return status

            logging.info(
                f"Pipeline status {status} is not "
                f"{runner.PipelineState.RUNNING}, retrying in "
                f"{poll_interval_sec} seconds"
            )
        except Exception as e:
            # Best-effort polling: a transient failure reading the state
            # should not abort the wait; log it and try again.
            logging.exception(e)

        # Only sleep between polls -- sleeping after the final check
        # would waste a full interval before raising.
        if attempt < attempts - 1:
            time.sleep(poll_interval_sec)

    raise TimeoutError(
        f"Pipeline did not reach {runner.PipelineState.RUNNING} within "
        f"{timeout_sec} seconds; last observed status: {status}"
    )

def run(self):
self._verify_packaging()
options = self._get_pipeline_options()
Expand All @@ -581,19 +616,30 @@ def run(self):
self.runtime_conf = self.runtime_conf._replace(update=None)
return self.run()

logging.error("Error running pipeline: %s" % e)
logging.error(f"Error running pipeline: {e}")
raise SystemExit(1)

# TODO: update me to `var.KlioRunner.DIRECT_GKE_RUNNER` once
# direct_on_gke_runner_clean is merged
is_direct_gke = (
self.config.pipeline_options.runner == "DirectGKERunner"
)
should_block = (
self.runtime_conf.direct_runner,
self.runtime_conf.blocking,
is_direct_gke,
should_block = any(
(
self.runtime_conf.direct_runner,
self.runtime_conf.blocking,
is_direct_gke,
)
)
if any(should_block):

if should_block:
# the pipeline on direct runner will otherwise get garbage collected
result.wait_until_finish()

# If the blocking flag was already passed don't wait again
if (
self.config.job_config.wait_for_pipeline_running
and self.config.pipeline_options.streaming
and not should_block
):
self.wait_for_pipeline_running(result)
23 changes: 23 additions & 0 deletions exec/tests/unit/commands/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,9 @@ def test_get_pipeline_options(
)
@pytest.mark.parametrize("blocking", (True, False))
@pytest.mark.parametrize("streaming", (True, False))
@pytest.mark.parametrize("wait_until_pipeline_running", (True, False))
def test_run_pipeline(
wait_until_pipeline_running,
streaming,
blocking,
direct_runner,
Expand All @@ -659,6 +661,9 @@ def test_run_pipeline(
direct_runner=direct_runner, update=True, blocking=blocking
)

mock_wait_until_pipeline_running = mocker.patch.object(
run.KlioPipeline, "wait_for_pipeline_running", autospec=True
)
mock_verify_packaging = mocker.Mock()
mock_get_run_callable = mocker.Mock()
mock_run_callable = mocker.Mock()
Expand Down Expand Up @@ -711,6 +716,13 @@ def test_run_pipeline(
config.job_config.events.inputs = [mock_input]
config.job_config.events.outputs = [mock_output]

if streaming:
config.job_config.wait_for_pipeline_running = (
wait_until_pipeline_running
)
else:
config.job_config.wait_for_pipeline_running = False

if run_error:
mock_pipeline.return_value.run.side_effect = [
run_error,
Expand Down Expand Up @@ -751,6 +763,17 @@ def test_run_pipeline(
result = mock_pipeline.return_value.run.return_value
result.wait_until_finish.assert_called_once_with()

if (
streaming
and not blocking
and wait_until_pipeline_running
and not direct_runner
and config.pipeline_options.runner != "DirectGKERunner"
):
mock_wait_until_pipeline_running.assert_called_once()
else:
mock_wait_until_pipeline_running.assert_not_called()


@pytest.mark.parametrize(
"update,value_err_msg",
Expand Down

0 comments on commit e625565

Please sign in to comment.