From 36e5288ef61e43a3553e355deb866f5e315a63b9 Mon Sep 17 00:00:00 2001 From: rusefi Date: Wed, 22 Jul 2020 19:21:16 -0400 Subject: [PATCH] proxy progress - connector timeout bugfix --- .../binaryprotocol/IncomingDataBuffer.java | 26 ++++++++++++------- .../rusefi/io/tcp/BinaryProtocolServer.java | 9 +++++-- .../rusefi/proxy/BaseBroadcastingThread.java | 7 +++-- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java index 65372d0a0e..017fe2af9e 100644 --- a/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java +++ b/java_console/io/src/main/java/com/rusefi/binaryprotocol/IncomingDataBuffer.java @@ -99,10 +99,14 @@ public class IncomingDataBuffer { * @return true in case of timeout, false if everything is fine */ public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) { + return waitForBytes(Timeouts.BINARY_IO_TIMEOUT, loggingMessage, startTimestamp, count); + } + + public boolean waitForBytes(int timeoutMs, String loggingMessage, long startTimestamp, int count) { logger.info(loggingMessage + ": waiting for " + count + " byte(s)"); synchronized (cbb) { while (cbb.length() < count) { - int timeout = (int) (startTimestamp + Timeouts.BINARY_IO_TIMEOUT - System.currentTimeMillis()); + int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis()); if (timeout <= 0) { logger.info(loggingMessage + ": timeout. Got only " + cbb.length()); return true; // timeout. Sad face. @@ -155,29 +159,33 @@ public class IncomingDataBuffer { } public byte readByte() throws IOException { - boolean timeout = waitForBytes(loggingPrefix + "readByte", System.currentTimeMillis(), 1); - if (timeout) + return readByte(Timeouts.BINARY_IO_TIMEOUT); + } + + public byte readByte(int timeoutMs) throws IOException { + boolean isTimeout = waitForBytes(timeoutMs,loggingPrefix + "readByte", System.currentTimeMillis(), 1); + if (isTimeout) throw new IOException("Timeout in readByte"); return (byte) getByte(); } public int readInt() throws EOFException { - boolean timeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4); - if (timeout) + boolean isTimeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4); + if (isTimeout) throw new IllegalStateException("Timeout in readByte"); return swap32(getInt()); } public short readShort() throws EOFException { - boolean timeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2); - if (timeout) + boolean isTimeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2); + if (isTimeout) throw new IllegalStateException("Timeout in readShort"); return (short) swap16(getShort()); } public int read(byte[] packet) { - boolean timeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length); - if (timeout) + boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length); + if (isTimeout) throw new IllegalStateException("Timeout while waiting " + packet.length); getData(packet); return packet.length; diff --git a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java index 49df1ad78a..910340293c 100644 --- a/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java +++ b/java_console/io/src/main/java/com/rusefi/io/tcp/BinaryProtocolServer.java @@ -3,6 +3,7 @@ package com.rusefi.io.tcp; import com.opensr5.ConfigurationImage; import com.opensr5.Logger; import com.rusefi.Listener; +import com.rusefi.Timeouts; import com.rusefi.binaryprotocol.*; import com.rusefi.config.generated.Fields; import com.rusefi.io.IoStream; @@ -183,11 +184,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands { } public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException { - byte first = in.readByte(); + return getPacketLength(in, protocolCommandHandler, Timeouts.BINARY_IO_TIMEOUT); + } + + public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler, int ioTimeout) throws IOException { + byte first = in.readByte(ioTimeout); if (first == COMMAND_PROTOCOL) { protocolCommandHandler.handle(); } - return first * 256 + in.readByte(); + return first * 256 + in.readByte(ioTimeout); } public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException { diff --git a/java_tools/proxy_server/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java b/java_tools/proxy_server/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java index 1e28b39392..4806b7fc7b 100644 --- a/java_tools/proxy_server/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java +++ b/java_tools/proxy_server/src/main/java/com/rusefi/proxy/BaseBroadcastingThread.java @@ -2,7 +2,7 @@ package com.rusefi.proxy; import com.opensr5.Logger; import com.rusefi.NamedThreadFactory; -import com.rusefi.binaryprotocol.BinaryProtocol; +import com.rusefi.Timeouts; import com.rusefi.binaryprotocol.IncomingDataBuffer; import com.rusefi.config.generated.Fields; import com.rusefi.io.commands.HelloCommand; @@ -18,8 +18,11 @@ import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes; public class BaseBroadcastingThread { private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread"); + // we expect server to at least request output channels once in a while + private static final int IO_TIMEOUT = 600 * Timeouts.SECOND; private final Thread thread; + @SuppressWarnings("InfiniteLoopStatement") public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException { TcpIoStream stream = new TcpIoStream("[broadcast] ", logger, socket); IncomingDataBuffer in = stream.getDataBuffer(); @@ -29,7 +32,7 @@ public class BaseBroadcastingThread { while (true) { int length = getPacketLength(in, () -> { throw new UnsupportedOperationException(); - }); + }, IO_TIMEOUT); BinaryProtocolServer.Packet packet = readPromisedBytes(in, length); byte[] payload = packet.getPacket();