Я использую функцию подписки на данные в IoTDB 1.3.4, но столкнулся с проблемой, из-за которой я могу подписаться только на те данные, которые существовали до создания темы, и не могу получать новые вставленные данные.
Структура данных измерений следующая:
CREATE DATABASE `root.sg`;
CREATE DEVICE TEMPLATE t1 aligned (lat FLOAT, lng FLOAT);
SET DEVICE TEMPLATE t1 TO `root.sg`;
CREATE TIMESERIES USING DEVICE TEMPLATE ON `root.sg.hk.d1`;
CREATE TOPIC tpc1 WITH ('path' = '`root.sg`.**');
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (1,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (2,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (3,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (4,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (5,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (6,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (7,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (8,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (9,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (10,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (11,30.2,122.2);
insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values s (12,30.2,122.2);
@SneakyThrows
private static void pullTest(String topic) {
int retryCount = 0;
// Subscription: property-style ctor
final Properties config = new Properties();
config.put(ConsumerConstant.HOST_KEY, "192.168.**.**");
config.put(ConsumerConstant.PORT_KEY, 6667);
config.put(ConsumerConstant.USERNAME_KEY,"root");
config.put(ConsumerConstant.PASSWORD_KEY, "root");
config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg2");
config.put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
try (final SubscriptionTreePullConsumer consumer = new SubscriptionTreePullConsumer(config)){
consumer.open();
consumer.subscribe(topic);
while (true) {
final List messages = consumer.poll(10_000L);
if (messages.isEmpty() {
retryCount++;
if (retryCount >= Integer.MAX_VALUE) {
break;
}
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()){
System.out.println(dataSet.getColumnNames());
System.out.println(dataSet.getColumnTypes());
while (dataSet.hasNext()){
System.out.println(dataSet.next();
}
}
}
// Auto commit
}
}
}
public static void main(String[] args) {
pullTest("tpc1");
}
Сначала было вставлено 10 записей, а затем дополнительные данные были вставлены через SQL. Однако программа Java не смогла подписаться на новые данные (записи с отметкой времени 11 и 12).
Вывод:
Я использую функцию подписки на данные в IoTDB 1.3.4, но столкнулся с проблемой, из-за которой я могу подписаться только на те данные, которые существовали до создания темы, и не могу получать новые вставленные данные. Структура данных измерений следующая: [code] CREATE DATABASE `root.sg`; CREATE DEVICE TEMPLATE t1 aligned (lat FLOAT, lng FLOAT); SET DEVICE TEMPLATE t1 TO `root.sg`; CREATE TIMESERIES USING DEVICE TEMPLATE ON `root.sg.hk.d1`; CREATE TOPIC tpc1 WITH ('path' = '`root.sg`.**'); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (1,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (2,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (3,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (4,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (5,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (6,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (7,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (8,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (9,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (10,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values (11,30.2,122.2); insert into `root.sg.hk.d1`(timestamp, lat, lng) aligned values s (12,30.2,122.2);
[/code] Исходный код подписки выглядит следующим образом: [code]@SneakyThrows private static void pullTest(String topic) { int retryCount = 0; // Subscription: property-style ctor final Properties config = new Properties(); config.put(ConsumerConstant.HOST_KEY, "192.168.**.**"); config.put(ConsumerConstant.PORT_KEY, 6667); config.put(ConsumerConstant.USERNAME_KEY,"root"); config.put(ConsumerConstant.PASSWORD_KEY, "root"); config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg2"); config.put(ConsumerConstant.CONSUMER_ID_KEY, "c2"); try (final SubscriptionTreePullConsumer consumer = new SubscriptionTreePullConsumer(config)){ consumer.open(); consumer.subscribe(topic); while (true) { final List messages = consumer.poll(10_000L); if (messages.isEmpty() { retryCount++; if (retryCount >= Integer.MAX_VALUE) { break; } } for (final SubscriptionMessage message : messages) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()){ System.out.println(dataSet.getColumnNames()); System.out.println(dataSet.getColumnTypes()); while (dataSet.hasNext()){ System.out.println(dataSet.next(); } } } // Auto commit } } }
public static void main(String[] args) { pullTest("tpc1"); } [/code] Сначала было вставлено 10 записей, а затем дополнительные данные были вставлены через SQL. Однако программа Java не смогла подписаться на новые данные (записи с отметкой времени 11 и 12). Вывод: [code] 10:35:29.740 [main] INFO org.apache.iotdb.session.subscription.consumer.base. SubscriptionExecutorServiceManager -- Launching SubscriptionDownStreamDataFlowExecutor with core pool size 10\... [`Time`, `root.sg.hk.d1.lat`, `root.sg.hk.d1.lng`] [INT64, FLOAT, FLOAT] 1 30.2 122.2 2 30.2 122.2 3 30.2 122.2 4 30.2 122.2 5 30.2 122.2 6 30.2 122.2 7 30.2 122.2 8 30.2 122.2 9 30.2 122.2 10 30.2 122.2 [/code] Возникли ли проблемы с моим Java-кодом? Как я могу его изменить?