ExeTera Questions #289

Open

NettaShemesh103 opened this issue Apr 23, 2022 · 3 comments

@NettaShemesh103

Hello,
My name is Netta Shemesh, I'm 23 years old, and I'm from Israel. I recently read your article "Accessible data curation and analytics for international-scale citizen science datasets" and I am doing a university project (Israel Institute of Technology) on your paper.
I have several questions about ExeTera, the Python-based software package you wrote.
First, I would like to know whether you are still working on the software, and whether you have succeeded in applying Dask arrays in your code.
Secondly, I have not understood precisely why the Dask DataFrame could not import the data, and which exceptions were raised during the artificial joins that made the Dask program fail.
In addition, I was wondering why applying Dask arrays to your software would make all operations on ExeTera fields streaming by default.
Thank you in advance,
Netta Shemesh

@atbenmurray
Member

Posting responses here for continuity with the issue:

Hi Netta,

Thank you for your interest in ExeTera. My apologies for my slightly delayed response; it has been a busy week and now I am catching up on emails.

Firstly, yes, we are still working on ExeTera.

ExeTera continues to have an application within our school and we are also looking for other applications that require the kind of scale it provides.

We haven't yet had the opportunity to rewrite the backend using dask, but we are in the process of scheduling the future work, including dask integration. I cannot give you a timescale for that right now, however.

I don't recall the precise errors that were raised during the dask benchmarking with the artificial dataset, but we tried a number of approaches to make it work and were unable to do so. I'll rerun the experiment for dask and then give you instructions on how to run the artificial join code in the evaluation repo so you can experiment for yourself (it will need a machine with a lot of RAM).

Using dask arrays means that we can make use of the dask compute engine, which builds directed graphs of the operations to be performed and parallelises / distributes them as appropriate. We will need to implement a number of key operations ourselves, as dask does not natively support them outside of dask dataframe, but its API permits the specification of custom graph fragments through which they can be implemented.
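
As a rough illustration (this is not ExeTera code, and the names and sizes below are invented for the example), a chunk-wise operation can be expressed against dask arrays and then run through the dask scheduler; the real integration would rely on custom graph fragments rather than map_blocks alone:

import numpy as np
import dask.array as da

def chunked_clip(block, lo, hi):
    # Operates on one in-memory chunk at a time; dask schedules the chunks.
    return np.clip(block, lo, hi)

x = da.random.random((10_000_000,), chunks=1_000_000)        # lazily defined array
y = da.map_blocks(chunked_clip, x, 0.1, 0.9, dtype=x.dtype)  # adds nodes to the graph
result = y.sum().compute()                                   # builds and executes the graph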

Are you able to give me more details about your project and its aims? I might be able to assist your enquiries more effectively if you can give me an overview of what you want to do.

Yours,
Ben

@atbenmurray
Member

atbenmurray commented Apr 25, 2022

Here is a stacktrace from a failing dask merge scenario:

Traceback (most recent call last):
  File "dask_join_scenario.py", line 32, in <module>
    go(int(sys.argv[1]), int(sys.argv[2]))
  File "dask_join_scenario.py", line 20, in go
    m_df.to_hdf('d_m_df_{}_{}.hdf'.format(l_length, r_length), key='/data')
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/core.py", line 1554, in to_hdf
    return to_hdf(self, path_or_buf, key, mode, append, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/io/hdf.py", line 251, in to_hdf
    compute_as_if_collection(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/base.py", line 317, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 552, in get_sync
    return get_async(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 495, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 537, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 233, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 233, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = pack_exception(e, dumps)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/local.py", line 219, in execute_task
    result = _execute_task(task, data)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/dask/dataframe/io/hdf.py", line 27, in _pd_to_hdf
    pd_to_hdf(*args, **kwargs)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/core/generic.py", line 2763, in to_hdf
    pytables.to_hdf(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 314, in to_hdf
    f(store)
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 282, in <lambda>
    f = lambda store: store.append(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 1262, in append
    self._write_to_group(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 1772, in _write_to_group
    s.write(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 4299, in write
    table = self._create_axes(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 3915, in _create_axes
    blocks, blk_items = self._get_blocks_and_items(
  File "/home/bm18/miniconda3/envs/exetera/lib/python3.8/site-packages/pandas/io/pytables.py", line 4076, in _get_blocks_and_items
    raise ValueError(
ValueError: cannot match existing table structure for [l1,rpk,fk,r0,r1] on appending data

If you look at the last line, you can see the ValueError: the table structure appears to be inconsistent across the subtables that dask generates while performing the merge. We noted that this occurred for some join sizes and not others. We were not able to find a workaround at the time, and we felt that it represented the kind of technical hurdle that a data analyst should not have to solve. Also, given the performance disparity between dask dataframe merge and our merge, we considered that spending further time trying to work around the issue was not productive.
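
For reference, the same class of error can be reproduced in pandas alone (assuming PyTables is installed); this is a standalone sketch of the failure mode, not necessarily the exact cause in the dask run:

import numpy as np
import pandas as pd

# First chunk: column "a" is integer, column "b" is float.
df1 = pd.DataFrame({"a": [1, 2], "b": [1.0, 2.0]})
df1.to_hdf("demo.h5", key="/data", format="table")

# Second chunk: "a" has become float (e.g. because of missing values), so the
# block layout no longer matches the existing table and the append fails with
# "ValueError: cannot match existing table structure for [a] on appending data".
df2 = pd.DataFrame({"a": [np.nan, 3.0], "b": [3.0, 4.0]})
df2.to_hdf("demo.h5", key="/data", format="table", append=True)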

To summarise, our view is that the dask dataframe approach was the "low hanging fruit" implementation, giving dataframe-like functionality with relatively little effort, but it is sufficiently problematic that the problem is worth tackling from a completely different design direction.

@atbenmurray
Member

You can clone the ExeTeraEval repository and do the following if you want to replicate the dask evaluation.

You'll need to run two commands:

# arguments: left row count, right row count, partition row count
python create_dask_join_scenario.py 20000000 200000000 2000000

# arguments: left row count, right row count
python dask_join_scenario.py 20000000 200000000

If you are using numbers like the above, I suggest you use a machine with a large amount of RAM (>64 GB).
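
For orientation, the shape of the operation being benchmarked is roughly as follows (an illustrative sketch with invented column names and much smaller sizes than the benchmark, assuming pandas, dask and PyTables are available; it is not the actual ExeTeraEval script):

import numpy as np
import pandas as pd
import dask.dataframe as dd

def make_table(n, seed):
    rng = np.random.default_rng(seed)
    return pd.DataFrame({"pk": np.arange(n), "val": rng.random(n)})

left = dd.from_pandas(make_table(1_000_000, seed=0), npartitions=10)
right = dd.from_pandas(
    make_table(10_000_000, seed=1).rename(columns={"pk": "fk"}), npartitions=100
)

# Left join on the key, then write every partition to a single HDF5 table,
# which is the step that fails in the traceback above.
merged = dd.merge(left, right, left_on="pk", right_on="fk", how="left")
merged.to_hdf("d_m_df_example.hdf", key="/data")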
