diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 5986c084..9481b6f7 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -151,10 +151,12 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (response.getEntity != null) { val loadResult = EntityUtils.toString(response.getEntity) val res = MAPPER.readValue(loadResult, new TypeReference[util.HashMap[String, String]]() {}) - if (res.get("status") == "Fail" && !ResponseUtil.isCommitted(res.get("msg"))) throw new StreamLoadException("Commit failed " + loadResult) - else LOG.info("load result {}", loadResult) + if (res.get("status") == "Success" || ResponseUtil.isCommitted(res.get("msg"))) LOG.info("commit transaction {} succeed, load result: {}.", msg.value, loadResult) + else { + LOG.error("commit transaction {} failed. load result: {}", msg.value, loadResult) + throw new StreamLoadException("Commit failed " + loadResult) + } } - } match { case Success(_) => client.close() case Failure(e) =>