proxy progress: PROTOCOL bugfix and unit test
This commit is contained in:
parent
cc606bf515
commit
cd4ea91ade
|
@ -101,8 +101,10 @@ public class BinaryProtocol implements BinaryProtocolCommands {
|
|||
return "WRITE_CHUNK";
|
||||
case Fields.TS_OUTPUT_COMMAND:
|
||||
return "TS_OUTPUT_COMMAND";
|
||||
case Fields.TS_RESPONSE_OK:
|
||||
return "TS_RESPONSE_OK";
|
||||
default:
|
||||
return "command " + (char) +command + "/" + command;
|
||||
return "command " + (char) command + "/" + command;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -205,7 +205,7 @@ public class IncomingDataBuffer {
|
|||
public void read(byte[] packet) throws EOFException {
|
||||
boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
|
||||
if (isTimeout)
|
||||
throw new EOFException("Timeout while waiting " + packet.length);
|
||||
throw new EOFException("Timeout while waiting for " + packet.length + " byte(s)");
|
||||
getData(packet);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ public class BinaryProtocolProxy {
|
|||
byte firstByte = clientStream.getDataBuffer().readByte(timeoutMs);
|
||||
if (firstByte == COMMAND_PROTOCOL) {
|
||||
clientStream.write(TS_PROTOCOL.getBytes());
|
||||
clientStream.flush();
|
||||
continue;
|
||||
}
|
||||
BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte);
|
||||
|
|
|
@ -206,6 +206,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
byte first = in.readByte(ioTimeout);
|
||||
if (first == COMMAND_PROTOCOL) {
|
||||
protocolCommandHandler.handle();
|
||||
return 0;
|
||||
}
|
||||
return first * 256 + in.readByte(ioTimeout);
|
||||
}
|
||||
|
|
|
@ -13,8 +13,6 @@ import java.io.IOException;
|
|||
import java.net.Socket;
|
||||
|
||||
import static com.devexperts.logging.Logging.getLogging;
|
||||
import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
|
||||
import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
|
||||
import static com.rusefi.shared.FileUtil.close;
|
||||
|
||||
public class BaseBroadcastingThread {
|
||||
|
@ -38,10 +36,10 @@ public class BaseBroadcastingThread {
|
|||
} else {
|
||||
ioTimeout = context.consecutivePacketTimeout();
|
||||
}
|
||||
int length = getPacketLength(in, () -> {
|
||||
int length = BinaryProtocolServer.getPacketLength(in, () -> {
|
||||
throw new UnsupportedOperationException();
|
||||
}, ioTimeout);
|
||||
BinaryProtocolServer.Packet packet = readPromisedBytes(in, length);
|
||||
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length);
|
||||
byte[] payload = packet.getPacket();
|
||||
|
||||
byte command = payload[0];
|
||||
|
|
|
@ -3,6 +3,7 @@ package com.rusefi.proxy.client;
|
|||
import com.rusefi.BackendTestHelper;
|
||||
import com.rusefi.TestHelper;
|
||||
import com.rusefi.Timeouts;
|
||||
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
|
||||
import com.rusefi.config.generated.Fields;
|
||||
import com.rusefi.io.IoStream;
|
||||
import com.rusefi.io.commands.GetOutputsCommand;
|
||||
|
@ -19,16 +20,20 @@ import org.junit.Test;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.Socket;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.rusefi.BackendTestHelper.createTestUserResolver;
|
||||
import static com.rusefi.TestHelper.TEST_TOKEN_1;
|
||||
import static com.rusefi.TestHelper.assertLatch;
|
||||
import static com.rusefi.TestHelper.*;
|
||||
import static com.rusefi.Timeouts.SECOND;
|
||||
import static com.rusefi.binaryprotocol.BinaryProtocol.findCommand;
|
||||
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
|
||||
import static com.rusefi.config.generated.Fields.TS_PROTOCOL;
|
||||
import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
|
||||
import static com.rusefi.shared.FileUtil.close;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class LocalApplicationProxyTest {
|
||||
private static final AtomicInteger portNumber = new AtomicInteger(4000);
|
||||
|
@ -59,12 +64,67 @@ public class LocalApplicationProxyTest {
|
|||
mockBackend.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommandProtocol() throws IOException, InterruptedException {
|
||||
LocalApplicationProxyContext context = createLocalApplicationProxy();
|
||||
CountDownLatch gaugePokes = new CountDownLatch(3);
|
||||
|
||||
try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) {
|
||||
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
||||
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
|
||||
|
||||
try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, TcpIoStream.DisconnectListener.VOID, LocalApplicationProxy.ConnectionListener.VOID)) {
|
||||
TcpIoStream applicationConnection = new TcpIoStream("mock application ", new Socket(LOCALHOST, context.authenticatorPort()));
|
||||
|
||||
byte[] protocolResponse = new byte[TS_PROTOCOL.length()];
|
||||
// request
|
||||
applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL});
|
||||
applicationConnection.flush();
|
||||
// response
|
||||
applicationConnection.getDataBuffer().read(protocolResponse);
|
||||
assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes());
|
||||
|
||||
// request again
|
||||
applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL});
|
||||
applicationConnection.flush();
|
||||
// response again
|
||||
applicationConnection.getDataBuffer().read(protocolResponse);
|
||||
assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes());
|
||||
|
||||
byte[] commandPacket = GetOutputsCommand.createRequest();
|
||||
applicationConnection.sendPacket(commandPacket);
|
||||
BinaryProtocolServer.Packet response = applicationConnection.readPacket();
|
||||
assertEquals(Fields.TS_OUTPUT_SIZE + 1, response.getPacket().length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGaugePoking() throws IOException, InterruptedException {
|
||||
LocalApplicationProxyContext context = createLocalApplicationProxy();
|
||||
|
||||
CountDownLatch gaugePokes = new CountDownLatch(3);
|
||||
|
||||
try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) {
|
||||
|
||||
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
||||
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
|
||||
|
||||
CountDownLatch disconnected = new CountDownLatch(1);
|
||||
|
||||
try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID)) {
|
||||
|
||||
// wait for three output requests to take place
|
||||
assertLatch("gaugePokes", gaugePokes);
|
||||
|
||||
// but there must be a disconnect after some time
|
||||
assertLatch("disconnected", disconnected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static ServerSocketReference createMockBackend(LocalApplicationProxyContext context, CountDownLatch gaugePokes) throws IOException, InterruptedException {
|
||||
CountDownLatch backendCreated = new CountDownLatch(1);
|
||||
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
|
||||
try {
|
||||
|
@ -73,11 +133,23 @@ public class LocalApplicationProxyTest {
|
|||
HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer());
|
||||
|
||||
while (!socket.isClosed()) {
|
||||
BinaryProtocolServer.Packet packet = applicationClientStream.readPacket();
|
||||
AtomicBoolean handled = new AtomicBoolean();
|
||||
|
||||
int length = getPacketLength(applicationClientStream.getDataBuffer(), new BinaryProtocolServer.Handler() {
|
||||
@Override
|
||||
public void handle() throws IOException {
|
||||
applicationClientStream.write(TS_PROTOCOL.getBytes());
|
||||
handled.set(true);
|
||||
}
|
||||
});
|
||||
if (handled.get())
|
||||
continue;
|
||||
|
||||
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(applicationClientStream.getDataBuffer(), length);
|
||||
System.out.println("Got packet " + findCommand(packet.getPacket()[0]));
|
||||
|
||||
if (packet.getPacket().length != 5)
|
||||
throw new IllegalStateException();
|
||||
throw new IllegalStateException("Unexpected length " + packet.getPacket().length);
|
||||
|
||||
GetOutputsCommand.sendOutput(applicationClientStream);
|
||||
gaugePokes.countDown();
|
||||
|
@ -89,20 +161,7 @@ public class LocalApplicationProxyTest {
|
|||
}, parameter -> backendCreated.countDown());
|
||||
|
||||
assertLatch(backendCreated);
|
||||
|
||||
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
||||
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
|
||||
|
||||
CountDownLatch disconnected = new CountDownLatch(1);
|
||||
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID);
|
||||
|
||||
// wait for three output requests to take place
|
||||
assertLatch("gaugePokes", gaugePokes);
|
||||
|
||||
// but there must be a disconnect after some time
|
||||
assertLatch("disconnected", disconnected);
|
||||
|
||||
mockBackend.close();
|
||||
return mockBackend;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
|
|
Loading…
Reference in New Issue