Anonymous
Функция плагина DolphinDB не может записать данные в DFS
Сообщение
Anonymous » 03 мар 2026, 16:48
Я разрабатываю плагин DuckDB для DolphinDB и имею следующий тестовый пример:
Код: Выделить всё
@testing:case="test_duckdb_load_to_dfs_with_transform"
-- A. Prepare DFS table
dbPath = "dfs://duck_test_db"
if(existsDatabase(dbPath)) dropDatabase(dbPath)
db = database(dbPath, VALUE, 2023.01.01..2023.01.10)
dummy = table(1:0, `ts`val, [TIMESTAMP, DOUBLE])
pt = db.createPartitionedTable(dummy, "pt", "ts")
-- B. Prepare data source (generate 100,000 rows in DuckDB in-memory table)
conn = duckdb::connect("")
duckdb::execute(conn, "CREATE TABLE source AS SELECT
CAST('2023-01-01 00:00:00' AS TIMESTAMP) + INTERVAL (range) SECOND as ts,
CAST(random() AS DOUBLE) as val
FROM range(100000)")
-- C. Define Transform function (add 100 to val)
def myTrans(mutable t) {
return select ts, val+100 from t
}
-- D. Execute load
rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans)
assert 1, rowCount == 100000
assert 2, (exec count(*) from pt) == 100000
При выполнении кода я получаю следующую ошибку:
Код: Выделить всё
rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans) => Append Error: Can't append data to a segmented table that contains external partitions.
Код функции моего плагина C++ выглядит следующим образом:
Код: Выделить всё
extern "C" __declspec(dllexport) ConstantSP duckdbLoad(Heap* heap, vector& args) {
if (args.size() < 3) throw IllegalArgumentException("duckdb::load", "Usage: load(conn, sql, destTable, [batchSize], [transform])");
DuckDBConn* wrapper = (DuckDBConn*)args[0].get();
duckdb_connection conn = wrapper->getConn();
string queryOrTable = args[1]->getString();
TableSP destTable = (TableSP)args[2];
int batchSize = (args.size() >= 4 && !args[3]->isNull()) ? args[3]->getInt() : 65536;
FunctionDefSP transform = (args.size() >= 5 && !args[4]->isNull()) ? (FunctionDefSP)args[4] : nullptr;
string sql = queryOrTable;
if (queryOrTable.find_first_of(" \t\n\r") == string::npos) {
sql = "SELECT * FROM " + queryOrTable;
}
duckdb_prepared_statement stmt;
if (duckdb_prepare(conn, sql.c_str(), &stmt) == DuckDBError) {
string err = duckdb_prepare_error(stmt);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}
duckdb_result result;
if (duckdb_execute_prepared(stmt, &result) == DuckDBError) {
string err = duckdb_result_error(&result);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}
int colCount = (int)duckdb_column_count(&result);
vector colNames;
for(int i=0; iresize(v->size() + chunkRows);
fillDolphinVector(duckdb_data_chunk_get_vector(chunk, i), v, (idx_t)chunkRows);
}
accumulatedRows += chunkRows;
if (accumulatedRows >= batchSize) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
batchTable = transform->call(heap, tArgs);
}
vector toAppend;
for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i));
int inserted; string errMsg;
if (!destTable->append(toAppend, inserted, errMsg)) {
duckdb_destroy_data_chunk(&chunk);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException("Append Error: " + errMsg);
}
totalInserted += inserted;
accumulatedRows = 0;
batchCols = createBatchCols(batchSize);
}
duckdb_destroy_data_chunk(&chunk);
}
if (accumulatedRows > 0) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
batchTable = transform->call(heap, tArgs);
}
vector toAppend;
for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i));
int inserted; string errMsg;
if (destTable->append(toAppend, inserted, errMsg)) {
totalInserted += inserted;
}
}
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
return new Long(totalInserted);
}
Я определил, что ошибка возникает из строки destTable->append(toAppend, Inserted, errMsg), поэтому я также попробовал заменить ее на:
Код: Выделить всё
destTable->append(heap, toAppend, inserted, errMsg)
Но это не дало никакого эффекта. Как мне изменить код, чтобы решить эту проблему?
Подробнее здесь:
https://stackoverflow.com/questions/799 ... ata-to-dfs
1772545680
Anonymous
Я разрабатываю плагин DuckDB для DolphinDB и имею следующий тестовый пример: [code]@testing:case="test_duckdb_load_to_dfs_with_transform" -- A. Prepare DFS table dbPath = "dfs://duck_test_db" if(existsDatabase(dbPath)) dropDatabase(dbPath) db = database(dbPath, VALUE, 2023.01.01..2023.01.10) dummy = table(1:0, `ts`val, [TIMESTAMP, DOUBLE]) pt = db.createPartitionedTable(dummy, "pt", "ts") -- B. Prepare data source (generate 100,000 rows in DuckDB in-memory table) conn = duckdb::connect("") duckdb::execute(conn, "CREATE TABLE source AS SELECT CAST('2023-01-01 00:00:00' AS TIMESTAMP) + INTERVAL (range) SECOND as ts, CAST(random() AS DOUBLE) as val FROM range(100000)") -- C. Define Transform function (add 100 to val) def myTrans(mutable t) { return select ts, val+100 from t } -- D. Execute load rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans) assert 1, rowCount == 100000 assert 2, (exec count(*) from pt) == 100000 [/code] При выполнении кода я получаю следующую ошибку: [code]rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans) => Append Error: Can't append data to a segmented table that contains external partitions. [/code] Код функции моего плагина C++ выглядит следующим образом: [code]extern "C" __declspec(dllexport) ConstantSP duckdbLoad(Heap* heap, vector& args) { if (args.size() < 3) throw IllegalArgumentException("duckdb::load", "Usage: load(conn, sql, destTable, [batchSize], [transform])"); DuckDBConn* wrapper = (DuckDBConn*)args[0].get(); duckdb_connection conn = wrapper->getConn(); string queryOrTable = args[1]->getString(); TableSP destTable = (TableSP)args[2]; int batchSize = (args.size() >= 4 && !args[3]->isNull()) ? args[3]->getInt() : 65536; FunctionDefSP transform = (args.size() >= 5 && !args[4]->isNull()) ? (FunctionDefSP)args[4] : nullptr; string sql = queryOrTable; if (queryOrTable.find_first_of(" \t\n\r") == string::npos) { sql = "SELECT * FROM " + queryOrTable; } duckdb_prepared_statement stmt; if (duckdb_prepare(conn, sql.c_str(), &stmt) == DuckDBError) { string err = duckdb_prepare_error(stmt); duckdb_destroy_prepare(&stmt); throw RuntimeException(err); } duckdb_result result; if (duckdb_execute_prepared(stmt, &result) == DuckDBError) { string err = duckdb_result_error(&result); duckdb_destroy_result(&result); duckdb_destroy_prepare(&stmt); throw RuntimeException(err); } int colCount = (int)duckdb_column_count(&result); vector colNames; for(int i=0; iresize(v->size() + chunkRows); fillDolphinVector(duckdb_data_chunk_get_vector(chunk, i), v, (idx_t)chunkRows); } accumulatedRows += chunkRows; if (accumulatedRows >= batchSize) { TableSP batchTable = Util::createTable(colNames, batchCols); if (!transform.isNull()) { vector tArgs = {batchTable}; batchTable = transform->call(heap, tArgs); } vector toAppend; for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i)); int inserted; string errMsg; if (!destTable->append(toAppend, inserted, errMsg)) { duckdb_destroy_data_chunk(&chunk); duckdb_destroy_result(&result); duckdb_destroy_prepare(&stmt); throw RuntimeException("Append Error: " + errMsg); } totalInserted += inserted; accumulatedRows = 0; batchCols = createBatchCols(batchSize); } duckdb_destroy_data_chunk(&chunk); } if (accumulatedRows > 0) { TableSP batchTable = Util::createTable(colNames, batchCols); if (!transform.isNull()) { vector tArgs = {batchTable}; batchTable = transform->call(heap, tArgs); } vector toAppend; for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i)); int inserted; string errMsg; if (destTable->append(toAppend, inserted, errMsg)) { totalInserted += inserted; } } duckdb_destroy_result(&result); duckdb_destroy_prepare(&stmt); return new Long(totalInserted); } [/code] Я определил, что ошибка возникает из строки destTable->append(toAppend, Inserted, errMsg), поэтому я также попробовал заменить ее на: [code]destTable->append(heap, toAppend, inserted, errMsg) [/code] Но это не дало никакого эффекта. Как мне изменить код, чтобы решить эту проблему? Подробнее здесь: [url]https://stackoverflow.com/questions/79900174/dolphindb-plugin-function-fails-to-write-data-to-dfs[/url]