{
private readonly ApplicationDbContext _context;
private readonly ConcurrentDictionary _pendingForInsertion = new();
private readonly ConcurrentDictionary _pendingForUpdate = new();
private readonly ConcurrentDictionary _completedEntities;
private readonly SemaphoreSlim _contextLock = new(1, 1);
private const int BatchSize = 500;
private ILogger _logger;
public FoundationEntityRepo(ApplicationDbContext context, ILogger logger) : base(context)
{
_logger = logger;
_context = context;
_completedEntities = new ConcurrentDictionary(
_context.CompletedEntities.Select(c => new KeyValuePair(c.ExternalId, true))
);
}
public void MarkAsCompleted(
string externalId,
CancellationToken cancellationToken = default)
{
_completedEntities.TryAdd(externalId, false);
}
public bool IsCompleted(
string externalId,
CancellationToken cancellationToken = default)
{
return _completedEntities.ContainsKey(externalId);
}
public new async Task AddAsync(FoundationEntity entity, CancellationToken cancellationToken = default)
{
await _contextLock.WaitAsync(cancellationToken);
try
{
_pendingForInsertion[entity.ExternalId] = entity;
if (_pendingForInsertion.Count >= BatchSize)
await FlushAsync(_pendingForInsertion, update:false, cancellationToken);
}
finally
{
_contextLock.Release();
}
}
public async Task UpdateAsync(FoundationEntity entity, CancellationToken cancellationToken = default)
{
await _contextLock.WaitAsync(cancellationToken);
try
{
if (_pendingForInsertion.ContainsKey(entity.ExternalId))
return;
_pendingForUpdate[entity.ExternalId] = entity;
if (_pendingForUpdate.Count >= BatchSize)
await FlushAsync(_pendingForUpdate, update:true, CancellationToken.None);
}
finally
{
_contextLock.Release();
}
}
private async Task FlushAsync(ConcurrentDictionary batch, bool update = false, CancellationToken cancellationToken = default)
{
if (batch.IsEmpty) return;
var batchFoundation = batch.Values.ToList();
var completed = batch.Keys
.Where(k => _completedEntities.ContainsKey(k) && !_completedEntities[k])
.Select(k => new CompletedEntity { ExternalId = k })
.ToList();
batch.Clear();
if (update)
{
foreach (var entity in batchFoundation)
{
try
{
DbSet.Update(entity);
await _context.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateConcurrencyException ex)
{
_logger.LogWarning("UpdateConcurrencyException detected on {externalId}", entity.ExternalId);
foreach (var entry in ex.Entries)
{
entry.OriginalValues.SetValues((await entry.GetDatabaseValuesAsync(cancellationToken))!);
}
await _context.SaveChangesAsync(cancellationToken);
}
}
}
else await DbSet.AddRangeAsync(batchFoundation, cancellationToken);
await _context.CompletedEntities.AddRangeAsync(completed, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
foreach (var id in completed.Select(c => c.ExternalId))
_completedEntities[id] = true;
}
public Task GetByIdAsync(Guid id, CancellationToken cancellationToken = default) {
return DbSet.Where(e => e.Id == id)
.Include(e => e.Parents)
.Include(e => e.Children)
.Include(e => e.Synonyms)
.Include(e => e.Exclusions)
.Include(e => e.Inclusions)
.FirstOrDefaultAsync(cancellationToken);
}
public async Task GetByExternalIdAsync(string externalId, CancellationToken cancellationToken = default)
{
if (_pendingForInsertion.TryGetValue(externalId, out var pendingEntity)
|| _pendingForUpdate.TryGetValue(externalId, out pendingEntity))
return pendingEntity;
await _contextLock.WaitAsync(cancellationToken);
try
{
var entity = DbSet.Where(e => e.ExternalId == externalId)
.Include(e => e.Parents)
.Include(e => e.Children)
.Include(e => e.Synonyms)
.Include(e => e.Exclusions)
.Include(e => e.Inclusions)
.FirstOrDefault();
if (entity != null)
_pendingForUpdate[externalId] = entity;
return entity;
}
finally
{
_contextLock.Release();
}
}
public async Task GetOrCreateByExternalIdAsync(string externalId, Func create, CancellationToken cancellationToken = default)
{
var entity = await GetByExternalIdAsync(externalId, cancellationToken);
if (entity != null)
return entity;
entity = create();
await AddAsync(entity, cancellationToken);
return await GetByExternalIdAsync(externalId, cancellationToken)
?? throw new Exception("Entity should exist after adding it.");
}
}
< /code>
[*] Я использую параллелизм на первом уровне детей корневой сущности. (Это реализовано в реализации WhoService)
private async Task WalkFoundationGraph(
WhoFoundationEntityDto parentDto,
CancellationToken cancellationToken)
{
if (foundationEntityRepo.IsCompleted(parentDto.ExternalId(), cancellationToken))
{
logger.LogDebug("External Id {ExternalId} Found in already completed", parentDto.ExternalId());
return await foundationEntityRepo.GetByExternalIdAsync(parentDto.ExternalId(), cancellationToken)
?? throw new InvalidOperationException("Entity should exist after adding it.");
}
var hasChanges = false;
parentDto.SecureUris();
var foundationParent = await foundationEntityRepo.GetOrCreateByExternalIdAsync(parentDto.ExternalId(),
parentDto.ToFoundationEntity, cancellationToken);
if (parentDto.Child != null)
{
foreach (var childUri in parentDto.Child)
{
var childDto = await GetFoundationEntity(childUri);
var childFoundation = await WalkFoundationGraph(childDto, cancellationToken);
if (!childFoundation.IsParent(foundationParent))
{
foundationParent.Children.Add(childFoundation);
hasChanges = true;
}
if (!foundationParent.IsChild(childFoundation))
{
childFoundation.Parents.Add(foundationParent);
hasChanges = true;
}
}
}
if (hasChanges)
{
await foundationEntityRepo.UpdateAsync(foundationParent, cancellationToken);
}
foundationEntityRepo.MarkAsCompleted(foundationParent.ExternalId, cancellationToken);
return foundationParent;
}
public async Task ImportFoundation(CancellationToken cancellationToken = default)
{
var rootUri = config["Who:FoundationRootUri"] ?? throw new InvalidOperationException("Who:FoundationRootUri is not configured");
var rootDto = await GetFoundationEntity(rootUri);
if (rootDto == null)
throw new InvalidOperationException("Failed to fetch root entity from WHO Foundation.");
if (foundationEntityRepo.IsCompleted(rootDto.ExternalId(), cancellationToken))
return;
rootDto.SecureUris();
var root = await foundationEntityRepo.GetOrCreateByExternalIdAsync(rootDto.ExternalId(),
rootDto.ToFoundationEntity, cancellationToken:cancellationToken);
if (rootDto.Child != null)
{
var childTasks = rootDto.Child.Select(async childUri =>
{
await _semaphoreSlim.WaitAsync(cancellationToken);
try
{
var childDto = await GetFoundationEntity(childUri);
var childFoundation = await WalkFoundationGraph(childDto, cancellationToken);
lock (root)
{
if (!childFoundation.IsParent(root))
root.Children.Add(childFoundation);
if (!root.IsChild(childFoundation))
childFoundation.Parents.Add(root);
}
}
finally
{
_semaphoreSlim.Release();
}
}
);
await Task.WhenAll(childTasks);
}
await foundationEntityRepo.UpdateAsync(root, cancellationToken:cancellationToken);
foundationEntityRepo.MarkAsCompleted(root.ExternalId, cancellationToken);
await foundationEntityRepo.FlushRemainingAsync(cancellationToken);
}
< /code>
Это моя первая работа с одновременной. брошен. PrettyPrint-Override ">if (!foundationParent.IsChild(childFoundation))
{
childFoundation.Parents.Add(foundationParent);
hasChanges = true;
}
< /code>
- Вот почему я пытаюсь справиться с исключением таким образом, что последнее обновление всегда является тем, что следует сохранять.if (update)
{
foreach (var entity in batchFoundation)
{
try
{
DbSet.Update(entity);
await _context.SaveChangesAsync(cancellationToken);
}
catch (DbUpdateConcurrencyException ex)
{
_logger.LogWarning("UpdateConcurrencyException detected on {externalId}", entity.ExternalId);
foreach (var entry in ex.Entries)
{
entry.OriginalValues.SetValues((await entry.GetDatabaseValuesAsync(cancellationToken))!);
}
await _context.SaveChangesAsync(cancellationToken);
}
}
}
< /code>
Но это то, что я получаю вместо < /li>
< /ul>
warn: MRCIS_Plus.API.Features.WhoFoundation.Repos.FoundationEntityRepo[0]
UpdateConcurrencyException detected on 404455908
fail: MRCIS_Plus.API.Middleware.ExceptionHandlingMiddleware[0]
Unhandled exception
System.ArgumentNullException: Value cannot be null. (Parameter 'propertyValues')
at Microsoft.EntityFrameworkCore.Utilities.Check.NotNull[T](T value, String parameterName)
at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.EntryPropertyValues.SetValues(PropertyValues propertyValues)
at MRCIS_Plus.API.Features.WhoFoundation.Repos.FoundationEntityRepo.FlushAsync(ConcurrentDictionary`2 batch, Boolean update, CancellationToken cancellationToken) in C:\Users\haf\Desktop\ET medical API\MRCIS_Plus.API\Features\WhoFoundation\Repos\FoundationEntityRepo.cs:line 103
at MRCIS_Plus.API.Features.WhoFoundation.Repos.FoundationEntityRepo.UpdateAsync(FoundationEntity entity, CancellationToken cancellationToken) in C:\Users\haf\Desktop\ET medical API\MRCIS_Plus.API\Features\WhoFoundation\Repos\FoundationEntityRepo.cs:line 70
< /code>
Теперь, что я узнал, это intry.getDataBaseValuesAsync (CancellationToken) < /code> может быть нулевой только в том случае, если запись была удалена. правильно сохранить отношения на удаленном сервере.
Подробнее здесь: https://stackoverflow.com/questions/797 ... n-database
Мобильная версия