proxy progress: PROTOCOL bugfix and unit test

This commit is contained in:
rusefi 2020-07-30 23:31:19 -04:00
parent 4c0a4b045d
commit e84d8e10fc
6 changed files with 85 additions and 24 deletions

View File

@ -101,8 +101,10 @@ public class BinaryProtocol implements BinaryProtocolCommands {
return "WRITE_CHUNK";
case Fields.TS_OUTPUT_COMMAND:
return "TS_OUTPUT_COMMAND";
case Fields.TS_RESPONSE_OK:
return "TS_RESPONSE_OK";
default:
return "command " + (char) +command + "/" + command;
return "command " + (char) command + "/" + command;
}
}

View File

@ -205,7 +205,7 @@ public class IncomingDataBuffer {
public void read(byte[] packet) throws EOFException {
boolean isTimeout = waitForBytes(loggingPrefix + "read", System.currentTimeMillis(), packet.length);
if (isTimeout)
throw new EOFException("Timeout while waiting " + packet.length);
throw new EOFException("Timeout while waiting for " + packet.length + " byte(s)");
getData(packet);
}
}

View File

@ -51,6 +51,7 @@ public class BinaryProtocolProxy {
byte firstByte = clientStream.getDataBuffer().readByte(timeoutMs);
if (firstByte == COMMAND_PROTOCOL) {
clientStream.write(TS_PROTOCOL.getBytes());
clientStream.flush();
continue;
}
BinaryProtocolServer.Packet clientRequest = readClientRequest(clientStream.getDataBuffer(), firstByte);

View File

@ -206,6 +206,7 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
byte first = in.readByte(ioTimeout);
if (first == COMMAND_PROTOCOL) {
protocolCommandHandler.handle();
return 0;
}
return first * 256 + in.readByte(ioTimeout);
}

View File

@ -13,8 +13,6 @@ import java.io.IOException;
import java.net.Socket;
import static com.devexperts.logging.Logging.getLogging;
import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
import static com.rusefi.io.tcp.BinaryProtocolServer.readPromisedBytes;
import static com.rusefi.shared.FileUtil.close;
public class BaseBroadcastingThread {
@ -38,10 +36,10 @@ public class BaseBroadcastingThread {
} else {
ioTimeout = context.consecutivePacketTimeout();
}
int length = getPacketLength(in, () -> {
int length = BinaryProtocolServer.getPacketLength(in, () -> {
throw new UnsupportedOperationException();
}, ioTimeout);
BinaryProtocolServer.Packet packet = readPromisedBytes(in, length);
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length);
byte[] payload = packet.getPacket();
byte command = payload[0];

View File

@ -3,6 +3,7 @@ package com.rusefi.proxy.client;
import com.rusefi.BackendTestHelper;
import com.rusefi.TestHelper;
import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
import com.rusefi.config.generated.Fields;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.GetOutputsCommand;
@ -19,16 +20,20 @@ import org.junit.Test;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.rusefi.BackendTestHelper.createTestUserResolver;
import static com.rusefi.TestHelper.TEST_TOKEN_1;
import static com.rusefi.TestHelper.assertLatch;
import static com.rusefi.TestHelper.*;
import static com.rusefi.Timeouts.SECOND;
import static com.rusefi.binaryprotocol.BinaryProtocol.findCommand;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
import static com.rusefi.config.generated.Fields.TS_PROTOCOL;
import static com.rusefi.io.tcp.BinaryProtocolServer.getPacketLength;
import static com.rusefi.shared.FileUtil.close;
import static org.junit.Assert.*;
public class LocalApplicationProxyTest {
private static final AtomicInteger portNumber = new AtomicInteger(4000);
@ -59,12 +64,67 @@ public class LocalApplicationProxyTest {
mockBackend.close();
}
@Test
public void testCommandProtocol() throws IOException, InterruptedException {
LocalApplicationProxyContext context = createLocalApplicationProxy();
CountDownLatch gaugePokes = new CountDownLatch(3);
try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) {
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, TcpIoStream.DisconnectListener.VOID, LocalApplicationProxy.ConnectionListener.VOID)) {
TcpIoStream applicationConnection = new TcpIoStream("mock application ", new Socket(LOCALHOST, context.authenticatorPort()));
byte[] protocolResponse = new byte[TS_PROTOCOL.length()];
// request
applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL});
applicationConnection.flush();
// response
applicationConnection.getDataBuffer().read(protocolResponse);
assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes());
// request again
applicationConnection.write(new byte[] {BinaryProtocolCommands.COMMAND_PROTOCOL});
applicationConnection.flush();
// response again
applicationConnection.getDataBuffer().read(protocolResponse);
assertArrayEquals(protocolResponse, TS_PROTOCOL.getBytes());
byte[] commandPacket = GetOutputsCommand.createRequest();
applicationConnection.sendPacket(commandPacket);
BinaryProtocolServer.Packet response = applicationConnection.readPacket();
assertEquals(Fields.TS_OUTPUT_SIZE + 1, response.getPacket().length);
}
}
}
@Test
public void testGaugePoking() throws IOException, InterruptedException {
LocalApplicationProxyContext context = createLocalApplicationProxy();
CountDownLatch gaugePokes = new CountDownLatch(3);
try (ServerSocketReference ignored1 = createMockBackend(context, gaugePokes)) {
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
CountDownLatch disconnected = new CountDownLatch(1);
try (ServerSocketReference ignored2 = LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID)) {
// wait for three output requests to take place
assertLatch("gaugePokes", gaugePokes);
// but there must be a disconnect after some time
assertLatch("disconnected", disconnected);
}
}
}
@NotNull
private static ServerSocketReference createMockBackend(LocalApplicationProxyContext context, CountDownLatch gaugePokes) throws IOException, InterruptedException {
CountDownLatch backendCreated = new CountDownLatch(1);
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
try {
@ -73,11 +133,23 @@ public class LocalApplicationProxyTest {
HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer());
while (!socket.isClosed()) {
BinaryProtocolServer.Packet packet = applicationClientStream.readPacket();
AtomicBoolean handled = new AtomicBoolean();
int length = getPacketLength(applicationClientStream.getDataBuffer(), new BinaryProtocolServer.Handler() {
@Override
public void handle() throws IOException {
applicationClientStream.write(TS_PROTOCOL.getBytes());
handled.set(true);
}
});
if (handled.get())
continue;
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(applicationClientStream.getDataBuffer(), length);
System.out.println("Got packet " + findCommand(packet.getPacket()[0]));
if (packet.getPacket().length != 5)
throw new IllegalStateException();
throw new IllegalStateException("Unexpected length " + packet.getPacket().length);
GetOutputsCommand.sendOutput(applicationClientStream);
gaugePokes.countDown();
@ -89,20 +161,7 @@ public class LocalApplicationProxyTest {
}, parameter -> backendCreated.countDown());
assertLatch(backendCreated);
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, createTestUserResolver().apply(TEST_TOKEN_1));
CountDownLatch disconnected = new CountDownLatch(1);
LocalApplicationProxy.startAndRun(context, applicationRequest, -1, (String message) -> disconnected.countDown(), LocalApplicationProxy.ConnectionListener.VOID);
// wait for three output requests to take place
assertLatch("gaugePokes", gaugePokes);
// but there must be a disconnect after some time
assertLatch("disconnected", disconnected);
mockBackend.close();
return mockBackend;
}
@NotNull