From ee40960623f9a42f7b74d978c92f65e294c310dd Mon Sep 17 00:00:00 2001 From: rusefi Date: Sat, 11 Jul 2020 00:50:09 -0400 Subject: [PATCH] proxy progress --- .../binaryprotocol/IncomingDataBuffer.java | 25 +++++--- .../src/main/java/com/rusefi/io/IoStream.java | 2 +- .../rusefi/io/tcp/BinaryProtocolProxy.java | 4 +- .../rusefi/io/tcp/BinaryProtocolServer.java | 62 ++++++++++++++----- .../main/java/com/rusefi/server/Backend.java | 60 ++++++++++++++++++ .../rusefi/server/ClientConnectionState.java | 51 ++++++++++++++- .../rusefi/autodetect/SerialAutoChecker.java | 2 +- .../java/com/rusefi/MockRusEfiDevice.java | 21 ++++++- .../src/test/java/com/rusefi/ServerTest.java | 47 +++++++++++++- 9 files changed, 239 insertions(+), 35 deletions(-) diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java index 768601165a..31d4292b4e 100644 --- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java +++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java @@ -8,13 +8,14 @@ import etch.util.CircularByteBuffer; import net.jcip.annotations.ThreadSafe; import java.io.EOFException; +import java.io.IOException; import java.util.Arrays; import static com.rusefi.binaryprotocol.IoHelper.*; /** * Thread-safe byte queue with blocking {@link #waitForBytes} method - * + *

* Andrey Belomutskiy, (c) 2013-2020 * 6/20/2015. */ @@ -38,11 +39,11 @@ public class IncomingDataBuffer { return incomingData; } - public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws InterruptedException, EOFException { + public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws EOFException { return getPacket(logger, msg, allowLongResponse, System.currentTimeMillis()); } - public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws InterruptedException, EOFException { + public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws EOFException { boolean isTimeout = waitForBytes(msg + " header", start, 2); if (isTimeout) return null; @@ -90,7 +91,7 @@ public class IncomingDataBuffer { * * @return true in case of timeout, false if everything is fine */ - public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) throws InterruptedException { + public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) { logger.info(loggingMessage + ": waiting for " + count + " byte(s)"); synchronized (cbb) { while (cbb.length() < count) { @@ -99,7 +100,11 @@ public class IncomingDataBuffer { logger.info(loggingMessage + ": timeout. Got only " + cbb.length()); return true; // timeout. Sad face. } - cbb.wait(timeout); + try { + cbb.wait(timeout); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } } } return false; // looks good! @@ -141,28 +146,28 @@ public class IncomingDataBuffer { } } - public byte readByte() throws EOFException, InterruptedException { + public byte readByte() throws IOException { boolean timeout = waitForBytes("readByte", System.currentTimeMillis(), 1); if (timeout) - throw new IllegalStateException("Timeout in readByte"); + throw new IOException("Timeout in readByte"); return (byte) getByte(); } - public int readInt() throws EOFException, InterruptedException { + public int readInt() throws EOFException { boolean timeout = waitForBytes("readInt", System.currentTimeMillis(), 4); if (timeout) throw new IllegalStateException("Timeout in readByte"); return swap32(getInt()); } - public short readShort() throws EOFException, InterruptedException { + public short readShort() throws EOFException { boolean timeout = waitForBytes("readShort", System.currentTimeMillis(), 2); if (timeout) throw new IllegalStateException("Timeout in readShort"); return (short) swap16(getShort()); } - public int read(byte[] packet) throws InterruptedException { + public int read(byte[] packet) { boolean timeout = waitForBytes("read", System.currentTimeMillis(), packet.length); if (timeout) throw new IllegalStateException("Timeout while waiting " + packet.length); diff --git a/java_console/io/src/main/java/com/rusefi/io/IoStream.java b/java_console/io/src/main/java/com/rusefi/io/IoStream.java index 8f7df97012..7ad5d2e6ef 100644 --- a/java_console/io/src/main/java/com/rusefi/io/IoStream.java +++ b/java_console/io/src/main/java/com/rusefi/io/IoStream.java @@ -53,7 +53,7 @@ public interface IoStream extends WriteStream { IncomingDataBuffer getDataBuffer(); - default short readShort() throws EOFException, InterruptedException { + default short readShort() throws EOFException { return getDataBuffer().readShort(); } } diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java index e18990653d..28729b51fc 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java @@ -46,12 +46,12 @@ public class BinaryProtocolProxy { proxyControllerResponseToClient(targetEcu, clientOutputStream); } - } catch (IOException | InterruptedException e) { + } catch (IOException e) { e.printStackTrace(); } } - private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException, InterruptedException { + private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException { short length = targetInputStream.readShort(); BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(targetInputStream.getDataBuffer(), length); diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index f6e2026f5d..216b84678c 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -14,6 +14,7 @@ import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -49,7 +50,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { Function clientSocketRunnableFactory = clientSocket -> () -> { try { runProxy(linkManager, clientSocket); - } catch (IOException | InterruptedException e) { + } catch (IOException e) { logger.info("proxy connection: " + e); } }; @@ -57,6 +58,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, logger, serverSocketCreationCallback); } + /** + * Starts a new thread + * + * @param port server port to accept connections + * @param threadName + * @param clientSocketRunnableFactory method to invoke on a new thread for each new client connection + * @param logger + * @param serverSocketCreationCallback this callback is invoked once we open the server socket + */ public static void tcpServerSocket(int port, String threadName, Function clientSocketRunnableFactory, final Logger logger, Listener serverSocketCreationCallback) { Runnable runnable = new Runnable() { @SuppressWarnings("InfiniteLoopStatement") @@ -69,7 +79,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { logger.error("Error binding server socket" + e); return; } - if (serverSocketCreationCallback!=null) + if (serverSocketCreationCallback != null) serverSocketCreationCallback.onResult(null); try { @@ -88,28 +98,32 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } @SuppressWarnings("InfiniteLoopStatement") - private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException, InterruptedException { + private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException { TcpIoStream stream = new TcpIoStream(logger, clientSocket); IncomingDataBuffer in = stream.getDataBuffer(); while (true) { - byte first = in.readByte(); - if (first == COMMAND_PROTOCOL) { + AtomicBoolean handled = new AtomicBoolean(); + Handler protocolCommandHandler = () -> { handleProtocolCommand(clientSocket); + handled.set(true); + }; + + int length = getPacketLength(in, protocolCommandHandler); + if (handled.get()) { continue; } - int length = first * 256 + in.readByte(); System.out.println("Got [" + length + "] length promise"); - if (length == 0) - throw new IOException("Zero length not expected"); + byte[] packet = readPromisedBytes(in, length).getPacket(); - byte[] packet = readPromisedBytes(in, length).packet; + if (packet.length == 0) + throw new IOException("Empty packet"); + + byte command = packet[0]; - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet)); - byte command = (byte) dis.read(); System.out.println("Got [" + (char) command + "/" + command + "] command"); if (command == COMMAND_HELLO) { @@ -124,8 +138,10 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } else if (command == COMMAND_PAGE) { stream.sendPacket(TS_OK.getBytes(), logger); } else if (command == COMMAND_READ) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1)); handleRead(linkManager, dis, stream); } else if (command == Fields.TS_CHUNK_WRITE_COMMAND) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1)); handleWrite(linkManager, packet, dis, stream); } else if (command == Fields.TS_BURN_COMMAND) { stream.sendPacket(new byte[]{TS_RESPONSE_BURN_OK}, logger); @@ -134,6 +150,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { // todo: relay command stream.sendPacket(TS_OK.getBytes(), logger); } else if (command == Fields.TS_OUTPUT_COMMAND) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1)); int offset = swap16(dis.readShort()); int count = swap16(dis.readShort()); System.out.println("TS_OUTPUT_COMMAND offset=" + offset + "/count=" + count); @@ -157,22 +174,30 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } } + public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException { + byte first = in.readByte(); + if (first == COMMAND_PROTOCOL) { + protocolCommandHandler.handle(); + } + return first * 256 + in.readByte(); + } + public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException { if (length < 0) throw new IllegalArgumentException(String.format("Negative %d %x", length, length)); byte[] packet = new byte[length]; int size = in.read(packet); if (size != packet.length) - throw new IllegalStateException(size + " promised but " + packet.length + " arrived"); + throw new IOException(size + " promised but " + packet.length + " arrived"); int crc = in.readInt(); if (crc != IoHelper.getCrc32(packet)) - throw new IllegalStateException("CRC mismatch"); + throw new IOException("CRC mismatch"); return new Packet(packet, crc); } - public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException, InterruptedException { - if (length < 0) - throw new IllegalArgumentException(String.format("Negative %d %x", length, length)); + public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException { + if (length <= 0) + throw new IOException("Unexpected packed length " + length); byte[] packet = new byte[length]; int size = in.read(packet); if (size != packet.length) @@ -184,6 +209,11 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { return new Packet(packet, crc); } + + public interface Handler { + void handle() throws IOException; + } + public static void handleProtocolCommand(Socket clientSocket) throws IOException { System.out.println("Got plain F command"); OutputStream outputStream = clientSocket.getOutputStream(); diff --git a/java_console/io/src/main/java/com/rusefi/server/Backend.java b/java_console/io/src/main/java/com/rusefi/server/Backend.java index d803168a50..c2e23cd1ab 100644 --- a/java_console/io/src/main/java/com/rusefi/server/Backend.java +++ b/java_console/io/src/main/java/com/rusefi/server/Backend.java @@ -1,4 +1,64 @@ package com.rusefi.server; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.rusefi.binaryprotocol.BinaryProtocol.sleep; + public class Backend { + + // guarded by own monitor + private final Set clients = new HashSet<>(); + private final int clientTimeout; + + public Backend(int clientTimeout) { + this.clientTimeout = clientTimeout; + + new Thread(() -> { + while (true) { + runCleanup(); + sleep(clientTimeout); + } + }, "rusEFI Server Cleanup").start(); + + + } + + private void runCleanup() { + List inactiveClients = new ArrayList<>(); + + synchronized (clients) { + long now = System.currentTimeMillis(); + for (ClientConnectionState client : clients) { + if (now - client.getLastActivityTimestamp() > clientTimeout) + inactiveClients.add(client); + } + } + + for (ClientConnectionState inactiveClient : inactiveClients) { + close(inactiveClient); + } + + } + + private void close(ClientConnectionState inactiveClient) { + inactiveClient.close(); + synchronized (clients) { + clients.remove(inactiveClient); + } + } + + public void register(ClientConnectionState clientConnectionState) { + synchronized (clients) { + clients.add(clientConnectionState); + } + } + + public int getCount() { + synchronized (clients) { + return clients.size(); + } + } } diff --git a/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java b/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java index 4702921729..a7d71d70b7 100644 --- a/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java +++ b/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java @@ -1,8 +1,55 @@ -package com.rusefi; +package com.rusefi.server; +import com.opensr5.Logger; +import com.rusefi.binaryprotocol.BinaryProtocolCommands; +import com.rusefi.io.tcp.TcpIoStream; + +import java.io.Closeable; +import java.io.IOException; import java.net.Socket; public class ClientConnectionState { - public ClientConnectionState(Socket clientSocket) { + private final Socket clientSocket; + private final Logger logger; + + private long lastActivityTimestamp; + private boolean isClosed; + private TcpIoStream stream; + + public ClientConnectionState(Socket clientSocket, Logger logger) { + this.clientSocket = clientSocket; + this.logger = logger; + try { + stream = new TcpIoStream(logger, clientSocket); + } catch (IOException e) { + close(); + } + } + + public long getLastActivityTimestamp() { + return lastActivityTimestamp; + } + + public void close() { + isClosed = true; + close(clientSocket); + } + + public void sayHello() { + try { + stream.sendPacket(new byte[]{BinaryProtocolCommands.COMMAND_HELLO}, logger); + } catch (IOException e) { + close(); + } + } + + private static void close(Closeable closeable) { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException ignored) { + // ignored + } + } } } diff --git a/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java b/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java index 620e4f3a9b..af3b301865 100644 --- a/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java +++ b/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java @@ -55,7 +55,7 @@ public class SerialAutoChecker implements Runnable { result.set(serialPort); portFound.countDown(); } - } catch (IOException | InterruptedException ignore) { + } catch (IOException ignore) { return; } finally { stream.close(); diff --git a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java b/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java index f1c2165a42..23392332b4 100644 --- a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java +++ b/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java @@ -1,13 +1,16 @@ package com.rusefi; import com.opensr5.Logger; -import com.rusefi.binaryprotocol.BinaryProtocolCommands; +import com.rusefi.binaryprotocol.IncomingDataBuffer; +import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.TcpIoStream; import java.io.IOException; import java.net.Socket; import static com.rusefi.io.TcpCommunicationIntegrationTest.LOCALHOST; +import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength; +import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes; public class MockRusEfiDevice { private final String signature; @@ -21,8 +24,22 @@ public class MockRusEfiDevice { public void connect(int serverPort) throws IOException { TcpIoStream stream = new TcpIoStream(logger, new Socket(LOCALHOST, serverPort)); + IncomingDataBuffer in = stream.getDataBuffer(); + + new Thread(() -> { + + try { + while (true) { + int length = getPacketLength(in, () -> { + throw new UnsupportedOperationException(); + }); + byte[] packet = readPromisedBytes(in, length).getPacket(); - + } + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); } } diff --git a/java_console/ui/src/test/java/com/rusefi/ServerTest.java b/java_console/ui/src/test/java/com/rusefi/ServerTest.java index 29278fd26d..d25c308844 100644 --- a/java_console/ui/src/test/java/com/rusefi/ServerTest.java +++ b/java_console/ui/src/test/java/com/rusefi/ServerTest.java @@ -1,12 +1,57 @@ package com.rusefi; +import com.opensr5.Logger; +import com.rusefi.io.tcp.BinaryProtocolServer; +import com.rusefi.server.Backend; +import com.rusefi.server.ClientConnectionState; import org.junit.Test; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * integration test of the rusEFI online backend process + */ public class ServerTest { + private final static Logger logger = Logger.CONSOLE; + @Test - public void testSessionTimeout() { + public void testSessionTimeout() throws InterruptedException, IOException { int serverPort = 7000; + CountDownLatch serverCreated = new CountDownLatch(1); + + + Backend backend = new Backend(3 * Timeouts.SECOND); + + + BinaryProtocolServer.tcpServerSocket(serverPort, "Server", new Function() { + @Override + public Runnable apply(Socket clientSocket) { + return new Runnable() { + @Override + public void run() { + ClientConnectionState clientConnectionState = new ClientConnectionState(clientSocket, logger); + clientConnectionState.sayHello(); + backend.register(clientConnectionState); + + } + }; + } + }, logger, parameter -> serverCreated.countDown()); + + assertTrue(serverCreated.await(30, TimeUnit.SECONDS)); + assertEquals(0, backend.getCount()); + + + new MockRusEfiDevice("rusEFI 2020.07.06.frankenso_na6.2468827536", logger).connect(serverPort); + new MockRusEfiDevice("rusEFI 2020.07.11.proteus_f4.1986715563", logger).connect(serverPort); } }