Почему я могу подписаться только на те данные, которые существовали до создания темы в Apache IoTDB, но не на новые даннJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Почему я могу подписаться только на те данные, которые существовали до создания темы в Apache IoTDB, но не на новые данн

Сообщение Anonymous »

Я использую функцию подписки на данные в IoTDB 1.3.4, но столкнулся с проблемой, из-за которой я могу подписаться только на те данные, которые существовали до создания темы, и не могу получать новые вставленные данные.
Структура данных измерений следующая:

Код: Выделить всё

  CREATE DATABASE `root.sg`;
CREATE DEVICE TEMPLATE t1 aligned (lat FLOAT, lng FLOAT);
SET DEVICE TEMPLATE t1 TO `root.sg`;
CREATE TIMESERIES USING DEVICE TEMPLATE ON `root.sg.hk.d1`;
CREATE TOPIC tpc1 WITH ('path' = '`root.sg`.**');
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (1,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (2,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (3,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (4,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (5,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (6,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (7,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (8,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (9,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (10,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (11,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values s (12,30.2,122.2);

Исходный код подписки выглядит следующим образом:

Код: Выделить всё

@SneakyThrows
private static void pullTest(String topic) {
int retryCount = 0;
// Subscription: property-style ctor
final Properties config = new Properties();
config.put(ConsumerConstant.HOST_KEY, "192.168.**.**");
config.put(ConsumerConstant.PORT_KEY, 6667);
config.put(ConsumerConstant.USERNAME_KEY,"root");
config.put(ConsumerConstant.PASSWORD_KEY, "root");
config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg2");
config.put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
try (final SubscriptionTreePullConsumer consumer = new SubscriptionTreePullConsumer(config)){
consumer.open();
consumer.subscribe(topic);
while (true) {
final List messages = consumer.poll(10_000L);
if (messages.isEmpty() {
retryCount++;
if (retryCount >= Integer.MAX_VALUE) {
break;
}
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()){
System.out.println(dataSet.getColumnNames());
System.out.println(dataSet.getColumnTypes());
while (dataSet.hasNext()){
System.out.println(dataSet.next();
}
}
}
// Auto commit
}
}
}

public static void main(String[] args) {
pullTest("tpc1");
}
Сначала было вставлено 10 записей, а затем дополнительные данные были вставлены через SQL. Однако программа Java не смогла подписаться на новые данные (записи с отметкой времени 11 и 12).
Вывод:

Код: Выделить всё

  10:35:29.740 [main] INFO org.apache.iotdb.session.subscription.consumer.base. SubscriptionExecutorServiceManager -- Launching SubscriptionDownStreamDataFlowExecutor with core pool size 10\...
[`Time`, `root.sg.hk.d1.lat`, `root.sg.hk.d1.lng`]
[INT64, FLOAT, FLOAT]
1 30.2 122.2
2 30.2 122.2
3 30.2 122.2
4 30.2 122.2
5 30.2 122.2
6 30.2 122.2
7 30.2 122.2
8 30.2 122.2
9 30.2 122.2
10 30.2 122.2
Возникли ли проблемы с моим Java-кодом? Как я могу его изменить?


Подробнее здесь: https://stackoverflow.com/questions/797 ... c-in-apach
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»