proxy progress
This commit is contained in:
parent
78306b6d85
commit
10efceabfc
|
@ -0,0 +1,84 @@
|
|||
package com.rusefi.io.tcp;
|
||||
|
||||
import com.opensr5.Logger;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.rusefi.binaryprotocol.BinaryProtocolCommands.COMMAND_PROTOCOL;
|
||||
|
||||
public class BinaryProtocolProxy {
|
||||
public static void createProxy(Socket targetEcuSocket, int serverProxyPort) {
|
||||
Function<Socket, Runnable> clientSocketRunnableFactory = new Function<Socket, Runnable>() {
|
||||
@Override
|
||||
public Runnable apply(Socket clientSocket) {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
runProxy(targetEcuSocket, clientSocket);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
BinaryProtocolServer.tcpServerSocket(serverProxyPort, "proxy", clientSocketRunnableFactory, Logger.CONSOLE);
|
||||
}
|
||||
|
||||
private static void runProxy(Socket targetEcuSocket, Socket clientSocket) {
|
||||
/*
|
||||
* Each client socket is running on it's own thread
|
||||
*/
|
||||
try {
|
||||
DataInputStream targetInputStream = new DataInputStream(targetEcuSocket.getInputStream());
|
||||
DataOutputStream targetOutputStream = new DataOutputStream(targetEcuSocket.getOutputStream());
|
||||
|
||||
DataInputStream clientInputStream = new DataInputStream(clientSocket.getInputStream());
|
||||
DataOutputStream clientOutputStream = new DataOutputStream(clientSocket.getOutputStream());
|
||||
|
||||
while (true) {
|
||||
byte firstByte = clientInputStream.readByte();
|
||||
if (firstByte == COMMAND_PROTOCOL) {
|
||||
BinaryProtocolServer.handleProtocolCommand(clientSocket);
|
||||
continue;
|
||||
}
|
||||
proxyClientRequestToController(clientInputStream, firstByte, targetOutputStream);
|
||||
|
||||
proxyControllerResponseToClient(targetInputStream, clientOutputStream);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private static void proxyControllerResponseToClient(DataInputStream targetInputStream, DataOutputStream clientOutputStream) throws IOException {
|
||||
short length = targetInputStream.readShort();
|
||||
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(targetInputStream, length);
|
||||
|
||||
System.out.println("Relaying controller response length=" + length);
|
||||
clientOutputStream.writeShort(length);
|
||||
clientOutputStream.write(packet.getPacket());
|
||||
clientOutputStream.writeInt(packet.getCrc());
|
||||
clientOutputStream.flush();
|
||||
}
|
||||
|
||||
private static void proxyClientRequestToController(DataInputStream in, byte firstByte, DataOutputStream targetOutputStream) throws IOException {
|
||||
byte secondByte = in.readByte();
|
||||
int length = firstByte * 256 + secondByte;
|
||||
|
||||
BinaryProtocolServer.Packet packet = BinaryProtocolServer.readPromisedBytes(in, length);
|
||||
|
||||
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet.getPacket()));
|
||||
byte command = (byte) dis.read();
|
||||
|
||||
System.out.println("Relaying client command" + (char) command + "/" + command + " length=" + length);
|
||||
// sending proxies packet to controller
|
||||
targetOutputStream.write(firstByte);
|
||||
targetOutputStream.write(secondByte);
|
||||
targetOutputStream.write(packet.getPacket());
|
||||
targetOutputStream.writeInt(packet.getCrc());
|
||||
targetOutputStream.flush();
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@ import java.net.ServerSocket;
|
|||
import java.net.Socket;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.rusefi.binaryprotocol.IoHelper.swap16;
|
||||
import static com.rusefi.config.generated.Fields.*;
|
||||
|
@ -22,7 +23,7 @@ import static com.rusefi.config.generated.Fields.*;
|
|||
* serial port simultaneously
|
||||
*
|
||||
* @author Andrey Belomutskiy
|
||||
* 11/24/15
|
||||
* 11/24/15
|
||||
*/
|
||||
|
||||
public class BinaryProtocolServer implements BinaryProtocolCommands {
|
||||
|
@ -42,6 +43,19 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
|
||||
public void start(LinkManager linkManager, int port) {
|
||||
logger.info("BinaryProtocolServer on " + port);
|
||||
|
||||
Function<Socket, Runnable> clientSocketRunnableFactory = clientSocket -> () -> {
|
||||
try {
|
||||
runProxy(linkManager, clientSocket);
|
||||
} catch (IOException e) {
|
||||
logger.info("proxy connection: " + e);
|
||||
}
|
||||
};
|
||||
|
||||
tcpServerSocket(port, "BinaryProtocolServer", clientSocketRunnableFactory, logger);
|
||||
}
|
||||
|
||||
public static void tcpServerSocket(int port, String threadName, Function<Socket, Runnable> clientSocketRunnableFactory, final Logger logger) {
|
||||
Runnable runnable = new Runnable() {
|
||||
@SuppressWarnings("InfiniteLoopStatement")
|
||||
@Override
|
||||
|
@ -59,23 +73,14 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
// Wait for a connection
|
||||
final Socket clientSocket = serverSocket.accept();
|
||||
logger.info("Binary protocol proxy port connection");
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runProxy(linkManager, clientSocket);
|
||||
} catch (IOException e) {
|
||||
logger.info("proxy connection: " + e);
|
||||
}
|
||||
}
|
||||
}, "proxy connection").start();
|
||||
new Thread(clientSocketRunnableFactory.apply(clientSocket), "proxy connection").start();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
new Thread(runnable, "BinaryProtocolServer").start();
|
||||
new Thread(runnable, threadName).start();
|
||||
}
|
||||
|
||||
@SuppressWarnings("InfiniteLoopStatement")
|
||||
|
@ -85,14 +90,9 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
while (true) {
|
||||
byte first = in.readByte();
|
||||
if (first == COMMAND_PROTOCOL) {
|
||||
//System.out.println("Ignoring plain F command");
|
||||
System.out.println("Got plain F command");
|
||||
OutputStream outputStream = clientSocket.getOutputStream();
|
||||
outputStream.write(TS_PROTOCOL.getBytes());
|
||||
outputStream.flush();
|
||||
handleProtocolCommand(clientSocket);
|
||||
continue;
|
||||
}
|
||||
|
||||
int length = first * 256 + in.readByte();
|
||||
|
||||
System.out.println("Got [" + length + "] length promise");
|
||||
|
@ -100,19 +100,12 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
if (length == 0)
|
||||
throw new IOException("Zero length not expected");
|
||||
|
||||
byte[] packet = new byte[length];
|
||||
int size = in.read(packet);
|
||||
if (size != packet.length)
|
||||
throw new IllegalStateException();
|
||||
byte[] packet = readPromisedBytes(in, length).packet;
|
||||
|
||||
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(packet));
|
||||
byte command = (byte) dis.read();
|
||||
System.out.println("Got [" + (char) command + "/" + command + "] command");
|
||||
|
||||
int crc = in.readInt();
|
||||
if (crc != IoHelper.getCrc32(packet))
|
||||
throw new IllegalStateException("CRC mismatch");
|
||||
|
||||
TcpIoStream stream = new TcpIoStream(logger, linkManager, clientSocket);
|
||||
if (command == COMMAND_HELLO) {
|
||||
stream.sendPacket((TS_OK + Fields.TS_SIGNATURE).getBytes(), logger);
|
||||
|
@ -159,6 +152,24 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
}
|
||||
}
|
||||
|
||||
public static Packet readPromisedBytes(DataInputStream in, int length) throws IOException {
|
||||
byte[] packet = new byte[length];
|
||||
int size = in.read(packet);
|
||||
if (size != packet.length)
|
||||
throw new IllegalStateException();
|
||||
int crc = in.readInt();
|
||||
if (crc != IoHelper.getCrc32(packet))
|
||||
throw new IllegalStateException("CRC mismatch");
|
||||
return new Packet(packet, crc);
|
||||
}
|
||||
|
||||
public static void handleProtocolCommand(Socket clientSocket) throws IOException {
|
||||
System.out.println("Got plain F command");
|
||||
OutputStream outputStream = clientSocket.getOutputStream();
|
||||
outputStream.write(TS_PROTOCOL.getBytes());
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
private void handleWrite(LinkManager linkManager, byte[] packet, DataInputStream dis, TcpIoStream stream) throws IOException {
|
||||
dis.readShort(); // page
|
||||
int offset = swap16(dis.readShort());
|
||||
|
@ -198,4 +209,22 @@ public class BinaryProtocolServer implements BinaryProtocolCommands {
|
|||
new DataOutputStream(response).writeInt(result);
|
||||
stream.sendPacket(response.toByteArray(), logger);
|
||||
}
|
||||
|
||||
public static class Packet {
|
||||
private final byte[] packet;
|
||||
private final int crc;
|
||||
|
||||
public Packet(byte[] packet, int crc) {
|
||||
this.packet = packet;
|
||||
this.crc = crc;
|
||||
}
|
||||
|
||||
public byte[] getPacket() {
|
||||
return packet;
|
||||
}
|
||||
|
||||
public int getCrc() {
|
||||
return crc;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,11 +7,14 @@ import com.rusefi.binaryprotocol.BinaryProtocol;
|
|||
import com.rusefi.binaryprotocol.BinaryProtocolState;
|
||||
import com.rusefi.config.Field;
|
||||
import com.rusefi.config.generated.Fields;
|
||||
import com.rusefi.io.tcp.BinaryProtocolProxy;
|
||||
import com.rusefi.io.tcp.BinaryProtocolServer;
|
||||
import com.rusefi.tune.xml.Constant;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.Socket;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -84,11 +87,31 @@ public class TcpCommunicationIntegrationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testProxy() {
|
||||
public void testProxy() throws InterruptedException, IOException {
|
||||
ConfigurationImage serverImage = prepareImage(239, createIniField(Fields.CYLINDERSCOUNT));
|
||||
int port = 6102;
|
||||
BinaryProtocolServer server = createVirtualController(serverImage, port);
|
||||
int controllerPort = 6102;
|
||||
BinaryProtocolServer server = createVirtualController(serverImage, controllerPort);
|
||||
int proxyPort = 6103;
|
||||
|
||||
BinaryProtocolProxy.createProxy(new Socket("127.0.0.1", controllerPort), proxyPort);
|
||||
|
||||
CountDownLatch connectionEstablishedCountDownLatch = new CountDownLatch(1);
|
||||
|
||||
LinkManager clientManager = new LinkManager(LOGGER);
|
||||
clientManager.startAndConnect(Integer.toString(proxyPort), new ConnectionStateListener() {
|
||||
@Override
|
||||
public void onConnectionEstablished() {
|
||||
connectionEstablishedCountDownLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionFailed() {
|
||||
System.out.println("Failed");
|
||||
}
|
||||
});
|
||||
assertTrue(connectionEstablishedCountDownLatch.await(30, TimeUnit.SECONDS));
|
||||
|
||||
clientManager.stop();
|
||||
}
|
||||
|
||||
@NotNull
|
||||
|
|
Loading…
Reference in New Issue