proxy progress

This commit is contained in:
rusefi 2020-07-11 00:50:09 -04:00
parent 4703b4be6d
commit 3c6e9acf3b
9 changed files with 239 additions and 35 deletions

View File

@ -8,13 +8,14 @@ import etch.util.CircularByteBuffer;
import net.jcip.annotations.ThreadSafe;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import static com.rusefi.binaryprotocol.IoHelper.*;
/**
* Thread-safe byte queue with blocking {@link #waitForBytes} method
*
* <p>
* Andrey Belomutskiy, (c) 2013-2020
* 6/20/2015.
*/
@ -38,11 +39,11 @@ public class IncomingDataBuffer {
return incomingData;
}
public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse) throws EOFException {
return getPacket(logger, msg, allowLongResponse, System.currentTimeMillis());
}
public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws InterruptedException, EOFException {
public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws EOFException {
boolean isTimeout = waitForBytes(msg + " header", start, 2);
if (isTimeout)
return null;
@ -90,7 +91,7 @@ public class IncomingDataBuffer {
*
* @return true in case of timeout, false if everything is fine
*/
public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) throws InterruptedException {
public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) {
logger.info(loggingMessage + ": waiting for " + count + " byte(s)");
synchronized (cbb) {
while (cbb.length() < count) {
@ -99,7 +100,11 @@ public class IncomingDataBuffer {
logger.info(loggingMessage + ": timeout. Got only " + cbb.length());
return true; // timeout. Sad face.
}
cbb.wait(timeout);
try {
cbb.wait(timeout);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
return false; // looks good!
@ -141,28 +146,28 @@ public class IncomingDataBuffer {
}
}
public byte readByte() throws EOFException, InterruptedException {
public byte readByte() throws IOException {
boolean timeout = waitForBytes("readByte", System.currentTimeMillis(), 1);
if (timeout)
throw new IllegalStateException("Timeout in readByte");
throw new IOException("Timeout in readByte");
return (byte) getByte();
}
public int readInt() throws EOFException, InterruptedException {
public int readInt() throws EOFException {
boolean timeout = waitForBytes("readInt", System.currentTimeMillis(), 4);
if (timeout)
throw new IllegalStateException("Timeout in readByte");
return swap32(getInt());
}
public short readShort() throws EOFException, InterruptedException {
public short readShort() throws EOFException {
boolean timeout = waitForBytes("readShort", System.currentTimeMillis(), 2);
if (timeout)
throw new IllegalStateException("Timeout in readShort");
return (short) swap16(getShort());
}
public int read(byte[] packet) throws InterruptedException {
public int read(byte[] packet) {
boolean timeout = waitForBytes("read", System.currentTimeMillis(), packet.length);
if (timeout)
throw new IllegalStateException("Timeout while waiting " + packet.length);

View File

@ -53,7 +53,7 @@ public interface IoStream extends WriteStream {
IncomingDataBuffer getDataBuffer();
default short readShort() throws EOFException, InterruptedException {
default short readShort() throws EOFException {
return getDataBuffer().readShort();
}
}

View File

@ -46,12 +46,12 @@ public class BinaryProtocolProxy {
proxyControllerResponseToClient(targetEcu, clientOutputStream);
}
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
e.printStackTrace();
}
}
private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException, InterruptedException {
private static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException {
short length = targetInputStream.readShort();
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(targetInputStream.getDataBuffer(), length);

View File

@ -14,6 +14,7 @@ import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -49,7 +50,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
try {
runProxy(linkManager, clientSocket);
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
logger.info("proxy connection: " + e);
}
};
@ -57,6 +58,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, logger, serverSocketCreationCallback);
}
/**
* Starts a new thread
*
* @param port server port to accept connections
* @param threadName
* @param clientSocketRunnableFactory method to invoke on a new thread for each new client connection
* @param logger
* @param serverSocketCreationCallback this callback is invoked once we open the server socket
*/
public static void tcpServerSocket(int port, String threadName, Function<Socket, Runnable> clientSocketRunnableFactory, final Logger logger, Listener serverSocketCreationCallback) {
Runnable runnable = new Runnable() {
@SuppressWarnings("InfiniteLoopStatement")
@ -69,7 +79,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
logger.error("Error binding server socket" + e);
return;
}
if (serverSocketCreationCallback!=null)
if (serverSocketCreationCallback != null)
serverSocketCreationCallback.onResult(null);
try {
@ -88,28 +98,32 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
}
@SuppressWarnings("InfiniteLoopStatement")
private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException, InterruptedException {
private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, clientSocket);
IncomingDataBuffer in = stream.getDataBuffer();
while (true) {
byte first = in.readByte();
if (first == COMMAND_PROTOCOL) {
AtomicBoolean handled = new AtomicBoolean();
Handler protocolCommandHandler = () -> {
handleProtocolCommand(clientSocket);
handled.set(true);
};
int length = getPacketLength(in, protocolCommandHandler);
if (handled.get()) {
continue;
}
int length = first * 256 + in.readByte();
System.out.println("Got [" + length + "] length promise");
if (length == 0)
throw new IOException("Zero length not expected");
byte[] packet = readPromisedBytes(in, length).getPacket();
byte[] packet = readPromisedBytes(in, length).packet;
if (packet.length == 0)
throw new IOException("Empty packet");
byte command = packet[0];
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet));
byte command = (byte) dis.read();
System.out.println("Got [" + (char) command + "/" + command + "] command");
if (command == COMMAND_HELLO) {
@ -124,8 +138,10 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
} else if (command == COMMAND_PAGE) {
stream.sendPacket(TS_OK.getBytes(), logger);
} else if (command == COMMAND_READ) {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
handleRead(linkManager, dis, stream);
} else if (command == Fields.TS_CHUNK_WRITE_COMMAND) {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
handleWrite(linkManager, packet, dis, stream);
} else if (command == Fields.TS_BURN_COMMAND) {
stream.sendPacket(new byte[]{TS_RESPONSE_BURN_OK}, logger);
@ -134,6 +150,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
// todo: relay command
stream.sendPacket(TS_OK.getBytes(), logger);
} else if (command == Fields.TS_OUTPUT_COMMAND) {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet, 1, packet.length - 1));
int offset = swap16(dis.readShort());
int count = swap16(dis.readShort());
System.out.println("TS_OUTPUT_COMMAND offset=" + offset + "/count=" + count);
@ -157,22 +174,30 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
}
}
public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException {
byte first = in.readByte();
if (first == COMMAND_PROTOCOL) {
protocolCommandHandler.handle();
}
return first * 256 + in.readByte();
}
public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException {
if (length < 0)
throw new IllegalArgumentException(String.format("Negative %d %x", length, length));
byte[] packet = new byte[length];
int size = in.read(packet);
if (size != packet.length)
throw new IllegalStateException(size + " promised but " + packet.length + " arrived");
throw new IOException(size + " promised but " + packet.length + " arrived");
int crc = in.readInt();
if (crc != IoHelper.getCrc32(packet))
throw new IllegalStateException("CRC mismatch");
throw new IOException("CRC mismatch");
return new Packet(packet, crc);
}
public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException, InterruptedException {
if (length < 0)
throw new IllegalArgumentException(String.format("Negative %d %x", length, length));
public static Packet readPromisedBytes(IncomingDataBuffer in, int length) throws IOException {
if (length <= 0)
throw new IOException("Unexpected packed length " + length);
byte[] packet = new byte[length];
int size = in.read(packet);
if (size != packet.length)
@ -184,6 +209,11 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
return new Packet(packet, crc);
}
public interface Handler {
void handle() throws IOException;
}
public static void handleProtocolCommand(Socket clientSocket) throws IOException {
System.out.println("Got plain F command");
OutputStream outputStream = clientSocket.getOutputStream();

View File

@ -1,4 +1,64 @@
package com.rusefi.server;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
public class Backend {
// guarded by own monitor
private final Set<ClientConnectionState> clients = new HashSet<>();
private final int clientTimeout;
public Backend(int clientTimeout) {
this.clientTimeout = clientTimeout;
new Thread(() -> {
while (true) {
runCleanup();
sleep(clientTimeout);
}
}, "rusEFI Server Cleanup").start();
}
private void runCleanup() {
List<ClientConnectionState> inactiveClients = new ArrayList<>();
synchronized (clients) {
long now = System.currentTimeMillis();
for (ClientConnectionState client : clients) {
if (now - client.getLastActivityTimestamp() > clientTimeout)
inactiveClients.add(client);
}
}
for (ClientConnectionState inactiveClient : inactiveClients) {
close(inactiveClient);
}
}
private void close(ClientConnectionState inactiveClient) {
inactiveClient.close();
synchronized (clients) {
clients.remove(inactiveClient);
}
}
public void register(ClientConnectionState clientConnectionState) {
synchronized (clients) {
clients.add(clientConnectionState);
}
}
public int getCount() {
synchronized (clients) {
return clients.size();
}
}
}

View File

@ -1,8 +1,55 @@
package com.rusefi;
package com.rusefi.server;
import com.opensr5.Logger;
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
import com.rusefi.io.tcp.TcpIoStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
public class ClientConnectionState {
public ClientConnectionState(Socket clientSocket) {
private final Socket clientSocket;
private final Logger logger;
private long lastActivityTimestamp;
private boolean isClosed;
private TcpIoStream stream;
public ClientConnectionState(Socket clientSocket, Logger logger) {
this.clientSocket = clientSocket;
this.logger = logger;
try {
stream = new TcpIoStream(logger, clientSocket);
} catch (IOException e) {
close();
}
}
public long getLastActivityTimestamp() {
return lastActivityTimestamp;
}
public void close() {
isClosed = true;
close(clientSocket);
}
public void sayHello() {
try {
stream.sendPacket(new byte[]{BinaryProtocolCommands.COMMAND_HELLO}, logger);
} catch (IOException e) {
close();
}
}
private static void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException ignored) {
// ignored
}
}
}
}

View File

@ -55,7 +55,7 @@ public class SerialAutoChecker implements Runnable {
result.set(serialPort);
portFound.countDown();
}
} catch (IOException | InterruptedException ignore) {
} catch (IOException ignore) {
return;
} finally {
stream.close();

View File

@ -1,13 +1,16 @@
package com.rusefi;
import com.opensr5.Logger;
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import java.io.IOException;
import java.net.Socket;
import static com.rusefi.io.TcpCommunicationIntegrationTest.LOCALHOST;
import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
public class MockRusEfiDevice {
private final String signature;
@ -21,8 +24,22 @@ public class MockRusEfiDevice {
public void connect(int serverPort) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, new Socket(LOCALHOST, serverPort));
IncomingDataBuffer in = stream.getDataBuffer();
new Thread(() -> {
try {
while (true) {
int length = getPacketLength(in, () -> {
throw new UnsupportedOperationException();
});
byte[] packet = readPromisedBytes(in, length).getPacket();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}

View File

@ -1,12 +1,57 @@
package com.rusefi;
import com.opensr5.Logger;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.server.Backend;
import com.rusefi.server.ClientConnectionState;
import org.junit.Test;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* integration test of the rusEFI online backend process
*/
public class ServerTest {
private final static Logger logger = Logger.CONSOLE;
@Test
public void testSessionTimeout() {
public void testSessionTimeout() throws InterruptedException, IOException {
int serverPort = 7000;
CountDownLatch serverCreated = new CountDownLatch(1);
Backend backend = new Backend(3 * Timeouts.SECOND);
BinaryProtocolServer.tcpServerSocket(serverPort, "Server", new Function<Socket, Runnable>() {
@Override
public Runnable apply(Socket clientSocket) {
return new Runnable() {
@Override
public void run() {
ClientConnectionState clientConnectionState = new ClientConnectionState(clientSocket, logger);
clientConnectionState.sayHello();
backend.register(clientConnectionState);
}
};
}
}, logger, parameter -> serverCreated.countDown());
assertTrue(serverCreated.await(30, TimeUnit.SECONDS));
assertEquals(0, backend.getCount());
new MockRusEfiDevice("rusEFI 2020.07.06.frankenso_na6.2468827536", logger).connect(serverPort);
new MockRusEfiDevice("rusEFI 2020.07.11.proteus_f4.1986715563", logger).connect(serverPort);
}
}