У меня есть проблема с MassTransit и функцией SAGA. Fault rabbitmq: // localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 shared.messaging contracts.jobsubmitted mainapp.donitoringjobstate (00: 00: 00.0334249)
sysstem.notsupex noble-ispectex noblexpeex или 00334249). В ioc
at massTransit.configuration.RegistrationsersEcollectionExtensions.tempsagarePository1.Send[T](ConsumeContext 1 Контекст, Исагаполитика2 policy, IPipe 1 Далее) в /_/src/masstransit/dependencyInection/configuration/RegistrationserviceCollectionextensions.cs:line 93
at massTransit.middleware.correlatedSagafilter2.Send(ConsumeContext 1 контекст, ipipe`1 Далее)
services.AddMassTransit(x =>
{
x.AddSagaStateMachine()
.InMemoryRepository();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.ReceiveEndpoint(sagaQueue, e =>
{
e.StateMachineSaga(
context.GetRequiredService(),
context.GetRequiredService());
});
});
});
< /code>
monitoringjobstatemachine-class: < /p>
using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;
namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event JobSubmittedEvent { get; private set; } = default!;
public Event JobCompletedEvent { get; private set; } = default!;
public Event JobFailedEvent { get; private set; } = default!;
public Schedule JobTimeoutSchedule { get; private set; } = default!;
public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});
Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];
Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
await context.Publish(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);
During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),
When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),
When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/794 ... oc-anymore
MassTransit | Отправить сагу больше не доступна в МОК ⇐ C#
Место общения программистов C#
-
Anonymous
1739874295
Anonymous
У меня есть проблема с MassTransit и функцией SAGA. Fault rabbitmq: // localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 shared.messaging contracts.jobsubmitted mainapp.donitoringjobstate (00: 00: 00.0334249)
sysstem.notsupex noble-ispectex noblexpeex или 00334249). В ioc
at massTransit.configuration.RegistrationsersEcollectionExtensions.tempsagarePository1.Send[T](ConsumeContext 1 Контекст, Исагаполитика2 policy, IPipe 1 Далее) в /_/src/masstransit/dependencyInection/configuration/RegistrationserviceCollectionextensions.cs:line 93
at massTransit.middleware.correlatedSagafilter2.Send(ConsumeContext 1 контекст, ipipe`1 Далее)
services.AddMassTransit(x =>
{
x.AddSagaStateMachine()
.InMemoryRepository();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});
cfg.UseDelayedMessageScheduler();
cfg.ReceiveEndpoint(sagaQueue, e =>
{
e.StateMachineSaga(
context.GetRequiredService(),
context.GetRequiredService());
});
});
});
< /code>
monitoringjobstatemachine-class: < /p>
using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;
namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event JobSubmittedEvent { get; private set; } = default!;
public Event JobCompletedEvent { get; private set; } = default!;
public Event JobFailedEvent { get; private set; } = default!;
public Schedule JobTimeoutSchedule { get; private set; } = default!;
public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});
Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});
// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];
Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");
// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B. "jobs-de")
await context.Publish(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);
During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),
When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw. nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),
When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79447877/masstransit-send-saga-is-not-available-in-ioc-anymore[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия