From 48ffc6cc223b742cf59b8e27a14cec74c193472c Mon Sep 17 00:00:00 2001 From: Matthew Kennedy Date: Thu, 24 Dec 2020 06:23:46 -0800 Subject: [PATCH] fix race condition in CommandQueue, massively speeding up HW CI tests (#2129) * fix command semaphore * guard * remove todo * dead field Co-authored-by: Matthew Kennedy --- .../src/com/rusefi/EnduranceTestUtility.java | 4 +- .../src/com/rusefi/FunctionalTest.java | 2 +- .../autotest/src/com/rusefi/IoUtil.java | 8 +- .../functional_tests/EcuTestHelper.java | 9 +- .../main/java/com/rusefi/io/CommandQueue.java | 90 ++++++++----------- .../java/com/rusefi/ui/RecentCommands.java | 6 +- 6 files changed, 52 insertions(+), 67 deletions(-) diff --git a/java_console/autotest/src/com/rusefi/EnduranceTestUtility.java b/java_console/autotest/src/com/rusefi/EnduranceTestUtility.java index 48309793ea..2e0e47cdee 100644 --- a/java_console/autotest/src/com/rusefi/EnduranceTestUtility.java +++ b/java_console/autotest/src/com/rusefi/EnduranceTestUtility.java @@ -25,12 +25,12 @@ public class EnduranceTestUtility { for (int i = 0; i < count; i++) { EcuTestHelper.currentEngineType = Fields.ET_FORD_ASPIRE; - sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 3, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT, commandQueue); + sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 3, Timeouts.SET_ENGINE_TIMEOUT, commandQueue); sleepSeconds(2); sendCommand(getEnableCommand("self_stimulation"), commandQueue); // IoUtil.changeRpm(1200); EcuTestHelper.currentEngineType = Fields.ET_DEFAULT_FRANKENSO; - sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 28, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT, commandQueue); + sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 28, Timeouts.SET_ENGINE_TIMEOUT, commandQueue); sleepSeconds(2); FileLog.MAIN.logLine("++++++++++++++++++++++++++++++++++++ " + i + " +++++++++++++++"); } diff --git a/java_console/autotest/src/com/rusefi/FunctionalTest.java b/java_console/autotest/src/com/rusefi/FunctionalTest.java index 796d00f96c..a89d1e609d 100644 --- a/java_console/autotest/src/com/rusefi/FunctionalTest.java +++ b/java_console/autotest/src/com/rusefi/FunctionalTest.java @@ -413,7 +413,7 @@ public class FunctionalTest { * This method waits for longer then usual. */ private void sendComplexCommand(String command) { - ecu.sendCommand(command, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.CMD_TIMEOUT); + ecu.sendCommand(command, Timeouts.CMD_TIMEOUT); } private static void assertWaveNull(EngineChart chart, String key) { diff --git a/java_console/autotest/src/com/rusefi/IoUtil.java b/java_console/autotest/src/com/rusefi/IoUtil.java index 410a6748eb..7521ea89bf 100644 --- a/java_console/autotest/src/com/rusefi/IoUtil.java +++ b/java_console/autotest/src/com/rusefi/IoUtil.java @@ -31,7 +31,7 @@ public class IoUtil { * @throws IllegalStateException if command was not confirmed */ static void sendCommand(String command, CommandQueue commandQueue) { - sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, Timeouts.CMD_TIMEOUT, commandQueue); + sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, commandQueue); } public static String getEnableCommand(String settingName) { @@ -45,18 +45,18 @@ public class IoUtil { /** * blocking method which would for confirmation from rusEfi */ - public static void sendCommand(String command, int retryTimeoutMs, int timeoutMs, CommandQueue commandQueue) { + public static void sendCommand(String command, int timeoutMs, CommandQueue commandQueue) { final CountDownLatch responseLatch = new CountDownLatch(1); long time = System.currentTimeMillis(); log.info("Sending command [" + command + "]"); final long begin = System.currentTimeMillis(); - commandQueue.write(command, retryTimeoutMs, () -> { + commandQueue.write(command, timeoutMs, () -> { responseLatch.countDown(); log.info("Got confirmation in " + (System.currentTimeMillis() - begin) + "ms"); }); wait(responseLatch, timeoutMs); if (responseLatch.getCount() > 0) - log.info("No confirmation in " + retryTimeoutMs); + log.info("No confirmation in " + timeoutMs); log.info("Command [" + command + "] executed in " + (System.currentTimeMillis() - time)); } diff --git a/java_console/autotest/src/com/rusefi/functional_tests/EcuTestHelper.java b/java_console/autotest/src/com/rusefi/functional_tests/EcuTestHelper.java index 4e53a707b1..63ea9064a8 100644 --- a/java_console/autotest/src/com/rusefi/functional_tests/EcuTestHelper.java +++ b/java_console/autotest/src/com/rusefi/functional_tests/EcuTestHelper.java @@ -29,7 +29,6 @@ public class EcuTestHelper { }; private static final Logging log = getLogging(EcuTestHelper.class); - public static final int COMPLEX_COMMAND_RETRY = 10000; public static int currentEngineType; public final CommandQueue commandQueue; @NotNull @@ -95,12 +94,12 @@ public class EcuTestHelper { } public void sendCommand(String command) { - sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, Timeouts.CMD_TIMEOUT); + sendCommand(command, Timeouts.CMD_TIMEOUT); } - public void sendCommand(String command, int retryTimeoutMs, int timeoutMs) { + public void sendCommand(String command, int timeoutMs) { TestHelper.INSTANCE.assertNotFatal(); - IoUtil.sendCommand(command, retryTimeoutMs, timeoutMs, commandQueue); + IoUtil.sendCommand(command, timeoutMs, commandQueue); } /** @@ -131,7 +130,7 @@ public class EcuTestHelper { // changing engine type while engine is running does not work well - we rightfully // get invalid configuration critical errors sleepSeconds(2); - sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + type, COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT); + sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + type, Timeouts.SET_ENGINE_TIMEOUT); // TODO: document the reason for this sleep?! sleepSeconds(1); sendCommand(getEnableCommand(Fields.CMD_PWM)); diff --git a/java_console/io/src/main/java/com/rusefi/io/CommandQueue.java b/java_console/io/src/main/java/com/rusefi/io/CommandQueue.java index 86e3a6ec62..df8ee03adc 100644 --- a/java_console/io/src/main/java/com/rusefi/io/CommandQueue.java +++ b/java_console/io/src/main/java/com/rusefi/io/CommandQueue.java @@ -7,7 +7,11 @@ import org.jetbrains.annotations.NotNull; import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; import static com.devexperts.logging.Logging.getLogging; @@ -26,17 +30,10 @@ public class CommandQueue { private static final int COMMAND_CONFIRMATION_TIMEOUT = 1000; public static final int SLOW_CONFIRMATION_TIMEOUT = 5000; public static final Class COMMAND_QUEUE_CLASS = CommandQueue.class; - private final Object lock = new Object(); private final LinkManager linkManager; - /** - * One complex use-case is when we send out a bunch of commands and then we need to handle all the configurations - * correctly - * TODO: TTL for confirmations? - */ - private Set pendingConfirmations = Collections.synchronizedSet(new HashSet()); private final BlockingQueue pendingCommands = new LinkedBlockingQueue<>(); - private final List commandListeners = new ArrayList<>(); + private final List> commandListeners = new ArrayList<>(); private final Runnable runnable; @@ -49,7 +46,7 @@ public class CommandQueue { return isSlowCommand(cmd) ? SLOW_CONFIRMATION_TIMEOUT : COMMAND_CONFIRMATION_TIMEOUT; } - public void addListener(CommandQueueListener listener) { + public void addListener(Consumer listener) { commandListeners.add(listener); } @@ -75,32 +72,33 @@ public class CommandQueue { int counter = 0; String command = commandRequest.getCommand(); - while (!pendingConfirmations.contains(command)) { - counter++; -// FileLog.MAIN.logLine("templog sending " + command + " " + System.currentTimeMillis() + " " + new Date()); - linkManager.send(command, commandRequest.isFireEvent()); - long now = System.currentTimeMillis(); - synchronized (lock) { - lock.wait(commandRequest.getTimeout()); - } - long timeWaited = System.currentTimeMillis() - now; - if (!pendingConfirmations.contains(command) && timeWaited < commandRequest.getTimeout() / 2) { - /** - * there could be a log of un-releated confirmations, we do not need to send out - * a log of the same command - */ - long extraWaitTime = commandRequest.getTimeout() / 2 - timeWaited; -// FileLog.MAIN.logLine("templog extraWaitTime: " + extraWaitTime); - Thread.sleep(extraWaitTime); - } - } - if (pendingConfirmations.contains(command)) { - commandRequest.getListener().onCommandConfirmation(); - pendingConfirmations.remove(command); - } + CountDownLatch cl = new CountDownLatch(1); - if (counter != 1) - linkManager.messageListener.postMessage(CommandQueue.class, "Took " + counter + " attempts"); + Consumer listener = confirmStr -> { + // only listen to replies for the correct command + if (command.equals(confirmStr)) { + cl.countDown(); + } else { + throw new IllegalStateException("Was waiting for confirmation of " + command + " but got confirmation for " + confirmStr); + } + }; + + commandListeners.add(listener); + + // Actually send the command now that we're listening + linkManager.send(command, commandRequest.isFireEvent()); + + // Wait for a reply + int timeoutMs = commandRequest.getTimeout(); + cl.await(timeoutMs, TimeUnit.MILLISECONDS); + + commandListeners.remove(listener); + + if (cl.getCount() == 0) { + commandRequest.getListener().onCommandConfirmation(); + } else { + throw new IllegalStateException("No confirmation received after timeout of " + timeoutMs + " ms"); + } } public CommandQueue(LinkManager linkManager) { @@ -137,15 +135,12 @@ public class CommandQueue { String confirmation = LinkManager.unpackConfirmation(message); if (confirmation == null) linkManager.messageListener.postMessage(CommandQueue.class, "Broken confirmation length: " + message); - pendingConfirmations.add(confirmation); + + // Poke everyone listening for confirmation + this.commandListeners.forEach(f -> f.accept(confirmation)); + if (LinkManager.LOG_LEVEL.isDebugEnabled()) linkManager.messageListener.postMessage(CommandQueue.class, "got valid conf! " + confirmation + ", still pending: " + pendingCommands.size()); - -// FileLog.MAIN.logLine("templog got valid conf " + confirmation + " " + System.currentTimeMillis() + " " + new Date()); - - synchronized (lock) { - lock.notifyAll(); - } } public void write(String command) { @@ -173,11 +168,9 @@ public class CommandQueue { * @param fireEvent true if we want global even about this comment, i.e. recent commands list to know about this command */ public void write(String command, int timeoutMs, InvocationConfirmationListener listener, boolean fireEvent) { - - if (fireEvent) { - for (CommandQueueListener cql : commandListeners) - cql.onCommand(command); - } + if (fireEvent) { + this.commandListeners.forEach(c -> c.accept(command)); + } pendingCommands.add(new MethodInvocation(command, timeoutMs, listener, fireEvent)); } @@ -229,9 +222,4 @@ public class CommandQueue { '}'; } } - - public interface CommandQueueListener { - void onCommand(String command); - } - } diff --git a/java_console/ui/src/main/java/com/rusefi/ui/RecentCommands.java b/java_console/ui/src/main/java/com/rusefi/ui/RecentCommands.java index a697a97d03..a49bb79159 100644 --- a/java_console/ui/src/main/java/com/rusefi/ui/RecentCommands.java +++ b/java_console/ui/src/main/java/com/rusefi/ui/RecentCommands.java @@ -94,13 +94,11 @@ public class RecentCommands { public RecentCommands(UIContext uiContext) { this.uiContext = uiContext; - uiContext.getCommandQueue().addListener(new CommandQueue.CommandQueueListener() { - @Override - public void onCommand(String command) { + uiContext.getCommandQueue().addListener(command -> { if (!reentrant.get()) add(command); } - }); + ); String value = getConfig().getRoot().getProperty(KEY, null);