Как возобновить поток изменений MongoDB в первом документе, а не просто изменить его после начала прослушивания?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как возобновить поток изменений MongoDB в первом документе, а не просто изменить его после начала прослушивания?

Сообщение Anonymous »

Моя цель в этом приложении — создать логику, которая будет отслеживать базу данных и запускать действия при добавлении документа в базу данных (например, отправку электронного письма). Однако, поскольку это приложение не может быть запущено при первом заполнении базы данных, как я могу вручную создать ResumeToken, указывающий на самый первый документ, добавленный в коллекцию, чтобы при первом запуске я мог начать с самого начала? и повторять изменения, пока не дойду до конца. Я понимаю, что мне нужно будет сохранить ResumeToken из LastChangeStreamDocument для будущих перезапусков, но меня интересует сценарий «первого запуска». Я думал, что enumerator.Reset(); был правильным вариантом, но он выдал исключение, указывающее, что он не поддерживается.

Я следовал инструкциям тест, представленный в https://github.com/mongodb/mongo-csharp ... xamples.cs, и успешно настроил поток изменений с помощью следующего кода

Код: Выделить всё

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection("CollectionToMonitor");

try
{
var cursor = collection.Watch();
var enumerator = cursor.ToEnumerable().GetEnumerator();

enumerator.MoveNext();  //Blocks until a record is UPDATED in the database
var lastChangeStreamDocument = enumerator.Current;
enumerator.Dispose();
//lastChangeStreamDocument.FullDocument.Should().Be(document);

}
catch( Exception ex)
{
Logger.WriteException(ex);
}
Однако в этом коде строка enumerator.MoveNext() блокируется до тех пор, пока документ не будет ОБНОВЛЕН, поэтому я могу получить ссылку на обновленные документы только после настройки потока изменений.

У меня возникла идея выполнить поиск в базе данных local.oplog и получить UUID первого документа, вставленного в коллекцию, и она удалась, однако я не вижу способ преобразовать эту ссылку в Объект ResumeToken, который я могу передать методу просмотра.



Обновление:

Похоже, что ResumeToken хранится в формате Base64, который содержит метку времени, o._id ObjectID, а также UUID ui из записи oplog. Мне нужно еще немного просмотреть код, но из этого исходного кода видно (https://github.com/mongodb/mongo/blob/c ... n_test.cpp), что существуют разные форматы РезюмеТокены. Надеюсь, с помощью этой информации я смогу создать свой собственный ResumeToken, соответствующий формату, ожидаемому базой данных.



Обновление №2:

После дополнительных исследований я наткнулся на код для анализа key_string в монго по адресу github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp. Этот файл содержит определение CType. Я декодировал Base64 в массив байтов, а затем с помощью определений перечисления CType я смог немного больше понять, как создать свой собственный ResumeToken.

Рассмотрим следующий пример. :
Я зафиксировал ResumeToken в ChangeStream после обновления документа.

Код: Выделить всё

glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==
Это декодируется в массив байтов:

Код: Выделить всё

82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
Что я расшифровал:

Код: Выделить всё

//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

//UUID
5a    //CType::BinData
10    //Length (16b)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte.  Perhaps end of ResumeToken, or end of UUID mark?
Проблема, с которой я сейчас столкнулся, заключается в том, что если у меня много записей oplog для коллекции и я использую значения ts, ui и o._id из первой записи в oplog чтобы создать свой собственный ResumeToken (жестко запрограммировать неизвестный блок 0x4664 5f69 6400, а также конечный байт 0x04, тогда сервер принимает это как действительный ResumeToken при настройке коллекции.Watch Однако документ, возвращаемый вызовом enumerator.moveNext(), всегда возвращает третью запись оплога, а не вторую!

Я нервничаю, полагаясь на это в производстве, не зная назначения этого 12-байтового блока, а также не зная, почему я указываю на третью, а не на вторую запись.



Обновление №3:

Эти блоки байтов, о которых идет речь:

Код: Выделить всё

46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL
Следующий блок байтов описывает ObjectId затронутого документа или его ключ «_id». Так какое же значение имеет символ «d»?

Подробнее здесь: https://stackoverflow.com/questions/486 ... ust-change
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»