refactoring
This commit is contained in:
parent
1ab645af78
commit
33ec84c120
|
@ -167,6 +167,9 @@ public class BinaryProtocol implements BinaryProtocolCommands {
|
||||||
return logger;
|
return logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the whole dynamic 'switch to binary protocol' still does not work great
|
||||||
|
*/
|
||||||
public void switchToBinaryProtocol() {
|
public void switchToBinaryProtocol() {
|
||||||
// we do not have reliable implementation yet :(
|
// we do not have reliable implementation yet :(
|
||||||
for (int i = 0; i < 15; i++)
|
for (int i = 0; i < 15; i++)
|
||||||
|
@ -185,7 +188,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
|
||||||
stream.write((SWITCH_TO_BINARY_COMMAND + "\n").getBytes());
|
stream.write((SWITCH_TO_BINARY_COMMAND + "\n").getBytes());
|
||||||
// todo: document why is ioLock needed here?
|
// todo: document why is ioLock needed here?
|
||||||
synchronized (ioLock) {
|
synchronized (ioLock) {
|
||||||
boolean isTimeout = incomingData.waitForBytes(2, start, "switch to binary");
|
boolean isTimeout = incomingData.waitForBytes("switch to binary", start, 2);
|
||||||
if (isTimeout) {
|
if (isTimeout) {
|
||||||
logger.info(new Date() + ": Timeout waiting for switch response");
|
logger.info(new Date() + ": Timeout waiting for switch response");
|
||||||
close();
|
close();
|
||||||
|
@ -246,7 +249,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
|
||||||
private byte[] receivePacket(String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
|
private byte[] receivePacket(String msg, boolean allowLongResponse) throws InterruptedException, EOFException {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
synchronized (ioLock) {
|
synchronized (ioLock) {
|
||||||
boolean isTimeout = incomingData.waitForBytes(2, start, msg + " header");
|
boolean isTimeout = incomingData.waitForBytes(msg + " header", start, 2);
|
||||||
if (isTimeout)
|
if (isTimeout)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
|
@ -257,7 +260,7 @@ public class BinaryProtocol implements BinaryProtocolCommands {
|
||||||
if (!allowLongResponse && packetSize > Math.max(BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10)
|
if (!allowLongResponse && packetSize > Math.max(BLOCKING_FACTOR, Fields.TS_OUTPUT_SIZE) + 10)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
isTimeout = incomingData.waitForBytes(packetSize + 4, start, msg + " body");
|
isTimeout = incomingData.waitForBytes(msg + " body", start, packetSize + 4);
|
||||||
if (isTimeout)
|
if (isTimeout)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,8 @@ import java.io.EOFException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Thread-safe byte queue with blocking {@link #waitForBytes} method
|
||||||
|
*
|
||||||
* (c) Andrey Belomutskiy
|
* (c) Andrey Belomutskiy
|
||||||
* 6/20/2015.
|
* 6/20/2015.
|
||||||
*/
|
*/
|
||||||
|
@ -39,13 +41,15 @@ public class IncomingDataBuffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Blocking method which would for specified amout of data
|
||||||
|
*
|
||||||
* @return true in case of timeout, false if everything is fine
|
* @return true in case of timeout, false if everything is fine
|
||||||
*/
|
*/
|
||||||
public boolean waitForBytes(int count, long start, String msg) throws InterruptedException {
|
public boolean waitForBytes(String msg, long startTimestamp, int count) throws InterruptedException {
|
||||||
logger.info("Waiting for " + count + " byte(s): " + msg);
|
logger.info("Waiting for " + count + " byte(s): " + msg);
|
||||||
synchronized (cbb) {
|
synchronized (cbb) {
|
||||||
while (cbb.length() < count) {
|
while (cbb.length() < count) {
|
||||||
int timeout = (int) (start + Timeouts.BINARY_IO_TIMEOUT - System.currentTimeMillis());
|
int timeout = (int) (startTimestamp + Timeouts.BINARY_IO_TIMEOUT - System.currentTimeMillis());
|
||||||
if (timeout <= 0) {
|
if (timeout <= 0) {
|
||||||
return true; // timeout. Sad face.
|
return true; // timeout. Sad face.
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,16 +27,6 @@ import java.nio.BufferOverflowException;
|
||||||
*/
|
*/
|
||||||
@NotThreadSafe
|
@NotThreadSafe
|
||||||
public class CircularByteBuffer extends ByteBuffer {
|
public class CircularByteBuffer extends ByteBuffer {
|
||||||
/**
|
|
||||||
* Constructs the CircularByteBuffer.
|
|
||||||
*
|
|
||||||
* @param size
|
|
||||||
*/
|
|
||||||
public CircularByteBuffer(int size) {
|
|
||||||
this.size = size;
|
|
||||||
buf = new byte[size];
|
|
||||||
}
|
|
||||||
|
|
||||||
private final int size;
|
private final int size;
|
||||||
|
|
||||||
private final byte[] buf;
|
private final byte[] buf;
|
||||||
|
@ -47,6 +37,23 @@ public class CircularByteBuffer extends ByteBuffer {
|
||||||
|
|
||||||
private int nextPut;
|
private int nextPut;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the CircularByteBuffer.
|
||||||
|
*
|
||||||
|
* @param size
|
||||||
|
*/
|
||||||
|
public CircularByteBuffer(int size) {
|
||||||
|
this.size = size;
|
||||||
|
buf = new byte[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
length = 0;
|
||||||
|
nextGet = 0;
|
||||||
|
nextPut = 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int size() {
|
public int size() {
|
||||||
return size;
|
return size;
|
||||||
|
@ -57,13 +64,6 @@ public class CircularByteBuffer extends ByteBuffer {
|
||||||
return length;
|
return length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clear() {
|
|
||||||
length = 0;
|
|
||||||
nextGet = 0;
|
|
||||||
nextPut = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte get() throws EOFException {
|
public byte get() throws EOFException {
|
||||||
if (isEmpty())
|
if (isEmpty())
|
||||||
|
|
Loading…
Reference in New Issue