proxy progress

This commit is contained in:
rusefi 2020-07-17 16:34:08 -04:00
parent 78629835df
commit bc7b677563
10 changed files with 287 additions and 84 deletions

View File

@ -12,10 +12,7 @@ import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.BinaryProtocolProxy;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.server.Backend;
import com.rusefi.server.ClientConnectionState;
import com.rusefi.server.SessionDetails;
import com.rusefi.server.UserDetails;
import com.rusefi.server.*;
import com.rusefi.tools.online.ProxyClient;
import org.junit.Ignore;
import org.junit.Test;
@ -43,7 +40,7 @@ public class ServerTest {
private final static Logger logger = Logger.CONSOLE;
@Test
public void testSessionTimeout() throws InterruptedException, IOException {
public void testControllerSessionTimeout() throws InterruptedException, IOException {
int serverPort = 7000;
int httpPort = 8000;
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6));
@ -54,13 +51,13 @@ public class ServerTest {
Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override
public void register(ClientConnectionState clientConnectionState) {
public void register(ControllerConnectionState clientConnectionState) {
super.register(clientConnectionState);
onConnected.countDown();
}
@Override
public void close(ClientConnectionState inactiveClient) {
public void close(ControllerConnectionState inactiveClient) {
super.close(inactiveClient);
if (getCount() == 0)
allClientsDisconnected.countDown();
@ -77,14 +74,72 @@ public class ServerTest {
assertTrue(onConnected.await(30, TimeUnit.SECONDS));
List<ClientConnectionState> clients = backend.getClients();
List<ControllerConnectionState> clients = backend.getClients();
assertEquals(2, clients.size());
List<UserDetails> onlineUsers = ProxyClient.getOnlineUsers(httpPort);
assertEquals(2, onlineUsers.size());
assertTrue(allClientsDisconnected.await(30, TimeUnit.SECONDS));
}
@Test
public void testInvalidApplicationRequest() throws InterruptedException, IOException {
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6));
int httpPort = 8001;
int serverPortForRemoteUsers = 6801;
CountDownLatch disconnectedCountDownLatch = new CountDownLatch(1);
Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override
public void onDisconnectApplication() {
super.onDisconnectApplication();
disconnectedCountDownLatch.countDown();;
}
};
CountDownLatch applicationServerCreated = new CountDownLatch(1);
backend.runApplicationConnector(serverPortForRemoteUsers, applicationServerCreated);
assertTrue(applicationServerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS));
// start authenticator
IoStream authenticatorToProxyStream = TestHelper.connectToLocalhost(serverPortForRemoteUsers, logger);
new HelloCommand(logger, "hello").handle(authenticatorToProxyStream);
assertTrue(disconnectedCountDownLatch.await(30, TimeUnit.SECONDS));
backend.close();
}
@Test
public void testAuthenticatorConnect() throws InterruptedException, IOException {
int serverPortForRemoteUsers = 6800;
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6));
int httpPort = 8001;
CountDownLatch disconnectedCountDownLatch = new CountDownLatch(1);
Backend backend = new Backend(userDetailsResolver, httpPort, logger) {
@Override
public void onDisconnectApplication() {
super.onDisconnectApplication();
disconnectedCountDownLatch.countDown();;
}
};
CountDownLatch applicationServerCreated = new CountDownLatch(1);
backend.runApplicationConnector(serverPortForRemoteUsers, applicationServerCreated);
assertTrue(applicationServerCreated.await(READ_IMAGE_TIMEOUT, TimeUnit.MILLISECONDS));
SessionDetails sessionDetails = MockRusEfiDevice.createTestSession(MockRusEfiDevice.TEST_TOKEN_1, Fields.TS_SIGNATURE);
ApplicationRequest applicationRequest = new ApplicationRequest(sessionDetails, 123);
// start authenticator
IoStream authenticatorToProxyStream = TestHelper.connectToLocalhost(serverPortForRemoteUsers, logger);
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest);
localApplicationProxy.run(authenticatorToProxyStream);
assertTrue(disconnectedCountDownLatch.await(30, TimeUnit.SECONDS));
backend.close();
}
@Test
@ -119,7 +174,7 @@ public class ServerTest {
// start network broadcaster to connect controller with backend since in real life controller has only local serial port it does not have network
IoStream targetEcuSocket = TestHelper.createTestStream(controllerPort, logger);
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, logger);
HelloCommand.send(targetEcuSocket, logger);
String controllerSignature = HelloCommand.getHelloResponse(targetEcuSocket.getDataBuffer(), logger);
@ -140,12 +195,13 @@ public class ServerTest {
baseBroadcastingThread.start();
SessionDetails authenticatorSessionDetails = new SessionDetails(deviceSessionDetails.getControllerInfo(), MockRusEfiDevice.TEST_TOKEN_3, deviceSessionDetails.getOneTimeToken());
ApplicationRequest applicationRequest = new ApplicationRequest(authenticatorSessionDetails, 123);
// start authenticator
IoStream authenticatorToProxyStream = TestHelper.createTestStream(serverPortForControllers, logger);
// right from connection push session authentication data
new HelloCommand(logger, authenticatorSessionDetails.toJson()).handle(authenticatorToProxyStream);
IoStream authenticatorToProxyStream = TestHelper.connectToLocalhost(serverPortForRemoteUsers, logger);
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(logger, applicationRequest);
localApplicationProxy.run(authenticatorToProxyStream);
// local port on which authenticator accepts connections from Tuner Studio
@ -177,6 +233,7 @@ public class ServerTest {
String clientValue = iniField.getValue(clientImage);
assertEquals(Double.toString(value), clientValue);
backend.close();
}

View File

@ -46,7 +46,7 @@ public class TestHelper {
}
@NotNull
public static IoStream createTestStream(int controllerPort, Logger logger) {
public static IoStream connectToLocalhost(int controllerPort, Logger logger) {
IoStream targetEcuSocket;
try {
targetEcuSocket = new TcpIoStream(logger, new Socket(ProxyClient.LOCALHOST, controllerPort));

View File

@ -98,7 +98,7 @@ public class TcpCommunicationIntegrationTest {
// connect proxy to virtual controller
IoStream targetEcuSocket = TestHelper.createTestStream(controllerPort, LOGGER);
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort, LOGGER);
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort);
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);

View File

@ -0,0 +1,23 @@
package com.rusefi;
import com.opensr5.Logger;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.server.ApplicationRequest;
import java.io.IOException;
public class LocalApplicationProxy {
private final Logger logger;
private final ApplicationRequest applicationRequest;
public LocalApplicationProxy(Logger logger, ApplicationRequest applicationRequest) {
this.logger = logger;
this.applicationRequest = applicationRequest;
}
public void run(IoStream authenticatorToProxyStream) throws IOException {
// right from connection push session authentication data
new HelloCommand(logger, applicationRequest.toJson()).handle(authenticatorToProxyStream);
}
}

View File

@ -0,0 +1,60 @@
package com.rusefi.server;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import java.io.StringReader;
import java.util.Objects;
public class ApplicationRequest {
private static final String SESSION = "session";
private static final String USER_ID = "user_id";
private final SessionDetails sessionDetails;
private final int targetUserId;
public ApplicationRequest(SessionDetails sessionDetails, int targetUserId) {
this.sessionDetails = sessionDetails;
this.targetUserId = targetUserId;
}
public SessionDetails getSessionDetails() {
return sessionDetails;
}
public int getTargetUserId() {
return targetUserId;
}
public String toJson() {
JsonObject jsonObject = Json.createObjectBuilder()
.add(SESSION, sessionDetails.toJson())
.add(USER_ID, targetUserId)
.build();
return jsonObject.toString();
}
public static ApplicationRequest valueOf(String jsonString) {
JsonReader reader = Json.createReader(new StringReader(jsonString));
JsonObject jsonObject = reader.readObject();
int targetUserId = jsonObject.getInt(USER_ID);
SessionDetails session = SessionDetails.valueOf(jsonObject.getString(SESSION));
return new ApplicationRequest(session, targetUserId);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApplicationRequest that = (ApplicationRequest) o;
return targetUserId == that.targetUserId &&
sessionDetails.equals(that.sessionDetails);
}
@Override
public int hashCode() {
return Objects.hash(sessionDetails, targetUserId);
}
}

View File

@ -10,7 +10,6 @@ import org.jetbrains.annotations.NotNull;
import org.takes.Take;
import org.takes.facets.fork.FkRegex;
import org.takes.facets.fork.TkFork;
import org.takes.http.Exit;
import org.takes.http.FtBasic;
import org.takes.rs.RsJson;
@ -19,10 +18,7 @@ import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
@ -33,6 +29,46 @@ public class Backend {
private final FkRegex showOnlineUsers = new FkRegex(ProxyClient.LIST_PATH,
(Take) req -> getUsersOnline()
);
private boolean isClosed;
// guarded by own monitor
private final Set<ControllerConnectionState> clients = new HashSet<>();
// guarded by clients
private HashMap<ControllerKey, ControllerConnectionState> byId = new HashMap<>();
// private final int clientTimeout;
private final Function<String, UserDetails> userDetailsResolver;
private final Logger logger;
public Backend(Function<String, UserDetails> userDetailsResolver, int httpPort, Logger logger) {
// this.clientTimeout = clientTimeout;
this.userDetailsResolver = userDetailsResolver;
this.logger = logger;
new Thread(() -> {
try {
new FtBasic(
new TkFork(showOnlineUsers,
new FkRegex(VERSION_PATH, BACKEND_VERSION),
new FkRegex("/", "<a href='https://rusefi.com/online/'>rusEFI Online</a>")
), httpPort
).start(() -> isClosed);
logger.info("Shutting down backend on port " + httpPort);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}, "Http Server Thread").start();
// new Thread(() -> {
// while (true) {
// runCleanup();
// sleep(clientTimeout);
// }
// }, "rusEFI Server Cleanup").start();
}
public void runApplicationConnector(int serverPortForApplications, CountDownLatch serverCreated) {
BinaryProtocolServer.tcpServerSocket(serverPortForApplications, "ApplicationServer", new Function<Socket, Runnable>() {
@ -49,12 +85,24 @@ public class Backend {
String jsonString = HelloCommand.getHelloResponse(stream.getDataBuffer(), logger);
if (jsonString == null)
return;
SessionDetails applicationSession = SessionDetails.valueOf(jsonString);
logger.info("Application Connected: " + applicationSession);
ApplicationRequest applicationRequest = ApplicationRequest.valueOf(jsonString);
logger.info("Application Connected: " + applicationRequest);
} catch (IOException e) {
ControllerKey controllerKey = new ControllerKey(applicationRequest.getTargetUserId(), applicationRequest.getSessionDetails().getControllerInfo());
ControllerConnectionState state;
synchronized (clients) {
state = byId.get(controllerKey);
}
if (state == null) {
stream.close();
onDisconnectApplication();
logger.info("No controller for " + controllerKey);
}
} catch (Throwable e) {
if (stream != null)
stream.close();
logger.error("Got error " + e);
onDisconnectApplication();
}
}
};
@ -62,6 +110,10 @@ public class Backend {
}, logger, parameter -> serverCreated.countDown());
}
public void onDisconnectApplication() {
logger.info("Disconnecting application");
}
public void runControllerConnector(int serverPortForControllers, CountDownLatch serverCreated) {
BinaryProtocolServer.tcpServerSocket(serverPortForControllers, "ControllerServer", new Function<Socket, Runnable>() {
@Override
@ -69,14 +121,14 @@ public class Backend {
return new Runnable() {
@Override
public void run() {
ClientConnectionState clientConnectionState = new ClientConnectionState(controllerSocket, logger, getUserDetailsResolver());
ControllerConnectionState controllerConnectionState = new ControllerConnectionState(controllerSocket, logger, getUserDetailsResolver());
try {
clientConnectionState.requestControllerInfo();
controllerConnectionState.requestControllerInfo();
register(clientConnectionState);
clientConnectionState.runEndlessLoop();
register(controllerConnectionState);
controllerConnectionState.runEndlessLoop();
} catch (IOException e) {
close(clientConnectionState);
close(controllerConnectionState);
}
}
};
@ -87,8 +139,8 @@ public class Backend {
@NotNull
private RsJson getUsersOnline() throws IOException {
JsonArrayBuilder builder = Json.createArrayBuilder();
List<ClientConnectionState> clients = getClients();
for (ClientConnectionState client : clients) {
List<ControllerConnectionState> clients = getClients();
for (ControllerConnectionState client : clients) {
JsonObject clientObject = Json.createObjectBuilder()
.add(UserDetails.USER_ID, client.getUserDetails().getUserId())
@ -100,44 +152,6 @@ public class Backend {
return new RsJson(builder.build());
}
// guarded by own monitor
private final Set<ClientConnectionState> clients = new HashSet<>();
// private final int clientTimeout;
private final Function<String, UserDetails> userDetailsResolver;
private final Logger logger;
public Backend(Function<String, UserDetails> userDetailsResolver, int httpPort, Logger logger) {
// this.clientTimeout = clientTimeout;
this.userDetailsResolver = userDetailsResolver;
this.logger = logger;
new Thread(new Runnable() {
@Override
public void run() {
try {
new FtBasic(
new TkFork(showOnlineUsers,
new FkRegex(VERSION_PATH, BACKEND_VERSION),
new FkRegex("/", "<a href='https://rusefi.com/online/'>rusEFI Online</a>")
), httpPort
).start(Exit.NEVER);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}).start();
// new Thread(() -> {
// while (true) {
// runCleanup();
// sleep(clientTimeout);
// }
// }, "rusEFI Server Cleanup").start();
}
public Function<String, UserDetails> getUserDetailsResolver() {
return userDetailsResolver;
}
@ -159,21 +173,28 @@ public class Backend {
//
// }
public void register(ClientConnectionState clientConnectionState) {
public void register(ControllerConnectionState controllerConnectionState) {
Objects.requireNonNull(controllerConnectionState.getControllerKey(), "ControllerKey");
synchronized (clients) {
clients.add(clientConnectionState);
clients.add(controllerConnectionState);
byId.put(controllerConnectionState.getControllerKey(), controllerConnectionState);
}
}
public void close(ClientConnectionState inactiveClient) {
public void close(ControllerConnectionState inactiveClient) {
inactiveClient.close();
synchronized (clients) {
// in case of exception in the initialization phase we do not even add client into the the collection
clients.remove(inactiveClient);
byId.remove(inactiveClient.getControllerKey());
}
}
public List<ClientConnectionState> getClients() {
public void close() {
isClosed = true;
}
public List<ControllerConnectionState> getClients() {
synchronized (clients) {
return new ArrayList<>(clients);
}

View File

@ -13,7 +13,7 @@ import java.io.IOException;
import java.net.Socket;
import java.util.function.Function;
public class ClientConnectionState {
public class ControllerConnectionState {
private final Socket clientSocket;
private final Logger logger;
private final Function<String, UserDetails> userDetailsResolver;
@ -29,8 +29,9 @@ public class ClientConnectionState {
* user info from rusEFI database based on auth token
*/
private UserDetails userDetails;
private ControllerKey controllerKey;
public ClientConnectionState(Socket clientSocket, Logger logger, Function<String, UserDetails> userDetailsResolver) {
public ControllerConnectionState(Socket clientSocket, Logger logger, Function<String, UserDetails> userDetailsResolver) {
this.clientSocket = clientSocket;
this.logger = logger;
this.userDetailsResolver = userDetailsResolver;
@ -42,10 +43,9 @@ public class ClientConnectionState {
}
}
// public long getLastActivityTimestamp() {
// return lastActivityTimestamp;
// }
public ControllerKey getControllerKey() {
return controllerKey;
}
public boolean isClosed() {
return isClosed;
@ -67,6 +67,7 @@ public class ClientConnectionState {
logger.info(sessionDetails.getAuthToken() + " New client: " + sessionDetails.getControllerInfo());
userDetails = userDetailsResolver.apply(sessionDetails.getAuthToken());
controllerKey = new ControllerKey(userDetails.getUserId(), sessionDetails.getControllerInfo());
logger.info("User " + userDetails);
}

View File

@ -0,0 +1,35 @@
package com.rusefi.server;
import java.util.Objects;
public class ControllerKey {
private final int userId;
private final ControllerInfo controllerInfo;
public ControllerKey(int userId, ControllerInfo controllerInfo) {
this.userId = userId;
this.controllerInfo = controllerInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ControllerKey that = (ControllerKey) o;
return userId == that.userId &&
controllerInfo.equals(that.controllerInfo);
}
@Override
public int hashCode() {
return Objects.hash(userId, controllerInfo);
}
@Override
public String toString() {
return "ControllerKey{" +
"userId=" + userId +
", controllerInfo=" + controllerInfo +
'}';
}
}

View File

@ -8,9 +8,9 @@ import java.util.Objects;
import java.util.Random;
public class SessionDetails {
public static final String ONE_TIME_TOKEN = "oneTime";
public static final String AUTH_TOKEN = "authToken";
public static final String CONTROLLER = "controller";
private static final String ONE_TIME_TOKEN = "oneTime";
private static final String AUTH_TOKEN = "authToken";
private static final String CONTROLLER = "controller";
private final ControllerInfo controllerInfo;
@ -62,7 +62,6 @@ public class SessionDetails {
return new SessionDetails(controllerInfo, authToken, oneTimeCode);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -8,14 +8,21 @@ public class SessionDetailsTest {
@Test
public void testSerialization() {
ControllerInfo ci = new ControllerInfo("name", "make", "code", "sign");
SessionDetails sd = new SessionDetails(ci, "auth", 123);
String json = sd.toJson();
SessionDetails fromJson = SessionDetails.valueOf(json);
assertEquals(sd, fromJson);
}
@Test
public void testApplcationRequest() {
ControllerInfo ci = new ControllerInfo("name", "make", "code", "sign");
SessionDetails sd = new SessionDetails(ci, "auth", 123);
ApplicationRequest ar = new ApplicationRequest(sd, 321);
String json = ar.toJson();
ApplicationRequest fromJson = ApplicationRequest.valueOf(json);
assertEquals(ar, fromJson);
}
}