Skip to content

Commit

Permalink
First pass of a Date Range Query Interceptor.
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Hughes <[email protected]>
  • Loading branch information
jnh5y committed Sep 14, 2020
1 parent 1b3fef4 commit 1d90cb3
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/***********************************************************************
* Copyright (c) 2013-2020 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.index.planning

import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit

import com.typesafe.scalalogging.LazyLogging
import org.geotools.data.{DataStore, Query}
import org.locationtech.geomesa.filter.{Bounds, FilterHelper, FilterValues}
import org.locationtech.geomesa.index.conf.QueryHints
import org.opengis.feature.simple.SimpleFeatureType
import org.geotools.data.{DataStore, Query}
import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex
import org.locationtech.geomesa.index.geotools.GeoMesaDataStore
import org.locationtech.geomesa.index.index.z2.{XZ2Index, Z2Index}
import org.locationtech.geomesa.index.index.z3.{XZ3Index, Z3Index}
import org.locationtech.geomesa.index.strategies.SpatioTemporalFilterStrategy
import org.opengis.feature.simple.SimpleFeatureType

class DateRangerQueryInterceptor extends QueryInterceptor with LazyLogging {
private var ds: DataStore = _
private var sft: SimpleFeatureType = _
/**
* Called exactly once after the interceptor is instantiated
*
* @param ds data store
* @param sft simple feature type
*/
override def init(ds: DataStore, sft: SimpleFeatureType): Unit = {
this.ds = ds
this.sft = sft
}

/**
* Modifies the query in place
*
* @param query query
*/
override def rewrite(query: Query): Unit = {
QueryInterceptor.skipExecution.set(true)

This comment has been minimized.

Copy link
@elahrvivaz

elahrvivaz Sep 15, 2020

Contributor

instead of a thread local here, you could maybe use set a query hint

val plan = ds.asInstanceOf[GeoMesaDataStore[_]].getQueryPlan(query)

This comment has been minimized.

Copy link
@elahrvivaz

elahrvivaz Sep 15, 2020

Contributor

if you need access to the query plan, there might be a better way to do this, as this will end up generating the query plan twice. we can override the default planner using the system property "geomesa.strategy.decider" - that gives you access to the different available plans, with the bounds already extracted, etc. see

QueryInterceptor.skipExecution.set(false)
val index: GeoMesaFeatureIndex[_, _] = plan.head.filter.index

index match {
case _ : Z2Index | _ : XZ2Index |_ : Z3Index | _ : XZ3Index =>
val length = calculateDateRange(query)
println(s"Length for query ${query.getFilter} is $length days")
if (length >= 10) {
throw new IllegalArgumentException("Date ranges need to be shorter than 10 days!")
} else {
logger.debug("Didn't update query")
}
case _ =>
println("Got another index")
}
}

private def calculateDateRange(query: Query): Long = {
var length: Long = 0
val temporalRange: FilterValues[Bounds[ZonedDateTime]] = FilterHelper.extractIntervals(query.getFilter, "dtg")

logger.debug(s"Got ranges $temporalRange")
temporalRange.foreach { bounds =>
val upper: ZonedDateTime = bounds.upper.value.get
val lower = bounds.lower.value.get
length += Math.abs(ChronoUnit.DAYS.between(upper, lower))
}
logger.debug(s"Time range for dtg detected to be $length days.")
length
}

override def close(): Unit = { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package org.locationtech.geomesa.index.planning

import java.io.Closeable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.typesafe.scalalogging.LazyLogging
Expand Down Expand Up @@ -43,6 +44,7 @@ trait QueryInterceptor extends Closeable {
}

object QueryInterceptor extends LazyLogging {
protected [planning] val skipExecution = new AtomicBoolean(false)

/**
* Manages query interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.locationtech.geomesa.filter.{FilterHelper, andFilters, ff, orFilters}
import org.locationtech.geomesa.index.conf.QueryHints
import org.locationtech.geomesa.index.geoserver.ViewParams
import org.locationtech.geomesa.index.iterators.{DensityScan, StatsScan}
import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory
import org.locationtech.geomesa.index.planning.QueryInterceptor.{QueryInterceptorFactory, skipExecution}
import org.locationtech.geomesa.index.utils.{ExplainLogging, Explainer}
import org.locationtech.geomesa.utils.bin.BinaryOutputEncoder
import org.locationtech.geomesa.utils.collection.CloseableIterator
Expand Down Expand Up @@ -61,8 +61,10 @@ trait QueryRunner {

// query rewriting
interceptors(sft).foreach { interceptor =>
interceptor.rewrite(query)
QueryRunner.logger.trace(s"Query rewritten by $interceptor to: $query")
if (!skipExecution.get) {
interceptor.rewrite(query)
QueryRunner.logger.trace(s"Query rewritten by $interceptor to: $query")
}
}

// set query hints - we need this in certain situations where we don't have access to the query directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.locationtech.geomesa.index.geotools.GeoMesaDataStoreTest._
import org.locationtech.geomesa.index.index.attribute.AttributeIndex
import org.locationtech.geomesa.index.index.id.IdIndex
import org.locationtech.geomesa.index.index.z3.Z3Index
import org.locationtech.geomesa.index.planning.QueryInterceptor
import org.locationtech.geomesa.index.planning.{DateRangerQueryInterceptor, QueryInterceptor}
import org.locationtech.geomesa.index.process.GeoMesaProcessVisitor
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs
Expand All @@ -46,6 +46,7 @@ import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
class GeoMesaDataStoreTest extends Specification {
sequential

import org.locationtech.geomesa.utils.geotools.CRS_EPSG_4326

Expand Down Expand Up @@ -120,6 +121,41 @@ class GeoMesaDataStoreTest extends Specification {
results = SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName, ECQL.toFilter("bbox(geom,39,54,51,56)")), Transaction.AUTO_COMMIT)).toSeq
results must haveLength(10)
}

"intercept and stop queries" in {
val sft = SimpleFeatureTypes.createType("rewrite", "name:String:index=true,age:Int,dtg:Date,*geom:Point:srid=4326")
sft.getUserData.put(SimpleFeatureTypes.Configs.QueryInterceptors, classOf[DateRangerQueryInterceptor].getName)

val ds = new TestGeoMesaDataStore(true)
ds.createSchema(sft)

ds.getFeatureSource(sft.getTypeName).addFeatures(new ListFeatureCollection(sft, features.toArray[SimpleFeature]))

// Normal queries should work
var filter = ECQL.toFilter("bbox(geom,39,54,51,56)")
var results = SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)).toSeq
results must haveLength(10)

// Long time ranges should fail.
filter = ECQL.toFilter("dtg > '2018-01-01T00:00:00.000Z' and dtg < '2019-01-01T00:00:00.000Z'")
ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT) must throwAn[IllegalArgumentException]

filter = ECQL.toFilter("bbox(geom,38,53,42,57) AND dtg > '2018-01-01T00:00:00.000Z' and dtg < '2018-01-12T00:00:00.000Z'")
ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT) must throwAn[IllegalArgumentException]

filter = ECQL.toFilter("name = 'name1' AND dtg > '2018-01-01T00:00:00.000Z' and dtg < '2018-01-12T00:00:00.000Z'")
results = SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)).toSeq
results must haveLength(1)

filter = ECQL.toFilter("dtg > '2018-01-01T00:00:00.000Z' and dtg < '2018-01-02T00:00:00.000Z'")
results = SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)).toSeq
results must haveLength(10)

filter = ECQL.toFilter("bbox(geom,38,53,42,57) AND dtg during 2018-01-01T00:00:00.000Z/2018-01-01T12:00:00.000Z")
results = SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName, filter), Transaction.AUTO_COMMIT)).toSeq
results must haveLength(3)
}

"update schemas" in {
foreach(Seq(true, false)) { partitioning =>
val ds = new TestGeoMesaDataStore(true)
Expand Down

0 comments on commit 1d90cb3

Please sign in to comment.