diff --git a/java_console/io/src/com/rusefi/binaryprotocol/BinaryProtocol.java b/java_console/io/src/com/rusefi/binaryprotocol/BinaryProtocol.java index 9cfdc0d4e6..cd3bbb6450 100644 --- a/java_console/io/src/com/rusefi/binaryprotocol/BinaryProtocol.java +++ b/java_console/io/src/com/rusefi/binaryprotocol/BinaryProtocol.java @@ -4,6 +4,8 @@ import com.rusefi.*; import com.rusefi.core.Pair; import com.rusefi.io.DataListener; import com.rusefi.io.IoStream; +import com.rusefi.io.LinkManager; +import com.rusefi.io.serial.PortHolder; import com.rusefi.io.serial.SerialIoStream; import etch.util.CircularByteBuffer; import jssc.SerialPort; @@ -38,7 +40,7 @@ public class BinaryProtocol { public static BinaryProtocol instance; public boolean isClosed; - public BinaryProtocol(final Logger logger, SerialIoStream stream) { + public BinaryProtocol(final Logger logger, IoStream stream) { this.logger = logger; this.stream = stream; @@ -59,13 +61,60 @@ public class BinaryProtocol { } } }; - this.stream.addEventListener(listener); + stream.addEventListener(listener); } public BinaryProtocol(Logger logger, SerialPort serialPort) { this(logger, new SerialIoStream(serialPort, logger)); } + private static void sleep() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + public boolean connect(DataListener listener) { + switchToBinaryProtocol(); + readImage(TsPageSize.IMAGE_SIZE); + if (isClosed) + return false; + + startTextPullThread(listener); + return true; + } + + public void startTextPullThread(final DataListener listener) { + if (!LinkManager.COMMUNICATION_QUEUE.isEmpty()) { + System.out.println("Current queue: " + LinkManager.COMMUNICATION_QUEUE.size()); + } + Runnable textPull = new Runnable() { + @Override + public void run() { + while (!isClosed) { +// FileLog.rlog("queue: " + LinkManager.COMMUNICATION_QUEUE.toString()); + if (LinkManager.COMMUNICATION_QUEUE.isEmpty()) { + LinkManager.COMMUNICATION_EXECUTOR.submit(new Runnable() { + @Override + public void run() { + String text = requestText(); + if (text != null) + listener.onDataArrived((text + "\r\n").getBytes()); + } + }); + } + sleep(); + } + FileLog.MAIN.logLine("Stopping text pull"); + } + }; + Thread tr = new Thread(textPull); + tr.setName("text pull"); + tr.start(); + } + public Logger getLogger() { return logger; } diff --git a/java_console/io/src/com/rusefi/io/serial/PortHolder.java b/java_console/io/src/com/rusefi/io/serial/PortHolder.java index 8ddac81f89..ba6afd98e8 100644 --- a/java_console/io/src/com/rusefi/io/serial/PortHolder.java +++ b/java_console/io/src/com/rusefi/io/serial/PortHolder.java @@ -2,7 +2,6 @@ package com.rusefi.io.serial; import com.rusefi.FileLog; import com.rusefi.Timeouts; -import com.rusefi.TsPageSize; import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.core.MessagesCentral; import com.rusefi.io.CommandQueue; @@ -50,7 +49,7 @@ public class PortHolder { return result; } - public boolean open(String port, final DataListener listener) { + private boolean open(String port, final DataListener listener) { SerialPort serialPort = new SerialPort(port); try { FileLog.MAIN.logLine("Opening " + port + " @ " + BAUD_RATE); @@ -78,37 +77,7 @@ public class PortHolder { bp = new BinaryProtocol(FileLog.LOGGER, new SerialIoStream(serialPort, FileLog.LOGGER)); - bp.switchToBinaryProtocol(); - bp.readImage(TsPageSize.IMAGE_SIZE); - if (bp.isClosed) - return false; - - if (!LinkManager.COMMUNICATION_QUEUE.isEmpty()) { - System.out.println("Current queue: " + LinkManager.COMMUNICATION_QUEUE.size()); - } - Runnable textPull = new Runnable() { - @Override - public void run() { - while (!bp.isClosed) { -// FileLog.rlog("queue: " + LinkManager.COMMUNICATION_QUEUE.toString()); - if (LinkManager.COMMUNICATION_QUEUE.isEmpty()) { - LinkManager.COMMUNICATION_EXECUTOR.submit(new Runnable() { - @Override - public void run() { - String text = bp.requestText(); - if (text != null) - listener.onDataArrived((text + "\r\n").getBytes()); - } - }); - } - sleep(); - } - FileLog.MAIN.logLine("Stopping text pull"); - } - }; - Thread tr = new Thread(textPull); - tr.setName("text pull"); - tr.start(); + return bp.connect(listener); // // try { @@ -121,15 +90,6 @@ public class PortHolder { // } catch (SerialPortException e) { // return false; // } - return true; - } - - private void sleep() { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } } public static void setupPort(SerialPort serialPort, int baudRate) throws SerialPortException { diff --git a/java_console/io/src/com/rusefi/io/tcp/TcpConnector.java b/java_console/io/src/com/rusefi/io/tcp/TcpConnector.java index 22aa1cf4f0..42118527d6 100644 --- a/java_console/io/src/com/rusefi/io/tcp/TcpConnector.java +++ b/java_console/io/src/com/rusefi/io/tcp/TcpConnector.java @@ -2,9 +2,8 @@ package com.rusefi.io.tcp; import com.rusefi.FileLog; import com.rusefi.core.EngineState; -import com.rusefi.io.CommandQueue; -import com.rusefi.io.LinkConnector; -import com.rusefi.io.LinkManager; +import com.rusefi.core.ResponseBuffer; +import com.rusefi.io.*; import java.io.*; import java.net.Socket; @@ -19,8 +18,9 @@ public class TcpConnector implements LinkConnector { public final static int DEFAULT_PORT = 29001; public static final String LOCALHOST = "localhost"; private final int port; - private BufferedWriter writer; private boolean withError; + private BufferedInputStream stream; + private IoStream ioStream; public TcpConnector(String port) { try { @@ -86,34 +86,29 @@ public class TcpConnector implements LinkConnector { @Override public void connect(LinkManager.LinkStateListener listener) { FileLog.MAIN.logLine("Connecting to " + port); - BufferedInputStream stream; try { Socket socket = new Socket(LOCALHOST, port); - writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + OutputStream os = socket.getOutputStream(); stream = new BufferedInputStream(socket.getInputStream()); + ioStream = new TcpIoStream(os, stream); } catch (IOException e) { throw new IllegalStateException("Failed to connect to simulator", e); } +// listener.onConnectionEstablished(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); - - LinkManager.IO_EXECUTOR.execute(new Runnable() { + final ResponseBuffer rb = new ResponseBuffer(new ResponseBuffer.ResponseListener() { @Override - public void run() { - Thread.currentThread().setName("TCP connector loop"); - FileLog.MAIN.logLine("Running TCP connection loop"); - while (true) { - try { - String line = reader.readLine(); - LinkManager.engineState.processNewData(line + "\r\n"); - } catch (IOException e) { - System.err.println("End of connection"); - return; - } - } + public void onResponse(String line) { + LinkManager.engineState.processNewData(line + "\r\n"); } }); + ioStream.addEventListener(new DataListener() { + @Override + public void onDataArrived(byte[] freshData) { + rb.append(new String(freshData)); + } + }); } @Override @@ -136,8 +131,7 @@ public class TcpConnector implements LinkConnector { String command = LinkManager.encodeCommand(text); FileLog.MAIN.logLine("Writing " + command); try { - writer.write(command + "\n"); - writer.flush(); + ioStream.write((command + "\n").getBytes()); } catch (IOException e) { withError = true; System.err.println("err in send"); diff --git a/java_console/io/src/com/rusefi/io/tcp/TcpIoStream.java b/java_console/io/src/com/rusefi/io/tcp/TcpIoStream.java index 61732278d4..4a0b16d199 100644 --- a/java_console/io/src/com/rusefi/io/tcp/TcpIoStream.java +++ b/java_console/io/src/com/rusefi/io/tcp/TcpIoStream.java @@ -1,15 +1,27 @@ package com.rusefi.io.tcp; +import com.rusefi.FileLog; import com.rusefi.io.DataListener; import com.rusefi.io.IoStream; +import com.rusefi.io.LinkManager; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.OutputStream; /** * (c) Andrey Belomutskiy * 5/11/2015. */ public class TcpIoStream implements IoStream { + private final OutputStream os; + private final BufferedInputStream stream; + + public TcpIoStream(OutputStream os, BufferedInputStream stream) { + this.os = os; + this.stream = stream; + } + @Override public void close() { @@ -17,7 +29,8 @@ public class TcpIoStream implements IoStream { @Override public void write(byte[] bytes) throws IOException { - + os.write(bytes); + os.flush(); } @Override @@ -26,7 +39,25 @@ public class TcpIoStream implements IoStream { } @Override - public void addEventListener(DataListener listener) { + public void addEventListener(final DataListener listener) { + LinkManager.IO_EXECUTOR.execute(new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("TCP connector loop"); + FileLog.MAIN.logLine("Running TCP connection loop"); + + byte b[] = new byte[1]; + while (true) { + try { + stream.read(b); + listener.onDataArrived(b); + } catch (IOException e) { + System.err.println("End of connection"); + return; + } + } + } + }); } }