proxy progress

IT WORKS!
This commit is contained in:
rusefi 2020-07-17 20:19:52 -04:00
parent bc7b677563
commit 1045c24b9e
11 changed files with 131 additions and 77 deletions

View File

@ -2,26 +2,32 @@ package com.rusefi.io.tcp;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.binaryprotocol.BinaryProtocol;
import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
import java.util.function.Function; import java.util.function.Function;
import static com.rusefi.binaryprotocol.BinaryProtocolCommands.COMMAND_PROTOCOL; import static com.rusefi.binaryprotocol.BinaryProtocolCommands.COMMAND_PROTOCOL;
import static com.rusefi.config.generated.Fields.TS_PROTOCOL;
public class BinaryProtocolProxy { public class BinaryProtocolProxy {
public static void createProxy(IoStream targetEcuSocket, int serverProxyPort) { public static void createProxy(Logger logger, IoStream targetEcuSocket, int serverProxyPort) {
Function<Socket, Runnable> clientSocketRunnableFactory = new Function<Socket, Runnable>() { Function<Socket, Runnable> clientSocketRunnableFactory = new Function<Socket, Runnable>() {
@Override @Override
public Runnable apply(Socket clientSocket) { public Runnable apply(Socket clientSocket) {
return new Runnable() { return new Runnable() {
@Override @Override
public void run() { public void run() {
runProxy(targetEcuSocket, clientSocket); try {
TcpIoStream clientStream = new TcpIoStream(logger, clientSocket);
runProxy(targetEcuSocket, clientStream);
} catch (IOException e) {
e.printStackTrace();
}
} }
}; };
} }
@ -29,41 +35,31 @@ public class BinaryProtocolProxy {
BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Logger.CONSOLE, null); BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Logger.CONSOLE, null);
} }
private static void runProxy(IoStream targetEcu, Socket clientSocket) { public static void runProxy(IoStream targetEcu, IoStream clientStream) throws IOException {
/* /*
* Each client socket is running on it's own thread * Each client socket is running on it's own thread
*/ */
try {
DataInputStream clientInputStream = new DataInputStream(clientSocket.getInputStream());
DataOutputStream clientOutputStream = new DataOutputStream(clientSocket.getOutputStream());
while (true) { while (true) {
byte firstByte = clientInputStream.readByte(); byte firstByte = clientStream.getDataBuffer().readByte();
if (firstByte == COMMAND_PROTOCOL) { if (firstByte == COMMAND_PROTOCOL) {
BinaryProtocolServer.handleProtocolCommand(clientSocket); clientStream.write(TS_PROTOCOL.getBytes());
continue; continue;
}
proxyClientRequestToController(clientInputStream, firstByte, targetEcu);
proxyControllerResponseToClient(targetEcu, clientOutputStream);
} }
} catch (IOException e) { proxyClientRequestToController(clientStream.getDataBuffer(), firstByte, targetEcu);
e.printStackTrace();
proxyControllerResponseToClient(targetEcu, clientStream);
} }
} }
public static void proxyControllerResponseToClient(IoStream targetInputStream, DataOutputStream clientOutputStream) throws IOException { public static void proxyControllerResponseToClient(IoStream targetInputStream, IoStream clientOutputStream) throws IOException {
BinaryProtocolServer.Packet packet = targetInputStream.readPacket(); BinaryProtocolServer.Packet packet = targetInputStream.readPacket();
System.out.println("Relaying controller response length=" + packet.getPacket().length); System.out.println("Relaying controller response length=" + packet.getPacket().length);
// todo: replace with IoStream#sendPacket? clientOutputStream.sendPacket(packet);
clientOutputStream.writeShort(packet.getPacket().length);
clientOutputStream.write(packet.getPacket());
clientOutputStream.writeInt(packet.getCrc());
clientOutputStream.flush();
} }
private static void proxyClientRequestToController(DataInputStream in, byte firstByte, IoStream targetOutputStream) throws IOException { private static void proxyClientRequestToController(IncomingDataBuffer in, byte firstByte, IoStream targetOutputStream) throws IOException {
byte secondByte = in.readByte(); byte secondByte = in.readByte();
int length = firstByte * 256 + secondByte; int length = firstByte * 256 + secondByte;

View File

@ -13,5 +13,6 @@
<orderEntry type="library" exported="" name="httpclient" level="project" /> <orderEntry type="library" exported="" name="httpclient" level="project" />
<orderEntry type="library" exported="" name="annotations" level="project" /> <orderEntry type="library" exported="" name="annotations" level="project" />
<orderEntry type="library" exported="" scope="TEST" name="junit" level="project" /> <orderEntry type="library" exported="" scope="TEST" name="junit" level="project" />
<orderEntry type="library" name="javax.json" level="project" />
</component> </component>
</module> </module>

View File

@ -0,0 +1,46 @@
package com.rusefi;
import com.opensr5.Logger;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.server.ControllerInfo;
import com.rusefi.server.SessionDetails;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.Socket;
import static com.rusefi.tools.online.ProxyClient.LOCALHOST;
/**
* Connector between rusEFI ECU and rusEFI server
*/
public class NetworkConnector {
@NotNull
static SessionDetails runNetworkConnector(int serverPortForControllers, IoStream targetEcuSocket, final Logger logger, String authToken) throws IOException {
HelloCommand.send(targetEcuSocket, logger);
String controllerSignature = HelloCommand.getHelloResponse(targetEcuSocket.getDataBuffer(), logger);
// todo: request vehicle info from controller
ControllerInfo ci = new ControllerInfo("vehicle", "make", "code", controllerSignature);
SessionDetails deviceSessionDetails = new SessionDetails(ci, authToken, SessionDetails.createOneTimeCode());
BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(new Socket(LOCALHOST, serverPortForControllers),
deviceSessionDetails,
logger) {
@Override
protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException {
super.handleCommand(packet, stream);
targetEcuSocket.sendPacket(packet);
BinaryProtocolServer.Packet response = targetEcuSocket.readPacket();
stream.sendPacket(response);
}
};
baseBroadcastingThread.start();
return deviceSessionDetails;
}
}

View File

@ -9,16 +9,11 @@ import com.rusefi.io.ConnectionStateListener;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import com.rusefi.io.LinkManager; import com.rusefi.io.LinkManager;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.server.*; import com.rusefi.server.*;
import com.rusefi.tools.online.ProxyClient; import com.rusefi.tools.online.ProxyClient;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -28,13 +23,14 @@ import java.util.function.Function;
import static com.rusefi.TestHelper.createIniField; import static com.rusefi.TestHelper.createIniField;
import static com.rusefi.TestHelper.prepareImage; import static com.rusefi.TestHelper.prepareImage;
import static com.rusefi.Timeouts.READ_IMAGE_TIMEOUT; import static com.rusefi.Timeouts.READ_IMAGE_TIMEOUT;
import static com.rusefi.tools.online.ProxyClient.LOCALHOST;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
/** /**
* integration test of the rusEFI online backend process * integration test of the rusEFI online backend process
* At the moment this test is very loose with timing it must be unreliable? * At the moment this test is very loose with timing it must be unreliable?
*
* https://github.com/rusefi/web_backend/blob/master/documentation/rusEFI%20remote.png
*/ */
public class ServerTest { public class ServerTest {
private final static Logger logger = Logger.CONSOLE; private final static Logger logger = Logger.CONSOLE;
@ -91,9 +87,9 @@ public class ServerTest {
CountDownLatch disconnectedCountDownLatch = new CountDownLatch(1); CountDownLatch disconnectedCountDownLatch = new CountDownLatch(1);
Backend backend = new Backend(userDetailsResolver, httpPort, logger) { Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override @Override
public void onDisconnectApplication() { protected void onDisconnectApplication() {
super.onDisconnectApplication(); super.onDisconnectApplication();
disconnectedCountDownLatch.countDown();; disconnectedCountDownLatch.countDown();
} }
}; };
@ -110,7 +106,7 @@ public class ServerTest {
} }
@Test @Test
public void testAuthenticatorConnect() throws InterruptedException, IOException { public void testAuthenticatorRequestUnknownSession() throws InterruptedException, IOException {
int serverPortForRemoteUsers = 6800; int serverPortForRemoteUsers = 6800;
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6)); Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6));
@ -120,9 +116,9 @@ public class ServerTest {
Backend backend = new Backend(userDetailsResolver, httpPort, logger) { Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override @Override
public void onDisconnectApplication() { protected void onDisconnectApplication() {
super.onDisconnectApplication(); super.onDisconnectApplication();
disconnectedCountDownLatch.countDown();; disconnectedCountDownLatch.countDown();
} }
}; };
@ -143,14 +139,22 @@ public class ServerTest {
} }
@Test @Test
@Ignore
public void testRelayWorkflow() throws InterruptedException, IOException { public void testRelayWorkflow() throws InterruptedException, IOException {
ScalarIniField iniField = TestHelper.createIniField(Fields.CYLINDERSCOUNT); ScalarIniField iniField = TestHelper.createIniField(Fields.CYLINDERSCOUNT);
int value = 241; int value = 241;
int userId = 7;
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6)); CountDownLatch controllerRegistered = new CountDownLatch(1);
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), userId);
int httpPort = 8001; int httpPort = 8001;
Backend backend = new Backend(userDetailsResolver, httpPort, logger); Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override
protected void onRegister(ControllerConnectionState controllerConnectionState) {
super.onRegister(controllerConnectionState);
controllerRegistered.countDown();
}
};
int serverPortForControllers = 7001; int serverPortForControllers = 7001;
int serverPortForRemoteUsers = 7003; int serverPortForRemoteUsers = 7003;
@ -165,7 +169,7 @@ public class ServerTest {
assertTrue(applicationServerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS)); assertTrue(applicationServerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS));
// create virtual controller // create virtual controller to which "rusEFI network connector" connects to
int controllerPort = 7002; int controllerPort = 7002;
ConfigurationImage controllerImage = prepareImage(value, createIniField(Fields.CYLINDERSCOUNT)); ConfigurationImage controllerImage = prepareImage(value, createIniField(Fields.CYLINDERSCOUNT));
CountDownLatch controllerCreated = new CountDownLatch(1); CountDownLatch controllerCreated = new CountDownLatch(1);
@ -173,45 +177,24 @@ public class ServerTest {
assertTrue(controllerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS)); assertTrue(controllerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS));
// start network broadcaster to connect controller with backend since in real life controller has only local serial port it does not have network // start "rusEFI network connector" to connect controller with backend since in real life controller has only local serial port it does not have network
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, logger); IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, logger);
HelloCommand.send(targetEcuSocket, logger); SessionDetails deviceSessionDetails = NetworkConnector.runNetworkConnector(serverPortForControllers, targetEcuSocket, logger, MockRusEfiDevice.TEST_TOKEN_1);
String controllerSignature = HelloCommand.getHelloResponse(targetEcuSocket.getDataBuffer(), logger);
SessionDetails deviceSessionDetails = MockRusEfiDevice.createTestSession(MockRusEfiDevice.TEST_TOKEN_1, controllerSignature); assertTrue(controllerRegistered.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS));
BaseBroadcastingThread baseBroadcastingThread = new BaseBroadcastingThread(new Socket(LOCALHOST, serverPortForControllers),
deviceSessionDetails,
logger) {
@Override
protected void handleCommand(BinaryProtocolServer.Packet packet, TcpIoStream stream) throws IOException {
super.handleCommand(packet, stream);
targetEcuSocket.sendPacket(packet);
BinaryProtocolServer.Packet response = targetEcuSocket.readPacket();
stream.sendPacket(response);
}
};
baseBroadcastingThread.start();
SessionDetails authenticatorSessionDetails = new SessionDetails(deviceSessionDetails.getControllerInfo(), MockRusEfiDevice.TEST_TOKEN_3, deviceSessionDetails.getOneTimeToken()); SessionDetails authenticatorSessionDetails = new SessionDetails(deviceSessionDetails.getControllerInfo(), MockRusEfiDevice.TEST_TOKEN_3, deviceSessionDetails.getOneTimeToken());
ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, 123); ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, userId);
// start authenticator // start authenticator
int authenticatorPort = 7004; // local port on which authenticator accepts connections from Tuner Studio
IoStream authenticatorToProxyStream = TestHelper.connectToLocalhost(serverPortForRemoteUsers, logger); IoStream authenticatorToProxyStream = TestHelper.connectToLocalhost(serverPortForRemoteUsers, logger);
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest); LocalApplicationProxy.startAndRun(logger, applicationRequest, authenticatorPort, authenticatorToProxyStream);
localApplicationProxy.run(authenticatorToProxyStream);
// local port on which authenticator accepts connections from Tuner Studio
int authenticatorPort = 7004;
BinaryProtocolProxy.createProxy(authenticatorToProxyStream, authenticatorPort);
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
// connect to proxy and read virtual controller through it // connect to proxy and read virtual controller through it
LinkManager clientManager = new LinkManager(logger); LinkManager clientManager = new LinkManager(logger);
clientManager.startAndConnect(ProxyClient.LOCALHOST + ":" + authenticatorPort, new ConnectionStateListener() { clientManager.startAndConnect(ProxyClient.LOCALHOST + ":" + authenticatorPort, new ConnectionStateListener() {

View File

@ -99,7 +99,7 @@ public class TcpCommunicationIntegrationTest {
// connect proxy to virtual controller // connect proxy to virtual controller
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, LOGGER); IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, LOGGER);
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort); BinaryProtocolProxy.createProxy(LOGGER, targetEcuSocket, proxyPort);
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1); CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);

View File

@ -3,6 +3,7 @@ package com.rusefi;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.server.ApplicationRequest; import com.rusefi.server.ApplicationRequest;
import java.io.IOException; import java.io.IOException;
@ -16,6 +17,18 @@ public class LocalApplicationProxy {
this.applicationRequest = applicationRequest; this.applicationRequest = applicationRequest;
} }
/**
* @param applicationRequest remote session we want to connect to
* @param authenticatorPort local port we would bind for TunerStudio to connect to
* @param authenticatorToProxyStream
*/
static void startAndRun(Logger logger, ApplicationRequest applicationRequest, int authenticatorPort, IoStream authenticatorToProxyStream) throws IOException {
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest);
localApplicationProxy.run(authenticatorToProxyStream);
BinaryProtocolProxy.createProxy(logger, authenticatorToProxyStream, authenticatorPort);
}
public void run(IoStream authenticatorToProxyStream) throws IOException { public void run(IoStream authenticatorToProxyStream) throws IOException {
// right from connection push session authentication data // right from connection push session authentication data
new HelloCommand(logger, applicationRequest.toJson()).handle(authenticatorToProxyStream); new HelloCommand(logger, applicationRequest.toJson()).handle(authenticatorToProxyStream);

View File

@ -3,6 +3,7 @@ package com.rusefi.server;
import com.opensr5.Logger; import com.opensr5.Logger;
import com.rusefi.io.IoStream; import com.rusefi.io.IoStream;
import com.rusefi.io.commands.HelloCommand; import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.io.tcp.BinaryProtocolServer; import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.tools.online.ProxyClient; import com.rusefi.tools.online.ProxyClient;
@ -77,12 +78,13 @@ public class Backend {
return new Runnable() { return new Runnable() {
@Override @Override
public void run() { public void run() {
IoStream stream = null; // connection from authenticator app which proxies for Tuner Studio
IoStream applicationClientStream = null;
try { try {
stream = new TcpIoStream(logger, applicationSocket); applicationClientStream = new TcpIoStream(logger, applicationSocket);
// authenticator pushed hello packet on connect // authenticator pushed hello packet on connect
String jsonString = HelloCommand.getHelloResponse(stream.getDataBuffer(), logger); String jsonString = HelloCommand.getHelloResponse(applicationClientStream.getDataBuffer(), logger);
if (jsonString == null) if (jsonString == null)
return; return;
ApplicationRequest applicationRequest = ApplicationRequest.valueOf(jsonString); ApplicationRequest applicationRequest = ApplicationRequest.valueOf(jsonString);
@ -94,13 +96,18 @@ public class Backend {
state = byId.get(controllerKey); state = byId.get(controllerKey);
} }
if (state == null) { if (state == null) {
stream.close(); applicationClientStream.close();
onDisconnectApplication(); onDisconnectApplication();
logger.info("No controller for " + controllerKey); logger.info("No controller for " + controllerKey);
return;
} }
BinaryProtocolProxy.runProxy(state.getStream(), applicationClientStream);
} catch (Throwable e) { } catch (Throwable e) {
if (stream != null) if (applicationClientStream != null)
stream.close(); applicationClientStream.close();
e.printStackTrace();
logger.error("Got error " + e); logger.error("Got error " + e);
onDisconnectApplication(); onDisconnectApplication();
} }
@ -110,7 +117,7 @@ public class Backend {
}, logger, parameter -> serverCreated.countDown()); }, logger, parameter -> serverCreated.countDown());
} }
public void onDisconnectApplication() { protected void onDisconnectApplication() {
logger.info("Disconnecting application"); logger.info("Disconnecting application");
} }
@ -126,7 +133,7 @@ public class Backend {
controllerConnectionState.requestControllerInfo(); controllerConnectionState.requestControllerInfo();
register(controllerConnectionState); register(controllerConnectionState);
controllerConnectionState.runEndlessLoop(); // controllerConnectionState.runEndlessLoop();
} catch (IOException e) { } catch (IOException e) {
close(controllerConnectionState); close(controllerConnectionState);
} }
@ -179,6 +186,10 @@ public class Backend {
clients.add(controllerConnectionState); clients.add(controllerConnectionState);
byId.put(controllerConnectionState.getControllerKey(), controllerConnectionState); byId.put(controllerConnectionState.getControllerKey(), controllerConnectionState);
} }
onRegister(controllerConnectionState);
}
protected void onRegister(ControllerConnectionState controllerConnectionState) {
} }
public void close(ControllerConnectionState inactiveClient) { public void close(ControllerConnectionState inactiveClient) {

View File

@ -43,6 +43,10 @@ public class ControllerConnectionState {
} }
} }
public IoStream getStream() {
return stream;
}
public ControllerKey getControllerKey() { public ControllerKey getControllerKey() {
return controllerKey; return controllerKey;
} }