Code (DolphinDB test script):
@testing:case="test_duckdb_load_to_dfs_with_transform"
// Test: load 100,000 rows from a DuckDB table into a VALUE-partitioned DFS table
// through duckdb::load with a per-batch transform, then check both the returned
// row count and the number of rows actually stored in the DFS table.
-- A. Prepare DFS table
// Drop any previous copy so reruns start from an empty database.
dbPath = "dfs://duck_test_db"
if(existsDatabase(dbPath)) dropDatabase(dbPath)
// VALUE partitions on the dates 2023.01.01..2023.01.10.
db = database(dbPath, VALUE, 2023.01.01..2023.01.10)
// Empty table used only as the schema template: ts TIMESTAMP, val DOUBLE.
dummy = table(1:0, `ts`val, [TIMESTAMP, DOUBLE])
// Partitioned by column ts.
pt = db.createPartitionedTable(dummy, "pt", "ts")
-- B. Prepare data source (generate 100,000 rows in DuckDB in-memory table)
conn = duckdb::connect("")
// ts starts at 2023-01-01 00:00:00 and advances 1 second per row for 100,000 rows
// (last value 2023-01-02 03:46:39), so every row lands in the 2023.01.01 or
// 2023.01.02 partition; val is random().
duckdb::execute(conn, "CREATE TABLE source AS SELECT
CAST('2023-01-01 00:00:00' AS TIMESTAMP) + INTERVAL (range) SECOND as ts,
CAST(random() AS DOUBLE) as val
FROM range(100000)")
-- C. Define Transform function (add 100 to val)
// Applied by duckdb::load to each batch table before the batch is appended.
// NOTE(review): the computed column is not aliased back to `val`; if the append
// matches columns by name rather than by position, write `val+100 as val`.
def myTrans(mutable t) {
return select ts, val+100 from t
}
-- D. Execute load
// Arguments: connection, source query, destination table, batch size 10000, transform.
// In the reported run this call fails with:
// "Append Error: Can't append data to a segmented table that contains external partitions."
rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans)
// Both the returned count and the stored row count should equal the 100,000 source rows.
assert 1, rowCount == 100000
assert 2, (exec count(*) from pt) == 100000
Error returned by step D:
rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans) => Append Error: Can't append data to a segmented table that contains external partitions.
Code (C++ plugin implementation of `duckdb::load`):
// Plugin entry point for duckdb::load(conn, sql, destTable, [batchSize], [transform]).
// Runs a DuckDB query and appends its result to a DolphinDB table. Rows are gathered
// into batches of about batchSize rows. Each batch is optionally passed through a
// user transform, then appended to destTable.
//   args[0]  DuckDB connection handle (DuckDBConn wrapper)
//   args[1]  SQL text, or a bare table name (no whitespace) -> "SELECT * FROM <name>"
//   args[2]  destination table
//   args[3]  optional batch size (default 65536)
//   args[4]  optional transform function, called with each batch table
// Returns the total number of rows reported inserted, as a Long.
// Throws IllegalArgumentException when fewer than 3 args are given. Throws
// RuntimeException when prepare, execute or a main-loop append fails.
// NOTE(review): the page scrape appears to have stripped text between '<' and '>'
// (e.g. template arguments such as vector<...>); `vector& args` is presumably
// vector<ConstantSP>& -- confirm against the original source.
extern "C" __declspec(dllexport) ConstantSP duckdbLoad(Heap* heap, vector& args) {
if (args.size() < 3) throw IllegalArgumentException("duckdb::load", "Usage: load(conn, sql, destTable, [batchSize], [transform])");
// NOTE(review): args[0] and args[2] are cast without any visible type check.
DuckDBConn* wrapper = (DuckDBConn*)args[0].get();
duckdb_connection conn = wrapper->getConn();
string queryOrTable = args[1]->getString();
TableSP destTable = (TableSP)args[2];
int batchSize = (args.size() >= 4 && !args[3]->isNull()) ? args[3]->getInt() : 65536;
// A null transform means batches are appended unchanged.
FunctionDefSP transform = (args.size() >= 5 && !args[4]->isNull()) ? (FunctionDefSP)args[4] : nullptr;
string sql = queryOrTable;
// Heuristic: an argument with no whitespace is treated as a table name.
if (queryOrTable.find_first_of(" \t\n\r") == string::npos) {
// NOTE(review): the name is concatenated without quoting/escaping.
sql = "SELECT * FROM " + queryOrTable;
}
duckdb_prepared_statement stmt;
if (duckdb_prepare(conn, sql.c_str(), &stmt) == DuckDBError) {
string err = duckdb_prepare_error(stmt);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}
duckdb_result result;
if (duckdb_execute_prepared(stmt, &result) == DuckDBError) {
string err = duckdb_result_error(&result);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}
int colCount = (int)duckdb_column_count(&result);
vector colNames;
// NOTE(review): the next line is corrupted by the scrape. Everything between "i<" and
// "->" was lost: the column-name loop, creation of batchCols / createBatchCols, the
// totalInserted/accumulatedRows declarations and the chunk-fetch loop header. It cannot
// be reconstructed from this view. What remains grows each batch column by chunkRows
// and fills it from the chunk.
for(int i=0; iresize(v->size() + chunkRows);
fillDolphinVector(duckdb_data_chunk_get_vector(chunk, i), v, (idx_t)chunkRows);
}
// The flush check runs only after a whole chunk is added, so a batch may exceed
// batchSize by up to one chunk's rows.
accumulatedRows += chunkRows;
if (accumulatedRows >= batchSize) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
// NOTE(review): if the transform throws, chunk/result/stmt are never destroyed
// (no cleanup is visible on this path).
batchTable = transform->call(heap, tArgs);
}
vector toAppend;
// Append the columns of the (possibly transformed) batch table.
// NOTE(review): loop condition presumably i<batchTable->columns() before the scrape.
for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i));
int inserted; string errMsg;
// NOTE(review): this is where the reported error comes from. When destTable is a DFS
// partitioned table, the direct Table::append fails with "Can't append data to a
// segmented table that contains external partitions" (surfaced with the
// "Append Error: " prefix below). Plugins commonly write DFS tables by calling the
// built-in append!/tableInsert through the session, e.g.
// heap->currentSession()->getFunctionDef("append!")->call(heap, args).
// Confirm against the DolphinDB plugin SDK docs.
if (!destTable->append(toAppend, inserted, errMsg)) {
duckdb_destroy_data_chunk(&chunk);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException("Append Error: " + errMsg);
}
totalInserted += inserted;
accumulatedRows = 0;
// Start a fresh set of batch columns for the next batch.
batchCols = createBatchCols(batchSize);
}
duckdb_destroy_data_chunk(&chunk);
}
// Flush the final partial batch, if any.
if (accumulatedRows > 0) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
batchTable = transform->call(heap, tArgs);
}
vector toAppend;
for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i));
int inserted; string errMsg;
// NOTE(review): unlike the main loop, a failed append here is silently ignored
// (errMsg is discarded and no exception is thrown). The returned count then
// under-reports without any error. This should probably throw
// RuntimeException("Append Error: " + errMsg) after releasing result/stmt.
if (destTable->append(toAppend, inserted, errMsg)) {
totalInserted += inserted;
}
}
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
return new Long(totalInserted);
}
Code (variant of the append call that also passes `heap`):
// NOTE(review): whether Table::append has an overload taking Heap* as its first argument
// depends on the DolphinDB plugin SDK version in use. Confirm against the SDK's Table
// header. For DFS tables the usual route is the built-in append!/tableInsert function.
destTable->append(heap, toAppend, inserted, errMsg)
More details here: https://stackoverflow.com/questions/799 ... ata-to-dfs