Skip to content

Commit

Permalink
Support for replacing varchar and varbinary types in external tables (#…
Browse files Browse the repository at this point in the history
…266)

* Add nameservice to hadoopimpersonationconfig

* Remove condition that restricts column replacement to only UNKNOWN

* revert ha change in this branch
  • Loading branch information
ravjotbrar authored Oct 29, 2021
1 parent 99dc485 commit 78316be
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,22 @@ class VerticaDistributedFilesystemWritePipe(val config: DistributedFilesystemWri
}

logger.debug("The infer statement is: " + inferStatement)

jdbcLayer.query(inferStatement) match {
case Left(err) => Left(InferExternalTableSchemaError(err))
case Right(resultSet) =>
try {
val iterate = resultSet.next
val createExternalTableStatement = resultSet.getString("INFER_EXTERNAL_TABLE_DDL")

val containsVarchar = createExternalTableStatement.toLowerCase.contains("varchar")
val containsVarbinary = createExternalTableStatement.toLowerCase.contains("varbinary")

if(containsVarchar || containsVarbinary) {
logger.warn("The parquet data contains a column of type varchar or varbinary. " +
"Lengths of these column types cannot be determined from the data and will truncate to the default length (80). " +
"Please provide a partial schema with StringType to replace varchar and BinaryType to replace varbinary, or manually create an external table.")
}

if(inferStatement.contains(EscapeUtils.sqlEscape(s"${config.fileStoreConfig.externalTableAddress.stripSuffix("/")}/**/*.parquet"))) {
logger.info("Inferring partial schema from dataframe")
schemaTools.inferExternalTableSchema(createExternalTableStatement, config.schema, tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,22 +444,20 @@ class SchemaTools extends SchemaToolsInterface {
val schemaList = schemaString.split(",").toList

val updatedSchema: String = schemaList.map(col => {
if(col.contains(unknown)) {
val indexOfFirstDoubleQuote = col.indexOf("\"")
val indexOfSpace = col.indexOf(" ", indexOfFirstDoubleQuote)
val colName = col.substring(indexOfFirstDoubleQuote, indexOfSpace)
val indexOfFirstDoubleQuote = col.indexOf("\"")
val indexOfSpace = col.indexOf(" ", indexOfFirstDoubleQuote)
val colName = col.substring(indexOfFirstDoubleQuote, indexOfSpace)

val fieldType = schema.collect {
case field if(addDoubleQuotes(field.name) == colName) => field.dataType.simpleString
}
if(fieldType.nonEmpty) {
colName + " " + fieldType.head
}
else {
col
}
val fieldType = schema.collect {
case field if(addDoubleQuotes(field.name) == colName) => field.dataType.simpleString
}
if(fieldType.nonEmpty) {
colName + " " + fieldType.head
}

else {
col
}
else { col }
}).mkString(",")

if(updatedSchema.contains(unknown)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ class SchemaToolsTests extends AnyFlatSpec with BeforeAndAfterAll with MockFacto
val createExternalTableStmt = "create external table \"sales\"(" +
"\"tx_id\" int," +
"\"date\" UNKNOWN," +
"\"region\" UNKNOWN" +
"\"region\" varchar" +
") as copy from \'/data/\' parquet"
val schemaTools = new SchemaTools
schemaTools.inferExternalTableSchema(createExternalTableStmt, schema, "sales") match {
Expand Down

0 comments on commit 78316be

Please sign in to comment.