minor streamwriter changes to allow compression
mlathara committed Sep 5, 2024
1 parent aa4eff5 commit 09f49fe
Showing 1 changed file with 13 additions and 6 deletions.
src/genomicsdb.pyx: 13 additions & 6 deletions
@@ -199,14 +199,15 @@ cdef class _GenomicsDB:
                               flatten_intervals=False,
                               json_output=None,
                               arrow_output=None,
-                              # batching only used with arrow_output
-                              batching=False):
+                              # batching/compress only used with arrow_output
+                              batching=False,
+                              compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         if json_output is not None:
             return self.query_variant_calls_json(array, column_ranges, row_ranges, query_protobuf, json_output);
         elif arrow_output is not None:
-            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching);
+            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching, compress);
         elif flatten_intervals is True:
             return self.query_variant_calls_columnar(array, column_ranges, row_ranges, query_protobuf)
         else:
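
The first hunk threads the new compress argument through the public query_variant_calls() entry point. A minimal caller-side sketch; the workspace and array names are hypothetical, and genomicsdb.connect() is assumed to be the usual way to obtain a _GenomicsDB handle:

import genomicsdb

gdb = genomicsdb.connect("my_workspace")      # hypothetical workspace path
chunks = gdb.query_variant_calls("my_array",  # hypothetical array name
                                 arrow_output=True,  # batching/compress apply to arrow output only
                                 batching=True,
                                 compress="zstd")
for chunk in chunks:
    ...  # each chunk is a serialized Arrow IPC payload (see the last hunk)
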
@@ -318,7 +319,8 @@ cdef class _GenomicsDB:
                                     column_ranges=None,
                                     row_ranges=None,
                                     query_protobuf: query_pb.QueryConfiguration=None,
-                                    batching=False):
+                                    batching=False,
+                                    compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         cdef ArrowVariantCallProcessor processor
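
The second hunk adds the same compress keyword to the internal query_variant_calls_arrow() generator, which forwards it to pyarrow. pyarrow validates the value itself when the write options are built, so a quick standalone check needs no GenomicsDB at all (a sketch; Arrow IPC compression supports "lz4", "zstd", or None):

import pyarrow as pa

opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression="zstd")  # as in the hunk below
try:
    pa.ipc.IpcWriteOptions(compression="gzip")
except Exception as e:
    print(e)  # pyarrow rejects codecs the IPC format does not support
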
@@ -366,19 +368,24 @@
                 schema_capsule = pycapsule_get_arrow_schema(arrow_schema)
                 schema_obj = _ArrowSchemaWrapper._import_from_c_capsule(schema_capsule)
                 schema = pa.schema(schema_obj.children_schema)
-                yield schema.serialize().to_pybytes()
             else:
                 raise GenomicsDBException("Failed to retrieve arrow schema for query_variant_calls()")
 
         cdef void* arrow_array = NULL
+        w_opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression=compress)
         while True:
             try:
                 arrow_array = processor.arrow_array()
                 if arrow_array:
                     array_capsule = pycapsule_get_arrow_array(arrow_array)
                     array_obj = _ArrowArrayWrapper._import_from_c_capsule(schema_capsule, array_capsule)
                     arrays = [pa.array(array_obj.child(i)) for i in range(array_obj.n_children)]
-                    yield pa.RecordBatch.from_arrays(arrays, schema=schema).serialize().to_pybytes()
+                    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
+                    sink = pa.BufferOutputStream()
+                    writer = pa.RecordBatchStreamWriter(sink, schema, options=w_opts)
+                    writer.write_batch(batch)
+                    writer.close()
+                    yield sink.getvalue().to_pybytes()
                 else:
                     break
             except Exception as e:
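Each yielded chunk is now a complete Arrow IPC stream (schema message, one record batch, end-of-stream marker) produced by RecordBatchStreamWriter, rather than a bare serialized batch. That framing is what lets the IpcWriteOptions compression apply, and it is also why the standalone schema yield above is dropped: every chunk is self-describing. A consumer-side sketch under that assumption; pyarrow decompresses lz4/zstd buffers transparently on read:

import pyarrow as pa

def read_variant_call_batches(chunks):
    # Each chunk is an independent IPC stream holding a single record batch.
    for chunk in chunks:
        reader = pa.ipc.open_stream(chunk)
        for batch in reader:
            yield batch

# e.g. collect everything into one table:
# table = pa.Table.from_batches(read_variant_call_batches(chunks))

Writing a full stream per batch repeats the schema in every chunk, but it keeps each chunk independently decodable, which suits the batching use case.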
