Skip to content

Commit

Permalink
add label prefix configuration item for doris sink to track writing s…
Browse files Browse the repository at this point in the history
…ource when troubleshooting
  • Loading branch information
wary committed Oct 24, 2024
1 parent 4557e8f commit e9cb4e5
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect

private val enableGroupCommit: Boolean = streamLoadProps.contains(ConfigurationOptions.GROUP_COMMIT)

/**
* execute stream load
*
Expand Down Expand Up @@ -311,7 +312,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
*
* if load data to be directly, check node available will be done before return.
*
* @throws [ [ org.apache.doris.spark.exception.StreamLoadException]]
* @throws [ [ org.apache.doris.spark.exception.StreamLoadException]]
* @return address
*/
@throws[StreamLoadException]
Expand Down Expand Up @@ -384,7 +385,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
/**
* generate load label
*
* spark_streamload_YYYYMMDD_HHMMSS_{UUID}
* {label_prefix}_YYYYMMDD_HHMMSS_{UUID}
*
* @return load label
*/
Expand All @@ -393,7 +394,8 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
return null;
}
val calendar = Calendar.getInstance
"spark_streamload_" +
val labelPrefix = streamLoadProps.getOrElse("label_prefix", "spark_streamload")
labelPrefix + "_" +
f"${calendar.get(Calendar.YEAR)}${calendar.get(Calendar.MONTH) + 1}%02d${calendar.get(Calendar.DAY_OF_MONTH)}%02d" +
f"_${calendar.get(Calendar.HOUR_OF_DAY)}%02d${calendar.get(Calendar.MINUTE)}%02d${calendar.get(Calendar.SECOND)}%02d" +
f"_${UUID.randomUUID.toString.replaceAll("-", "")}"
Expand Down

0 comments on commit e9cb4e5

Please sign in to comment.