so suddenly ServerTest is less reliable?

This commit is contained in:
rusefi 2020-07-19 00:07:37 -04:00
parent 3b5b8714dc
commit 158e5d1876
9 changed files with 37 additions and 17 deletions

View File

@ -22,6 +22,7 @@ import static com.rusefi.binaryprotocol.IoHelper.*;
@ThreadSafe @ThreadSafe
public class IncomingDataBuffer { public class IncomingDataBuffer {
private static final int BUFFER_SIZE = 32768; private static final int BUFFER_SIZE = 32768;
private static String loggingPrefix;
/** /**
* buffer for response bytes from controller * buffer for response bytes from controller
*/ */
@ -33,7 +34,8 @@ public class IncomingDataBuffer {
this.logger = logger; this.logger = logger;
} }
public static IncomingDataBuffer createDataBuffer(IoStream stream, Logger logger) { public static IncomingDataBuffer createDataBuffer(String loggingPrefix, IoStream stream, Logger logger) {
IncomingDataBuffer.loggingPrefix = loggingPrefix;
IncomingDataBuffer incomingData = new IncomingDataBuffer(logger); IncomingDataBuffer incomingData = new IncomingDataBuffer(logger);
stream.setInputListener(incomingData::addData); stream.setInputListener(incomingData::addData);
return incomingData; return incomingData;
@ -49,13 +51,13 @@ public class IncomingDataBuffer {
return null; return null;
int packetSize = swap16(getShort()); int packetSize = swap16(getShort());
logger.trace("Got packet size " + packetSize); logger.trace( loggingPrefix + "Got packet size " + packetSize);
if (packetSize < 0) if (packetSize < 0)
return null; return null;
if (!allowLongResponse && packetSize > Math.max(BinaryProtocolCommands.BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10) if (!allowLongResponse && packetSize > Math.max(BinaryProtocolCommands.BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10)
return null; return null;
isTimeout = waitForBytes(msg + " body", start, packetSize + 4); isTimeout = waitForBytes(loggingPrefix + msg + " body", start, packetSize + 4);
if (isTimeout) if (isTimeout)
return null; return null;
@ -148,28 +150,28 @@ public class IncomingDataBuffer {
} }
public byte readByte() throws IOException { public byte readByte() throws IOException {
boolean timeout = waitForBytes("readByte", System.currentTimeMillis(), 1); boolean timeout = waitForBytes(loggingPrefix + "readByte", System.currentTimeMillis(), 1);
if (timeout) if (timeout)
throw new IOException("Timeout in readByte"); throw new IOException("Timeout in readByte");
return (byte) getByte(); return (byte) getByte();
} }
public int readInt() throws EOFException { public int readInt() throws EOFException {
boolean timeout = waitForBytes("readInt", System.currentTimeMillis(), 4); boolean timeout = waitForBytes(loggingPrefix + "readInt", System.currentTimeMillis(), 4);
if (timeout) if (timeout)
throw new IllegalStateException("Timeout in readByte"); throw new IllegalStateException("Timeout in readByte");
return swap32(getInt()); return swap32(getInt());
} }
public short readShort() throws EOFException { public short readShort() throws EOFException {
boolean timeout = waitForBytes("readShort", System.currentTimeMillis(), 2); boolean timeout = waitForBytes(loggingPrefix + "readShort", System.currentTimeMillis(), 2);
if (timeout) if (timeout)
throw new IllegalStateException("Timeout in readShort"); throw new IllegalStateException("Timeout in readShort");
return (short) swap16(getShort()); return (short) swap16(getShort());
} }
public int read(byte[] packet) { public int read(byte[] packet) {
boolean timeout = waitForBytes("read", System.currentTimeMillis(), packet.length); boolean timeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
if (timeout) if (timeout)
throw new IllegalStateException("Timeout while waiting " + packet.length); throw new IllegalStateException("Timeout while waiting " + packet.length);
getData(packet); getData(packet);

View File

@ -53,7 +53,7 @@ public interface IoStream extends WriteStream {
packet = IoHelper.makeCrc32Packet(plainPacket); packet = IoHelper.makeCrc32Packet(plainPacket);
} }
// todo: verbose mode printHexBinary(plainPacket)) // todo: verbose mode printHexBinary(plainPacket))
logger.info("Sending packet " + BinaryProtocol.findCommand(plainPacket[0]) + " length=" + plainPacket.length); logger.info(getLoggingPrefix() + "Sending packet " + BinaryProtocol.findCommand(plainPacket[0]) + " length=" + plainPacket.length);
write(packet); write(packet);
} }
@ -66,6 +66,8 @@ public interface IoStream extends WriteStream {
void close(); void close();
String getLoggingPrefix();
IncomingDataBuffer getDataBuffer(); IncomingDataBuffer getDataBuffer();
default short readShort() throws EOFException { default short readShort() throws EOFException {

View File

@ -28,7 +28,12 @@ public class SerialIoStreamJSerialComm implements IoStream {
this.sp = sp; this.sp = sp;
this.port = port; this.port = port;
this.logger = logger; this.logger = logger;
this.dataBuffer = IncomingDataBuffer.createDataBuffer(this, logger); this.dataBuffer = IncomingDataBuffer.createDataBuffer("[serial] ", this, logger);
}
@Override
public String getLoggingPrefix() {
return "";
} }
@Override @Override

View File

@ -104,7 +104,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
@SuppressWarnings("InfiniteLoopStatement") @SuppressWarnings("InfiniteLoopStatement")
private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException { private void runProxy(LinkManager linkManager, Socket clientSocket) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, clientSocket); TcpIoStream stream = new TcpIoStream("[proxy] ", logger, clientSocket);
IncomingDataBuffer in = stream.getDataBuffer(); IncomingDataBuffer in = stream.getDataBuffer();
@ -130,7 +130,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
byte command = payload[0]; byte command = payload[0];
System.out.println("Got [" + BinaryProtocol.findCommand(command)); System.out.println("Got command " + BinaryProtocol.findCommand(command));
if (command == Fields.TS_HELLO_COMMAND) { if (command == Fields.TS_HELLO_COMMAND) {
new HelloCommand(logger, Fields.TS_SIGNATURE).handle(stream); new HelloCommand(logger, Fields.TS_SIGNATURE).handle(stream);
@ -159,7 +159,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload, 1, payload.length - 1)); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload, 1, payload.length - 1));
int offset = swap16(dis.readShort()); int offset = swap16(dis.readShort());
int count = swap16(dis.readShort()); int count = swap16(dis.readShort());
System.out.println("TS_OUTPUT_COMMAND offset=" + offset + "/count=" + count); logger.info("TS_OUTPUT_COMMAND offset=" + offset + "/count=" + count);
byte[] response = new byte[1 + count]; byte[] response = new byte[1 + count];
response[0] = (byte) TS_OK.charAt(0); response[0] = (byte) TS_OK.charAt(0);

View File

@ -20,10 +20,16 @@ public class TcpIoStream implements IoStream {
private final InputStream input; private final InputStream input;
private final OutputStream output; private final OutputStream output;
private final Logger logger; private final Logger logger;
private final String loggingPrefix;
private boolean isClosed; private boolean isClosed;
private final IncomingDataBuffer dataBuffer; private final IncomingDataBuffer dataBuffer;
public TcpIoStream(Logger logger, Socket socket) throws IOException { public TcpIoStream(Logger logger, Socket socket) throws IOException {
this("", logger, socket);
}
public TcpIoStream(String loggingPrefix, Logger logger, Socket socket) throws IOException {
this.loggingPrefix = loggingPrefix;
InputStream input = new BufferedInputStream(socket.getInputStream()); InputStream input = new BufferedInputStream(socket.getInputStream());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
this.logger = logger; this.logger = logger;
@ -33,7 +39,12 @@ public class TcpIoStream implements IoStream {
throw new NullPointerException("output"); throw new NullPointerException("output");
this.output = output; this.output = output;
this.input = input; this.input = input;
this.dataBuffer = IncomingDataBuffer.createDataBuffer(this, logger); this.dataBuffer = IncomingDataBuffer.createDataBuffer(loggingPrefix, this, logger);
}
@Override
public String getLoggingPrefix() {
return loggingPrefix;
} }
@Override @Override

View File

@ -26,7 +26,7 @@ public class LocalApplicationProxy {
* @param authenticatorPort local port we would bind for TunerStudio to connect to * @param authenticatorPort local port we would bind for TunerStudio to connect to
*/ */
static void startAndRun(Logger logger, int serverPortForRemoteUsers, ApplicationRequest applicationRequest, int authenticatorPort) throws IOException { static void startAndRun(Logger logger, int serverPortForRemoteUsers, ApplicationRequest applicationRequest, int authenticatorPort) throws IOException {
IoStream authenticatorToProxyStream = new TcpIoStream(logger, rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, serverPortForRemoteUsers)); IoStream authenticatorToProxyStream = new TcpIoStream("authenticatorToProxyStream ", logger, rusEFISSLContext.getSSLSocket(HttpUtil.RUSEFI_PROXY_HOSTNAME, serverPortForRemoteUsers));
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest); LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest);
localApplicationProxy.run(authenticatorToProxyStream); localApplicationProxy.run(authenticatorToProxyStream);

View File

@ -18,7 +18,7 @@ public class BaseBroadcastingThread {
private final Thread thread; private final Thread thread;
public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException { public BaseBroadcastingThread(Socket socket, SessionDetails sessionDetails, Logger logger) throws IOException {
TcpIoStream stream = new TcpIoStream(logger, socket); TcpIoStream stream = new TcpIoStream("[broadcast] ", logger, socket);
IncomingDataBuffer in = stream.getDataBuffer(); IncomingDataBuffer in = stream.getDataBuffer();
thread = new Thread(() -> { thread = new Thread(() -> {

View File

@ -106,7 +106,7 @@ public class Backend implements Closeable {
// connection from authenticator app which proxies for Tuner Studio // connection from authenticator app which proxies for Tuner Studio
IoStream applicationClientStream = null; IoStream applicationClientStream = null;
try { try {
applicationClientStream = new TcpIoStream(logger, applicationSocket); applicationClientStream = new TcpIoStream("[app] ", logger, applicationSocket);
// authenticator pushed hello packet on connect // authenticator pushed hello packet on connect
String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer(), logger); String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer(), logger);

View File

@ -35,7 +35,7 @@ public class ControllerConnectionState {
this.logger = logger; this.logger = logger;
this.userDetailsResolver = userDetailsResolver; this.userDetailsResolver = userDetailsResolver;
try { try {
stream = new TcpIoStream(logger, clientSocket); stream = new TcpIoStream("[controller] ", logger, clientSocket);
incomingData = stream.getDataBuffer(); incomingData = stream.getDataBuffer();
} catch (IOException e) { } catch (IOException e) {
close(); close();