Вот мои определения:
Код: Выделить всё
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public int Version { get; set; }
public Guid ActivationId { get; set; }
}
public static class MessageContracts
{
static bool _initialized;
public static void Initialize()
{
if (_initialized)
return;
GlobalTopology.Send.UseCorrelationId(x => x.ActivationId);
GlobalTopology.Send.UseCorrelationId(x => x.ActivationId);
GlobalTopology.Send.UseCorrelationId
(x => x.ActivationId);
_initialized = true;
}
}
Код: Выделить всё
public class StartReconstructionConsumer : IConsumer
{
readonly ILogger _Logger;
private readonly int _DelaySeconds = 5;
public StartReconstructionConsumer(ILogger logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext context)
{
var activationId = context.Message.ActivationId;
_Logger.LogInformation($"Received Scan: {activationId}");
await Task.Delay(_DelaySeconds * 1000);
_Logger.LogInformation($"Finish Scan: {activationId}");
await context.Publish(new { ActivationId = activationId });
}
}
public class ProcessingFinishedConsumer : IConsumer
{
readonly ILogger _Logger;
public ProcessingFinishedConsumer(ILogger logger)
{
_Logger = logger;
}
public async Task Consume(ConsumeContext context)
{
_Logger.LogInformation($"Finish {context.Message.ActivationId}");
await Task.CompletedTask;
}
}
Код: Выделить всё
public class ArcStateMachine: MassTransitStateMachine
{
static ArcStateMachine()
{
MessageContracts.Initialize();
}
public ArcStateMachine()
{
InstanceState(x => x.CurrentState);
Initially(
When(ProcessingStartedEvent)
.Then(context =>
{
Console.WriteLine(">> ProcessingStartedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.TransitionTo(ProcessingStartedState));
During(ProcessingStartedState,
When(ReconstructionFinishedEvent)
.Then(context =>
{
Console.WriteLine(">> ReconstructionFinishedEvent");
context.Instance.ActivationId = context.Data.ActivationId;
})
.Publish(context =>
{
return context.Init
(new { ActivationId = context.Data.ActivationId });
})
.TransitionTo(ProcessingFinishedState)
.Finalize());
}
public State ProcessingStartedState { get; }
public State ReconstructionStartedState { get; }
public State ReconstructionFinishedState { get; }
public State ProcessingFinishedState { get; }
public Event ProcessingStartedEvent { get; }
public Event ReconstructionStartedEvent { get; }
public Event ReconstructionFinishedEvent { get; }
public Event ProcessingFinishedEvent { get; }
}
Код: Выделить всё
var rabbitHost = Configuration["RABBIT_MQ_HOST"];
if (rabbitHost.IsNotEmpty())
{
services.AddMassTransit(cnf =>
{
var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];
var machine = new ArcStateMachine();
var repository = MongoDbSagaRepository.Create(connectionString,
"mongoRepo", "WorkflowState");
cnf.AddConsumer(typeof(StartReconstructionConsumer));
cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
cnf.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), hst =>
{
hst.Username("guest");
hst.Password("guest");
});
cfg.ConfigureEndpoints(context);
cfg.ReceiveEndpoint(BusConstants.SagaQueue,
e => e.StateMachineSaga(machine, repository));
});
});
services.AddMassTransitHostedService();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
});
}
- Когда на самом деле событие публикуется в результате публикации сообщение? Т.е. в моем примере await _BusInstance.Bus.Publish(new { ActivationId = id }); вызывается из WebApi, который используется StartReconstructionConsumer, но когда на самом деле конечный автомат начинает действовать с помощью Initially (Когда(ProcessingStartedEvent)...?
- Моя обработка должна гарантировать, что я уже нахожусь в ProcessingStartedState состояние, чтобы во время(ProcessingStartedState, When(ReconstructionFinishedEvent)... действовать правильно. Итак, как мне гарантировать, что мой потребитель, который срабатывает при получении StartProcessingMessage, может опубликовать ReconstructionFinishedMessage, который должен инициировать это во время? Правильно ли я строю обмен сообщениями?
- В настоящее время для контекста ожидания.Publish( new { ActivationId = activeId }); Я получаю исключение в журналах, в котором указано R-FAULT Rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore. Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604), в то время как этим руководством в сообщении на самом деле является MessageId. И мое сообщение в RabbitMQ перенаправляется в saga.service_error с исключением. Типы сообщений не должны быть системными типами: System.Threading.Tasks.Task (Параметр ' T').
Мое намерение состоит в том, чтобы инициировать обработку, которая будет состоять из нескольких этапов, обрабатываемых последовательно несколькими потребителями. Итак, здесь я попытался создать простой StateMachine, который запускается всякий раз, когда кто-то вызывает StartProcessing, затем каждый потребитель выполняет свою работу и запускает FinishedStepX, который переводит конечный автомат на новый шаг и инициирует следующего потребителя. до тех пор, пока вся обработка не будет завершена и конечный автомат не сообщит ProcessingComplete.
Спасибо за любую помощь, заранее
Подробнее здесь: https://stackoverflow.com/questions/697 ... -exception