Строки, не дедуплицированные в таблице потоков с ключами при записи из двух путей Python с совпадающими первичными ключаPython

Программы на Python
Ответить
Anonymous
 Строки, не дедуплицированные в таблице потоков с ключами при записи из двух путей Python с совпадающими первичными ключа

Сообщение Anonymous »

Я записываю данные K-линии в реальном времени в таблицу потоков с ключами DolphinDB из Python с символом и unixTime в качестве составного ключа, но после запроса я продолжаю видеть повторяющиеся строки.
Настройка таблицы (DolphinDB 2.00.16.1):

Код: Выделить всё

colNames = `symbol`exchange`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
colTypes = [SYMBOL,SYMBOL,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]

enableTableShareAndPersistence(
keyedStreamTable(`symbol`unixTime, 100000:0, colNames, colTypes),
`stock_candle_stream, cacheSize=80000
)
Данные поступают из Python по двум отдельным путям записи.
Пакетная запись — полный DataFrame помещается за один раз:

Код: Выделить всё

self.session.run("tableInsert{objByName('" + stream_table + "')}", data)
Построчная запись — каждый столбец вставляется отдельно:

Код: Выделить всё

self.writer.insert(
bar.symbol,
mapp.get(bar.exchange, ''),
day,
bar.datetime.date(),
_time,
round(bar.open_price, 2),
round(bar.high_price, 2),
round(bar.low_price, 2),
round(bar.close_price, 2),
int(bar.volume),
bar.amount,
int(bar.datetime.timestamp())
)
Значения символа и unixTime в обоих путях должны совпадать — я зарегистрировал их на стороне Python, и значения кажутся идентичными, оба типа LONG. Тем не менее, строки с одним и тем же составным ключом по-прежнему отображаются в таблице потока как дубликаты.
Я попробовал переключиться с EnableTableShareAndCachePurge на EnableTableShareAndPersistence — тот же результат. Я также попробовал дату + время в качестве составного ключа вместо unixTime — все равно дублируется.
Вопрос: оба пути записи создают то, что выглядит как одинаковые значения ключей для одних и тех же типов данных, но дедупликация не срабатывает. Что может быть причиной этого?

Воспроизводимый пример (сторона DolphinDB)
Основная проблема это несоответствие точности временных меток между двумя путями. Запустите приведенный ниже сценарий в графическом интерфейсе DolphinDB или консоли dolphindb, чтобы убедиться в этом самостоятельно.

Код: Выделить всё

// =====================================================
// Reproducible example: keyedStreamTable dedup failure
// =====================================================

// 1. Clean slate
undef(`stock_candle_stream, SHARED)

// 2. Create the same table structure
colNames = `symbol`exchange`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
colTypes = [SYMBOL,SYMBOL,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]

share keyedStreamTable(`symbol`unixTime, 100000:0, colNames, colTypes) as stock_candle_stream

// 3. Seed PRNG for deterministic output
setRandomSeed(42)

// 4. Generate 5 rows of sample data with millisecond-level unixTime
//    (simulating the batch-write path: pandas preserves ms precision, 13 digits)
unixTime_base_ms = 1746000000000L
batchData = table(
take(`AAPL, 5) as symbol,
take(`NASDAQ, 5) as exchange,
take(2026.05.18, 5) as tradingDay,
take(2026.05.18, 5) as date,
take(10:00:00, 5) + 0..4 as time,
rand(100.0, 5) as open,
rand(100.0, 5) as high,
rand(100.0, 5) as low,
rand(100.0, 5) as close,
rand(10000, 5) as volume,
rand(1000000.0, 5) as turnover,
unixTime_base_ms + 0..4 * 60000 as unixTime
)

// Insert batch data — 5 rows, 13-digit unixTime
stock_candle_stream.append!(batchData)

// 5. Generate "same" bars but with second-level unixTime
//    (simulating the row-by-row path: int(timestamp()) → 10 digits)
unixTime_base_s = 1746000000L
rowData = table(
take(`AAPL, 5) as symbol,
take(`NASDAQ, 5) as exchange,
take(2026.05.18, 5) as tradingDay,
take(2026.05.18, 5) as date,
take(10:00:00, 5) + 0..4 as time,
rand(100.0, 5) as open,
rand(100.0, 5) as high,
rand(100.0, 5) as low,
rand(100.0, 5) as close,
rand(10000, 5) as volume,
rand(1000000.0, 5) as turnover,
unixTime_base_s + 0..4 * 60 as unixTime
)

// Insert row-by-row data — 5 rows, 10-digit unixTime
stock_candle_stream.append!(rowData)

// 6.  Count rows — you get 10 instead of 5!
//    Even though each pair of rows represents the "same" bar,
//    the unixTime values differ by a factor of 1000.
select count(*) from stock_candle_stream
// Expected: 10 (5 batch + 5 row-by-row, none deduplicated)

// 7. Inspect the duplicates — compare unixTime values side by side
select symbol, time, open, high, low, close, unixTime,
strFormat(unixTime) as unixTime_raw
from stock_candle_stream
where symbol = `AAPL
order by time, unixTime
// Notice: unixTime shows 1746000000 vs 1746000000000 for the same bar

// =====================================================
// Verification: consistent precision works correctly
// =====================================================

// Insert another bar with the SAME unixTime twice — dedup works
unixTime_test = 1746000060000L
insert into stock_candle_stream values(`AAPL, `NASDAQ, 2026.05.18, 2026.05.18, 10:01:00, 101.0, 102.0, 99.5, 101.5, 8500, 862750.0, unixTime_test)
insert into stock_candle_stream values(`AAPL, `NASDAQ, 2026.05.18, 2026.05.18, 10:01:00, 101.0, 102.0, 99.5, 101.5, 8500, 862750.0, unixTime_test)

// Only 1 row inserted (second insert silently dropped)
select symbol, time, unixTime from stock_candle_stream where unixTime = 1746000060000L
// Expected: 1 row
Воспроизводимый пример (сторона Python)
Для читателей, которые хотят воспроизвести полный поток Python → DolphinDB:

Код: Выделить всё

import numpy as np
import dolphindb as ddb

s = ddb.session()
s.connect("localhost", 8848)

# Run the DolphinDB table creation script first, then:

# Path A — batch write with millisecond timestamps (13 digits)
np.random.seed(42)
n = 5
unixTime_ms = [1746000000000 + i * 60000 for i in range(n)]

s.run("""
data = table(
take(`AAPL, {n}) as symbol,
take(`NASDAQ, {n}) as exchange,
take(2026.05.18, {n}) as tradingDay,
take(2026.05.18, {n}) as date,
take(10:00:00, {n}) + 0..{e} as time,
{open} as open,
{high} as high,
{low} as low,
{close} as close,
{volume} as volume,
{turnover} as turnover,
{unixTime} as unixTime
)
""".format(
n=n, e=n-1,
open=list(np.random.uniform(100, 101, n).round(2)),
high=list(np.random.uniform(101, 102, n).round(2)),
low=list(np.random.uniform(99, 100, n).round(2)),
close=list(np.random.uniform(100, 101, n).round(2)),
volume=list(np.random.randint(5000, 15000, n)),
turnover=list(np.random.uniform(500000, 1500000, n).round(2)),
unixTime=unixTime_ms,
))

s.run("stock_candle_stream.append!(data)")

# Path B — row-by-row write with second timestamps (10 digits)
unixTime_s = [1746000000 + i * 60 for i in range(n)]

for i in range(n):
s.run("""
insert into stock_candle_stream
values(`AAPL, `NASDAQ, 2026.05.18, 2026.05.18, 10:00:00,
{o}, {h}, {l}, {c}, {v}, {t}, {u})
""".format(
o=np.round(np.random.uniform(100, 101), 2),
h=np.round(np.random.uniform(101, 102), 2),
l=np.round(np.random.uniform(99, 100), 2),
c=np.round(np.random.uniform(100, 101), 2),
v=int(np.random.randint(5000, 15000)),
t=np.round(np.random.uniform(500000, 1500000), 2),
u=unixTime_s[i],
))

# Check result — should be 10 rows instead of 5
print(s.run("select count(*) from stock_candle_stream"))
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»