parent
cad9791b35
commit
582537a5bf
|
@ -127,7 +127,7 @@ public class IncomingDataBuffer {
|
|||
while (cbb.length() < count) {
|
||||
int timeout = (int) (startTimestamp + timeoutMs - System.currentTimeMillis());
|
||||
if (timeout <= 0) {
|
||||
log.info(loggingMessage + ": timeout " + timeoutMs + "ms. Got only " + cbb.length() + " while expecting " + count);
|
||||
log.info(loggingMessage + ": timeout " + timeoutMs + "ms. Got only " + cbb.length() + "byte(s) while expecting " + count);
|
||||
return true; // timeout. Sad face.
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -67,7 +67,7 @@ public class PCanIoStream extends AbstractIoStream {
|
|||
statusListener.append("Error initializing PCAN: " + status);
|
||||
return null;
|
||||
}
|
||||
statusListener.append("Hello PCAN!");
|
||||
statusListener.append("Creating PCAN stream...");
|
||||
return new PCanIoStream(can, statusListener);
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import com.rusefi.binaryprotocol.IoHelper;
|
|||
import com.rusefi.config.generated.Fields;
|
||||
import com.rusefi.io.IoStream;
|
||||
import com.rusefi.proxy.NetworkConnector;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -36,18 +37,18 @@ public class BinaryProtocolProxy {
|
|||
/**
|
||||
* @return starts a thread and returns a reference to ServerSocketReference
|
||||
*/
|
||||
public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort, ClientApplicationActivityListener clientApplicationActivityListener) throws IOException {
|
||||
public static ServerSocketReference createProxy(IoStream targetEcuSocket, int serverProxyPort, ClientApplicationActivityListener clientApplicationActivityListener, StatusConsumer statusConsumer) throws IOException {
|
||||
CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
|
||||
TcpIoStream clientStream = null;
|
||||
try {
|
||||
clientStream = new TcpIoStream("[[proxy]] ", clientSocket);
|
||||
runProxy(targetEcuSocket, clientStream, clientApplicationActivityListener, USER_IO_TIMEOUT);
|
||||
} catch (IOException e) {
|
||||
log.error("BinaryProtocolProxy::run " + e);
|
||||
statusConsumer.append("ERROR BinaryProtocolProxy::run " + e);
|
||||
close(clientStream);
|
||||
}
|
||||
};
|
||||
return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty());
|
||||
return BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Listener.empty(), statusConsumer);
|
||||
}
|
||||
|
||||
public interface ClientApplicationActivityListener {
|
||||
|
|
|
@ -15,6 +15,7 @@ import com.rusefi.io.IoStream;
|
|||
import com.rusefi.io.LinkManager;
|
||||
import com.rusefi.io.commands.HelloCommand;
|
||||
import com.rusefi.server.rusEFISSLContext;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -58,12 +59,6 @@ public class BinaryProtocolServer {
|
|||
|
||||
public static final ServerSocketFunction SECURE_SOCKET_FACTORY = rusEFISSLContext::getSSLServerSocket;
|
||||
|
||||
public static final ServerSocketFunction PLAIN_SOCKET_FACTORY = port -> {
|
||||
ServerSocket serverSocket = new ServerSocket(port);
|
||||
log.info("ServerSocket " + port + " created. Feel free to point TS at IP Address 'localhost' port " + port);
|
||||
return serverSocket;
|
||||
};
|
||||
|
||||
private final static ConcurrentHashMap<String, ThreadFactory> THREAD_FACTORIES_BY_NAME = new ConcurrentHashMap<>();
|
||||
|
||||
public void start(LinkManager linkManager) {
|
||||
|
@ -85,7 +80,7 @@ public class BinaryProtocolServer {
|
|||
}
|
||||
};
|
||||
|
||||
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, serverSocketCreationCallback);
|
||||
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, serverSocketCreationCallback, StatusConsumer.ANONYMOUS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,10 +90,15 @@ public class BinaryProtocolServer {
|
|||
* @param threadName
|
||||
* @param socketRunnableFactory method to invoke on a new thread for each new client connection
|
||||
* @param serverSocketCreationCallback this callback is invoked once we open the server socket
|
||||
* @param statusConsumer
|
||||
* @return
|
||||
*/
|
||||
public static ServerSocketReference tcpServerSocket(int port, String threadName, CompatibleFunction<Socket, Runnable> socketRunnableFactory, Listener serverSocketCreationCallback) throws IOException {
|
||||
return tcpServerSocket(socketRunnableFactory, port, threadName, serverSocketCreationCallback, PLAIN_SOCKET_FACTORY);
|
||||
public static ServerSocketReference tcpServerSocket(int port, String threadName, CompatibleFunction<Socket, Runnable> socketRunnableFactory, Listener serverSocketCreationCallback, StatusConsumer statusConsumer) throws IOException {
|
||||
return tcpServerSocket(socketRunnableFactory, port, threadName, serverSocketCreationCallback, p -> {
|
||||
ServerSocket serverSocket = new ServerSocket(p);
|
||||
statusConsumer.append("ServerSocket " + p + " created. Feel free to point TS at IP Address 'localhost' port " + p);
|
||||
return serverSocket;
|
||||
});
|
||||
}
|
||||
|
||||
public static ServerSocketReference tcpServerSocket(CompatibleFunction<Socket, Runnable> clientSocketRunnableFactory, int port, String threadName, Listener serverSocketCreationCallback, ServerSocketFunction nonSecureSocketFunction) throws IOException {
|
||||
|
|
|
@ -16,6 +16,7 @@ import com.rusefi.server.ApplicationRequest;
|
|||
import com.rusefi.server.rusEFISSLContext;
|
||||
import com.rusefi.tools.online.HttpUtil;
|
||||
import com.rusefi.tools.online.ProxyClient;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
import org.apache.http.Consts;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.NameValuePair;
|
||||
|
@ -34,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import static com.devexperts.logging.Logging.getLogging;
|
||||
import static com.rusefi.Timeouts.BINARY_IO_TIMEOUT;
|
||||
import static com.rusefi.Timeouts.SECOND;
|
||||
import static com.rusefi.binaryprotocol.BinaryProtocol.sleep;
|
||||
|
||||
/**
|
||||
|
@ -136,7 +136,7 @@ public class LocalApplicationProxy implements Closeable {
|
|||
}).start();
|
||||
|
||||
|
||||
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort(), clientApplicationActivityListener);
|
||||
ServerSocketReference serverHolder = BinaryProtocolProxy.createProxy(authenticatorToProxyStream, context.authenticatorPort(), clientApplicationActivityListener, StatusConsumer.ANONYMOUS);
|
||||
LocalApplicationProxy localApplicationProxy = new LocalApplicationProxy(applicationRequest, serverHolder, authenticatorToProxyStream);
|
||||
connectionListener.onConnected(localApplicationProxy, authenticatorToProxyStream);
|
||||
return serverHolder;
|
||||
|
|
|
@ -1,9 +1,17 @@
|
|||
package com.rusefi.ui;
|
||||
|
||||
import com.devexperts.logging.Logging;
|
||||
import com.rusefi.binaryprotocol.IncomingDataBuffer;
|
||||
|
||||
import static com.devexperts.logging.Logging.getLogging;
|
||||
|
||||
/**
|
||||
* @see StatusWindow
|
||||
*/
|
||||
public interface StatusConsumer {
|
||||
Logging log = getLogging(StatusConsumer.class);
|
||||
|
||||
StatusConsumer ANONYMOUS = log::info;
|
||||
StatusConsumer VOID = s -> {
|
||||
};
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ public class PCanSandbox {
|
|||
public static void main(String[] args) throws IOException, InterruptedException {
|
||||
AbstractIoStream tsStream = PCanIoStream.createStream();
|
||||
if (tsStream == null)
|
||||
throw new IOException("No PCAN");
|
||||
throw new IOException("Error creating PCAN stream");
|
||||
|
||||
|
||||
for (int i = 0; i < 17; i++) {
|
||||
|
|
|
@ -8,6 +8,7 @@ import com.rusefi.io.IoStream;
|
|||
import com.rusefi.io.commands.HelloCommand;
|
||||
import com.rusefi.io.tcp.BinaryProtocolServer;
|
||||
import com.rusefi.io.tcp.TcpIoStream;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
|
@ -80,6 +81,6 @@ public class TcpServerSandbox {
|
|||
System.out.println("onResult");
|
||||
|
||||
}
|
||||
});
|
||||
}, StatusConsumer.ANONYMOUS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,9 +14,12 @@ public class CANConnectorStartup {
|
|||
throw new IOException("Failed to initialise connector");
|
||||
|
||||
String signature = BinaryProtocol.getSignature(tsStream);
|
||||
statusListener.append("Got [" + signature + "] signature via " + tsStream);
|
||||
|
||||
BinaryProtocolProxy.createProxy(tsStream, TcpConnector.DEFAULT_PORT, BinaryProtocolProxy.ClientApplicationActivityListener.VOID);
|
||||
if (signature == null) {
|
||||
statusListener.append("Error: no ECU signature from " + tsStream);
|
||||
} else {
|
||||
statusListener.append("Got [" + signature + "] ECU signature via " + tsStream);
|
||||
}
|
||||
BinaryProtocolProxy.createProxy(tsStream, TcpConnector.DEFAULT_PORT, BinaryProtocolProxy.ClientApplicationActivityListener.VOID, statusListener);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ public class ConsoleTools {
|
|||
public void onActivity() {
|
||||
|
||||
}
|
||||
});
|
||||
}, StatusConsumer.ANONYMOUS);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import com.rusefi.io.can.Elm327Connector;
|
|||
import com.rusefi.io.serial.SerialIoStream;
|
||||
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
||||
import com.rusefi.io.tcp.TcpConnector;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -31,7 +32,7 @@ public class Elm327ConnectorStartup {
|
|||
System.out.println("onActivity");
|
||||
Elm327Connector.whyDoWeNeedToSleepBetweenCommands();
|
||||
}
|
||||
});
|
||||
}, StatusConsumer.ANONYMOUS);
|
||||
|
||||
log.info("Running Elm327 connector for " + autoDetectedPort);
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import com.rusefi.config.generated.Fields;
|
|||
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
||||
import com.rusefi.io.tcp.BinaryProtocolServer;
|
||||
import com.rusefi.io.tcp.TcpConnector;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -94,7 +95,8 @@ public class TcpCommunicationIntegrationTest {
|
|||
// connect proxy to virtual controller
|
||||
IoStream targetEcuSocket = TestHelper.connectToLocalhost(controllerPort);
|
||||
final AtomicInteger relayCommandCounter = new AtomicInteger();
|
||||
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort, () -> relayCommandCounter.incrementAndGet());
|
||||
BinaryProtocolProxy.createProxy(targetEcuSocket, proxyPort, () -> relayCommandCounter.incrementAndGet(),
|
||||
StatusConsumer.ANONYMOUS);
|
||||
|
||||
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import com.rusefi.io.tcp.TcpIoStream;
|
|||
import com.rusefi.server.ApplicationRequest;
|
||||
import com.rusefi.server.SessionDetails;
|
||||
import com.rusefi.tools.online.ProxyClient;
|
||||
import com.rusefi.ui.StatusConsumer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -51,7 +52,7 @@ public class LocalApplicationProxyTest {
|
|||
ServerSocketReference mockBackend = BinaryProtocolServer.tcpServerSocket(context.serverPortForRemoteApplications(), "localAppTest", socket -> () -> {
|
||||
sleep(Timeouts.SECOND);
|
||||
close(socket);
|
||||
}, parameter -> backendCreated.countDown());
|
||||
}, parameter -> backendCreated.countDown(), StatusConsumer.ANONYMOUS);
|
||||
assertLatch(backendCreated);
|
||||
|
||||
SessionDetails sessionDetails = TestHelper.createTestSession(TEST_TOKEN_1, Fields.TS_SIGNATURE);
|
||||
|
@ -162,7 +163,7 @@ public class LocalApplicationProxyTest {
|
|||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
}, parameter -> backendCreated.countDown());
|
||||
}, parameter -> backendCreated.countDown(), StatusConsumer.ANONYMOUS);
|
||||
|
||||
assertLatch(backendCreated);
|
||||
return mockBackend;
|
||||
|
|
Loading…
Reference in New Issue