Skip to content

Commit

Permalink
Make BulkIndexError and ScanError serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
seagrine committed Oct 17, 2024
1 parent a62a506 commit a177ec7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
7 changes: 7 additions & 0 deletions elasticsearch/helpers/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ def __init__(self, message: Any, errors: List[Dict[str, Any]]):
super().__init__(message)
self.errors: List[Dict[str, Any]] = errors

def __reduce__(self):
return (self.__class__, (self.args[0], self.errors))


class ScanError(Exception):
scroll_id: str

def __init__(self, scroll_id: str, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.scroll_id = scroll_id

def __reduce__(self):
return (self.__class__, (self.scroll_id,) + self.args)

23 changes: 22 additions & 1 deletion test_elasticsearch/test_server/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import json
import pickle
from datetime import datetime, timedelta
from unittest.mock import call, patch

Expand All @@ -24,7 +25,7 @@
from elastic_transport import ApiResponseMeta, ObjectApiResponse

from elasticsearch import ApiError, helpers
from elasticsearch.helpers import ScanError
from elasticsearch.helpers import BulkIndexError, ScanError


class FailingBulkClient:
Expand Down Expand Up @@ -993,3 +994,23 @@ def test_reindex_index_datastream_op_type_index(sync_client):
query={"query": {"bool": {"filter": {"term": {"type": "answers"}}}}},
op_type="_index",
)

def test_serialize_bulk_index_error():
msg = "message"
errors = {"error": 1}
error = BulkIndexError(msg, errors)
pickled = pickle.dumps(error)
actual = pickle.loads(pickled)
assert actual.__class__ == BulkIndexError
assert actual.errors == error.errors
assert actual.args == error.args

def test_serialize_scan_error():
scroll_id = "message"
args = ("a", "b")
error = ScanError(scroll_id, *args)
pickled = pickle.dumps(error)
actual = pickle.loads(pickled)
assert actual.__class__ == ScanError
assert actual.scroll_id == error.scroll_id
assert actual.args == error.args

0 comments on commit a177ec7

Please sign in to comment.