From cd4ea91adef422e14e471a7a2c1f8fec1db0c729 Mon Sep 17 00:00:00 2001 From: rusefi Date: Thu, 30 Jul 2020 23:31:19 -0400 Subject: [PATCH] proxy progress: PROTOCOL bugfix and unit test --- .../rusefi/binaryprotocol/BinaryProtocol.java | 4 +- .../binaryprotocol/IncomingDataBuffer.java | 2 +- .../rusefi/io/tcp/BinaryProtocolProxy.java | 1 + .../rusefi/io/tcp/BinaryProtocolServer.java | 1 + .../rusefi/proxy/BaseBroadcastingThread.java | 6 +- .../client/LocalApplicationProxyTest.java | 95 +++++++++++++++---- 6 files changed, 85 insertions(+), 24 deletions(-) diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/BinaryProtocol.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/BinaryProtocol.java index 46f52c525e..60e9ad546e 100644 --- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/BinaryProtocol.java +++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/BinaryProtocol.java @@ -101,8 +101,10 @@ public class BinaryProtocol implements BinaryProtocolCommands { return "WRITE_CHUNK"; case Fields.TS_OUTPUT_COMMAND: return "TS_OUTPUT_COMMAND"; + case Fields.TS_RESPONSE_OK: + return "TS_RESPONSE_OK"; default: - return "command " + (char) +command + "/" + command; + return "command " + (char) command + "/" + command; } } diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java index d65bdc4913..d6a63e4417 100644 --- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java +++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java @@ -205,7 +205,7 @@ public class IncomingDataBuffer { public void read(byte[] packet) throws EOFException { boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length); if (isTimeout) - throw new EOFException("Timeout while waiting " + packet.length); + throw new EOFException("Timeout while waiting for " + packet.length + " byte(s)"); getData(packet); } } diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java index 719ad130e3..1fa2e95f98 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java @@ -51,6 +51,7 @@ public class BinaryProtocolProxy { byte firstByte = clientStream.getDataBuffer().readByte(timeoutMs); if (firstByte == COMMAND_PROTOCOL) { clientStream.write(TS_PROTOCOL.getBytes()); + clientStream.flush(); continue; } BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte); diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index 4e211c9317..6046850649 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -206,6 +206,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { byte first = in.readByte(ioTimeout); if (first == COMMAND_PROTOCOL) { protocolCommandHandler.handle(); + return 0; } return first * 256 + in.readByte(ioTimeout); } diff --git a/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java b/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java index 85776cf00c..99cba50024 100644 --- a/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java +++ b/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java @@ -13,8 +13,6 @@ import java.io.IOException; import java.net.Socket; import static com.devexperts.logging.Logging.getLogging; -import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength; -import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes; import static com.rusefi.shared.FileUtil.close; public class BaseBroadcastingThread { @@ -38,10 +36,10 @@ public class BaseBroadcastingThread { } else { ioTimeout = context.consecutivePacketTimeout(); } - int length = getPacketLength(in, () -> { + int length = BinaryProtocolServer.getPacketLength(in, () -> { throw new UnsupportedOperationException(); }, ioTimeout); - BinaryProtocolServer.Packet packet = readPromisedBytes(in, length); + BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length); byte[] payload = packet.getPacket(); byte command = payload[0]; diff --git a/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java b/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java index 2680c2308f..6f0da58b02 100644 --- a/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java +++ b/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java @@ -3,6 +3,7 @@ package com.rusefi.proxy.client; import com.rusefi.BackendTestHelper; import com.rusefi.TestHelper; import com.rusefi.Timeouts; +import com.rusefi.binaryprotocol.BinaryProtocolCommands; import com.rusefi.config.generated.Fields; import com.rusefi.io.IoStream; import com.rusefi.io.commands.GetOutputsCommand; @@ -19,16 +20,20 @@ import org.junit.Test; import java.io.IOException; import java.net.MalformedURLException; +import java.net.Socket; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static com.rusefi.BackendTestHelper.createTestUserResolver; -import static com.rusefi.TestHelper.TEST_TOKEN_1; -import static com.rusefi.TestHelper.assertLatch; +import static com.rusefi.TestHelper.*; import static com.rusefi.Timeouts.SECOND; import static com.rusefi.binaryprotocol.BinaryProtocol.findCommand; import static com.rusefi.binaryprotocol.BinaryProtocol.sleep; +import static com.rusefi.config.generated.Fields.TS_PROTOCOL; +import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength; import static com.rusefi.shared.FileUtil.close; +import static org.junit.Assert.*; public class LocalApplicationProxyTest { private static final AtomicInteger portNumber = new AtomicInteger(4000); @@ -59,12 +64,67 @@ public class LocalApplicationProxyTest { mockBackend.close(); } + @Test + public void testCommandProtocol() throws IOException, InterruptedException { + LocalApplicationProxyContext context = createLocalApplicationProxy(); + CountDownLatch gaugePokes = new CountDownLatch(3); + + try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) { + SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE); + ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); + + try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, TcpIoStream.DisconnectListener.VOID, LocalApplicationProxy.ConnectionListener.VOID)) { + TcpIoStream applicationConnection = new TcpIoStream("mock application ", new Socket(LOCALHOST, context.authenticatorPort())); + + byte[] protocolResponse = new byte[TS_PROTOCOL.length()]; + // request + applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL}); + applicationConnection.flush(); + // response + applicationConnection.getDataBuffer().read(protocolResponse); + assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes()); + + // request again + applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL}); + applicationConnection.flush(); + // response again + applicationConnection.getDataBuffer().read(protocolResponse); + assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes()); + + byte[] commandPacket = GetOutputsCommand.createRequest(); + applicationConnection.sendPacket(commandPacket); + BinaryProtocolServer.Packet response = applicationConnection.readPacket(); + assertEquals(Fields.TS_OUTPUT_SIZE + 1, response.getPacket().length); + } + } + } + @Test public void testGaugePoking() throws IOException, InterruptedException { LocalApplicationProxyContext context = createLocalApplicationProxy(); CountDownLatch gaugePokes = new CountDownLatch(3); + try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) { + + SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE); + ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); + + CountDownLatch disconnected = new CountDownLatch(1); + + try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID)) { + + // wait for three output requests to take place + assertLatch("gaugePokes", gaugePokes); + + // but there must be a disconnect after some time + assertLatch("disconnected", disconnected); + } + } + } + + @NotNull + private static ServerSocketReference createMockBackend(LocalApplicationProxyContext context, CountDownLatch gaugePokes) throws IOException, InterruptedException { CountDownLatch backendCreated = new CountDownLatch(1); ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> { try { @@ -73,11 +133,23 @@ public class LocalApplicationProxyTest { HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer()); while (!socket.isClosed()) { - BinaryProtocolServer.Packet packet = applicationClientStream.readPacket(); + AtomicBoolean handled = new AtomicBoolean(); + + int length = getPacketLength(applicationClientStream.getDataBuffer(), new BinaryProtocolServer.Handler() { + @Override + public void handle() throws IOException { + applicationClientStream.write(TS_PROTOCOL.getBytes()); + handled.set(true); + } + }); + if (handled.get()) + continue; + + BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(applicationClientStream.getDataBuffer(), length); System.out.println("Got packet " + findCommand(packet.getPacket()[0])); if (packet.getPacket().length != 5) - throw new IllegalStateException(); + throw new IllegalStateException("Unexpected length " + packet.getPacket().length); GetOutputsCommand.sendOutput(applicationClientStream); gaugePokes.countDown(); @@ -89,20 +161,7 @@ public class LocalApplicationProxyTest { }, parameter -> backendCreated.countDown()); assertLatch(backendCreated); - - SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE); - ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); - - CountDownLatch disconnected = new CountDownLatch(1); - LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID); - - // wait for three output requests to take place - assertLatch("gaugePokes", gaugePokes); - - // but there must be a disconnect after some time - assertLatch("disconnected", disconnected); - - mockBackend.close(); + return mockBackend; } @NotNull