shutdown improvements

This commit is contained in:
rusefi 2020-07-25 15:27:40 -04:00
parent 617e8f773f
commit 923a289166
5 changed files with 35 additions and 19 deletions

View File

@ -9,15 +9,23 @@ import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory { public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
private String name; private String name;
private final boolean isDaemon;
public NamedThreadFactory(String name) { public NamedThreadFactory(String name) {
this.name = name; this(name, false);
} }
public NamedThreadFactory(String name, boolean isDaemon) {
this.name = name;
this.isDaemon = isDaemon;
}
@Override @Override
public Thread newThread(@NotNull Runnable r) { public Thread newThread(@NotNull Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r); Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName(name + counter.incrementAndGet()); t.setName(name + counter.incrementAndGet());
t.setDaemon(isDaemon);
return t; return t;
} }
} }

View File

@ -1,10 +1,9 @@
package com.rusefi.io; package com.rusefi.io;
import com.devexperts.logging.Logging; import com.devexperts.logging.Logging;
import com.opensr5.Logger;
import com.opensr5.io.DataListener; import com.opensr5.io.DataListener;
import com.rusefi.NamedThreadFactory;
import com.rusefi.config.generated.Fields; import com.rusefi.config.generated.Fields;
import com.rusefi.io.tcp.BinaryProtocolServer;
import com.rusefi.io.tcp.TcpIoStream; import com.rusefi.io.tcp.TcpIoStream;
import java.io.IOException; import java.io.IOException;
@ -15,26 +14,24 @@ import java.util.concurrent.Executors;
import static com.devexperts.logging.Logging.getLogging; import static com.devexperts.logging.Logging.getLogging;
public interface ByteReader { public interface ByteReader {
NamedThreadFactory THREAD_FACTORY = new NamedThreadFactory("TCP connector loop", true);
Logging log = getLogging(ByteReader.class); Logging log = getLogging(ByteReader.class);
static void runReaderLoop(String loggingPrefix, DataListener listener, ByteReader reader, TcpIoStream.DisconnectListener disconnectListener) { static void runReaderLoop(String loggingPrefix, DataListener listener, ByteReader reader, TcpIoStream.DisconnectListener disconnectListener, TcpIoStream tcpIoStream) {
/** /**
* Threading of the whole input/output does not look healthy at all! * Threading of the whole input/output does not look healthy at all!
* *
* @see #COMMUNICATION_EXECUTOR * @see #COMMUNICATION_EXECUTOR
*/ */
Executor threadExecutor = Executors.newSingleThreadExecutor(r -> { Executor threadExecutor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
Thread t = new Thread(r, "IO executor thread");
t.setDaemon(true); // need daemon thread so that COM thread is also daemon
return t;
});
threadExecutor.execute(() -> { threadExecutor.execute(() -> {
Thread.currentThread().setName("TCP connector loop");
log.info(loggingPrefix + "Running TCP connection loop"); log.info(loggingPrefix + "Running TCP connection loop");
byte inputBuffer[] = new byte[Fields.BLOCKING_FACTOR * 2]; byte inputBuffer[] = new byte[Fields.BLOCKING_FACTOR * 2];
while (true) { while (!tcpIoStream.isClosed()) {
try { try {
int result = reader.read(inputBuffer); int result = reader.read(inputBuffer);
if (result == -1) if (result == -1)

View File

@ -78,7 +78,7 @@ public class TcpIoStream extends AbstractIoStream {
@Override @Override
public void setInputListener(final DataListener listener) { public void setInputListener(final DataListener listener) {
ByteReader.runReaderLoop(loggingPrefix, listener, input::read, disconnectListener); ByteReader.runReaderLoop(loggingPrefix, listener, input::read, disconnectListener, this);
} }
public interface DisconnectListener { public interface DisconnectListener {

View File

@ -2,6 +2,7 @@ package com.rusefi.server;
import com.devexperts.logging.Logging; import com.devexperts.logging.Logging;
import com.rusefi.Listener; import com.rusefi.Listener;
import com.rusefi.NamedThreadFactory;
import com.rusefi.Timeouts; import com.rusefi.Timeouts;
import com.rusefi.binaryprotocol.BinaryProtocol; import com.rusefi.binaryprotocol.BinaryProtocol;
import com.rusefi.core.Sensor; import com.rusefi.core.Sensor;
@ -28,6 +29,7 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.util.*; import java.util.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static com.devexperts.logging.Logging.getLogging; import static com.devexperts.logging.Logging.getLogging;
@ -53,6 +55,8 @@ public class Backend implements Closeable {
*/ */
private static final int APPLICATION_INACTIVITY_TIMEOUT = 3 * Timeouts.MINUTE; private static final int APPLICATION_INACTIVITY_TIMEOUT = 3 * Timeouts.MINUTE;
static final String AGE = "age"; static final String AGE = "age";
private static final ThreadFactory APPLLICATION_CONNECTION_CLEANUP = new NamedThreadFactory("rusEFI Application connections Cleanup");
private static final ThreadFactory GAUGE_POKER = new NamedThreadFactory("rusEFI gauge poker");
private final FkRegex showOnlineControllers = new FkRegex(ProxyClient.LIST_CONTROLLERS_PATH, private final FkRegex showOnlineControllers = new FkRegex(ProxyClient.LIST_CONTROLLERS_PATH,
(Take) req -> getControllersOnline() (Take) req -> getControllersOnline()
@ -109,7 +113,7 @@ public class Backend implements Closeable {
"</body></html>\n")) "</body></html>\n"))
); );
Front frontEnd = new FtBasic(new BkParallel(new BkSafe(new BkBasic(forkTake)), 4), httpPort); Front frontEnd = new FtBasic(new BkParallel(new BkSafe(new BkBasic(forkTake)), 4), httpPort);
frontEnd.start(() -> isClosed); frontEnd.start(() -> isClosed());
} catch (BindException e) { } catch (BindException e) {
throw new IllegalStateException("While binding " + httpPort, e); throw new IllegalStateException("While binding " + httpPort, e);
} }
@ -120,20 +124,20 @@ public class Backend implements Closeable {
}, "Http Server Thread").start(); }, "Http Server Thread").start();
new Thread(() -> { APPLLICATION_CONNECTION_CLEANUP.newThread(() -> {
while (true) { while (!isClosed()) {
log.info(getApplicationsCount() + " applications, " + getControllersCount() + " controllers"); log.info(getApplicationsCount() + " applications, " + getControllersCount() + " controllers");
runApplicationConnectionsCleanup(); runApplicationConnectionsCleanup();
BinaryProtocol.sleep(applicationTimeout); BinaryProtocol.sleep(applicationTimeout);
} }
}, "rusEFI Application connections Cleanup").start(); }).start();
new Thread(() -> { GAUGE_POKER.newThread(() -> {
while (true) { while (!isClosed()) {
grabOutputs(); grabOutputs();
BinaryProtocol.sleep(SECOND); BinaryProtocol.sleep(SECOND);
} }
}, "rusEFI gauge poker").start(); }).start();
} }
private void grabOutputs() { private void grabOutputs() {

View File

@ -1,10 +1,17 @@
package com.rusefi.ts_plugin; package com.rusefi.ts_plugin;
import com.rusefi.proxy.NetworkConnector;
import javax.swing.*; import javax.swing.*;
public class BroadcastTab { public class BroadcastTab {
private final JComponent content = new JPanel(); private final JComponent content = new JPanel();
public BroadcastTab() {
// NetworkConnector
}
public JComponent getContent() { public JComponent getContent() {
return content; return content;
} }