From b49c2eb09daabe1c1fc26cabd85c838d7720806e Mon Sep 17 00:00:00 2001 From: Pimeng Fu Date: Tue, 30 Jul 2019 15:13:42 -0700 Subject: [PATCH 1/9] Add expression prettifier (#524) --- ArgusWeb/app/js/controllers/viewMetrics.js | 44 ++++++- ArgusWeb/app/js/services/utilService.js | 145 +++++++++++++++++++++ ArgusWeb/app/js/templates/viewmetrics.html | 12 ++ 3 files changed, 199 insertions(+), 2 deletions(-) diff --git a/ArgusWeb/app/js/controllers/viewMetrics.js b/ArgusWeb/app/js/controllers/viewMetrics.js index 81148e811..a3ee585f9 100644 --- a/ArgusWeb/app/js/controllers/viewMetrics.js +++ b/ArgusWeb/app/js/controllers/viewMetrics.js @@ -2,8 +2,8 @@ 'use strict'; angular.module('argus.controllers.viewMetrics', ['ngResource']) -.controller('ViewMetrics', ['$location', '$routeParams', '$scope', '$compile', 'growl', 'Metrics', 'Annotations', 'SearchService', 'Controls', 'ChartDataProcessingService', 'DateHandlerService', 'InputTracker', - function ($location, $routeParams, $scope, $compile, growl, Metrics, Annotations, SearchService, Controls, ChartDataProcessingService, DateHandlerService, InputTracker) { +.controller('ViewMetrics', ['$location', '$routeParams', '$scope', '$compile', 'growl', 'Metrics', 'Annotations', 'SearchService', 'Controls', 'ChartDataProcessingService', 'DateHandlerService', 'UtilService', + function ($location, $routeParams, $scope, $compile, growl, Metrics, Annotations, SearchService, Controls, ChartDataProcessingService, DateHandlerService, UtilService) { var lastParams; var noMorePages = false; $scope.annotationType = 'ALERT'; @@ -319,4 +319,44 @@ angular.module('argus.controllers.viewMetrics', ['ngResource']) }; $scope.getMetricData(); + + $scope.editorShown = false; + $scope.treeText = ''; + $scope.prettify = function() { + $scope.editorShown = true; + $scope.treeText = UtilService.prettifyExpression($scope.expression); + }; + $scope.hide = function() { + $scope.editorShown = false; + }; + $scope.textAreaOnChange = function() { + var tree = UtilService.getExpressionTree($scope.treeText); + $scope.expression = UtilService.flatTree(tree); + }; + $scope.editorLoaded = function (editor) { + editor.setSize(null, 'auto'); + editor.on('keydown', function(editor, event){ + event.stopPropagation(); + }); + }; + $scope.editorOptions = { + lineWrapping: true, + lineNumbers: true, + mode: 'htmlmixed', + viewportMargin: Infinity, + tabSize: 2, + foldGutter: true, + gutters: ['CodeMirror-linenumbers', 'CodeMirror-foldgutter'], + autoCloseTags: true, + matchTags: {bothTags: true}, + extraKeys: { /* key board short cuts in the the editor */ + 'Alt-Space': 'autocomplete', + 'Ctrl-Alt-F': function(editor) { + editor.setOption('fullScreen', !editor.getOption('fullScreen')); + }, + 'Esc': function(editor) { + if (editor.getOption('fullScreen')) editor.setOption('fullScreen', false); + }, + } + }; }]); diff --git a/ArgusWeb/app/js/services/utilService.js b/ArgusWeb/app/js/services/utilService.js index 3bec97910..135f03453 100644 --- a/ArgusWeb/app/js/services/utilService.js +++ b/ArgusWeb/app/js/services/utilService.js @@ -1,5 +1,6 @@ /*global angular:false, copyProperties:false */ 'use strict'; + angular.module('argus.services.utils', []) .service('UtilService', ['$filter', function($filter) { var options = { @@ -94,6 +95,150 @@ angular.module('argus.services.utils', []) target[i] = obj[i]; } return target; + }, + + ExpressionNode: class { + constructor(type, text){ + this.type = type + this.text = text + this.children = [] + } + appendChild(node){ + this.children.push(node) + } + 
}, + + getExpressionTree: function(expression){ + expression = expression.trim() + const n = expression.length + const stack = [] + let curT = undefined //current transform + let tmpText = '' + let tmpType = 'expression' + + for(let i = 0; i < n; i ++ ) { + const c = expression[i] + if(c.match(/\s/)) continue + let node + switch (c) { + case '(': + if (curT) { + stack.push(curT) + } + curT = new this.ExpressionNode('transform', tmpText) + tmpText = '' + tmpType = 'expression' + continue + case ')': + if(tmpText !== ''){ + node = new this.ExpressionNode(tmpType, tmpText) + curT.appendChild(node) + } + if (stack.length === 0){ + //end of outter most expression + return curT + } + const lastT = stack.pop() + lastT.appendChild(curT) //add just ended transform to parent + curT = lastT + tmpText = '' + tmpType = 'expression' + continue + case ',': + if (tmpText === '') continue // xxx),xxx + if (tmpType === 'tag') { //do not take comma as seperator + tmpText += c + continue + } + node = new this.ExpressionNode(tmpType, tmpText) + curT.appendChild(node) + tmpText = '' + tmpType = 'expression' + continue + case '{': + tmpText += c + tmpType = 'tag' //TODO: add tag children for expression + continue + case '}': + tmpText += c + tmpType = 'expression' + continue + case '#': + tmpText += c + tmpType = 'constant' + continue + default: + tmpText += c + } + } + if (tmpText !== '') { + //just a normal expression without transform + return new this.ExpressionNode(tmpType, tmpText) + } + return curT // if there is a tranform, root should be returned in the loop + }, + + printTree: function(depth, isFirstChild, stringArr, previousNode, node) { + const indentation = ' '.repeat(depth * 2) + if (previousNode && previousNode.type === 'transform'){ + stringArr.push(`\n${indentation}`) + } + if (isFirstChild){ + stringArr.push(indentation) //indentation + } + if (node.type === 'transform'){ + stringArr.push(`${node.text}(\n`) + let isFirstChild = true + let previousChild + for(let child of node.children){ + if (!isFirstChild) { + stringArr.push(',') + } + this.printTree(depth + 1, isFirstChild, stringArr, previousChild, child) + previousChild = child + if (isFirstChild) isFirstChild = false + } + stringArr.push(`\n${indentation})`) + } else { + stringArr.push(node.text) + } + }, + + printTreeFlat: function(stringArr, node) { + if (node.type === 'transform'){ + stringArr.push(`${node.text}(`) + let isFirstChild = true + for(let child of node.children){ + if (!isFirstChild) { + stringArr.push(',') + } + this.printTreeFlat(stringArr,child) + if (isFirstChild) isFirstChild = false + } + stringArr.push(`)`) + } else { + stringArr.push(node.text) + } + }, + + prettifyExpression: function(expression) { + const tree = this.getExpressionTree(expression) + const stringArr = [] + this.printTree(0, true, stringArr, undefined, tree) + return stringArr.join('') + }, + + flatTree: function(tree) { + const stringArr = [] + this.printTreeFlat(stringArr, tree) + return stringArr.join('') + }, + + typeOfNode: function(text) { + const firstChar = text.trim()[0] + if (firstChar === '#') return 'constant' + if (/[A-Z]/.test(firstChar)) return 'transform' + return 'expression' } }; return options; diff --git a/ArgusWeb/app/js/templates/viewmetrics.html b/ArgusWeb/app/js/templates/viewmetrics.html index 9be16685f..ed1c4468e 100644 --- a/ArgusWeb/app/js/templates/viewmetrics.html +++ b/ArgusWeb/app/js/templates/viewmetrics.html @@ -24,6 +24,18 @@ Annotation type: + + + +
+
+
+
+
+
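For context, here is a minimal usage sketch of the new UtilService helpers introduced above. It is not part of the patch: the expression, scope, and metric names are invented for illustration, SCALE/SUM are assumed to be existing Argus transforms, and the snippet assumes angular.js plus the utilService.js module above are already loaded so the service can be pulled from an injector.

	// Hedged sketch: 'argus.services.utils' and 'UtilService' are the names registered in utilService.js above.
	angular.injector(['ng', 'argus.services.utils']).invoke(['UtilService', function (UtilService) {
		// Hypothetical expression; the series names are made up.
		var expression = 'SCALE(SUM(-1h:example.scope:metric1:avg,-1h:example.scope:metric2:avg),#2#)';

		// prettifyExpression() parses the string into an ExpressionNode tree and re-prints it,
		// indenting nested transforms by two spaces while sibling series stay on one line:
		var pretty = UtilService.prettifyExpression(expression);
		// SCALE(
		//   SUM(
		//     -1h:example.scope:metric1:avg,-1h:example.scope:metric2:avg
		//   ),
		//   #2#
		// )

		// flatTree(getExpressionTree(...)) collapses the edited text back into a single line,
		// which is what the controller's textAreaOnChange() relies on:
		var roundTripped = UtilService.flatTree(UtilService.getExpressionTree(pretty));
		console.log(roundTripped === expression); // true
	}]);

In the controller, prettify() fills the CodeMirror editor with the indented form and textAreaOnChange() flattens the edited tree back into $scope.expression, so the expression the user submits remains a single-line query.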
From 87655bea21be33aba603f399b50b3e3d253ef9f5 Mon Sep 17 00:00:00 2001 From: Rajavardhan Sarkapally Date: Tue, 30 Jul 2019 16:00:17 -0700 Subject: [PATCH 2/9] Slice (#530) * W-5335462 Alias transform changes * MetricServiceSnapshot changes * Slice transform implementation * SLice transform implementation * Slice transform implementation * Improving regex to accept start and end * Spelling correction --- .../metric/transform/SliceTransform.java | 115 ++++++++++++++++++ .../metric/transform/TransformFactory.java | 5 +- ArgusCore/src/main/javacc/MetricReader.jj | 7 ++ .../metric/transform/SliceTransformTest.java | 109 +++++++++++++++++ 4 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/SliceTransform.java create mode 100644 ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/transform/SliceTransformTest.java diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/SliceTransform.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/SliceTransform.java new file mode 100644 index 000000000..296c93040 --- /dev/null +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/SliceTransform.java @@ -0,0 +1,115 @@ +package com.salesforce.dva.argus.service.metric.transform; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.system.SystemAssert; +import com.salesforce.dva.argus.util.QueryContext; +import com.salesforce.dva.argus.util.TransformUtil; +/** + * It provides methods to implement Slice transform + * @author Raj Sarkapally + * + */ +public class SliceTransform implements Transform{ + private static String START_TIME = "start"; + private static String END_TIME = "end"; + private static long SECOND_IN_MILLI=1000l; + + @Override + public List transform(QueryContext context, List metrics) { + throw new UnsupportedOperationException("Slice Transform needs interval start time and end time."); + } + + @Override + public List transform(QueryContext queryContext, + List metrics, List constants) { + SystemAssert.requireArgument(constants != null, "Slice Transform needs interval start time and end time."); + SystemAssert.requireArgument(constants.size() == 2, "Slice Transform must provide exactly 2 constants which are interval start time and interval end time."); + + String startEndTimePattern= "("+ START_TIME + "|"+ END_TIME +")(\\s*[+-]\\s*\\d+[smhd])?"; + String sliceStartTime = constants.get(0).trim(); + String sliceEndTime = constants.get(1).trim(); + SystemAssert.requireArgument((isLong(sliceStartTime) || sliceStartTime.matches(startEndTimePattern)), "The start time of Slice transform is invalid."); + SystemAssert.requireArgument((isLong(sliceEndTime) || sliceEndTime.matches(startEndTimePattern)), "The end time of Slice transform is invalid."); + + long sliceStartTimeInMilli = calculateTime(sliceStartTime, queryContext.getChildContexts().get(0).getExpression().getStartTimestamp(), + queryContext.getChildContexts().get(0).getExpression().getEndTimestamp()); + + long sliceEndTimeInMilli = calculateTime(sliceEndTime, queryContext.getChildContexts().get(0).getExpression().getStartTimestamp(), + queryContext.getChildContexts().get(0).getExpression().getEndTimestamp()); + + metrics.forEach(metric -> { + Map slicedDatapoints = new HashMap<>(); + metric.getDatapoints().forEach((timestamp,value) ->{ + if(timestamp >= 
sliceStartTimeInMilli && timestamp <=sliceEndTimeInMilli) { + slicedDatapoints.put(timestamp, value); + } + }); + metric.setDatapoints(slicedDatapoints); + }); + return metrics; + } + + @Override + public List transform(QueryContext queryContext, + List... metrics) { + throw new UnsupportedOperationException("Slice Transform doesn't need list of list."); + } + + @Override + public String getResultScopeName() { + return TransformFactory.Function.SLICE.name(); + } + + private long calculateTime(String time,long queryStartTime, long queryEndTime) { + if(isLong(time)) { + return Long.valueOf(time); + }else { + long startREndtime; + String remTimeString; + if(time.contains(START_TIME)) { + startREndtime=queryStartTime; + remTimeString=time.substring(START_TIME.length()).trim(); + if(remTimeString.isEmpty()) { + return queryStartTime; + } + }else { + startREndtime=queryEndTime; + remTimeString=time.substring(END_TIME.length()).trim(); + if(remTimeString.isEmpty()) { + return queryEndTime; + } + } + return calculate(startREndtime, remTimeString.charAt(0), SECOND_IN_MILLI * TransformUtil.getWindowInSeconds(remTimeString.substring(1).trim())); + } + } + + private long calculate(long operand1, char operator, long operand2) { + switch(operator) { + case '+': + return operand1 + operand2; + case '-': + return operand1 - operand2; + case '*': + return operand1 * operand2; + case '/': + return operand1/operand2; + default: + return operand1-operand2; + } + } + + private boolean isLong(String s) { + try { + Long.valueOf(s); + return true; + }catch(NumberFormatException e) { + return false; + }catch(Throwable t) { + return false; + } + } +} diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/TransformFactory.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/TransformFactory.java index b7ea9c44c..e0dfbd5f2 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/TransformFactory.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/transform/TransformFactory.java @@ -200,6 +200,8 @@ public Transform getTransform(String functionName) { return new InterpolateTransform(); case RATE: return new RateTransform(); + case SLICE: + return new SliceTransform(); default: throw new UnsupportedOperationException(functionName); } // end switch @@ -277,7 +279,8 @@ public enum Function { ANOMALY_KMEANS("ANOMALY_KMEANS", "Calculates an anomaly score (0-100) for each value of the metric based on a K-means clustering of the metric data."), ANOMALY_RPCA("ANOMALY_RPCA", "Calculates an anomaly score (0-100) for each value of the metric based on the RPCA matrix decomposition algorithm."), INTERPOLATE("INTERPOLATE", "Performs interpolation of multiple time series, that can then be used for aggregation"), - RATE("RATE", "Performs Rate for all given time series"); + RATE("RATE", "Performs Rate for all given time series"), + SLICE("SLICE", "Removes data points before interval start time and after interval end time. 
"); private final String _name; private final String _description; diff --git a/ArgusCore/src/main/javacc/MetricReader.jj b/ArgusCore/src/main/javacc/MetricReader.jj index 9b3732a3e..d35b429f8 100644 --- a/ArgusCore/src/main/javacc/MetricReader.jj +++ b/ArgusCore/src/main/javacc/MetricReader.jj @@ -311,6 +311,7 @@ TOKEN : { < GROUPBY : "GROUPBY" > } TOKEN : { < GROUPBYTAG : "GROUPBYTAG" > } TOKEN : { < INTERPOLATE : "INTERPOLATE" > } TOKEN : { < RATE : "RATE" > } +TOKEN : { < SLICE : "SLICE" > } TOKEN : { < METADATA_INCLUDE : "METADATA_INCLUDE" > } TOKEN : { < METADATA_EXCLUDE : "METADATA_EXCLUDE" > } @@ -782,6 +783,12 @@ private String getFunctionNameAndUpdateContext(QueryContextHolder contextHolder) } | t = + { + updateQueryContextWithFunction(t.image, contextHolder); + return t.image; + } + | + t = { updateQueryContextWithFunction(t.image, contextHolder); return t.image; diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/transform/SliceTransformTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/transform/SliceTransformTest.java new file mode 100644 index 000000000..9ff8011ac --- /dev/null +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/transform/SliceTransformTest.java @@ -0,0 +1,109 @@ +package com.salesforce.dva.argus.service.metric.transform; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Test; + +import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.util.QueryContext; +import com.salesforce.dva.argus.util.TSDBQueryExpression; + +public class SliceTransformTest { + private static final String TEST_SCOPE = "test-scope"; + private static final String TEST_METRIC = "test-metric"; + private static final long SECOND=1000l; + + @Test + public void testWithAbsoluteTime() { + SliceTransform transform = new SliceTransform(); + + Metric actualMetric = new Metric(TEST_SCOPE, TEST_METRIC); + Map actualDps = new HashMap(); + actualDps.put(1l, 1d); + actualDps.put(2l, 2d); + actualDps.put(3l, 3d); + actualDps.put(4l, 4d); + actualDps.put(5l, 5d); + + actualMetric.setDatapoints(actualDps); + + List constants = new ArrayList(); + constants.add("2"); + constants.add("4"); + + QueryContext context = new QueryContext(); + + QueryContext childContext = new QueryContext(); + TSDBQueryExpression exp = new TSDBQueryExpression(); + exp.setStartTimestamp(1l); + exp.setEndTimestamp(6l); + childContext.setExpression(exp); + + context.setChildContexts(Arrays.asList(childContext)); + + List actual = transform.transform(context, Arrays.asList(actualMetric),constants); + + Metric expectedMetric = new Metric(TEST_SCOPE, TEST_METRIC); + Map expectedDps = new HashMap(); + expectedDps.put(2l, 2d); + expectedDps.put(3l, 3d); + expectedDps.put(4l, 4d); + + expectedMetric.setDatapoints(expectedDps); + + List expected = Arrays.asList(expectedMetric); + + assertEquals(expected.get(0), actual.get(0)); + assertEquals(expected.get(0).getDatapoints(), actual.get(0).getDatapoints()); + } + + @Test + public void testWithRelativeTime() { + SliceTransform transform = new SliceTransform(); + + Metric actualMetric = new Metric(TEST_SCOPE, TEST_METRIC); + Map actualDps = new HashMap(); + actualDps.put(1*SECOND, 1d); + actualDps.put(2*SECOND, 2d); + actualDps.put(3*SECOND, 3d); + actualDps.put(4*SECOND, 4d); + actualDps.put(5*SECOND, 5d); + actualDps.put(6*SECOND, 6d); + 
actualDps.put(7*SECOND, 7d); + + actualMetric.setDatapoints(actualDps); + + List constants = new ArrayList(); + constants.add("start + 2s"); + constants.add("end-2s"); + + QueryContext context = new QueryContext(); + QueryContext childContext = new QueryContext(); + TSDBQueryExpression exp = new TSDBQueryExpression(); + exp.setStartTimestamp(1*SECOND); + exp.setEndTimestamp(7*SECOND); + childContext.setExpression(exp); + + context.setChildContexts(Arrays.asList(childContext)); + List actual = transform.transform(context, Arrays.asList(actualMetric),constants); + + Metric expectedMetric = new Metric(TEST_SCOPE, TEST_METRIC); + Map expectedDps = new HashMap(); + expectedDps.put(3*SECOND, 3d); + expectedDps.put(4*SECOND, 4d); + expectedDps.put(5*SECOND, 5d); + + expectedMetric.setDatapoints(expectedDps); + + List expected = Arrays.asList(expectedMetric); + + assertEquals(expected.get(0), actual.get(0)); + assertEquals(expected.get(0).getDatapoints(), actual.get(0).getDatapoints()); + } +} From 938938b7cb3c1768535e42ca1941d2b58433e5b0 Mon Sep 17 00:00:00 2001 From: Pimeng Fu Date: Wed, 31 Jul 2019 11:49:39 -0700 Subject: [PATCH 3/9] Add login instruction (#541) --- ArgusWeb/app/js/templates/login.html | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ArgusWeb/app/js/templates/login.html b/ArgusWeb/app/js/templates/login.html index 4ac066149..da2d35d2c 100644 --- a/ArgusWeb/app/js/templates/login.html +++ b/ArgusWeb/app/js/templates/login.html @@ -3,7 +3,16 @@
-

Login to Argus

+

+ + Login to Argus + (please use SSO username/password) + + +


From 0c2fdf758ae18f1ccdcc396656ca935bca044a19 Mon Sep 17 00:00:00 2001 From: Colby Guan Date: Thu, 1 Aug 2019 13:05:58 -0700 Subject: [PATCH 4/9] fix duplicate IDB DKS config key (#545) --- .../dva/argus/service/metric/metadata/DefaultIDBClient.java | 6 +++--- .../ElasticSearchConsumerOffsetMetricsServiceTest.java | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/metadata/DefaultIDBClient.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/metadata/DefaultIDBClient.java index b7b1eb8d7..e589eb94d 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/metadata/DefaultIDBClient.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/metadata/DefaultIDBClient.java @@ -69,8 +69,8 @@ CloseableHttpClient createClient(SystemConfiguration config, int connectionCount keystore = new DynamicKeyStoreBuilder() .withMonitoredDirectory(config.getValue(Property.IDB_KEYSTORE_MONITORED_DIRECTORY.getName(), Property.IDB_KEYSTORE_MONITORED_DIRECTORY.getDefaultValue())) - .withCADirectory(config.getValue(Property.IDB_KEYSTORE_CA_DIRECTORY.getName(), - Property.IDB_KEYSTORE_CA_DIRECTORY.getDefaultValue())) + .withCADirectory(config.getValue(Property.IDB_CA_DIRECTORY.getName(), + Property.IDB_CA_DIRECTORY.getDefaultValue())) .withStartThread(true).build(); SSLContext sslContext = keystore.getSSLContext(); SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext); @@ -196,7 +196,7 @@ public enum Property { IDB_ENDPOINT("service.property.idbclient.endpoint", "https://cfg0-cidbapima1-0-prd.data.sfdc.net:443"), IDB_CONN_COUNT("service.property.idbclient.conn.count", "30"), IDB_KEYSTORE_MONITORED_DIRECTORY("service.property.idbclient.keystore.monitored.directory", "/etc/pki_service/sfdc/argus-ajnaconsumer"), - IDB_KEYSTORE_CA_DIRECTORY("service.property.idbclient.keystore.monitored.directory", "/etc/pki_service/ca"), + IDB_CA_DIRECTORY("service.property.idbclient.ca.directory", "/etc/pki_service/ca"), IDB_CACHE_TTL_SECS("service.property.idbclient.cache.ttl.secs", "1800"), IDB_CONN_TIMEOUT("service.property.idbclient.conn.timeout", "30000"), IDB_SOCKET_TIMEOUT("service.property.idbclient.socket.timeout", "30000"); diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java index aa680be09..b8b6f71bf 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java @@ -19,6 +19,7 @@ import org.elasticsearch.client.RestClient; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -322,6 +323,7 @@ public void testConstructQuery() throws IOException { assertEquals(convertToPrettyJson(expectedOutput), convertToPrettyJson(actualOutput)); } + @Ignore @Test public void testConsumerOffsetSchemaRecordListMapper() throws IOException { mapper = ElasticSearchConsumerOffsetMetricsService.getMetricObjectMapper(new ConsumerOffsetRecordList.IndexSerializer(), new ConsumerOffsetRecordList.Deserializer()); From 92c6b23f0b3e7a766c99da9e540453fb73138e54 Mon Sep 17 00:00:00 2001 From: Sudhanshu 
Bahety Date: Thu, 1 Aug 2019 23:07:56 -0700 Subject: [PATCH 5/9] W-6124967 [Data Lag Detection] Query Annotation ES cluster to detect data lag (Alert Client) (#511) * Adding datalag interface for implementation * Adding javadoc to interface * Adding class for consumer offset datalag calculation * Adding triggering and clearing thresholds * Adding inertia logic * Testing framework completed * Adding tests for data lag methods * Adding debug mode checks * Adding more cases for data lag checks * Adding metric query parsing logic and further checks * Adding flexibility while handling cases --- .../dva/argus/service/MetricService.java | 24 + .../dva/argus/service/MonitorService.java | 1 + .../service/alert/DefaultAlertService.java | 9 +- .../service/metric/DefaultMetricService.java | 29 ++ ...ticSearchConsumerOffsetMetricsService.java | 2 +- .../service/metric/MetricQueryProcessor.java | 20 +- .../argus/service/monitor/DataLagMonitor.java | 217 --------- .../monitor/DataLagMonitorConsumerOffset.java | 429 ++++++++++++++++++ .../monitor/DataLagMonitorGoldenMetric.java | 266 +++++++++++ .../argus/service/monitor/DataLagService.java | 118 +++++ .../monitor/DefaultMonitorService.java | 38 +- .../dva/argus/system/SystemConfiguration.java | 13 +- .../dva/argus/system/SystemInitializer.java | 3 + .../com/salesforce/dva/argus/TestUtils.java | 12 +- .../dva/argus/service/MonitorServiceTest.java | 44 +- .../alert/DefaultAlertServiceTest.java | 11 +- .../ElasticSearchAnnotationServiceTest.java | 7 +- ...earchConsumerOffsetMetricsServiceTest.java | 5 +- .../service/monitor/DataLagServiceTest.java | 260 +++++++++++ 19 files changed, 1201 insertions(+), 307 deletions(-) delete mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitor.java create mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java create mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorGoldenMetric.java create mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagService.java create mode 100644 ArgusCore/src/test/java/com/salesforce/dva/argus/service/monitor/DataLagServiceTest.java diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MetricService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MetricService.java index 29336b64f..6bde2df51 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MetricService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MetricService.java @@ -156,6 +156,30 @@ public interface MetricService extends Service { */ List getQueries(List expression, long relativeTo); + /** + * Returns a list of MetricQuery objects corresponding to the given expression where the query time range is relativeTo by the given value. + * + * @param expression The metric expressions to evaluate. Cannot be null, but may be empty. All entries must be a valid metric expression. + * @param relativeTo The timestamp from which the start and end times should be relative to. Only applied when using + * relative timestamps in expressions. + * For e.g. If the expression is -1h:argus.jvm:mem.heap.used:avg, 1 hour should be subtracted from + * relativeTo + * @return The corresponding list of metric query objects. Will never return null. 
+ */ + List parseToMetricQuery(String expression, long relativeTo); + + /** + * Returns a list of MetricQuery objects corresponding to the given expression where the query time range is relativeTo by the given value. + * + * @param expression The list of metric expressions to evaluate. Cannot be null, but may be empty. All entries must be a valid metric expression. + * @param relativeTo The timestamp from which the start and end times should be relative to. Only applied when using + * relative timestamps in expressions. + * For e.g. If the expression is -1h:argus.jvm:mem.heap.used:avg, 1 hour should be subtracted from + * relativeTo + * @return The corresponding list of metric query objects. Will never return null. + */ + List parseToMetricQuery(List expression, long relativeTo); + /** * Returns list of DC from the metric query list, if present. * @param mQList The list of MetricQuery expressions to evaluate. Cannot be null, but may be empty. All entries must be a valid metric expression. diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MonitorService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MonitorService.java index f8bbe4ba7..5202c4f0b 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MonitorService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/MonitorService.java @@ -277,6 +277,7 @@ public static enum Counter { QUERY_STORE_BLOOM_CREATED_APPROXIMATE_ELEMENT_COUNT("argus.core", "querystore.bloomfilter.created.approximate.element.count", MetricType.COUNTER), DATALAG_PER_DC_TIME_LAG("argus.core", "datalag.seconds"), + DATALAG_PER_DC_OFFSET_LAG("argus.core", "datalag.offset"), QUERY_DATAPOINTS_LIMIT_EXCEEDED("argus.core", "query.datapoints.limit.exceeded"), ELASTIC_SEARCH_GET_FAILURES("argus.core", "elastic.search.get.failures", MetricType.COUNTER), diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java index d26beaa31..767cceb32 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java @@ -63,6 +63,7 @@ import com.salesforce.dva.argus.service.mail.EmailContext; import com.salesforce.dva.argus.service.metric.MetricQueryResult; import com.salesforce.dva.argus.service.metric.transform.MissingDataException; +import com.salesforce.dva.argus.service.monitor.DataLagService; import com.salesforce.dva.argus.service.tsdb.MetricQuery; import com.salesforce.dva.argus.system.SystemConfiguration; import com.salesforce.dva.argus.service.alert.testing.AlertTestResults; @@ -408,7 +409,7 @@ private void loadWhiteListRegexPatterns() { if (_whiteListedScopeRegexPatterns == null) { - String whiteListedScopesProperty = _configuration.getValue(SystemConfiguration.Property.DATA_LAG_WHITE_LISTED_SCOPES); + String whiteListedScopesProperty = _configuration.getValue(DataLagService.Property.DATA_LAG_WHITE_LISTED_SCOPES.getName(), DataLagService.Property.DATA_LAG_WHITE_LISTED_SCOPES.getDefaultValue()); if (!StringUtils.isEmpty(whiteListedScopesProperty)) { _whiteListedScopeRegexPatterns = Stream.of(whiteListedScopesProperty.split(",")).map(elem -> Pattern.compile(elem.toLowerCase())).collect(Collectors.toList()); @@ -420,7 +421,7 @@ private void loadWhiteListRegexPatterns() if (_whiteListedUserRegexPatterns == null) { - String whiteListedUsersProperty = 
_configuration.getValue(SystemConfiguration.Property.DATA_LAG_WHITE_LISTED_USERS); + String whiteListedUsersProperty = _configuration.getValue(DataLagService.Property.DATA_LAG_WHITE_LISTED_USERS.getName(), DataLagService.Property.DATA_LAG_WHITE_LISTED_USERS.getDefaultValue()); if (!StringUtils.isEmpty(whiteListedUsersProperty)) { _whiteListedUserRegexPatterns = Stream.of(whiteListedUsersProperty.split(",")).map(elem -> Pattern.compile(elem.toLowerCase())).collect(Collectors.toList()); @@ -538,7 +539,7 @@ public Integer executeScheduledAlerts(int alertCount, int timeout) { NotificationProcessor np = new NotificationProcessor(this, _logger); _monitorService.modifyCounter(Counter.ALERTS_EVALUATED_TOTAL, alerts.size(), new HashMap<>()); - boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED)); + boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getName(), DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getDefaultValue())); AtomicInteger numberOfAlertsEvaluated = new AtomicInteger(alerts.size()); for (Alert alert : alerts) { @@ -1764,7 +1765,7 @@ public boolean testEvaluateAlert(Alert alert, Long alertEvaluationTime, AlertTes // Evaluate Alert, Triggers, Notifications ----------------------------------------------------------------- // TODO - enable datalag monitor in alert testing? - boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED)); // TODO - get default value + boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getName(), DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getDefaultValue())); // TODO - get default value long jobStartTime = System.currentTimeMillis(); long evaluateEndTime = 0; diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java index d9f1469d3..6b68b6191 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java @@ -45,6 +45,7 @@ import com.salesforce.dva.argus.system.SystemAssert; import com.salesforce.dva.argus.system.SystemConfiguration; import com.salesforce.dva.argus.system.SystemException; +import com.salesforce.dva.argus.util.QueryContext; import com.salesforce.dva.argus.util.QueryContextHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +213,34 @@ public List getQueries(List expressions, long relativeTo) { } return queries; } + + @Override + public List parseToMetricQuery(String expressions, long relativeTo) { + requireNotDisposed(); + SystemAssert.requireArgument(MetricReader.isValid(expressions), "Illegal metric expression found: " + expressions); + return parseToMetricQuery(Arrays.asList(expressions), relativeTo); + } + + + @Override + public List parseToMetricQuery(List expressions, long relativeTo) { + requireNotDisposed(); + + MetricReader reader = _metricReaderProviderForQueries.get(); + List queries = new ArrayList<>(); + + try { + for (String expression : expressions) { + _logger.debug("Parsing expression to metric query for {}", expression); + QueryContextHolder contextHolder = new QueryContextHolder(); + reader.parse(expression, relativeTo, MetricQuery.class, 
contextHolder, false); + queries.add(MetricQueryProcessor.convertTSDBQueryToMetricQuery(contextHolder.getCurrentQueryContext().getExpression())); + } + } catch (ParseException ex) { + throw new SystemException("Failed to parse the given expression", ex); + } + return queries; + } @Override public void dispose() { diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsService.java index ede30c73a..be29f1964 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsService.java @@ -352,7 +352,7 @@ public Map> getMetrics(List queries) { try { for (MetricQuery query : queries) { List consumerOffsetMetrics = new ArrayList<>(); - String queryJson = constructQuery(query, from, scrollSize); + String queryJson = constructQuery(new MetricQuery(query), from, scrollSize); final long start = System.currentTimeMillis(); Request request = new Request(ElasticSearchUtils.HttpMethod.POST.getName(), requestUrl); request.setEntity(new StringEntity(queryJson, ContentType.APPLICATION_JSON)); diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java index 94d13ff92..caa4c4252 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java @@ -104,10 +104,7 @@ public void mergeQueryResults(MetricQueryResult parentResult, MetricQueryResult } } - private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) { - final long start = System.currentTimeMillis(); - - MetricQueryResult queryResult = new MetricQueryResult(); + public static MetricQuery convertTSDBQueryToMetricQuery(TSDBQueryExpression expression) { Long startTimestamp = expression.getStartTimestamp(); Long endTimestamp = expression.getEndTimestamp(); String namespace = expression.getNamespace(); @@ -130,6 +127,16 @@ private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) { query.setAggregator(aggregator); } + return query; + } + + private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) { + final long start = System.currentTimeMillis(); + + MetricQueryResult queryResult = new MetricQueryResult(); + + + MetricQuery query = convertTSDBQueryToMetricQuery(expression); List queries = _discoveryService.getMatchingQueries(query); if (queries.size() == 0) { // No metrics inflow to argus in last DEFAULT_RETENTION_DISCOVERY_DAYS days. Save the raw query processed within inBoundMetricQuery. 
@@ -157,7 +164,8 @@ private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) { } Collections.sort(metrics); queryResult.setMetricsList(metrics); - queryResult.setQueryTimeRangeInMillis(endTimestamp-startTimestamp); + Long startTimestamp = expression.getStartTimestamp(); + queryResult.setQueryTimeRangeInMillis(expression.getEndTimestamp() - startTimestamp); queryResult.setQueryStartTimeMillis(startTimestamp); if(queries.size() !=1 || queries.get(0) != query) { queryResult.setNumDiscoveryResults(queries.size()); @@ -176,7 +184,7 @@ private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) { /* * We replace the aggregator to provide a non-interpolated default behavior for MIN, MAX and SUM */ - private Aggregator getSubstituteAggregator(Aggregator aggregator) { + private static Aggregator getSubstituteAggregator(Aggregator aggregator) { switch (aggregator) { case MIN: return Aggregator.MIMMIN; diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitor.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitor.java deleted file mode 100644 index ee8153257..000000000 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitor.java +++ /dev/null @@ -1,217 +0,0 @@ -package com.salesforce.dva.argus.service.monitor; - -import java.text.MessageFormat; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.AbstractMap.SimpleEntry; - -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - -import com.salesforce.dva.argus.service.MonitorService; -import com.salesforce.dva.argus.service.TSDBService; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.salesforce.dva.argus.entity.Metric; -import com.salesforce.dva.argus.service.MetricService; -import com.salesforce.dva.argus.system.SystemConfiguration; - -/* - * This class runs a thread which periodically checks if there is data lag on Argus side. 
- * - */ -public class DataLagMonitor extends Thread{ - - - private String _hostName; - - private long _dataLagThreshold; - - private Map _isDataLaggingbyDCMap = new TreeMap<>(); - - private Map _expressionPerDC = new TreeMap<>(); - - private Map _lagPerDC = new TreeMap<>(); - - private MetricService _metricService; - - private TSDBService _tsdbService; - - private static Long SLEEP_INTERVAL_MILLIS = 60*1000L; - - private static final Double MAX_LAG_TIME_MILLIS = 4.0*60*60*1000; - - private final Logger _logger = LoggerFactory.getLogger(DataLagMonitor.class); - - private SystemConfiguration _sysConfig; - - private final ExecutorCompletionService>> _completionService; - - public DataLagMonitor(SystemConfiguration sysConfig, MetricService metricService, TSDBService tsdbService) { - super("datalag-monitor"); - _sysConfig = sysConfig; - _metricService = metricService; - _tsdbService = tsdbService; - _hostName = SystemConfiguration.getHostname(); - init(); - _completionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(5)); - _logger.info("Data lag monitor initialized"); - } - - private void init() { - String _defaultExpression = _sysConfig.getValue(SystemConfiguration.Property.DATA_LAG_DEFAULT_EXPRESSION); - _dataLagThreshold = Long.valueOf(_sysConfig.getValue(SystemConfiguration.Property.DATA_LAG_THRESHOLD)); - try { - JsonObject _dataLagQueryExpressions = new JsonParser().parse(_sysConfig.getValue(SystemConfiguration.Property.DATA_LAG_QUERY_EXPRESSION)).getAsJsonObject(); - Set> entries = _dataLagQueryExpressions.entrySet(); - for (Map.Entry entry : entries) { - String currentExpression = entry.getKey(); - JsonArray dcList = entry.getValue().getAsJsonArray(); - for (JsonElement value : dcList) { - try { - String currentDC = value.getAsString(); - _expressionPerDC.put(currentDC, currentExpression.replace("#DC#", currentDC)); - _isDataLaggingbyDCMap.put(currentDC, false); - } catch (Exception ex) { - _logger.error("Exception occured while parsing the datalag expression for DC: " + value + ", using default expression. Exception: {}", ex); - } - } - } - } catch (Exception ex) { - _logger.error("Exception occured while parsing the datalag expression json list, using default expression. 
Exception: {}", ex); - } - - for (String dc : _sysConfig.getValue(SystemConfiguration.Property.DC_LIST).split(",")) { - if (!_expressionPerDC.containsKey(dc)) { - _expressionPerDC.put(dc, _defaultExpression); - _isDataLaggingbyDCMap.put(dc, false); - } - _lagPerDC.put(dc, 0.0); - } - } - - @Override - public void run() { - _logger.info("Data lag monitor thread started"); - boolean firstTime = true; - String currentDC = null; - while (!isInterrupted()) { - try { - if (!firstTime) { - sleep(SLEEP_INTERVAL_MILLIS); - } else { - // waiting 5 seconds for everything to initialize - sleep(5 * 1000); - firstTime = false; - } - - final Long currTime = System.currentTimeMillis(); - for (String dc : _expressionPerDC.keySet()) { - _completionService.submit(() -> { - List metrics = new ArrayList<>(); - try { - metrics = _metricService.getMetrics(_expressionPerDC.get(dc), currTime).getMetricsList(); - } catch (Exception e) { - metrics.clear(); - _logger.error("Metric Service failed to get metric for expression: " + _expressionPerDC.get(dc) + " while being queried by DataLagMonitor, for DC: " + dc); - } - - return new SimpleEntry<>(dc, metrics); - }); - } - - for (int idx = 0; idx < _expressionPerDC.size(); ++idx) { - try { - Future>> future = _completionService.take(); - SimpleEntry> result = future.get(); - currentDC = result.getKey(); - List metrics = result.getValue(); - double lagTimeInMillis = getLagTimeInMillis(currentDC, currTime, metrics); - _lagPerDC.put(currentDC, lagTimeInMillis); - _isDataLaggingbyDCMap.put(currentDC, lagTimeInMillis > _dataLagThreshold); - pushLagTimeMetric(currentDC, currTime, lagTimeInMillis / 1000.0); - } catch (Exception ex) { - _logger.error(MessageFormat.format("Exception thrown while evaluating lag time for dc: {0} with message: {1}", currentDC, ex)); - } - } - - } catch (Exception e) { - _logger.error("Exception thrown in data lag monitor thread - " + ExceptionUtils.getFullStackTrace(e)); - } - } - } - - private double getLagTimeInMillis(String currentDC, Long currTime, List metrics) { - double lagTimeInMillis; - if (metrics == null || metrics.isEmpty()) { - _logger.info("Data lag detected as metric list is empty for DC: " + currentDC); - lagTimeInMillis = Math.min(MAX_LAG_TIME_MILLIS, _lagPerDC.get(currentDC) + SLEEP_INTERVAL_MILLIS); - } else { - //Assuming only one time series in result - Metric currMetric = metrics.get(0); - if (currMetric.getDatapoints() == null || currMetric.getDatapoints().size() == 0) { - _logger.info("Data lag detected as data point list is empty for DC: " + currentDC); - lagTimeInMillis = Math.min(MAX_LAG_TIME_MILLIS, _lagPerDC.get(currentDC) + SLEEP_INTERVAL_MILLIS); - } else { - long lastDataPointTime = Collections.max(currMetric.getDatapoints().keySet()); - lagTimeInMillis = (currTime - lastDataPointTime); - } - } - - return lagTimeInMillis; - } - - private void pushLagTimeMetric(String currentDC, Long currTime, double lagTime) { - Metric trackingMetric = new Metric(MonitorService.Counter.DATALAG_PER_DC_TIME_LAG.getScope(), MonitorService.Counter.DATALAG_PER_DC_TIME_LAG.getMetric()); - ExecutorService _executorService = Executors.newSingleThreadExecutor(); - Map tags = new HashMap<>(); - - tags.put("dc", currentDC); - tags.put("host", _hostName); - - trackingMetric.setTags(tags); - Map currentDatapoint = new HashMap<>(); - currentDatapoint.put(currTime, lagTime); - trackingMetric.setDatapoints(currentDatapoint); - - try { - _executorService.submit(()->{ - _tsdbService.putMetrics(Collections.singletonList(trackingMetric)); - 
_logger.debug(MessageFormat.format("Pushing datalag metric - hostname:{0}, dc:{1}, lagTime:{2}",_hostName, currentDC, lagTime)); - }); - } catch (Exception ex) { - _logger.error("Exception occurred while pushing datalag metric to tsdb - {}", ex.getMessage()); - } finally { - _executorService.shutdown(); - } - } - - public boolean isDataLagging(String currentDC) { - try { - if (currentDC != null && currentDC.length() > 0 && _isDataLaggingbyDCMap.containsKey(currentDC)) { - return _isDataLaggingbyDCMap.get(currentDC); - } else { - return _isDataLaggingbyDCMap.values().stream() - .reduce((e1, e2) -> (e1 || e2)) - .get(); - } - } catch (Exception ex) { - _logger.error(MessageFormat.format("Failed to identify whether DC {0} was lagging.", currentDC)); - return false; - } - } -} \ No newline at end of file diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java new file mode 100644 index 000000000..c46afc993 --- /dev/null +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java @@ -0,0 +1,429 @@ +/* + * + * * Copyright (c) 2016, Salesforce.com, Inc. + * * All rights reserved. + * * + * * Redistribution and use in source and binary forms, with or without + * * modification, are permitted provided that the following conditions are met: + * * + * * 1. Redistributions of source code must retain the above copyright notice, + * * this list of conditions and the following disclaimer. + * * + * * 2. Redistributions in binary form must reproduce the above copyright notice, + * * this list of conditions and the following disclaimer in the documentation + * * and/or other materials provided with the distribution. + * * + * * 3. Neither the name of Salesforce.com nor the names of its contributors may + * * be used to endorse or promote products derived from this software without + * * specific prior written permission. + * * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * * POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * Implements data lag detection on alert client side using consumer offset lag posted by kafka consumers in the upstream. 
+ * @author Sudhanshu.Bahety (sudhanshu.bahety@salesforce.com) + * */ +package com.salesforce.dva.argus.service.monitor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.inject.Inject; +import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.service.MailService; +import com.salesforce.dva.argus.service.MetricService; +import com.salesforce.dva.argus.service.MetricStorageService; +import com.salesforce.dva.argus.service.MonitorService; +import com.salesforce.dva.argus.service.TSDBService; +import com.salesforce.dva.argus.service.mail.EmailContext; +import com.salesforce.dva.argus.service.tsdb.MetricQuery; +import com.salesforce.dva.argus.system.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.NotFoundException; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static com.salesforce.dva.argus.system.SystemAssert.requireArgument; + +public class DataLagMonitorConsumerOffset implements DataLagService { + + private String hostName; + private Map dataLagTriggerThresholdPerDC = new HashMap<>(); + private Map dataLagClearThresholdPerDC = new HashMap<>(); + private Map expressionPerDC = new HashMap<>(); + private Map lagStatePerDC = new HashMap<>(); + private Set dcSet = new HashSet<>(); + private MetricStorageService consumerOffsetMetricService; + private MetricService metricService; + private TSDBService tsdbService; + private MailService mailService; + private Set enforceLagPresentSet; + private final Logger logger = LoggerFactory.getLogger(DataLagMonitorConsumerOffset.class); + private SystemConfiguration sysConfig; + + private static final String TOPIC_TAG = "topic"; + private static final Long SLEEP_INTERVAL_MILLIS = 60 * 1000L; + private static Long datalagInertia = 5 * 60 * 1000L; + + // TODO: Remove this once verification completes. + private static Boolean isInDebugMode = false; + private static String DEFAULT_EMAIL = "sudhanshu.bahety@salesforce.com"; + private static final String DEFAULT_SUBJECT = "Data Lag Consumer Offset Method detected a state change"; + private static String DEBUG_PREFIX; + + @Inject + public DataLagMonitorConsumerOffset(SystemConfiguration config, MetricStorageService consumerOffsetMetricService, MetricService metricService, TSDBService tsdbService, MailService mailService) { + this.sysConfig = config; + initDebug(); + this.consumerOffsetMetricService = consumerOffsetMetricService; + this.tsdbService = tsdbService; + this.metricService = metricService; + this.mailService = mailService; + this.hostName = SystemConfiguration.getHostname(); + datalagInertia = Long.valueOf(sysConfig.getValue(Property.DATA_LAG_INERTIA.getName(), Property.DATA_LAG_INERTIA.getDefaultValue())); + init(); + this.logger.info(DEBUG_PREFIX + "Data lag consumer offset monitor initialized"); + } + + // TODO: Remove this once verification completes. 
+ private void initDebug() { + isInDebugMode = Boolean.valueOf(sysConfig.getValue(Property.DATA_LAG_DEBUG.getName(), Property.DATA_LAG_DEBUG.getDefaultValue())); + DEFAULT_EMAIL = String.valueOf(sysConfig.getValue(Property.DATA_LAG_EMAIL.getName(), Property.DATA_LAG_EMAIL.getDefaultValue())); + if (isInDebugMode) { + DEBUG_PREFIX = "[DEBUG-DATALAG] "; + } else { + DEBUG_PREFIX = ""; + } + } + + private void init() { + dcSet = Sets.newHashSet(sysConfig.getValue(SystemConfiguration.Property.DC_LIST).split(",")); + enforceLagPresentSet = Sets.newHashSet(sysConfig.getValue(DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST.getName(), DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST.getDefaultValue()).trim().toUpperCase().split(",")); + // Read expression per DC from the config file + String defaultExpression = sysConfig.getValue(Property.DATA_LAG_DEFAULT_EXPRESSION.getName(), Property.DATA_LAG_DEFAULT_EXPRESSION.getDefaultValue()); + try { + JsonObject dataLagQueryExpressions = new JsonParser().parse(sysConfig.getValue(Property.DATA_LAG_QUERY_EXPRESSION.getName(), Property.DATA_LAG_QUERY_EXPRESSION.getDefaultValue())).getAsJsonObject(); + for (Map.Entry entry : dataLagQueryExpressions.entrySet()) { + String currentExpression = entry.getKey().trim(); + JsonArray dcList = entry.getValue().getAsJsonArray(); + for (JsonElement value : dcList) { + try { + String currentDC = value.getAsString().trim().toUpperCase(); + expressionPerDC.put(currentDC, currentExpression.replace("#DC#", currentDC.toLowerCase()));//Note: When we post from AKC, all DCs are in lower case. + dcSet.add(currentDC); + } catch (Exception ex) { + logger.error(DEBUG_PREFIX + "Exception occured while parsing the datalag expression for DC: " + value + ", using default expression. Exception: " + ex); + } + } + } + } catch (Exception ex) { + logger.error(DEBUG_PREFIX + "Exception occured while parsing the datalag expression json list, using default expression. Exception: ", ex); + } finally { + // Fill with default values + for (String dc : dcSet) { + dc = dc.trim().toUpperCase(); + if (!expressionPerDC.containsKey(dc)) { + expressionPerDC.put(dc, defaultExpression); + } + lagStatePerDC.put(dc, false); + } + } + + // Read default thresholds. + Long defaultTriggerThreshold = Long.valueOf(sysConfig.getValue(Property.DATA_LAG_DEFAULT_TRIGGER_THRESHOLD.getName(), Property.DATA_LAG_DEFAULT_TRIGGER_THRESHOLD.getDefaultValue())); + Long defaultClearThreshold = Long.valueOf(sysConfig.getValue(Property.DATA_LAG_DEFAULT_CLEAR_THRESHOLD.getName(), Property.DATA_LAG_DEFAULT_CLEAR_THRESHOLD.getDefaultValue())); + // Read DC per threshold from the config file. 
+ readThresholds(sysConfig.getValue(Property.DATA_LAG_TRIGGER_THRESHOLD.getName(), Property.DATA_LAG_TRIGGER_THRESHOLD.getDefaultValue()), dataLagTriggerThresholdPerDC, defaultTriggerThreshold); + readThresholds(sysConfig.getValue(Property.DATA_LAG_CLEAR_THRESHOLD.getName(), Property.DATA_LAG_CLEAR_THRESHOLD.getDefaultValue()), dataLagClearThresholdPerDC, defaultClearThreshold); + } + + private void readThresholds(String thresholdProperty, Map dataLagThresholdPerDC, Long defaultValue) { + requireArgument(dataLagThresholdPerDC != null, "Data lag threshold per dc cannot be null"); + requireArgument(defaultValue != null, "Default threshold value cannot be null"); + JsonObject dataLagThresholdObject = null; + try { + dataLagThresholdObject = new JsonParser().parse(thresholdProperty).getAsJsonObject(); + Set> entriesForThreshold = dataLagThresholdObject.entrySet(); + for (Map.Entry entry : entriesForThreshold) { + Long currentThreshold = Long.valueOf(entry.getKey().trim()); + JsonArray dcList = entry.getValue().getAsJsonArray(); + for (JsonElement value : dcList) { + try { + String currentDC = value.getAsString().trim().toUpperCase(); + dataLagThresholdPerDC.put(currentDC, currentThreshold); + dcSet.add(currentDC); + } catch (Exception ex) { + logger.error(DEBUG_PREFIX + "Exception occured while parsing threshold for DC: " + value + ", using default threshold. Exception: ", ex); + } + } + } + } catch (Exception ex) { + logger.error(DEBUG_PREFIX + "Exception occured while parsing threshold value per dc. Exception: ", ex); + } finally { + dcSet.stream() + .filter(dc -> !dataLagThresholdPerDC.containsKey(dc)) + .forEach(dc -> dataLagThresholdPerDC.put(dc, defaultValue)); + } + } + + @Override + public void run() { + logger.info(DEBUG_PREFIX + "Data lag consumer offset monitor thread started"); + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL_MILLIS); + queryMetricsForDC(dcSet, System.currentTimeMillis()).forEach(this::computeDataLag); + } catch (Exception e) { + logger.error(DEBUG_PREFIX + "Exception thrown in data lag monitor thread: " + e); + } + } + } + + @Override + public Map> queryMetricsForDC(Set dcSet, Long startTime) { + requireArgument(dcSet != null && !dcSet.isEmpty(), "DCs for which data lag is to be queried cannot be null or empty"); + if (startTime == null) { + logger.warn(DEBUG_PREFIX + "Start time from which data lag is to be computed is null, taking current value by default"); + startTime = System.currentTimeMillis(); + } + + Long startTimeFinal = startTime; + + Map> result = new HashMap<>(); + List mQList = dcSet.stream().parallel() + .map(expressionPerDC::get) + .map(expression -> metricService.parseToMetricQuery(expression, startTimeFinal)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + if (mQList.size() != dcSet.size()) { + logger.error(DEBUG_PREFIX + "Metric Query Size does not match number of dcs present. 
Metric Query: {}, DCs: {}", mQList, dcSet); + } + + consumerOffsetMetricService.getMetrics(mQList).forEach((mQ, mList) -> result.put(getDCFromTopic(mQ.getTags().get(TOPIC_TAG)), mList)); + return result; + } + + @VisibleForTesting + protected String getDCFromTopic(String topic) { + requireArgument(topic != null, "Topic for which dc is to be extracted cannot be null"); + String finalTopic = topic.toUpperCase(); + for(String s: dcSet) { + if (finalTopic.contains(s)) { + return s; + } + } + throw new NotFoundException(DEBUG_PREFIX + "No Data center could be inferred from topic: " + topic); + } + + /* + In current implementation, inertia value is same as the look back window for which metric is queried. + This helps in reducing the problem of only checking if all the value returned in time series violates the condition. + TODO: If the look back window is different from the inertia period, the logic has to be changed. + */ + @Override + public Boolean computeDataLag(String dc, List metricList) { + + if (metricList.size() <= 0) { + logger.error(DEBUG_PREFIX + "No Metrics could be obtained for dc: {}, disabling data lag by default.", dc); + lagStatePerDC.put(dc, false); + return false; + } + else if (metricList.size() != 1) { + logger.warn(DEBUG_PREFIX + "More than 1 metrics returned for a single dc: {}, Metric list: {}\nCombining all data points to compute data lag.",dc, metricList); + } + requireArgument(dc != null, "Data center for which data lag is to be computed cannot be null"); + + dc = dc.trim().toUpperCase(); + Map datapoints = new HashMap<>(); + metricList.forEach(m -> datapoints.putAll(m.getDatapoints())); + Long triggeringThreshold = dataLagTriggerThresholdPerDC.get(dc); + Long clearingThreshold = dataLagClearThresholdPerDC.get(dc); + boolean isTriggering = true, isClearing = true, initialState = lagStatePerDC.get(dc); + + if(datapoints.size() == 0) { + logger.warn(DEBUG_PREFIX + "No metrics retrieved for Metrics: {}", metricList); + logger.warn(DEBUG_PREFIX + "Enabling data lag for dc: {}", dc); + lagStatePerDC.put(dc, true); + // If we are unable to retrieve metric for the current minute, we are resorting to the default value of 0. + pushMetric(System.currentTimeMillis(), 0.0, dc); + } else { + for (Double currentValue : datapoints.values()) { + isTriggering &= (currentValue >= triggeringThreshold); + isClearing &= (currentValue < clearingThreshold); + + if (!isTriggering && !isClearing) { + break; + } + } + + if (isTriggering && isClearing) { + logger.error(DEBUG_PREFIX + MessageFormat.format("Both Triggering and Clearing conditions cannot hold true at the same time. datapoints: {0}, Triggering threshold: {1}, Clearing threshold: {2}", datapoints, triggeringThreshold, clearingThreshold)); + lagStatePerDC.put(dc, true); + } + else if (isTriggering) { + lagStatePerDC.put(dc, true); + } else if (isClearing) { + lagStatePerDC.put(dc, false); + } + pushMetric(System.currentTimeMillis(), Collections.max(datapoints.values()), dc); + } + + // TODO: Remove this once verification completes. + if ( isInDebugMode && (initialState ^ lagStatePerDC.get(dc)) ) { // Notify whenever there is a state change. + + StringBuilder message = new StringBuilder(); + String state = isTriggering ? "Triggering" : "Clearing"; + + message.append("
Data lag state change was detected by consumer offset method.
"); + message.append(MessageFormat.format("DC: {0}, State: {1}, Triggering Threshold: {2}, Clearing Threshold: {3}", dc, state, triggeringThreshold, clearingThreshold)); + message.append(MessageFormat.format("
List retrieved by ES: {0}", metricList)); + EmailContext.Builder emailContextBuilder = new EmailContext.Builder() + .withRecipients(Sets.newHashSet(DEFAULT_EMAIL)) + .withSubject(DEFAULT_SUBJECT) + .withEmailBody(message.toString()) + .withContentType("text/html; charset=utf-8") + .withEmailPriority(MailService.Priority.HIGH); + mailService.sendMessage(emailContextBuilder.build()); + } + + return lagStatePerDC.get(dc); + } + + @Override + public Boolean isDataLagging(String currentDC) { + if (currentDC == null) { + return false; + } + currentDC = currentDC.trim().toUpperCase(); + + if (enforceLagPresentSet.contains(currentDC)) { + return true; + } + if (lagStatePerDC.containsKey(currentDC)) { + return lagStatePerDC.get(currentDC); + } + + return lagStatePerDC.values() + .stream() + .reduce((e1, e2) -> (e1 || e2)) + .orElse(false); + } + + @Override + public void pushMetric(Long time, Double value, String dc) { + requireArgument( value != null, "Value of conusmer offset metric cannot be null"); + if (time == null) { + logger.warn("Time when the metric is pushed is null. Using current time"); + time = System.currentTimeMillis(); + } + if (dc == null) { + logger.warn("DC for which metric is pushed is null. Using NO_DC_SPECIFIED as value"); + dc = "NO_DC_SPECIFIED"; + } + + String finalDC = dc; + + Metric trackingMetric = new Metric(MonitorService.Counter.DATALAG_PER_DC_OFFSET_LAG.getScope(), MonitorService.Counter.DATALAG_PER_DC_OFFSET_LAG.getMetric()); + ExecutorService _executorService = Executors.newSingleThreadExecutor(); + Map tags = new HashMap<>(); + + tags.put("dc", dc); + tags.put("host", hostName); + + trackingMetric.setTags(tags); + Map currentDatapoint = new HashMap<>(); + currentDatapoint.put(time, value); + trackingMetric.setDatapoints(currentDatapoint); + + try { + _executorService.submit(()->{ + tsdbService.putMetrics(Collections.singletonList(trackingMetric)); + logger.debug(DEBUG_PREFIX + MessageFormat.format("Pushing datalag metric - hostname:{0}, dc:{1}, offset:{2}", hostName, finalDC, value)); + }); + } catch (Exception ex) { + logger.error(DEBUG_PREFIX + "Exception occurred while pushing datalag metric to tsdb: ", ex); + } finally { + _executorService.shutdown(); + } + } + + /** + * The set of implementation specific configuration properties. + * + */ + public enum Property { + + /** Minute Threshold before you enable data lag */ + DATA_LAG_TRIGGER_THRESHOLD("system.property.data.lag.consumer.offset.trigger.threshold", "thresholdPerDC"), + /** Minute Threshold before you disable data lag */ + DATA_LAG_CLEAR_THRESHOLD("system.property.data.lag.consumer.offset.clear.threshold", "thresholdPerDC"), + /** Expression per dc to determine data lag */ + DATA_LAG_QUERY_EXPRESSION("system.property.data.lag.consumer.offset.expression.list","expressionListPerDC"), + /** Default expression if the expression for dc cannot be queried. */ + DATA_LAG_DEFAULT_EXPRESSION("system.property.data.lag.consumer.offset.default.expression","defaultExpression"), + /** Default threshold if the trigger threshold for dc is not specified. */ + DATA_LAG_DEFAULT_TRIGGER_THRESHOLD("system.property.data.lag.consumer.offset.default.trigger.threshold","23000"), + /** Default threshold if the clear threshold for dc is not specified. */ + DATA_LAG_DEFAULT_CLEAR_THRESHOLD("system.property.data.lag.consumer.offset.default.clear.threshold","5000"), + /** Inertia value for which data lag should continuously hold true. 
*/ + DATA_LAG_INERTIA("system.property.data.lag.consumer.offset.default.inertia.millis","300000"), + + // TODO: Remove this once verification completes. + /** Data lag to be run in debug mode to check the behaviour */ + DATA_LAG_DEBUG("system.property.data.lag.consumer.offset.debug.mode", "true"), + /** Default email for debugging purposes */ + DATA_LAG_EMAIL("system.property.data.lag.consumer.offset.debug.email", "sudhanshu.bahety@salesforce.com"); + + private final String _name; + private final String _defaultValue; + + Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. + */ + public String getDefaultValue() { + return _defaultValue; + } + } +} diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorGoldenMetric.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorGoldenMetric.java new file mode 100644 index 000000000..ee2965a80 --- /dev/null +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorGoldenMetric.java @@ -0,0 +1,266 @@ +package com.salesforce.dva.argus.service.monitor; + +import com.google.common.collect.Sets; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.inject.Inject; +import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.service.MetricService; +import com.salesforce.dva.argus.service.MonitorService; +import com.salesforce.dva.argus.service.TSDBService; +import com.salesforce.dva.argus.system.SystemConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static com.salesforce.dva.argus.system.SystemAssert.requireArgument; + +/* + * This class runs a thread which periodically checks if there is data lag on Argus side. 
+ * + */ +public class DataLagMonitorGoldenMetric implements DataLagService { + + + private String _hostName; + private long _dataLagThreshold; + private Map _isDataLaggingbyDCMap = new TreeMap<>(); + private Map _expressionPerDC = new TreeMap<>(); + private Map _lagPerDC = new TreeMap<>(); + private Set enforceLagPresentSet; + private MetricService _metricService; + private TSDBService _tsdbService; + private static Long SLEEP_INTERVAL_MILLIS = 60*1000L; + private static final Double MAX_LAG_TIME_MILLIS = 4.0*60*60*1000; + private final Logger _logger = LoggerFactory.getLogger(DataLagMonitorGoldenMetric.class); + private SystemConfiguration _sysConfig; + private final ExecutorCompletionService>> _completionService; + + @Inject + public DataLagMonitorGoldenMetric(SystemConfiguration sysConfig, MetricService metricService, TSDBService tsdbService) { + _sysConfig = sysConfig; + _metricService = metricService; + _tsdbService = tsdbService; + _hostName = SystemConfiguration.getHostname(); + init(); + _completionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(5)); + _logger.info("Data lag golden metric monitor initialized"); + } + + private void init() { + String _defaultExpression = _sysConfig.getValue(Property.DATA_LAG_DEFAULT_EXPRESSION.getName(), Property.DATA_LAG_DEFAULT_EXPRESSION.getDefaultValue()); + _dataLagThreshold = Long.valueOf(_sysConfig.getValue(Property.DATA_LAG_THRESHOLD.getName(), Property.DATA_LAG_THRESHOLD.getDefaultValue())); + try { + JsonObject _dataLagQueryExpressions = new JsonParser().parse(_sysConfig.getValue(Property.DATA_LAG_QUERY_EXPRESSION.getName(), Property.DATA_LAG_QUERY_EXPRESSION.getDefaultValue())).getAsJsonObject(); + Set> entries = _dataLagQueryExpressions.entrySet(); + for (Map.Entry entry : entries) { + String currentExpression = entry.getKey().trim(); + JsonArray dcList = entry.getValue().getAsJsonArray(); + for (JsonElement value : dcList) { + try { + String currentDC = value.getAsString().trim(); + _expressionPerDC.put(currentDC, currentExpression.replace("#DC#", currentDC)); + _isDataLaggingbyDCMap.put(currentDC, false); + } catch (Exception ex) { + _logger.error("Exception occured while parsing the datalag expression for DC: " + value + ", using default expression. Exception: {0}", ex); + } + } + } + } catch (Exception ex) { + _logger.error("Exception occured while parsing the datalag expression json list, using default expression. 
Exception: ", ex); + } + + for (String dc : _sysConfig.getValue(SystemConfiguration.Property.DC_LIST).split(",")) { + if (!_expressionPerDC.containsKey(dc)) { + _expressionPerDC.put(dc, _defaultExpression); + _isDataLaggingbyDCMap.put(dc, false); + } + _lagPerDC.put(dc, 0.0); + } + + enforceLagPresentSet = Sets.newHashSet(_sysConfig.getValue(DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST.getName(), DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST.getDefaultValue()).split(",")); + } + + @Override + public void run() { + _logger.info("Data lag golden metric monitor thread started"); + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(SLEEP_INTERVAL_MILLIS); + queryMetricsForDC(_expressionPerDC.keySet(), System.currentTimeMillis()).forEach(this::computeDataLag); + } catch (Exception e) { + _logger.error("Exception thrown in data lag golden metric monitor thread: ", e); + } + } + } + + @Override + public Boolean isDataLagging(String currentDC) { + if (currentDC == null) { + return false; + } + currentDC = currentDC.trim().toUpperCase(); + + if (enforceLagPresentSet.contains(currentDC) ) { + return true; + } + if (_isDataLaggingbyDCMap.containsKey(currentDC) ) { + return _isDataLaggingbyDCMap.get(currentDC); + } + return _isDataLaggingbyDCMap.values() + .stream() + .reduce((e1, e2) -> (e1 || e2)) + .orElse(false); + } + + @Override + public Map > queryMetricsForDC(Set dcSet, Long startTime) { + requireArgument(dcSet != null && !dcSet.isEmpty(), "DCs for which data lag is to be queried cannot be null or empty"); + requireArgument(startTime != null, "start time from which query begins cannot be empty"); + + Map> metricsPerDC = new HashMap<>(); + for (String dc : dcSet) { + _completionService.submit(() -> { + List metrics = new ArrayList<>(); + try { + metrics = _metricService.getMetrics(_expressionPerDC.get(dc), startTime).getMetricsList(); + } catch (Exception e) { + metrics.clear(); + _logger.error("Metric Service failed to get metric for expression: " + _expressionPerDC.get(dc) + " while being queried by DataLagMonitorGoldenMetric, for DC: " + dc + " Exception: ", e); + } + + return new SimpleEntry<>(dc, metrics); + }); + } + + for (int idx = 0; idx < dcSet.size(); ++idx) { + try { + Future>> future = _completionService.take(); + SimpleEntry> result = future.get(); + String currentDC = result.getKey(); + List metrics = result.getValue(); + metricsPerDC.put(currentDC, metrics); + } catch (Exception e) { + _logger.error(MessageFormat.format("Exception thrown while evaluating lag time for dc with message: ", e)); + } + } + return metricsPerDC; + } + + @Override + public Boolean computeDataLag(String dc, List metrics) { + requireArgument(dc != null, "Data center for which data lag is to be computed cannot be null"); + + double lagTimeInMillis; + Long currTime = System.currentTimeMillis(); + + if (metrics == null || metrics.isEmpty()) { + _logger.info("Data lag detected as metric list is empty for DC: " + dc); + lagTimeInMillis = Math.min(MAX_LAG_TIME_MILLIS, _lagPerDC.get(dc) + SLEEP_INTERVAL_MILLIS); + } else { + if (metrics.size() > 1) { + _logger.warn("More than 1 metric returned by the metric service while querying for data lag: {}", metrics); + } + //Assuming only one time series in result. 
+ Metric currMetric = metrics.get(0); + if (currMetric.getDatapoints() == null || currMetric.getDatapoints().size() == 0) { + _logger.info("Data lag detected as data point list is empty for DC: " + dc); + lagTimeInMillis = Math.min(MAX_LAG_TIME_MILLIS, _lagPerDC.get(dc) + SLEEP_INTERVAL_MILLIS); + } else { + long lastDataPointTime = Collections.max(currMetric.getDatapoints().keySet()); + lagTimeInMillis = (currTime - lastDataPointTime); + } + } + + _lagPerDC.put(dc, lagTimeInMillis); + _isDataLaggingbyDCMap.put(dc, lagTimeInMillis > _dataLagThreshold); + pushMetric(currTime, lagTimeInMillis, dc); + + return lagTimeInMillis > _dataLagThreshold; + } + + @Override + public void pushMetric(Long currTime, Double lagTime, String currentDC) { + requireArgument(currTime != null, "Time when the metric is pushed should not be null"); + requireArgument( lagTime != null, "Value of conusmer offset metric cannot be null"); + requireArgument(currentDC != null, "Should specify data center for which offset is being pushed"); + + Metric trackingMetric = new Metric(MonitorService.Counter.DATALAG_PER_DC_TIME_LAG.getScope(), MonitorService.Counter.DATALAG_PER_DC_TIME_LAG.getMetric()); + ExecutorService _executorService = Executors.newSingleThreadExecutor(); + Map tags = new HashMap<>(); + + tags.put("dc", currentDC); + tags.put("host", _hostName); + + trackingMetric.setTags(tags); + Map currentDatapoint = new HashMap<>(); + currentDatapoint.put(currTime, lagTime); + trackingMetric.setDatapoints(currentDatapoint); + + try { + _executorService.submit(()->{ + _tsdbService.putMetrics(Collections.singletonList(trackingMetric)); + _logger.debug(MessageFormat.format("Pushing datalag metric - hostname:{0}, dc:{1}, lagTime:{2}",_hostName, currentDC, lagTime)); + }); + } catch (Exception ex) { + _logger.error("Exception occurred while pushing datalag metric to tsdb: ", ex); + } finally { + _executorService.shutdown(); + } + } + + /** + * The set of implementation specific configuration properties. + * + */ + public enum Property { + + /** Minute Threshold before you enable data lag */ + DATA_LAG_THRESHOLD("system.property.data.lag.threshold.millis", "300000"), + /** Expression per dc to determine data lag */ + DATA_LAG_QUERY_EXPRESSION("system.property.data.lag.expression.list","expressionListPerDC"), + /** Default expression if the expression for dc cannot be queried. */ + DATA_LAG_DEFAULT_EXPRESSION("system.property.data.lag.default.expression","defaultExpression"); + + private final String _name; + private final String _defaultValue; + + Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. + */ + public String getDefaultValue() { + return _defaultValue; + } + } +} diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagService.java new file mode 100644 index 000000000..0500b2438 --- /dev/null +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagService.java @@ -0,0 +1,118 @@ +/* + * + * * Copyright (c) 2016, Salesforce.com, Inc. + * * All rights reserved. 
+ * * + * * Redistribution and use in source and binary forms, with or without + * * modification, are permitted provided that the following conditions are met: + * * + * * 1. Redistributions of source code must retain the above copyright notice, + * * this list of conditions and the following disclaimer. + * * + * * 2. Redistributions in binary form must reproduce the above copyright notice, + * * this list of conditions and the following disclaimer in the documentation + * * and/or other materials provided with the distribution. + * * + * * 3. Neither the name of Salesforce.com nor the names of its contributors may + * * be used to endorse or promote products derived from this software without + * * specific prior written permission. + * * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * * POSSIBILITY OF SUCH DAMAGE. + * + */ +package com.salesforce.dva.argus.service.monitor; + +import com.salesforce.dva.argus.entity.Metric; + +import java.util.Map; +import java.util.Set; + +import java.util.List; +/** + * Interface to check for data lag based on various approaches. + * @author sudhanshu.bahety + */ +public interface DataLagService extends Runnable { + /** + * + * @param dcSet List of dc for which we need to make query + * @param startTime start time for the query + * @return Mapping of metric per dc that is to be used to compute data lag + */ + public Map> queryMetricsForDC(Set dcSet, Long startTime); + + /** + * + * @param dc name of the data centre + * @param metricList List of metrics for the specific data centre + * @return Status based on hypothesis whether data is lagging in dc or not + */ + public Boolean computeDataLag(String dc, List metricList); + + /** + * + * @param dc name of the data centre + * @return if data is lagging in the dc + */ + public Boolean isDataLagging(String dc); + + /** + * + * @param time time when the lag metric is pushed + * @param value value of the lag metric + * @param dc dc corresponding to the lag metric + */ + public void pushMetric(Long time, Double value, String dc); + + /** + * The set of implementation specific configuration properties. 
+ * + */ + public enum Property { + + /** Flag to enable/disable monitoring */ + DATA_LAG_MONITOR_ENABLED("system.property.monitor.data.lag", "false"), + /** Whitelist scopes for which data lag always evaluates to false*/ + DATA_LAG_WHITE_LISTED_SCOPES("system.property.data.lag.whitelisted.scopes", "whiteListedScope"), + /** Whitelist scope of user for which data lag always evaluates to false*/ + DATA_LAG_WHITE_LISTED_USERS("system.property.data.lag.whitelisted.username", "default"), + /** List of DC for which data lag present should always evaluate to true*/ + DATA_LAG_ENFORCE_DC_LIST("system.property.data.lag.enforce.dc.list", "dcList"); + + private final String _name; + private final String _defaultValue; + + Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. + */ + public String getDefaultValue() { + return _defaultValue; + } + } +} diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DefaultMonitorService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DefaultMonitorService.java index e8f7e31c0..c570156be 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DefaultMonitorService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DefaultMonitorService.java @@ -47,6 +47,7 @@ import com.salesforce.dva.argus.service.DashboardService; import com.salesforce.dva.argus.service.MailService; import com.salesforce.dva.argus.service.MetricService; +import com.salesforce.dva.argus.service.MetricStorageService; import com.salesforce.dva.argus.service.MonitorService; import com.salesforce.dva.argus.service.ServiceManagementService; import com.salesforce.dva.argus.service.TSDBService; @@ -123,7 +124,10 @@ public class DefaultMonitorService extends DefaultJPAService implements MonitorS private final SystemConfiguration _sysConfig; private final MBeanServer _mbeanServer; private Thread _monitorThread; - private DataLagMonitor _dataLagMonitorThread; + private Thread _dataLagMonitorThread; + private DataLagService _dataLagService; + // TODO: remove this after data lag consumer offset service verification is over. 
+ private MetricStorageService _metricStorageService; //~ Constructors ********************************************************************************************************************************* @@ -142,8 +146,8 @@ public class DefaultMonitorService extends DefaultJPAService implements MonitorS */ @Inject public DefaultMonitorService(TSDBService tsdbService, UserService userService, AlertService alertService, - ServiceManagementService serviceManagementService, DashboardService dashboardService, MetricService metricService, MailService mailService, - SystemConfiguration sysConfig) { + ServiceManagementService serviceManagementService, DashboardService dashboardService, MetricService metricService, MailService mailService, + SystemConfiguration sysConfig, DataLagService dataLagService, MetricStorageService metricStorageService) { super(null, sysConfig); requireArgument(tsdbService != null, "TSDB service cannot be null."); requireArgument(userService != null, "User service cannot be null."); @@ -162,6 +166,9 @@ public DefaultMonitorService(TSDBService tsdbService, UserService userService, A _mbeanServer = ManagementFactory.getPlatformMBeanServer(); _metrics = new ConcurrentHashMap<>(); _registeredMetrics = new ConcurrentHashMap<>(); + _dataLagService = dataLagService; + // TODO: remove this after data lag consumer offset service verification is over. + _metricStorageService = metricStorageService; } //~ Methods ************************************************************************************************************************************** @@ -215,23 +222,26 @@ public synchronized void startRecordingCounters() { _checkAlertExistence(true); _monitorThread = new MonitorThread("system-monitor"); - _monitorThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - - @Override - public void uncaughtException(Thread t, Throwable e) { - _logger.error("Uncaught exception occurred while pushing monitor counters for {}. Reason: {}", HOSTNAME, e.getMessage()); - t.interrupt(); + _monitorThread.setUncaughtExceptionHandler((t, e) -> { + _logger.error("Uncaught exception occurred while pushing monitor counters for {}. Reason: {}", HOSTNAME, e.getMessage()); + t.interrupt(); } - }); + ); _monitorThread.start(); _logger.info("System monitor thread started."); - if (Boolean.valueOf(_sysConfig.getValue(com.salesforce.dva.argus.system.SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED))) { + // TODO: remove this after data lag consumer offset service verification is over. 
+ if (Boolean.valueOf(_sysConfig.getValue(DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getName(), DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getDefaultValue()))) { _logger.info("Starting data lag monitor thread."); - _dataLagMonitorThread = new DataLagMonitor(_sysConfig, _metricService, _tsdbService); + _dataLagMonitorThread = new Thread(_dataLagService, "datalag-monitor-thread-" + _dataLagService.getClass().getSimpleName()); _dataLagMonitorThread.start(); _logger.info("Data lag monitor thread started."); + + if(Boolean.valueOf(_sysConfig.getValue(DataLagMonitorConsumerOffset.Property.DATA_LAG_DEBUG.getName(), DataLagMonitorConsumerOffset.Property.DATA_LAG_DEBUG.getDefaultValue()))) { + _logger.info("Starting data lag consumer offset monitor thread in debug mode"); + new Thread(new DataLagMonitorConsumerOffset(_sysConfig, _metricStorageService, _metricService, _tsdbService, _mailService)).start(); + } } } } @@ -394,8 +404,8 @@ public synchronized void dispose() { @Override public boolean isDataLagging(String dataCenter) { - if(_dataLagMonitorThread!=null) { - return _dataLagMonitorThread.isDataLagging(dataCenter); + if(_dataLagMonitorThread != null) { + return _dataLagService.isDataLagging(dataCenter); }else { return false; } diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java index b4d2cc84e..19121083b 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java @@ -240,17 +240,12 @@ public enum Property { REFOCUS_CLIENT_THREADS("system.property.refocus.client.threads", "1"), REFOCUS_CLIENT_CONNECT_TIMEOUT("system.property.refocus.client.connect.timeout", "10000"), - DATA_LAG_THRESHOLD("system.property.data.lag.threshold.millis", "300000"), - DATA_LAG_MONITOR_ENABLED("system.property.monitor.data.lag", "false"), - DATA_LAG_WHITE_LISTED_SCOPES("system.property.data.lag.whitelisted.scopes", "whiteListedScope"), - DATA_LAG_WHITE_LISTED_USERS("system.property.data.lag.whitelisted.username", "default"), - DATA_LAG_QUERY_EXPRESSION("system.property.data.lag.expression.list","expressionListPerDC"), - DATA_LAG_DEFAULT_EXPRESSION("system.property.data.lag.default.expression","defaultExpression"), + DATA_LAG_SERVICE_IMPL_CLASS("service.binding.datalagservice", "com.salesforce.dva.argus.service.monitor.DataLagMonitorGoldenMetric"), + DATA_LAG_SERVICE_PROPERTY_FILE("service.config.datalagservice","placeholder_datalagservice.properties"), CLIENT_THREADS("system.property.client.threads", "2"), CLIENT_CONNECT_TIMEOUT("system.property.client.connect.timeout", "10000"), - DC_DEFAULT("system.property.dc.default", "defaultDC"), DC_LIST("system.property.dc.list", "DC1,DC2,DC3,DC4,DC5"), @@ -301,6 +296,7 @@ public enum Property { ANNOTATION_STORAGE_SERVICE_IMPL_CLASS("service.binding.annotation.storage", "com.salesforce.dva.argus.service.tsdb.DefaultTSDBService"), ANNOTATION_STORAGE_SERVICE_PROPERTY_FILE("service.config.annotation.storage","placeholder_annotation.storage.properties"), + IMAGE_STORAGE_SERVICE_IMPL_CLASS("service.binding.image.storage", "com.salesforce.dva.argus.service.image.ElasticSearchImageService"), AKC_CONSUMER_OFFSET_STORAGE_SERVICE_IMPL_CLASS("service.binding.akc.consumer.offset.storage", "com.salesforce.dva.argus.service.metric.NoOperationMetricsStorageService"), @@ -325,7 +321,8 @@ public enum Property { 
PKI_MONITORED_DIRECTORY("pki.monitored.directory", "/etc/pki_service/sfdc/argus-client"), PKI_CA_DIRECTORY("pki.ca.directory", "/etc/pki_service/ca"); - private final String _name; + + private final String _name; private final String _defaultValue; private Property(String name, String defaultValue) { diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java index 2d3c550a7..1ebb998c4 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java @@ -90,6 +90,7 @@ import com.salesforce.dva.argus.service.metric.AsyncMetricService; import com.salesforce.dva.argus.service.metric.metadata.IDBClient; import com.salesforce.dva.argus.service.metric.metadata.MetadataService; +import com.salesforce.dva.argus.service.monitor.DataLagService; import com.salesforce.dva.argus.service.monitor.DefaultMonitorService; import com.salesforce.dva.argus.service.oauth.DefaultOAuthAuthorizationCodeService; import com.salesforce.dva.argus.service.schema.DefaultDiscoveryService; @@ -280,6 +281,7 @@ private void configureServices() { bindConcreteClass(Property.AKC_CONSUMER_OFFSET_STORAGE_SERVICE_IMPL_CLASS, MetricStorageService.class); bindConcreteClass(Property.IDB_CLIENT_IMPL_CLASS, IDBClient.class); bindConcreteClass(Property.METADATA_SERVICE_IMPL_CLASS, MetadataService.class); + bindConcreteClass(Property.DATA_LAG_SERVICE_IMPL_CLASS, DataLagService.class); // Named annotation binding bindConcreteClassWithNamedAnnotation(getConcreteClassToBind(Property.TSDB_SERVICE_IMPL_CLASS, TSDBService.class), TSDBService.class); @@ -351,6 +353,7 @@ private Properties getServiceSpecificProperties() { readFile(properties, _systemConfiguration.getValue(Property.ANNOTATION_STORAGE_SERVICE_PROPERTY_FILE)); readFile(properties, _systemConfiguration.getValue(Property.AKC_CONSUMER_OFFSET_STORAGE_SERVICE_PROPERTY_FILE)); readFile(properties, _systemConfiguration.getValue(Property.IDB_CLIENT_PROPERTY_FILE)); + readFile(properties, _systemConfiguration.getValue(Property.DATA_LAG_SERVICE_PROPERTY_FILE)); return properties; } } diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/TestUtils.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/TestUtils.java index 06073f209..21cb5a6ae 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/TestUtils.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/TestUtils.java @@ -43,6 +43,7 @@ import com.salesforce.dva.argus.system.SystemConfiguration; import com.salesforce.dva.argus.system.SystemException; import com.salesforce.dva.argus.system.SystemMain; +import org.powermock.reflect.Whitebox; import java.io.IOException; import java.io.InputStream; @@ -293,13 +294,21 @@ public static void setStaticField(Class clazz, String fieldName, Object value field.setAccessible(true); Field modifiers = Field.class.getDeclaredField("modifiers"); modifiers.setAccessible(true); - modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); + modifiers.set(field, field.getModifiers() & ~Modifier.FINAL); field.set(null, value); } catch (Exception ex) { fail(ex.getMessage()); } } + public static void setField(Object obj, String fieldName, Object value) { + try { + Whitebox.setInternalState(obj, fieldName, value); + } catch (Exception ex) { + fail(ex.getMessage()); + } + } + public static Notification getNotification(String notificationName, String notifierName, 
Alert alert, List subscriptionList) { Notification notification = new Notification(notificationName, alert, notifierName, subscriptionList, 5000L); @@ -320,5 +329,4 @@ public static Metric getMetric() { public static Trigger getTrigger(Alert alert, Trigger.TriggerType triggerType, String triggerName, String triggerThreshold, String triggerInertiaMillis) { return new Trigger(alert, triggerType, triggerName, Double.parseDouble(triggerThreshold), Long.parseLong(triggerInertiaMillis)); } - } diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/MonitorServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/MonitorServiceTest.java index 4f33f4e7d..8e5eb8264 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/MonitorServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/MonitorServiceTest.java @@ -33,12 +33,13 @@ import com.salesforce.dva.argus.TestUtils; import com.salesforce.dva.argus.entity.Alert; -import com.salesforce.dva.argus.service.monitor.DataLagMonitor; +import com.salesforce.dva.argus.service.monitor.DataLagMonitorGoldenMetric; import com.salesforce.dva.argus.service.monitor.DefaultMonitorService; import com.salesforce.dva.argus.system.SystemConfiguration; import com.salesforce.dva.argus.system.SystemMain; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -163,47 +164,6 @@ public void run() { assertEquals(expectedCounterValue, jmxValue, DOUBLE_COMPARISON_MAX_DELTA); } - @Test - public void testDatalagIncrement() { - DataLagMonitor dataLagMonitor = new DataLagMonitor(system.getConfiguration(), metricServiceMock, tsdbMock); - Field testField = null; // Checks superclasses. 
- try { - testField = dataLagMonitor.getClass().getDeclaredField("_lagPerDC"); - testField.setAccessible(true); - } catch (NoSuchFieldException e) { - fail(); - } - Map _lagPerDCTest = new TreeMap<>(); - Map expectedOutput = new TreeMap<>(); - Double minute = 1.0 * 60 * 1000; - _lagPerDCTest.put("DC1", 0.0); - expectedOutput.put("DC1", minute); - _lagPerDCTest.put("DC2", 1.0 * 60 * 60 * 1000); - expectedOutput.put("DC2", 1.0 * 60 * 60 * 1000 + minute); - _lagPerDCTest.put("DC3", 2.0 * 60 * 60 * 1000); - expectedOutput.put("DC3", 2.0 * 60 * 60 * 1000 + minute); - _lagPerDCTest.put("DC4", 4.0 * 60 * 60 * 1000); - expectedOutput.put("DC4", 4.0 * 60 * 60 * 1000); - _lagPerDCTest.put("DC5", 7.0 * 60 * 60 * 1000); - expectedOutput.put("DC5", 4.0 * 60 * 60 * 1000); - try { - testField.set(dataLagMonitor, _lagPerDCTest); - } catch (IllegalAccessException e) { - fail(); - } - - for(String dc: _lagPerDCTest.keySet()) { - Double lagTime = null; - try { - lagTime = Whitebox.invokeMethod(dataLagMonitor, "getLagTimeInMillis", dc, System.currentTimeMillis(), null); - } catch (Exception e) { - fail(); - } - assertEquals(expectedOutput.get(dc), lagTime, 0.01); - } - - } - @Test(timeout = 5000L) public void testMonotonicCounterConcurrentUpdates() throws Exception { final MonitorService.Counter counter = MonitorService.Counter.ALERTS_SCHEDULED; diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/alert/DefaultAlertServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/alert/DefaultAlertServiceTest.java index f6e9afaa1..75f68a5db 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/alert/DefaultAlertServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/alert/DefaultAlertServiceTest.java @@ -43,6 +43,7 @@ import com.salesforce.dva.argus.service.metric.MetricQueryResult; import com.salesforce.dva.argus.service.metric.transform.TransformFactory; +import com.salesforce.dva.argus.service.monitor.DataLagService; import com.salesforce.dva.argus.service.tsdb.MetricQuery; import com.salesforce.dva.argus.system.SystemConfiguration; import com.salesforce.dva.argus.util.RequestContextHolder; @@ -1531,15 +1532,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } private void enableDatalagMonitoring(boolean isDataLagging) { - Field testField = null; // Checks superclasses. 
- try { - Whitebox.setInternalState(SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED, "_defaultValue", Boolean.toString(isDataLagging)); - testField = alertService.getClass().getDeclaredField("_configuration"); - testField.setAccessible(true); - testField.set(alertService, system.getConfiguration()); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail(); - } + TestUtils.setField(DataLagService.Property.DATA_LAG_MONITOR_ENABLED, "_defaultValue", Boolean.toString(isDataLagging)); } private Metric _createMetric(String scope, String metricName, int triggerMinValue, int inertiaPeriod) { diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/ElasticSearchAnnotationServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/ElasticSearchAnnotationServiceTest.java index f2c584375..76790f113 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/ElasticSearchAnnotationServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/ElasticSearchAnnotationServiceTest.java @@ -59,7 +59,12 @@ import org.powermock.reflect.Whitebox; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java index b8b6f71bf..3d5793290 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/metric/ElasticSearchConsumerOffsetMetricsServiceTest.java @@ -1,6 +1,5 @@ package com.salesforce.dva.argus.service.metric; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; @@ -463,10 +462,10 @@ private ElasticSearchConsumerOffsetMetricsService _initializeSpyService(ElasticS if (isPut) { when(ElasticSearchUtils.performESRequest(eq(restClient), any(), any())).thenReturn(mapper.readValue(reply, ElasticSearchUtils.PutResponse.class)); } else { + when(ElasticSearchUtils.extractResponse(any())).thenReturn(reply); mapper = ElasticSearchConsumerOffsetMetricsService.getMetricObjectMapper(new ConsumerOffsetRecordList.IndexSerializer(), new ConsumerOffsetRecordList.Deserializer()); - ConsumerOffsetRecordList ret = mapper.readValue(reply, new TypeReference() {}); - when(ElasticSearchUtils.toEntity(any(), any(),any())).thenReturn(ret); + when(ElasticSearchUtils.toEntity(any(), any(),any())).thenCallRealMethod(); } when(ElasticSearchUtils.convertTimestampToMillis(any())).thenCallRealMethod(); diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/monitor/DataLagServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/monitor/DataLagServiceTest.java new file mode 100644 index 000000000..1a2c35041 --- /dev/null +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/monitor/DataLagServiceTest.java @@ -0,0 +1,260 @@ +/* + * + * * Copyright (c) 2016, Salesforce.com, Inc. + * * All rights reserved. 
+ * * + * * Redistribution and use in source and binary forms, with or without + * * modification, are permitted provided that the following conditions are met: + * * + * * 1. Redistributions of source code must retain the above copyright notice, + * * this list of conditions and the following disclaimer. + * * + * * 2. Redistributions in binary form must reproduce the above copyright notice, + * * this list of conditions and the following disclaimer in the documentation + * * and/or other materials provided with the distribution. + * * + * * 3. Neither the name of Salesforce.com nor the names of its contributors may + * * be used to endorse or promote products derived from this software without + * * specific prior written permission. + * * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * * POSSIBILITY OF SUCH DAMAGE. + * + */ + +package com.salesforce.dva.argus.service.monitor; + +import com.salesforce.dva.argus.TestUtils; +import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.service.MailService; +import com.salesforce.dva.argus.service.MetricService; +import com.salesforce.dva.argus.service.MetricStorageService; +import com.salesforce.dva.argus.service.TSDBService; +import com.salesforce.dva.argus.service.mail.DefaultMailService; +import com.salesforce.dva.argus.service.metric.DefaultMetricService; +import com.salesforce.dva.argus.service.metric.ElasticSearchConsumerOffsetMetricsService; +import com.salesforce.dva.argus.service.tsdb.DefaultTSDBService; +import com.salesforce.dva.argus.service.tsdb.MetricQuery; +import com.salesforce.dva.argus.system.SystemConfiguration; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) 
+@PrepareForTest(SystemConfiguration.class) +public class DataLagServiceTest { + + private DataLagMonitorGoldenMetric goldenMetricService; + private DataLagMonitorConsumerOffset consumerOffsetService; + @Mock SystemConfiguration systemConfigGoldenMetric; + @Mock SystemConfiguration systemConfigConsumerOffset; + @Mock MetricService mockedMetricService; + @Mock MetricStorageService mockedMetricStorageService; + @Mock TSDBService mockedTSDBService; + @Mock MailService mailService; + + private static final List DC_LIST = Arrays.asList("DC1", "DC2", "DC3", "DC4"); + private static final String DEFAULT_EXPRESSION = "-5m:scope.default:metric.default:max:1m-max"; + + @Before + public void setUp() { + setupMockServices(); + goldenMetricService = spy(new DataLagMonitorGoldenMetric(systemConfigGoldenMetric, mockedMetricService, mockedTSDBService)); + consumerOffsetService = spy(new DataLagMonitorConsumerOffset(systemConfigConsumerOffset, mockedMetricStorageService, mockedMetricService, mockedTSDBService, mailService)); + } + + private void setupMockServices() { + systemConfigGoldenMetric = mock(SystemConfiguration.class); + systemConfigConsumerOffset = mock(SystemConfiguration.class); + mockedMetricService = mock(DefaultMetricService.class); + mockedMetricStorageService = mock(ElasticSearchConsumerOffsetMetricsService.class); + mockedTSDBService = mock(DefaultTSDBService.class); + mailService = mock(DefaultMailService.class); + when(mailService.sendMessage(any())).thenReturn(true); + + when(systemConfigConsumerOffset.getValue(SystemConfiguration.Property.DC_LIST)).thenReturn(String.join(",", DC_LIST)); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST, "DC4"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_DEFAULT_EXPRESSION, DEFAULT_EXPRESSION); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_INERTIA, "60000"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_CLEAR_THRESHOLD, "{\"16\":[\"DC1\"], \"13\": [\"DC3\"]}"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_DEFAULT_CLEAR_THRESHOLD, "10"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_TRIGGER_THRESHOLD, "{\"36\":[\"DC1\",\"DC2\"], \"33\": [\"DC3\"]}"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_DEFAULT_TRIGGER_THRESHOLD, "20"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_QUERY_EXPRESSION, "{\"-5m:scope.test:metric.test{groupId=*testGroupId*,topic=*test.#DC#.topic*}:max:1m-max\":[\"DC1\",\"DC2\",\"DC3\"]}"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_DEBUG, "false"); + setupDefaultSysConfigValues(systemConfigConsumerOffset, DataLagMonitorConsumerOffset.Property.DATA_LAG_EMAIL, "test@example.com"); + + when(systemConfigGoldenMetric.getValue(SystemConfiguration.Property.DC_LIST)).thenReturn(String.join(",", DC_LIST)); + setupDefaultSysConfigValues(systemConfigGoldenMetric, DataLagService.Property.DATA_LAG_ENFORCE_DC_LIST, "DC4"); + setupDefaultSysConfigValues(systemConfigGoldenMetric, DataLagMonitorGoldenMetric.Property.DATA_LAG_DEFAULT_EXPRESSION, DEFAULT_EXPRESSION); + setupDefaultSysConfigValues(systemConfigGoldenMetric, 
DataLagMonitorGoldenMetric.Property.DATA_LAG_THRESHOLD, "10000"); // 10 seconds threshold. + setupDefaultSysConfigValues(systemConfigGoldenMetric, DataLagMonitorGoldenMetric.Property.DATA_LAG_QUERY_EXPRESSION, "{\"-1h:scope.#DC#.test:metric.test:avg:1m-sum\":[\"DC1\",\"DC2\"], \"-4h:scope.#DC#.test2:metric.test2:avg:1m-sum\": [\"DC3\"]}"); + + } + + @Test + public void testQueryForDC() { + Map tags = new HashMap<>(); + tags.put("groupId", "*testGroupId*"); + Long currentTime = System.currentTimeMillis(); + for( String dc: DC_LIST) { + //ConsumerOffset. + tags.put("topic", "*test." + dc + ".topic*"); + String expression = "-5m:scope.test:metric.test{groupId=*testGroupId*,topic=*test." + dc.toLowerCase() + ".topic*}:max:1m-max"; + if (dc.equalsIgnoreCase("DC4")) { + expression = DEFAULT_EXPRESSION; + } + List mQList = Arrays.asList(new MetricQuery("scope.test", "metric.test", tags, currentTime - 5 * 60 * 1000L, currentTime)); + when(mockedMetricService.getQueries(expression, currentTime)).thenReturn(mQList); + assertEquals(dc, consumerOffsetService.getDCFromTopic("*test." + dc.toLowerCase() + ".topic*")); + + //GoldenMetric. + if(dc.equalsIgnoreCase("DC4")) { + when(mockedMetricService.getMetrics(DEFAULT_EXPRESSION, currentTime)).thenReturn(null); + } else if(dc.equalsIgnoreCase("DC3")) { + when(mockedMetricService.getMetrics("-1h:scope." + dc + ".test2:metric.test2:avg:1m-sum", currentTime)).thenReturn(null); + } else { + when(mockedMetricService.getMetrics("-1h:scope." + dc + ".test:metric.test:avg:1m-sum", currentTime)).thenReturn(null); + } + } + consumerOffsetService.queryMetricsForDC(new HashSet<>(DC_LIST), currentTime); + goldenMetricService.queryMetricsForDC(new HashSet<>(DC_LIST), currentTime); + } + + @Test + public void testComputeDataLag() { + Metric triggerM = new Metric("scope.test", "metric.test"); + Metric clearM = new Metric("scope.test", "metric.test"); + Metric noChange = new Metric("scope.test", "metric.test"); + Map lagState = new HashMap<>(); + Long currTime = System.currentTimeMillis(); + for(int i = 0 ; i < 10; i++) { + triggerM.addDatapoint( currTime - (i + 1) * 20_000, 42.0 + i % 3); + clearM.addDatapoint(currTime - i * 1000, 1.0 + i % 4); + noChange.addDatapoint(currTime - i * 3000, i * 2.0); // Not all datapoints satisfy clear criterion. + } + + assertTrue(consumerOffsetService.computeDataLag("DC1", Arrays.asList(triggerM))); + assertFalse(consumerOffsetService.computeDataLag("DC1", Arrays.asList(clearM))); + + assertTrue(goldenMetricService.computeDataLag("DC1", Arrays.asList(triggerM))); + assertFalse(goldenMetricService.computeDataLag("DC1", Arrays.asList(clearM))); + + lagState.put("DC1", true); // Default to true and it should not change. + TestUtils.setField(consumerOffsetService, "lagStatePerDC", lagState); + assertTrue(consumerOffsetService.computeDataLag("DC1", Arrays.asList(noChange))); + assertFalse(goldenMetricService.computeDataLag("DC1", Arrays.asList(noChange))); + } + + @Test + public void testIsDataLagging() { + assertTrue(consumerOffsetService.isDataLagging("DC4")); //enforceLagPresentSet. + assertFalse(consumerOffsetService.isDataLagging("Dc1 ")); + assertFalse(consumerOffsetService.isDataLagging("dc5")); + assertTrue(goldenMetricService.isDataLagging("DC4")); //enforceLagPresentSet. + assertFalse(goldenMetricService.isDataLagging("dc2 ")); + assertFalse(goldenMetricService.isDataLagging("dc5")); + + Map lagState = new HashMap<>(); + // Set all dc to enable data lag. 
+ DC_LIST.forEach(dc -> lagState.put(dc, true)); + TestUtils.setField(consumerOffsetService, "lagStatePerDC", lagState); + assertTrue(consumerOffsetService.isDataLagging(" dC8 ")); + assertTrue(consumerOffsetService.isDataLagging(" dC3")); + + TestUtils.setField(goldenMetricService, "_isDataLaggingbyDCMap", lagState); + assertTrue(goldenMetricService.isDataLagging(" dC8 ")); + assertTrue(goldenMetricService.isDataLagging(" dC1")); + + //Set some dc data lag state to be true and check for rest. + List enableDataLagForSomeDC = Arrays.asList("DC1", "DC3"); + DC_LIST.forEach(dc -> lagState.put(dc, false)); + enableDataLagForSomeDC.forEach(dc -> lagState.put(dc, true)); + TestUtils.setField(consumerOffsetService, "lagStatePerDC", lagState); + TestUtils.setField(goldenMetricService, "_isDataLaggingbyDCMap", lagState); + DC_LIST.forEach(dc -> { + if (enableDataLagForSomeDC.contains(dc) || dc.equalsIgnoreCase("DC4")) { + assertTrue(consumerOffsetService.isDataLagging(dc)); + assertTrue(goldenMetricService.isDataLagging(dc)); + } else { + assertFalse(consumerOffsetService.isDataLagging(dc)); + assertFalse(goldenMetricService.isDataLagging(dc)); + } + } ); + } + + @Test + public void testPushMetric() { + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + List expectedOutput = new ArrayList<>(); + doNothing().when(mockedTSDBService).putMetrics(anyList()); + for( String dc: DC_LIST) { + Long currentTime = System.currentTimeMillis(); + Metric m = new Metric("argus.core", "datalag.offset"); + m.setTag("dc", dc); + m.setTag("host", SystemConfiguration.getHostname()); + m.addDatapoint(currentTime, 1.0); + consumerOffsetService.pushMetric(currentTime, 1.0, dc); + expectedOutput.add(m); + + m = new Metric(m); + m.setMetric("datalag.seconds"); + goldenMetricService.pushMetric(currentTime, 1.0, dc); + expectedOutput.add(m); + } + verify(mockedTSDBService, times(8)).putMetrics(captor.capture()); + List actualOutput = captor.getAllValues().stream().flatMap(Collection::stream).collect(Collectors.toList()); + Collections.sort(actualOutput); + Collections.sort(expectedOutput); + assertEquals(expectedOutput, actualOutput); + } + + private void setupDefaultSysConfigValues(SystemConfiguration mocksysConfig, DataLagService.Property p, String reply) { + when(mocksysConfig.getValue(p.getName(), p.getDefaultValue())).thenReturn(reply); + } + + private void setupDefaultSysConfigValues(SystemConfiguration mocksysConfig, DataLagMonitorConsumerOffset.Property p, String reply) { + when(mocksysConfig.getValue(p.getName(), p.getDefaultValue())).thenReturn(reply); + } + + private void setupDefaultSysConfigValues(SystemConfiguration mocksysConfig, DataLagMonitorGoldenMetric.Property p, String reply) { + when(mocksysConfig.getValue(p.getName(), p.getDefaultValue())).thenReturn(reply); + } +} From a61b89f20f3d94b33a4afa0a80b28ebedbe7372d Mon Sep 17 00:00:00 2001 From: Sudhanshu Bahety Date: Fri, 2 Aug 2019 17:08:39 -0700 Subject: [PATCH 6/9] W-6124967 [Data Lag Detection] Query Annotation ES cluster to detect data lag (Alert Client) - 2 (#547) --- .../monitor/DataLagMonitorConsumerOffset.java | 57 ++++++++++++++----- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java index c46afc993..92f409a82 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java +++ 
b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/monitor/DataLagMonitorConsumerOffset.java @@ -53,11 +53,14 @@ import com.salesforce.dva.argus.service.mail.EmailContext; import com.salesforce.dva.argus.service.tsdb.MetricQuery; import com.salesforce.dva.argus.system.SystemConfiguration; +import com.salesforce.dva.argus.system.SystemException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.NotFoundException; import java.text.MessageFormat; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -65,6 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -97,6 +101,8 @@ public class DataLagMonitorConsumerOffset implements DataLagService { private static final String DEFAULT_SUBJECT = "Data Lag Consumer Offset Method detected a state change"; private static String DEBUG_PREFIX; + private final ExecutorCompletionService>> completionService; + @Inject public DataLagMonitorConsumerOffset(SystemConfiguration config, MetricStorageService consumerOffsetMetricService, MetricService metricService, TSDBService tsdbService, MailService mailService) { this.sysConfig = config; @@ -107,6 +113,7 @@ public DataLagMonitorConsumerOffset(SystemConfiguration config, MetricStorageSer this.mailService = mailService; this.hostName = SystemConfiguration.getHostname(); datalagInertia = Long.valueOf(sysConfig.getValue(Property.DATA_LAG_INERTIA.getName(), Property.DATA_LAG_INERTIA.getDefaultValue())); + completionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(5)); init(); this.logger.info(DEBUG_PREFIX + "Data lag consumer offset monitor initialized"); } @@ -215,18 +222,42 @@ public Map> queryMetricsForDC(Set dcSet, Long start Long startTimeFinal = startTime; - Map> result = new HashMap<>(); - List mQList = dcSet.stream().parallel() - .map(expressionPerDC::get) - .map(expression -> metricService.parseToMetricQuery(expression, startTimeFinal)) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - if (mQList.size() != dcSet.size()) { - logger.error(DEBUG_PREFIX + "Metric Query Size does not match number of dcs present. Metric Query: {}, DCs: {}", mQList, dcSet); + Map> metricsPerDC = new HashMap<>(); + + for (String dc : dcSet) { + completionService.submit(() -> { + List metrics = new ArrayList<>(); + String currentDcExpression = expressionPerDC.get(dc); + List metricQueryList = metricService.parseToMetricQuery(currentDcExpression, startTimeFinal); + try { + metrics = consumerOffsetMetricService.getMetrics(metricQueryList).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + } catch (Exception e) { + metrics.clear(); + logger.error(DEBUG_PREFIX + "Consumer Offset Metric Service failed to get metric for expression: " + currentDcExpression + " while being queried by DataLagMonitorConsumerOffset, for DC: " + dc + ". Querying TSDB for metrics. Exception: ", e); + } + try { + if (metrics.size() == 0) { + logger.warn(DEBUG_PREFIX + "Cannot retrieve metrics from ES cluster. 
Querying TSDB for metrics."); + metrics = metricService.getMetrics(currentDcExpression, startTimeFinal).getMetricsList(); + } + } catch (Exception e) { + metrics.clear(); + logger.error(DEBUG_PREFIX + "TSDB Metric Service failed to get metric for expression: " + currentDcExpression + " while being queried by DataLagMonitorConsumerOffset, for DC: " + dc + " Exception: ", e); + } + return new AbstractMap.SimpleEntry<>(dc, metrics); + }); } - consumerOffsetMetricService.getMetrics(mQList).forEach((mQ, mList) -> result.put(getDCFromTopic(mQ.getTags().get(TOPIC_TAG)), mList)); - return result; + for (int idx = 0; idx < dcSet.size(); ++idx) { + try { + AbstractMap.SimpleEntry> result = completionService.take().get(); + metricsPerDC.put(result.getKey(), result.getValue()); + } catch (Exception e) { + logger.error(DEBUG_PREFIX + "Exception occured while querying metrics", e); + } + } + + return metricsPerDC; } @VisibleForTesting @@ -250,9 +281,9 @@ protected String getDCFromTopic(String topic) { public Boolean computeDataLag(String dc, List metricList) { if (metricList.size() <= 0) { - logger.error(DEBUG_PREFIX + "No Metrics could be obtained for dc: {}, disabling data lag by default.", dc); - lagStatePerDC.put(dc, false); - return false; + logger.error(DEBUG_PREFIX + "No Metrics could be obtained for dc: {}, enabling data lag by default.", dc); + lagStatePerDC.put(dc, true); + return true; } else if (metricList.size() != 1) { logger.warn(DEBUG_PREFIX + "More than 1 metrics returned for a single dc: {}, Metric list: {}\nCombining all data points to compute data lag.",dc, metricList); From d555052e68fc3f65decf416efe48faba257f2f1c Mon Sep 17 00:00:00 2001 From: Sudhanshu Bahety Date: Mon, 5 Aug 2019 11:45:58 -0700 Subject: [PATCH 7/9] Make function non-static (#550) --- .../dva/argus/service/metric/DefaultMetricService.java | 2 +- .../dva/argus/service/metric/MetricQueryProcessor.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java index 6b68b6191..93594276b 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/DefaultMetricService.java @@ -234,7 +234,7 @@ public List parseToMetricQuery(List expressions, long relat _logger.debug("Parsing expression to metric query for {}", expression); QueryContextHolder contextHolder = new QueryContextHolder(); reader.parse(expression, relativeTo, MetricQuery.class, contextHolder, false); - queries.add(MetricQueryProcessor.convertTSDBQueryToMetricQuery(contextHolder.getCurrentQueryContext().getExpression())); + queries.add(_queryProcessor.convertTSDBQueryToMetricQuery(contextHolder.getCurrentQueryContext().getExpression())); } } catch (ParseException ex) { throw new SystemException("Failed to parse the given expression", ex); diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java index caa4c4252..23f67f025 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/metric/MetricQueryProcessor.java @@ -104,7 +104,7 @@ public void mergeQueryResults(MetricQueryResult parentResult, MetricQueryResult } } - 
From 96b91e051c04bcacf54f12e6c3af3b61c47fe24a Mon Sep 17 00:00:00 2001
From: Sanjana Chandrashekar
Date: Mon, 5 Aug 2019 15:23:32 -0700
Subject: [PATCH 8/9] Updated to use alert enqueue timestamp while computing the notification tracking ID (#551)

---
 .../salesforce/dva/argus/service/alert/DefaultAlertService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java
index 767cceb32..bc18c20ee 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/alert/DefaultAlertService.java
@@ -1322,7 +1322,7 @@ public void sendNotification(Trigger trigger, Metric metric, History history, No
             }
         }
 
-        Long timestamp = (triggerFiredTime != null) ? triggerFiredTime : System.currentTimeMillis();
+        Long timestamp = (alertEnqueueTime != null) ? alertEnqueueTime : System.currentTimeMillis();
         String alertEvaluationTrackingID = getAlertEvaluationTrackingID(alert, timestamp);
 
         NotificationContext context = new NotificationContext(alert, trigger, notification, triggerFiredTime,
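Note on the change above: the tracking ID timestamp now prefers alertEnqueueTime over triggerFiredTime, falling back to the current wall-clock time only when no enqueue time is available, which appears intended to keep a notification's tracking ID aligned with the enqueued alert evaluation it came from. The real getAlertEvaluationTrackingID is not shown in this patch, so the sketch below uses an invented ID format purely to illustrate the timestamp selection.

import java.math.BigInteger;

public class TrackingIdSketch {

    /** Invented ID format; only the (enqueueTime != null ? enqueueTime : now) fallback mirrors the patch. */
    static String trackingId(BigInteger alertId, Long alertEnqueueTime) {
        Long timestamp = (alertEnqueueTime != null) ? alertEnqueueTime : System.currentTimeMillis();
        return alertId + "_" + timestamp;
    }

    public static void main(String[] args) {
        BigInteger alertId = BigInteger.valueOf(42);
        Long enqueueTime = 1564713600000L; // the evaluation's enqueue timestamp

        // Two notifications fired at different wall-clock times during the same evaluation
        // resolve to the same tracking ID, because both key off the enqueue timestamp.
        System.out.println(trackingId(alertId, enqueueTime));
        System.out.println(trackingId(alertId, enqueueTime));

        // Only when the enqueue timestamp is missing does the ID fall back to the current time.
        System.out.println(trackingId(alertId, null));
    }
}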
From 25f86b72f208d60d60156b63eacb5f0c84edc006 Mon Sep 17 00:00:00 2001
From: Pimeng Fu
Date: Mon, 5 Aug 2019 15:38:31 -0700
Subject: [PATCH 9/9] Add parentheses folding for prettify editor (#549)

* Add expression prettifier
* Add login instruction
* Add parentheses folding to prettify editor
---
 ArgusWeb/app/js/controllers/viewMetrics.js | 11 ++---
 ArgusWeb/app/js/services/utilService.js    | 48 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/ArgusWeb/app/js/controllers/viewMetrics.js b/ArgusWeb/app/js/controllers/viewMetrics.js
index a3ee585f9..b3037423a 100644
--- a/ArgusWeb/app/js/controllers/viewMetrics.js
+++ b/ArgusWeb/app/js/controllers/viewMetrics.js
@@ -1,7 +1,7 @@
 /*global angular:false, console:false */
 'use strict';
 
-angular.module('argus.controllers.viewMetrics', ['ngResource'])
+angular.module('argus.controllers.viewMetrics', ['ngResource', 'ui.codemirror'])
 .controller('ViewMetrics', ['$location', '$routeParams', '$scope', '$compile', 'growl', 'Metrics', 'Annotations', 'SearchService', 'Controls', 'ChartDataProcessingService', 'DateHandlerService', 'UtilService',
     function ($location, $routeParams, $scope, $compile, growl, Metrics, Annotations, SearchService, Controls, ChartDataProcessingService, DateHandlerService, UtilService) {
         var lastParams;
@@ -339,16 +339,17 @@ angular.module('argus.controllers.viewMetrics', ['ngResource'])
             event.stopPropagation();
         });
     };
+
     $scope.editorOptions = {
         lineWrapping: true,
         lineNumbers: true,
-        mode: 'htmlmixed',
+        mode: 'julia',
         viewportMargin: Infinity,
         tabSize: 2,
-        foldGutter: true,
+        foldGutter: {
+            rangeFinder: UtilService.rangeFinderParentheses,
+        },
         gutters: ['CodeMirror-linenumbers', 'CodeMirror-foldgutter'],
-        autoCloseTags: true,
-        matchTags: {bothTags: true},
         extraKeys: { /* key board short cuts in the the editor */
             'Alt-Space': 'autocomplete',
             'Ctrl-Alt-F': function(editor) {
diff --git a/ArgusWeb/app/js/services/utilService.js b/ArgusWeb/app/js/services/utilService.js
index 135f03453..3b55f613e 100644
--- a/ArgusWeb/app/js/services/utilService.js
+++ b/ArgusWeb/app/js/services/utilService.js
@@ -239,6 +239,54 @@ angular.module('argus.services.utils', [])
         if (firstChar === '#') return 'constant'
         if (/[A-Z]/.test(firstChar)) return 'transform'
         return 'expression'
+    },
+
+    rangeFinderParentheses: function(cm, start) {
+        var line = start.line, lineText = cm.getLine(line);
+        var tokenType;
+
+        function findOpening(openCh) {
+            for (var at = start.ch, pass = 0;;) {
+                var found = at <= 0 ? -1 : lineText.lastIndexOf(openCh, at - 1);
+                if (found == -1) {
+                    if (pass == 1) break;
+                    pass = 1;
+                    at = lineText.length;
+                    continue;
+                }
+                if (pass == 1 && found < start.ch) break;
+                tokenType = cm.getTokenTypeAt(CodeMirror.Pos(line, found + 1));
+                if (!/^(comment|string)/.test(tokenType)) return found + 1;
+                at = found - 1;
+            }
+        }
+
+        var startToken = "(", endToken = ")", startCh = findOpening("(");
+        if (startCh == null) {
+            startToken = "[", endToken = "]";
+            startCh = findOpening("[");
+        }
+
+        if (startCh == null) return;
+        var count = 1, lastLine = cm.lastLine(), end, endCh;
+        outer: for (var i = line; i <= lastLine; ++i) {
+            var text = cm.getLine(i), pos = i == line ? startCh : 0;
+            for (;;) {
+                var nextOpen = text.indexOf(startToken, pos), nextClose = text.indexOf(endToken, pos);
+                if (nextOpen < 0) nextOpen = text.length;
+                if (nextClose < 0) nextClose = text.length;
+                pos = Math.min(nextOpen, nextClose);
+                if (pos == text.length) break;
+                if (cm.getTokenTypeAt(CodeMirror.Pos(i, pos + 1)) == tokenType) {
+                    if (pos == nextOpen) ++count;
+                    else if (!--count) { end = i; endCh = pos; break outer; }
+                }
+                ++pos;
+            }
+        }
+        if (end == null || line == end && endCh == startCh) return;
+        return {from: CodeMirror.Pos(line, startCh),
+            to: CodeMirror.Pos(end, endCh)};
     }
 };
 return options;