Транзакции микросервисов с сагами оркестраторомC#

Место общения программистов C#
Ответить
Anonymous
 Транзакции микросервисов с сагами оркестратором

Сообщение Anonymous »

Я пытаюсь создать приложение микросервисов. Я хочу сделать проверку службы Auth/Register/Vogin Gateway, и изменить заголовки для прохождения внутренних служб и службы профиля пользователей.
Я сделал оркестратор SAGA с массовым транспортом в ASP.NET Core. Я получаю только исключения тайм -аута. Если да - SAGA попытки Создайте профиль автофинала с идентификатором профиля пользователя. Если да - пользователь получает ответ и т. Д. Auth, пользователь, сага -оркестратор - все разные службы < /p>
Код запроса службы Auth: < /p>
var correlationId = Guid.NewGuid();
var response = await requestClient.GetResponse(new StartRegistrationCommand
{
CorrelationId = correlationId,
UserEmail = email,
}, timeout: TimeSpan.FromSeconds(30));
< /code>
Авторитет Auth Mass Transit Register: < /p>
builder.Services.AddMassTransit(x =>
{
x.AddConsumer();
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["AUTH_SAGA_RABBIT_MQ_HOST"], h =>
{
h.Username(builder.Configuration["AUTH_SAGA_RABBIT_MQ_USER"]);
h.Password(builder.Configuration["AUTH_SAGA_RABBIT_MQ_PASSWORD"]);
});
cfg.ReceiveEndpoint("auth_profile_queue", e =>
{
e.ConfigureConsumer(ctx);
});
});

x.AddRequestClient(timeout: TimeSpan.FromMinutes(5));
});
< /code>
Код события службы пользователя: < /p>
public async Task Consume(ConsumeContext context)
{
try
{
logger.LogInformation($"Creating empty user profile of saga registration process. Correlation: {context.CorrelationId}");
var result = await userService.CreateEmptyProfileAsync(context.Message.Email);
if (!result.Success)
{
logger.LogError($"Failed to create profile: {context.Message.Email} with error: {result.ErrorMessage} and code: {result.ErrorCode}");
await context.Publish(new UserRejectedCommand(context.Message.CorrelationId));
return;
}

await context.RespondAsync(new UserCreatedCommand(context.Message.CorrelationId, result.ProfileId));
// await context.Publish(new UserCreatedCommand(context.Message.CorrelationId, result.ProfileId));
}
catch (Exception e)
{
logger.LogError($"Failed to create profile: {context.Message.Email} with error: {e.Message}");
// await context.Publish(new UserRejectedCommand(context.Message.CorrelationId));
await context.RespondAsync(new UserRejectedCommand(context.Message.CorrelationId));
}
}
< /code>
Код регистра оркестратора саги: < /p>
builder.Services.AddMassTransit(x =>
{
x.AddSagaStateMachine()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext();
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
});
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["SAGA_RABBIT_MQ_HOST"], h =>
{
h.Username(builder.Configuration["SAGA_RABBIT_MQ_USER"]);
h.Password(builder.Configuration["SAGA_RABBIT_MQ_PASSWORD"]);
});
cfg.UseJsonSerializer();
cfg.ReceiveEndpoint("saga_queue", e =>
{
e.ConfigureSaga(ctx);
});
cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000));
});
});
< /code>
orchestrator saga: < /p>
public class RegistrationSaga : MassTransitStateMachine
{
private readonly ILogger _logger;

public State ProfileCreating { get; private set; }
public State AuthCreating { get; private set; }
public State Completed { get; private set; }
public State Failed { get; private set; }

public Event StartRegistration { get; private set; }

public Event ProfileCreated { get; private set; }
public Event ProfileRejected { get; private set; }

public Event AuthCreated { get; private set; }
public Event AuthRejected { get; private set; }

public RegistrationSaga(ILogger logger)
{
_logger = logger;
InstanceState(x => x.CurrentState);

Event(() => StartRegistration, x => x.CorrelateById(context => context.Message.CorrelationId));

Event(() => ProfileCreated, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => ProfileRejected, x => x.CorrelateById(context => context.Message.CorrelationId));

Event(() => AuthCreated, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => AuthRejected, x => x.CorrelateById(context => context.Message.CorrelationId));

Initially(
When(StartRegistration)
.Then(ctx =>
{
ctx.Saga.CorrelationId = ctx.Message.CorrelationId;
ctx.Saga.Email = ctx.Message.UserEmail;

ctx.Saga.RequestId = ctx.RequestId;
ctx.Saga.ResponseAddress = ctx.ResponseAddress;

logger.LogInformation($"Registration started: {ctx.Saga.CorrelationId}, RequestId: {ctx.RequestId}, ResponseAddress: {ctx.ResponseAddress}");
})
.Publish(ctx => new CreateUserCommand(ctx.Saga.CorrelationId, ctx.Saga.Email))
.TransitionTo(ProfileCreating)
);

During(ProfileCreating,
When(ProfileCreated)
.Then(ctx =>
{
ctx.Saga.CorrelationId = ctx.Message.CorrelationId;
ctx.Saga.UserProfileId = ctx.Message.UserProfileId;

logger.LogInformation($"Profile created for registration: {ctx.Saga.CorrelationId}");
})
.Publish(ctx => new CreateAuthCommand(ctx.Saga.CorrelationId, ctx.Saga.Email, "password_placeholder", ctx.Saga.UserProfileId))
.TransitionTo(AuthCreating),

When(ProfileRejected)
.Then(ctx =>
{
ctx.Saga.CorrelationId = ctx.Message.CorrelationId;
logger.LogError($"Registration profile rejected: {ctx.Saga.RequestId}");
})
.RespondAsync(ctx => ctx.Init(new {
CorrelationId = ctx.Message.CorrelationId,
Error = "Profile creation failed"
}))
.TransitionTo(Failed)
.Finalize()
);

During(AuthCreating,
When(AuthCreated)
.Then(ctx =>
{
ctx.Saga.CorrelationId = ctx.Message.CorrelationId;
logger.LogInformation($"Registration completed: {ctx.Saga.CorrelationId}");
})
.Respond(ctx => new RegistrationInfoMessage
{
CorrelationId = ctx.Message.CorrelationId,
UserProfileId = ctx.Saga.UserProfileId,
})
.TransitionTo(Completed)
.Finalize(),

When(AuthRejected)
.Then(ctx =>
{
ctx.Saga.CorrelationId = ctx.Message.CorrelationId;
logger.LogError($"Registration auth rejected: {ctx.Saga.CorrelationId}");
})
// Компенсация - удаляем профиль
.Publish(ctx => new DeleteUserCommand(ctx.Saga.CorrelationId, ctx.Saga.UserProfileId))
.Respond(ctx => new RegistrationInfoMessage
{
CorrelationId = ctx.Message.CorrelationId,
Error = "Auth creation failed"
})
.TransitionTo(Failed)
.Finalize()
);

SetCompletedWhenFinalized();
}
}


Подробнее здесь: https://stackoverflow.com/questions/797 ... chestrator
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»