Код, который я написал до сих пор:
Код: Выделить всё
class KafkaObject
{
// producer, config variables
private HashSet produceTaskList;
public void KafkaProduceAsync(string jsonString)
{
try
{
// extract topic, data from jsonString
var task = producer.ProduceAsync(topic, new Message { Key = topic, Value = data });
_ = task.ContinueWith(self =>
{
produceTaskList.Remove(self);
if (self.IsFaulted)
{
logger.LogError($"Error producing message: {self.Exception}");
}
});
produceTaskList.Add(task);
}
// catch blocks for error handling
}
public async Task WaitProduce()
{
try
{
await Task.WhenAll(produceTaskList);
}
catch (Exception ex)
{
logger.LogError($"Error in WaitProduce: {ex.Message}");
}
logger.LogInformation("Pending Kafka Produce Tasks Finished.");
producer.Flush();
producer.Dispose();
}
}
< /code>
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
namespace TickDataKafka;
internal class Program
{
static async Task Main()
{
// set url, other variables
Console.WriteLine("Hello world!");
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var reconnectTokenSource = new CancellationTokenSource();
ClientWebSocket socket = new();
bool close = false;
Console.CancelKeyPress += async (s, e) =>
{
e.Cancel = true;
cancellationTokenSource.Cancel();
await kafkaObject.WaitProduce();
};
while (!cancellationToken.IsCancellationRequested)
{
try
{
socket = new();
reconnectTokenSource = new();
var reconnectToken = reconnectTokenSource.Token;
await socket.ConnectAsync(new Uri(url), cancellationToken);
while (socket.State == WebSocketState.Open)
{
var recvBuffer = new byte[512];
reconnectTokenSource.CancelAfter(TimeSpan.FromSeconds(15));
var result = await socket.ReceiveAsync(new ArraySegment(recvBuffer), CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, reconnectToken).Token);
if (result.MessageType == WebSocketMessageType.Close)
{
close = true;
break;
}
Console.WriteLine(Encoding.UTF8.GetString(recvBuffer, 0, result.Count).TrimEnd());
}
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Closing WebSocket connection.");
break;
}
else
{
reconnectTokenSource.Dispose();
socket.Dispose();
Console.WriteLine("Connection dropped. Attempting to reconnect...");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error connecting: {ex.Message}, {ex.StackTrace}.");
break;
}
if (close)
{
Console.WriteLine("Vendor closed connection.");
break;
}
}
reconnectTokenSource.Dispose();
socket.Dispose();
Console.WriteLine("Socket destroyed.");
}
}
Я также получаю сообщение об ошибке от kafka %4 | 1738735993.355 | derminate | rdkafka#Производитель-1 | [thrd: app]: производитель, завершающий 1 сообщение (279 байтов), все еще в очереди или транзит: используйте fload (), чтобы ждать выдающейся доставки сообщений , даже если я использую производительность. flush () перед Удаление производителя.
Подробнее здесь: https://stackoverflow.com/questions/794 ... a-to-kafka