Npgsql блокирует поток на неопределенный срок в асинхронной средеC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Npgsql блокирует поток на неопределенный срок в асинхронной среде

Сообщение Anonymous »

Я пытаюсь найти решение проблемы, с которой столкнулся при использовании популярной библиотеки Postgres C#, Npgsql. Я не уверен, проблема ли это в Npgsql или нет, хотя подозреваю, что это потому, что мой код очень простой. Проблема, которую я вижу, заключается в следующем: когда я иногда вызываю асинхронный метод в Npgsql, а не всегда, поток блокируется. Насколько я могу судить, эта проблема возникает случайным образом. В результате, поскольку я работаю в среде Microsoft Orleans (что может иметь отношение к поиску решения), поток блокируется на неопределенный срок, что делает один из рабочих потоков Orleans неспособным обрабатывать работу. По мере того, как я делаю больше вызовов Npgsql, эти заблокированные потоки накапливаются, и в конечном итоге система Орлеана задыхается из-за нехватки потоков.

Так что я действительно не понимаю, в чем проблема может быть, но поскольку блокировка всегда происходит одним и тем же методом и поскольку она, по-видимому, происходит в какой-то подпрограмме Npgsql, я думаю, что будет справедливо исследовать Npgsql дальше.

Вот мой код, который используется в поставщике хранилища Орлеана (специальный класс, который обрабатывает уровень персистентности системы.)

Код: Выделить всё

var sql = $"SELECT * FROM objects WHERE id = @id";

using (var connection = new NpgsqlConnection(connectionString))
using (var cmd = new NpgsqlCommand(sql, connection))
{
try
{
await connection.OpenAsync();

cmd.Parameters.AddWithValue("id", id);

using(var reader = await connection.ExecuteReaderAsync(cmd))
{
if (reader.HasRows)
{
var objects = await ProtobufSQL.DataReaderToType(modelType, reader);
var data = objects[0];
state.Data = data;
}
}
catch (Exception e)
{
Log.Error(1, e.Message, e);
}
}
Это исходный код класса Protobuf SQL:

Код: Выделить всё

public class ProtobufSQL
{
public static List FlattenToSQLColumns(IMessage message, MessageDescriptor descriptor, string prefix = null)
{
var fields = descriptor.Fields.InDeclarationOrder();

var columns = new List();

for (var i = 0; i < fields.Count; i++)
{
var field = fields[i];
var columnName = field.Name.ToLower();

if (field.Name == "id")
{
ByteString bytes = (ByteString)field.Accessor.GetValue(message);
var uuid = new Guid(bytes.ToByteArray());
columns.Add(new Tuple("id", uuid));
}
else if (field.FieldType == FieldType.Message)
{
var embeddedDescriptor = field.MessageType;
var embeddedMessage = field.Accessor.GetValue(message);
if (field.IsRepeated)
{
throw new Exception("Repeated complex types are not supported, create a foreign key reference in a new object instead.");
}
else
{
columns.AddRange(FlattenToSQLColumns((IMessage)embeddedMessage, embeddedDescriptor, $"{columnName}."));
}
}
else if (field.FieldType == FieldType.Group)
{
throw new Exception("Groups are not supported by ProtobufSQL.");
}
else
{
var columnValue = field.Accessor.GetValue(message);
var key = prefix + columnName;
if (field.IsRepeated)
{
var enumerableColumnValue = columnValue as IEnumerable;
Type listTypeOf = enumerableColumnValue.GetType().GetGenericArguments()[0];
Type listType = typeof(List).MakeGenericType(listTypeOf);
dynamic valueList = Activator.CreateInstance(listType);
foreach (var item in enumerableColumnValue)
{
valueList.Add((dynamic)item);
}
columns.Add(new Tuple(key, valueList.ToArray()));
}
else
{
columns.Add(new Tuple(key, columnValue));
}
}
}

return columns;
}

public static async Task DataReaderToType(Type type, DbDataReader reader)
{
var descriptor = (MessageDescriptor)type.GetProperty("Descriptor").GetValue(null);

IList  objects = new List();

while (await reader.ReadAsync())
{
var obj = Activator.CreateInstance(type);
TraverseDbRow(reader, descriptor, obj);
objects.Add((IMessage)obj);
}

return objects.ToArray();
}

private static void TraverseDbRow(DbDataReader reader, MessageDescriptor descriptor, object obj, string prefix = null)
{
var fields = descriptor.Fields.InFieldNumberOrder();

for (var i = 0; i < fields.Count; i++)
{
var field = fields[i];
if (field.FieldType == FieldType.Message)
{
if (field.IsRepeated)
{
// Repeated embedded types will be broken out into a separate table,
// so there's no need to handle them here.
}
else if (field.IsMap)
{
throw new Exception("Maps are not yet supported by ProtobufSQL.");
}
else
{
var embeddedDescriptor = field.MessageType;
var embeddedObj = Activator.CreateInstance(embeddedDescriptor.ClrType);
TraverseDbRow(reader, embeddedDescriptor, embeddedObj, $"{prefix}{field.Name}.");
}
}
else if (field.FieldType == FieldType.Group)
{
throw new Exception("Groups are not supported by ProtobufSQL.");
}
else
{
var columnName = prefix + field.Name;
try
{
var columnValue = reader[columnName];
var propertyInfo = obj.GetType().GetProperty(field.Name, BindingFlags.IgnoreCase | BindingFlags.Public | BindingFlags.Instance);

if (field.Name == "id")
{
var guid = (Guid)columnValue;
ByteString bytes = ByteString.CopyFrom(guid.ToByteArray());
propertyInfo.SetValue(obj, bytes);
}
else if (field.IsRepeated)
{
var repeated = propertyInfo.GetValue(obj);
var addRange = repeated.GetType().GetMethod("AddRange");
addRange.Invoke(repeated, new object[] { columnValue });
}
else if (field.IsMap)
{
throw new Exception("Maps are not yet supported by ProtobufSQL.");
}
else
{
propertyInfo.SetValue(obj, Convert.ChangeType(columnValue, propertyInfo.PropertyType));
}
}
catch (IndexOutOfRangeException e)
{
// columnName was not present in the response
}
}
}
}
}
У меня также есть этот снимок экрана стека потока, когда он был заблокирован:
Изображение


Я действительно не знаю, что со всем этим делать. Надеюсь, найдется кто-нибудь, обладающий небольшими знаниями, которые помогут мне продолжить! Спасибо.

Подробнее здесь: https://stackoverflow.com/questions/439 ... nvironment
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Npgsql блокирует поток на неопределенный срок в асинхронной среде
    Anonymous » » в форуме C#
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Почему этот код CUDA на неопределенный срок на неопределенный срок?
    Гость » » в форуме C++
    0 Ответы
    19 Просмотры
    Последнее сообщение Гость
  • Почему этот код CUDA на неопределенный срок на неопределенный срок?
    Anonymous » » в форуме C++
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Почему этот код CUDA на неопределенный срок на неопределенный срок?
    Anonymous » » в форуме C++
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous
  • Почему этот код CUDA на неопределенный срок на неопределенный срок?
    Anonymous » » в форуме C++
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C#»