proxy progress - WOW TDD totally works

This commit is contained in:
rusefi 2020-07-22 19:54:45 -04:00
parent 36e5288ef6
commit 2de7d590f5
6 changed files with 104 additions and 28 deletions

View File

@ -44,7 +44,7 @@ public class FullServerTest {
CountDownLatch applicationClosed = new CountDownLatch(1);
UserDetailsResolver userDetailsResolver = authToken -> new UserDetails(authToken.substring(0, 5), userId);
int httpPort = 8003;
int httpPort = 8103;
int applicationTimeout = 7 * SECOND;
try (Backend backend = new Backend(userDetailsResolver, httpPort, logger, applicationTimeout) {
@Override

View File

@ -18,7 +18,7 @@ public class ApplicationConnectionState {
if (clientStream.getStreamStats().getPreviousPacketArrivalTime() == 0)
throw new IllegalStateException("Invalid state - no packets on " + this);
if (!state.isUsed())
if (!state.getTwoKindSemaphore().isUsed())
throw new IllegalArgumentException("state is supposed to be used by us");
}
@ -35,7 +35,7 @@ public class ApplicationConnectionState {
try {
clientStream.close();
} finally {
state.release();
state.getTwoKindSemaphore().releaseFromLongTermUsage();
}
}

View File

@ -28,6 +28,8 @@ import java.net.BindException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import static com.rusefi.Timeouts.SECOND;
/**
* See NetworkConnectorStartup
*/
@ -107,10 +109,31 @@ public class Backend implements Closeable {
new Thread(() -> {
while (true) {
runCleanup();
runApplicationConnectionsCleanup();
BinaryProtocol.sleep(applicationTimeout);
}
}, "rusEFI Application connections Cleanup").start();
new Thread(() -> {
while (true) {
grabOutputs();
BinaryProtocol.sleep(SECOND);
}
}, "rusEFI gauge poker").start();
}
private void grabOutputs() {
List<ControllerConnectionState> controllers = getControllers();
for (ControllerConnectionState controller : controllers) {
if (System.currentTimeMillis() - controller.getStream().getStreamStats().getPreviousPacketArrivalTime() > 20 * SECOND) {
if (controller.getTwoKindSemaphore().acquireForShortTermUsage()) {
try {
} finally {
controller.getTwoKindSemaphore().releaseFromShortTermUsage();
}
}
}
}
}
public void runApplicationConnector(int serverPortForApplications, Listener<?> serverSocketCreationCallback) {
@ -172,7 +195,7 @@ public class Backend implements Closeable {
// no such controller
return null;
}
if (!state.acquire()) {
if (!state.getTwoKindSemaphore().acquireForLongTermUsage()) {
// someone is already talking to this controller
return null;
}
@ -237,7 +260,7 @@ public class Backend implements Closeable {
JsonObject controllerObject = Json.createObjectBuilder()
.add(UserDetails.USER_ID, client.getUserDetails().getUserId())
.add(UserDetails.USERNAME, client.getUserDetails().getUserName())
.add(IS_USED, client.isUsed())
.add(IS_USED, client.getTwoKindSemaphore().isUsed())
.add(ControllerInfo.SIGNATURE, client.getSessionDetails().getControllerInfo().getSignature())
.add(ControllerInfo.VEHICLE_NAME, client.getSessionDetails().getControllerInfo().getVehicleName())
.add(ControllerInfo.ENGINE_MAKE, client.getSessionDetails().getControllerInfo().getEngineMake())
@ -257,7 +280,7 @@ public class Backend implements Closeable {
* we do not push anything into connected applications so we have to run a clean-up loop
* that's different from controllers since we periodically pull outputs from controllers which allows us to detect disconnects
*/
private void runCleanup() {
private void runApplicationConnectionsCleanup() {
List<ApplicationConnectionState> inactiveApplications = new ArrayList<>();
synchronized (lock) {

View File

@ -8,10 +8,11 @@ import com.rusefi.io.commands.GetOutputsCommand;
import com.rusefi.io.commands.HelloCommand;
import com.rusefi.io.tcp.TcpIoStream;
import com.rusefi.shared.FileUtil;
import net.jcip.annotations.GuardedBy;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class ControllerConnectionState {
private final Socket clientSocket;
@ -30,8 +31,8 @@ public class ControllerConnectionState {
*/
private UserDetails userDetails;
private ControllerKey controllerKey;
@GuardedBy("this")
private boolean isUsed;
private final TwoKindSemaphore twoKindSemaphore = new TwoKindSemaphore();
public ControllerConnectionState(Socket clientSocket, Logger logger, UserDetailsResolver userDetailsResolver) {
this.clientSocket = clientSocket;
@ -104,23 +105,7 @@ public class ControllerConnectionState {
throw new IOException("getOutputs: No response");
}
/**
* @return true if acquired successfully, false if not
*/
public synchronized boolean acquire() {
if (isUsed)
return false;
isUsed = true;
return true;
}
public synchronized boolean isUsed() {
return isUsed;
}
public synchronized void release() {
if (!isUsed)
throw new IllegalStateException("Not acquired by anyone");
isUsed = false;
public TwoKindSemaphore getTwoKindSemaphore() {
return twoKindSemaphore;
}
}

View File

@ -0,0 +1,43 @@
package com.rusefi.server;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class TwoKindSemaphore {
// we have to distinguish between long-term usage by application and short-term usage just to refresh gauges
private final static int LONG_TERM = 2;
private final static int SHORT_TERM = 1;
private final Semaphore semaphore = new Semaphore(LONG_TERM);
public void releaseFromLongTermUsage() {
semaphore.release(LONG_TERM);
}
public boolean acquireForShortTermUsage() {
return semaphore.tryAcquire(SHORT_TERM);
}
public void releaseFromShortTermUsage() {
semaphore.release(SHORT_TERM);
}
public boolean isUsed() {
// short-term usages of only one permit does not count
return semaphore.availablePermits() == 0;
}
/**
* @return true if acquired successfully, false if not
*/
public boolean acquireForLongTermUsage() {
return acquireForLongTermUsage(10);
}
public boolean acquireForLongTermUsage(int timeout) {
try {
return semaphore.tryAcquire(LONG_TERM, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -0,0 +1,25 @@
package com.rusefi.server;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TwoKindSemaphoreTest {
@Test
public void testTwoKindSemaphore() {
TwoKindSemaphore twoKindSemaphore = new TwoKindSemaphore();
assertFalse(twoKindSemaphore.isUsed());
assertTrue(twoKindSemaphore.acquireForShortTermUsage());
assertFalse(twoKindSemaphore.isUsed());
twoKindSemaphore.releaseFromShortTermUsage();
assertTrue(twoKindSemaphore.acquireForLongTermUsage());
assertTrue(twoKindSemaphore.isUsed());
//
assertFalse(twoKindSemaphore.acquireForLongTermUsage(1));
assertFalse(twoKindSemaphore.acquireForShortTermUsage());
}
}