Skip to content

Commit

Permalink
Merge pull request #8 from manan164/priority_fix
Browse files Browse the repository at this point in the history
Remove caching of messages.
  • Loading branch information
v1r3n authored Apr 25, 2023
2 parents 57cecac + 9a827e6 commit ced757d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public interface ConductorQueue {

String getShardName();

int getPollCount();

default double getScore(long now, QueueMessage msg) {
double score = 0;
if (msg.getTimeout() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public abstract class QueueMonitor {

private final ExecutorService executorService;

private final AtomicInteger pollCount = new AtomicInteger(0);
private final String queueName;

private int queueUnackTime = 30_000;
Expand All @@ -54,17 +53,17 @@ public QueueMonitor(String queueName) {
}

public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {

if (count <= 0) {
log.warn("Negative poll count {}");
// Negative number shouldn't happen, but it can be zero and in that case we don't do
// anything!
return new ArrayList<>();
}
List<QueueMessage> messages = new ArrayList<>();
int pendingCount = pollCount.getAndSet(Math.max(pollCount.get(), count));
if (peekedMessages.isEmpty()) {
__peekedMessages();
} else if (peekedMessages.size() < pendingCount) {
try {
executorService.submit(() -> __peekedMessages());
} catch (RejectedExecutionException rejectedExecutionException) {
}
if (count > maxPollCount) {
count = maxPollCount;
}
__peekedMessages(count);

long now = clock.millis();
boolean waited = false;
Expand All @@ -85,7 +84,6 @@ public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
}
if (now > message.getExpiry()) {
peekedMessages.clear();
pollCount.addAndGet(count);
return new ArrayList<>();
}
messages.add(message);
Expand All @@ -108,20 +106,9 @@ public void setQueueUnackTime(int queueUnackTime) {

protected abstract long queueSize();

private synchronized void __peekedMessages() {
private synchronized void __peekedMessages(int count) {
try {

int count = Math.min(maxPollCount, pollCount.get());
if (count <= 0) {
if (count < 0) {
log.warn("Negative poll count {}", pollCount.get());
pollCount.set(0);
}
// Negative number shouldn't happen, but it can be zero and in that case we don't do
// anything!
return;
}

log.trace("Polling {} messages from {} with size {}", count, queueName, size);

double now = Long.valueOf(clock.millis() + 1).doubleValue();
Expand All @@ -146,13 +133,8 @@ private synchronized void __peekedMessages() {
message.setExpiry(messageExpiry);
peekedMessages.add(message);
}
pollCount.addAndGet(-1 * (response.size() / 2));
} catch (Throwable t) {
log.warn(t.getMessage(), t);
}
}

public int getPollCount() {
return pollCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ public String getShardName() {
return null;
}

@Override
public int getPollCount() {
return queueMonitor.getPollCount();
}

private List<String> getPayloads(String[] messageIds) {
List<String> payloads = jedis.hmget(payloadKey, messageIds);
return payloads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ public String getShardName() {
return null;
}

@Override
public int getPollCount() {
return queueMonitor.getPollCount();
}

private List<String> getPayloads(String[] messageIds) {
try (Jedis jedis = jedisPool.getResource()) {
List<String> payloads = jedis.hmget(payloadKey, messageIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,29 +299,6 @@ public void testClearQueues() {
redisQueue.flush();
assertEquals(0, redisQueue.size());
}
@Test
public void testPollCountOrdering() {
ConductorRedisQueue redisQueue = new ConductorRedisQueue("test",jedisPool);

redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(1), "1", 0,1)));
redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(2), "2", 0,1)));

redisQueue.pop(1, 100, TimeUnit.MILLISECONDS);
redisQueue.pop(1, 100, TimeUnit.MILLISECONDS);
//Pop 10 times
for(int i=0;i<10;i++) {
redisQueue.pop(1, 100, TimeUnit.MILLISECONDS);
}
// Pollcount should not change
assertEquals(1, redisQueue.getPollCount());

redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(3), "3", 0,1)));
redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(4), "4", 0,1)));
assertEquals(1, redisQueue.getPollCount());

redisQueue.pop(2, 100, TimeUnit.MILLISECONDS);
assertEquals(0, redisQueue.getPollCount());
}

@Test
public void testPriority() {
Expand Down Expand Up @@ -350,6 +327,27 @@ public void testPriority() {
}
}

@Test
public void testStrictPriority() {
redisQueue.flush();

redisQueue.pop(100,0,TimeUnit.MILLISECONDS);

for(int i = 1; i <= 100; i++)
{
redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(i),"", 0, i)));
}

List<QueueMessage> messages = redisQueue.pop(1,0,TimeUnit.MILLISECONDS); // 1 item popped, 99 items priority messed up priority in Redis
assertEquals(1, messages.size());
assertEquals("1", messages.get(0).getId());

// push message with highest priority, and it should get poped first
redisQueue.push(Arrays.asList(new QueueMessage(String.valueOf(1),"", 0, 1)));
assertEquals(1, messages.size());
assertEquals("1", messages.get(0).getId());
}

@Test
public void testDelayedPriority() {

Expand Down

0 comments on commit ced757d

Please sign in to comment.