parent
34bef1b3d2
commit
f93c5eda8b
|
@ -127,7 +127,7 @@ public class IncomingDataBuffer {
|
||||||
while (cbb.length() < count) {
|
while (cbb.length() < count) {
|
||||||
int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis());
|
int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis());
|
||||||
if (timeout <= 0) {
|
if (timeout <= 0) {
|
||||||
log.info(loggingMessage + ": timeout " + timeoutMs + "ms. Got only " + cbb.length() + " while expecting " + count);
|
log.info(loggingMessage + ": timeout " + timeoutMs + "ms. Got only " + cbb.length() + "byte(s) while expecting " + count);
|
||||||
return true; // timeout. Sad face.
|
return true; // timeout. Sad face.
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class PCanIoStream extends AbstractIoStream {
|
||||||
statusListener.append("Error initializing PCAN: " + status);
|
statusListener.append("Error initializing PCAN: " + status);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
statusListener.append("Hello PCAN!");
|
statusListener.append("Creating PCAN stream...");
|
||||||
return new PCanIoStream(can, statusListener);
|
return new PCanIoStream(can, statusListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import com.rusefi.binaryprotocol.IoHelper;
|
||||||
import com.rusefi.config.generated.Fields;
|
import com.rusefi.config.generated.Fields;
|
||||||
import com.rusefi.io.IoStream;
|
import com.rusefi.io.IoStream;
|
||||||
import com.rusefi.proxy.NetworkConnector;
|
import com.rusefi.proxy.NetworkConnector;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
|
@ -36,18 +37,18 @@ public class BinaryProtocolProxy {
|
||||||
/**
|
/**
|
||||||
* @return starts a thread and returns a reference to ServerSocketReference
|
* @return starts a thread and returns a reference to ServerSocketReference
|
||||||
*/
|
*/
|
||||||
public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort, ClientApplicationActivityListener clientApplicationActivityListener) throws IOException {
|
public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort, ClientApplicationActivityListener clientApplicationActivityListener, StatusConsumer statusConsumer) throws IOException {
|
||||||
CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
|
CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
|
||||||
TcpIoStream clientStream = null;
|
TcpIoStream clientStream = null;
|
||||||
try {
|
try {
|
||||||
clientStream = new TcpIoStream("[[proxy]] ", clientSocket);
|
clientStream = new TcpIoStream("[[proxy]] ", clientSocket);
|
||||||
runProxy(targetEcuSocket, clientStream, clientApplicationActivityListener, USER_IO_TIMEOUT);
|
runProxy(targetEcuSocket, clientStream, clientApplicationActivityListener, USER_IO_TIMEOUT);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("BinaryProtocolProxy::run " + e);
|
statusConsumer.append("ERROR BinaryProtocolProxy::run " + e);
|
||||||
close(clientStream);
|
close(clientStream);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty());
|
return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty(), statusConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ClientApplicationActivityListener {
|
public interface ClientApplicationActivityListener {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import com.rusefi.io.IoStream;
|
||||||
import com.rusefi.io.LinkManager;
|
import com.rusefi.io.LinkManager;
|
||||||
import com.rusefi.io.commands.HelloCommand;
|
import com.rusefi.io.commands.HelloCommand;
|
||||||
import com.rusefi.server.rusEFISSLContext;
|
import com.rusefi.server.rusEFISSLContext;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
@ -58,12 +59,6 @@ public class BinaryProtocolServer {
|
||||||
|
|
||||||
public static final ServerSocketFunction SECURE_SOCKET_FACTORY = rusEFISSLContext::getSSLServerSocket;
|
public static final ServerSocketFunction SECURE_SOCKET_FACTORY = rusEFISSLContext::getSSLServerSocket;
|
||||||
|
|
||||||
public static final ServerSocketFunction PLAIN_SOCKET_FACTORY = port -> {
|
|
||||||
ServerSocket serverSocket = new ServerSocket(port);
|
|
||||||
log.info("ServerSocket " + port + " created. Feel free to point TS at IP Address 'localhost' port " + port);
|
|
||||||
return serverSocket;
|
|
||||||
};
|
|
||||||
|
|
||||||
private final static ConcurrentHashMap<String, ThreadFactory> THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>();
|
private final static ConcurrentHashMap<String, ThreadFactory> THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public void start(LinkManager linkManager) {
|
public void start(LinkManager linkManager) {
|
||||||
|
@ -85,7 +80,7 @@ public class BinaryProtocolServer {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, serverSocketCreationCallback);
|
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, serverSocketCreationCallback, StatusConsumer.ANONYMOUS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,10 +90,15 @@ public class BinaryProtocolServer {
|
||||||
* @param threadName
|
* @param threadName
|
||||||
* @param socketRunnableFactory method to invoke on a new thread for each new client connection
|
* @param socketRunnableFactory method to invoke on a new thread for each new client connection
|
||||||
* @param serverSocketCreationCallback this callback is invoked once we open the server socket
|
* @param serverSocketCreationCallback this callback is invoked once we open the server socket
|
||||||
|
* @param statusConsumer
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static ServerSocketReference tcpServerSocket(int port, String threadName, CompatibleFunction<Socket, Runnable> socketRunnableFactory, Listener serverSocketCreationCallback) throws IOException {
|
public static ServerSocketReference tcpServerSocket(int port, String threadName, CompatibleFunction<Socket, Runnable> socketRunnableFactory, Listener serverSocketCreationCallback, StatusConsumer statusConsumer) throws IOException {
|
||||||
return tcpServerSocket(socketRunnableFactory, port, threadName, serverSocketCreationCallback, PLAIN_SOCKET_FACTORY);
|
return tcpServerSocket(socketRunnableFactory, port, threadName, serverSocketCreationCallback, p -> {
|
||||||
|
ServerSocket serverSocket = new ServerSocket(p);
|
||||||
|
statusConsumer.append("ServerSocket " + p + " created. Feel free to point TS at IP Address 'localhost' port " + p);
|
||||||
|
return serverSocket;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ServerSocketReference tcpServerSocket(CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory, int port, String threadName, Listener serverSocketCreationCallback, ServerSocketFunction nonSecureSocketFunction) throws IOException {
|
public static ServerSocketReference tcpServerSocket(CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory, int port, String threadName, Listener serverSocketCreationCallback, ServerSocketFunction nonSecureSocketFunction) throws IOException {
|
||||||
|
|
|
@ -16,6 +16,7 @@ import com.rusefi.server.ApplicationRequest;
|
||||||
import com.rusefi.server.rusEFISSLContext;
|
import com.rusefi.server.rusEFISSLContext;
|
||||||
import com.rusefi.tools.online.HttpUtil;
|
import com.rusefi.tools.online.HttpUtil;
|
||||||
import com.rusefi.tools.online.ProxyClient;
|
import com.rusefi.tools.online.ProxyClient;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
import org.apache.http.Consts;
|
import org.apache.http.Consts;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
import org.apache.http.NameValuePair;
|
import org.apache.http.NameValuePair;
|
||||||
|
@ -34,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import static com.devexperts.logging.Logging.getLogging;
|
import static com.devexperts.logging.Logging.getLogging;
|
||||||
import static com.rusefi.Timeouts.BINARY_IO_TIMEOUT;
|
import static com.rusefi.Timeouts.BINARY_IO_TIMEOUT;
|
||||||
import static com.rusefi.Timeouts.SECOND;
|
|
||||||
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
|
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -136,7 +136,7 @@ public class LocalApplicationProxy implements Closeable {
|
||||||
}).start();
|
}).start();
|
||||||
|
|
||||||
|
|
||||||
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort(), clientApplicationActivityListener);
|
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort(), clientApplicationActivityListener, StatusConsumer.ANONYMOUS);
|
||||||
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(applicationRequest, serverHolder, authenticatorToProxyStream);
|
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(applicationRequest, serverHolder, authenticatorToProxyStream);
|
||||||
connectionListener.onConnected(localApplicationProxy, authenticatorToProxyStream);
|
connectionListener.onConnected(localApplicationProxy, authenticatorToProxyStream);
|
||||||
return serverHolder;
|
return serverHolder;
|
||||||
|
|
|
@ -1,9 +1,17 @@
|
||||||
package com.rusefi.ui;
|
package com.rusefi.ui;
|
||||||
|
|
||||||
|
import com.devexperts.logging.Logging;
|
||||||
|
import com.rusefi.binaryprotocol.IncomingDataBuffer;
|
||||||
|
|
||||||
|
import static com.devexperts.logging.Logging.getLogging;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see StatusWindow
|
* @see StatusWindow
|
||||||
*/
|
*/
|
||||||
public interface StatusConsumer {
|
public interface StatusConsumer {
|
||||||
|
Logging log = getLogging(StatusConsumer.class);
|
||||||
|
|
||||||
|
StatusConsumer ANONYMOUS = log::info;
|
||||||
StatusConsumer VOID = s -> {
|
StatusConsumer VOID = s -> {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ public class PCanSandbox {
|
||||||
public static void main(String[] args) throws IOException, InterruptedException {
|
public static void main(String[] args) throws IOException, InterruptedException {
|
||||||
AbstractIoStream tsStream = PCanIoStream.createStream();
|
AbstractIoStream tsStream = PCanIoStream.createStream();
|
||||||
if (tsStream == null)
|
if (tsStream == null)
|
||||||
throw new IOException("No PCAN");
|
throw new IOException("Error creating PCAN stream");
|
||||||
|
|
||||||
|
|
||||||
for (int i = 0; i < 17; i++) {
|
for (int i = 0; i < 17; i++) {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import com.rusefi.io.IoStream;
|
||||||
import com.rusefi.io.commands.HelloCommand;
|
import com.rusefi.io.commands.HelloCommand;
|
||||||
import com.rusefi.io.tcp.BinaryProtocolServer;
|
import com.rusefi.io.tcp.BinaryProtocolServer;
|
||||||
import com.rusefi.io.tcp.TcpIoStream;
|
import com.rusefi.io.tcp.TcpIoStream;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
@ -80,6 +81,6 @@ public class TcpServerSandbox {
|
||||||
System.out.println("onResult");
|
System.out.println("onResult");
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
}, StatusConsumer.ANONYMOUS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,12 @@ public class CANConnectorStartup {
|
||||||
throw new IOException("Failed to initialise connector");
|
throw new IOException("Failed to initialise connector");
|
||||||
|
|
||||||
String signature = BinaryProtocol.getSignature(tsStream);
|
String signature = BinaryProtocol.getSignature(tsStream);
|
||||||
statusListener.append("Got [" + signature + "] signature via " + tsStream);
|
if (signature == null) {
|
||||||
|
statusListener.append("Error: no ECU signature from " + tsStream);
|
||||||
BinaryProtocolProxy.createProxy(tsStream, TcpConnector.DEFAULT_PORT, BinaryProtocolProxy.ClientApplicationActivityListener.VOID);
|
} else {
|
||||||
|
statusListener.append("Got [" + signature + "] ECU signature via " + tsStream);
|
||||||
|
}
|
||||||
|
BinaryProtocolProxy.createProxy(tsStream, TcpConnector.DEFAULT_PORT, BinaryProtocolProxy.ClientApplicationActivityListener.VOID, statusListener);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class ConsoleTools {
|
||||||
public void onActivity() {
|
public void onActivity() {
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
}, StatusConsumer.ANONYMOUS);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import com.rusefi.io.can.Elm327Connector;
|
||||||
import com.rusefi.io.serial.SerialIoStream;
|
import com.rusefi.io.serial.SerialIoStream;
|
||||||
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
||||||
import com.rusefi.io.tcp.TcpConnector;
|
import com.rusefi.io.tcp.TcpConnector;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -31,7 +32,7 @@ public class Elm327ConnectorStartup {
|
||||||
System.out.println("onActivity");
|
System.out.println("onActivity");
|
||||||
Elm327Connector.whyDoWeNeedToSleepBetweenCommands();
|
Elm327Connector.whyDoWeNeedToSleepBetweenCommands();
|
||||||
}
|
}
|
||||||
});
|
}, StatusConsumer.ANONYMOUS);
|
||||||
|
|
||||||
log.info("Running Elm327 connector for " + autoDetectedPort);
|
log.info("Running Elm327 connector for " + autoDetectedPort);
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import com.rusefi.config.generated.Fields;
|
||||||
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
||||||
import com.rusefi.io.tcp.BinaryProtocolServer;
|
import com.rusefi.io.tcp.BinaryProtocolServer;
|
||||||
import com.rusefi.io.tcp.TcpConnector;
|
import com.rusefi.io.tcp.TcpConnector;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -94,7 +95,8 @@ public class TcpCommunicationIntegrationTest {
|
||||||
// connect proxy to virtual controller
|
// connect proxy to virtual controller
|
||||||
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort);
|
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort);
|
||||||
final AtomicInteger relayCommandCounter = new AtomicInteger();
|
final AtomicInteger relayCommandCounter = new AtomicInteger();
|
||||||
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort, () -> relayCommandCounter.incrementAndGet());
|
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort, () -> relayCommandCounter.incrementAndGet(),
|
||||||
|
StatusConsumer.ANONYMOUS);
|
||||||
|
|
||||||
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
|
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import com.rusefi.io.tcp.TcpIoStream;
|
||||||
import com.rusefi.server.ApplicationRequest;
|
import com.rusefi.server.ApplicationRequest;
|
||||||
import com.rusefi.server.SessionDetails;
|
import com.rusefi.server.SessionDetails;
|
||||||
import com.rusefi.tools.online.ProxyClient;
|
import com.rusefi.tools.online.ProxyClient;
|
||||||
|
import com.rusefi.ui.StatusConsumer;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -51,7 +52,7 @@ public class LocalApplicationProxyTest {
|
||||||
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
|
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
|
||||||
sleep(Timeouts.SECOND);
|
sleep(Timeouts.SECOND);
|
||||||
close(socket);
|
close(socket);
|
||||||
}, parameter -> backendCreated.countDown());
|
}, parameter -> backendCreated.countDown(), StatusConsumer.ANONYMOUS);
|
||||||
assertLatch(backendCreated);
|
assertLatch(backendCreated);
|
||||||
|
|
||||||
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
||||||
|
@ -162,7 +163,7 @@ public class LocalApplicationProxyTest {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}, parameter -> backendCreated.countDown());
|
}, parameter -> backendCreated.countDown(), StatusConsumer.ANONYMOUS);
|
||||||
|
|
||||||
assertLatch(backendCreated);
|
assertLatch(backendCreated);
|
||||||
return mockBackend;
|
return mockBackend;
|
||||||
|
|
Loading…
Reference in New Issue