rusefi 2020-07-26 16:52:58 -04:00
parent 39ba9a9f19
commit 549afd5354
15 changed files with 171 additions and 96 deletions

View File

@ -58,15 +58,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
private static ConcurrentHashMap<String, ThreadFactory> THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, ThreadFactory> THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>();
public void start(LinkManager linkManager) { public void start(LinkManager linkManager) {
start(linkManager, DEFAULT_PROXY_PORT, Listener.empty()); start(linkManager, DEFAULT_PROXY_PORT, Listener.empty(), new Context());
} }
public void start(LinkManager linkManager, int port, Listener serverSocketCreationCallback) { public void start(LinkManager linkManager, int port, Listener serverSocketCreationCallback, Context context) {
log.info("BinaryProtocolServer on " + port); log.info("BinaryProtocolServer on " + port);
Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> { Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
try { try {
runProxy(linkManager, clientSocket); runProxy(linkManager, clientSocket, context);
} catch (IOException e) { } catch (IOException e) {
log.info("proxy connection: " + e); log.info("proxy connection: " + e);
} }
@ -107,7 +107,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
log.info("Client socket closed right away " + e); log.info("Client socket closed right away " + e);
continue; continue;
} }
log.info("Binary protocol proxy port connection"); log.info("Accepting binary protocol proxy port connection on " + port);
threadFactory.newThread(clientSocketRunnableFactory.apply(clientSocket)).start(); threadFactory.newThread(clientSocketRunnableFactory.apply(clientSocket)).start();
} }
}; };
@ -121,7 +121,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
} }
@SuppressWarnings("InfiniteLoopStatement") @SuppressWarnings("InfiniteLoopStatement")
private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException { private void runProxy(LinkManager linkManager, Socket clientSocket, Context context) throws IOException {
TcpIoStream stream = new TcpIoStream("[proxy] ", clientSocket); TcpIoStream stream = new TcpIoStream("[proxy] ", clientSocket);
IncomingDataBuffer in = stream.getDataBuffer(); IncomingDataBuffer in = stream.getDataBuffer();
@ -133,7 +133,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
handled.set(true); handled.set(true);
}; };
int length = getPacketLength(in, protocolCommandHandler); int length = getPacketLength(in, protocolCommandHandler, context.getTimeout());
if (handled.get()) { if (handled.get()) {
continue; continue;
} }
@ -305,4 +305,10 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
return crc; return crc;
} }
} }
public static class Context {
public int getTimeout() {
return Timeouts.BINARY_IO_TIMEOUT;
}
}
} }

View File

@ -47,9 +47,14 @@ public class TcpIoStream extends AbstractIoStream {
@Override @Override
public void close() { public void close() {
// we need to guarantee only one onDisconnect invocation for retry logic to be healthy
synchronized (this) {
if (!isClosed()) {
super.close(); super.close();
disconnectListener.onDisconnect("on close");
}
}
FileUtil.close(socket); FileUtil.close(socket);
disconnectListener.onDisconnect();
} }
@Override @Override
@ -79,9 +84,9 @@ public class TcpIoStream extends AbstractIoStream {
} }
public interface DisconnectListener { public interface DisconnectListener {
DisconnectListener VOID = () -> { DisconnectListener VOID = (String message) -> {
}; };
void onDisconnect(); void onDisconnect(String message);
} }
} }

View File

@ -2,11 +2,9 @@ package com.rusefi.proxy;
import com.devexperts.logging.Logging; import com.devexperts.logging.Logging;
import com.rusefi.NamedThreadFactory; import com.rusefi.NamedThreadFactory;
import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.IncomingDataBuffer; import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.server.SessionDetails; import com.rusefi.server.SessionDetails;
@ -21,29 +19,30 @@ import static com.rusefi.shared.FileUtil.close;
public class BaseBroadcastingThread { public class BaseBroadcastingThread {
private static final Logging log = getLogging(BaseBroadcastingThread.class); private static final Logging log = getLogging(BaseBroadcastingThread.class);
private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread"); private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("network connector");
private final Thread thread; private final Thread thread;
@SuppressWarnings("InfiniteLoopStatement") @SuppressWarnings("InfiniteLoopStatement")
public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, TcpIoStream.DisconnectListener disconnectListener) throws IOException { public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, TcpIoStream.DisconnectListener disconnectListener, NetworkConnectorContext context) {
thread = BASE_BROADCASTING_THREAD.newThread(() -> { thread = BASE_BROADCASTING_THREAD.newThread(() -> {
TcpIoStream stream = null; TcpIoStream stream = null;
try { try {
stream = new TcpIoStream("[broadcast] ", socket, disconnectListener); stream = new TcpIoStream("[network connector] ", socket, disconnectListener);
IncomingDataBuffer in = stream.getDataBuffer(); IncomingDataBuffer in = stream.getDataBuffer();
boolean isFirstHello = true; boolean isFirstHello = true;
while (true) { while (true) {
int ioTimeout; int ioTimeout;
if (isFirstHello) { if (isFirstHello) {
log.info("Waiting for proxy server to request session details"); log.info("Waiting for proxy server to request session details");
ioTimeout = Timeouts.CMD_TIMEOUT; ioTimeout = context.firstPacketTimeout();
} else { } else {
ioTimeout = BinaryProtocolProxy.USER_IO_TIMEOUT; ioTimeout = context.consecutivePacketTimeout();
} }
log.info("TEMPLOG READ " + ioTimeout);
int length = getPacketLength(in, () -> { int length = getPacketLength(in, () -> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
}, ioTimeout); }, ioTimeout);
log.info("TEMPLOG len " + 0);
BinaryProtocolServer.Packet packet = readPromisedBytes(in, length); BinaryProtocolServer.Packet packet = readPromisedBytes(in, length);
byte[] payload = packet.getPacket(); byte[] payload = packet.getPacket();

View File

@ -17,6 +17,7 @@ import com.rusefi.server.rusEFISSLContext;
import com.rusefi.tools.online.HttpUtil; import com.rusefi.tools.online.HttpUtil;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -29,10 +30,11 @@ import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
* Connector between rusEFI ECU and rusEFI server * Connector between rusEFI ECU and rusEFI server
* see NetworkConnectorStartup * see NetworkConnectorStartup
*/ */
public class NetworkConnector { public class NetworkConnector implements Closeable {
private final static Logging log = Logging.getLogging(NetworkConnector.class); private final static Logging log = Logging.getLogging(NetworkConnector.class);
private boolean isClosed;
public static NetworkConnectorResult runNetworkConnector(String authToken, String controllerPort, NetworkConnectorContext context, ReconnectListener reconnectListener) { public NetworkConnectorResult runNetworkConnector(String authToken, String controllerPort, NetworkConnectorContext context, ReconnectListener reconnectListener) {
LinkManager controllerConnector = new LinkManager() LinkManager controllerConnector = new LinkManager()
.setCompositeLogicEnabled(false) .setCompositeLogicEnabled(false)
.setNeedPullData(false); .setNeedPullData(false);
@ -65,19 +67,20 @@ public class NetworkConnector {
int oneTimeToken = SessionDetails.createOneTimeCode(); int oneTimeToken = SessionDetails.createOneTimeCode();
new Thread(() -> { BinaryProtocolServer.getThreadFactory("Proxy Reconnect").newThread(() -> {
Semaphore proxyReconnectSemaphore = new Semaphore(1); Semaphore proxyReconnectSemaphore = new Semaphore(1);
try { try {
while (true) { while (!isClosed) {
proxyReconnectSemaphore.acquire(); proxyReconnectSemaphore.acquire();
try { try {
runNetworkConnector(context.serverPortForControllers(), controllerConnector, authToken, () -> { runNetworkConnector(context.serverPortForControllers(), controllerConnector, authToken, (String message) -> {
log.error("Disconnect from proxy server detected, now sleeping " + context.reconnectDelay() + " seconds"); log.error(message + " Disconnect from proxy server detected, now sleeping " + context.reconnectDelay() + " seconds");
sleep(context.reconnectDelay() * Timeouts.SECOND); sleep(context.reconnectDelay() * Timeouts.SECOND);
log.debug("Releasing semaphore");
proxyReconnectSemaphore.release(); proxyReconnectSemaphore.release();
reconnectListener.onReconnect(); reconnectListener.onReconnect();
}, oneTimeToken, controllerInfo); }, oneTimeToken, controllerInfo, context);
} catch (IOException e) { } catch (IOException e) {
log.error("IO error", e); log.error("IO error", e);
} }
@ -85,13 +88,13 @@ public class NetworkConnector {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
}, "Proxy Reconnect").start(); }).start();
return new NetworkConnectorResult(controllerInfo, oneTimeToken); return new NetworkConnectorResult(controllerInfo, oneTimeToken);
} }
@NotNull @NotNull
private static SessionDetails runNetworkConnector(int serverPortForControllers, LinkManager linkManager, String authToken, final TcpIoStream.DisconnectListener disconnectListener, int oneTimeToken, ControllerInfo controllerInfo) throws IOException { private static SessionDetails runNetworkConnector(int serverPortForControllers, LinkManager linkManager, String authToken, final TcpIoStream.DisconnectListener disconnectListener, int oneTimeToken, ControllerInfo controllerInfo, final NetworkConnectorContext context) throws IOException {
IoStream targetEcuSocket = linkManager.getConnector().getBinaryProtocol().getStream(); IoStream targetEcuSocket = linkManager.getConnector().getBinaryProtocol().getStream();
SessionDetails deviceSessionDetails = new SessionDetails(controllerInfo, authToken, oneTimeToken); SessionDetails deviceSessionDetails = new SessionDetails(controllerInfo, authToken, oneTimeToken);
@ -102,12 +105,12 @@ public class NetworkConnector {
socket = rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, serverPortForControllers); socket = rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, serverPortForControllers);
} catch (IOException e) { } catch (IOException e) {
// socket open exception is a special case and should be handled separately // socket open exception is a special case and should be handled separately
disconnectListener.onDisconnect(); disconnectListener.onDisconnect("on socket open");
return deviceSessionDetails; return deviceSessionDetails;
} }
BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket, BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket,
deviceSessionDetails, deviceSessionDetails,
disconnectListener) { disconnectListener, context) {
@Override @Override
protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException { protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException {
super.handleCommand(packet, stream); super.handleCommand(packet, stream);
@ -138,6 +141,11 @@ public class NetworkConnector {
return new ControllerInfo(vehicleName, engineMake, engineCode, controllerSignature); return new ControllerInfo(vehicleName, engineMake, engineCode, controllerSignature);
} }
@Override
public void close() {
isClosed = true;
}
public static class NetworkConnectorResult { public static class NetworkConnectorResult {
static NetworkConnectorResult ERROR = new NetworkConnectorResult(null, 0); static NetworkConnectorResult ERROR = new NetworkConnectorResult(null, 0);
private final ControllerInfo controllerInfo; private final ControllerInfo controllerInfo;

View File

@ -1,5 +1,7 @@
package com.rusefi.proxy; package com.rusefi.proxy;
import com.rusefi.Timeouts;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.tools.online.ProxyClient; import com.rusefi.tools.online.ProxyClient;
public class NetworkConnectorContext { public class NetworkConnectorContext {
@ -7,6 +9,14 @@ public class NetworkConnectorContext {
return 15; // this one is seconds return 15; // this one is seconds
} }
public int firstPacketTimeout() {
return Timeouts.CMD_TIMEOUT;
}
public int consecutivePacketTimeout() {
return BinaryProtocolProxy.USER_IO_TIMEOUT;
}
public int serverPortForControllers() { public int serverPortForControllers() {
return ProxyClient.SERVER_PORT_FOR_CONTROLLERS; return ProxyClient.SERVER_PORT_FOR_CONTROLLERS;
} }

View File

@ -31,6 +31,7 @@ public class TestHelper {
public static final String TEST_SIGNATURE_2 = "rusEFI 2020.07.11.proteus_f4.1986715563"; public static final String TEST_SIGNATURE_2 = "rusEFI 2020.07.11.proteus_f4.1986715563";
public static final ControllerInfo CONTROLLER_INFO = new ControllerInfo("name", "make", "code", Fields.TS_SIGNATURE); public static final ControllerInfo CONTROLLER_INFO = new ControllerInfo("name", "make", "code", Fields.TS_SIGNATURE);
public static final String TEST_TOKEN_1 = "00000000-1234-1234-1234-123456789012"; public static final String TEST_TOKEN_1 = "00000000-1234-1234-1234-123456789012";
public static final String TEST_TOKEN_3 = "33333333-3333-1234-1234-123456789012";
@NotNull @NotNull
public static ScalarIniField createIniField(Field field) { public static ScalarIniField createIniField(Field field) {
@ -46,7 +47,7 @@ public class TestHelper {
} }
@NotNull @NotNull
public static BinaryProtocolServer createVirtualController(ConfigurationImage ci, int port, Listener serverSocketCreationCallback) { public static BinaryProtocolServer createVirtualController(ConfigurationImage ci, int port, Listener serverSocketCreationCallback, BinaryProtocolServer.Context context) {
BinaryProtocolState state = new BinaryProtocolState(); BinaryProtocolState state = new BinaryProtocolState();
state.setController(ci); state.setController(ci);
state.setCurrentOutputs(new byte[1 + Fields.TS_OUTPUT_SIZE]); state.setCurrentOutputs(new byte[1 + Fields.TS_OUTPUT_SIZE]);
@ -54,12 +55,12 @@ public class TestHelper {
LinkManager linkManager = new LinkManager(); LinkManager linkManager = new LinkManager();
linkManager.setConnector(LinkConnector.getDetachedConnector(state)); linkManager.setConnector(LinkConnector.getDetachedConnector(state));
BinaryProtocolServer server = new BinaryProtocolServer(); BinaryProtocolServer server = new BinaryProtocolServer();
server.start(linkManager, port, serverSocketCreationCallback); server.start(linkManager, port, serverSocketCreationCallback, context);
return server; return server;
} }
@NotNull @NotNull
public static IoStream secureConnectToLocalhost(int controllerPort, Logger logger) { public static IoStream secureConnectToLocalhost(int controllerPort) {
IoStream targetEcuSocket; IoStream targetEcuSocket;
try { try {
targetEcuSocket = new TcpIoStream("[local]", rusEFISSLContext.getSSLSocket(LOCALHOST, controllerPort)); targetEcuSocket = new TcpIoStream("[local]", rusEFISSLContext.getSSLSocket(LOCALHOST, controllerPort));
@ -80,9 +81,9 @@ public class TestHelper {
return targetEcuSocket; return targetEcuSocket;
} }
public static BinaryProtocolServer createVirtualController(int controllerPort, ConfigurationImage controllerImage) throws InterruptedException { public static BinaryProtocolServer createVirtualController(int controllerPort, ConfigurationImage controllerImage, BinaryProtocolServer.Context context) throws InterruptedException {
CountDownLatch controllerCreated = new CountDownLatch(1); CountDownLatch controllerCreated = new CountDownLatch(1);
BinaryProtocolServer server = createVirtualController(controllerImage, controllerPort, parameter -> controllerCreated.countDown()); BinaryProtocolServer server = createVirtualController(controllerImage, controllerPort, parameter -> controllerCreated.countDown(), context);
assertLatch(controllerCreated); assertLatch(controllerCreated);
return server; return server;
} }

View File

@ -25,7 +25,7 @@ public class NetworkConnectorStartup {
NetworkConnectorContext connectorContext = new NetworkConnectorContext(); NetworkConnectorContext connectorContext = new NetworkConnectorContext();
NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(authToken, autoDetectedPort, connectorContext, NetworkConnector.ReconnectListener.VOID); NetworkConnector.NetworkConnectorResult networkConnectorResult = new NetworkConnector().runNetworkConnector(authToken, autoDetectedPort, connectorContext, NetworkConnector.ReconnectListener.VOID);
log.info("Running with oneTimeToken=" + networkConnectorResult.getOneTimeToken()); log.info("Running with oneTimeToken=" + networkConnectorResult.getOneTimeToken());
} }
} }

View File

@ -6,6 +6,7 @@ import com.rusefi.binaryprotocol.BinaryProtocol;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.ConnectionStateListener; import com.rusefi.io.ConnectionStateListener;
import com.rusefi.io.LinkManager; import com.rusefi.io.LinkManager;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.proxy.NetworkConnector; import com.rusefi.proxy.NetworkConnector;
import com.rusefi.proxy.NetworkConnectorContext; import com.rusefi.proxy.NetworkConnectorContext;
@ -86,7 +87,7 @@ public class FullServerTest {
// create virtual controller to which "rusEFI network connector" connects to // create virtual controller to which "rusEFI network connector" connects to
int controllerPort = 7002; int controllerPort = 7002;
ConfigurationImage controllerImage = prepareImage(value, createIniField(Fields.CYLINDERSCOUNT)); ConfigurationImage controllerImage = prepareImage(value, createIniField(Fields.CYLINDERSCOUNT));
TestHelper.createVirtualController(controllerPort, controllerImage); TestHelper.createVirtualController(controllerPort, controllerImage, new BinaryProtocolServer.Context());
NetworkConnectorContext networkConnectorContext = new NetworkConnectorContext() { NetworkConnectorContext networkConnectorContext = new NetworkConnectorContext() {
@Override @Override
@ -96,12 +97,12 @@ public class FullServerTest {
}; };
// start "rusEFI network connector" to connect controller with backend since in real life controller has only local serial port it does not have network // start "rusEFI network connector" to connect controller with backend since in real life controller has only local serial port it does not have network
NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, networkConnectorContext, NetworkConnector.ReconnectListener.VOID); NetworkConnector.NetworkConnectorResult networkConnectorResult = new NetworkConnector().runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, networkConnectorContext, NetworkConnector.ReconnectListener.VOID);
ControllerInfo controllerInfo = networkConnectorResult.getControllerInfo(); ControllerInfo controllerInfo = networkConnectorResult.getControllerInfo();
TestHelper.assertLatch("controllerRegistered", controllerRegistered); TestHelper.assertLatch("controllerRegistered", controllerRegistered);
SessionDetails authenticatorSessionDetails = new SessionDetails(controllerInfo, MockRusEfiDevice.TEST_TOKEN_3, networkConnectorResult.getOneTimeToken()); SessionDetails authenticatorSessionDetails = new SessionDetails(controllerInfo, TEST_TOKEN_3, networkConnectorResult.getOneTimeToken());
ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, userDetailsResolver.apply(TestHelper.TEST_TOKEN_1)); ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, userDetailsResolver.apply(TestHelper.TEST_TOKEN_1));
// start authenticator // start authenticator

View File

@ -1,41 +0,0 @@
package com.rusefi;
import com.rusefi.config.generated.Fields;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.proxy.BaseBroadcastingThread;
import com.rusefi.server.SessionDetails;
import com.rusefi.server.rusEFISSLContext;
import java.io.IOException;
import java.net.Socket;
import static com.rusefi.TestHelper.LOCALHOST;
public class MockRusEfiDevice {
public static final String TEST_TOKEN_3 = "33333333-3333-1234-1234-123456789012";
private final SessionDetails sessionDetails;
public MockRusEfiDevice(String authToken, String signature) {
sessionDetails = TestHelper.createTestSession(authToken, signature);
}
public void connect(int serverPort) throws IOException {
Socket socket = rusEFISSLContext.getSSLSocket(LOCALHOST, serverPort);
BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket,
sessionDetails,
TcpIoStream.DisconnectListener.VOID) {
@Override
protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException {
super.handleCommand(packet, stream);
if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) {
GetOutputsCommand.sendOutput(stream);
}
}
};
baseBroadcastingThread.start();
}
}

View File

@ -3,7 +3,12 @@ package com.rusefi;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.proxy.BaseBroadcastingThread;
import com.rusefi.proxy.NetworkConnectorContext;
import com.rusefi.proxy.client.LocalApplicationProxy; import com.rusefi.proxy.client.LocalApplicationProxy;
import com.rusefi.server.*; import com.rusefi.server.*;
import com.rusefi.tools.online.HttpUtil; import com.rusefi.tools.online.HttpUtil;
@ -14,9 +19,11 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.Socket;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import static com.rusefi.TestHelper.LOCALHOST;
import static com.rusefi.TestHelper.assertLatch; import static com.rusefi.TestHelper.assertLatch;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -72,8 +79,8 @@ public class ServerTest {
assertEquals(0, backend.getControllersCount()); assertEquals(0, backend.getControllersCount());
new MockRusEfiDevice(TestHelper.TEST_TOKEN_1, TestHelper.TEST_SIGNATURE_1).connect(serverPortForControllers); new MockNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.TEST_SIGNATURE_1).connect(serverPortForControllers);
new MockRusEfiDevice("12345678-1234-1234-1234-123456789012", TestHelper.TEST_SIGNATURE_2).connect(serverPortForControllers); new MockNetworkConnector("12345678-1234-1234-1234-123456789012", TestHelper.TEST_SIGNATURE_2).connect(serverPortForControllers);
assertLatch("onConnected", onConnected); assertLatch("onConnected", onConnected);
@ -147,7 +154,7 @@ covered by FullServerTest
BackendTestHelper.runApplicationConnectorBlocking(backend, serverPortForRemoteUsers); BackendTestHelper.runApplicationConnectorBlocking(backend, serverPortForRemoteUsers);
// start authenticator // start authenticator
IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers, logger); IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers);
new HelloCommand("hello").handle(authenticatorToProxyStream); new HelloCommand("hello").handle(authenticatorToProxyStream);
assertLatch(disconnectedCountDownLatch); assertLatch(disconnectedCountDownLatch);
@ -176,10 +183,36 @@ covered by FullServerTest
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, BackendTestHelper.createTestUserResolver().apply(TestHelper.TEST_TOKEN_1)); ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, BackendTestHelper.createTestUserResolver().apply(TestHelper.TEST_TOKEN_1));
// start authenticator // start authenticator
IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers, logger); IoStream authenticatorToProxyStream = TestHelper.secureConnectToLocalhost(serverPortForRemoteUsers);
LocalApplicationProxy.sendHello(authenticatorToProxyStream, applicationRequest); LocalApplicationProxy.sendHello(authenticatorToProxyStream, applicationRequest);
assertLatch(disconnectedCountDownLatch); assertLatch(disconnectedCountDownLatch);
} }
} }
private static class MockNetworkConnector {
private final SessionDetails sessionDetails;
private MockNetworkConnector(String authToken, String signature) {
sessionDetails = TestHelper.createTestSession(authToken, signature);
}
public void connect(int serverPort) throws IOException {
Socket socket = rusEFISSLContext.getSSLSocket(LOCALHOST, serverPort);
BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(socket,
sessionDetails,
TcpIoStream.DisconnectListener.VOID, new NetworkConnectorContext()) {
@Override
protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException {
super.handleCommand(packet, stream);
if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) {
GetOutputsCommand.sendOutput(stream);
}
}
};
baseBroadcastingThread.start();
}
}
} }

View File

@ -51,7 +51,7 @@ public class TcpCommunicationIntegrationTest {
ConfigurationImage serverImage = TestHelper.prepareImage(value, iniField); ConfigurationImage serverImage = TestHelper.prepareImage(value, iniField);
int port = 6100; int port = 6100;
BinaryProtocolServer server = TestHelper.createVirtualController(port, serverImage); BinaryProtocolServer server = TestHelper.createVirtualController(port, serverImage, new BinaryProtocolServer.Context());
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
@ -88,7 +88,7 @@ public class TcpCommunicationIntegrationTest {
int controllerPort = 6102; int controllerPort = 6102;
// create virtual controller // create virtual controller
TestHelper.createVirtualController(controllerPort, serverImage); TestHelper.createVirtualController(controllerPort, serverImage, new BinaryProtocolServer.Context());
int proxyPort = 6103; int proxyPort = 6103;

View File

@ -1,18 +1,32 @@
package com.rusefi.proxy; package com.rusefi.proxy;
import com.devexperts.logging.Logging;
import com.opensr5.ConfigurationImage; import com.opensr5.ConfigurationImage;
import com.rusefi.BackendTestHelper; import com.rusefi.BackendTestHelper;
import com.rusefi.TestHelper; import com.rusefi.TestHelper;
import com.rusefi.Timeouts;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.server.Backend; import com.rusefi.server.Backend;
import com.rusefi.server.ControllerConnectionState;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.net.MalformedURLException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue; import static com.devexperts.logging.Logging.getLogging;
import static com.rusefi.TestHelper.assertLatch;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
public class NetworkConnectorTest { public class NetworkConnectorTest {
private static final Logging log = getLogging(NetworkConnectorTest.class);
@Before
public void setup() throws MalformedURLException {
BackendTestHelper.commonServerTest();
}
@Test @Test
public void testReconnect() throws InterruptedException { public void testReconnect() throws InterruptedException {
int serverPortForControllers = 7504; int serverPortForControllers = 7504;
@ -21,7 +35,14 @@ public class NetworkConnectorTest {
// create virtual controller to which "rusEFI network connector" connects to // create virtual controller to which "rusEFI network connector" connects to
int controllerPort = 7502; int controllerPort = 7502;
ConfigurationImage controllerImage = new ConfigurationImage(Fields.TOTAL_CONFIG_SIZE); ConfigurationImage controllerImage = new ConfigurationImage(Fields.TOTAL_CONFIG_SIZE);
TestHelper.createVirtualController(controllerPort, controllerImage); BinaryProtocolServer.Context patientController = new BinaryProtocolServer.Context() {
@Override
public int getTimeout() {
// we need controller to not timeout while we are playing with all backend shutdowns
return 5 * Timeouts.MINUTE;
}
};
TestHelper.createVirtualController(controllerPort, controllerImage, patientController);
NetworkConnectorContext connectorContext = new NetworkConnectorContext() { NetworkConnectorContext connectorContext = new NetworkConnectorContext() {
@Override @Override
@ -29,6 +50,16 @@ public class NetworkConnectorTest {
return 3; return 3;
} }
@Override
public int firstPacketTimeout() {
return 3 * Timeouts.SECOND;
}
@Override
public int consecutivePacketTimeout() {
return 3 * Timeouts.SECOND;
}
@Override @Override
public int serverPortForControllers() { public int serverPortForControllers() {
return serverPortForControllers; return serverPortForControllers;
@ -44,14 +75,35 @@ public class NetworkConnectorTest {
reconnectCounter.countDown(); reconnectCounter.countDown();
} }
}; };
NetworkConnector.NetworkConnectorResult networkConnectorResult = NetworkConnector.runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, connectorContext, reconnectListener); new NetworkConnector().runNetworkConnector(TestHelper.TEST_TOKEN_1, TestHelper.LOCALHOST + ":" + controllerPort, connectorContext, reconnectListener);
assertTrue(reconnectCounter.await(30, TimeUnit.SECONDS)); assertLatch(reconnectCounter);
// start backend, assert connection, stop backend
log.info("First backend instance");
assertWouldConnect(serverPortForControllers, httpPort);
Backend backend = new Backend(BackendTestHelper.createTestUserResolver(), httpPort); // giving http server time to shut down
// BackendTestHelper.runControllerConnectorBlocking(backend, serverPortForControllers); sleep(3 * Timeouts.SECOND);
// now let's do this again with a new backend instance
log.info("Second backend instance");
assertWouldConnect(serverPortForControllers, httpPort);
}
private void assertWouldConnect(int serverPortForControllers, int httpPort) throws InterruptedException {
CountDownLatch onControllerRegistered = new CountDownLatch(1);
Backend backend = new Backend(BackendTestHelper.createTestUserResolver(), httpPort) {
@Override
public void register(ControllerConnectionState controllerConnectionState) {
super.register(controllerConnectionState);
onControllerRegistered.countDown();
}
};
BackendTestHelper.runControllerConnectorBlocking(backend, serverPortForControllers);
// assert that reconnect actually happened
assertLatch(onControllerRegistered);
backend.close(); backend.close();
} }
} }

View File

@ -53,7 +53,7 @@ public class LocalApplicationProxyTest {
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
CountDownLatch disconnected = new CountDownLatch(1); CountDownLatch disconnected = new CountDownLatch(1);
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, disconnected::countDown, LocalApplicationProxy.ConnectionListener.VOID); LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID);
assertLatch(disconnected); assertLatch(disconnected);
mockBackend.close(); mockBackend.close();
@ -94,7 +94,7 @@ public class LocalApplicationProxyTest {
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1)); ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
CountDownLatch disconnected = new CountDownLatch(1); CountDownLatch disconnected = new CountDownLatch(1);
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, disconnected::countDown, LocalApplicationProxy.ConnectionListener.VOID); LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID);
// wait for three output requests to take place // wait for three output requests to take place
assertLatch("gaugePokes", gaugePokes); assertLatch("gaugePokes", gaugePokes);

View File

@ -173,7 +173,7 @@ public class Backend implements Closeable {
IoStream applicationClientStream = null; IoStream applicationClientStream = null;
ApplicationConnectionState applicationConnectionState = null; ApplicationConnectionState applicationConnectionState = null;
try { try {
applicationClientStream = new TcpIoStream("[app] ", applicationSocket); applicationClientStream = new TcpIoStream("[backend-application connector] ", applicationSocket);
// authenticator pushed hello packet on connect // authenticator pushed hello packet on connect
String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer()); String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer());
@ -370,6 +370,7 @@ public class Backend implements Closeable {
@Override @Override
public void close() { public void close() {
log.info("Closing...");
isClosed = true; isClosed = true;
FileUtil.close(applicationConnector); FileUtil.close(applicationConnector);
FileUtil.close(controllerConnector); FileUtil.close(controllerConnector);

View File

@ -43,7 +43,7 @@ public class ControllerConnectionState {
this.clientSocket = clientSocket; this.clientSocket = clientSocket;
this.userDetailsResolver = userDetailsResolver; this.userDetailsResolver = userDetailsResolver;
try { try {
stream = new TcpIoStream("[controller] ", clientSocket); stream = new TcpIoStream("[backend-controller connector] ", clientSocket);
incomingData = stream.getDataBuffer(); incomingData = stream.getDataBuffer();
} catch (IOException e) { } catch (IOException e) {
close(); close();