GH-42168: [Python][Parquet] Pyarrow store decimal as integer (apache#42169)

This PR exposes the [store_decimal_as_integer](https://arrow.apache.org/docs/cpp/api/formats.html#_CPPv4N7parquet16WriterProperties7Builder31enable_store_decimal_as_integerEv) Parquet writer property to PyArrow.

### Rationale for this change

This will allow fixed-point decimals to be stored as integers in the Parquet format, taking advantage of more efficient storage codecs.
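
A minimal sketch of the new option in use (the file name and column are illustrative, not part of this PR):

```python
from decimal import Decimal

import pyarrow as pa
import pyarrow.parquet as pq

# decimal128(5, 2) has precision 5, so with the option enabled the
# unscaled values fit in an INT32 physical column instead of a
# fixed_len_byte_array.
table = pa.table({"amount": pa.array([Decimal("1.23"), Decimal("4.56")],
                                     type=pa.decimal128(5, 2))})
pq.write_table(table, "decimals.parquet", store_decimal_as_integer=True)

print(pq.ParquetFile("decimals.parquet").schema.column(0).physical_type)
# INT32
```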

### What changes are included in this PR?

The PyArrow Parquet writer and the related Cython bindings.

### Are these changes tested?

Yes, tests were included for the new parameter.

### Are there any user-facing changes?

Yes, docstrings were updated.

* GitHub Issue: apache#42168

Lead-authored-by: feik <briankiefer@outlook.com>
Co-authored-by: Brian Kiefer <briankiefer@outlook.com>
Co-authored-by: Raúl Cumplido <raulcumplido@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
bkief and raulcd authored Jul 1, 2024
1 parent ae984f4 commit edfa343
Showing 5 changed files with 99 additions and 2 deletions.
2 changes: 2 additions & 0 deletions python/pyarrow/_dataset_parquet.pyx
@@ -613,6 +613,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
write_page_index=self._properties["write_page_index"],
write_page_checksum=self._properties["write_page_checksum"],
sorting_columns=self._properties["sorting_columns"],
store_decimal_as_integer=self._properties["store_decimal_as_integer"],
)

def _set_arrow_properties(self):
@@ -664,6 +665,7 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
encryption_config=None,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False,
)

self._set_properties()
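For reference, a sketch of how the option could flow through the dataset writer touched above; this assumes `make_write_options` forwards the keyword into the `_properties` dict shown in the diff, as it does for the neighbouring keys:

```python
import pyarrow.dataset as ds

# 'table' as in the earlier sketch; 'out_dir' is an illustrative path.
fmt = ds.ParquetFileFormat()
opts = fmt.make_write_options(store_decimal_as_integer=True)
ds.write_dataset(table, "out_dir", format=fmt, file_options=opts)
```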
3 changes: 3 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -432,6 +432,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* disable_statistics()
Builder* enable_statistics()
Builder* enable_statistics(const c_string& path)
Builder* enable_store_decimal_as_integer()
Builder* disable_store_decimal_as_integer()
Builder* data_pagesize(int64_t size)
Builder* encoding(ParquetEncoding encoding)
Builder* encoding(const c_string& path,
@@ -595,6 +597,7 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_page_index=*,
write_page_checksum=*,
sorting_columns=*,
store_decimal_as_integer=*,
) except *


19 changes: 17 additions & 2 deletions python/pyarrow/_parquet.pyx
@@ -1831,7 +1831,9 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
dictionary_pagesize_limit=None,
write_page_index=False,
write_page_checksum=False,
sorting_columns=None) except *:
sorting_columns=None,
store_decimal_as_integer=False) except *:

"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1942,6 +1944,16 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
"'use_byte_stream_split' cannot be passed"
"together with 'column_encoding'")

# store_decimal_as_integer

if isinstance(store_decimal_as_integer, bool):
if store_decimal_as_integer:
props.enable_store_decimal_as_integer()
else:
props.disable_store_decimal_as_integer()
else:
raise TypeError("'store_decimal_as_integer' must be a boolean")

# column_encoding
# encoding map - encode individual columns

@@ -2115,6 +2127,7 @@ cdef class ParquetWriter(_Weakrefable):
int64_t write_batch_size
int64_t dictionary_pagesize_limit
object store_schema
object store_decimal_as_integer

def __cinit__(self, where, Schema schema not None, use_dictionary=None,
compression=None, version=None,
@@ -2136,7 +2149,8 @@ store_schema=True,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
sorting_columns=None):
sorting_columns=None,
store_decimal_as_integer=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -2170,6 +2184,7 @@ cdef class ParquetWriter(_Weakrefable):
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
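A quick illustration of the type check added above (hypothetical snippet, reusing `table` from the first sketch):

```python
try:
    pq.write_table(table, "decimals.parquet", store_decimal_as_integer="yes")
except TypeError as exc:
    print(exc)  # 'store_decimal_as_integer' must be a boolean
```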
21 changes: 21 additions & 0 deletions python/pyarrow/parquet/core.py
@@ -873,6 +873,23 @@ def _sanitize_table(table, new_schema, flavor):
Specify the sort order of the data being written. The writer does not sort
the data nor does it verify that the data is sorted. The sort order is
written to the row group metadata, which can then be used by readers.
store_decimal_as_integer : bool, default False
Allow decimals with 1 <= precision <= 18 to be stored as integers.
In Parquet, DECIMAL can be stored in any of the following physical types:
- int32: for 1 <= precision <= 9.
- int64: for 10 <= precision <= 18.
- fixed_len_byte_array: precision is limited by the array size.
Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits.
- binary: precision is unlimited. The minimum number of bytes to store the
unscaled value is used.
By default, this is DISABLED and all decimal types annotate fixed_len_byte_array.
When enabled, the writer will use the following physical types to store decimals:
- int32: for 1 <= precision <= 9.
- int64: for 10 <= precision <= 18.
- fixed_len_byte_array: for precision > 18.
As a consequence, decimal columns stored in integer types are more compact.
"""

_parquet_writer_example_doc = """\
@@ -968,6 +985,7 @@ def __init__(self, where, schema, filesystem=None,
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1020,6 +1038,7 @@ def __init__(self, where, schema, filesystem=None,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
**options)
self.is_open = True

@@ -1873,6 +1892,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -1903,6 +1923,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
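As a sanity check of the fixed_len_byte_array digit formula quoted in the docstring above (a standalone sketch, not part of the diff):

```python
import math

def flba_max_digits(n: int) -> int:
    # Maximum base-10 digits of an unscaled decimal stored in an n-byte
    # two's-complement value: floor(log_10(2^(8*n - 1) - 1)).
    return math.floor(math.log10(2 ** (8 * n - 1) - 1))

assert flba_max_digits(4) == 9    # int32 range -> precision <= 9
assert flba_max_digits(8) == 18   # int64 range -> precision <= 18
assert flba_max_digits(16) == 38  # decimal128's maximum precision
```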
56 changes: 56 additions & 0 deletions python/pyarrow/tests/parquet/test_basic.py
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.

import os
from collections import OrderedDict
import io
import warnings
from shutil import copytree
from decimal import Decimal

import numpy as np
import pytest
@@ -357,6 +359,60 @@ def test_byte_stream_split():
use_dictionary=False)


def test_store_decimal_as_integer(tempdir):
arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
type=pa.decimal128(5, 2))
arr_decimal_10_18 = pa.array(list(map(Decimal, range(100))),
type=pa.decimal128(16, 9))
arr_decimal_gt18 = pa.array(list(map(Decimal, range(100))),
type=pa.decimal128(22, 2))
arr_bool = pa.array([True, False] * 50)
data_decimal = [arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18]
table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c'])

# Check with store_decimal_as_integer.
_check_roundtrip(table,
expected=table,
compression="gzip",
use_dictionary=False,
store_decimal_as_integer=True)

# Check physical type in parquet schema
pqtestfile_path = os.path.join(tempdir, 'test.parquet')
pq.write_table(table, pqtestfile_path,
compression="gzip",
use_dictionary=False,
store_decimal_as_integer=True)

pqtestfile = pq.ParquetFile(pqtestfile_path)
pqcol_decimal_1_9 = pqtestfile.schema.column(0)
pqcol_decimal_10_18 = pqtestfile.schema.column(1)

assert pqcol_decimal_1_9.physical_type == 'INT32'
assert pqcol_decimal_10_18.physical_type == 'INT64'

# Check with store_decimal_as_integer and delta-int encoding.
# DELTA_BINARY_PACKED requires parquet physical type to be INT64 or INT32
_check_roundtrip(table,
expected=table,
compression="gzip",
use_dictionary=False,
store_decimal_as_integer=True,
column_encoding={
'a': 'DELTA_BINARY_PACKED',
'b': 'DELTA_BINARY_PACKED'
})

# Check with mixed column types.
mixed_table = pa.Table.from_arrays(
[arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18, arr_bool],
names=['a', 'b', 'c', 'd'])
_check_roundtrip(mixed_table,
expected=mixed_table,
use_dictionary=False,
store_decimal_as_integer=True)


def test_column_encoding():
arr_float = pa.array(list(map(float, range(100))))
arr_int = pa.array(list(map(int, range(100))))
