Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix betweenness centrality memory limits #437

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions cpp/betweenness_centrality_module/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,6 @@ add_query_module(betweenness_centrality 1 "${betweenness_centrality_src}")
target_link_libraries(betweenness_centrality PRIVATE mg_utility)
target_include_directories(betweenness_centrality PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})

# Module tests
if (NOT MAGE_CUGRAPH_ENABLE)
include(GoogleTest)
set(betweenness_centrality_src
betweenness_centrality_test.cpp
algorithm/betweenness_centrality.cpp)

add_executable(betweenness_centrality_test "${betweenness_centrality_src}")
target_link_libraries(betweenness_centrality_test PRIVATE mg_utility mage_gtest)
gtest_add_tests(TARGET betweenness_centrality_test)
endif()

######################################################################################

# Online betweenness centrality module

set(betweenness_centrality_online_src
Expand All @@ -37,17 +23,3 @@ add_query_module(betweenness_centrality_online 1 "${betweenness_centrality_onlin
# Link external libraries
target_link_libraries(betweenness_centrality_online PRIVATE mg_utility)
target_include_directories(betweenness_centrality_online PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})

# Module tests
if (NOT MAGE_CUGRAPH_ENABLE)
include(GoogleTest)
set(betweenness_centrality_online_src
betweenness_centrality_online_test.cpp
algorithm/betweenness_centrality.cpp
algorithm_online/betweenness_centrality_online.cpp
../biconnected_components_module/algorithm/biconnected_components.cpp)

add_executable(betweenness_centrality_online_test "${betweenness_centrality_online_src}")
target_link_libraries(betweenness_centrality_online_test PRIVATE mg_utility mage_gtest)
gtest_add_tests(TARGET betweenness_centrality_online_test)
endif()
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "betweenness_centrality.hpp"

#include <omp.h>
#include <queue>
#include <stack>
#include <vector>

#include "betweenness_centrality.hpp"
#include "mg_procedure.h"

namespace betweenness_centrality_util {

Expand Down Expand Up @@ -58,15 +60,18 @@ void Normalize(std::vector<double> &vec, double constant) {

namespace betweenness_centrality_alg {

std::vector<double> BetweennessCentrality(const mg_graph::GraphView<> &graph, bool directed, bool normalize,
std::vector<double> BetweennessCentrality(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, bool directed, bool normalize,
int threads) {
auto number_of_nodes = graph.Nodes().size();
std::vector<double> betweenness_centrality(number_of_nodes, 0);

// perform bfs for every node in the graph
omp_set_dynamic(0);
omp_set_num_threads(threads);
#pragma omp parallel for
#pragma omp parallel
{
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(mg_graph);
#pragma omp for
for (std::uint64_t node_id = 0; node_id < number_of_nodes; node_id++) {
// data structures used in BFS
std::stack<std::uint64_t> visited;
Expand Down Expand Up @@ -98,6 +103,8 @@ std::vector<double> BetweennessCentrality(const mg_graph::GraphView<> &graph, bo
}
}
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(mg_graph);
}

if (normalize) {
// normalized by dividing the value by the number of pairs of nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <vector>

#include <mg_graph.hpp>
#include "mg_procedure.h"

namespace betweenness_centrality_util {

Expand Down Expand Up @@ -40,7 +41,7 @@ namespace betweenness_centrality_alg {
///@return A vector that contains betweenness centrality scores placed on indices that correspond
/// to the identifiers of the nodes.
///
std::vector<double> BetweennessCentrality(const mg_graph::GraphView<> &graph, bool directed, bool normalize,
std::vector<double> BetweennessCentrality(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, bool directed, bool normalize,
int threads);

} // namespace betweenness_centrality_alg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <mg_graph.hpp>
#include <mg_procedure.h>

#include "../../biconnected_components_module/algorithm/biconnected_components.hpp"
#include "../algorithm/betweenness_centrality.hpp"
Expand Down Expand Up @@ -49,10 +50,10 @@ std::unordered_map<std::uint64_t, double> OnlineBC::NormalizeBC(
return normalized_bc_scores;
}

void OnlineBC::CallBrandesAlgorithm(const mg_graph::GraphView<> &graph, const std::uint64_t threads) {
void OnlineBC::CallBrandesAlgorithm(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, const std::uint64_t threads) {
this->node_bc_scores.clear();

const auto bc_scores = betweenness_centrality_alg::BetweennessCentrality(graph, false, false, threads);
const auto bc_scores = betweenness_centrality_alg::BetweennessCentrality(graph, mg_graph, false, false, threads);
for (std::uint64_t node_id = 0; node_id < graph.Nodes().size(); ++node_id) {
this->node_bc_scores[graph.GetMemgraphNodeId(node_id)] = bc_scores[node_id];
}
Expand Down Expand Up @@ -400,9 +401,9 @@ void OnlineBC::iCentralIteration(const mg_graph::GraphView<> &graph, const Opera
}
}

std::unordered_map<std::uint64_t, double> OnlineBC::Set(const mg_graph::GraphView<> &graph, const bool normalize,
std::unordered_map<std::uint64_t, double> OnlineBC::Set(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, const bool normalize,
const std::uint64_t threads) {
CallBrandesAlgorithm(graph, threads);
CallBrandesAlgorithm(graph, mg_graph, threads);
this->initialized = true;

if (normalize) return NormalizeBC(this->node_bc_scores, graph.Nodes().size());
Expand All @@ -424,22 +425,22 @@ std::unordered_map<std::uint64_t, double> OnlineBC::Get(const mg_graph::GraphVie
}

std::unordered_map<std::uint64_t, double> OnlineBC::EdgeUpdate(
const mg_graph::GraphView<> &prior_graph, const mg_graph::GraphView<> &current_graph, const Operation operation,
const mg_graph::GraphView<> &prior_graph, const mg_graph::GraphView<> &current_graph, mgp_graph *mg_graph, const Operation operation,
const std::pair<std::uint64_t, std::uint64_t> updated_edge, const bool normalize, const std::uint64_t threads) {
if (operation == Operation::CREATE_EDGE) {
const bool first_endpoint_isolated =
prior_graph.Neighbours(prior_graph.GetInnerNodeId(updated_edge.first)).size() == 0;
prior_graph.Neighbours(prior_graph.GetInnerNodeId(updated_edge.first)).empty();
const bool second_endpoint_isolated =
prior_graph.Neighbours(prior_graph.GetInnerNodeId(updated_edge.second)).size() == 0;
prior_graph.Neighbours(prior_graph.GetInnerNodeId(updated_edge.second)).empty();

if (first_endpoint_isolated && second_endpoint_isolated) {
return Set(current_graph, normalize, threads);
return Set(current_graph, mg_graph, normalize, threads);
} else if (first_endpoint_isolated) {
return NodeEdgeUpdate(current_graph, Operation::CREATE_ATTACH_NODE, updated_edge.first, updated_edge, normalize);
} else if (second_endpoint_isolated) {
return NodeEdgeUpdate(current_graph, Operation::CREATE_ATTACH_NODE, updated_edge.second, updated_edge, normalize);
} else {
if (!Connected(prior_graph)) return Set(current_graph, normalize, threads);
if (!Connected(prior_graph)) return Set(current_graph, mg_graph, normalize, threads);
}
}

Expand Down Expand Up @@ -474,14 +475,19 @@ std::unordered_map<std::uint64_t, double> OnlineBC::EdgeUpdate(

omp_set_dynamic(0);
omp_set_num_threads(threads);
#pragma omp parallel for
#pragma omp parallel
{
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(mg_graph);
#pragma omp for
for (std::uint64_t i = 0; i < array_size; i++) {
auto node_id = affected_bcc_nodes_array[i];
if (distances_first.at(node_id) != distances_second.at(node_id)) {
iCentralIteration(graph_without_updated_edge, operation, node_id, affected_bcc_nodes,
affected_bcc_articulation_points, updated_edge, peripheral_subgraph_orders);
}
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(mg_graph);
}

if (normalize) return NormalizeBC(this->node_bc_scores, current_graph.Nodes().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include "data_structures/graph_view.hpp"

#include <mg_procedure.h>
#include <omp.h>

///@brief Remove repeat elements from vector except for the first instance. The vector is modified in-place.
Expand Down Expand Up @@ -62,7 +64,7 @@ class OnlineBC {
///
///@param graph Current graph
///@param threads Number of concurrent threads
void CallBrandesAlgorithm(const mg_graph::GraphView<> &graph, const std::uint64_t threads);
void CallBrandesAlgorithm(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, const std::uint64_t threads);

///@brief Returns whether the (undirected) graph is connected.
///
Expand Down Expand Up @@ -191,7 +193,7 @@ class OnlineBC {
///@param threads Number of concurrent threads
///
///@return {node ID, BC score} pairs
std::unordered_map<std::uint64_t, double> Set(const mg_graph::GraphView<> &graph, const bool normalize = true,
std::unordered_map<std::uint64_t, double> Set(const mg_graph::GraphView<> &graph, mgp_graph *mg_graph, const bool normalize = true,
const std::uint64_t threads = std::thread::hardware_concurrency());

///@brief Returns previously computed betweenness centrality scores.
Expand All @@ -218,7 +220,7 @@ class OnlineBC {
///
///@return {node ID, BC score} pairs
std::unordered_map<std::uint64_t, double> EdgeUpdate(
const mg_graph::GraphView<> &prior_graph, const mg_graph::GraphView<> &current_graph, const Operation operation,
const mg_graph::GraphView<> &prior_graph, const mg_graph::GraphView<> &current_graph, mgp_graph *mg_graph, const Operation operation,
const std::pair<std::uint64_t, std::uint64_t> updated_edge, const bool normalize = true,
const std::uint64_t threads = std::thread::hardware_concurrency());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void GetBetweennessCentrality(mgp_list *args, mgp_graph *memgraph_graph, mgp_res
auto graph_type = directed ? mg_graph::GraphType::kDirectedGraph : mg_graph::GraphType::kUndirectedGraph;

auto graph = mg_utility::GetGraphView(memgraph_graph, result, memory, graph_type);
auto BC = betweenness_centrality_alg::BetweennessCentrality(*graph, directed, normalize, threads);
auto BC = betweenness_centrality_alg::BetweennessCentrality(*graph, memgraph_graph, directed, normalize, threads);

auto number_of_nodes = graph->Nodes().size();
for (std::uint64_t node_id = 0; node_id < number_of_nodes; ++node_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void Set(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo
if (threads <= 0) threads = std::thread::hardware_concurrency();

auto graph = mg_utility::GetGraphView(memgraph_graph, result, memory, mg_graph::GraphType::kUndirectedGraph);
const auto node_bc_scores = algorithm.Set(*graph, normalize, threads);
const auto node_bc_scores = algorithm.Set(*graph, memgraph_graph, normalize, threads);

for (const auto [node_id, bc_score] : node_bc_scores) {
InsertOnlineBCRecord(memgraph_graph, result, memory, node_id, bc_score);
Expand All @@ -78,7 +78,7 @@ void Get(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memo

std::unordered_map<uint64_t, double> node_bc_scores;
if (!algorithm.Initialized())
node_bc_scores = algorithm.Set(*graph, normalize);
node_bc_scores = algorithm.Set(*graph, memgraph_graph, normalize);
else
node_bc_scores = algorithm.Get(*graph, normalize);

Expand All @@ -100,7 +100,7 @@ void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_m
std::unordered_map<uint64_t, double> node_bc_scores;

if (!algorithm.Initialized()) {
node_bc_scores = algorithm.Set(*graph, normalize, threads);
node_bc_scores = algorithm.Set(*graph, memgraph_graph, normalize, threads);
} else {
std::vector<std::uint64_t> graph_nodes_ids;
for (const auto [node_id] : graph->Nodes()) {
Expand Down Expand Up @@ -135,15 +135,15 @@ void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_m
for (const auto created_edge : created_edges) {
prior_edges_ids.push_back(created_edge);
graph = mg_generate::BuildGraph(graph_nodes_ids, prior_edges_ids, mg_graph::GraphType::kUndirectedGraph);
node_bc_scores = algorithm.EdgeUpdate(*prior_graph, *graph, online_bc::Operation::CREATE_EDGE, created_edge,
node_bc_scores = algorithm.EdgeUpdate(*prior_graph, *graph, memgraph_graph, online_bc::Operation::CREATE_EDGE, created_edge,
normalize, threads);
prior_graph = std::move(graph);
}
for (const auto deleted_edge : deleted_edges) {
prior_edges_ids.erase(std::remove(prior_edges_ids.begin(), prior_edges_ids.end(), deleted_edge),
prior_edges_ids.end());
graph = mg_generate::BuildGraph(graph_nodes_ids, prior_edges_ids, mg_graph::GraphType::kUndirectedGraph);
node_bc_scores = algorithm.EdgeUpdate(*prior_graph, *graph, online_bc::Operation::DELETE_EDGE, deleted_edge,
node_bc_scores = algorithm.EdgeUpdate(*prior_graph, *graph, memgraph_graph, online_bc::Operation::DELETE_EDGE, deleted_edge,
normalize, threads);
prior_graph = std::move(graph);
}
Expand All @@ -163,7 +163,7 @@ void Update(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_m
algorithm.NodeEdgeUpdate(*graph, online_bc::Operation::DETACH_DELETE_NODE, deleted_nodes[0], deleted_edges[0],
normalize);
} else { // Default to offline update
node_bc_scores = algorithm.Set(*graph, normalize, threads);
node_bc_scores = algorithm.Set(*graph, memgraph_graph, normalize, threads);
}
}

Expand Down
Loading