proxy progress

This commit is contained in:
rusefi 2020-07-11 22:30:56 -04:00
parent 15d0e29436
commit 439dbaff55
19 changed files with 452 additions and 155 deletions

View File

@ -0,0 +1,9 @@
<component name="libraryTable">
<library name="javax.json">
<CLASSES>
<root url="jar://$PROJECT_DIR$/lib/server/javax.json.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</component>

View File

@ -0,0 +1,12 @@
<component name="libraryTable">
<library name="takes">
<CLASSES>
<root url="jar://$PROJECT_DIR$/lib/server/cactoos.jar!/" />
<root url="jar://$PROJECT_DIR$/lib/server/takes.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$PROJECT_DIR$/lib/server/takes-sources.jar!/" />
</SOURCES>
</library>
</component>

View File

@ -11,6 +11,7 @@
<module fileurl="file://$PROJECT_DIR$/logging/logging.iml" filepath="$PROJECT_DIR$/logging/logging.iml" />
<module fileurl="file://$PROJECT_DIR$/logging-api/logging-api.iml" filepath="$PROJECT_DIR$/logging-api/logging-api.iml" />
<module fileurl="file://$PROJECT_DIR$/models/models.iml" filepath="$PROJECT_DIR$/models/models.iml" />
<module fileurl="file://$PROJECT_DIR$/../java_tools/proxy_server/proxy_server.iml" filepath="$PROJECT_DIR$/../java_tools/proxy_server/proxy_server.iml" />
<module fileurl="file://$PROJECT_DIR$/romraider/romraider.iml" filepath="$PROJECT_DIR$/romraider/romraider.iml" />
<module fileurl="file://$PROJECT_DIR$/shared_io/shared_io.iml" filepath="$PROJECT_DIR$/shared_io/shared_io.iml" />
<module fileurl="file://$PROJECT_DIR$/shared_ui/shared_ui.iml" filepath="$PROJECT_DIR$/shared_ui/shared_ui.iml" />

View File

@ -10,5 +10,6 @@
<orderEntry type="module" module-name="io" />
<orderEntry type="module" module-name="models" />
<orderEntry type="module" module-name="logging" />
<orderEntry type="library" name="javax.json" level="project" />
</component>
</module>

View File

@ -1,6 +1,7 @@
<project default="jar">
<property name="jar_file_folder" value="../java_console_binary"/>
<property name="jar_file" value="${jar_file_folder}/rusefi_console.jar"/>
<property name="lib_list" value="../java_tools/configuration_definition/lib/snakeyaml.jar:lib/json-simple-1.1.1.jar:lib/server/javax.json.jar:lib/server/cactoos.jar:lib/server/takes.jar:lib/json-simple-1.1.1.jar:lib/jaxb-api.jar:lib/httpclient.jar:lib/httpmime.jar:lib/httpcore.jar:lib/jSerialComm.jar:lib/jcip-annotations-1.0.jar:lib/jlatexmath-1.0.6.jar:lib/swing-layout-1.0.jar:lib/jep.jar:lib/log4j.jar:lib/junit.jar:lib/SteelSeries-3.9.30.jar:lib/annotations.jar:lib/miglayout-4.0.jar:lib/surfaceplotter-2.0.1.jar"/>
<target name="clean">
<delete dir="build"/>
@ -39,11 +40,13 @@
<target name="compile">
<mkdir dir="build/classes"/>
<javac debug="yes" destdir="build/classes"
classpath="../java_tools/configuration_definition/lib/snakeyaml.jar:lib/json-simple-1.1.1.jar:lib/jaxb-api.jar:lib/httpclient.jar:lib/httpmime.jar:lib/httpcore.jar:lib/jSerialComm.jar:lib/jcip-annotations-1.0.jar:lib/jlatexmath-1.0.6.jar:lib/swing-layout-1.0.jar:lib/jep.jar:lib/log4j.jar:lib/junit.jar:lib/SteelSeries-3.9.30.jar:lib/annotations.jar:lib/miglayout-4.0.jar:lib/surfaceplotter-2.0.1.jar">
classpath="${lib_list}">
<src path="autotest/src"/>
<src path="autoupdate/src"/>
<src path="../java_tools/configuration_definition/src"/>
<src path="../java_tools/enum_to_string/src"/>
<src path="../java_tools/proxy_server/src/main/java"/>
<src path="../java_tools/proxy_server/src/test/java"/>
<src path="io/src/main/java"/>
<src path="io/src/test/java"/>
<src path="models/src/main/java"/>
@ -76,7 +79,7 @@
<jvmarg value="-XX:+HeapDumpOnOutOfMemoryError"/>
<formatter type="brief"/>
<classpath
path="build/classes:lib/junit.jar:lib/SteelSeries-3.9.30.jar:lib/miglayout-4.0.jar"/>
path="build/classes:lib/junit.jar:${lib_list}:lib/commons-logging.jar"/>
<batchtest todir="build">
<fileset dir="autotest/src" includes="**/test/**/*Test.java"/>
<fileset dir="autoupdate/src" includes="**/test/**/*Test.java"/>

View File

@ -288,7 +288,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
setController(newVersion);
}
private byte[] receivePacket(String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
private byte[] receivePacket(String msg, boolean allowLongResponse) throws EOFException {
long start = System.currentTimeMillis();
synchronized (ioLock) {
return incomingData.getPacket(logger, msg, allowLongResponse, start);
@ -408,7 +408,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
sendPacket(packet);
return receivePacket(msg, allowLongResponse);
} catch (InterruptedException | IOException e) {
} catch (IOException e) {
logger.error(msg + ": executeCommand failed: " + e);
close();
return null;

View File

@ -1,70 +0,0 @@
package com.rusefi.server;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
public class Backend {
// guarded by own monitor
private final Set<ClientConnectionState> clients = new HashSet<>();
private final int clientTimeout;
public Backend(int clientTimeout) {
this.clientTimeout = clientTimeout;
new Thread(() -> {
while (true) {
runCleanup();
sleep(clientTimeout);
}
}, "rusEFI Server Cleanup").start();
}
private void runCleanup() {
List<ClientConnectionState> inactiveClients = new ArrayList<>();
synchronized (clients) {
long now = System.currentTimeMillis();
for (ClientConnectionState client : clients) {
if (now - client.getLastActivityTimestamp() > clientTimeout)
inactiveClients.add(client);
}
}
for (ClientConnectionState inactiveClient : inactiveClients) {
close(inactiveClient);
}
}
public void register(ClientConnectionState clientConnectionState) {
synchronized (clients) {
clients.add(clientConnectionState);
}
}
private void close(ClientConnectionState inactiveClient) {
inactiveClient.close();
synchronized (clients) {
clients.remove(inactiveClient);
}
}
public List<ClientConnectionState> getClients() {
synchronized (clients) {
return new ArrayList<>(clients);
}
}
public int getCount() {
synchronized (clients) {
return clients.size();
}
}
}

View File

@ -1,74 +0,0 @@
package com.rusefi.server;
import com.opensr5.Logger;
import com.rusefi.auth.AutoTokenUtil;
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.TcpIoStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import static com.rusefi.binaryprotocol.IoHelper.checkResponseCode;
public class ClientConnectionState {
private final Socket clientSocket;
private final Logger logger;
private long lastActivityTimestamp;
private boolean isClosed;
private IoStream stream;
private IncomingDataBuffer incomingData;
public ClientConnectionState(Socket clientSocket, Logger logger) {
this.clientSocket = clientSocket;
this.logger = logger;
try {
stream = new TcpIoStream(logger, clientSocket);
incomingData = stream.getDataBuffer();
} catch (IOException e) {
close();
}
}
public long getLastActivityTimestamp() {
return lastActivityTimestamp;
}
public void close() {
isClosed = true;
close(clientSocket);
}
public void sayHello() {
try {
HelloCommand.send(stream, logger);
byte[] response = incomingData.getPacket(logger, "", false);
if (!checkResponseCode(response, BinaryProtocolCommands.RESPONSE_OK))
return;
String tokenAndSignature = new String(response, 1, response.length - 1);
String token = tokenAndSignature.length() > AutoTokenUtil.TOKEN_LENGTH ? tokenAndSignature.substring(0, AutoTokenUtil.TOKEN_LENGTH) : null;
if (!AutoTokenUtil.isToken(token))
throw new IOException("Invalid token");
String signature = tokenAndSignature.substring(AutoTokenUtil.TOKEN_LENGTH);
logger.info(token + " New client: " + signature);
} catch (IOException e) {
close();
}
}
private static void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException ignored) {
// ignored
}
}
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,24 +1,40 @@
package com.rusefi;
import com.opensr5.Logger;
import com.rusefi.io.TcpCommunicationIntegrationTest;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.server.Backend;
import com.rusefi.server.ClientConnectionState;
import com.rusefi.server.UserDetails;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.Test;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
import static com.rusefi.server.Backend.LIST_PATH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* integration test of the rusEFI online backend process
* At the moment this test is very loose with timing it must be unreliable?
*/
public class ServerTest {
private final static Logger logger = Logger.CONSOLE;
@ -30,8 +46,19 @@ public class ServerTest {
CountDownLatch serverCreated = new CountDownLatch(1);
Backend backend = new Backend(5 * Timeouts.SECOND);
Function<String, UserDetails> userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), authToken.charAt(6));
CountDownLatch allClientsDisconnected = new CountDownLatch(1);
int httpPort = 8000;
Backend backend = new Backend(userDetailsResolver, httpPort) {
@Override
public void close(ClientConnectionState inactiveClient) {
super.close(inactiveClient);
if (getCount() == 0)
allClientsDisconnected.countDown();
}
};
BinaryProtocolServer.tcpServerSocket(serverPort, "Server", new Function<Socket, Runnable>() {
@Override
@ -39,14 +66,15 @@ public class ServerTest {
return new Runnable() {
@Override
public void run() {
ClientConnectionState clientConnectionState = new ClientConnectionState(clientSocket, logger);
clientConnectionState.sayHello();
backend.register(clientConnectionState);
while(true) {
ClientConnectionState clientConnectionState = new ClientConnectionState(clientSocket, logger, backend.getUserDetailsResolver());
try {
clientConnectionState.sayHello();
backend.register(clientConnectionState);
clientConnectionState.runEndlessLoop();
} catch (IOException e) {
backend.close(clientConnectionState);
}
}
};
}
@ -66,6 +94,38 @@ public class ServerTest {
List<ClientConnectionState> clients = backend.getClients();
assertEquals(2, clients.size());
List<UserDetails> onlineUsers = getOnlineUsers(httpPort);
assertEquals(2, onlineUsers.size());
assertTrue(allClientsDisconnected.await(30, TimeUnit.SECONDS));
}
@NotNull
private List<UserDetails> getOnlineUsers(int httpPort) throws IOException {
HttpClient httpclient = new DefaultHttpClient();
String url = "http://" + TcpCommunicationIntegrationTest.LOCALHOST + ":" + httpPort + LIST_PATH;
System.out.println("Connecting to " + url);
HttpGet httpget = new HttpGet(url);
HttpResponse httpResponse = httpclient.execute(httpget);
HttpEntity entity = httpResponse.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
JSONParser parser = new JSONParser();
List<UserDetails> userLists = new ArrayList<>();
try {
JSONArray array = (JSONArray) parser.parse(responseString);
for (int i = 0; i < array.size(); i++) {
JSONObject element = (JSONObject) array.get(i);
userLists.add(UserDetails.valueOf(element));
}
System.out.println("object=" + array);
} catch (ParseException e) {
throw new IllegalStateException(e);
}
return userLists;
}
}

View File

@ -28,5 +28,7 @@
<orderEntry type="library" name="jSerialComm" level="project" />
<orderEntry type="library" name="json-simple" level="project" />
<orderEntry type="module" module-name="models" />
<orderEntry type="module" module-name="proxy_server" scope="TEST" />
<orderEntry type="library" scope="TEST" name="takes" level="project" />
</component>
</module>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="takes" level="project" />
<orderEntry type="module" module-name="io" />
<orderEntry type="module" module-name="shared_io" />
<orderEntry type="library" scope="TEST" name="javax.json" level="project" />
<orderEntry type="library" name="javax.json" level="project" />
<orderEntry type="library" name="json-simple" level="project" />
</component>
</module>

View File

@ -0,0 +1,126 @@
package com.rusefi.server;
import org.jetbrains.annotations.NotNull;
import org.takes.Take;
import org.takes.facets.fork.FkRegex;
import org.takes.facets.fork.TkFork;
import org.takes.http.Exit;
import org.takes.http.FtBasic;
import org.takes.rs.RsJson;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
public class Backend {
public static final String LIST_PATH = "/list_online";
private final FkRegex showOnlineUsers = new FkRegex(LIST_PATH,
(Take) req -> getUsersOnline()
);
@NotNull
private RsJson getUsersOnline() throws IOException {
JsonArrayBuilder builder = Json.createArrayBuilder();
List<ClientConnectionState> clients = getClients();
for (ClientConnectionState client : clients) {
JsonObject clientObject = Json.createObjectBuilder()
.add(UserDetails.USER_ID, client.getUserDetails().getId())
.add(UserDetails.USERNAME, client.getUserDetails().getUserName())
.add("signature", client.getSignature())
.build();
builder.add(clientObject);
}
return new RsJson(builder.build());
}
// guarded by own monitor
private final Set<ClientConnectionState> clients = new HashSet<>();
// private final int clientTimeout;
private final Function<String, UserDetails> userDetailsResolver;
public Backend(Function<String, UserDetails> userDetailsResolver, int httpPort) {
// this.clientTimeout = clientTimeout;
this.userDetailsResolver = userDetailsResolver;
new Thread(new Runnable() {
@Override
public void run() {
try {
new FtBasic(
new TkFork(showOnlineUsers,
new FkRegex("/", "<a href='https://rusefi.com/online/'>rusEFI Online</a>")
), httpPort
).start(Exit.NEVER);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}).start();
// new Thread(() -> {
// while (true) {
// runCleanup();
// sleep(clientTimeout);
// }
// }, "rusEFI Server Cleanup").start();
}
public Function<String, UserDetails> getUserDetailsResolver() {
return userDetailsResolver;
}
// private void runCleanup() {
// List<ClientConnectionState> inactiveClients = new ArrayList<>();
//
// synchronized (clients) {
// long now = System.currentTimeMillis();
// for (ClientConnectionState client : clients) {
// if (now - client.getLastActivityTimestamp() > clientTimeout)
// inactiveClients.add(client);
// }
// }
//
// for (ClientConnectionState inactiveClient : inactiveClients) {
// close(inactiveClient);
// }
//
// }
public void register(ClientConnectionState clientConnectionState) {
synchronized (clients) {
clients.add(clientConnectionState);
}
}
public void close(ClientConnectionState inactiveClient) {
inactiveClient.close();
synchronized (clients) {
// in case of exception in the initialization phase we do not even add client into the the collection
clients.remove(inactiveClient);
}
}
public List<ClientConnectionState> getClients() {
synchronized (clients) {
return new ArrayList<>(clients);
}
}
public int getCount() {
synchronized (clients) {
return clients.size();
}
}
}

View File

@ -0,0 +1,105 @@
package com.rusefi.server;
import com.opensr5.Logger;
import com.rusefi.auth.AutoTokenUtil;
import com.rusefi.binaryprotocol.BinaryProtocolCommands;
import com.rusefi.binaryprotocol.IncomingDataBuffer;
import com.rusefi.io.IoStream;
import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.TcpIoStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.util.function.Function;
import static com.rusefi.binaryprotocol.IoHelper.checkResponseCode;
public class ClientConnectionState {
private final Socket clientSocket;
private final Logger logger;
private final Function<String, UserDetails> userDetailsResolver;
private long lastActivityTimestamp;
private boolean isClosed;
private IoStream stream;
private IncomingDataBuffer incomingData;
private UserDetails userDetails;
private String signature;
public ClientConnectionState(Socket clientSocket, Logger logger, Function<String, UserDetails> userDetailsResolver) {
this.clientSocket = clientSocket;
this.logger = logger;
this.userDetailsResolver = userDetailsResolver;
try {
stream = new TcpIoStream(logger, clientSocket);
incomingData = stream.getDataBuffer();
} catch (IOException e) {
close();
}
}
// public long getLastActivityTimestamp() {
// return lastActivityTimestamp;
// }
public boolean isClosed() {
return isClosed;
}
public void close() {
isClosed = true;
close(clientSocket);
}
public void sayHello() throws IOException {
HelloCommand.send(stream, logger);
byte[] response = incomingData.getPacket(logger, "", false);
if (!checkResponseCode(response, BinaryProtocolCommands.RESPONSE_OK))
return;
String tokenAndSignature = new String(response, 1, response.length - 1);
String authToken = tokenAndSignature.length() > AutoTokenUtil.TOKEN_LENGTH ? tokenAndSignature.substring(0, AutoTokenUtil.TOKEN_LENGTH) : null;
if (!AutoTokenUtil.isToken(authToken))
throw new IOException("Invalid token");
signature = tokenAndSignature.substring(AutoTokenUtil.TOKEN_LENGTH);
logger.info(authToken + " New client: " + signature);
userDetails = userDetailsResolver.apply(authToken);
logger.info("User " + userDetails);
}
public UserDetails getUserDetails() {
return userDetails;
}
public String getSignature() {
return signature;
}
private static void close(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException ignored) {
// ignored
}
}
}
public void runEndlessLoop() throws IOException {
while (true) {
byte[] commandPacket = GetOutputsCommand.createRequest();
stream.sendPacket(commandPacket, logger);
byte[] packet = incomingData.getPacket(logger, "msg", true);
if (packet == null)
throw new IOException("No response");
}
}
}

View File

@ -0,0 +1,39 @@
package com.rusefi.server;
import org.json.simple.JSONObject;
public class UserDetails {
public static final String USER_ID = "user_id";
public static final String USERNAME = "username";
private final String userName;
private final int id;
public UserDetails(String userName, int id) {
this.userName = userName;
this.id = id;
}
public static UserDetails valueOf(JSONObject element) {
long userId = (long) element.get(USER_ID);
String userName = (String) element.get(USERNAME);
return new UserDetails(userName, (int) userId);
}
public String getUserName() {
return userName;
}
public int getId() {
return id;
}
@Override
public String toString() {
return "UserDetails{" +
"userName='" + userName + '\'' +
", id=" + id +
'}';
}
}

View File

@ -0,0 +1,65 @@
package com.rusefi.server;
import org.takes.Request;
import org.takes.Response;
import org.takes.Take;
import org.takes.facets.fork.FkRegex;
import org.takes.facets.fork.TkFork;
import org.takes.http.Exit;
import org.takes.http.FtBasic;
import org.takes.misc.Href;
import org.takes.rq.RqHref;
import org.takes.rs.RsJson;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
public class ServerSandbox {
public static void main(final String... args) throws Exception {
new FtBasic(
new TkFork(
new FkRegex("/list", new Take() {
@Override
public Response act(Request request) throws Exception {
Href href = new RqHref.Base(request).href();
// URI uri = href.uri();
Iterable<String> values = href.param("user");
String name = values.iterator().next();
JsonArray result = Json.createArrayBuilder()
.add(Json.createObjectBuilder().add("name", name))
.add(Json.createObjectBuilder().add("name", name))
.build();
return new RsJson(result);
}
}),
new FkRegex("/", "hello, world!")
), 8080
).start(Exit.NEVER);
}
public static final class User implements RsJson.Source {
private final String name;
public User(String name) {
this.name = name;
}
@Override
public JsonObject toJson() {
return Json.createObjectBuilder()
.add("name", name)
.build();
}
}
}