time for more tests

This commit is contained in:
rusefi 2020-07-26 00:16:43 -04:00
parent 8ce0974b27
commit 267f2de0ab
9 changed files with 172 additions and 46 deletions

View File

@ -1,6 +1,10 @@
package com.rusefi.io.commands; package com.rusefi.io.commands;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream;
import com.rusefi.io.tcp.BinaryProtocolServer;
import java.io.IOException;
import static com.rusefi.binaryprotocol.IoHelper.putShort; import static com.rusefi.binaryprotocol.IoHelper.putShort;
import static com.rusefi.binaryprotocol.IoHelper.swap16; import static com.rusefi.binaryprotocol.IoHelper.swap16;
@ -13,4 +17,10 @@ public class GetOutputsCommand {
putShort(packet, 3, swap16(Fields.TS_OUTPUT_SIZE)); putShort(packet, 3, swap16(Fields.TS_OUTPUT_SIZE));
return packet; return packet;
} }
public static void sendOutput(IoStream stream) throws IOException {
byte[] response = new byte[1 + Fields.TS_OUTPUT_SIZE];
response[0] = (byte) BinaryProtocolServer.TS_OK.charAt(0);
stream.sendPacket(response);
}
} }

View File

@ -12,6 +12,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
import static com.devexperts.logging.Logging.getLogging; import static com.devexperts.logging.Logging.getLogging;
@ -28,12 +29,12 @@ public class BinaryProtocolProxy {
*/ */
public static final int USER_IO_TIMEOUT = 10 * Timeouts.MINUTE; public static final int USER_IO_TIMEOUT = 10 * Timeouts.MINUTE;
public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort) { public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort, AtomicInteger relayCommandCounter) {
Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> { Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
TcpIoStream clientStream = null; TcpIoStream clientStream = null;
try { try {
clientStream = new TcpIoStream("[[proxy]] ", clientSocket); clientStream = new TcpIoStream("[[proxy]] ", clientSocket);
runProxy(targetEcuSocket, clientStream); runProxy(targetEcuSocket, clientStream, relayCommandCounter);
} catch (IOException e) { } catch (IOException e) {
log.error("BinaryProtocolProxy::run " + e); log.error("BinaryProtocolProxy::run " + e);
close(clientStream); close(clientStream);
@ -42,12 +43,11 @@ public class BinaryProtocolProxy {
return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty()); return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty());
} }
public static void runProxy(IoStream targetEcu, IoStream clientStream) throws IOException { public static void runProxy(IoStream targetEcu, IoStream clientStream, AtomicInteger relayCommandCounter) throws IOException {
/* /*
* Each client socket is running on it's own thread * Each client socket is running on it's own thread
*/ */
//noinspection InfiniteLoopStatement while (!targetEcu.isClosed()) {
while (true) {
byte firstByte = clientStream.getDataBuffer().readByte(USER_IO_TIMEOUT); byte firstByte = clientStream.getDataBuffer().readByte(USER_IO_TIMEOUT);
if (firstByte == COMMAND_PROTOCOL) { if (firstByte == COMMAND_PROTOCOL) {
clientStream.write(TS_PROTOCOL.getBytes()); clientStream.write(TS_PROTOCOL.getBytes());
@ -55,8 +55,17 @@ public class BinaryProtocolProxy {
} }
BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte); BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte);
sendToTarget(targetEcu, clientRequest); /**
BinaryProtocolServer.Packet controllerResponse = targetEcu.readPacket(); * Two reasons for synchronization:
* - we run gauge poking thread until TunerStudio connects
* - technically there could be two parallel connections to local application port
*/
BinaryProtocolServer.Packet controllerResponse;
synchronized (targetEcu) {
sendToTarget(targetEcu, clientRequest);
controllerResponse = targetEcu.readPacket();
relayCommandCounter.incrementAndGet();
}
log.info("Relaying controller response length=" + controllerResponse.getPacket().length); log.info("Relaying controller response length=" + controllerResponse.getPacket().length);
clientStream.sendPacket(controllerResponse); clientStream.sendPacket(controllerResponse);

View File

@ -104,7 +104,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
try { try {
clientSocket = serverSocket.accept(); clientSocket = serverSocket.accept();
} catch (IOException e) { } catch (IOException e) {
log.info("Client socket closed right away" + e); log.info("Client socket closed right away " + e);
continue; continue;
} }
log.info("Binary protocol proxy port connection"); log.info("Binary protocol proxy port connection");

View File

@ -1,7 +1,9 @@
package com.rusefi.proxy.client; package com.rusefi.proxy.client;
import com.devexperts.logging.Logging; import com.devexperts.logging.Logging;
import com.rusefi.NamedThreadFactory;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy; import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.io.tcp.ServerSocketReference; import com.rusefi.io.tcp.ServerSocketReference;
@ -13,10 +15,14 @@ import com.rusefi.tools.online.ProxyClient;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static com.devexperts.logging.Logging.getLogging; import static com.devexperts.logging.Logging.getLogging;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
public class LocalApplicationProxy implements Closeable { public class LocalApplicationProxy implements Closeable {
private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("gauge poking");
private static final Logging log = getLogging(LocalApplicationProxy.class); private static final Logging log = getLogging(LocalApplicationProxy.class);
public static final int SERVER_PORT_FOR_APPLICATIONS = HttpUtil.getIntProperty("applications.port", 8002); public static final int SERVER_PORT_FOR_APPLICATIONS = HttpUtil.getIntProperty("applications.port", 8002);
private final ApplicationRequest applicationRequest; private final ApplicationRequest applicationRequest;
@ -35,7 +41,7 @@ public class LocalApplicationProxy implements Closeable {
/** /**
* @param context * @param context
* @param applicationRequest remote session we want to connect to * @param applicationRequest remote session we want to connect to
* @param jsonHttpPort * @param jsonHttpPort
* @param disconnectListener * @param disconnectListener
* @param connectionListener * @param connectionListener
@ -49,7 +55,31 @@ public class LocalApplicationProxy implements Closeable {
IoStream authenticatorToProxyStream = new TcpIoStream("authenticatorToProxyStream ", rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, context.serverPortForRemoteApplications()), disconnectListener); IoStream authenticatorToProxyStream = new TcpIoStream("authenticatorToProxyStream ", rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, context.serverPortForRemoteApplications()), disconnectListener);
LocalApplicationProxy.sendHello(authenticatorToProxyStream, applicationRequest); LocalApplicationProxy.sendHello(authenticatorToProxyStream, applicationRequest);
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort()); AtomicInteger relayCommandCounter = new AtomicInteger();
/**
* We need to entertain proxy server and remote controller while user has already connected to proxy but has not yet started TunerStudio
*/
THREAD_FACTORY.newThread(() -> {
long start = System.currentTimeMillis();
try {
while (relayCommandCounter.get() < 4) {
sleep(context.gaugePokingPeriod());
byte[] commandPacket = GetOutputsCommand.createRequest();
synchronized (authenticatorToProxyStream) {
authenticatorToProxyStream.sendPacket(commandPacket);
// we do not really need the data, we just need to take response from the socket
authenticatorToProxyStream.readPacket();
}
}
} catch (IOException e) {
log.error("Gauge poker", e);
}
}).start();
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort(), relayCommandCounter);
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(applicationRequest, serverHolder, authenticatorToProxyStream); LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(applicationRequest, serverHolder, authenticatorToProxyStream);
connectionListener.onConnected(localApplicationProxy); connectionListener.onConnected(localApplicationProxy);
return serverHolder; return serverHolder;

View File

@ -1,5 +1,7 @@
package com.rusefi.proxy.client; package com.rusefi.proxy.client;
import com.rusefi.Timeouts;
import java.io.IOException; import java.io.IOException;
public interface LocalApplicationProxyContext { public interface LocalApplicationProxyContext {
@ -14,4 +16,12 @@ public interface LocalApplicationProxyContext {
* local port on which authenticator accepts connections from Tuner Studio * local port on which authenticator accepts connections from Tuner Studio
*/ */
int authenticatorPort(); int authenticatorPort();
default int startUpIdle() {
return 6 * Timeouts.MINUTE;
}
default int gaugePokingPeriod() {
return 5 * Timeouts.SECOND;
}
} }

View File

@ -1,6 +1,7 @@
package com.rusefi; package com.rusefi;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.proxy.BaseBroadcastingThread; import com.rusefi.proxy.BaseBroadcastingThread;
@ -30,12 +31,11 @@ public class MockRusEfiDevice {
super.handleCommand(packet, stream); super.handleCommand(packet, stream);
if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) { if (packet.getPacket()[0] == Fields.TS_OUTPUT_COMMAND) {
byte[] response = new byte[1 + Fields.TS_OUTPUT_SIZE]; GetOutputsCommand.sendOutput(stream);
response[0] = (byte) BinaryProtocolServer.TS_OK.charAt(0);
stream.sendPacket(response);
} }
} }
}; };
baseBroadcastingThread.start(); baseBroadcastingThread.start();
} }
} }

View File

@ -13,6 +13,7 @@ import org.junit.Test;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -94,7 +95,7 @@ public class TcpCommunicationIntegrationTest {
// connect proxy to virtual controller // connect proxy to virtual controller
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, LOGGER); IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, LOGGER);
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort); BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort, new AtomicInteger());
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);

View File

@ -2,37 +2,116 @@ package com.rusefi.proxy.client;
import com.rusefi.BackendTestHelper; import com.rusefi.BackendTestHelper;
import com.rusefi.TestHelper; import com.rusefi.TestHelper;
import com.rusefi.Timeouts;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.ServerSocketReference; import com.rusefi.io.tcp.ServerSocketReference;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.server.ApplicationRequest; import com.rusefi.server.ApplicationRequest;
import com.rusefi.server.SessionDetails; import com.rusefi.server.SessionDetails;
import com.rusefi.tools.online.ProxyClient; import com.rusefi.tools.online.ProxyClient;
import org.jetbrains.annotations.NotNull;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.concurrent.atomic.AtomicInteger;
import static com.rusefi.BackendTestHelper.createTestUserResolver; import static com.rusefi.BackendTestHelper.createTestUserResolver;
import static com.rusefi.TestHelper.TEST_TOKEN_1; import static com.rusefi.TestHelper.TEST_TOKEN_1;
import static org.junit.Assert.assertEquals; import static com.rusefi.Timeouts.SECOND;
import static com.rusefi.binaryprotocol.BinaryProtocol.findCommand;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
import static com.rusefi.shared.FileUtil.close;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class LocalApplicationProxyTest { public class LocalApplicationProxyTest {
private static AtomicInteger portNumber = new AtomicInteger();
@Before @Before
public void setup() throws MalformedURLException { public void setup() throws MalformedURLException {
BackendTestHelper.commonServerTest(); BackendTestHelper.commonServerTest();
} }
@Test @Test
public void testLocalApplication() throws IOException, InterruptedException { public void testDisconnectCallback() throws IOException, InterruptedException {
LocalApplicationProxyContext context = new LocalApplicationProxyContext() { LocalApplicationProxyContext context = createLocalApplicationProxy();
CountDownLatch backendCreated = new CountDownLatch(1);
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
sleep(Timeouts.SECOND);
close(socket);
}, parameter -> backendCreated.countDown());
assertTrue(backendCreated.await(30, TimeUnit.SECONDS));
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
CountDownLatch disconnected = new CountDownLatch(1);
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, () -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID);
assertTrue(disconnected.await(30, TimeUnit.SECONDS));
mockBackend.close();
}
@Test
public void testGaugePoking() throws IOException, InterruptedException {
LocalApplicationProxyContext context = createLocalApplicationProxy();
CountDownLatch gaugePokes = new CountDownLatch(3);
CountDownLatch backendCreated = new CountDownLatch(1);
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
try {
IoStream applicationClientStream = new TcpIoStream("gauge", socket);
HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer());
while (gaugePokes.getCount() > 0) {
BinaryProtocolServer.Packet packet = applicationClientStream.readPacket();
System.out.println("Got packet " + findCommand(packet.getPacket()[0]));
if (packet.getPacket().length != 5)
throw new IllegalStateException();
GetOutputsCommand.sendOutput(applicationClientStream);
gaugePokes.countDown();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}, parameter -> backendCreated.countDown());
assertTrue(backendCreated.await(30, TimeUnit.SECONDS));
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, TcpIoStream.DisconnectListener.VOID, LocalApplicationProxy.ConnectionListener.VOID);
// wait for three output requests to take place
assertTrue(gaugePokes.await(30, TimeUnit.SECONDS));
mockBackend.close();
}
@NotNull
private LocalApplicationProxyContext createLocalApplicationProxy() {
return new LocalApplicationProxyContext() {
private final int remotePort = portNumber.incrementAndGet();
private final int authenticatorPort = portNumber.incrementAndGet();
@Override @Override
public String executeGet(String url) { public String executeGet(String url) {
if (url.endsWith(ProxyClient.VERSION_PATH)) if (url.endsWith(ProxyClient.VERSION_PATH))
@ -42,37 +121,23 @@ public class LocalApplicationProxyTest {
@Override @Override
public int serverPortForRemoteApplications() { public int serverPortForRemoteApplications() {
return 5999; return remotePort;
} }
@Override @Override
public int authenticatorPort() { public int authenticatorPort() {
return 5998; return authenticatorPort;
}
@Override
public int startUpIdle() {
return 7 * Timeouts.SECOND;
}
@Override
public int gaugePokingPeriod() {
return SECOND;
} }
}; };
CountDownLatch backendCreated = new CountDownLatch(1);
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", new Function<Socket, Runnable>() {
@Override
public Runnable apply(Socket socket) {
return new Runnable() {
@Override
public void run() {
}
};
}
}, parameter -> backendCreated.countDown());
assertTrue(backendCreated.await(30, TimeUnit.SECONDS));
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, TcpIoStream.DisconnectListener.VOID, LocalApplicationProxy.ConnectionListener.VOID);
mockBackend.close();
} }
} }

View File

@ -30,6 +30,7 @@ import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static com.devexperts.logging.Logging.getLogging; import static com.devexperts.logging.Logging.getLogging;
@ -199,7 +200,7 @@ public class Backend implements Closeable {
applications.add(applicationConnectionState); applications.add(applicationConnectionState);
} }
BinaryProtocolProxy.runProxy(state.getStream(), applicationClientStream); BinaryProtocolProxy.runProxy(state.getStream(), applicationClientStream, new AtomicInteger());
} catch (Throwable e) { } catch (Throwable e) {
log.info("Application Connector: Got error " + e); log.info("Application Connector: Got error " + e);