Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Added metrics, error handling and documentation for RedisConcurrentEx…
Browse files Browse the repository at this point in the history
…ecutionLimitDAO
  • Loading branch information
aravindanr committed Nov 18, 2021
1 parent 6f131dd commit bda3337
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public boolean exceedsLimit(Task task) {
return true;
}
} catch (Exception e) {
Monitors.error(CLASS_NAME, "exceedsInProgressLimit");
Monitors.error(CLASS_NAME, "exceedsLimit");
String errorMsg = String.format("Failed to get in progress limit - %s:%s in workflow :%s",
task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@

package com.netflix.conductor.redis.limit;

import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.redis.limit.config.RedisConcurrentExecutionLimitProperties;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -27,44 +30,81 @@

import java.util.Optional;

@Trace
@Component
@ConditionalOnProperty(value = "conductor.redis-concurrent-execution-limit.enabled", havingValue = "true")
public class RedisConcurrentExecutionLimitDAO implements ConcurrentExecutionLimitDAO {

private static final Logger log = LoggerFactory.getLogger(RedisConcurrentExecutionLimitDAO.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RedisConcurrentExecutionLimitDAO.class);
private static final String CLASS_NAME = RedisConcurrentExecutionLimitDAO.class.getSimpleName();

private final StringRedisTemplate stringRedisTemplate;
private final ConductorProperties properties;
private final RedisConcurrentExecutionLimitProperties properties;

public RedisConcurrentExecutionLimitDAO(StringRedisTemplate stringRedisTemplate, ConductorProperties properties) {
public RedisConcurrentExecutionLimitDAO(StringRedisTemplate stringRedisTemplate, RedisConcurrentExecutionLimitProperties properties) {
this.stringRedisTemplate = stringRedisTemplate;
this.properties = properties;
}

/**
* Adds the {@link Task} identifier to a Redis Set for the {@link TaskDef}'s name.
*
* @param task The {@link Task} object.
*/
@Override
public void addTaskToLimit(Task task) {
String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();

String keyName = createKeyName(taskDefName);

stringRedisTemplate.opsForSet().add(keyName, taskId);

log.debug("Added taskId: {} to key: {}", taskId, keyName);
try {
Monitors.recordDaoRequests(CLASS_NAME, "addTaskToLimit", task.getTaskType(), task.getWorkflowType());
String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();
String keyName = createKeyName(taskDefName);

stringRedisTemplate.opsForSet().add(keyName, taskId);

LOGGER.debug("Added taskId: {} to key: {}", taskId, keyName);
} catch (Exception e) {
Monitors.error(CLASS_NAME, "addTaskToLimit");
String errorMsg = String
.format("Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(),
task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
}
}

/**
* Remove the {@link Task} identifier from the Redis Set for the {@link TaskDef}'s name.
*
* @param task The {@link Task} object.
*/
@Override
public void removeTaskFromLimit(Task task) {
String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();

String keyName = createKeyName(taskDefName);

stringRedisTemplate.opsForSet().remove(keyName, taskId);

log.debug("Removed taskId: {} from key: {}", taskId, keyName);
try {
Monitors.recordDaoRequests(CLASS_NAME, "removeTaskFromLimit", task.getTaskType(), task.getWorkflowType());
String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();

String keyName = createKeyName(taskDefName);

stringRedisTemplate.opsForSet().remove(keyName, taskId);

LOGGER.debug("Removed taskId: {} from key: {}", taskId, keyName);
} catch (Exception e) {
Monitors.error(CLASS_NAME, "removeTaskFromLimit");
String errorMsg = String
.format("Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(),
task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
}
}

/**
* Checks if the {@link Task} identifier is in the Redis Set and size of the set is more than the {@link TaskDef#concurrencyLimit()}.
*
* @param task The {@link Task} object.
* @return true if the task id is not in the set and size of the set is more than the {@link TaskDef#concurrencyLimit()}.
*/
@Override
public boolean exceedsLimit(Task task) {
Optional<TaskDef> taskDefinition = task.getTaskDefinition();
Expand All @@ -76,30 +116,33 @@ public boolean exceedsLimit(Task task) {
return false;
}

String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();

String keyName = createKeyName(taskDefName);
try {
Monitors.recordDaoRequests(CLASS_NAME, "exceedsLimit", task.getTaskType(), task.getWorkflowType());
String taskId = task.getTaskId();
String taskDefName = task.getTaskDefName();
String keyName = createKeyName(taskDefName);

boolean isMember = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().isMember(keyName, taskId), false);
long size = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().size(keyName), -1L);
boolean isMember = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().isMember(keyName, taskId), false);
long size = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().size(keyName), -1L);

log.debug("Task: {} is {}, size: {} and limit: {}", taskId, isMember ? "member": "non-member", size, limit);
LOGGER.debug("Task: {} is {} of {}, size: {} and limit: {}", taskId, isMember ? "a member" : "not a member", keyName, size, limit);

return !isMember && size >= limit;
return !isMember && size >= limit;
} catch (Exception e) {
Monitors.error(CLASS_NAME, "exceedsLimit");
String errorMsg = String.format("Failed to get in progress limit - %s:%s in workflow :%s",
task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId());
LOGGER.error(errorMsg, e);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg);
}
}

private String createKeyName(String taskDefName) {
StringBuilder builder = new StringBuilder();
String appId = properties.getAppId();
String stackName = properties.getStack();

if (StringUtils.isNotBlank(appId)) {
builder.append(appId).append('.');
}
String namespace = properties.getNamespace();

if (StringUtils.isNotBlank(stackName)) {
builder.append(stackName).append('.');
if (StringUtils.isNotBlank(namespace)) {
builder.append(namespace).append(':');
}

return builder.append(taskDefName).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public enum RedisType {

private String clientName;

private String namespace = "conductor";

public RedisType getType() {
return type;
}
Expand Down Expand Up @@ -81,4 +83,12 @@ public String getPassword() {
public void setPassword(String password) {
this.password = password;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package com.netflix.conductor.redis.limit
import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.WorkflowTask
import com.netflix.conductor.core.config.ConductorProperties
import com.netflix.conductor.redis.limit.config.RedisConcurrentExecutionLimitProperties
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
import org.springframework.data.redis.core.StringRedisTemplate
Expand All @@ -37,10 +37,10 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {

StringRedisTemplate redisTemplate

ConductorProperties properties
RedisConcurrentExecutionLimitProperties properties

def setup() {
properties = new ConductorProperties(appId: 'conductor', stack: this.getClass().simpleName)
properties = new RedisConcurrentExecutionLimitProperties(namespace: 'conductor')
redisTemplate = new StringRedisTemplate(new JedisConnectionFactory(new RedisStandaloneConfiguration(redis.host, redis.firstMappedPort)))
dao = new RedisConcurrentExecutionLimitDAO(redisTemplate, properties)
}
Expand All @@ -49,7 +49,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
given:
def taskId = 'task1'
def taskDefName = 'task_def_name1'
def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String
def keyName = "${properties.namespace}:$taskDefName" as String

Task task = new Task(taskId: taskId, taskDefName: taskDefName)

Expand All @@ -66,7 +66,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
given:
def taskId = 'task1'
def taskDefName = 'task_def_name1'
def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String
def keyName = "${properties.namespace}:$taskDefName" as String

redisTemplate.opsForSet().add(keyName, taskId)

Expand Down Expand Up @@ -102,7 +102,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
given:
def taskId = 'task1'
def taskDefName = 'task_def_name1'
def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String
def keyName = "${properties.namespace}:$taskDefName" as String

Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2)))

Expand All @@ -119,7 +119,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
given:
def taskId = 'task1'
def taskDefName = 'task_def_name1'
def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String
def keyName = "${properties.namespace}:$taskDefName" as String

Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2)))

Expand All @@ -137,7 +137,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
given:
def taskId = 'task1'
def taskDefName = 'task_def_name1'
def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String
def keyName = "${properties.namespace}:$taskDefName" as String

Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2)))

Expand All @@ -152,7 +152,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
retVal
}

def "verify createKeyName ignores appId and stack if they are not present"() {
def "verify createKeyName ignores namespace if its not present"() {
given:
def dao = new RedisConcurrentExecutionLimitDAO(null, conductorProperties)

Expand All @@ -163,7 +163,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification {
keyName == expectedKeyName

where:
conductorProperties << [new ConductorProperties(stack: null, appId: null), new ConductorProperties(appId: 'conductor', stack: 'stack'), new ConductorProperties(appId: null, stack: 'stack'), new ConductorProperties(appId: 'conductor', stack: null)]
expectedKeyName << ['taskdefname', 'conductor.stack.taskdefname', 'stack.taskdefname', 'conductor.taskdefname']
conductorProperties << [new RedisConcurrentExecutionLimitProperties(), new RedisConcurrentExecutionLimitProperties(namespace: null), new RedisConcurrentExecutionLimitProperties(namespace: 'test')]
expectedKeyName << ['conductor:taskdefname', 'taskdefname', 'test:taskdefname']
}
}

0 comments on commit bda3337

Please sign in to comment.