enh: [TS-6195] support cols in stream (#30580)

This commit is contained in:
xinsheng Ren 2025-04-02 11:31:08 +08:00 committed by GitHub
parent 88f2e71dc4
commit 463430ab15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 159 additions and 16 deletions

View File

@ -46,6 +46,7 @@ typedef struct STableKeyInfo {
typedef struct SWinKey {
uint64_t groupId;
TSKEY ts;
int32_t numInGroup;
} SWinKey;
typedef struct SSessionKey {

View File

@ -1269,12 +1269,14 @@ int32_t encodeSWinKey(void** buf, SWinKey* key) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, key->ts);
tlen += taosEncodeFixedU64(buf, key->groupId);
tlen += taosEncodeFixedI32(buf, key->numInGroup);
return tlen;
}
void* decodeSWinKey(void* buf, SWinKey* key) {
buf = taosDecodeFixedI64(buf, &key->ts);
buf = taosDecodeFixedU64(buf, &key->groupId);
buf = taosDecodeFixedI32(buf, &key->numInGroup);
return buf;
}

View File

@ -3955,6 +3955,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
}
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);
key.numInGroup = pCtx->pExpr->pExpr->_function.bindExprID;
}
char* buf = NULL;

View File

@ -4593,6 +4593,8 @@ static int32_t jsonToDataType(const SJson* pJson, void* pObj) {
static const char* jkExprDataType = "DataType";
static const char* jkExprAliasName = "AliasName";
static const char* jkExprUserAlias = "UserAlias";
static const char* jkExprRelateTo = "RelatedTo";
static const char* jkExprBindExprID = "BindExprID";
static int32_t exprNodeToJson(const void* pObj, SJson* pJson) {
const SExprNode* pNode = (const SExprNode*)pObj;
@ -4604,6 +4606,12 @@ static int32_t exprNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkExprUserAlias, pNode->userAlias);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExprRelateTo, pNode->relatedTo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExprBindExprID, pNode->bindExprID);
}
return code;
}
@ -4618,6 +4626,12 @@ static int32_t jsonToExprNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkExprUserAlias, pNode->userAlias);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExprRelateTo, &pNode->relatedTo);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExprBindExprID, &pNode->bindExprID);
}
return code;
}
@ -6665,6 +6679,7 @@ static int32_t jsonToSetOperator(const SJson* pJson, void* pObj) {
static const char* jkSelectStmtDistinct = "Distinct";
static const char* jkSelectStmtProjections = "Projections";
static const char* jkSelectStmtProjectionsBind = "ProjectionsBind";
static const char* jkSelectStmtFrom = "From";
static const char* jkSelectStmtWhere = "Where";
static const char* jkSelectStmtPartitionBy = "PartitionBy";
@ -6690,6 +6705,9 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSelectStmtProjections, pNode->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSelectStmtProjectionsBind, pNode->pProjectionBindList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSelectStmtFrom, nodeToJson, pNode->pFromTable);
}
@ -6752,6 +6770,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSelectStmtProjections, &pNode->pProjectionList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSelectStmtProjectionsBind, &pNode->pProjectionBindList);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSelectStmtFrom, &pNode->pFromTable);
}

View File

@ -1415,6 +1415,7 @@ cols_func(A) ::= COLS(B).
cols_func_para_list(A) ::= function_expression(B) NK_COMMA cols_func_expression_list(C). { A = createColsFuncParamNodeList(pCxt, B, C, NULL); }
cols_func_expression(A) ::= expr_or_subquery(B). { A = releaseRawExprNode(pCxt, B); }
cols_func_expression(A) ::= NK_STAR(B). { A = createColumnNode(pCxt, NULL, &B); }
cols_func_expression(A) ::= expr_or_subquery(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C);}
cols_func_expression(A) ::= expr_or_subquery(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C);}

View File

@ -2987,7 +2987,7 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (isStarParam(pPara)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s(*) is only supported in SELECTed list", pFunc->functionName);
"%s(*) is only supported in selected list", pFunc->functionName);
}
}
if (tsKeepColumnName && 1 == LIST_LENGTH(pFunc->pParameterList) && !pFunc->node.asAlias && !pFunc->node.asParam) {
@ -5838,6 +5838,11 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
SNodeList* pCols = NULL;
code = createAllColumns(pCxt, false, &pCols);
if (TSDB_CODE_SUCCESS == code) {
SNode* tmp = NULL;
FOREACH(tmp, pCols) {
((SExprNode*)tmp)->bindExprID = ((SExprNode*)pNode)->bindExprID;
((SExprNode*)tmp)->relatedTo = ((SExprNode*)pNode)->relatedTo;
}
INSERT_LIST(pSelect->pProjectionList, pCols);
ERASE_NODE(pSelect->pProjectionList);
continue;
@ -5941,18 +5946,18 @@ static int32_t translateClausePosition(STranslateContext* pCxt, SNodeList* pProj
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList, SNodeList** selectFuncList);
static int32_t rewriteColsFunction(STranslateContext* pCxt, ESqlClause clause, SNodeList** nodeList, SNodeList** selectFuncList);
static int32_t rewriteHavingColsNode(STranslateContext* pCxt, SNode** pNode, SNodeList** selectFuncList);
static int32_t prepareColumnExpansion(STranslateContext* pCxt, ESqlClause clause, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t len = LIST_LENGTH(pSelect->pProjectionBindList);
if (clause == SQL_CLAUSE_SELECT) {
code = rewriteColsFunction(pCxt, &pSelect->pProjectionList, &pSelect->pProjectionBindList);
code = rewriteColsFunction(pCxt, clause, &pSelect->pProjectionList, &pSelect->pProjectionBindList);
} else if (clause == SQL_CLAUSE_HAVING) {
code = rewriteHavingColsNode(pCxt, &pSelect->pHaving, &pSelect->pProjectionBindList);
} else if (clause == SQL_CLAUSE_ORDER_BY) {
code = rewriteColsFunction(pCxt, &pSelect->pOrderByList, &pSelect->pProjectionBindList);
code = rewriteColsFunction(pCxt, clause, &pSelect->pOrderByList, &pSelect->pProjectionBindList);
} else {
code =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Invalid clause for column expansion");
@ -7885,6 +7890,7 @@ typedef struct SCheckColsFuncCxt {
bool hasColsFunc;
SNodeList** selectFuncList;
int32_t status;
ESqlClause clause;
} SCheckColsFuncCxt;
static bool isColsFuncByName(SFunctionNode* pFunc) {
@ -7958,6 +7964,16 @@ static EDealRes checkHasColsFunc(SNode** pNode, void* pContext){
return DEAL_RES_CONTINUE;
}
static bool isStarColumn(SNode* pNode) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (strcmp(pCol->colName, "*") == 0) {
return true;
}
}
return false;
}
static int32_t checkMultColsFuncParam(SNodeList* pParameterList) {
if (!pParameterList || pParameterList->length < 2) {
return TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
@ -8003,7 +8019,7 @@ static EDealRes rewriteSingleColsFunc(SNode** pNode, void* pContext) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (isColsFuncByName(pFunc)) {
if(pFunc->pParameterList->length > 2) {
pCxt->status = TSDB_CODE_PAR_INVALID_COLS_SELECTFUNC;
pCxt->status = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
return DEAL_RES_ERROR;
}
SNode* pSelectFunc = nodesListGetNode(pFunc->pParameterList, 0);
@ -8013,6 +8029,11 @@ static EDealRes rewriteSingleColsFunc(SNode** pNode, void* pContext) {
parserError("%s Invalid cols function, the first parameter must be a select function", __func__);
return DEAL_RES_ERROR;
}
if (pCxt->clause != SQL_CLAUSE_SELECT && isStarColumn(pExpr)) {
pCxt->status = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
parserError("%s Invalid cols function, the parameters '*' is invalid.", __func__);
return DEAL_RES_ERROR;
}
if (pFunc->node.asAlias) {
if (((SExprNode*)pExpr)->asAlias) {
pCxt->status = TSDB_CODE_PAR_INVALID_COLS_ALIAS;
@ -8079,7 +8100,7 @@ static int32_t rewriteHavingColsNode(STranslateContext* pCxt, SNode** pNode, SNo
return code;
}
static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList, SNodeList** selectFuncList) {
static int32_t rewriteColsFunction(STranslateContext* pCxt, ESqlClause clause, SNodeList** nodeList, SNodeList** selectFuncList) {
int32_t code = TSDB_CODE_SUCCESS;
bool needRewrite = false;
SNode** pNode = NULL;
@ -8091,7 +8112,7 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
}
needRewrite = true;
} else {
SCheckColsFuncCxt pSelectFuncCxt = {false, selectFuncList, TSDB_CODE_SUCCESS};
SCheckColsFuncCxt pSelectFuncCxt = {false, selectFuncList, TSDB_CODE_SUCCESS, clause};
nodesRewriteExpr(pNode, rewriteSingleColsFunc, &pSelectFuncCxt);
if (pSelectFuncCxt.status != TSDB_CODE_SUCCESS) {
return pSelectFuncCxt.status;
@ -8102,9 +8123,6 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
SNodeList* pNewNodeList = NULL;
SNode* pNewNode = NULL;
if (needRewrite) {
if (pCxt->createStream) {
return TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
}
code = nodesMakeList(&pNewNodeList);
if (NULL == pNewNodeList) {
return code;
@ -8149,6 +8167,12 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
for (int i = 1; i < pFunc->pParameterList->length; ++i) {
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, i);
if (clause != SQL_CLAUSE_SELECT && isStarColumn(pExpr)) {
code = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
parserError("%s Invalid cols function, the parameters '*' is invalid.", __func__);
goto _end;
}
code = nodesCloneNode(pExpr, &pNewNode);
if(TSDB_CODE_SUCCESS != code) goto _end;
if (nodesIsExprNode(pNewNode)) {

View File

@ -1802,6 +1802,7 @@ int stateKeyEncode(void* k, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
len += taosEncodeFixedI64((void**)&buf, key->key.ts);
len += taosEncodeFixedI32((void**)&buf, key->key.numInGroup);
len += taosEncodeFixedI64((void**)&buf, key->opNum);
return len;
}
@ -1811,6 +1812,7 @@ int stateKeyDecode(void* k, char* buf) {
char* p = buf;
p = taosDecodeFixedU64(p, &key->key.groupId);
p = taosDecodeFixedI64(p, &key->key.ts);
p = taosDecodeFixedI32(p, &key->key.numInGroup);
p = taosDecodeFixedI64(p, &key->opNum);
return p - buf;
}
@ -1911,6 +1913,7 @@ int winKeyEncode(void* k, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->groupId);
len += taosEncodeFixedI64((void**)&buf, key->ts);
len += taosEncodeFixedI32((void**)&buf, key->numInGroup);
return len;
}
@ -1920,6 +1923,7 @@ int winKeyDecode(void* k, char* buf) {
char* p = buf;
p = taosDecodeFixedU64(p, &key->groupId);
p = taosDecodeFixedI64(p, &key->ts);
p = taosDecodeFixedI32(p, &key->numInGroup);
return len;
}
@ -3370,7 +3374,7 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
int32_t* pVLen) {
if (!pCur) return -1;
SStateKey tkey;
SStateKey tkey = {0};
SStateKey* pKtmp = &tkey;
if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {

View File

@ -65,7 +65,7 @@ void *backendOpen() {
for (int32_t i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
SWinKey key; // = {.groupId = (uint64_t)(i), .ts = ts};
SWinKey key {0}; // = {.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";

View File

@ -89,7 +89,7 @@ if $data10 != @ -> Merge (columns=3 width=24 input_order=unknown output_order=
return -1
endi
sql explain select count(*), last_row(f1), min(f1) from sta interval(1s);
if $data10 != @ -> Merge (columns=4 width=106 input_order=asc output_order=asc mode=sort)@ then
if $data10 != @ -> Merge (columns=4 width=122 input_order=asc output_order=asc mode=sort)@ then
return -1
endi
sql explain select distinct count(*), last_row(f1), min(f1) from tba1;

View File

@ -730,6 +730,9 @@ class TDTestCase:
tdSql.checkData(1, 0, 1)
tdSql.checkData(1, 1, 'c2')
self.condition_check(t1 != "", 1, 2, 'st2')
tdSql.error(f'select count(1), cols(last({col_name}),*) {t1} from {from_table} group by tbname order by cols(last({col_name}), *)')
#tdSql.query(f'select count(1), cols(last({col_name}),c2) {t1} from {from_table} group by tbname order by cols(last({col_name}), *)')
tdSql.query(f'select count(1), cols(last({col_name}),c2) {t1} from {from_table} group by tbname order by cols(last({col_name}), c2) desc')
tdSql.checkRows(2)
@ -1059,9 +1062,74 @@ class TDTestCase:
tdSql.checkData(0, 2, 1)
def stream_cols_test(self):
tdSql.error(f'CREATE STREAM last_col_s1 INTO last_col1 AS SELECT cols(last(ts), ts, c0) FROM meters PARTITION BY tbname INTERVAL(1s) SLIDING(1s);', TSDB_CODE_PAR_INVALID_COLS_FUNCTION)
tdSql.query(f'CREATE STREAM last_col_s INTO last_col AS SELECT last(ts), c0 FROM meters PARTITION BY tbname INTERVAL(1s) SLIDING(1s);')
tdSql.execute(f'CREATE STREAM last_col_s1 INTO {self.dbname}.last_col1 AS SELECT cols(last(ts), ts, c0) FROM {self.dbname}.meters PARTITION BY tbname INTERVAL(1s) SLIDING(1s);')
tdSql.execute(f'CREATE STREAM last_col_s2 INTO {self.dbname}.last_col2 AS SELECT last(ts), c0 FROM {self.dbname}.meters PARTITION BY tbname INTERVAL(1s) SLIDING(1s);')
tdSql.waitedQuery(f'show streams', 2, 10)
sleep(5)
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574930000, 0, 1, NULL, NULL)')
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574931000, 1, 1, NULL, NULL)')
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574932000, 2, 2, NULL, NULL)')
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574933000, 3, 3, NULL, NULL)')
tdSql.waitedQuery(f'select * from {self.dbname}.last_col2', 3, 10)
tdSql.waitedQuery(f'select * from {self.dbname}.last_col1', 3, 10)
tdSql.query(f'select * from {self.dbname}.last_col1')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 1734574930000)
tdSql.checkData(0, 1, 1734574930000)
tdSql.checkData(0, 2, 0)
tdSql.checkData(1, 0, 1734574931000)
tdSql.checkData(1, 1, 1734574931000)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 0, 1734574932000)
tdSql.checkData(2, 1, 1734574932000)
tdSql.checkData(2, 2, 2)
def stream_cols_test2(self):
db2 = "test2"
tdSql.execute(f'create database {db2}')
tdSql.execute(f'create table {db2}.st (ts timestamp, c0 int) tags (t1 int)')
tdSql.execute(f'create table {db2}.st_1 using {db2}.st tags(1)')
tdSql.execute(f'create table {db2}.st_2 using {db2}.st tags(2)')
tdSql.execute(f'CREATE STREAM col1 INTO {db2}.colt1 AS SELECT cols(min(c0), ts min_ts, c0 min_c0), cols(max(c0), ts max_ts, c0 max_c0) FROM {db2}.st PARTITION BY tbname INTERVAL(1s) SLIDING(1s);')
tdSql.execute(f'CREATE STREAM col2 INTO {db2}.colt2 AS SELECT min(c0), max(c0) FROM {db2}.st PARTITION BY tbname INTERVAL(1s) SLIDING(1s);')
tdSql.waitedQuery(f'show streams', 4, 10)
time.sleep(5)
tdSql.execute(f'insert into {db2}.st_1 values(1734574930000, 0), (1734574930100, 1), (1734574930200, 2), (1734574930300, 3)')
tdSql.execute(f'insert into {db2}.st_1 values(1734574931000, 1), (1734574931100, 2), (1734574931200, 3), (1734574931300, 4)')
tdSql.execute(f'insert into {db2}.st_1 values(1734574932000, 2), (1734574932100, 3), (1734574932200, 4), (1734574932300, 5)')
tdSql.execute(f'insert into {db2}.st_1 values(1734574933000, 3), (1734574933100, 4), (1734574933200, 5), (1734574933300, 6)')
tdSql.waitedQuery(f'select * from {db2}.colt2', 3, 10)
tdSql.query(f'select * from {db2}.colt2')
tdSql.checkRows(3)
tdSql.checkData(0, 1, 0)
tdSql.checkData(0, 2, 3)
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 1, 2)
tdSql.checkData(2, 2, 5)
tdSql.query(f"select * from {db2}.colt1")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1734574930000)
tdSql.checkData(0, 2, 0)
tdSql.checkData(0, 3, 1734574930300)
tdSql.checkData(0, 4, 3)
tdSql.checkData(1, 1, 1734574931000)
tdSql.checkData(1, 2, 1)
tdSql.checkData(1, 3, 1734574931300)
tdSql.checkData(1, 4, 4)
tdSql.checkData(2, 1, 1734574932000)
tdSql.checkData(2, 2, 2)
tdSql.checkData(2, 3, 1734574932300)
tdSql.checkData(2, 4, 5)
def include_null_test(self):
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574929010, 0, NULL, NULL, NULL)')
tdSql.execute(f'insert into {self.dbname}.d0 values(1734574929011, NULL, 1, NULL, NULL)')
@ -1243,6 +1311,25 @@ class TDTestCase:
tdSql.checkData(0, 1, 1734574929000)
tdSql.checkData(1, 0, 'd0')
tdSql.checkData(1, 1, 1734574929014)
def star_test(self):
tdLog.info("star_test")
tdSql.query(f'select tbname, cols(last(ts), *) from test.meters group by tbname having cols(last(ts), ts) > 1734574929000')
tdSql.checkRows(1)
tdSql.checkCols(7)
tdSql.checkData(0, 0, 'd0')
tdSql.checkData(0, 1, 1734574929014)
tdSql.query(f'select tbname, cols(last(ts), *) from test.meters group by tbname having cols(last(ts), ts) = 1734574929000')
tdSql.checkRows(1)
tdSql.checkCols(7)
tdSql.checkData(0, 0, 'd1')
tdSql.checkData(0, 1, 1734574929000)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 1)
tdSql.checkData(0, 4, 'c2')
tdSql.checkData(0, 5, True)
tdSql.error(f'select tbname, cols(last(ts), *) from test.meters group by tbname having cols(last(ts), *) = 1734574929000')
def test_null2(self):
dbname = "test_null2"
@ -1311,15 +1398,17 @@ class TDTestCase:
self.subquery_test()
self.window_test()
self.join_test()
self.stream_cols_test()
self.test_in_interval()
self.include_null_test()
self.long_column_name_test()
self.having_test("test.meters", False)
self.having_test("(select tbname, * from test.meters)", True)
self.star_test()
self.test_null2()
self.window_test2()
self.stream_cols_test()
self.stream_cols_test2()
def stop(self):