Если я использую необработанные SQL-команды, всё работает:
Код: Выделить всё
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server consumer, raw-SQL variant.
// Consumes JSON documents from the "cbsource" topic and, for every message,
// upserts one row into OuterTable and one row into InnerTable inside a single
// database transaction. Runs until Ctrl+C is pressed.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_EF_Core",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Ctrl+C requests a cooperative shutdown: the pending Consume() call throws
// OperationCanceledException, which is handled at the bottom of the script.
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

try
{
    // The message key is not used; the value is the JSON document as text.
    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
    // The context is disposed together with the consumer when the loop ends.
    using (var dbContext = new PubDbContext(connectionString))
    {
        consumer.Subscribe(topicName);

        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                if (consumeResult.Message.Value != null)
                {
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                    JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(consumeResult.Message.Value);
                    if (jsonObject != null)
                    {
                        // Missing or non-numeric "outer1" is stored as NULL.
                        int? outerTableColumn1Value = int.TryParse(jsonObject["outer1"]?.ToString(), out int parsedNumber)
                            ? parsedNumber
                            : null;
                        string? outerTableColumn2Value = jsonObject["outer2"]?.ToString();

                        // Inner values stay NULL unless the document carries an "inner" object
                        // with at least one non-empty field.
                        int? innerTableColumn1Value = null;
                        string? innerTableColumn2Value = null;
                        var inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                        var inner2Value = jsonObject["inner"]?["inner2"]?.ToString();
                        if (!string.IsNullOrEmpty(inner1Value) || !string.IsNullOrEmpty(inner2Value))
                        {
                            innerTableColumn1Value = int.TryParse(inner1Value, out parsedNumber)
                                ? parsedNumber
                                : null;
                            innerTableColumn2Value = inner2Value;
                            Console.WriteLine("Inner table exists.");
                        }
                        else
                        {
                            Console.WriteLine("Inner table does not exist.");
                        }

                        // One transaction per message; disposed at the end of this scope
                        // whether it was committed or rolled back.
                        await using var transaction = await dbContext.Database.BeginTransactionAsync();
                        try
                        {
                            // {0}/{1} placeholders are turned into SQL parameters by EF Core,
                            // so the values are never concatenated into the command text.
                            var outerEntity = await dbContext.OuterTable
                                .FromSqlRaw(@"SELECT TOP 1 * FROM OuterTable WHERE Outer1 = {0}", outerTableColumn1Value)
                                .FirstOrDefaultAsync();
                            if (outerEntity != null)
                            {
                                string updateCommand = @"UPDATE OuterTable SET Outer2 = {1} WHERE Outer1 = {0}";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(updateCommand,
                                    outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Outer table row updated.");
                            }
                            else
                            {
                                string insertCommand = @"INSERT INTO OuterTable (Outer1, Outer2) VALUES ({0}, {1})";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(insertCommand,
                                    outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Outer table row inserted.");
                            }

                            var innerEntity = await dbContext.InnerTable
                                .FromSqlRaw(@"SELECT TOP 1 * FROM InnerTable WHERE Inner1 = {0}", innerTableColumn1Value)
                                .FirstOrDefaultAsync();
                            if (innerEntity != null)
                            {
                                string updateCommand = @"UPDATE InnerTable SET Inner2 = {1} WHERE Inner1 = {0}";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(updateCommand,
                                    innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Inner table row updated.");
                            }
                            else
                            {
                                string insertCommand = @"INSERT INTO InnerTable (Inner1, Inner2) VALUES ({0}, {1})";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(insertCommand,
                                    innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Inner table row inserted.");
                            }

                            await transaction.CommitAsync();
                        }
                        catch (Exception ex)
                        {
                            // Keep the consumer alive: undo this message's writes and move on.
                            await transaction.RollbackAsync();
                            Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                        }
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context for the raw-SQL variant. Both entity types are mapped as
/// keyless, and the SQL Server connection is configured from the connection
/// string passed to the constructor.
/// </summary>
public class PubDbContext : DbContext
{
    private readonly string _connectionString;

    /// <summary>Creates a context that connects with <paramref name="connectionString"/>.</summary>
    /// <param name="connectionString">SQL Server connection string handed to <c>UseSqlServer</c>.</param>
    public PubDbContext(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>Query root for <see cref="global::OuterTable"/> rows.</summary>
    public DbSet<OuterTable> OuterTable { get; set; }

    /// <summary>Query root for <see cref="global::InnerTable"/> rows.</summary>
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <inheritdoc/>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer(_connectionString);
    }

    /// <inheritdoc/>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Keyless mapping: neither entity declares a primary key.
        modelBuilder.Entity<OuterTable>().HasNoKey();
        modelBuilder.Entity<InnerTable>().HasNoKey();
    }
}
/// <summary>
/// Row of the OuterTable table: two nullable columns, both unset (null) on a
/// freshly constructed instance.
/// </summary>
public class OuterTable
{
    /// <summary>Integer column Outer1; null when no value is available.</summary>
    public int? Outer1 { get; set; }

    /// <summary>Text column Outer2; null when no value is available.</summary>
    public string? Outer2 { get; set; }
}
/// <summary>
/// Row of the InnerTable table: two nullable columns, both unset (null) on a
/// freshly constructed instance.
/// </summary>
public class InnerTable
{
    /// <summary>Integer column Inner1; null when no value is available.</summary>
    public int? Inner1 { get; set; }

    /// <summary>Text column Inner2; null when no value is available.</summary>
    public string? Inner2 { get; set; }
}
В любом случае, источником данных служат JSON-документы Couchbase (сообщения поступают через Kafka) следующего вида:

Итак, я имею дело с вложенными JSON-документами. Некоторые из них имеют внутреннюю структуру (inner).
В любом случае, мне сказали, что приведённый выше код противоречит духу ORM, поскольку выполнения необработанных SQL-команд в коде следует избегать. Поэтому я пришёл к следующему варианту.
Код: Выделить всё
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server consumer, ORM variant.
// Consumes JSON documents from the "cbsource" topic and, for every message,
// upserts one OuterTable row and one InnerTable row through EF Core change
// tracking, inside a single transaction. Runs until Ctrl+C is pressed.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_ORM",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Ctrl+C requests a cooperative shutdown: the pending Consume() call throws
// OperationCanceledException, which is handled at the bottom of the script.
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

var optionsBuilder = new DbContextOptionsBuilder();
optionsBuilder.UseSqlServer(connectionString);

try
{
    // The message key is not used; the value is the JSON document as text.
    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
    {
        consumer.Subscribe(topicName);

        // Counters for substitute keys: each message whose "outer1" / "inner1"
        // is missing or non-numeric gets the next negative number (-1, -2, ...).
        int i = 0;
        int j = 0;

        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                if (consumeResult.Message.Value != null)
                {
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                    JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(consumeResult.Message.Value);
                    if (jsonObject != null)
                    {
                        int outerTableColumn1Value = int.TryParse(jsonObject["outer1"]?.ToString(), out int outer1Parsed) ? outer1Parsed : --i;
                        string? outerTableColumn2Value = jsonObject["outer2"]?.ToString();
                        string? inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                        string? inner2Value = jsonObject["inner"]?["inner2"]?.ToString();
                        int innerTableColumn1Value = int.TryParse(inner1Value, out int inner1Parsed) ? inner1Parsed : --j;
                        string? innerTableColumn2Value = inner2Value;

                        // A fresh, short-lived context per message keeps the change tracker small.
                        await using (var context = new ApplicationDbContext(optionsBuilder.Options))
                        await using (var transaction = await context.Database.BeginTransactionAsync())
                        {
                            try
                            {
                                var outerRow = await context.OuterTable
                                    .FirstOrDefaultAsync(o => o.Outer1 == outerTableColumn1Value);
                                if (outerRow != null)
                                {
                                    // The row is already tracked, so assigning the property is
                                    // enough; SaveChangesAsync detects the change by itself.
                                    outerRow.Outer2 = outerTableColumn2Value;
                                    Console.WriteLine("Outer table row updated.");
                                }
                                else
                                {
                                    // Add (not AddAsync): no async value generator is involved.
                                    context.OuterTable.Add(new OuterTable
                                    {
                                        Outer1 = outerTableColumn1Value,
                                        Outer2 = outerTableColumn2Value
                                    });
                                    Console.WriteLine("Outer table row inserted.");
                                }

                                var innerRow = await context.InnerTable
                                    .FirstOrDefaultAsync(o => o.Inner1 == innerTableColumn1Value);
                                if (innerRow != null)
                                {
                                    innerRow.Inner2 = innerTableColumn2Value;
                                    Console.WriteLine("Inner table row updated.");
                                }
                                else
                                {
                                    context.InnerTable.Add(new InnerTable
                                    {
                                        Inner1 = innerTableColumn1Value,
                                        Inner2 = innerTableColumn2Value
                                    });
                                    Console.WriteLine("Inner table row inserted.");
                                }

                                await context.SaveChangesAsync();
                                await transaction.CommitAsync();
                            }
                            catch (Exception ex)
                            {
                                // Keep the consumer alive: undo this message's writes and move on.
                                await transaction.RollbackAsync();
                                Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                            }
                        }
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context for the ORM variant. <c>Outer1</c> and <c>Inner1</c> are
/// configured as the primary keys of their entities; provider and connection
/// come from the options passed to the constructor.
/// </summary>
public class ApplicationDbContext : DbContext
{
    /// <summary>Query root for <see cref="global::OuterTable"/> rows.</summary>
    public DbSet<OuterTable> OuterTable { get; set; }

    /// <summary>Query root for <see cref="global::InnerTable"/> rows.</summary>
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <summary>Creates the context from externally built options.</summary>
    /// <param name="options">Options forwarded unchanged to the <see cref="DbContext"/> base constructor.</param>
    public ApplicationDbContext(DbContextOptions options) : base(options) { }

    /// <inheritdoc/>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OuterTable>().HasKey(o => o.Outer1);
        modelBuilder.Entity<InnerTable>().HasKey(i => i.Inner1);
    }
}
/// <summary>
/// Entity for the OuterTable table. <see cref="Outer1"/> is the property that
/// ApplicationDbContext configures as the key; both properties are declared
/// nullable and start out as null.
/// </summary>
public class OuterTable
{
    /// <summary>Integer column Outer1 (declared nullable).</summary>
    public int? Outer1 { get; set; }

    /// <summary>Text column Outer2; null when the source document has no value.</summary>
    public string? Outer2 { get; set; }
}
/// <summary>
/// Entity for the InnerTable table. <see cref="Inner1"/> is the property that
/// ApplicationDbContext configures as the key; both properties are declared
/// nullable and start out as null.
/// </summary>
public class InnerTable
{
    /// <summary>Integer column Inner1 (declared nullable).</summary>
    public int? Inner1 { get; set; }

    /// <summary>Text column Inner2; null when the source document has no value.</summary>
    public string? Inner2 { get; set; }
}
Код: Выделить всё
// Primary-key configuration from ApplicationDbContext.OnModelCreating.
modelBuilder.Entity<OuterTable>().HasKey(o => o.Outer1);
modelBuilder.Entity<InnerTable>().HasKey(i => i.Inner1);
Я видел сообщение, в котором предлагалось использовать составные ключи, но проблема в том, что в некоторых JSON-документах outer/inner вообще отсутствуют, а я всё же хочу записывать для них значения NULL. Поэтому составные ключи в моём случае, похоже, не подходят.
Здесь я также прикрепляю таблицы SQL Server, сгенерированные моим первым кодом, и то, что мне действительно нужно.


Подробнее здесь: https://stackoverflow.com/questions/793 ... imary-keys