throughput logger - log per epoch #799

Open · wants to merge 1 commit into base: master
tests/framework/callbacks/test_throughput_logger.py (112 additions & 15 deletions)

@@ -8,7 +8,7 @@
# pyre-strict

import unittest
-from unittest.mock import ANY, call, MagicMock
+from unittest.mock import ANY, call, MagicMock, patch

import torch
from pyre_extensions import none_throws
@@ -17,13 +17,15 @@
from torchtnt.framework._test_utils import (
    DummyAutoUnit,
    DummyPredictUnit,
+    DummyTrainUnit,
    generate_random_dataloader,
)
from torchtnt.framework.callbacks.throughput_logger import ThroughputLogger
from torchtnt.framework.predict import predict

-from torchtnt.framework.state import EntryPoint, PhaseState, State
-from torchtnt.framework.train import _train_impl
+from torchtnt.framework.state import ActivePhase, EntryPoint, PhaseState, State
+from torchtnt.framework.train import _train_impl, train
+from torchtnt.framework.unit import TrainUnit
from torchtnt.utils.loggers.logger import MetricLogger

@@ -121,21 +123,18 @@ def test_with_comparing_time(self) -> None:
                evaluate_every_n_epochs=2,
            ),
        )
+        throughput_logger = ThroughputLogger(
+            logger=logger,
+            throughput_per_batch={"Batches": 1, "Queries": 8},
+            log_every_n_steps=1,
+        )

        # we want to compare the logged values to the state, so we create the state manually and
        # call _train_impl. This is similar to calling fit() and getting the state back as a return value.
        _train_impl(
            state,
            DummyAutoUnit(module=torch.nn.Linear(2, 2)),
-            CallbackHandler(
-                [
-                    ThroughputLogger(
-                        logger=logger,
-                        throughput_per_batch={"Batches": 1, "Queries": 8},
-                        log_every_n_steps=1,
-                    )
-                ],
-            ),
+            CallbackHandler([throughput_logger]),
        )

        train_iteration_times = none_throws(
@@ -163,8 +162,8 @@
            eval_iteration_times[i] + eval_twfb_times[i] for i in range(2)
        ]
        self.assertEqual(
-            logger.log.call_count, 12
-        )  # 8 train (2epochs x 2steps x 2items), 4 eval (1x2x2)
+            logger.log.call_count, 18
+        )  # steps: 8 train (2 epochs x 2 steps x 2 items) + 4 eval (1 x 2 x 2); epochs: 4 train (2 epochs x 2 items) + 2 eval (1 x 2)
        train_batches_step_logs = [
            call(
                "Train: Batches per second (step granularity)",
Expand Down Expand Up @@ -197,11 +196,36 @@ def test_with_comparing_time(self) -> None:
            )
            for i in range(2)
        ]
+        # for epoch, we test the logged value separately
+        train_batches_epoch_logs = [
+            call("Train: Batches per second (epoch granularity)", ANY, i)
+            for i in range(1, 3)
+        ]
+        train_queries_epoch_logs = [
+            call("Train: Queries per second (epoch granularity)", ANY, i)
+            for i in range(1, 3)
+        ]
+        eval_epoch_logs = [
+            call(
+                "Eval: Queries per second (epoch granularity)",
+                ANY,
+                1,
+            ),
+            call(
+                "Eval: Batches per second (epoch granularity)",
+                ANY,
+                1,
+            ),
+        ]

        logger.log.assert_has_calls(
            train_batches_step_logs
            + train_queries_step_logs
            + eval_batches_step_logs
-            + eval_queries_step_logs,
+            + eval_queries_step_logs
+            + train_batches_epoch_logs
+            + train_queries_epoch_logs
+            + eval_epoch_logs,
            any_order=True,
        )

@@ -227,6 +251,79 @@ def test_with_predict(self) -> None:
                    1,
                )
            ],
+            [
+                call(
+                    "Predict: Batches per second (epoch granularity)",
+                    ANY,
+                    1,
+                )
+            ],
        )

+    def test_log_for_epoch(self) -> None:
+        logger = MagicMock(spec=MetricLogger)
+        unit = DummyTrainUnit(input_dim=2)
+        throughput_logger = ThroughputLogger(logger, {"Batches": 1, "Queries": 8})
+        state = State(entry_point=EntryPoint.TRAIN)
+
+        self.assertIsNone(throughput_logger._epoch_start_times.get(ActivePhase.TRAIN))
+        self.assertEqual(throughput_logger._steps_in_epoch[ActivePhase.TRAIN], 0)
+        with patch.object(throughput_logger, "_maybe_log_for_step"):
+            throughput_logger.on_train_step_end(state, unit)
+        self.assertEqual(throughput_logger._steps_in_epoch[ActivePhase.TRAIN], 1)
+
+        with patch("time.perf_counter", return_value=0.5):
+            throughput_logger.on_train_epoch_start(state, MagicMock(spec=TrainUnit))
+        self.assertEqual(throughput_logger._epoch_start_times[ActivePhase.TRAIN], 0.5)
+
+        throughput_logger._steps_in_epoch[ActivePhase.TRAIN] = (
+            2  # simulate two steps having run in the epoch
+        )
+        logger.log.reset_mock()
+        with patch("time.perf_counter", return_value=0.6):
+            throughput_logger._log_for_epoch(state, epoch_logging_for=15)
+
+        logger.log.assert_has_calls(
+            [
+                call(
+                    "Train: Batches per second (epoch granularity)",
+                    (1 * 2) / (0.6 - 0.5),
+                    15,
+                ),
+                call(
+                    "Train: Queries per second (epoch granularity)",
+                    (8 * 2) / (0.6 - 0.5),
+                    15,
+                ),
+            ]
+        )

+    def test_epoch_logging_time(self) -> None:
+        logger = MagicMock(spec=MetricLogger)
+        throughput_logger = ThroughputLogger(logger, {"Queries": 4})
+        with patch("time.perf_counter", side_effect=[0.1, 0.5, 0.8, 1.5]):
+            train(
+                DummyTrainUnit(input_dim=2),
+                generate_random_dataloader(num_samples=16, input_dim=2, batch_size=4),
+                max_epochs=2,
+                max_steps_per_epoch=2,
+                callbacks=[throughput_logger],
+            )
+
+        logger.log.assert_has_calls(
+            [
+                call(
+                    "Train: Queries per second (epoch granularity)",
+                    (4 * 2) / (0.5 - 0.1),
+                    1,
+                ),
+                call(
+                    "Train: Queries per second (epoch granularity)",
+                    (4 * 2) / (1.5 - 0.8),
+                    2,
+                ),
+            ],
+            any_order=True,
+        )

    def test_input_validation(self) -> None:
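
For reference, here is a minimal sketch, not part of the PR, of how the new epoch-granularity logging is driven end to end; it mirrors test_epoch_logging_time above, with a MagicMock standing in for any concrete MetricLogger implementation and the Dummy* helpers coming from torchtnt's test utilities:

from unittest.mock import MagicMock

from torchtnt.framework._test_utils import DummyTrainUnit, generate_random_dataloader
from torchtnt.framework.callbacks.throughput_logger import ThroughputLogger
from torchtnt.framework.train import train
from torchtnt.utils.loggers.logger import MetricLogger

logger = MagicMock(spec=MetricLogger)
# 1 batch per step; 8 samples ("queries") per batch.
throughput_logger = ThroughputLogger(logger, {"Batches": 1, "Queries": 8})

train(
    DummyTrainUnit(input_dim=2),
    generate_random_dataloader(num_samples=16, input_dim=2, batch_size=8),
    max_epochs=2,
    callbacks=[throughput_logger],
)

# In addition to the per-step logs, each epoch end now emits, e.g.:
#   "Train: Batches per second (epoch granularity)" -> (1 * steps_in_epoch) / epoch_seconds
#   "Train: Queries per second (epoch granularity)" -> (8 * steps_in_epoch) / epoch_seconds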
torchtnt/framework/callbacks/throughput_logger.py (56 additions & 2 deletions)

@@ -7,7 +7,9 @@
# pyre-strict


-from typing import Mapping
+import time
+from collections import defaultdict
+from typing import Dict, Mapping

from pyre_extensions import none_throws

@@ -32,9 +34,10 @@
class ThroughputLogger(Callback):
    """
    A callback which logs the train/eval/predict/fit throughput. For instance, it can be used to log QPS and number of batches processed per second.
-    The callback logs the throughput on a step basis.
+    The callback logs the throughput on a step basis and on an epoch basis.
    We measure the throughput by dividing the number of batches processed (times the number of items in batch) by the time it took to process the batch:
    On a step granularity, we do this by leveraging the already collected timers for the iteration time and data wait time.
+    On an epoch granularity, we measure the time between on_train_epoch_start and on_train_epoch_end on this callback to calculate the throughput.

    Args:
        logger: A subclass of :class:`torchtnt.utils.loggers.logger.MetricLogger`.
@@ -46,6 +49,7 @@ class ThroughputLogger(Callback):

    Note:
        The values reported are only for rank 0.
+        For more accurate measurement of epoch throughput, it is recommended to place this callback at the end of the callback list.
    """

    def __init__(
@@ -73,12 +77,15 @@ def __init__(
            )

        self._log_every_n_steps = log_every_n_steps
+        self._epoch_start_times: Dict[ActivePhase, float] = {}
+        self._steps_in_epoch: Dict[ActivePhase, int] = defaultdict(int)

    def on_train_step_end(self, state: State, unit: TTrainUnit) -> None:
        self._maybe_log_for_step(
            state,
            unit.train_progress.num_steps_completed - 1,
        )
+        self._steps_in_epoch[ActivePhase.TRAIN] += 1

    def on_train_end(self, state: State, unit: TTrainUnit) -> None:
        self._maybe_log_for_step(
@@ -92,13 +99,18 @@ def on_eval_step_end(self, state: State, unit: TEvalUnit) -> None:
            state,
            unit.eval_progress.num_steps_completed - 1,
        )
+        self._steps_in_epoch[ActivePhase.EVALUATE] += 1

    def on_eval_end(self, state: State, unit: TEvalUnit) -> None:
        self._maybe_log_for_step(
            state,
            unit.eval_progress.num_steps_completed,
            is_step_end_hook=False,
        )
+        self._log_for_epoch(
+            state,
+            epoch_logging_for=unit.eval_progress.num_epochs_completed,
+        )

    def on_predict_step_end(self, state: State, unit: TPredictUnit) -> None:
        self._maybe_log_for_step(
@@ -112,6 +124,25 @@ def on_predict_end(self, state: State, unit: TPredictUnit) -> None:
            unit.predict_progress.num_steps_completed,
            is_step_end_hook=False,
        )
+        self._log_for_epoch(
+            state,
+            epoch_logging_for=unit.predict_progress.num_epochs_completed,
+        )
+
+    def on_train_epoch_start(self, state: State, unit: TTrainUnit) -> None:
+        self._epoch_start_times[ActivePhase.TRAIN] = time.perf_counter()
+
+    def on_train_epoch_end(self, state: State, unit: TTrainUnit) -> None:
+        self._log_for_epoch(
+            state,
+            epoch_logging_for=unit.train_progress.num_epochs_completed,
+        )
+
+    def on_eval_start(self, state: State, unit: TEvalUnit) -> None:
+        self._epoch_start_times[ActivePhase.EVALUATE] = time.perf_counter()
+
+    def on_predict_start(self, state: State, unit: TPredictUnit) -> None:
+        self._epoch_start_times[ActivePhase.PREDICT] = time.perf_counter()

    def _maybe_log_for_step(
        self,
@@ -154,3 +185,26 @@ def _maybe_log_for_step
                num_items / total_time,
                step_logging_for,
            )
+
+    def _log_for_epoch(
+        self,
+        state: State,
+        *,
+        epoch_logging_for: int,
+    ) -> None:
+        time_since_epoch_start = (
+            time.perf_counter() - self._epoch_start_times[state.active_phase]
+        )
+
+        steps_in_epoch = self._steps_in_epoch[state.active_phase]
+        if steps_in_epoch <= 0:
+            return
+
+        for item, num_items in self._throughput_per_batch.items():
+            self._logger.log(
+                f"{ACTIVE_PHASE_TO_LABEL_PREFIX[state.active_phase]}: {item} per second (epoch granularity)",
+                (num_items * steps_in_epoch) / time_since_epoch_start,
+                epoch_logging_for,
+            )
+
+        self._steps_in_epoch[state.active_phase] = 0
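
To make the epoch-granularity arithmetic concrete, a small worked example with hypothetical numbers (the 32, 50, and 10.0 below are illustrative, not taken from the PR):

# Value logged by _log_for_epoch above:
#   num_items * steps_in_epoch / (time.perf_counter() - epoch_start_time)
num_items = 32        # items per batch, from the throughput_per_batch mapping, e.g. {"Queries": 32}
steps_in_epoch = 50   # steps counted by the on_*_step_end hooks during this epoch
elapsed = 10.0        # seconds since the corresponding epoch-start hook fired
qps = (num_items * steps_in_epoch) / elapsed  # 32 * 50 / 10.0 = 160.0 queries/sec

Note that _log_for_epoch zeroes _steps_in_epoch after logging, presumably so the count starts fresh for the next epoch (or for the next eval/predict run within a fit).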