fix race condition in CommandQueue, massively speeding up HW CI tests (#2129)

* fix command semaphore

* guard

* remove todo

* dead field

Co-authored-by: Matthew Kennedy <makenne@microsoft.com>
This commit is contained in:
Matthew Kennedy 2020-12-24 06:23:46 -08:00 committed by GitHub
parent 7e201990d3
commit 48ffc6cc22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 52 additions and 67 deletions

View File

@ -25,12 +25,12 @@ public class EnduranceTestUtility {
for (int i = 0; i < count; i++) {
EcuTestHelper.currentEngineType = Fields.ET_FORD_ASPIRE;
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 3, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT, commandQueue);
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 3, Timeouts.SET_ENGINE_TIMEOUT, commandQueue);
sleepSeconds(2);
sendCommand(getEnableCommand("self_stimulation"), commandQueue);
// IoUtil.changeRpm(1200);
EcuTestHelper.currentEngineType = Fields.ET_DEFAULT_FRANKENSO;
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 28, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT, commandQueue);
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + 28, Timeouts.SET_ENGINE_TIMEOUT, commandQueue);
sleepSeconds(2);
FileLog.MAIN.logLine("++++++++++++++++++++++++++++++++++++ " + i + " +++++++++++++++");
}

View File

@ -413,7 +413,7 @@ public class FunctionalTest {
* This method waits for longer than usual.
*/
private void sendComplexCommand(String command) {
ecu.sendCommand(command, EcuTestHelper.COMPLEX_COMMAND_RETRY, Timeouts.CMD_TIMEOUT);
ecu.sendCommand(command, Timeouts.CMD_TIMEOUT);
}
private static void assertWaveNull(EngineChart chart, String key) {

View File

@ -31,7 +31,7 @@ public class IoUtil {
* @throws IllegalStateException if command was not confirmed
*/
static void sendCommand(String command, CommandQueue commandQueue) {
sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, Timeouts.CMD_TIMEOUT, commandQueue);
sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, commandQueue);
}
public static String getEnableCommand(String settingName) {
@ -45,18 +45,18 @@ public class IoUtil {
/**
* blocking method which waits for confirmation from rusEfi
*/
public static void sendCommand(String command, int retryTimeoutMs, int timeoutMs, CommandQueue commandQueue) {
public static void sendCommand(String command, int timeoutMs, CommandQueue commandQueue) {
final CountDownLatch responseLatch = new CountDownLatch(1);
long time = System.currentTimeMillis();
log.info("Sending command [" + command + "]");
final long begin = System.currentTimeMillis();
commandQueue.write(command, retryTimeoutMs, () -> {
commandQueue.write(command, timeoutMs, () -> {
responseLatch.countDown();
log.info("Got confirmation in " + (System.currentTimeMillis() - begin) + "ms");
});
wait(responseLatch, timeoutMs);
if (responseLatch.getCount() > 0)
log.info("No confirmation in " + retryTimeoutMs);
log.info("No confirmation in " + timeoutMs);
log.info("Command [" + command + "] executed in " + (System.currentTimeMillis() - time));
}

View File

@ -29,7 +29,6 @@ public class EcuTestHelper {
};
private static final Logging log = getLogging(EcuTestHelper.class);
public static final int COMPLEX_COMMAND_RETRY = 10000;
public static int currentEngineType;
public final CommandQueue commandQueue;
@NotNull
@ -95,12 +94,12 @@ public class EcuTestHelper {
}
public void sendCommand(String command) {
sendCommand(command, CommandQueue.DEFAULT_TIMEOUT, Timeouts.CMD_TIMEOUT);
sendCommand(command, Timeouts.CMD_TIMEOUT);
}
public void sendCommand(String command, int retryTimeoutMs, int timeoutMs) {
public void sendCommand(String command, int timeoutMs) {
TestHelper.INSTANCE.assertNotFatal();
IoUtil.sendCommand(command, retryTimeoutMs, timeoutMs, commandQueue);
IoUtil.sendCommand(command, timeoutMs, commandQueue);
}
/**
@ -131,7 +130,7 @@ public class EcuTestHelper {
// changing engine type while engine is running does not work well - we rightfully
// get invalid configuration critical errors
sleepSeconds(2);
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + type, COMPLEX_COMMAND_RETRY, Timeouts.SET_ENGINE_TIMEOUT);
sendCommand("set " + Fields.CMD_ENGINE_TYPE + " " + type, Timeouts.SET_ENGINE_TIMEOUT);
// TODO: document the reason for this sleep?!
sleepSeconds(1);
sendCommand(getEnableCommand(Fields.CMD_PWM));

View File

@ -7,7 +7,11 @@ import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import static com.devexperts.logging.Logging.getLogging;
@ -26,17 +30,10 @@ public class CommandQueue {
private static final int COMMAND_CONFIRMATION_TIMEOUT = 1000;
public static final int SLOW_CONFIRMATION_TIMEOUT = 5000;
public static final Class<CommandQueue> COMMAND_QUEUE_CLASS = CommandQueue.class;
private final Object lock = new Object();
private final LinkManager linkManager;
/**
* One complex use-case is when we send out a bunch of commands and then we need to handle all the configurations
* correctly
* TODO: TTL for confirmations?
*/
private Set<String> pendingConfirmations = Collections.synchronizedSet(new HashSet<String>());
private final BlockingQueue<IMethodInvocation> pendingCommands = new LinkedBlockingQueue<>();
private final List<CommandQueueListener> commandListeners = new ArrayList<>();
private final List<Consumer<String>> commandListeners = new ArrayList<>();
private final Runnable runnable;
@ -49,7 +46,7 @@ public class CommandQueue {
return isSlowCommand(cmd) ? SLOW_CONFIRMATION_TIMEOUT : COMMAND_CONFIRMATION_TIMEOUT;
}
public void addListener(CommandQueueListener listener) {
public void addListener(Consumer<String> listener) {
commandListeners.add(listener);
}
@ -75,32 +72,33 @@ public class CommandQueue {
int counter = 0;
String command = commandRequest.getCommand();
while (!pendingConfirmations.contains(command)) {
counter++;
// FileLog.MAIN.logLine("templog sending " + command + " " + System.currentTimeMillis() + " " + new Date());
linkManager.send(command, commandRequest.isFireEvent());
long now = System.currentTimeMillis();
synchronized (lock) {
lock.wait(commandRequest.getTimeout());
}
long timeWaited = System.currentTimeMillis() - now;
if (!pendingConfirmations.contains(command) && timeWaited < commandRequest.getTimeout() / 2) {
/**
* there could be a lot of unrelated confirmations, we do not need to send out
* a lot of copies of the same command
*/
long extraWaitTime = commandRequest.getTimeout() / 2 - timeWaited;
// FileLog.MAIN.logLine("templog extraWaitTime: " + extraWaitTime);
Thread.sleep(extraWaitTime);
}
}
if (pendingConfirmations.contains(command)) {
commandRequest.getListener().onCommandConfirmation();
pendingConfirmations.remove(command);
}
CountDownLatch cl = new CountDownLatch(1);
if (counter != 1)
linkManager.messageListener.postMessage(CommandQueue.class, "Took " + counter + " attempts");
Consumer<String> listener = confirmStr -> {
// only listen to replies for the correct command
if (command.equals(confirmStr)) {
cl.countDown();
} else {
throw new IllegalStateException("Was waiting for confirmation of " + command + " but got confirmation for " + confirmStr);
}
};
commandListeners.add(listener);
// Actually send the command now that we're listening
linkManager.send(command, commandRequest.isFireEvent());
// Wait for a reply
int timeoutMs = commandRequest.getTimeout();
cl.await(timeoutMs, TimeUnit.MILLISECONDS);
commandListeners.remove(listener);
if (cl.getCount() == 0) {
commandRequest.getListener().onCommandConfirmation();
} else {
throw new IllegalStateException("No confirmation received after timeout of " + timeoutMs + " ms");
}
}
public CommandQueue(LinkManager linkManager) {
@ -137,15 +135,12 @@ public class CommandQueue {
String confirmation = LinkManager.unpackConfirmation(message);
if (confirmation == null)
linkManager.messageListener.postMessage(CommandQueue.class, "Broken confirmation length: " + message);
pendingConfirmations.add(confirmation);
// Poke everyone listening for confirmation
this.commandListeners.forEach(f -> f.accept(confirmation));
if (LinkManager.LOG_LEVEL.isDebugEnabled())
linkManager.messageListener.postMessage(CommandQueue.class, "got valid conf! " + confirmation + ", still pending: " + pendingCommands.size());
// FileLog.MAIN.logLine("templog got valid conf " + confirmation + " " + System.currentTimeMillis() + " " + new Date());
synchronized (lock) {
lock.notifyAll();
}
}
public void write(String command) {
@ -173,10 +168,8 @@ public class CommandQueue {
* @param fireEvent true if we want a global event about this command, i.e. for the recent commands list to know about this command
*/
public void write(String command, int timeoutMs, InvocationConfirmationListener listener, boolean fireEvent) {
if (fireEvent) {
for (CommandQueueListener cql : commandListeners)
cql.onCommand(command);
this.commandListeners.forEach(c -> c.accept(command));
}
pendingCommands.add(new MethodInvocation(command, timeoutMs, listener, fireEvent));
@ -229,9 +222,4 @@ public class CommandQueue {
'}';
}
}
public interface CommandQueueListener {
void onCommand(String command);
}
}

View File

@ -94,13 +94,11 @@ public class RecentCommands {
public RecentCommands(UIContext uiContext) {
this.uiContext = uiContext;
uiContext.getCommandQueue().addListener(new CommandQueue.CommandQueueListener() {
@Override
public void onCommand(String command) {
uiContext.getCommandQueue().addListener(command -> {
if (!reentrant.get())
add(command);
}
});
);
String value = getConfig().getRoot().getProperty(KEY, null);