proxy progress - connector timeout bugfix

This commit is contained in:
rusefi 2020-07-22 19:21:16 -04:00
parent a6982273ea
commit 36e5288ef6
3 changed files with 29 additions and 13 deletions

View File

@ -99,10 +99,14 @@ public class IncomingDataBuffer {
* @return true in case of timeout, false if everything is fine
*/
public boolean waitForBytes(String loggingMessage, long startTimestamp, int count) {
return waitForBytes(Timeouts.BINARY_IO_TIMEOUT, loggingMessage, startTimestamp, count);
}
public boolean waitForBytes(int timeoutMs, String loggingMessage, long startTimestamp, int count) {
logger.info(loggingMessage + ": waiting for " + count + " byte(s)");
synchronized (cbb) {
while (cbb.length() < count) {
int timeout = (int) (startTimestamp + Timeouts.BINARY_IO_TIMEOUT - System.currentTimeMillis());
int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis());
if (timeout <= 0) {
logger.info(loggingMessage + ": timeout. Got only " + cbb.length());
return true; // timeout. Sad face.
@ -155,29 +159,33 @@ public class IncomingDataBuffer {
}
public byte readByte() throws IOException {
boolean timeout = waitForBytes(loggingPrefix + "readByte", System.currentTimeMillis(), 1);
if (timeout)
return readByte(Timeouts.BINARY_IO_TIMEOUT);
}
public byte readByte(int timeoutMs) throws IOException {
boolean isTimeout = waitForBytes(timeoutMs,loggingPrefix + "readByte", System.currentTimeMillis(), 1);
if (isTimeout)
throw new IOException("Timeout in readByte");
return (byte) getByte();
}
public int readInt() throws EOFException {
boolean timeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4);
if (timeout)
boolean isTimeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4);
if (isTimeout)
throw new IllegalStateException("Timeout in readByte");
return swap32(getInt());
}
public short readShort() throws EOFException {
boolean timeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2);
if (timeout)
boolean isTimeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2);
if (isTimeout)
throw new IllegalStateException("Timeout in readShort");
return (short) swap16(getShort());
}
public int read(byte[] packet) {
boolean timeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
if (timeout)
boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
if (isTimeout)
throw new IllegalStateException("Timeout while waiting " + packet.length);
getData(packet);
return packet.length;

View File

@ -3,6 +3,7 @@ package com.rusefi.io.tcp;
import com.opensr5.ConfigurationImage;
import com.opensr5.Logger;
import com.rusefi.Listener;
import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.*;
import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream;
@ -183,11 +184,15 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
}
public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler) throws IOException {
byte first = in.readByte();
return getPacketLength(in, protocolCommandHandler, Timeouts.BINARY_IO_TIMEOUT);
}
public static int getPacketLength(IncomingDataBuffer in, Handler protocolCommandHandler, int ioTimeout) throws IOException {
byte first = in.readByte(ioTimeout);
if (first == COMMAND_PROTOCOL) {
protocolCommandHandler.handle();
}
return first * 256 + in.readByte();
return first * 256 + in.readByte(ioTimeout);
}
public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException {

View File

@ -2,7 +2,7 @@ package com.rusefi.proxy;
import com.opensr5.Logger;
import com.rusefi.NamedThreadFactory;
import com.rusefi.binaryprotocol.BinaryProtocol;
import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.config.generated.Fields;
import com.rusefi.io.commands.HelloCommand;
@ -18,8 +18,11 @@ import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
public class BaseBroadcastingThread {
private static final NamedThreadFactory BASE_BROADCASTING_THREAD = new NamedThreadFactory("BaseBroadcastingThread");
// we expect server to at least request output channels once in a while
private static final int IO_TIMEOUT = 600 * Timeouts.SECOND;
private final Thread thread;
@SuppressWarnings("InfiniteLoopStatement")
public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException {
TcpIoStream stream = new TcpIoStream("[broadcast] ", logger, socket);
IncomingDataBuffer in = stream.getDataBuffer();
@ -29,7 +32,7 @@ public class BaseBroadcastingThread {
while (true) {
int length = getPacketLength(in, () -> {
throw new UnsupportedOperationException();
});
}, IO_TIMEOUT);
BinaryProtocolServer.Packet packet = readPromisedBytes(in, length);
byte[] payload = packet.getPacket();