diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java
index 768601165a..31d4292b4e 100644
--- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java
+++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java
@@ -8,13 +8,14 @@ import etch.util.CircularByteBuffer;
import net.jcip.annotations.ThreadSafe;
import java.io.EOFException;
+import java.io.IOException;
import java.util.Arrays;
import static com.rusefi.binaryprotocol.IoHelper.*;
/**
* Thread-safe byte queue with blocking {@link #waitForBytes} method
- *
+ *
* Andrey Belomutskiy, (c) 2013-2020
* 6/20/2015.
*/
@@ -38,11 +39,11 @@ public class IncomingDataBuffer {
return incomingData;
}
- public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
+ public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws EOFException {
return getPacket(logger, msg, allowLongResponse, System.currentTimeMillis());
}
- public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws InterruptedException, EOFException {
+ public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws EOFException {
boolean isTimeout = waitForBytes(msg + " header", start, 2);
if (isTimeout)
return null;
@@ -90,7 +91,7 @@ public class IncomingDataBuffer {
*
* @return true in case of timeout, false if everything is fine
*/
- public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) throws InterruptedException {
+ public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) {
logger.info(loggingMessage + ": waiting for " + count + " byte(s)");
synchronized (cbb) {
while (cbb.length() < count) {
@@ -99,7 +100,11 @@ public class IncomingDataBuffer {
logger.info(loggingMessage + ": timeout. Got only " + cbb.length());
return true; // timeout. Sad face.
}
- cbb.wait(timeout);
+ try {
+ cbb.wait(timeout);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
}
}
return false; // looks good!
@@ -141,28 +146,28 @@ public class IncomingDataBuffer {
}
}
- public byte readByte() throws EOFException, InterruptedException {
+ public byte readByte() throws IOException {
boolean timeout = waitForBytes("readByte", System.currentTimeMillis(), 1);
if (timeout)
- throw new IllegalStateException("Timeout in readByte");
+ throw new IOException("Timeout in readByte");
return (byte) getByte();
}
- public int readInt() throws EOFException, InterruptedException {
+ public int readInt() throws EOFException {
boolean timeout = waitForBytes("readInt", System.currentTimeMillis(), 4);
if (timeout)
throw new IllegalStateException("Timeout in readByte");
return swap32(getInt());
}
- public short readShort() throws EOFException, InterruptedException {
+ public short readShort() throws EOFException {
boolean timeout = waitForBytes("readShort", System.currentTimeMillis(), 2);
if (timeout)
throw new IllegalStateException("Timeout in readShort");
return (short) swap16(getShort());
}
- public int read(byte[] packet) throws InterruptedException {
+ public int read(byte[] packet) {
boolean timeout = waitForBytes("read", System.currentTimeMillis(), packet.length);
if (timeout)
throw new IllegalStateException("Timeout while waiting " + packet.length);
diff --git a/java_console/io/src/main/java/com/rusefi/io/IoStream.java b/java_console/io/src/main/java/com/rusefi/io/IoStream.java
index 8f7df97012..7ad5d2e6ef 100644
--- a/java_console/io/src/main/java/com/rusefi/io/IoStream.java
+++ b/java_console/io/src/main/java/com/rusefi/io/IoStream.java
@@ -53,7 +53,7 @@ public interface IoStream extends WriteStream {
IncomingDataBuffer getDataBuffer();
- default short readShort() throws EOFException, InterruptedException {
+ default short readShort() throws EOFException {
return getDataBuffer().readShort();
}
}
diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java
index e18990653d..28729b51fc 100644
--- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java
+++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java
@@ -46,12 +46,12 @@ public class BinaryProtocolProxy {
proxyControllerResponseToClient(targetEcu, clientOutputStream);
}
- } catch (IOException | InterruptedException e) {
+ } catch (IOException e) {
e.printStackTrace();
}
}
- private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException, InterruptedException {
+ private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException {
short length = targetInputStream.readShort();
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(targetInputStream.getDataBuffer(), length);
diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java
index f6e2026f5d..216b84678c 100644
--- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java
+++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java
@@ -14,6 +14,7 @@ import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -49,7 +50,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
Function clientSocketRunnableFactory = clientSocket -> () -> {
try {
runProxy(linkManager, clientSocket);
- } catch (IOException | InterruptedException e) {
+ } catch (IOException e) {
logger.info("proxy connection: " + e);
}
};
@@ -57,6 +58,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, logger, serverSocketCreationCallback);
}
+ /**
+ * Starts a new thread
+ *
+ * @param port server port to accept connections
+ * @param threadName
+ * @param clientSocketRunnableFactory method to invoke on a new thread for each new client connection
+ * @param logger
+ * @param serverSocketCreationCallback this callback is invoked once we open the server socket
+ */
public static void tcpServerSocket(int port, String threadName, Function clientSocketRunnableFactory, final Logger logger, Listener serverSocketCreationCallback) {
Runnable runnable = new Runnable() {
@SuppressWarnings("InfiniteLoopStatement")
@@ -69,7 +79,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
logger.error("Error binding server socket" + e);
return;
}
- if (serverSocketCreationCallback!=null)
+ if (serverSocketCreationCallback != null)
serverSocketCreationCallback.onResult(null);
try {
@@ -88,28 +98,32 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
}
@SuppressWarnings("InfiniteLoopStatement")
- private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException, InterruptedException {
+ private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, clientSocket);
IncomingDataBuffer in = stream.getDataBuffer();
while (true) {
- byte first = in.readByte();
- if (first == COMMAND_PROTOCOL) {
+ AtomicBoolean handled = new AtomicBoolean();
+ Handler protocolCommandHandler = () -> {
handleProtocolCommand(clientSocket);
+ handled.set(true);
+ };
+
+ int length = getPacketLength(in, protocolCommandHandler);
+ if (handled.get()) {
continue;
}
- int length = first * 256 + in.readByte();
System.out.println("Got [" + length + "] length promise");
- if (length == 0)
- throw new IOException("Zero length not expected");
+ byte[] packet = readPromisedBytes(in, length).getPacket();
- byte[] packet = readPromisedBytes(in, length).packet;
+ if (packet.length == 0)
+ throw new IOException("Empty packet");
+
+ byte command = packet[0];
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet));
- byte command = (byte) dis.read();
System.out.println("Got [" + (char) command + "/" + command + "] command");
if (command == COMMAND_HELLO) {
@@ -124,8 +138,10 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
} else if (command == COMMAND_PAGE) {
stream.sendPacket(TS_OK.getBytes(), logger);
} else if (command == COMMAND_READ) {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
handleRead(linkManager, dis, stream);
} else if (command == Fields.TS_CHUNK_WRITE_COMMAND) {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
handleWrite(linkManager, packet, dis, stream);
} else if (command == Fields.TS_BURN_COMMAND) {
stream.sendPacket(new byte[]{TS_RESPONSE_BURN_OK}, logger);
@@ -134,6 +150,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
// todo: relay command
stream.sendPacket(TS_OK.getBytes(), logger);
} else if (command == Fields.TS_OUTPUT_COMMAND) {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
int offset = swap16(dis.readShort());
int count = swap16(dis.readShort());
System.out.println("TS_OUTPUT_COMMAND offset=" + offset + "/count=" + count);
@@ -157,22 +174,30 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
}
}
+ public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException {
+ byte first = in.readByte();
+ if (first == COMMAND_PROTOCOL) {
+ protocolCommandHandler.handle();
+ }
+ return first * 256 + in.readByte();
+ }
+
public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException {
if (length < 0)
throw new IllegalArgumentException(String.format("Negative %d %x", length, length));
byte[] packet = new byte[length];
int size = in.read(packet);
if (size != packet.length)
- throw new IllegalStateException(size + " promised but " + packet.length + " arrived");
+ throw new IOException(size + " promised but " + packet.length + " arrived");
int crc = in.readInt();
if (crc != IoHelper.getCrc32(packet))
- throw new IllegalStateException("CRC mismatch");
+ throw new IOException("CRC mismatch");
return new Packet(packet, crc);
}
- public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException, InterruptedException {
- if (length < 0)
- throw new IllegalArgumentException(String.format("Negative %d %x", length, length));
+ public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException {
+ if (length <= 0)
+ throw new IOException("Unexpected packed length " + length);
byte[] packet = new byte[length];
int size = in.read(packet);
if (size != packet.length)
@@ -184,6 +209,11 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
return new Packet(packet, crc);
}
+
+ public interface Handler {
+ void handle() throws IOException;
+ }
+
public static void handleProtocolCommand(Socket clientSocket) throws IOException {
System.out.println("Got plain F command");
OutputStream outputStream = clientSocket.getOutputStream();
diff --git a/java_console/io/src/main/java/com/rusefi/server/Backend.java b/java_console/io/src/main/java/com/rusefi/server/Backend.java
index d803168a50..c2e23cd1ab 100644
--- a/java_console/io/src/main/java/com/rusefi/server/Backend.java
+++ b/java_console/io/src/main/java/com/rusefi/server/Backend.java
@@ -1,4 +1,64 @@
package com.rusefi.server;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
+
public class Backend {
+
+ // guarded by own monitor
+ private final Set clients = new HashSet<>();
+ private final int clientTimeout;
+
+ public Backend(int clientTimeout) {
+ this.clientTimeout = clientTimeout;
+
+ new Thread(() -> {
+ while (true) {
+ runCleanup();
+ sleep(clientTimeout);
+ }
+ }, "rusEFI Server Cleanup").start();
+
+
+ }
+
+ private void runCleanup() {
+ List inactiveClients = new ArrayList<>();
+
+ synchronized (clients) {
+ long now = System.currentTimeMillis();
+ for (ClientConnectionState client : clients) {
+ if (now - client.getLastActivityTimestamp() > clientTimeout)
+ inactiveClients.add(client);
+ }
+ }
+
+ for (ClientConnectionState inactiveClient : inactiveClients) {
+ close(inactiveClient);
+ }
+
+ }
+
+ private void close(ClientConnectionState inactiveClient) {
+ inactiveClient.close();
+ synchronized (clients) {
+ clients.remove(inactiveClient);
+ }
+ }
+
+ public void register(ClientConnectionState clientConnectionState) {
+ synchronized (clients) {
+ clients.add(clientConnectionState);
+ }
+ }
+
+ public int getCount() {
+ synchronized (clients) {
+ return clients.size();
+ }
+ }
}
diff --git a/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java b/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java
index 4702921729..a7d71d70b7 100644
--- a/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java
+++ b/java_console/io/src/main/java/com/rusefi/server/ClientConnectionState.java
@@ -1,8 +1,55 @@
-package com.rusefi;
+package com.rusefi.server;
+import com.opensr5.Logger;
+import com.rusefi.binaryprotocol.BinaryProtocolCommands;
+import com.rusefi.io.tcp.TcpIoStream;
+
+import java.io.Closeable;
+import java.io.IOException;
import java.net.Socket;
public class ClientConnectionState {
- public ClientConnectionState(Socket clientSocket) {
+ private final Socket clientSocket;
+ private final Logger logger;
+
+ private long lastActivityTimestamp;
+ private boolean isClosed;
+ private TcpIoStream stream;
+
+ public ClientConnectionState(Socket clientSocket, Logger logger) {
+ this.clientSocket = clientSocket;
+ this.logger = logger;
+ try {
+ stream = new TcpIoStream(logger, clientSocket);
+ } catch (IOException e) {
+ close();
+ }
+ }
+
+ public long getLastActivityTimestamp() {
+ return lastActivityTimestamp;
+ }
+
+ public void close() {
+ isClosed = true;
+ close(clientSocket);
+ }
+
+ public void sayHello() {
+ try {
+ stream.sendPacket(new byte[]{BinaryProtocolCommands.COMMAND_HELLO}, logger);
+ } catch (IOException e) {
+ close();
+ }
+ }
+
+ private static void close(Closeable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException ignored) {
+ // ignored
+ }
+ }
}
}
diff --git a/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java b/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java
index 620e4f3a9b..af3b301865 100644
--- a/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java
+++ b/java_console/ui/src/main/java/com/rusefi/autodetect/SerialAutoChecker.java
@@ -55,7 +55,7 @@ public class SerialAutoChecker implements Runnable {
result.set(serialPort);
portFound.countDown();
}
- } catch (IOException | InterruptedException ignore) {
+ } catch (IOException ignore) {
return;
} finally {
stream.close();
diff --git a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java b/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java
index f1c2165a42..23392332b4 100644
--- a/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java
+++ b/java_console/ui/src/test/java/com/rusefi/MockRusEfiDevice.java
@@ -1,13 +1,16 @@
package com.rusefi;
import com.opensr5.Logger;
-import com.rusefi.binaryprotocol.BinaryProtocolCommands;
+import com.rusefi.binaryprotocol.IncomingDataBuffer;
+import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import java.io.IOException;
import java.net.Socket;
import static com.rusefi.io.TcpCommunicationIntegrationTest.LOCALHOST;
+import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
+import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
public class MockRusEfiDevice {
private final String signature;
@@ -21,8 +24,22 @@ public class MockRusEfiDevice {
public void connect(int serverPort) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, new Socket(LOCALHOST, serverPort));
+ IncomingDataBuffer in = stream.getDataBuffer();
+
+ new Thread(() -> {
+
+ try {
+ while (true) {
+ int length = getPacketLength(in, () -> {
+ throw new UnsupportedOperationException();
+ });
+ byte[] packet = readPromisedBytes(in, length).getPacket();
-
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }).start();
}
}
diff --git a/java_console/ui/src/test/java/com/rusefi/ServerTest.java b/java_console/ui/src/test/java/com/rusefi/ServerTest.java
index 29278fd26d..d25c308844 100644
--- a/java_console/ui/src/test/java/com/rusefi/ServerTest.java
+++ b/java_console/ui/src/test/java/com/rusefi/ServerTest.java
@@ -1,12 +1,57 @@
package com.rusefi;
+import com.opensr5.Logger;
+import com.rusefi.io.tcp.BinaryProtocolServer;
+import com.rusefi.server.Backend;
+import com.rusefi.server.ClientConnectionState;
import org.junit.Test;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * integration test of the rusEFI online backend process
+ */
public class ServerTest {
+ private final static Logger logger = Logger.CONSOLE;
+
@Test
- public void testSessionTimeout() {
+ public void testSessionTimeout() throws InterruptedException, IOException {
int serverPort = 7000;
+ CountDownLatch serverCreated = new CountDownLatch(1);
+
+
+ Backend backend = new Backend(3 * Timeouts.SECOND);
+
+
+ BinaryProtocolServer.tcpServerSocket(serverPort, "Server", new Function() {
+ @Override
+ public Runnable apply(Socket clientSocket) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ ClientConnectionState clientConnectionState = new ClientConnectionState(clientSocket, logger);
+ clientConnectionState.sayHello();
+ backend.register(clientConnectionState);
+
+ }
+ };
+ }
+ }, logger, parameter -> serverCreated.countDown());
+
+ assertTrue(serverCreated.await(30, TimeUnit.SECONDS));
+ assertEquals(0, backend.getCount());
+
+
+ new MockRusEfiDevice("rusEFI 2020.07.06.frankenso_na6.2468827536", logger).connect(serverPort);
+ new MockRusEfiDevice("rusEFI 2020.07.11.proteus_f4.1986715563", logger).connect(serverPort);
}
}