From 10efceabfceaf7eb428f89a04639a74e12e737a7 Mon Sep 17 00:00:00 2001 From: rusefi Date: Wed, 8 Jul 2020 00:19:25 -0400 Subject: [PATCH] proxy progress --- .../rusefi/io/tcp/BinaryProtocolProxy.java | 84 +++++++++++++++++++ .../rusefi/io/tcp/BinaryProtocolServer.java | 81 ++++++++++++------ .../io/TcpCommunicationIntegrationTest.java | 29 ++++++- 3 files changed, 165 insertions(+), 29 deletions(-) create mode 100644 java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java new file mode 100644 index 0000000000..7c738b5ef1 --- /dev/null +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java @@ -0,0 +1,84 @@ +package com.rusefi.io.tcp; + +import com.opensr5.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.function.Function; + +import static com.rusefi.binaryprotocol.BinaryProtocolCommands.COMMAND_PROTOCOL; + +public class BinaryProtocolProxy { + public static void createProxy(Socket targetEcuSocket, int serverProxyPort) { + Function clientSocketRunnableFactory = new Function() { + @Override + public Runnable apply(Socket clientSocket) { + return new Runnable() { + @Override + public void run() { + runProxy(targetEcuSocket, clientSocket); + } + }; + } + }; + BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Logger.CONSOLE); + } + + private static void runProxy(Socket targetEcuSocket, Socket clientSocket) { + /* + * Each client socket is running on it's own thread + */ + try { + DataInputStream targetInputStream = new DataInputStream(targetEcuSocket.getInputStream()); + DataOutputStream targetOutputStream = new DataOutputStream(targetEcuSocket.getOutputStream()); + + DataInputStream clientInputStream = new DataInputStream(clientSocket.getInputStream()); + DataOutputStream clientOutputStream = new DataOutputStream(clientSocket.getOutputStream()); + + while (true) { + byte firstByte = clientInputStream.readByte(); + if (firstByte == COMMAND_PROTOCOL) { + BinaryProtocolServer.handleProtocolCommand(clientSocket); + continue; + } + proxyClientRequestToController(clientInputStream, firstByte, targetOutputStream); + + proxyControllerResponseToClient(targetInputStream, clientOutputStream); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void proxyControllerResponseToClient(DataInputStream targetInputStream, DataOutputStream clientOutputStream) throws IOException { + short length = targetInputStream.readShort(); + BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(targetInputStream, length); + + System.out.println("Relaying controller response length=" + length); + clientOutputStream.writeShort(length); + clientOutputStream.write(packet.getPacket()); + clientOutputStream.writeInt(packet.getCrc()); + clientOutputStream.flush(); + } + + private static void proxyClientRequestToController(DataInputStream in, byte firstByte, DataOutputStream targetOutputStream) throws IOException { + byte secondByte = in.readByte(); + int length = firstByte * 256 + secondByte; + + BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length); + + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet.getPacket())); + byte command = (byte) dis.read(); + + System.out.println("Relaying client command" + (char) command + "/" + command + " length=" + length); + // sending proxies packet to controller + targetOutputStream.write(firstByte); + targetOutputStream.write(secondByte); + targetOutputStream.write(packet.getPacket()); + targetOutputStream.writeInt(packet.getCrc()); + targetOutputStream.flush(); + } +} diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index b123c0317b..b4eb525fae 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -13,6 +13,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static com.rusefi.binaryprotocol.IoHelper.swap16; import static com.rusefi.config.generated.Fields.*; @@ -22,7 +23,7 @@ import static com.rusefi.config.generated.Fields.*; * serial port simultaneously * * @author Andrey Belomutskiy - * 11/24/15 + * 11/24/15 */ public class BinaryProtocolServer implements BinaryProtocolCommands { @@ -42,6 +43,19 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { public void start(LinkManager linkManager, int port) { logger.info("BinaryProtocolServer on " + port); + + Function clientSocketRunnableFactory = clientSocket -> () -> { + try { + runProxy(linkManager, clientSocket); + } catch (IOException e) { + logger.info("proxy connection: " + e); + } + }; + + tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, logger); + } + + public static void tcpServerSocket(int port, String threadName, Function clientSocketRunnableFactory, final Logger logger) { Runnable runnable = new Runnable() { @SuppressWarnings("InfiniteLoopStatement") @Override @@ -59,23 +73,14 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { // Wait for a connection final Socket clientSocket = serverSocket.accept(); logger.info("Binary protocol proxy port connection"); - new Thread(new Runnable() { - @Override - public void run() { - try { - runProxy(linkManager, clientSocket); - } catch (IOException e) { - logger.info("proxy connection: " + e); - } - } - }, "proxy connection").start(); + new Thread(clientSocketRunnableFactory.apply(clientSocket), "proxy connection").start(); } } catch (IOException e) { throw new IllegalStateException(e); } } }; - new Thread(runnable, "BinaryProtocolServer").start(); + new Thread(runnable, threadName).start(); } @SuppressWarnings("InfiniteLoopStatement") @@ -85,14 +90,9 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { while (true) { byte first = in.readByte(); if (first == COMMAND_PROTOCOL) { - //System.out.println("Ignoring plain F command"); - System.out.println("Got plain F command"); - OutputStream outputStream = clientSocket.getOutputStream(); - outputStream.write(TS_PROTOCOL.getBytes()); - outputStream.flush(); + handleProtocolCommand(clientSocket); continue; } - int length = first * 256 + in.readByte(); System.out.println("Got [" + length + "] length promise"); @@ -100,19 +100,12 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { if (length == 0) throw new IOException("Zero length not expected"); - byte[] packet = new byte[length]; - int size = in.read(packet); - if (size != packet.length) - throw new IllegalStateException(); + byte[] packet = readPromisedBytes(in, length).packet; DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet)); byte command = (byte) dis.read(); System.out.println("Got [" + (char) command + "/" + command + "] command"); - int crc = in.readInt(); - if (crc != IoHelper.getCrc32(packet)) - throw new IllegalStateException("CRC mismatch"); - TcpIoStream stream = new TcpIoStream(logger, linkManager, clientSocket); if (command == COMMAND_HELLO) { stream.sendPacket((TS_OK + Fields.TS_SIGNATURE).getBytes(), logger); @@ -159,6 +152,24 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } } + public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException { + byte[] packet = new byte[length]; + int size = in.read(packet); + if (size != packet.length) + throw new IllegalStateException(); + int crc = in.readInt(); + if (crc != IoHelper.getCrc32(packet)) + throw new IllegalStateException("CRC mismatch"); + return new Packet(packet, crc); + } + + public static void handleProtocolCommand(Socket clientSocket) throws IOException { + System.out.println("Got plain F command"); + OutputStream outputStream = clientSocket.getOutputStream(); + outputStream.write(TS_PROTOCOL.getBytes()); + outputStream.flush(); + } + private void handleWrite(LinkManager linkManager, byte[] packet, DataInputStream dis, TcpIoStream stream) throws IOException { dis.readShort(); // page int offset = swap16(dis.readShort()); @@ -198,4 +209,22 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { new DataOutputStream(response).writeInt(result); stream.sendPacket(response.toByteArray(), logger); } + + public static class Packet { + private final byte[] packet; + private final int crc; + + public Packet(byte[] packet, int crc) { + this.packet = packet; + this.crc = crc; + } + + public byte[] getPacket() { + return packet; + } + + public int getCrc() { + return crc; + } + } } \ No newline at end of file diff --git a/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java b/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java index 4525cd5e9e..7221874b6c 100644 --- a/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java +++ b/java_console/ui/src/test/java/com/rusefi/io/TcpCommunicationIntegrationTest.java @@ -7,11 +7,14 @@ import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.binaryprotocol.BinaryProtocolState; import com.rusefi.config.Field; import com.rusefi.config.generated.Fields; +import com.rusefi.io.tcp.BinaryProtocolProxy; import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.tune.xml.Constant; import org.jetbrains.annotations.NotNull; import org.junit.Test; +import java.io.*; +import java.net.Socket; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -84,11 +87,31 @@ public class TcpCommunicationIntegrationTest { } @Test - public void testProxy() { + public void testProxy() throws InterruptedException, IOException { ConfigurationImage serverImage = prepareImage(239, createIniField(Fields.CYLINDERSCOUNT)); - int port = 6102; - BinaryProtocolServer server = createVirtualController(serverImage, port); + int controllerPort = 6102; + BinaryProtocolServer server = createVirtualController(serverImage, controllerPort); + int proxyPort = 6103; + BinaryProtocolProxy.createProxy(new Socket("127.0.0.1", controllerPort), proxyPort); + + CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); + + LinkManager clientManager = new LinkManager(LOGGER); + clientManager.startAndConnect(Integer.toString(proxyPort), new ConnectionStateListener() { + @Override + public void onConnectionEstablished() { + connectionEstablishedCountDownLatch.countDown(); + } + + @Override + public void onConnectionFailed() { + System.out.println("Failed"); + } + }); + assertTrue(connectionEstablishedCountDownLatch.await(30, TimeUnit.SECONDS)); + + clientManager.stop(); } @NotNull