Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3334: Fix Gremlin Reusing Accumulo Iterators #3335

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ public void addEdge(final GafferPopEdge edge) {
public Iterator<Vertex> vertices(final Object... vertexIds) {
final boolean getAll = null == vertexIds || 0 == vertexIds.length;
final OperationChain<Iterable<? extends Element>> getOperation;
final Iterable<Vertex> orphanVertices;

LOGGER.debug(GET_DEBUG_MSG, variables.getElementsLimit());
if (getAll) {
Expand All @@ -430,29 +429,28 @@ public Iterator<Vertex> vertices(final Object... vertexIds) {
.then(new Limit<Element>(variables.getElementsLimit(), true))
.build();
}
// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
// Run requested chain on the graph and buffer result to set to avoid reusing iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Vertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

if (IterableUtils.size(translatedResults) >= variables.getElementsLimit()) {
if (translatedResults.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
}

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
orphanVertices = GafferVertexUtils.getOrphanVertices(result, this, vertexIds);
Iterable<Vertex> chainedIterable = IterableUtils.chainedIterable(translatedResults, orphanVertices);
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, vertexIds));

return chainedIterable.iterator();
return translatedResults.iterator();
}

/**
Expand Down Expand Up @@ -597,19 +595,19 @@ public Iterator<Edge> edges(final Object... elementIds) {
.build();
}

// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
// Run requested chain on the graph and buffer to set to avoid reusing iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Edge> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

if (IterableUtils.size(translatedResults) >= variables.getElementsLimit()) {
if (translatedResults.size() >= variables.getElementsLimit()) {
LOGGER.warn(
"Result size is greater than or equal to configured limit ({}). Results may have been truncated",
variables.getElementsLimit());
Expand Down Expand Up @@ -811,20 +809,19 @@ private Iterator<GafferPopVertex> verticesWithSeedsAndView(final List<ElementSee
}
}

// Run operation on graph
final Iterable<? extends Element> result = execute(getOperation);
// Run operation on graph and buffer the result into a set to avoid reusing the iterator
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<GafferPopVertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<GafferPopVertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(GafferPopVertex.class::isInstance)
.map(e -> (GafferPopVertex) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

return translatedResults.iterator();

}

private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand All @@ -840,30 +837,29 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
.build())
.build());

List<EntityId> seedList = StreamSupport.stream(getAdjEntitySeeds.spliterator(), false).collect(Collectors.toList());
List<? extends EntityId> seedList = IterableUtils.toList(getAdjEntitySeeds);

// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
final Iterable<? extends Element> result = execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build());
final Set<Element> result = new HashSet<>(IterableUtils.toList(
execute(new OperationChain.Builder()
.first(new GetElements.Builder()
.input(seedList)
.build())
.build())));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Vertex> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.iterator();
.collect(Collectors.toSet());

// Check for seeds that are not entities but are vertices on an edge (orphan vertices)
Iterable<Vertex> chainedIterable = translatedResults;
for (final EntityId seed : seedList) {
Iterable<Vertex> orphanVertices = GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex());
chainedIterable = IterableUtils.chainedIterable(chainedIterable, orphanVertices);
translatedResults.addAll(GafferVertexUtils.getOrphanVertices(result, this, seed.getVertex()));
}
return chainedIterable.iterator();
return translatedResults.iterator();
}

private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand Down Expand Up @@ -900,16 +896,16 @@ private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, fina
}

// Run requested chain on the graph
final Iterable<? extends Element> result = execute(getOperation);
final Set<Element> result = new HashSet<>(IterableUtils.toList(execute(getOperation)));

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this, true);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
final Set<Edge> translatedResults = StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.limit(variables.getElementsLimit())
.iterator();
.collect(Collectors.toSet());

return translatedResults.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class GafferPopGraphStep<S, E extends Element> extends GraphStep<S, E> im
public GafferPopGraphStep(final GraphStep<S, E> originalGraphStep) {
super(originalGraphStep.getTraversal(), originalGraphStep.getReturnClass(), originalGraphStep.isStartStep(), originalGraphStep.getIds());
LOGGER.debug("Running custom GraphStep on GafferPopGraph");
originalGraphStep.getLabels().forEach(this::addLabel);
this.labels = originalGraphStep.getLabels();
this.parameters = originalGraphStep.getParameters();

// Save reference to the graph
GafferPopGraph graph = (GafferPopGraph) originalGraphStep.getTraversal().getGraph().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class GafferPopHasStep<S extends Element> extends HasStep<S> {
/**
 * Creates a GafferPop {@code HasStep} from an existing {@link HasStep}, reusing
 * its traversal and carrying over its step labels and has-containers.
 *
 * @param originalHasStep the step whose traversal, labels and has-containers are copied
 */
public GafferPopHasStep(final HasStep<S> originalHasStep) {
    super(originalHasStep.getTraversal());
    LOGGER.debug("Running custom HasStep on GafferPopGraph");

    // Copy the labels into this step's own label set instead of assigning the
    // set returned by the original step. Assigning it would make both steps
    // share one label collection, and TinkerPop's AbstractStep is documented to
    // hand out an unmodifiable view from getLabels(), so a later addLabel or
    // removeLabel on this step would fail.
    this.labels.addAll(originalHasStep.getLabels());
    originalHasStep.getHasContainers().forEach(this::addHasContainer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* (GafferPop) g.V().out() = [v2, v3, v4, v5]
* </pre>
*/
public class GafferPopVertexStep<E extends Element> extends FlatMapStep<Iterable<Vertex>, E>
public class GafferPopVertexStep<E extends Element> extends FlatMapStep<List<Vertex>, E>
implements AutoCloseable, Configuring {
private static final Logger LOGGER = LoggerFactory.getLogger(GafferPopVertexStep.class);

Expand All @@ -77,6 +77,7 @@ public GafferPopVertexStep(final VertexStep<E> originalVertexStep) {
this.edgeLabels = originalVertexStep.getEdgeLabels();
this.returnClass = originalVertexStep.getReturnClass();
this.traversal = originalVertexStep.getTraversal();
this.labels = originalVertexStep.getLabels();
}

@Override
Expand All @@ -90,7 +91,7 @@ public void configure(final Object... keyValues) {
}

@Override
protected Iterator<E> flatMap(final Traverser.Admin<Iterable<Vertex>> traverser) {
protected Iterator<E> flatMap(final Traverser.Admin<List<Vertex>> traverser) {
return Vertex.class.isAssignableFrom(returnClass) ?
(Iterator<E>) this.vertices(traverser.get()) :
(Iterator<E>) this.edges(traverser.get());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.tinkerpop.gremlin.process.traversal.Traversal.Admin;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
Expand All @@ -27,7 +28,8 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.tinkerpop.process.traversal.step.GafferPopVertexStep;
import uk.gov.gchq.gaffer.tinkerpop.process.traversal.step.LazyFoldStep;

import java.util.List;

/**
* Optimisation strategy to reduce the number of Gaffer operations performed.
Expand All @@ -49,14 +51,13 @@ public void apply(final Admin<?, ?> traversal) {
LOGGER.debug("Inserting FoldStep and replacing VertexStep");

// Replace vertex step
final GafferPopVertexStep<? extends Element> listVertexStep = new GafferPopVertexStep<>(
originalVertexStep);
final GafferPopVertexStep<? extends Element> listVertexStep = new GafferPopVertexStep<>(originalVertexStep);
TraversalHelper.replaceStep(originalVertexStep, listVertexStep, traversal);

// Add in a fold step before the new VertexStep so that the input is the list of
// all vertices
LazyFoldStep<Vertex> lazyFoldStep = new LazyFoldStep<>(originalVertexStep.getTraversal());
TraversalHelper.insertBeforeStep(lazyFoldStep, listVertexStep, traversal);
FoldStep<Vertex, List<Vertex>> foldStep = new FoldStep<>(originalVertexStep.getTraversal());
TraversalHelper.insertBeforeStep(foldStep, listVertexStep, traversal);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import uk.gov.gchq.gaffer.tinkerpop.GafferPopGraph;
import uk.gov.gchq.gaffer.tinkerpop.GafferPopVertex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -42,18 +42,19 @@ private GafferVertexUtils() {
}

/**
* Util method to extract vertices that are vertices on an edge but do not have an
* Util method to extract vertices that are vertices on an edge but do not have
* an associated {@link Vertex} or {@link Entity} in the current graph.
* These vertices exist only on an edge.
*
*
* @param result The results from a Gaffer query
* @param graph The GafferPop graph being queried
* @param result The results from a Gaffer query
* @param graph The GafferPop graph being queried
* @param vertexIds The vertexIds that have been used as seeds in the query
* @return Iterable of 'orphan' {@link Vertex}'s
* @return {@link Collection} of 'orphan' {@link Vertex}'s
*/

public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Element> result, final GafferPopGraph graph, final Object... vertexIds) {
public static Collection<Vertex> getOrphanVertices(final Iterable<? extends Element> result, final GafferPopGraph graph, final Object... vertexIds) {
// Check for any vertex ID seeds that are not returned as Entities
List<Object> orphanVertexIds = Arrays.stream(vertexIds)
.filter(id -> StreamSupport.stream(result.spliterator(), false)
Expand All @@ -73,21 +74,23 @@ public static Iterable<Vertex> getOrphanVertices(final Iterable<? extends Elemen
* @param result The results of a Gaffer query
* @param graph The GafferPop graph being queried
* @param orphanVertexIds Any seeds that were not found to have an entity
* @return Iterable of 'orphan' {@link Vertex}'s
* @return {@link Collection} of 'orphan' {@link Vertex}'s
*/
private static Iterable<Vertex> extractOrphanVerticesFromEdges(final Iterable<? extends Element> result, final GafferPopGraph graph, final List<Object> orphanVertexIds) {
List<Vertex> orphanVertices = new ArrayList<>();
StreamSupport.stream(result.spliterator(), false)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.forEach(e -> {
if (orphanVertexIds.contains(e.getSource()) || orphanVertexIds.equals(e.getSource())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getSource()), graph));
}
if (orphanVertexIds.contains(e.getDestination()) || orphanVertexIds.equals(e.getDestination())) {
orphanVertices.add(new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(e.getDestination()), graph));
}
});
return orphanVertices;
/**
 * Builds a vertex for every edge endpoint in the query results whose id is
 * one of the supplied orphan vertex ids.
 *
 * @param result          The results of a Gaffer query
 * @param graph           The GafferPop graph being queried
 * @param orphanVertexIds Any seeds that were not found to have an entity
 * @return {@link Collection} of 'orphan' {@link Vertex}'s
 */
private static Collection<Vertex> extractOrphanVerticesFromEdges(final Iterable<? extends Element> result, final GafferPopGraph graph, final List<Object> orphanVertexIds) {
    return StreamSupport.stream(result.spliterator(), false)
            .filter(Edge.class::isInstance)
            .map(e -> (Edge) e)
            // Inspect BOTH ends of each edge: a single edge may contribute two
            // orphan vertices, so a one-to-one map would drop the destination
            // whenever the source also matched.
            .flatMap(e -> Arrays.stream(new Object[] {e.getSource(), e.getDestination()}))
            // NOTE(review): the List-to-id equals() check is kept from the
            // original condition for compatibility; it can only match when the
            // vertex id is itself a List equal to orphanVertexIds.
            .filter(id -> orphanVertexIds.contains(id) || orphanVertexIds.equals(id))
            .map(id -> (Vertex) new GafferPopVertex(GafferPopGraph.ID_LABEL, GafferCustomTypeFactory.parseForGraphSONv3(id), graph))
            .collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ void shouldGetAllEdges() {
final Iterator<Edge> edges = graph.edges();

// Then
assertThat(edges).toIterable().containsExactly(createdEdge, knowsEdge);
assertThat(edges).toIterable().containsExactlyInAnyOrder(createdEdge, knowsEdge);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GafferPopGraphStepStrategyCypherIT {

@BeforeAll
public static void beforeAll() {
GafferPopGraph gafferPopGraph = GafferPopModernTestUtils.createModernGraph(GafferPopGraphStepStrategyCypherIT.class, StoreType.MAP);
GafferPopGraph gafferPopGraph = GafferPopModernTestUtils.createModernGraph(GafferPopGraphStepStrategyCypherIT.class, StoreType.ACCUMULO);
g = gafferPopGraph.traversal();
}

Expand Down
Loading