Функция плагина DolphinDB не может записать данные в DFSC++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Функция плагина DolphinDB не может записать данные в DFS

Сообщение Anonymous »

Я разрабатываю плагин DuckDB для DolphinDB и имею следующий тестовый пример:

Код: Выделить всё

@testing:case="test_duckdb_load_to_dfs_with_transform"
-- A. Prepare DFS table
dbPath = "dfs://duck_test_db"
if(existsDatabase(dbPath)) dropDatabase(dbPath)
db = database(dbPath, VALUE, 2023.01.01..2023.01.10)
dummy = table(1:0, `ts`val, [TIMESTAMP, DOUBLE])
pt = db.createPartitionedTable(dummy, "pt", "ts")

-- B. Prepare data source (generate 100,000 rows in DuckDB in-memory table)
conn = duckdb::connect("")
duckdb::execute(conn, "CREATE TABLE source AS SELECT
CAST('2023-01-01 00:00:00' AS TIMESTAMP) + INTERVAL (range) SECOND as ts,
CAST(random() AS DOUBLE) as val
FROM range(100000)")

-- C. Define Transform function (add 100 to val)
def myTrans(mutable t) {
return select ts, val+100 from t
}

-- D.  Execute load
rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans)
assert 1, rowCount == 100000
assert 2, (exec count(*) from pt) == 100000
При выполнении кода я получаю следующую ошибку:

Код: Выделить всё

rowCount = duckdb::load(conn, "SELECT * FROM source", pt, 10000, myTrans) => Append Error: Can't append data to a segmented table that contains external partitions.
Код функции моего плагина C++ выглядит следующим образом:

Код: Выделить всё

extern "C" __declspec(dllexport) ConstantSP duckdbLoad(Heap* heap, vector& args) {
if (args.size() < 3) throw IllegalArgumentException("duckdb::load", "Usage: load(conn, sql, destTable, [batchSize], [transform])");

DuckDBConn* wrapper = (DuckDBConn*)args[0].get();
duckdb_connection conn = wrapper->getConn();
string queryOrTable = args[1]->getString();
TableSP destTable = (TableSP)args[2];
int batchSize = (args.size() >= 4 && !args[3]->isNull()) ? args[3]->getInt() : 65536;
FunctionDefSP transform = (args.size() >= 5 && !args[4]->isNull()) ? (FunctionDefSP)args[4] : nullptr;

string sql = queryOrTable;
if (queryOrTable.find_first_of(" \t\n\r") == string::npos) {
sql = "SELECT * FROM " + queryOrTable;
}

duckdb_prepared_statement stmt;
if (duckdb_prepare(conn, sql.c_str(), &stmt) == DuckDBError) {
string err = duckdb_prepare_error(stmt);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}

duckdb_result result;
if (duckdb_execute_prepared(stmt, &result) == DuckDBError) {
string err = duckdb_result_error(&result);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException(err);
}

int colCount = (int)duckdb_column_count(&result);
vector colNames;
for(int i=0; iresize(v->size() + chunkRows);
fillDolphinVector(duckdb_data_chunk_get_vector(chunk, i), v, (idx_t)chunkRows);
}
accumulatedRows += chunkRows;

if (accumulatedRows >= batchSize) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
batchTable = transform->call(heap, tArgs);
}

vector toAppend;
for(int i=0; icolumns(); ++i) toAppend.push_back(batchTable->getColumn(i));

int inserted; string errMsg;
if (!destTable->append(toAppend, inserted, errMsg)) {
duckdb_destroy_data_chunk(&chunk);
duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
throw RuntimeException("Append Error: " + errMsg);
}
totalInserted += inserted;
accumulatedRows = 0;
batchCols = createBatchCols(batchSize);
}
duckdb_destroy_data_chunk(&chunk);
}

if (accumulatedRows > 0) {
TableSP batchTable = Util::createTable(colNames, batchCols);
if (!transform.isNull()) {
vector tArgs = {batchTable};
batchTable = transform->call(heap, tArgs);
}
vector toAppend;
for(int i=0; icolumns();  ++i) toAppend.push_back(batchTable->getColumn(i));
int inserted; string errMsg;
if (destTable->append(toAppend, inserted, errMsg)) {
totalInserted += inserted;
}
}

duckdb_destroy_result(&result);
duckdb_destroy_prepare(&stmt);
return new Long(totalInserted);
}
Я определил, что ошибка возникает из строки destTable->append(toAppend, Inserted, errMsg), поэтому я также попробовал заменить ее на:

Код: Выделить всё

destTable->append(heap, toAppend, inserted, errMsg)
Но это не дало никакого эффекта. Как мне изменить код, чтобы решить эту проблему?

Подробнее здесь: https://stackoverflow.com/questions/799 ... ata-to-dfs
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»