diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index 0104bcd1fa..5da1d462e3 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -58,15 +58,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { private static ConcurrentHashMap THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>(); public void start(LinkManager linkManager) { - start(linkManager, DEFAULT_PROXY_PORT, Listener.empty()); + start(linkManager, DEFAULT_PROXY_PORT, Listener.empty(), new Context()); } - public void start(LinkManager linkManager, int port, Listener serverSocketCreationCallback) { + public void start(LinkManager linkManager, int port, Listener serverSocketCreationCallback, Context context) { log.info("BinaryProtocolServer on " + port); Function clientSocketRunnableFactory = clientSocket -> () -> { try { - runProxy(linkManager, clientSocket); + runProxy(linkManager, clientSocket, context); } catch (IOException e) { log.info("proxy connection: " + e); } @@ -107,7 +107,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { log.info("Client socket closed right away " + e); continue; } - log.info("Binary protocol proxy port connection"); + log.info("Accepting binary protocol proxy port connection on " + port); threadFactory.newThread(clientSocketRunnableFactory.apply(clientSocket)).start(); } }; @@ -121,7 +121,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } @SuppressWarnings("InfiniteLoopStatement") - private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException { + private void runProxy(LinkManager linkManager, Socket clientSocket, Context context) throws IOException { TcpIoStream stream = new TcpIoStream("[proxy] ", clientSocket); IncomingDataBuffer in = stream.getDataBuffer(); @@ -133,7 +133,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { handled.set(true); }; - int length = getPacketLength(in, protocolCommandHandler); + int length = getPacketLength(in, protocolCommandHandler, context.getTimeout()); if (handled.get()) { continue; } @@ -305,4 +305,10 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { return crc; } } + + public static class Context { + public int getTimeout() { + return Timeouts.BINARY_IO_TIMEOUT; + } + } } \ No newline at end of file diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/TcpIoStream.java b/java_console/io/src/main/java/com/rusefi/io/tcp/TcpIoStream.java index 0b3bd7cacb..862d1c93c1 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/TcpIoStream.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/TcpIoStream.java @@ -47,9 +47,14 @@ public class TcpIoStream extends AbstractIoStream { @Override public void close() { - super.close(); + // we need to guarantee only one onDisconnect invocation for retry logic to be healthy + synchronized (this) { + if (!isClosed()) { + super.close(); + disconnectListener.onDisconnect("on close"); + } + } FileUtil.close(socket); - disconnectListener.onDisconnect(); } @Override @@ -79,9 +84,9 @@ public class TcpIoStream extends AbstractIoStream { } public interface DisconnectListener { - DisconnectListener VOID = () -> { + DisconnectListener VOID = (String message) -> { }; - void onDisconnect(); + void onDisconnect(String message); } } diff --git a/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java b/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java index b25cc05dee..c88af8a363 100644 --- a/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java +++ b/java_console/io/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java @@ -2,11 +2,9 @@ package com.rusefi.proxy; import com.devexperts.logging.Logging; import com.rusefi.NamedThreadFactory; -import com.rusefi.Timeouts; import com.rusefi.binaryprotocol.IncomingDataBuffer; import com.rusefi.config.generated.Fields; import com.rusefi.io.commands.HelloCommand; -import com.rusefi.io.tcp.BinaryProtocolProxy; import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.server.SessionDetails; @@ -21,29 +19,30 @@ import static com.rusefi.shared.FileUtil.close; public class BaseBroadcastingThread { private static final Logging log = getLogging(BaseBroadcastingThread.class); - private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread"); + private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("network connector"); private final Thread thread; @SuppressWarnings("InfiniteLoopStatement") - public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, TcpIoStream.DisconnectListener disconnectListener) throws IOException { - + public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, TcpIoStream.DisconnectListener disconnectListener, NetworkConnectorContext context) { thread = BASE_BROADCASTING_THREAD.newThread(() -> { TcpIoStream stream = null; try { - stream = new TcpIoStream("[broadcast] ", socket, disconnectListener); + stream = new TcpIoStream("[network connector] ", socket, disconnectListener); IncomingDataBuffer in = stream.getDataBuffer(); boolean isFirstHello = true; while (true) { int ioTimeout; if (isFirstHello) { log.info("Waiting for proxy server to request session details"); - ioTimeout = Timeouts.CMD_TIMEOUT; + ioTimeout = context.firstPacketTimeout(); } else { - ioTimeout = BinaryProtocolProxy.USER_IO_TIMEOUT; + ioTimeout = context.consecutivePacketTimeout(); } + log.info("TEMPLOG READ " + ioTimeout); int length = getPacketLength(in, () -> { throw new UnsupportedOperationException(); }, ioTimeout); + log.info("TEMPLOG len " + 0); BinaryProtocolServer.Packet packet = readPromisedBytes(in, length); byte[] payload = packet.getPacket(); diff --git a/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnector.java b/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnector.java index 2d4075a661..f7d873c591 100644 --- a/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnector.java +++ b/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnector.java @@ -17,6 +17,7 @@ import com.rusefi.server.rusEFISSLContext; import com.rusefi.tools.online.HttpUtil; import org.jetbrains.annotations.NotNull; +import java.io.Closeable; import java.io.IOException; import java.net.Socket; import java.util.concurrent.CountDownLatch; @@ -29,10 +30,11 @@ import static com.rusefi.binaryprotocol.BinaryProtocol.sleep; * Connector between rusEFI ECU and rusEFI server * see NetworkConnectorStartup */ -public class NetworkConnector { +public class NetworkConnector implements Closeable { private final static Logging log = Logging.getLogging(NetworkConnector.class); + private boolean isClosed; - public static NetworkConnectorResult runNetworkConnector(String authToken, String controllerPort, NetworkConnectorContext context, ReconnectListener reconnectListener) { + public NetworkConnectorResult runNetworkConnector(String authToken, String controllerPort, NetworkConnectorContext context, ReconnectListener reconnectListener) { LinkManager controllerConnector = new LinkManager() .setCompositeLogicEnabled(false) .setNeedPullData(false); @@ -65,19 +67,20 @@ public class NetworkConnector { int oneTimeToken = SessionDetails.createOneTimeCode(); - new Thread(() -> { + BinaryProtocolServer.getThreadFactory("Proxy Reconnect").newThread(() -> { Semaphore proxyReconnectSemaphore = new Semaphore(1); try { - while (true) { + while (!isClosed) { proxyReconnectSemaphore.acquire(); try { - runNetworkConnector(context.serverPortForControllers(), controllerConnector, authToken, () -> { - log.error("Disconnect from proxy server detected, now sleeping " + context.reconnectDelay() + " seconds"); + runNetworkConnector(context.serverPortForControllers(), controllerConnector, authToken, (String message) -> { + log.error(message + " Disconnect from proxy server detected, now sleeping " + context.reconnectDelay() + " seconds"); sleep(context.reconnectDelay() * Timeouts.SECOND); + log.debug("Releasing semaphore"); proxyReconnectSemaphore.release(); reconnectListener.onReconnect(); - }, oneTimeToken, controllerInfo); + }, oneTimeToken, controllerInfo, context); } catch (IOException e) { log.error("IO error", e); } @@ -85,13 +88,13 @@ public class NetworkConnector { } catch (InterruptedException e) { throw new IllegalStateException(e); } - }, "Proxy Reconnect").start(); + }).start(); return new NetworkConnectorResult(controllerInfo, oneTimeToken); } @NotNull - private static SessionDetails runNetworkConnector(int serverPortForControllers, LinkManager linkManager, String authToken, final TcpIoStream.DisconnectListener disconnectListener, int oneTimeToken, ControllerInfo controllerInfo) throws IOException { + private static SessionDetails runNetworkConnector(int serverPortForControllers, LinkManager linkManager, String authToken, final TcpIoStream.DisconnectListener disconnectListener, int oneTimeToken, ControllerInfo controllerInfo, final NetworkConnectorContext context) throws IOException { IoStream targetEcuSocket = linkManager.getConnector().getBinaryProtocol().getStream(); SessionDetails deviceSessionDetails = new SessionDetails(controllerInfo, authToken, oneTimeToken); @@ -102,12 +105,12 @@ public class NetworkConnector { socket = rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, serverPortForControllers); } catch (IOException e) { // socket open exception is a special case and should be handled separately - disconnectListener.onDisconnect(); + disconnectListener.onDisconnect("on socket open"); return deviceSessionDetails; } BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket, deviceSessionDetails, - disconnectListener) { + disconnectListener, context) { @Override protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException { super.handleCommand(packet, stream); @@ -138,6 +141,11 @@ public class NetworkConnector { return new ControllerInfo(vehicleName, engineMake, engineCode, controllerSignature); } + @Override + public void close() { + isClosed = true; + } + public static class NetworkConnectorResult { static NetworkConnectorResult ERROR = new NetworkConnectorResult(null, 0); private final ControllerInfo controllerInfo; diff --git a/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnectorContext.java b/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnectorContext.java index b200296095..5336f4758c 100644 --- a/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnectorContext.java +++ b/java_console/io/src/main/java/com/rusefi/proxy/NetworkConnectorContext.java @@ -1,5 +1,7 @@ package com.rusefi.proxy; +import com.rusefi.Timeouts; +import com.rusefi.io.tcp.BinaryProtocolProxy; import com.rusefi.tools.online.ProxyClient; public class NetworkConnectorContext { @@ -7,6 +9,14 @@ public class NetworkConnectorContext { return 15; // this one is seconds } + public int firstPacketTimeout() { + return Timeouts.CMD_TIMEOUT; + } + + public int consecutivePacketTimeout() { + return BinaryProtocolProxy.USER_IO_TIMEOUT; + } + public int serverPortForControllers() { return ProxyClient.SERVER_PORT_FOR_CONTROLLERS; } diff --git a/java_console/io/src/test/java/com/rusefi/TestHelper.java b/java_console/io/src/test/java/com/rusefi/TestHelper.java index 61b8af35e2..d7e2ec0ec8 100644 --- a/java_console/io/src/test/java/com/rusefi/TestHelper.java +++ b/java_console/io/src/test/java/com/rusefi/TestHelper.java @@ -31,6 +31,7 @@ public class TestHelper { public static final String TEST_SIGNATURE_2 = "rusEFI 2020.07.11.proteus_f4.1986715563"; public static final ControllerInfo CONTROLLER_INFO = new ControllerInfo("name", "make", "code", Fields.TS_SIGNATURE); public static final String TEST_TOKEN_1 = "00000000-1234-1234-1234-123456789012"; + public static final String TEST_TOKEN_3 = "33333333-3333-1234-1234-123456789012"; @NotNull public static ScalarIniField createIniField(Field field) { @@ -46,7 +47,7 @@ public class TestHelper { } @NotNull - public static BinaryProtocolServer createVirtualController(ConfigurationImage ci, int port, Listener serverSocketCreationCallback) { + public static BinaryProtocolServer createVirtualController(ConfigurationImage ci, int port, Listener serverSocketCreationCallback, BinaryProtocolServer.Context context) { BinaryProtocolState state = new BinaryProtocolState(); state.setController(ci); state.setCurrentOutputs(new byte[1 + Fields.TS_OUTPUT_SIZE]); @@ -54,12 +55,12 @@ public class TestHelper { LinkManager linkManager = new LinkManager(); linkManager.setConnector(LinkConnector.getDetachedConnector(state)); BinaryProtocolServer server = new BinaryProtocolServer(); - server.start(linkManager, port, serverSocketCreationCallback); + server.start(linkManager, port, serverSocketCreationCallback, context); return server; } @NotNull - public static IoStream secureConnectToLocalhost(int controllerPort, Logger logger) { + public static IoStream secureConnectToLocalhost(int controllerPort) { IoStream targetEcuSocket; try { targetEcuSocket = new TcpIoStream("[local]", rusEFISSLContext.getSSLSocket(LOCALHOST, controllerPort)); @@ -80,9 +81,9 @@ public class TestHelper { return targetEcuSocket; } - public static BinaryProtocolServer createVirtualController(int controllerPort, ConfigurationImage controllerImage) throws InterruptedException { + public static BinaryProtocolServer createVirtualController(int controllerPort, ConfigurationImage controllerImage, BinaryProtocolServer.Context context) throws InterruptedException { CountDownLatch controllerCreated = new CountDownLatch(1); - BinaryProtocolServer server = createVirtualController(controllerImage, controllerPort, parameter -> controllerCreated.countDown()); + BinaryProtocolServer server = createVirtualController(controllerImage, controllerPort, parameter -> controllerCreated.countDown(), context); assertLatch(controllerCreated); return server; } diff --git a/java_console/ui/src/main/java/com/rusefi/tools/NetworkConnectorStartup.java b/java_console/ui/src/main/java/com/rusefi/tools/NetworkConnectorStartup.java index f053ccdf3c..87e8fd3589 100644 --- a/java_console/ui/src/main/java/com/rusefi/tools/NetworkConnectorStartup.java +++ b/java_console/ui/src/main/java/com/rusefi/tools/NetworkConnectorStartup.java @@ -25,7 +25,7 @@ public class NetworkConnectorStartup { NetworkConnectorContext connectorContext = new NetworkConnectorContext(); - NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(authToken, autoDetectedPort, connectorContext, NetworkConnector.ReconnectListener.VOID); + NetworkConnector.NetworkConnectorResult networkConnectorResult = new NetworkConnector().runNetworkConnector(authToken, autoDetectedPort, connectorContext, NetworkConnector.ReconnectListener.VOID); log.info("Running with oneTimeToken=" + networkConnectorResult.getOneTimeToken()); } } diff --git a/java_console/ui/src/test/java/com/rusefi/FullServerTest.java b/java_console/ui/src/test/java/com/rusefi/FullServerTest.java index 9c74430ac6..244b2a7ad3 100644 --- a/java_console/ui/src/test/java/com/rusefi/FullServerTest.java +++ b/java_console/ui/src/test/java/com/rusefi/FullServerTest.java @@ -6,6 +6,7 @@ import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.config.generated.Fields; import com.rusefi.io.ConnectionStateListener; import com.rusefi.io.LinkManager; +import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.proxy.NetworkConnector; import com.rusefi.proxy.NetworkConnectorContext; @@ -86,7 +87,7 @@ public class FullServerTest { // create virtual controller to which "rusEFI network connector" connects to int controllerPort = 7002; ConfigurationImage controllerImage = prepareImage(value, createIniField(Fields.CYLINDERSCOUNT)); - TestHelper.createVirtualController(controllerPort, controllerImage); + TestHelper.createVirtualController(controllerPort, controllerImage, new BinaryProtocolServer.Context()); NetworkConnectorContext networkConnectorContext = new NetworkConnectorContext() { @Override @@ -96,12 +97,12 @@ public class FullServerTest { }; // start "rusEFI network connector" to connect controller with backend since in real life controller has only local serial port it does not have network - NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, networkConnectorContext, NetworkConnector.ReconnectListener.VOID); + NetworkConnector.NetworkConnectorResult networkConnectorResult = new NetworkConnector().runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, networkConnectorContext, NetworkConnector.ReconnectListener.VOID); ControllerInfo controllerInfo = networkConnectorResult.getControllerInfo(); TestHelper.assertLatch("controllerRegistered", controllerRegistered); - SessionDetails authenticatorSessionDetails = new SessionDetails(controllerInfo, MockRusEfiDevice.TEST_TOKEN_3, networkConnectorResult.getOneTimeToken()); + SessionDetails authenticatorSessionDetails = new SessionDetails(controllerInfo, TEST_TOKEN_3, networkConnectorResult.getOneTimeToken()); ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, userDetailsResolver.apply(TestHelper.TEST_TOKEN_1)); // start authenticator diff --git a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java b/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java deleted file mode 100644 index ccd36daf82..0000000000 --- a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.rusefi; - -import com.rusefi.config.generated.Fields; -import com.rusefi.io.commands.GetOutputsCommand; -import com.rusefi.io.tcp.BinaryProtocolServer; -import com.rusefi.io.tcp.TcpIoStream; -import com.rusefi.proxy.BaseBroadcastingThread; -import com.rusefi.server.SessionDetails; -import com.rusefi.server.rusEFISSLContext; - -import java.io.IOException; -import java.net.Socket; - -import static com.rusefi.TestHelper.LOCALHOST; - -public class MockRusEfiDevice { - public static final String TEST_TOKEN_3 = "33333333-3333-1234-1234-123456789012"; - private final SessionDetails sessionDetails; - - public MockRusEfiDevice(String authToken, String signature) { - sessionDetails = TestHelper.createTestSession(authToken, signature); - } - - public void connect(int serverPort) throws IOException { - Socket socket = rusEFISSLContext.getSSLSocket(LOCALHOST, serverPort); - BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket, - sessionDetails, - TcpIoStream.DisconnectListener.VOID) { - @Override - protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException { - super.handleCommand(packet, stream); - - if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) { - GetOutputsCommand.sendOutput(stream); - } - } - }; - baseBroadcastingThread.start(); - } - -} diff --git a/java_console/ui/src/test/java/com/rusefi/ServerTest.java b/java_console/ui/src/test/java/com/rusefi/ServerTest.java index 793d722822..df25aeacc3 100644 --- a/java_console/ui/src/test/java/com/rusefi/ServerTest.java +++ b/java_console/ui/src/test/java/com/rusefi/ServerTest.java @@ -3,7 +3,12 @@ package com.rusefi; import com.opensr5.Logger; import com.rusefi.config.generated.Fields; import com.rusefi.io.IoStream; +import com.rusefi.io.commands.GetOutputsCommand; import com.rusefi.io.commands.HelloCommand; +import com.rusefi.io.tcp.BinaryProtocolServer; +import com.rusefi.io.tcp.TcpIoStream; +import com.rusefi.proxy.BaseBroadcastingThread; +import com.rusefi.proxy.NetworkConnectorContext; import com.rusefi.proxy.client.LocalApplicationProxy; import com.rusefi.server.*; import com.rusefi.tools.online.HttpUtil; @@ -14,9 +19,11 @@ import org.junit.Test; import java.io.IOException; import java.net.MalformedURLException; +import java.net.Socket; import java.util.List; import java.util.concurrent.CountDownLatch; +import static com.rusefi.TestHelper.LOCALHOST; import static com.rusefi.TestHelper.assertLatch; import static org.junit.Assert.assertEquals; @@ -72,8 +79,8 @@ public class ServerTest { assertEquals(0, backend.getControllersCount()); - new MockRusEfiDevice(TestHelper.TEST_TOKEN_1, TestHelper.TEST_SIGNATURE_1).connect(serverPortForControllers); - new MockRusEfiDevice("12345678-1234-1234-1234-123456789012", TestHelper.TEST_SIGNATURE_2).connect(serverPortForControllers); + new MockNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.TEST_SIGNATURE_1).connect(serverPortForControllers); + new MockNetworkConnector("12345678-1234-1234-1234-123456789012", TestHelper.TEST_SIGNATURE_2).connect(serverPortForControllers); assertLatch("onConnected", onConnected); @@ -147,7 +154,7 @@ covered by FullServerTest BackendTestHelper.runApplicationConnectorBlocking(backend, serverPortForRemoteUsers); // start authenticator - IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers, logger); + IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers); new HelloCommand("hello").handle(authenticatorToProxyStream); assertLatch(disconnectedCountDownLatch); @@ -176,10 +183,36 @@ covered by FullServerTest ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, BackendTestHelper.createTestUserResolver().apply(TestHelper.TEST_TOKEN_1)); // start authenticator - IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers, logger); + IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers); LocalApplicationProxy.sendHello(authenticatorToProxyStream, applicationRequest); assertLatch(disconnectedCountDownLatch); } } + + private static class MockNetworkConnector { + private final SessionDetails sessionDetails; + + private MockNetworkConnector(String authToken, String signature) { + sessionDetails = TestHelper.createTestSession(authToken, signature); + } + + public void connect(int serverPort) throws IOException { + Socket socket = rusEFISSLContext.getSSLSocket(LOCALHOST, serverPort); + BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket, + sessionDetails, + TcpIoStream.DisconnectListener.VOID, new NetworkConnectorContext()) { + @Override + protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException { + super.handleCommand(packet, stream); + + if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) { + GetOutputsCommand.sendOutput(stream); + } + } + }; + baseBroadcastingThread.start(); + } + + } } diff --git a/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java b/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java index 40d8851af0..1c34f7e38d 100644 --- a/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java +++ b/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java @@ -51,7 +51,7 @@ public class TcpCommunicationIntegrationTest { ConfigurationImage serverImage = TestHelper.prepareImage(value, iniField); int port = 6100; - BinaryProtocolServer server = TestHelper.createVirtualController(port, serverImage); + BinaryProtocolServer server = TestHelper.createVirtualController(port, serverImage, new BinaryProtocolServer.Context()); CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); @@ -88,7 +88,7 @@ public class TcpCommunicationIntegrationTest { int controllerPort = 6102; // create virtual controller - TestHelper.createVirtualController(controllerPort, serverImage); + TestHelper.createVirtualController(controllerPort, serverImage, new BinaryProtocolServer.Context()); int proxyPort = 6103; diff --git a/java_console/ui/src/test/java/com/rusefi/proxy/NetworkConnectorTest.java b/java_console/ui/src/test/java/com/rusefi/proxy/NetworkConnectorTest.java index 57121d599e..854b685536 100644 --- a/java_console/ui/src/test/java/com/rusefi/proxy/NetworkConnectorTest.java +++ b/java_console/ui/src/test/java/com/rusefi/proxy/NetworkConnectorTest.java @@ -1,18 +1,32 @@ package com.rusefi.proxy; +import com.devexperts.logging.Logging; import com.opensr5.ConfigurationImage; import com.rusefi.BackendTestHelper; import com.rusefi.TestHelper; +import com.rusefi.Timeouts; import com.rusefi.config.generated.Fields; +import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.server.Backend; +import com.rusefi.server.ControllerConnectionState; +import org.junit.Before; import org.junit.Test; +import java.net.MalformedURLException; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertTrue; +import static com.devexperts.logging.Logging.getLogging; +import static com.rusefi.TestHelper.assertLatch; +import static com.rusefi.binaryprotocol.BinaryProtocol.sleep; public class NetworkConnectorTest { + private static final Logging log = getLogging(NetworkConnectorTest.class); + + @Before + public void setup() throws MalformedURLException { + BackendTestHelper.commonServerTest(); + } + @Test public void testReconnect() throws InterruptedException { int serverPortForControllers = 7504; @@ -21,7 +35,14 @@ public class NetworkConnectorTest { // create virtual controller to which "rusEFI network connector" connects to int controllerPort = 7502; ConfigurationImage controllerImage = new ConfigurationImage(Fields.TOTAL_CONFIG_SIZE); - TestHelper.createVirtualController(controllerPort, controllerImage); + BinaryProtocolServer.Context patientController = new BinaryProtocolServer.Context() { + @Override + public int getTimeout() { + // we need controller to not timeout while we are playing with all backend shutdowns + return 5 * Timeouts.MINUTE; + } + }; + TestHelper.createVirtualController(controllerPort, controllerImage, patientController); NetworkConnectorContext connectorContext = new NetworkConnectorContext() { @Override @@ -29,6 +50,16 @@ public class NetworkConnectorTest { return 3; } + @Override + public int firstPacketTimeout() { + return 3 * Timeouts.SECOND; + } + + @Override + public int consecutivePacketTimeout() { + return 3 * Timeouts.SECOND; + } + @Override public int serverPortForControllers() { return serverPortForControllers; @@ -44,14 +75,35 @@ public class NetworkConnectorTest { reconnectCounter.countDown(); } }; - NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, connectorContext, reconnectListener); + new NetworkConnector().runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, connectorContext, reconnectListener); - assertTrue(reconnectCounter.await(30, TimeUnit.SECONDS)); + assertLatch(reconnectCounter); + // start backend, assert connection, stop backend + log.info("First backend instance"); + assertWouldConnect(serverPortForControllers, httpPort); - Backend backend = new Backend(BackendTestHelper.createTestUserResolver(), httpPort); -// BackendTestHelper.runControllerConnectorBlocking(backend, serverPortForControllers); + // giving http server time to shut down + sleep(3 * Timeouts.SECOND); + // now let's do this again with a new backend instance + log.info("Second backend instance"); + assertWouldConnect(serverPortForControllers, httpPort); + } + + private void assertWouldConnect(int serverPortForControllers, int httpPort) throws InterruptedException { + CountDownLatch onControllerRegistered = new CountDownLatch(1); + Backend backend = new Backend(BackendTestHelper.createTestUserResolver(), httpPort) { + @Override + public void register(ControllerConnectionState controllerConnectionState) { + super.register(controllerConnectionState); + onControllerRegistered.countDown(); + } + }; + BackendTestHelper.runControllerConnectorBlocking(backend, serverPortForControllers); + + // assert that reconnect actually happened + assertLatch(onControllerRegistered); backend.close(); } } diff --git a/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java b/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java index 4b39fd8b4d..2680c2308f 100644 --- a/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java +++ b/java_console/ui/src/test/java/com/rusefi/proxy/client/LocalApplicationProxyTest.java @@ -53,7 +53,7 @@ public class LocalApplicationProxyTest { ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); CountDownLatch disconnected = new CountDownLatch(1); - LocalApplicationProxy.startAndRun(context, applicationRequest, -1, disconnected::countDown, LocalApplicationProxy.ConnectionListener.VOID); + LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID); assertLatch(disconnected); mockBackend.close(); @@ -94,7 +94,7 @@ public class LocalApplicationProxyTest { ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); CountDownLatch disconnected = new CountDownLatch(1); - LocalApplicationProxy.startAndRun(context, applicationRequest, -1, disconnected::countDown, LocalApplicationProxy.ConnectionListener.VOID); + LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID); // wait for three output requests to take place assertLatch("gaugePokes", gaugePokes); diff --git a/java_tools/proxy_server/src/main/java/com/rusefi/server/Backend.java b/java_tools/proxy_server/src/main/java/com/rusefi/server/Backend.java index 88472b4d99..baaba549eb 100644 --- a/java_tools/proxy_server/src/main/java/com/rusefi/server/Backend.java +++ b/java_tools/proxy_server/src/main/java/com/rusefi/server/Backend.java @@ -173,7 +173,7 @@ public class Backend implements Closeable { IoStream applicationClientStream = null; ApplicationConnectionState applicationConnectionState = null; try { - applicationClientStream = new TcpIoStream("[app] ", applicationSocket); + applicationClientStream = new TcpIoStream("[backend-application connector] ", applicationSocket); // authenticator pushed hello packet on connect String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer()); @@ -370,6 +370,7 @@ public class Backend implements Closeable { @Override public void close() { + log.info("Closing..."); isClosed = true; FileUtil.close(applicationConnector); FileUtil.close(controllerConnector); diff --git a/java_tools/proxy_server/src/main/java/com/rusefi/server/ControllerConnectionState.java b/java_tools/proxy_server/src/main/java/com/rusefi/server/ControllerConnectionState.java index 93e2ee4536..153986e2b9 100644 --- a/java_tools/proxy_server/src/main/java/com/rusefi/server/ControllerConnectionState.java +++ b/java_tools/proxy_server/src/main/java/com/rusefi/server/ControllerConnectionState.java @@ -43,7 +43,7 @@ public class ControllerConnectionState { this.clientSocket = clientSocket; this.userDetailsResolver = userDetailsResolver; try { - stream = new TcpIoStream("[controller] ", clientSocket); + stream = new TcpIoStream("[backend-controller connector] ", clientSocket); incomingData = stream.getDataBuffer(); } catch (IOException e) { close();