minor streamwriter changes to allow compression
mlathara committed Sep 5, 2024
1 parent aa4eff5 commit 6f335ee
Showing 3 changed files with 24 additions and 29 deletions.
19 changes: 13 additions & 6 deletions src/genomicsdb.pyx
@@ -199,14 +199,15 @@ cdef class _GenomicsDB:
                             flatten_intervals=False,
                             json_output=None,
                             arrow_output=None,
-                            # batching only used with arrow_output
-                            batching=False):
+                            # batching/compress only used with arrow_output
+                            batching=False,
+                            compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         if json_output is not None:
             return self.query_variant_calls_json(array, column_ranges, row_ranges, query_protobuf, json_output);
         elif arrow_output is not None:
-            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching);
+            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching, compress);
         elif flatten_intervals is True:
             return self.query_variant_calls_columnar(array, column_ranges, row_ranges, query_protobuf)
         else:
@@ -318,7 +319,8 @@ cdef class _GenomicsDB:
                                  column_ranges=None,
                                  row_ranges=None,
                                  query_protobuf: query_pb.QueryConfiguration=None,
-                                 batching=False):
+                                 batching=False,
+                                 compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         cdef ArrowVariantCallProcessor processor
@@ -366,19 +368,24 @@ cdef class _GenomicsDB:
             schema_capsule = pycapsule_get_arrow_schema(arrow_schema)
             schema_obj = _ArrowSchemaWrapper._import_from_c_capsule(schema_capsule)
             schema = pa.schema(schema_obj.children_schema)
-            yield schema.serialize().to_pybytes()
         else:
             raise GenomicsDBException("Failed to retrieve arrow schema for query_variant_calls()")
 
         cdef void* arrow_array = NULL
+        w_opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression=compress)
         while True:
             try:
                 arrow_array = processor.arrow_array()
                 if arrow_array:
                     array_capsule = pycapsule_get_arrow_array(arrow_array)
                     array_obj = _ArrowArrayWrapper._import_from_c_capsule(schema_capsule, array_capsule)
                     arrays = [pa.array(array_obj.child(i)) for i in range(array_obj.n_children)]
-                    yield pa.RecordBatch.from_arrays(arrays, schema=schema).serialize().to_pybytes()
+                    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
+                    sink = pa.BufferOutputStream()
+                    writer = pa.RecordBatchStreamWriter(sink, schema, options=w_opts)
+                    writer.write_batch(batch)
+                    writer.close()
+                    yield sink.getvalue().to_pybytes()
                 else:
                     break
             except Exception as e:
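Note on the change above: each payload yielded by query_variant_calls() is now a complete Arrow IPC stream (schema plus one record batch) written through RecordBatchStreamWriter, so any compression set via IpcWriteOptions travels inside the stream itself. A minimal consuming sketch — query_config is a hypothetical pre-built query protobuf as in the tests below, and "zstd"/"lz4" are the codec strings pyarrow's IpcWriteOptions accepts:

    import pyarrow as pa
    import genomicsdb

    # query_config: hypothetical, pre-built query protobuf (see the tests below)
    gdb = genomicsdb.connect_with_protobuf(query_config)

    # compress=None (the default) keeps the previous uncompressed payloads;
    # "zstd" or "lz4" pass through to pa.ipc.IpcWriteOptions(compression=...)
    for output in gdb.query_variant_calls(arrow_output=True, compress="zstd"):
        reader = pa.ipc.open_stream(output)  # the codec is recorded in the stream
        batch = reader.read_next_batch()
        print(batch.num_rows, batch.num_columns)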
24 changes: 8 additions & 16 deletions test/test_genomicsdb.py
@@ -78,29 +78,21 @@ def test_connect_and_query_with_protobuf(setup):
                                 json_output=9999)
 
     # test with query protobuf and arrow output
-    first = True
     schema = pa.schema([("null_field", pa.string())])
     for output in gdb.query_variant_calls(row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True):
-        if first:
-            schema = pa.ipc.read_schema(pa.py_buffer(output))
-            first = False
-        else:
-            batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-            assert batch.num_columns == 6
-            assert batch.num_rows == 5
+        reader = pa.ipc.open_stream(output)
+        batch = reader.read_next_batch()
+        assert batch.num_columns == 6
+        assert batch.num_rows == 5
 
-    first = True
     batch = None
     for output in gdb.query_variant_calls(
         row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True, batching=True
     ):
-        if first:
-            schema = pa.ipc.read_schema(pa.py_buffer(output))
-            first = False
-        else:
-            batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-            assert batch.num_columns == 6
-            assert batch.num_rows == 1 or batch.num_rows == 3
+        reader = pa.ipc.open_stream(output)
+        batch = reader.read_next_batch()
+        assert batch.num_columns == 6
+        assert batch.num_rows == 1 or batch.num_rows == 3
 
     # test with query contig interval and no results
     interval = query_coords.ContigInterval()
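Because every payload now embeds its own schema, the old two-step read (read_schema on the first buffer, read_record_batch on the rest) no longer applies. A sketch of gathering all batches from a batched query into one table, assuming a connected gdb handle as in the test:

    import pyarrow as pa

    batches = []
    for output in gdb.query_variant_calls(
        row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True, batching=True
    ):
        reader = pa.ipc.open_stream(output)
        batches.extend(reader)  # a stream reader iterates over its record batches
    table = pa.Table.from_batches(batches)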
10 changes: 3 additions & 7 deletions test/test_genomicsdb_demo.py
@@ -111,14 +111,10 @@ def test_genomicsdb_demo_with_arrow_output(self):
             start = time.time()
             gdb = genomicsdb.connect_with_protobuf(self.query_config)
             print("\nSummary for batching mode=" + str(batching_mode) + ":")
-            first = True
             for output in gdb.query_variant_calls(arrow_output=True, batching=batching_mode):
-                if first:
-                    schema = pa.ipc.read_schema(pa.py_buffer(output))
-                    first = False
-                else:
-                    batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-                    print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
+                reader = pa.ipc.open_stream(output)
+                batch = reader.read_next_batch()
+                print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
             print("\tElapsed time: " + str(time.time() - start))
 
 if __name__ == '__main__':
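One quick way to verify the new option actually has an effect is to compare total payload size across codecs; a rough sketch under the same assumed gdb handle (the ratio depends entirely on the data):

    plain = sum(len(buf) for buf in gdb.query_variant_calls(arrow_output=True))
    zstd = sum(len(buf) for buf in gdb.query_variant_calls(arrow_output=True, compress="zstd"))
    print("uncompressed bytes:", plain, "zstd bytes:", zstd)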
