A process in the process pool was terminated abruptly while the future was running or pending. #8233

Open
fancy45daddy opened this issue Oct 8, 2024 · 0 comments

❓ Questions and Help

I want to run PyTorch/XLA on a Kaggle TPU v3-8 and use all of the TPU cores, but I always get the error "A process in the process pool was terminated abruptly while the future was running or pending."

Source code:

%%bash
python3 -m pip install -U imageio[ffmpeg] controlnet-aux openmim
mim install mmengine mmcv mmdet mmpose
curl -O https://raw.githubusercontent.com/huggingface/controlnet_aux/master/src/controlnet_aux/dwpose/dwpose_config/dwpose-l_384x288.py
curl -O https://raw.githubusercontent.com/huggingface/controlnet_aux/master/src/controlnet_aux/dwpose/yolox_config/yolox_l_8xb8-300e_coco.py

import torch_xla, diffusers, builtins, imageio, os, PIL.Image, controlnet_aux, sys, torch
os.environ.pop('TPU_PROCESS_ADDRESSES')

reader = imageio.get_reader('/kaggle/input/controlnet/pose.mp4', 'ffmpeg')
openpose = controlnet_aux.DWposeDetector(det_config='yolox_l_8xb8-300e_coco.py', pose_config='dwpose-l_384x288.py')
poses = [openpose(PIL.Image.fromarray(reader.get_data(_)).resize((512, 768))) for _ in builtins.range(16)] #reader.count_frames()
length = builtins.len(poses) // 8
fps = reader.get_meta_data().get('fps')

def process(index):
    controlnet = diffusers.ControlNetModel.from_pretrained('lllyasviel/control_v11p_sd15_openpose', torch_dtype=torch.bfloat16)
    pipeline = diffusers.StableDiffusionControlNetPipeline.from_single_file('https://huggingface.co/chaowenguo/pal/blob/main/chilloutMix-Ni.safetensors', config='chaowenguo/stable-diffusion-v1-5', safety_checker=None, controlnet=controlnet, use_safetensors=True, torch_dtype=torch.bfloat16).to(torch_xla.core.xla_model.xla_device())
    pipeline.scheduler = diffusers.DPMSolverMultistepScheduler.from_config(pipeline.scheduler.config)
    pipeline.enable_attention_slicing()
    pipeline.unet.set_attn_processor(diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor())
    pipeline.controlnet.set_attn_processor(diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor())
    pose = sys.modules['__main__'].poses[index * sys.modules['__main__'].length:(index + 1) * sys.modules['__main__'].length]
    imageio.mimsave(f'{index}.mp4', pipeline(prompt=['gorgeous slim young cleavage robust boob japanese girl, wearing white deep V bandeau pantie, smile lying on white bed, best quality, extremely detailed'] * builtins.len(pose), negative_prompt=['monochrome, lowres, bad anatomy, worst quality, low quality'] * builtins.len(pose), image=pose, num_inference_steps=20, latents=torch.randn((1, 4, 96, 64), device=torch_xla.core.xla_model.xla_device(), dtype=torch.bfloat16).repeat(builtins.len(pose), 1, 1, 1)).images, fps=fps)

torch_xla.distributed.xla_multiprocessing.spawn(process, start_method='fork')
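
For reference, the way each worker picks its frames (`length = len(poses) // 8`, then a slice by `index`) can be sketched in isolation. This is a minimal illustration only: `shard` is a hypothetical helper name that does not appear in the code above, integers stand in for the pose images, and the worker count of 8 is taken from the `// 8` in the source.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def shard(items: Sequence[T], index: int, world_size: int) -> List[T]:
    """Return the contiguous slice of `items` owned by worker `index`.

    Hypothetical helper mirroring the slicing done inside `process`.
    """
    # Same floor division as `length` in the code above.
    length = len(items) // world_size
    return list(items[index * length:(index + 1) * length])


frames = list(range(16))  # stand-ins for the 16 pose images
shards = [shard(frames, i, 8) for i in range(8)]
print(shards[0], shards[7])  # [0, 1] [14, 15]
```

With 16 frames and 8 workers every worker gets 2 frames. Because of the floor division, a frame count that is not a multiple of 8 would leave the trailing frames unprocessed (18 frames would still yield 2 per worker, 16 in total).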

Running this produces:

BrokenProcessPool                         Traceback (most recent call last)
Cell In[2], line 20
     17     pose = sys.modules['__main__'].poses[index * sys.modules['__main__'].length:(index + 1) * sys.modules['__main__'].length]
     18     imageio.mimsave(f'{index}.mp4', pipeline(prompt=['gorgeous slim young cleavage robust boob japanese girl, wearing white deep V bandeau pantie, smile lying on white bed, best quality, extremely detailed'] * builtins.len(pose), negative_prompt=['monochrome, lowres, bad anatomy, worst quality, low quality'] * builtins.len(pose), image=pose, num_inference_steps=20, latents=torch.randn((1, 4, 96, 64), device=torch_xla.core.xla_model.xla_device(), dtype=torch.bfloat16).repeat(builtins.len(pose), 1, 1, 1)).images, fps=fps)
---> 20 torch_xla.distributed.xla_multiprocessing.spawn(process, start_method='fork')
     21 result = []
     22 for _ in builtins.range(8):

File /usr/local/lib/python3.10/site-packages/torch_xla/runtime.py:95, in requires_pjrt.<locals>.wrapper(*args, **kwargs)
     91 if not using_pjrt():
     92   raise NotImplementedError('`{}` not implemented for XRT'.format(
     93       fn.__name__))
---> 95 return fn(*args, **kwargs)

File /usr/local/lib/python3.10/site-packages/torch_xla/distributed/xla_multiprocessing.py:38, in spawn(fn, args, nprocs, join, daemon, start_method)
      6 @xr.requires_pjrt
      7 def spawn(fn,
      8           args=(),
   (...)
     11           daemon=False,
     12           start_method='spawn'):
     13   """Enables multi processing based replication.
     14 
     15   Args:
   (...)
     36     return None.
     37   """
---> 38   return pjrt.spawn(fn, nprocs, start_method, args)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:214, in spawn(fn, nprocs, start_method, args)
    211 elif nprocs is not None:
    212   logging.warning('Unsupported nprocs (%d), ignoring...' % nprocs)
--> 214 run_multiprocess(spawn_fn, start_method=start_method)

File /usr/local/lib/python3.10/site-packages/torch_xla/runtime.py:95, in requires_pjrt.<locals>.wrapper(*args, **kwargs)
     91 if not using_pjrt():
     92   raise NotImplementedError('`{}` not implemented for XRT'.format(
     93       fn.__name__))
---> 95 return fn(*args, **kwargs)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:174, in run_multiprocess(fn, start_method, *args, **kwargs)
    168   mp_fn = functools.partial(
    169       _run_thread_per_device,
    170       local_world_size=num_processes,
    171       fn=functools.partial(fn, *args, **kwargs),
    172       initializer_fn=initialize_multiprocess)
    173   process_results = executor.map(mp_fn, range(num_processes))
--> 174   replica_results = list(
    175       itertools.chain.from_iterable(
    176           result.items() for result in process_results))
    178 return _merge_replica_results(replica_results)

File /usr/local/lib/python3.10/site-packages/torch_xla/_internal/pjrt.py:175, in <genexpr>(.0)
    168   mp_fn = functools.partial(
    169       _run_thread_per_device,
    170       local_world_size=num_processes,
    171       fn=functools.partial(fn, *args, **kwargs),
    172       initializer_fn=initialize_multiprocess)
    173   process_results = executor.map(mp_fn, range(num_processes))
    174   replica_results = list(
--> 175       itertools.chain.from_iterable(
    176           result.items() for result in process_results))
    178 return _merge_replica_results(replica_results)

File /usr/local/lib/python3.10/concurrent/futures/process.py:575, in _chain_from_iterable_of_lists(iterable)
    569 def _chain_from_iterable_of_lists(iterable):
    570     """
    571     Specialized implementation of itertools.chain.from_iterable.
    572     Each item in *iterable* should be a list.  This function is
    573     careful not to keep references to yielded objects.
    574     """
--> 575     for element in iterable:
    576         element.reverse()
    577         while element:

File /usr/local/lib/python3.10/concurrent/futures/_base.py:621, in Executor.map.<locals>.result_iterator()
    618 while fs:
    619     # Careful not to keep a reference to the popped future
    620     if timeout is None:
--> 621         yield _result_or_cancel(fs.pop())
    622     else:
    623         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File /usr/local/lib/python3.10/concurrent/futures/_base.py:319, in _result_or_cancel(***failed resolving arguments***)
    317 try:
    318     try:
--> 319         return fut.result(timeout)
    320     finally:
    321         fut.cancel()

File /usr/local/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File /usr/local/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

A single process on a single TPU core works well, but running multiple processes to use all of the TPU cores fails with the error above.
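
For context, `BrokenProcessPool` comes from Python's `concurrent.futures`, and it is raised when a worker process dies without reporting a result, so the traceback above contains no Python traceback from the worker itself. The sketch below reproduces the same exception type without any TPU code. It is only an illustration: `die_abruptly` and `pool_breaks_on_abrupt_exit` are hypothetical names, and `os._exit(1)` is a stand-in for whatever actually terminates the worker in the report, which is not known from the output shown here.

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def die_abruptly(_):
    # Exit immediately without raising a Python exception. This is an
    # assumed stand-in for a worker being killed from outside Python;
    # it is not the actual failure in the report.
    os._exit(1)


def pool_breaks_on_abrupt_exit() -> bool:
    # "fork" matches the start method used in the report (POSIX only).
    ctx = multiprocessing.get_context("fork")
    try:
        with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
            # Consuming the results is what surfaces the worker's death.
            list(executor.map(die_abruptly, range(1)))
    except BrokenProcessPool:
        return True
    return False


if __name__ == "__main__":
    print(pool_breaks_on_abrupt_exit())
```

Since the exception only says that a worker disappeared, the underlying cause would have to be looked for outside this traceback, for example in the worker's stderr or the system logs.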

Please help. You can copy and test my code at https://www.kaggle.com/code/chaowenguoback/stablediffusion.
