Как я могу отслеживать отдельные состояния для каждого элемента в коллекции, используя машину MassTransit State?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу отслеживать отдельные состояния для каждого элемента в коллекции, используя машину MassTransit State?

Сообщение Anonymous »

Я пытаюсь создать супер простую стратегию торговли сеткой с использованием машины MassTransit Saga Saga, и я сталкиваюсь с проблемой, когда дело доходит до отслеживания состояния для каждого отдельного уровня сетки. < /p>
Идея стратегии сетки заключается в следующем: < /p>

[*] selet and lothers. Геометрическое расстояние).
[*] Игнорируйте ближайший уровень к текущей цене.
[*] Заказы на покупку ниже и продавать заказы выше текущей цены.
[*] Когда заказ на продажу выполняется, поместите новый заказ на покупку на один уровень ниже. /> Повторите неопределенный срок. Однако я действительно хочу иметь отдельные состояния для каждого уровня сетки. В частности: < /p>
  • Для каждого уровня сетки должно быть состояние (Inactive, buy или sell ).
  • Если уровень сетки наиболее близок к текущей цене, состояние должно быть неактивным .
  • Если уровень сетки ниже текущей цены, государство должно быть в целом. Продать .
Стратегическое ввод праметра

[*] Нижний предел: нижняя граница сетки.
Верхний предел: верхняя граница сетки. /> < /ul>
Сетка может быть сгенерирована с помощью арифметического или геометрического расстояния. Ниже приведена версия Python о том, как рассчитываются уровни сетки: < /p>
def get_grids(lower_limit, upper_limit, grid_count, tp="arth"):
if tp == "arth":
grids = np.linspace(lower_limit, upper_limit, grid_count + 1)
elif tp == "geom":
grids = np.geomspace(lower_limit, upper_limit, grid_count + 1)
else:
print("not right range type")
return grids
< /code>
В C#уровни арифметической сетки можно рассчитать следующим образом: < /p>
var step = (upperLimit - lowerLimit) / gridCount;

List gridLevels = new List();
for (var i = 0; i
Что я пробовал (минимальный воспроизводимый пример) < /h2>
var builder = Host.CreateApplicationBuilder(args);

builder.AddEventBus();

var host = builder.Build();

await host.StartAsync();

var bus = host.Services.GetRequiredService();

var stateMachine = new GridStateMachine();

var lowerLimit = 25_000m;
var upperLimit = 35_000m;
var gridCount = 20;

var step = (upperLimit - lowerLimit) / gridCount;

List gridLevels = [];
for (var i = 0; i x.CurrentState);

Event(() => Initialized, e => e.CorrelateById(m => m.Message.CorrelationId));
Event(() => PriceCrossedUp, e => e.CorrelateById(m => m.Message.CorrelationId));
Event(() => PriceCrossedDown, e => e.CorrelateById(m => m.Message.CorrelationId));

Initially(
When(Initialized)
.IfElse(context => IsClosestLevel(context.Message.Price),
then => then.TransitionTo(Inactive),
orElse => orElse.IfElse(context => IsLowerLevel(context.Message.Price),
then => then.TransitionTo(Buy).Then(context => PlaceBuyOrder(context.Message)),
orElse2 => orElse2.TransitionTo(Sell).Then(context => PlaceSellOrder(context.Message))
)
)
);
}

public State Inactive { get; private set; } = null!;
public State Buy { get; private set; } = null!;
public State Sell { get; private set; } = null!;

public Event Initialized { get; private set; } = null!;
public Event PriceCrossedUp { get; private set; } = null!;
public Event PriceCrossedDown { get; private set; } = null!;

private bool IsClosestLevel(decimal price)
{
Console.WriteLine($"Is {price} the closest level?");
return false;
}

private bool IsLowerLevel(decimal price)
{
Console.WriteLine($"Is {price} lower level?");
return false;
}

private bool IsHigherLevel(decimal price)
{
Console.WriteLine($"Is {price} higher level?");
return false;
}

private void PlaceBuyOrder(GridInitialized request)
{
Console.WriteLine("Placing a buy order...");
}

private void PlaceSellOrder(GridInitialized request)
{
Console.WriteLine("Placing a sell order...");
}
}

public class GridInitialized
{
public Guid CorrelationId { get; set; }
public decimal Price { get; set; }
public List GridLevels { get; set; } = [];
}

public class PriceCrossedUp
{
public Guid CorrelationId { get; set; }
}

public class PriceCrossedDown
{
public Guid CorrelationId { get; set; }
}

public static class ServiceCollectionExtensions
{
public static IHostApplicationBuilder AddEventBus(
this IHostApplicationBuilder builder,
Action? massTransitConfiguration = null) =>
AddEventBus(builder, massTransitConfiguration);

public static IHostApplicationBuilder AddEventBus(
this IHostApplicationBuilder builder,
Action? massTransitConfiguration = null)
where TBus : class, IBus
{
ArgumentNullException.ThrowIfNull(builder);

builder.Services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.SetInMemorySagaRepositoryProvider();

var entryAssembly = Assembly.GetEntryAssembly();
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);

massTransitConfiguration?.Invoke(x);

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});

cfg.ConfigureEndpoints(context);
});
});

return builder;
}
}
< /code>
Я изо всех сил пытаюсь выяснить, как управлять отдельными состояниями для каждого уровня сетки в MassTransit. Поскольку стратегия сетки может иметь много уровней, мне нужно иметь разные состояния для каждого уровня сетки вместо одного глобального состояния для всей сетки. < /P>
Как я могу это сделать?// Program.cs
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddSingleton();
builder.AddEventBus(options =>
{
options.AddConsumer();
});

var host = builder.Build();

await host.StartAsync();

var bus = host.Services.GetRequiredService();
var gridLevelTracker = host.Services.GetRequiredService();

// Strategy input parameters
var lowerLimit = 25_000m;
var upperLimit = 35_000m;
var gridCount = 20;

// Calculate arithmetic progression and create grid levels
var step = (upperLimit - lowerLimit) / gridCount;
for (var i = 0; i GridLevelInitialized, x => x.CorrelateById(m => m.Message.GridLevelId));
Event(() => GridLevelShouldBeInactive, x => x.CorrelateById(m => m.Message.GridLevelId));
Event(() => GridLevelShouldBeBuy, x => x.CorrelateById(m => m.Message.GridLevelId));
Event(() => GridLevelShouldBeSell, x => x.CorrelateById(m => m.Message.GridLevelId));

InstanceState(x => x.CurrentState);

Initially(
When(GridLevelInitialized)
.Then(context =>
{
context.Saga.Price = context.Message.Price;
Console.WriteLine($"Grid level {context.Saga.CorrelationId} initialized at price {context.Saga.Price}");
})
.TransitionTo(Pending));

During(Pending,
When(GridLevelShouldBeInactive)
.Then(context =>
{
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} remains in Inactive state");
})
.TransitionTo(Inactive),
When(GridLevelShouldBeBuy)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Buy state");
context.Saga.OrderId = PlaceBuyOrder(context.Saga.Price);
})
.TransitionTo(Buy),
When(GridLevelShouldBeSell)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Sell state");
context.Saga.OrderId = PlaceSellOrder(context.Saga.Price);
})
.TransitionTo(Sell));

During(Buy,
When(GridLevelShouldBeInactive)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Inactive state");
CancelBuyOrder(context.Saga.Price);
})
.TransitionTo(Inactive),
When(GridLevelShouldBeSell)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Sell state");
CancelBuyOrder(context.Saga.Price);
context.Saga.OrderId = PlaceSellOrder(context.Saga.Price);
})
.TransitionTo(Sell));

During(Sell,
When(GridLevelShouldBeInactive)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Inactive state");
CancelSellOrder(context.Saga.Price);
})
.TransitionTo(Inactive),
When(GridLevelShouldBeBuy)
.Then(context => {
Console.WriteLine($"Grid level {context.Saga.CorrelationId} at price {context.Saga.Price} transitioning to Buy state");
CancelSellOrder(context.Saga.Price);
context.Saga.OrderId = PlaceBuyOrder(context.Saga.Price);
})
.TransitionTo(Buy));

SetCompletedWhenFinalized();
}

public State Pending { get; private set; } = null!;
public State Inactive { get; private set; } = null!;
public State Buy { get; private set; } = null!;
public State Sell { get; private set; } = null!;

public Event GridLevelInitialized { get; private set; } = null!;
public Event GridLevelShouldBeInactive { get; private set; } = null!;
public Event GridLevelShouldBeBuy { get; private set; } = null!;
public Event GridLevelShouldBeSell { get; private set; } = null!;

private string PlaceBuyOrder(decimal price)
{
Console.WriteLine($"Placing buy order at {price}");
return Guid.NewGuid().ToString();
}

private string PlaceSellOrder(decimal price)
{
Console.WriteLine($"Placing sell order at {price}");
return Guid.NewGuid().ToString();
}

private void CancelBuyOrder(decimal price)
{
Console.WriteLine($"Cancelling buy order at {price}");
}

private void CancelSellOrder(decimal price)
{
Console.WriteLine($"Cancelling sell order at {price}");
}
}

// Messages.cs
public class GridLevelInitialized
{
public Guid GridLevelId { get; init; }
public decimal Price { get; init; }
}

public class GridLevelShouldBeInactive
{
public Guid GridLevelId { get; init; }
public decimal Price { get; init; }
}

public class GridLevelShouldBeBuy
{
public Guid GridLevelId { get; init; }
public decimal Price { get; init; }
}

public class GridLevelShouldBeSell
{
public Guid GridLevelId { get; init; }
public decimal Price { get; init; }
}

public class CurrentPriceUpdated
{
public decimal CurrentPrice { get; init; }
}

// GridLevelTracker.cs
public interface IGridLevelTracker
{
void RegisterGridLevel(Guid gridLevelId, decimal price);

List GetAllGridLevelIds();

void UpdateCurrentPrice(decimal currentPrice);

(Guid? ClosestLevelId, List LowerLevelIds, List HigherLevelIds) GetLevelsRelativeToCurrentPrice();
}

public class GridLevelTracker : IGridLevelTracker
{
private readonly ConcurrentDictionary _gridLevels = new();
private decimal _currentPrice;

public void RegisterGridLevel(Guid gridLevelId, decimal price)
{
_gridLevels.TryAdd(gridLevelId, price);
}

public List GetAllGridLevelIds()
{
return _gridLevels.Keys.ToList();
}

public void UpdateCurrentPrice(decimal currentPrice)
{
_currentPrice = currentPrice;
}

public (Guid? ClosestLevelId, List LowerLevelIds, List HigherLevelIds) GetLevelsRelativeToCurrentPrice()
{
if (_gridLevels.IsEmpty)
return (null, [], []);

var levels = _gridLevels.ToList();

// Find closest level to current price
var closestLevel = levels
.OrderBy(x => Math.Abs(x.Value - _currentPrice))
.First();

// Find levels below current price, sorted by price in ascending order
var lowerLevels = levels
.Where(x => x.Value < _currentPrice && x.Key != closestLevel.Key)
.OrderBy(x => x.Value) // Sort by price ascending
.Select(x => x.Key)
.ToList();

// Find levels above current price, sorted by price in ascending order
var higherLevels = levels
.Where(x => x.Value > _currentPrice && x.Key != closestLevel.Key)
.OrderBy(x => x.Value) // Sort by price ascending
.Select(x => x.Key)
.ToList();

return (closestLevel.Key, lowerLevels, higherLevels);
}
}

// CurrentPriceUpdatedConsumer.cs
public class CurrentPriceUpdatedConsumer(IGridLevelTracker gridLevelTracker, IBus bus) : IConsumer
{
public async Task Consume(ConsumeContext context)
{
var currentPrice = context.Message.CurrentPrice;

// Update tracker with current price
gridLevelTracker.UpdateCurrentPrice(currentPrice);

// Get grid levels relative to current price
var (closestLevelId, lowerLevelIds, higherLevelIds) = gridLevelTracker.GetLevelsRelativeToCurrentPrice();

// Send events to all levels
foreach (var levelId in lowerLevelIds)
{
await bus.Publish(new GridLevelShouldBeBuy { GridLevelId = levelId });
}

if (closestLevelId.HasValue)
{
await bus.Publish(new GridLevelShouldBeInactive { GridLevelId = closestLevelId.Value });
}

foreach (var levelId in higherLevelIds)
{
await bus.Publish(new GridLevelShouldBeSell { GridLevelId = levelId });
}
}
}

// ServiceCollectionExtensions.cs (remains the same)
public static class ServiceCollectionExtensions
{
public static IHostApplicationBuilder AddEventBus(
this IHostApplicationBuilder builder,
Action? massTransitConfiguration = null) =>
AddEventBus(builder, massTransitConfiguration);

public static IHostApplicationBuilder AddEventBus(
this IHostApplicationBuilder builder,
Action? massTransitConfiguration = null)
where TBus : class, IBus
{
ArgumentNullException.ThrowIfNull(builder);

builder.Services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.SetInMemorySagaRepositoryProvider();

var entryAssembly = Assembly.GetEntryAssembly();
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);

massTransitConfiguration?.Invoke(x);

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});

cfg.ConfigureEndpoints(context);
});
});

return builder;
}
}


Подробнее здесь: https://stackoverflow.com/questions/795 ... a-masstran
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»