Мы используем поток изменений с использованием драйвера C# mongodb для выполнения некоторой работы в реальном времени. Мы прослушиваем несколько операций вставки и обновления данных сбора баз данных. Каждый раз через несколько часов мы получали следующее исключение.
$Index находился за пределами массива.
в Microsoft.Extensions.Logging.LogValuesFormatter.GetValue(значения Object[], индекс Int32) в Microsoft.Extensions.Logging.FormattedLogValues.GetEnumerator()+MoveNext() в Serilog.Extensions.Logging.SerilogLogger.Log[TState ](LogLevel logLevel, EventId eventId, состояние TState, исключение исключения, средство форматирования Func3) в Microsoft.Extensions.Logging.Logger1.Microsoft.Extensions.Logging.ILogger.Log[TState](LogLevel logLevel , EventId eventId, состояние TState, исключение исключения, средство форматирования Func3) в Microsoft.Extensions.Logging.LoggerExtensions.Log(регистратор ILogger, LogLevel logLevel, EventId eventId, исключение исключения, строковое сообщение, Object[] args) в SyncServer .Services.MultiDatabaseChangeStreamManager.cDisplayClass9_0.d.MoveNext() в /app/jenkins/workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:line 167 — End трассировки стека из предыдущего местоположения — в MongoDB.Driver.IAsyncCursorExtensions.ForEachAsync[TDocument](источник IAsyncCursor1, процессор Func`3, CancellationToken cancelToken) в SyncServer.Services.MultiDatabaseChangeStreamManager.StartChangeStreams() в /app/jenkins /workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:строка 109
В строке 109 находится код

Код функции обмена
var конвейер = новый EmptyPipelineDefinition() .Match(change =>change.OperationType == ChangeStreamOperationType.Insert ||change.OperationType == ChangeStreamOperationType.Update) .Match(y => client.SyncSettings.WatchedDatabases.Contains(y.DatabaseNamespace.DatabaseName)) .Match(x => client.SyncSettings.WatchedCollections.Contains(x.CollectionNamespace.CollectionName)); вар резюмеТокен = новый BsonDocument (); var logCollection = data.GetCollection("logstream"); var LastLog = logCollection.AsQueryable().Where(x => x.Status == LogstreamStatus.Completed).OrderByDescending(o => о.CreatedOn).FirstOrDefault(); если (lastLog не равен нулю) { _logger.LogInformation("Найден последний неудачный журнал"); резюмеТокен = LastLog.ResumeToken; } еще { _logger.LogInformation("Последний журнал не найден, получение токена возобновления напрямую из mongodb"); вар previousCursor = ждут базы данных.WatchAsync(); резюмеТокен = предыдущийКурсор.GetResumeToken(); _logger.LogInformation("Токен возобновления получен"); предыдущийКурсор.Dispose(); } варианты вар = новый ChangeStreamOptions {ResumeAfter = резюмеToken, BatchSize = 5}; CancellationTokenSource cancelTokenSource = новый CancellationTokenSource(); Токен CancellationToken = cancelTokenSource.Token; пытаться { используя (var курсор = ожидание mongoclient.WatchAsync(конвейер, параметры, cancelToken: токен)) { ждут курсора.ForEachAsync(асинхронное изменение => { _logger.LogInformation("Проверка изменений для | {documentId} | {operationType}",change.DocumentKey["_id"],change.OperationType); var log = Logstream.Create(client.ClientId, изменить.DocumentKey, изменить.FullDocument, изменить.OperationType == ChangeStreamOperationType.Insert ? null: изменить.UpdateDescription.UpdatedFields, изменить.ResumeToken, изменить.OperationType, изменить.CollectionNamespace.CollectionName) ; пытаться { переключатель (change.OperationType) { случай ChangeStreamOperationType.Insert: { _logger.LogInformation($"Вставить объект = {change.FullDocument.ToJson()}"); _logger.LogInformation($"ReplayChangeToTarget start"); ждут ReplayChangeToTarget(изменение, client.SyncSettings, изменение.CollectionNamespace.CollectionName); _logger.LogInformation($"ReplayChangeToTarget end"); _logger.LogInformation($"Вставка журналов"); ждут logCollection.InsertOneAsync(журнал); _logger.LogInformation($"журналы вставлены"); перерыв; } случай ChangeStreamOperationType.Update: { _logger.LogInformation($"Обновить объект = {change.UpdateDescription.UpdatedFields.ToJson()}"); _logger.LogInformation($"ReplayChangeToTarget start"); ждут ReplayChangeToTarget(изменение, client.SyncSettings, изменение.CollectionNamespace.CollectionName); _logger.LogInformation($"ReplayChangeToTarget end"); _logger.LogInformation($"Вставка журналов"); ждут logCollection.InsertOneAsync(журнал); _logger.LogInformation($"журналы вставлены"); перерыв; } } } улов (например, MongoWriteException) { если (например, WriteError.Code == 11000) { Console.WriteLine($"Ключ дублирования исключения: {change.FullDocument.GetValue("_id")}, игнорировать..."); log.Status = LogstreamStatus.Completed; ждут logCollection.InsertOneAsync(журнал); } еще { _logger.LogInformation($"MongoException Exception :: {ex.Message}", ex.ToString()); ждут SendEmailReport(client.ClientName, изменить.CollectionNamespace.CollectionName, ex.Message); отменитьTokenSource.Cancel(); } } поймать (Исключение ex) { _logger.LogInformation($"ReplayChangeToTarget Exception :: {ex.Message}", ex.ToString()); ждут SendEmailReport(client.ClientName, изменить.CollectionNamespace.CollectionName, ex.Message); отменитьTokenSource.Cancel(); } }, cancelToken: токен); } } поймать (Исключение ex) { await SendEmailReport("App", "App", ex.Message); _logger.LogInformation($"Collection.WatchAsync Exception :: ${ex.Message}", ex.ToString()); _logger.LogInformation(ex.StackTrace); } частная асинхронная задача ReplayChangeToTarget (изменение ChangeStreamDocument, настройки SyncSettings, имя коллекции строк) { _logger.LogInformation($"Цель синхронизации запущена для {collectionName}"); База данных IMongoDatabase; клиент IMongoClient; если (settings.IsSSLEnable) { var path = Environment.CurrentDirectory + Path.DirectorySeparatorChar + «Сертификаты» + Path.DirectorySeparatorChar + settings.CertificateFilePath; клиент = _dbManager.GetClientFromUrl(settings.ConnectionString, путь, settings.Password); база данных = client.GetDatabase(settings.Database); } еще { база данных = _dbManager.GetMongoDbFromUrl(settings.ConnectionString); } коллекция var = база данных.GetCollection(имя_коллекции); переключатель (change.OperationType) { случай ChangeStreamOperationType.Insert: { _logger.LogInformation($"Вставка в целевую коллекцию = {collectionName}"); ждут коллекции.InsertOneAsync(change.FullDocument); _logger.LogInformation($"Успешно добавлено в целевую коллекцию = {collectionName}"); перерыв; } случай ChangeStreamOperationType.Update: { _logger.LogInformation($"Обновление целевой коллекции = {collectionName}"); var filter = Builders.Filter.Eq("_id",change.DocumentKey["_id"]); поля вар = изменение.UpdateDescription.UpdatedFields; вар updateDefination = новый List(); foreach (var dataField в полях) { updateDefination.Add(Builders.Update.Set(dataField.Name, dataField.Value)); } если (updateDefination.Count > 0) { var комбинированныйUpdate = Builders.Update.Combine(updateDefination); ждут коллекции.UpdateOneAsync(фильтр, комбинированныйUpdate); _logger.LogInformation($"Успешно обновлено до целевой коллекции = {collectionName}"); } перерыв; } } _logger.LogInformation($"Цель синхронизации для {collectionName} завершена"); }