refactoring

This commit is contained in:
rusefi 2019-08-14 21:46:00 -04:00
parent 64fc6d0f4f
commit e593ec6a7c
2 changed files with 35 additions and 28 deletions

View File

@ -197,34 +197,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
private byte[] receivePacket(String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
long start = System.currentTimeMillis();
synchronized (ioLock) {
boolean isTimeout = incomingData.waitForBytes(msg + " header", start, 2);
if (isTimeout)
return null;
int packetSize = swap16(incomingData.getShort());
logger.trace("Got packet size " + packetSize);
if (packetSize < 0)
return null;
if (!allowLongResponse && packetSize > Math.max(BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10)
return null;
isTimeout = incomingData.waitForBytes(msg + " body", start, packetSize + 4);
if (isTimeout)
return null;
byte[] packet = new byte[packetSize];
incomingData.getData(packet);
int packetCrc = swap32(incomingData.getInt());
int actualCrc = getCrc32(packet);
boolean isCrcOk = actualCrc == packetCrc;
if (!isCrcOk) {
logger.trace(String.format("%x", actualCrc) + " vs " + String.format("%x", packetCrc));
return null;
}
logger.trace("packet " + Arrays.toString(packet) + ": crc OK");
return packet;
return incomingData.getPacket(logger, msg, allowLongResponse, start);
}
}

View File

@ -2,12 +2,15 @@ package com.rusefi.binaryprotocol;
import com.opensr5.Logger;
import com.rusefi.Timeouts;
import com.rusefi.config.generated.Fields;
import etch.util.CircularByteBuffer;
import net.jcip.annotations.ThreadSafe;
import java.io.EOFException;
import java.util.Arrays;
import static com.rusefi.binaryprotocol.IoHelper.*;
/**
* Thread-safe byte queue with blocking {@link #waitForBytes} method
*
@ -28,6 +31,37 @@ public class IncomingDataBuffer {
this.logger = logger;
}
public byte[] getPacket(Logger logger, String msg, boolean allowLongResponse, long start) throws InterruptedException, EOFException {
boolean isTimeout = waitForBytes(msg + " header", start, 2);
if (isTimeout)
return null;
int packetSize = swap16(getShort());
logger.trace("Got packet size " + packetSize);
if (packetSize < 0)
return null;
if (!allowLongResponse && packetSize > Math.max(BinaryProtocolCommands.BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10)
return null;
isTimeout = waitForBytes(msg + " body", start, packetSize + 4);
if (isTimeout)
return null;
byte[] packet = new byte[packetSize];
getData(packet);
int packetCrc = swap32(getInt());
int actualCrc = getCrc32(packet);
boolean isCrcOk = actualCrc == packetCrc;
if (!isCrcOk) {
logger.trace(String.format("%x", actualCrc) + " vs " + String.format("%x", packetCrc));
return null;
}
logger.trace("packet " + Arrays.toString(packet) + ": crc OK");
return packet;
}
public void addData(byte[] freshData) {
logger.info("IncomingDataBuffer: " + freshData.length + " byte(s) arrived");
synchronized (cbb) {