From b657c637153a4898deb15e9952d74e6aa62ecd93 Mon Sep 17 00:00:00 2001 From: rusefi Date: Sat, 25 Jul 2020 20:38:15 -0400 Subject: [PATCH] all this is too broken, need proper test for all this :( --- .../binaryprotocol/IncomingDataBuffer.java | 10 ++++++- .../src/main/java/com/rusefi/io/IoStream.java | 4 +-- .../rusefi/io/tcp/BinaryProtocolProxy.java | 28 +++++++++++-------- .../rusefi/io/tcp/BinaryProtocolServer.java | 1 + 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java index 784e8e02c9..4117492925 100644 --- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java +++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java @@ -50,6 +50,10 @@ public class IncomingDataBuffer { return getPacket(msg, allowLongResponse, System.currentTimeMillis()); } + /** + * why does this method return NULL in case of timeout?! + * todo: there is a very similar BinaryProtocolServer#readPromisedBytes which throws exception in case of timeout + */ public byte[] getPacket(String msg, boolean allowLongResponse, long start) throws EOFException { boolean isTimeout = waitForBytes(msg + " header", start, 2); if (isTimeout) @@ -78,13 +82,17 @@ public class IncomingDataBuffer { log.debug(String.format("%x", actualCrc) + " vs " + String.format("%x", packetCrc)); return null; } - streamStats.onPacketArrived(); + onPacketArrived(); if (log.debugEnabled()) log.debug("packet " + Arrays.toString(packet) + ": crc OK"); return packet; } + public void onPacketArrived() { + streamStats.onPacketArrived(); + } + public void addData(byte[] freshData) { log.info("IncomingDataBuffer: " + freshData.length + " byte(s) arrived"); synchronized (cbb) { diff --git a/java_console/io/src/main/java/com/rusefi/io/IoStream.java b/java_console/io/src/main/java/com/rusefi/io/IoStream.java index 4ddfef2f99..d49bb6cd3b 100644 --- a/java_console/io/src/main/java/com/rusefi/io/IoStream.java +++ b/java_console/io/src/main/java/com/rusefi/io/IoStream.java @@ -1,7 +1,6 @@ package com.rusefi.io; import com.devexperts.logging.Logging; -import com.opensr5.Logger; import com.opensr5.io.DataListener; import com.opensr5.io.WriteStream; import com.rusefi.binaryprotocol.BinaryProtocol; @@ -11,6 +10,7 @@ import com.rusefi.io.serial.AbstractIoStream; import com.rusefi.io.tcp.BinaryProtocolServer; import org.jetbrains.annotations.NotNull; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; @@ -23,7 +23,7 @@ import static com.devexperts.logging.Logging.getLogging; *

* 5/11/2015. */ -public interface IoStream extends WriteStream { +public interface IoStream extends WriteStream, Closeable { Logging log = getLogging(IoStream.class); static String printHexBinary(byte[] data) { diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java index d32f77e1b3..a38d188c11 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolProxy.java @@ -5,6 +5,7 @@ import com.rusefi.Timeouts; import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.binaryprotocol.IncomingDataBuffer; import com.rusefi.io.IoStream; +import org.jetbrains.annotations.NotNull; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -15,6 +16,7 @@ import java.util.function.Function; import static com.devexperts.logging.Logging.getLogging; import static com.rusefi.binaryprotocol.BinaryProtocolCommands.COMMAND_PROTOCOL; import static com.rusefi.config.generated.Fields.TS_PROTOCOL; +import static com.rusefi.shared.FileUtil.close; public class BinaryProtocolProxy { private static final Logging log = getLogging(BinaryProtocolProxy.class); @@ -27,11 +29,13 @@ public class BinaryProtocolProxy { public static ServerHolder createProxy(IoStream targetEcuSocket, int serverProxyPort) { Function clientSocketRunnableFactory = clientSocket -> () -> { + TcpIoStream clientStream = null; try { - TcpIoStream clientStream = new TcpIoStream("[[proxy]] ", clientSocket); + clientStream = new TcpIoStream("[[proxy]] ", clientSocket); runProxy(targetEcuSocket, clientStream); } catch (IOException e) { log.error("BinaryProtocolProxy::run " + e); + close(clientStream); } }; return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, null); @@ -48,25 +52,25 @@ public class BinaryProtocolProxy { clientStream.write(TS_PROTOCOL.getBytes()); continue; } - proxyClientRequestToController(clientStream.getDataBuffer(), firstByte, targetEcu); + BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte); - proxyControllerResponseToClient(targetEcu, clientStream); + sendToTarget(targetEcu, clientRequest); + BinaryProtocolServer.Packet controllerResponse = targetEcu.readPacket(); + + log.info("Relaying controller response length=" + controllerResponse.getPacket().length); + clientStream.sendPacket(controllerResponse); } } - public static void proxyControllerResponseToClient(IoStream targetInputStream, IoStream clientOutputStream) throws IOException { - BinaryProtocolServer.Packet packet = targetInputStream.readPacket(); - - log.info("Relaying controller response length=" + packet.getPacket().length); - clientOutputStream.sendPacket(packet); - } - - private static void proxyClientRequestToController(IncomingDataBuffer in, byte firstByte, IoStream targetOutputStream) throws IOException { + @NotNull + private static BinaryProtocolServer.Packet readClientRequest(IncomingDataBuffer in, byte firstByte) throws IOException { byte secondByte = in.readByte(); int length = firstByte * 256 + secondByte; - BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length); + return BinaryProtocolServer.readPromisedBytes(in, length); + } + private static void sendToTarget(IoStream targetOutputStream, BinaryProtocolServer.Packet packet) throws IOException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet.getPacket())); byte command = (byte) dis.read(); diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index 4052d95acd..ec1cf99f08 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -229,6 +229,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { int fromPacket = IoHelper.getCrc32(packet); if (crc != fromPacket) throw new IllegalStateException("CRC mismatch crc=" + Integer.toString(crc, 16) + " vs packet=" + Integer.toString(fromPacket, 16) + " len=" + packet.length + " data: " + IoStream.printHexBinary(packet)); + in.onPacketArrived(); return new Packet(packet, crc); }