Skip to content

Commit

Permalink
[fix](stream_load)fix bug for stream
Browse files Browse the repository at this point in the history
1. forbid thed  stream_load without content-length or chunked Transfer Encoding
2. forbid thed  stream_load both with content-length and chunked Transfer Encoding
  • Loading branch information
alexxing662 committed Dec 27, 2023
1 parent 70c1b5f commit f00f235
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
13 changes: 13 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,19 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
}
}

if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
!ctx->is_chunked_transfer))) {
LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
"content_length or transfer-encoding=chunked";
return Status::InternalError(
"content_length is empty and transfer-encoding!=chunked, please set content_length "
"or transfer-encoding=chunked");
} else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
ctx->is_chunked_transfer)) {
LOG(WARNING) << "please do not set both content_length and transfer-encoding";
return Status::InternalError("please do not set both content_length and transfer-encoding");
}

if (!http_req->header(HTTP_TIMEOUT).empty()) {
try {
ctx->timeout_second = std::stoi(http_req->header(HTTP_TIMEOUT));
Expand Down
70 changes: 70 additions & 0 deletions regression-test/suites/load_p0/stream_load/test_stream_load.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1044,5 +1044,75 @@ suite("test_stream_load", "p0") {
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}

def sql_result = sql """ select Host, HttpPort from backends() where alive = true limit 1; """

log.info(sql_result[0][0].toString())
log.info(sql_result[0][1].toString())
log.info(sql_result[0].size.toString())

def beHost=sql_result[0][0]
def beHttpPort=sql_result[0][1]
log.info("${beHost}".toString())
log.info("${beHttpPort}".toString());

//test be : chunked transfer + Content Length
try {
sql """ DROP TABLE IF EXISTS ${tableName16} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName16} (
`k1` bigint(20) NULL DEFAULT "1",
`k2` bigint(20) NULL ,
`v1` tinyint(4) NULL,
`v2` tinyint(4) NULL,
`v3` tinyint(4) NULL,
`v4` DATETIME NULL
) ENGINE=OLAP
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length:0 -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
log.info("test chunked transfer command: ${command}")
def process = command.execute()
code = process.waitFor()
out = process.text
log.info("test chunked transfer result: ${out}".toString())
def json = parseJson(out)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set both content_length and transfer-encoding"))
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}


//test be : no chunked transfer + no Content Length
try {
sql """ DROP TABLE IF EXISTS ${tableName16} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName16} (
`k1` bigint(20) NULL DEFAULT "1",
`k2` bigint(20) NULL ,
`v1` tinyint(4) NULL,
`v2` tinyint(4) NULL,
`v3` tinyint(4) NULL,
`v4` DATETIME NULL
) ENGINE=OLAP
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length: -H Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
log.info("test chunked transfer command: ${command}")
def process = command.execute()
code = process.waitFor()
out = process.text
log.info("test chunked transfer result: ${out}".toString())
def json = parseJson(out)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}
}

0 comments on commit f00f235

Please sign in to comment.