Adding option to disable Numba during tests (#277)
* Adding option to disable Numba during tests, so that code runs as pure Python and coverage information can be collected

* Add check for correctly raised warning

* Update to unittest job name

* Fix

* Adding journal test back

Signed-off-by: Eric Kerfoot <eric.kerfoot@kcl.ac.uk>

* Install problem fix

* Install problem fix
ericspod authored Mar 28, 2022
1 parent 8fe26ca commit 66697ec
Showing 20 changed files with 261 additions and 235 deletions.
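
The change works by swapping Numba's @njit decorator for a project-level wrapper, exetera_njit, imported from exetera.core.utils. Coverage tools such as coverage.py cannot trace lines inside Numba-compiled functions, so running the decorated functions as plain Python is what makes the coverage numbers meaningful. The utils.py implementation is not part of this excerpt; the sketch below is only a plausible shape for such a wrapper, and the EXETERA_NO_NUMBA environment variable name is an assumption for illustration.

```python
# Hypothetical sketch of a conditional JIT decorator in the spirit of
# exetera_njit. The real implementation (in exetera/core/utils.py) is not
# shown in this diff; the EXETERA_NO_NUMBA variable name is invented here.
import os


def exetera_njit(fn):
    """Apply numba.njit unless Numba has been disabled for this test run."""
    if os.environ.get("EXETERA_NO_NUMBA") == "1":
        # Pure-Python fallback: return the function untouched so that
        # coverage can trace every line executed inside it.
        return fn
    import numba
    return numba.njit(fn)


@exetera_njit
def fast_sum(values):
    # Compiled when Numba is enabled, interpreted (and coverable) otherwise.
    total = 0
    for v in values:
        total += v
    return total
```

Call sites are identical in either mode, which is why the diff below only needs to swap the import and the decorator on fast_csv_reader.
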
4 changes: 2 additions & 2 deletions .github/workflows/python-app.yml
@@ -10,7 +10,7 @@ on:
branches: [ master ]

jobs:
build:
unittests:

runs-on: ${{ matrix.os }}
strategy:
@@ -36,7 +36,7 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with unittest and coverage
run: |
./runtests.sh --coverage
./runtests.sh --coverage --no-numba
- name: Upload coverage
uses: codecov/codecov-action@v1
with:
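
The workflow now passes --no-numba to runtests.sh so that the unit tests exercise the pure-Python paths and report line coverage for them. runtests.sh itself is not included in this diff, so the following is only a guess at how such a flag might be plumbed through; the flag handling and the EXETERA_NO_NUMBA variable are assumptions, not the project's actual mechanism.

```python
# Hypothetical test launcher showing how a --no-numba flag could be turned
# into an environment setting before the suite imports any decorated modules.
# This is not the project's runtests.sh, just an illustration of the ordering
# constraint: the switch must be set before the decorators run at import time.
import argparse
import os
import sys
import unittest

parser = argparse.ArgumentParser()
parser.add_argument("--no-numba", action="store_true",
                    help="run Numba-decorated functions as pure Python")
args, remaining = parser.parse_known_args()

if args.no_numba:
    os.environ["EXETERA_NO_NUMBA"] = "1"

# Discover and run the tests, equivalent to `python -m unittest discover -s tests`.
sys.argv = [sys.argv[0], "discover", "-s", "tests"]
unittest.main(module=None)
```
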
4 changes: 0 additions & 4 deletions .gitignore
@@ -121,10 +121,6 @@ temp/
*.swp
.DS_Store

# temporary testing data MedNIST
tests/testing_data/MedNIST*
tests/testing_data/*Hippocampus*

# clang format tool
.clang-format-bin/

2 changes: 1 addition & 1 deletion exetera/bin/exetera_perf_tests.py
@@ -260,7 +260,7 @@ def hs_test_1(length, val_column_count):


# from numba import njit
# @njit
# njit
def fast_sum(d_it):
dsum = np.int64(0)
for d in d_it:
125 changes: 63 additions & 62 deletions exetera/core/csv_reader_speedup.py
@@ -1,7 +1,7 @@
import csv
import time
from xmlrpc.client import boolean
from numba import njit,jit
#from numba import njit
import numpy as np
from exetera.core import utils
import time
@@ -10,6 +10,7 @@
from typing import Union


from exetera.core.utils import exetera_njit

def get_file_stat(source: Union[str, StringIO], chunk_row_size: int):
"""
@@ -72,85 +73,85 @@ def read_file_using_fast_csv_reader(source: Union[str, StringIO],

column_val_total_count = column_offsets[-1]

with utils.Timer("read_file_using_fast_csv_reader"):
chunk_index = 0
hasHeader = True
#with utils.Timer("read_file_using_fast_csv_reader"):
chunk_index = 0
hasHeader = True

accumulated_written_rows = 0
accumulated_written_rows = 0

# initialize column_inds, column_vals ouside of while-loop
column_inds = np.zeros((count_columns, count_rows + 1), dtype=np.int64) # add one more row for initial index 0
# column_vals = np.zeros((count_columns, val_row_count), dtype=np.uint8)
column_vals = np.zeros(np.int64(column_val_total_count), dtype=np.uint8)
# initialize column_inds, column_vals ouside of while-loop
column_inds = np.zeros((count_columns, count_rows + 1), dtype=np.int64) # add one more row for initial index 0

# column_vals = np.zeros((count_columns, val_row_count), dtype=np.uint8)
column_vals = np.zeros(np.int64(column_val_total_count), dtype=np.uint8)

# make ndarray larger factor
larger_factor = 2
is_indices_full, is_values_full = False, False
# make ndarray larger factor
larger_factor = 2
is_indices_full, is_values_full = False, False

content = None
start_index = 0
content = None
start_index = 0

ch = 0
while chunk_index < total_byte_size:
if stop_after_rows and accumulated_written_rows >= stop_after_rows:
break
ch = 0
while chunk_index < total_byte_size:
if stop_after_rows and accumulated_written_rows >= stop_after_rows:
break

# reads chunk size of file content
# when indices or values is full, we need to call fast_csv_reader again, but we don't want to read same content again
if not is_indices_full and not is_values_full:
content = np.fromfile(source, count=chunk_byte_size, offset=chunk_index, dtype=np.uint8)
start_index = 0
# reads chunk size of file content
# when indices or values is full, we need to call fast_csv_reader again, but we don't want to read same content again
if not is_indices_full and not is_values_full:
content = np.fromfile(source, count=chunk_byte_size, offset=chunk_index, dtype=np.uint8)
start_index = 0

length_content = content.shape[0]
if length_content == 0:
break
length_content = content.shape[0]
if length_content == 0:
break

# check if there's newline at EOF in the last chunk. add one if it's missing
if chunk_index + length_content == total_byte_size and content[-1] != NEWLINE_VALUE:
content = np.append(content, NEWLINE_VALUE)
# check if there's newline at EOF in the last chunk. add one if it's missing
if chunk_index + length_content == total_byte_size and content[-1] != NEWLINE_VALUE:
content = np.append(content, NEWLINE_VALUE)

offset_pos, written_row_count, is_indices_full, is_values_full, val_full_col_idx = fast_csv_reader(content, start_index, column_inds, column_vals, column_offsets, hasHeader, ESCAPE_VALUE, SEPARATOR_VALUE, NEWLINE_VALUE, WHITE_SPACE_VALUE)

# convert and write
for ith, i_c in enumerate(index_map):
if field_importer_list and field_importer_list[ith]:
field_importer_list[ith].import_part(column_inds, column_vals, column_offsets, i_c, written_row_count)

# make column_inds larger if it gets full before reach the end of chunk
if is_indices_full:
indices_row_count = column_inds.shape[1] - 1
column_inds = np.zeros((count_columns, np.uint32(indices_row_count * larger_factor + 1)), dtype=np.int64)

# make column_values larger if it gets full before reach the end of chunk
if is_values_full and val_full_col_idx != -1:
col_val_count = column_offsets[val_full_col_idx + 1] - column_offsets[val_full_col_idx]
delta = col_val_count * (larger_factor - 1)
column_offsets = np.concatenate((column_offsets[:val_full_col_idx + 1], column_offsets[val_full_col_idx + 1:] + np.int64(delta)))
column_val_total_count = column_offsets[-1]
column_vals = np.zeros(np.int64(column_val_total_count), dtype=np.uint8)

offset_pos, written_row_count, is_indices_full, is_values_full, val_full_col_idx = fast_csv_reader(content, start_index, column_inds, column_vals, column_offsets, hasHeader, ESCAPE_VALUE, SEPARATOR_VALUE, NEWLINE_VALUE, WHITE_SPACE_VALUE)

# convert and write
for ith, i_c in enumerate(index_map):
if field_importer_list and field_importer_list[ith]:
field_importer_list[ith].import_part(column_inds, column_vals, column_offsets, i_c, written_row_count)

# make column_inds larger if it gets full before reach the end of chunk
if is_indices_full:
indices_row_count = column_inds.shape[1] - 1
column_inds = np.zeros((count_columns, np.uint32(indices_row_count * larger_factor + 1)), dtype=np.int64)

# make column_values larger if it gets full before reach the end of chunk
if is_values_full and val_full_col_idx != -1:
col_val_count = column_offsets[val_full_col_idx + 1] - column_offsets[val_full_col_idx]
delta = col_val_count * (larger_factor - 1)
column_offsets = np.concatenate((column_offsets[:val_full_col_idx + 1], column_offsets[val_full_col_idx + 1:] + np.int64(delta)))
column_val_total_count = column_offsets[-1]
column_vals = np.zeros(np.int64(column_val_total_count), dtype=np.uint8)

# reassign
if is_indices_full or is_values_full:
start_index = offset_pos
else:
chunk_index += offset_pos
# reassign
if is_indices_full or is_values_full:
start_index = offset_pos
else:
chunk_index += offset_pos

hasHeader = False
accumulated_written_rows += written_row_count
ch += 1
print(f"{ch} chunks, {accumulated_written_rows} accumulated_written_rows parsed in {time.time() - time0}s")
hasHeader = False
accumulated_written_rows += written_row_count
ch += 1

#print(f"{ch} chunks, {accumulated_written_rows} accumulated_written_rows parsed in {time.time() - time0}s")

# complete at the end
for ith in range(len(index_map)):
field_importer_list[ith].complete()

print(f"Total time {time.time() - time0}s")
#print(f"Total time {time.time() - time0}s")
return accumulated_written_rows


@njit
@exetera_njit
def fast_csv_reader(source: Union[str, StringIO],
start_index: int,
column_inds: np.ndarray,
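
The retry logic above (the is_indices_full / is_values_full branches) is the part that is easiest to misread in a flat diff: when either buffer fills before the chunk is exhausted, the rows parsed so far are handed to the importers, the buffer is reallocated larger, and parsing resumes from offset_pos instead of re-reading the file. A stripped-down sketch of that pattern, not ExeTera's actual API, looks like this:

```python
# Minimal sketch of the grow-and-retry pattern used by
# read_file_using_fast_csv_reader: parse into a fixed buffer, flush what was
# parsed when the buffer fills, enlarge the buffer, and resume from where
# parsing stopped. Names and the trivial "parser" are illustrative only.
import numpy as np


def parse_with_growth(content: np.ndarray, initial_size: int = 8,
                      larger_factor: int = 2) -> np.ndarray:
    values = np.zeros(initial_size, dtype=np.uint8)
    flushed = []          # stands in for the field importers
    start_index = 0
    while start_index < len(content):
        # "Parse" as much of the chunk as fits into the current buffer.
        n = min(len(content) - start_index, len(values))
        values[:n] = content[start_index:start_index + n]
        flushed.append(values[:n].copy())   # import_part(...) in the real code
        start_index += n
        if n == len(values) and start_index < len(content):
            # Buffer filled before the chunk was finished: allocate a larger
            # one and carry on from start_index (the is_values_full branch).
            values = np.zeros(len(values) * larger_factor, dtype=np.uint8)
    return np.concatenate(flushed) if flushed else np.zeros(0, dtype=np.uint8)


assert np.array_equal(parse_with_growth(np.arange(40, dtype=np.uint8)),
                      np.arange(40, dtype=np.uint8))
```
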
4 changes: 2 additions & 2 deletions exetera/core/exporter.py
@@ -34,7 +34,7 @@ def export_to_csv(destination, datastore, fields):
if length != expected_length:
raise ValueError(f"field '{f[1]}' is not the same length as the first field:"
f"expected {expected_length} but got {length}")
print('expected_length:', expected_length)
#print('expected_length:', expected_length)
readers = [datastore.get_reader(f[1]) for f in fields]
transforms = [transform_from_reader_type(r) for r in readers]

@@ -50,7 +50,7 @@ def export_to_csv(destination, datastore, fields):
row = [None] * len(fields)
for chunk in datastore.chunks(expected_length):
s = slice(chunk[0], chunk[1])
print(s)
#print(s)
chunks = [r[s] for r in readers]
for i_r in range(chunk[1] - chunk[0]):
for i_c, c in enumerate(chunks):
49 changes: 12 additions & 37 deletions exetera/core/journal.py
@@ -45,40 +45,32 @@ def journal_table(session, schema, old_src, new_src, src_pk, result):
old_only_keys = old_keys.difference(new_keys)
new_only_keys = new_keys.difference(old_keys)

with utils.Timer("sorting old ids"):
old_ids = session.get(old_src[src_pk])
old_ids_ = old_ids.data[:]
old_ids_valid_from = session.get(old_src['j_valid_from']).data[:]
old_sorted_index = session.dataset_sort_index((old_ids_, old_ids_valid_from))
old_ids = session.get(old_src[src_pk])
old_ids_ = old_ids.data[:]
old_ids_valid_from = session.get(old_src['j_valid_from']).data[:]
old_sorted_index = session.dataset_sort_index((old_ids_, old_ids_valid_from))
old_count = len(old_ids_)

with utils.Timer("sorting new_ids"):
new_ids_ = session.get(new_src[src_pk]).data[:]
new_sorted_index = session.dataset_sort_index((new_ids_,))
new_ids_ = session.get(new_src[src_pk]).data[:]
new_sorted_index = session.dataset_sort_index((new_ids_,))
new_count = len(new_ids_)

# print("old_ids:", old_ids_[old_sorted_index[:20]])
# print("new_ids:", new_ids_[new_sorted_index[:20]])

# get the row maps for rows that we need to compare
with utils.Timer("generating row_maps for merging"):
old_ids_ = old_ids_[old_sorted_index]
new_ids_ = new_ids_[new_sorted_index]
old_map, new_map = ops.ordered_generate_journalling_indices(old_ids_, new_ids_)
old_ids_ = old_ids_[old_sorted_index]
new_ids_ = new_ids_[new_sorted_index]
old_map, new_map = ops.ordered_generate_journalling_indices(old_ids_, new_ids_)

to_keep = np.zeros(len(old_map), dtype=bool)

schema_fields = schema.fields.keys()
common_keys = [k for k in schema_fields if k in common_keys]
print("old_map:", old_map)
print("new_map:", new_map)

for k in common_keys:
if k in (src_pk, 'j_valid_from', 'j_valid_to'):
continue
old_f = session.get(old_src[k])
new_f = session.get(new_src[k])
print(k)

if isinstance(old_f, flds.Field) and old_f.indexed:
old_f_i_, old_f_v_ = session.apply_index(old_sorted_index, old_f)
new_f_i_, new_f_v_ = session.apply_index(new_sorted_index, new_f)
@@ -90,9 +82,6 @@ def journal_table(session, schema, old_src, new_src, src_pk, result):
new_f_ = session.apply_index(new_sorted_index, new_f)
ops.compare_rows_for_journalling(old_map, new_map, old_f_, new_f_, to_keep)

print("to_keep:", to_keep.astype(np.uint8))
print(to_keep.sum(), len(to_keep))

merged_length = len(old_ids.data) + to_keep.sum()

only_in_old = 0
@@ -114,7 +103,7 @@ def journal_table(session, schema, old_src, new_src, src_pk, result):
continue
old_f = session.get(old_src[k])
new_f = session.get(new_src[k])
print(k)

if isinstance(old_f, flds.Field) and old_f.indexed:
old_f_i_, old_f_v_ = session.apply_index(old_sorted_index, old_f)
new_f_i_, new_f_v_ = session.apply_index(new_sorted_index, new_f)
@@ -136,21 +125,11 @@ def journal_table(session, schema, old_src, new_src, src_pk, result):
ops.merge_journalled_entries(old_map, new_map, to_keep, old_f_v_, new_f_v_, dest_)
dest_f = new_f.create_like(result, k)
dest_f.data.write(dest_)

print("old_count:", old_count)
print("new_count:", new_count)
print("only in old:", only_in_old)
print("only in new:", only_in_new)
print("updated:", updated)
print("not updated:", not_updated)
print("post journal count:", merged_length)


# merge the tables - the new field length is the original table field length + the number
# of elements from the new table that are being kept




def journal_test_harness(session, schema, old_file, new_file, dest_file):

with h5py.File(old_file, 'r') as old_src:
@@ -164,9 +143,5 @@ def journal_test_harness(session, schema, old_file, new_file, dest_file):

tables = [k for k in schema.keys() if k in old_tables]
for t in tables:
print("journaling {}".format(t))
t0 = time.time()
journal_table(session, schema[t], old_src[t], new_src[t],
'id', dest.create_group(t))
t1 = time.time()
print("test finished in {} seconds", t1 - t0)
