proxy progress - connector timeout bugfix

This commit is contained in:
rusefi 2020-07-22 19:21:16 -04:00
parent 8c55c851b0
commit 3902655f72
3 changed files with 29 additions and 13 deletions

View File

@ -99,10 +99,14 @@ public class IncomingDataBuffer {
* @return true in case of timeout, false if everything is fine * @return true in case of timeout, false if everything is fine
*/ */
public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) { public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) {
return waitForBytes(Timeouts.BINARY_IO_TIMEOUT, loggingMessage, startTimestamp, count);
}
public boolean waitForBytes(int timeoutMs, String loggingMessage, long startTimestamp, int count) {
logger.info(loggingMessage + ": waiting for " + count + " byte(s)"); logger.info(loggingMessage + ": waiting for " + count + " byte(s)");
synchronized (cbb) { synchronized (cbb) {
while (cbb.length() < count) { while (cbb.length() < count) {
int timeout = (int) (startTimestamp + Timeouts.BINARY_IO_TIMEOUT - System.currentTimeMillis()); int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis());
if (timeout <= 0) { if (timeout <= 0) {
logger.info(loggingMessage + ": timeout. Got only " + cbb.length()); logger.info(loggingMessage + ": timeout. Got only " + cbb.length());
return true; // timeout. Sad face. return true; // timeout. Sad face.
@ -155,29 +159,33 @@ public class IncomingDataBuffer {
} }
public byte readByte() throws IOException { public byte readByte() throws IOException {
boolean timeout = waitForBytes(loggingPrefix + "readByte", System.currentTimeMillis(), 1); return readByte(Timeouts.BINARY_IO_TIMEOUT);
if (timeout) }
public byte readByte(int timeoutMs) throws IOException {
boolean isTimeout = waitForBytes(timeoutMs,loggingPrefix + "readByte", System.currentTimeMillis(), 1);
if (isTimeout)
throw new IOException("Timeout in readByte"); throw new IOException("Timeout in readByte");
return (byte) getByte(); return (byte) getByte();
} }
public int readInt() throws EOFException { public int readInt() throws EOFException {
boolean timeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4); boolean isTimeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4);
if (timeout) if (isTimeout)
throw new IllegalStateException("Timeout in readByte"); throw new IllegalStateException("Timeout in readByte");
return swap32(getInt()); return swap32(getInt());
} }
public short readShort() throws EOFException { public short readShort() throws EOFException {
boolean timeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2); boolean isTimeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2);
if (timeout) if (isTimeout)
throw new IllegalStateException("Timeout in readShort"); throw new IllegalStateException("Timeout in readShort");
return (short) swap16(getShort()); return (short) swap16(getShort());
} }
public int read(byte[] packet) { public int read(byte[] packet) {
boolean timeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length); boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
if (timeout) if (isTimeout)
throw new IllegalStateException("Timeout while waiting " + packet.length); throw new IllegalStateException("Timeout while waiting " + packet.length);
getData(packet); getData(packet);
return packet.length; return packet.length;

View File

@ -3,6 +3,7 @@ package com.rusefi.io.tcp;
import com.opensr5.ConfigurationImage; import com.opensr5.ConfigurationImage;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.Listener; import com.rusefi.Listener;
import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.*; import com.rusefi.binaryprotocol.*;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
@ -183,11 +184,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
} }
public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException { public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException {
byte first = in.readByte(); return getPacketLength(in, protocolCommandHandler, Timeouts.BINARY_IO_TIMEOUT);
}
public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler, int ioTimeout) throws IOException {
byte first = in.readByte(ioTimeout);
if (first == COMMAND_PROTOCOL) { if (first == COMMAND_PROTOCOL) {
protocolCommandHandler.handle(); protocolCommandHandler.handle();
} }
return first * 256 + in.readByte(); return first * 256 + in.readByte(ioTimeout);
} }
public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException { public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException {

View File

@ -2,7 +2,7 @@ package com.rusefi.proxy;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.NamedThreadFactory; import com.rusefi.NamedThreadFactory;
import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.IncomingDataBuffer; import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
@ -18,8 +18,11 @@ import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
public class BaseBroadcastingThread { public class BaseBroadcastingThread {
private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread"); private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread");
// we expect server to at least request output channels once in a while
private static final int IO_TIMEOUT = 600 * Timeouts.SECOND;
private final Thread thread; private final Thread thread;
@SuppressWarnings("InfiniteLoopStatement")
public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException { public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException {
TcpIoStream stream = new TcpIoStream("[broadcast] ", logger, socket); TcpIoStream stream = new TcpIoStream("[broadcast] ", logger, socket);
IncomingDataBuffer in = stream.getDataBuffer(); IncomingDataBuffer in = stream.getDataBuffer();
@ -29,7 +32,7 @@ public class BaseBroadcastingThread {
while (true) { while (true) {
int length = getPacketLength(in, () -> { int length = getPacketLength(in, () -> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
}); }, IO_TIMEOUT);
BinaryProtocolServer.Packet packet = readPromisedBytes(in, length); BinaryProtocolServer.Packet packet = readPromisedBytes(in, length);
byte[] payload = packet.getPacket(); byte[] payload = packet.getPacket();