diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java index 82d86f7152..450a68b954 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java @@ -270,7 +270,7 @@ public boolean exceedsLimit(Task task) { return true; } } catch (Exception e) { - Monitors.error(CLASS_NAME, "exceedsInProgressLimit"); + Monitors.error(CLASS_NAME, "exceedsLimit"); String errorMsg = String.format("Failed to get in progress limit - %s:%s in workflow :%s", task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); diff --git a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java index e91b2ff86e..83308e992e 100644 --- a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java +++ b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java @@ -13,10 +13,13 @@ package com.netflix.conductor.redis.limit; +import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskDef; -import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO; +import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.redis.limit.config.RedisConcurrentExecutionLimitProperties; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -27,44 +30,81 @@ import java.util.Optional; +@Trace @Component @ConditionalOnProperty(value = "conductor.redis-concurrent-execution-limit.enabled", havingValue = "true") public class RedisConcurrentExecutionLimitDAO implements ConcurrentExecutionLimitDAO { - private static final Logger log = LoggerFactory.getLogger(RedisConcurrentExecutionLimitDAO.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedisConcurrentExecutionLimitDAO.class); + private static final String CLASS_NAME = RedisConcurrentExecutionLimitDAO.class.getSimpleName(); private final StringRedisTemplate stringRedisTemplate; - private final ConductorProperties properties; + private final RedisConcurrentExecutionLimitProperties properties; - public RedisConcurrentExecutionLimitDAO(StringRedisTemplate stringRedisTemplate, ConductorProperties properties) { + public RedisConcurrentExecutionLimitDAO(StringRedisTemplate stringRedisTemplate, RedisConcurrentExecutionLimitProperties properties) { this.stringRedisTemplate = stringRedisTemplate; this.properties = properties; } + /** + * Adds the {@link Task} identifier to a Redis Set for the {@link TaskDef}'s name. + * + * @param task The {@link Task} object. + */ @Override public void addTaskToLimit(Task task) { - String taskId = task.getTaskId(); - String taskDefName = task.getTaskDefName(); - - String keyName = createKeyName(taskDefName); - - stringRedisTemplate.opsForSet().add(keyName, taskId); - - log.debug("Added taskId: {} to key: {}", taskId, keyName); + try { + Monitors.recordDaoRequests(CLASS_NAME, "addTaskToLimit", task.getTaskType(), task.getWorkflowType()); + String taskId = task.getTaskId(); + String taskDefName = task.getTaskDefName(); + String keyName = createKeyName(taskDefName); + + stringRedisTemplate.opsForSet().add(keyName, taskId); + + LOGGER.debug("Added taskId: {} to key: {}", taskId, keyName); + } catch (Exception e) { + Monitors.error(CLASS_NAME, "addTaskToLimit"); + String errorMsg = String + .format("Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(), + task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + } } + /** + * Remove the {@link Task} identifier from the Redis Set for the {@link TaskDef}'s name. + * + * @param task The {@link Task} object. + */ @Override public void removeTaskFromLimit(Task task) { - String taskId = task.getTaskId(); - String taskDefName = task.getTaskDefName(); - - String keyName = createKeyName(taskDefName); - - stringRedisTemplate.opsForSet().remove(keyName, taskId); - - log.debug("Removed taskId: {} from key: {}", taskId, keyName); + try { + Monitors.recordDaoRequests(CLASS_NAME, "removeTaskFromLimit", task.getTaskType(), task.getWorkflowType()); + String taskId = task.getTaskId(); + String taskDefName = task.getTaskDefName(); + + String keyName = createKeyName(taskDefName); + + stringRedisTemplate.opsForSet().remove(keyName, taskId); + + LOGGER.debug("Removed taskId: {} from key: {}", taskId, keyName); + } catch (Exception e) { + Monitors.error(CLASS_NAME, "removeTaskFromLimit"); + String errorMsg = String + .format("Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(), + task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + } } + /** + * Checks if the {@link Task} identifier is in the Redis Set and size of the set is more than the {@link TaskDef#concurrencyLimit()}. + * + * @param task The {@link Task} object. + * @return true if the task id is not in the set and size of the set is more than the {@link TaskDef#concurrencyLimit()}. + */ @Override public boolean exceedsLimit(Task task) { Optional taskDefinition = task.getTaskDefinition(); @@ -76,30 +116,33 @@ public boolean exceedsLimit(Task task) { return false; } - String taskId = task.getTaskId(); - String taskDefName = task.getTaskDefName(); - - String keyName = createKeyName(taskDefName); + try { + Monitors.recordDaoRequests(CLASS_NAME, "exceedsLimit", task.getTaskType(), task.getWorkflowType()); + String taskId = task.getTaskId(); + String taskDefName = task.getTaskDefName(); + String keyName = createKeyName(taskDefName); - boolean isMember = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().isMember(keyName, taskId), false); - long size = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().size(keyName), -1L); + boolean isMember = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().isMember(keyName, taskId), false); + long size = ObjectUtils.defaultIfNull(stringRedisTemplate.opsForSet().size(keyName), -1L); - log.debug("Task: {} is {}, size: {} and limit: {}", taskId, isMember ? "member": "non-member", size, limit); + LOGGER.debug("Task: {} is {} of {}, size: {} and limit: {}", taskId, isMember ? "a member" : "not a member", keyName, size, limit); - return !isMember && size >= limit; + return !isMember && size >= limit; + } catch (Exception e) { + Monitors.error(CLASS_NAME, "exceedsLimit"); + String errorMsg = String.format("Failed to get in progress limit - %s:%s in workflow :%s", + task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg); + } } private String createKeyName(String taskDefName) { StringBuilder builder = new StringBuilder(); - String appId = properties.getAppId(); - String stackName = properties.getStack(); - - if (StringUtils.isNotBlank(appId)) { - builder.append(appId).append('.'); - } + String namespace = properties.getNamespace(); - if (StringUtils.isNotBlank(stackName)) { - builder.append(stackName).append('.'); + if (StringUtils.isNotBlank(namespace)) { + builder.append(namespace).append(':'); } return builder.append(taskDefName).toString(); diff --git a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/config/RedisConcurrentExecutionLimitProperties.java b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/config/RedisConcurrentExecutionLimitProperties.java index ba1e3e629b..d4890101f1 100644 --- a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/config/RedisConcurrentExecutionLimitProperties.java +++ b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/config/RedisConcurrentExecutionLimitProperties.java @@ -34,6 +34,8 @@ public enum RedisType { private String clientName; + private String namespace = "conductor"; + public RedisType getType() { return type; } @@ -81,4 +83,12 @@ public String getPassword() { public void setPassword(String password) { this.password = password; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } diff --git a/redis-concurrency-limit/src/test/groovy/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAOSpec.groovy b/redis-concurrency-limit/src/test/groovy/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAOSpec.groovy index a05b656181..e74039387d 100644 --- a/redis-concurrency-limit/src/test/groovy/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAOSpec.groovy +++ b/redis-concurrency-limit/src/test/groovy/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAOSpec.groovy @@ -16,7 +16,7 @@ package com.netflix.conductor.redis.limit import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.metadata.workflow.WorkflowTask -import com.netflix.conductor.core.config.ConductorProperties +import com.netflix.conductor.redis.limit.config.RedisConcurrentExecutionLimitProperties import org.springframework.data.redis.connection.RedisStandaloneConfiguration import org.springframework.data.redis.connection.jedis.JedisConnectionFactory import org.springframework.data.redis.core.StringRedisTemplate @@ -37,10 +37,10 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { StringRedisTemplate redisTemplate - ConductorProperties properties + RedisConcurrentExecutionLimitProperties properties def setup() { - properties = new ConductorProperties(appId: 'conductor', stack: this.getClass().simpleName) + properties = new RedisConcurrentExecutionLimitProperties(namespace: 'conductor') redisTemplate = new StringRedisTemplate(new JedisConnectionFactory(new RedisStandaloneConfiguration(redis.host, redis.firstMappedPort))) dao = new RedisConcurrentExecutionLimitDAO(redisTemplate, properties) } @@ -49,7 +49,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { given: def taskId = 'task1' def taskDefName = 'task_def_name1' - def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String + def keyName = "${properties.namespace}:$taskDefName" as String Task task = new Task(taskId: taskId, taskDefName: taskDefName) @@ -66,7 +66,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { given: def taskId = 'task1' def taskDefName = 'task_def_name1' - def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String + def keyName = "${properties.namespace}:$taskDefName" as String redisTemplate.opsForSet().add(keyName, taskId) @@ -102,7 +102,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { given: def taskId = 'task1' def taskDefName = 'task_def_name1' - def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String + def keyName = "${properties.namespace}:$taskDefName" as String Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2))) @@ -119,7 +119,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { given: def taskId = 'task1' def taskDefName = 'task_def_name1' - def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String + def keyName = "${properties.namespace}:$taskDefName" as String Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2))) @@ -137,7 +137,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { given: def taskId = 'task1' def taskDefName = 'task_def_name1' - def keyName = "${properties.appId}.${properties.stack}.$taskDefName" as String + def keyName = "${properties.namespace}:$taskDefName" as String Task task = new Task(taskId: taskId, taskDefName: taskDefName, workflowTask: new WorkflowTask(taskDefinition: new TaskDef(concurrentExecLimit: 2))) @@ -152,7 +152,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { retVal } - def "verify createKeyName ignores appId and stack if they are not present"() { + def "verify createKeyName ignores namespace if its not present"() { given: def dao = new RedisConcurrentExecutionLimitDAO(null, conductorProperties) @@ -163,7 +163,7 @@ class RedisConcurrentExecutionLimitDAOSpec extends Specification { keyName == expectedKeyName where: - conductorProperties << [new ConductorProperties(stack: null, appId: null), new ConductorProperties(appId: 'conductor', stack: 'stack'), new ConductorProperties(appId: null, stack: 'stack'), new ConductorProperties(appId: 'conductor', stack: null)] - expectedKeyName << ['taskdefname', 'conductor.stack.taskdefname', 'stack.taskdefname', 'conductor.taskdefname'] + conductorProperties << [new RedisConcurrentExecutionLimitProperties(), new RedisConcurrentExecutionLimitProperties(namespace: null), new RedisConcurrentExecutionLimitProperties(namespace: 'test')] + expectedKeyName << ['conductor:taskdefname', 'taskdefname', 'test:taskdefname'] } }