Quarkus-Messaging-AMQP CloudEvent с расширениямиJAVA

Программисты JAVA общаются здесь
Anonymous
Quarkus-Messaging-AMQP CloudEvent с расширениями

Сообщение Anonymous »

Я создаю услугу Quarkus, которая принимает запрос SOAP и помещает данные в очередь AMQP после спецификации CloudEvent. Я хочу добавить дополнительный параметр метаданных (расширение), но в итоге сообщение не имеет его, и я не понимаю, почему и как заставить его работать (Quarkus для меня новый). < /P>
Код выглядит следующим образом:
Служба обмена сообщениями (с методами экспериментов, которые добавляют сообщения при запуске, и потребляет и печатает сообщения из очереди) < /p>
package com.example.messaging;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.ce.CloudEventMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import com.example.domain.Event;
import org.eclipse.microprofile.reactive.messaging.*;
import org.jboss.logging.Logger;

import java.time.ZonedDateTime;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class MessagingService {

private static final Logger LOG = Logger.getLogger(MessagingService.class);

@Inject
@Channel("events-out")
Emitter emitter;

@SuppressWarnings("unchecked")
public void send(Event event, String actionType) {
LOG.debug("Sending event with event number [%s]".formatted(event.getNumber()));
var message = Message.of(event);
var metadata = message.getMetadata(OutgoingCloudEventMetadata.class)
.orElseGet(() -> OutgoingCloudEventMetadata.builder().build());
message.addMetadata(OutgoingCloudEventMetadata.from(metadata)
.withExtension("action_type", actionType) // FIXME Does not work. Parameter is missing in the message.
.build());
emitter.send(message);
}

void onStart(@Observes StartupEvent ev) {
test();
}

public void test() {
var event1 = new Event();
event1.setNumber(1);
event1.setBool(true);
event1.setText("Hello World");
event1.setDatetime(ZonedDateTime.now().minusDays(1));

var event2 = new Event();
event2.setNumber(2);
event2.setDatetime(ZonedDateTime.now());

send(event1, "test");
send(event2, "test");
}

@Incoming("events-in")
public CompletionStage printMessage(Message message) {
System.out.println(message.getPayload());
message.getMetadata(CloudEventMetadata.class)
.ifPresent(metadata -> {
System.out.println("CloudEvent ID: " + metadata.getId());
System.out.println("CloudEvent Type: " + metadata.getType());
System.out.println("CloudEvent Source: " + metadata.getSource());
System.out.println("CloudEvent Timestamp: " + metadata.getTimeStamp());
System.out.println("CloudEvent content type: " + metadata.getDataContentType());
System.out.println("CloudEvent spec version: " + metadata.getSpecVersion());
System.out.println("CloudEvent extensions: " + metadata.getExtensions());
});

return message.ack();
}
}

класс фиктивного домена , чтобы проверить, работает ли он с необходимыми типами данных
package com.example.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.xml.bind.annotation.XmlElement;
import jakarta.xml.bind.annotation.XmlRootElement;
import jakarta.xml.bind.annotation.XmlType;

import java.time.ZonedDateTime;
import java.util.Objects;

@XmlType(name = "Event", propOrder = {"text", "number", "bool", "datetime"})
@XmlRootElement(name = "Event")
public class Event {

private String text;
private Integer number;
private Boolean bool;
private ZonedDateTime datetime;

@JsonProperty("Text")
public String getText() {
return text;
}

@XmlElement(name = "Text")
public void setText(String text) {
this.text = text;
}

@JsonProperty("Number")
public Integer getNumber() {
return number;
}

@XmlElement(name = "Number")
public void setNumber(Integer number) {
this.number = number;
}

@JsonProperty("Bool")
public Boolean getBool() {
return bool;
}

@XmlElement(name = "Bool")
public void setBool(Boolean bool) {
this.bool = bool;
}

@JsonProperty("DateTime")
public ZonedDateTime getDatetime() {
return datetime;
}

@XmlElement(name = "DateTime")
public void setDatetime(ZonedDateTime datetime) {
this.datetime = datetime;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Event event = (Event) o;
return Objects.equals(text, event.text) && Objects.equals(number, event.number) && Objects.equals(bool, event.bool) && Objects.equals(datetime, event.datetime);
}

@Override
public int hashCode() {
return Objects.hash(text, number, bool, datetime);
}
}

build.gradle.kts
plugins {
java
id("io.quarkus")
}

repositories {
mavenCentral()
mavenLocal()
}

val quarkusPlatformGroupId: String by project
val quarkusPlatformArtifactId: String by project
val quarkusPlatformVersion: String by project

dependencies {
implementation(enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}"))
implementation(enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cxf-bom:${quarkusPlatformVersion}"))

implementation("io.quarkus:quarkus-resteasy")
implementation("io.quarkiverse.cxf:quarkus-cxf:1.0.1")
implementation("io.quarkus:quarkus-arc")
implementation("io.quarkus:quarkus-messaging-amqp")

testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.rest-assured:rest-assured")
}

group = "com.example"
version = "1.0-SNAPSHOT"

java {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}

tasks.withType {
systemProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager")
}
tasks.withType {
options.encoding = "UTF-8"
options.compilerArgs.add("-parameters")
}

application.properties
quarkus.cxf.path = /soap
quarkus.cxf.logging.enabled-for = both
quarkus.cxf.logging.pretty = true

mp.messaging.outgoing.events-out.address=events
mp.messaging.outgoing.events-out.cloud-events-type=com.example.service.event
mp.messaging.outgoing.events-out.cloud-events-source=/source
mp.messaging.outgoing.events-out.connector=smallrye-amqp

mp.messaging.incoming.events-in.address=events

конечная точка SOAP (вероятно, не релевантно)
package com.example.server;

import com.example.domain.Event;
import jakarta.jws.WebMethod;
import jakarta.jws.WebService;

@WebService
public interface EventService {

@WebMethod
String add(Event event);

@WebMethod
String update(Event event);

@WebMethod
String delete(Event event);
}
< /code>
package com.example.server;

import com.example.domain.Event;
import io.quarkiverse.cxf.annotation.CXFEndpoint;
import jakarta.inject.Inject;
import jakarta.jws.WebService;
import com.example.messaging.MessagingService;
import org.jboss.logging.Logger;

@CXFEndpoint("/event")
@WebService(serviceName = "EventService")
public class EventServiceImpl implements EventService {

private static final Logger LOG = Logger.getLogger(EventServiceImpl.class);

@Inject
MessagingService messagingService;

@Override
public String add(Event event) {
return processEvent(event, "add");
}

@Override
public String update(Event event) {
return processEvent(event, "update");
}

@Override
public String delete(Event event) {
return processEvent(event, "delete");
}

private String processEvent(Event event, String actionType) {
LOG.info("Event received. Event number [%s]; Action type [%s]".formatted(event.getNumber(), actionType));
try {
messagingService.send(event, actionType);
} catch (Exception e) {
LOG.error(e);
return "Unexpected error occurred while processing event";
}
return "Event processed";
}

}


Подробнее здесь: https://stackoverflow.com/questions/794 ... extentions

Вернуться в «JAVA»