From 02ff68645a1e0950791125671cd9e88ee2592762 Mon Sep 17 00:00:00 2001
From: Vincent Moens
Date: Thu, 11 Jul 2024 09:27:39 +0100
Subject: [PATCH] [BugFix] Refactor map and map_iter (#869)

---
 .../unittest/linux/scripts/environment.yml |  1 +
 .github/unittest/linux/scripts/run_test.sh |  2 +-
 tensordict/persistent.py                   | 10 +++-
 test/test_tensordict.py                    | 50 ++++++++++++-------
 4 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/.github/unittest/linux/scripts/environment.yml b/.github/unittest/linux/scripts/environment.yml
index ca7bd067b..ad6da13ad 100644
--- a/.github/unittest/linux/scripts/environment.yml
+++ b/.github/unittest/linux/scripts/environment.yml
@@ -14,6 +14,7 @@ dependencies:
   - pytest-mock
   - pytest-instafail
   - pytest-rerunfailures
+  - pytest-timeout
   - expecttest
   - coverage
   - h5py
diff --git a/.github/unittest/linux/scripts/run_test.sh b/.github/unittest/linux/scripts/run_test.sh
index 9fc821efb..43f44eff8 100755
--- a/.github/unittest/linux/scripts/run_test.sh
+++ b/.github/unittest/linux/scripts/run_test.sh
@@ -19,6 +19,6 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$lib_dir
 export MKL_THREADING_LAYER=GNU
 
 coverage run -m pytest test/smoke_test.py -v --durations 20
-coverage run -m pytest --instafail -v --durations 20
+coverage run -m pytest --instafail -v --durations 20 --timeout 120
 coverage run -m pytest ./benchmarks --instafail -v --durations 20
 coverage xml -i
diff --git a/tensordict/persistent.py b/tensordict/persistent.py
index a2a528afc..db4e21a40 100644
--- a/tensordict/persistent.py
+++ b/tensordict/persistent.py
@@ -804,7 +804,13 @@ def map(
                 initargs=(seed, queue, worker_threads),
                 maxtasksperchild=max_tasks_per_child,
             ) as pool:
-                return self.map(fn, dim=dim, chunksize=chunksize, pool=pool)
+                return self.map(
+                    fn,
+                    dim=dim,
+                    chunksize=chunksize,
+                    pool=pool,
+                    index_with_generator=index_with_generator,
+                )
         num_workers = pool._processes
         dim_orig = dim
         if dim < 0:
@@ -863,7 +869,7 @@ def newfn(item_and_out):
         for item in imap:
             if item is not None:
                 if out is not None:
-                    if chunksize:
+                    if chunksize != 0:
                         end = start + item.shape[dim]
                         chunk = slice(start, end)
                         out[chunk].update_(item)
diff --git a/test/test_tensordict.py b/test/test_tensordict.py
index cf633c0ce..be6490646 100644
--- a/test/test_tensordict.py
+++ b/test/test_tensordict.py
@@ -8827,6 +8827,7 @@ def test_map_seed(self):
             pytest.skip(
                 reason="Using max_tasks_per_child is unstable and can cause multiple processes to start over even though all jobs are completed",
             )
+        gc.collect()
         if mp.get_start_method(allow_none=True) is None:
             mp.set_start_method("spawn")
 
@@ -8870,6 +8871,7 @@
         )
 
     def test_map_seed_single(self):
+        gc.collect()
         # A cheap version of the previous test
         if mp.get_start_method(allow_none=True) is None:
             mp.set_start_method("spawn")
@@ -8916,35 +8918,42 @@
     @pytest.mark.parametrize("h5", [False, True])
     @pytest.mark.parametrize("has_out", [False, True])
     def test_index_with_generator(self, chunksize, num_chunks, h5, has_out, tmpdir):
+        gc.collect()
         input = TensorDict({"a": torch.arange(10), "b": torch.arange(10)}, [10])
         if h5:
             tmpdir = pathlib.Path(tmpdir)
-            input = input.to_h5(tmpdir / "file.h5")
+            input_h5 = input.to_h5(tmpdir / "file.h5")
+            assert input.shape == input_h5.shape
+            input = input_h5
         if has_out:
             output_generator = torch.zeros_like(self.selectfn(input.to_tensordict()))
             output_split = torch.zeros_like(self.selectfn(input.to_tensordict()))
         else:
             output_generator = None
             output_split = None
-        output_generator = input.map(
-            self.selectfn,
-            num_workers=2,
-            index_with_generator=True,
-            num_chunks=num_chunks,
-            chunksize=chunksize,
-            out=output_generator,
-        )
-        output_split = input.map(
-            self.selectfn,
-            num_workers=2,
-            index_with_generator=True,
-            num_chunks=num_chunks,
-            chunksize=chunksize,
-            out=output_split,
-        )
+        with mp.get_context("fork").Pool(2) as pool:
+            output_generator = input.map(
+                self.selectfn,
+                num_workers=2,
+                index_with_generator=True,
+                num_chunks=num_chunks,
+                chunksize=chunksize,
+                out=output_generator,
+                pool=pool,
+            )
+            output_split = input.map(
+                self.selectfn,
+                num_workers=2,
+                index_with_generator=True,
+                num_chunks=num_chunks,
+                chunksize=chunksize,
+                out=output_split,
+                pool=pool,
+            )
         assert (output_generator == output_split).all()
 
     def test_map_unbind(self):
+        gc.collect()
         if mp.get_start_method(allow_none=True) is None:
             mp.set_start_method("spawn")
         td0 = TensorDict({"0": 0}, [])
@@ -8961,6 +8970,7 @@ def _assert_is_memmap(data):
 
     @pytest.mark.parametrize("chunksize", [0, 5])
     def test_map_inplace(self, chunksize):
+        gc.collect()
         if mp.get_start_method(allow_none=True) is None:
             mp.set_start_method("spawn")
         # Tests that we can return None values
@@ -8978,6 +8988,7 @@ def selectfn(input):
         "start_method", [None, "spawn" if torch.cuda.is_available() else "fork"]
     )
     def test_map_with_out(self, mmap, chunksize, tmpdir, start_method):
+        gc.collect()
         tmpdir = Path(tmpdir)
         input = TensorDict({"a": torch.arange(10), "b": torch.arange(10)}, [10])
         if mmap:
@@ -9001,7 +9012,8 @@ def nontensor_check(cls, td):
         )
         return td
 
-    def test_non_tensor(self):
+    def test_map_non_tensor(self):
+        gc.collect()
         # with NonTensorStack
         td = TensorDict(
             {"tensor": torch.arange(10), "non_tensor": "a string!"}, batch_size=[10]
@@ -9026,6 +9038,7 @@ def _return_identical(td):
         "chunksize,num_chunks", [[0, None], [11, None], [None, 11]]
     )
     def test_map_iter(self, chunksize, num_chunks, shuffle):
+        gc.collect()
         torch.manual_seed(0)
         td = TensorDict(
             {
@@ -9075,6 +9088,7 @@ def test_map_iter(self, chunksize, num_chunks, shuffle):
         "chunksize,num_chunks", [[0, None], [11, None], [None, 11]]
    )
     def test_map_iter_interrupt_early(self, chunksize, num_chunks, shuffle):
+        gc.collect()
         torch.manual_seed(0)
         td = TensorDict(
             {
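
Reviewer note (not part of the patch): the functional change is twofold.
`PersistentTensorDict.map` now forwards `index_with_generator` when it
re-dispatches into a freshly created pool, instead of silently dropping it,
and the `if chunksize:` guard becomes `if chunksize != 0:` so that
`chunksize=None` (the `num_chunks` code path) takes the chunked-output branch
rather than the per-item one. Below is a minimal sketch of the call pattern
the refactored test exercises, using only keyword arguments visible in the
diff; `add_one` is a hypothetical worker function and the "fork" context is
POSIX-only.

import torch
from torch import multiprocessing as mp

from tensordict import TensorDict


def add_one(td):
    # Worker function; must be a module-level (picklable) callable.
    td["a"] = td["a"] + 1
    return td


if __name__ == "__main__":
    td = TensorDict({"a": torch.arange(10)}, batch_size=[10])
    # Reuse one pool across map() calls, as the updated test does, so worker
    # start-up is paid once and a hang is caught by pytest's --timeout.
    with mp.get_context("fork").Pool(2) as pool:
        out = td.map(
            add_one,
            num_workers=2,
            chunksize=0,  # dispatch one element per worker call
            index_with_generator=True,  # slice the input lazily, task by task
            pool=pool,
        )
    assert (out["a"] == torch.arange(10) + 1).all()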