Почему Apache Cassandra не отвечает на сообщение запуска?JAVA

Программисты JAVA общаются здесь
Anonymous
Почему Apache Cassandra не отвечает на сообщение запуска?

Сообщение Anonymous »

Я хочу сделать небольшую адаптацию драйвера для Apache Cassandra, чтобы понять, как обычно работает взаимодействие на уровне с низким применением. Я использую io.netty для этого, но по какой -то причине неизвестно мне, сервер не отправляет никаких сообщений об ошибке или запросе на авторизацию обратно. Если вы имеете опыт в этом вопросе, сообщите мне, как я могу разрешить свой собственный драйвер. Спасибо.public class NativeCQLConnection extends ChannelInboundHandlerAdapter implements Runnable
{
private final Bootstrap client;
private final NioEventLoopGroup group;

private static final String HOST = "127.0.0.1";
private static final int PORT = 9042;
private final Initializer initializer;
private final Bootstrap handler;

public NativeCQLConnection()
{
this.client = new Bootstrap();
this.group = new NioEventLoopGroup();
this.initializer = new Initializer(this, "cassandra", "cassandra");

this.handler = this.client.group(this.group).channel(NioSocketChannel.class).handler(this.initializer);
}

public static void main(String[] args)
{
new NativeCQLConnection().run();
}

@Override
public void run()
{
handler.connect(HOST, PORT);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
ByteBuf startupMessage = this.initializer.createStartupMessage();
ctx.writeAndFlush(startupMessage);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
this.group.shutdownGracefully();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
cause.printStackTrace();
}

private static final class Initializer extends ChannelInitializer
{
private final ChannelHandler handler;
private final byte[] username;
private final byte[] password;

public Initializer(ChannelHandler handler, String username, String password)
{
this.handler = handler;
this.username = username.getBytes(StandardCharsets.UTF_8);
this.password = password.getBytes(StandardCharsets.UTF_8);
}

@Override
protected void initChannel(SocketChannel channel) throws Exception
{
channel.pipeline().addLast(new MessageDecoder(this));
channel.pipeline().addLast(new MessageEncoder(this));

channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));

channel.pipeline().addLast(this.handler);
}

private static final String CQL_VERSION_OPTION = "CQL_VERSION";
private static final String CQL_VERSION = "3.0.0";
private static final String DRIVER_VERSION_OPTION = "DRIVER_VERSION";
private static final String DRIVER_NAME_OPTION = "DRIVER_NAME";
private static final String DRIVER_NAME = "Apache Cassandra Java Driver";

static final String COMPRESSION_OPTION = "COMPRESSION";
static final String NO_COMPACT_OPTION = "NO_COMPACT";

public ByteBuf createAuthResponse(ChannelHandlerContext ctx) {
byte[] initialToken = initialResponse();
ByteBuf buffer = ctx.alloc().buffer(initialToken.length);
buffer.writeByte(0x0F); // AUTH_RESPONSE opcode
buffer.writeInt(0); // Stream ID
buffer.writeInt(initialToken.length);
buffer.writeBytes(initialToken);
return buffer;
}

public ByteBuf createStartupMessage() {
ImmutableMap.Builder options = new ImmutableMap.Builder();
options.put(CQL_VERSION_OPTION, CQL_VERSION);
options.put(COMPRESSION_OPTION, "");
options.put(NO_COMPACT_OPTION, "true");
options.put(DRIVER_VERSION_OPTION, "3.12.2-SNAPSHOT");
options.put(DRIVER_NAME_OPTION, DRIVER_NAME);

ByteBuf body = Unpooled.buffer();
Writer.writeStringMap(options.build(), body);

ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(0x04); // version protocol
buffer.writeByte(0x00); // flags
buffer.writeByte(0x00); // stream od
buffer.writeByte(0x01); // Opcode (STARTUP)
buffer.writeInt(body.readableBytes()); // length body
buffer.writeBytes(body); // body

return buffer;
}

public byte[] initialResponse() {
byte[] initialToken = new byte[username.length + password.length + 2];
initialToken[0] = 0;
System.arraycopy(username, 0, initialToken, 1, username.length);
initialToken[username.length + 1] = 0;
System.arraycopy(password, 0, initialToken, username.length + 2, password.length);
return initialToken;
}
}

private static final class MessageEncoder extends MessageToMessageEncoder
{
private final Initializer initializer;

public MessageEncoder(Initializer initializer)
{
this.initializer = initializer;
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
out.add(msg.retain());
}
}

private static final class MessageDecoder extends MessageToMessageDecoder {

private final Initializer initializer;

public MessageDecoder(Initializer initializer)
{
this.initializer = initializer;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
System.out.println(msg.toString(StandardCharsets.UTF_8));

if (msg.readableBytes() > 0) {
byte opcodeByte = msg.readByte();
int opcode = Byte.toUnsignedInt(opcodeByte);
System.out.println("Opcode: " + opcode);

if (opcode == 0x03) { // AUTHENTICATE opcode
System.out.println("Server requires authentication");
} else if (opcode == 0x0E) { // AUTH_CHALLENGE opcode
ByteBuf authResponse = this.initializer.createAuthResponse(ctx);
ctx.writeAndFlush(authResponse);
} else {
System.out.println("Unknown opcode: " + opcode);
}
}
}
}

public static final class Writer
{
public static void writeStringMap(Map m, ByteBuf cb) {
cb.writeShort(m.size());
for (Map.Entry entry : m.entrySet()) {
writeString(entry.getKey(), cb);
writeString(entry.getValue(), cb);
}
}

public static void writeString(String str, ByteBuf cb) {
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
cb.writeShort(bytes.length);
cb.writeBytes(bytes);
}
}
}


Подробнее здесь: https://stackoverflow.com/questions/794 ... up-message

Вернуться в «JAVA»