Skip to content

Commit

Permalink
Generate Manifold Merge (#25)
Browse files Browse the repository at this point in the history
* WIP

* fix configuration expectations

* snip snip snip
  • Loading branch information
claytongentry authored Feb 11, 2025
1 parent 5973b6d commit 2513c4a
Show file tree
Hide file tree
Showing 7 changed files with 597 additions and 108 deletions.
4 changes: 4 additions & 0 deletions lib/manifold/api/workspace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Manifold
module API
# Handles terraform configuration generation
class TerraformGenerator
attr_accessor :manifold_config

def initialize(name, vectors, vector_service, manifold_yaml)
@name = name
@vectors = vectors
Expand All @@ -18,6 +20,7 @@ def generate(path)
config.add_vector(vector_config)
end
config.merge_config = @manifold_yaml["dimensions"]&.fetch("merge", nil) if @manifold_yaml["dimensions"]
config.manifold_config = @manifold_yaml
config.write(path)
end
end
Expand Down Expand Up @@ -145,6 +148,7 @@ def vectors

def generate_terraform
terraform_generator = TerraformGenerator.new(name, vectors, @vector_service, manifold_yaml)
terraform_generator.manifold_config = manifold_yaml
terraform_generator.generate(terraform_main_path)
end

Expand Down
293 changes: 252 additions & 41 deletions lib/manifold/terraform/workspace_configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,222 @@

module Manifold
module Terraform
# Handles building metrics SQL for manifold routines
class MetricsBuilder
def initialize(manifold_config)
@manifold_config = manifold_config
end

def build_metrics_struct
return "" unless @manifold_config&.dig("contexts") && @manifold_config&.dig("metrics")

context_structs = @manifold_config["contexts"].map do |name, config|
condition = build_context_condition(name, config)
metrics = build_context_metrics(condition)
"STRUCT(#{metrics}) AS #{name}"
end

context_structs.join(",\n")
end

private

def build_context_metrics(condition)
metrics = []
add_count_metrics(metrics, condition)
add_sum_metrics(metrics, condition)
metrics.join(",\n")
end

def add_count_metrics(metrics, condition)
return unless @manifold_config.dig("metrics", "countif")

metrics << "COUNTIF(#{condition}) AS #{@manifold_config["metrics"]["countif"]}"
end

def add_sum_metrics(metrics, condition)
@manifold_config.dig("metrics", "sumif")&.each do |name, config|
metrics << "SUM(IF(#{condition}, #{config["field"]}, 0)) AS #{name}"
end
end

def build_context_condition(_name, config)
return config unless config.is_a?(Hash)

operator = config["operator"]
fields = config["fields"]
build_operator_condition(operator, fields)
end

def build_operator_condition(operator, fields)
conditions = fields.map { |f| @manifold_config["contexts"][f] }
case operator
when "AND", "OR" then join_conditions(conditions, operator)
when "NOT" then negate_condition(conditions.first)
when "NAND", "NOR" then negate_joined_conditions(conditions, operator[1..])
when "XOR" then build_xor_condition(conditions)
when "XNOR" then build_xnor_condition(conditions)
else config
end
end

def join_conditions(conditions, operator)
conditions.join(" #{operator} ")
end

def negate_condition(condition)
"NOT (#{condition})"
end

def negate_joined_conditions(conditions, operator)
"NOT (#{join_conditions(conditions, operator)})"
end

def build_xor_condition(conditions)
"(#{conditions[0]} AND NOT #{conditions[1]}) OR (NOT #{conditions[0]} AND #{conditions[1]})"
end

def build_xnor_condition(conditions)
"(#{conditions[0]} AND #{conditions[1]}) OR (NOT #{conditions[0]} AND NOT #{conditions[1]})"
end
end

# Handles building SQL for manifold routines
class SQLBuilder
def initialize(name, manifold_config)
@name = name
@manifold_config = manifold_config
end

def build_manifold_merge_sql(_metrics_builder, &)
return "" unless valid_config?

<<~SQL
MERGE #{@name}.Manifold AS target USING (
#{build_metrics_cte(&)}
#{build_final_select}
) AS source
ON source.id = target.id AND source.timestamp = target.timestamp
#{build_merge_actions}
SQL
end

def build_dimensions_merge_sql(source_sql)
<<~SQL
MERGE #{@name}.Dimensions AS TARGET
USING (
#{source_sql}
) AS source
ON source.id = target.id
WHEN MATCHED THEN UPDATE SET target.dimensions = source.dimensions
WHEN NOT MATCHED THEN INSERT ROW;
SQL
end

private

def valid_config?
source_table && timestamp_field
end

def build_metrics_cte(&)
<<~SQL
WITH Metrics AS (
#{build_metrics_select(&)}
)
SQL
end

def build_metrics_select(&block)
<<~SQL
SELECT
dimensions.id id,
TIMESTAMP_TRUNC(#{timestamp_field}, #{interval}) timestamp,
STRUCT(
#{block.call}
) AS metrics
FROM `#{source_table}`
#{where_clause}
GROUP BY 1, 2
SQL
end

def build_final_select
<<~SQL
SELECT id, timestamp, #{@name}.Dimensions.dimensions, Metrics.metrics
FROM Metrics
LEFT JOIN #{@name}.Dimensions USING (id)
SQL
end

def build_merge_actions
<<~SQL
WHEN MATCHED THEN
UPDATE SET
metrics = source.metrics,
dimensions = source.dimensions
WHEN NOT MATCHED THEN
INSERT ROW;
SQL
end

def source_table
@manifold_config["source"]
end

def interval
@manifold_config&.dig("timestamp", "interval") || "DAY"
end

def where_clause
return "" unless @manifold_config["filter"]

"WHERE #{@manifold_config["filter"]}"
end

def timestamp_field
@manifold_config&.dig("timestamp", "field")
end
end

# Handles building table configurations
class TableConfigBuilder
def initialize(name)
@name = name
end

def build_table_configs
{
"dimensions" => dimensions_table_config,
"manifold" => manifold_table_config
}
end

private

def dimensions_table_config
build_table_config("Dimensions")
end

def manifold_table_config
build_table_config("Manifold")
end

def build_table_config(table_id)
{
"dataset_id" => @name,
"project" => "${var.project_id}",
"table_id" => table_id,
"schema" => "${file(\"${path.module}/tables/#{table_id.downcase}.json\")}",
"depends_on" => ["google_bigquery_dataset.#{@name}"]
}
end
end

# Represents a Terraform configuration for a Manifold workspace.
class WorkspaceConfiguration < Configuration
attr_reader :name
attr_writer :merge_config, :manifold_config

def initialize(name)
super()
Expand All @@ -17,14 +230,12 @@ def add_vector(vector_config)
@vectors << vector_config
end

attr_writer :merge_config

def as_json
{
"variable" => variables_block,
"resource" => {
"google_bigquery_dataset" => dataset_config,
"google_bigquery_table" => table_config,
"google_bigquery_table" => TableConfigBuilder.new(name).build_table_configs,
"google_bigquery_routine" => routine_config
}.compact
}
Expand All @@ -51,68 +262,68 @@ def dataset_config
}
end

def table_config
{
"dimensions" => dimensions_table_config,
"manifold" => manifold_table_config
}
end
def routine_config
routines = {
"merge_dimensions" => dimensions_routine_attributes,
"merge_manifold" => manifold_routine_attributes
}.compact

def dimensions_table_config
{
"dataset_id" => name,
"project" => "${var.project_id}",
"table_id" => "Dimensions",
"schema" => "${file(\"${path.module}/tables/dimensions.json\")}",
"depends_on" => ["google_bigquery_dataset.#{name}"]
}
routines.empty? ? nil : routines
end

def manifold_table_config
def dimensions_routine_attributes
return nil if @vectors.empty? || @merge_config.nil?

{
"dataset_id" => name,
"project" => "${var.project_id}",
"table_id" => "Manifold",
"schema" => "${file(\"${path.module}/tables/manifold.json\")}",
"routine_id" => "merge_dimensions",
"routine_type" => "PROCEDURE",
"language" => "SQL",
"definition_body" => dimensions_merge_routine,
"depends_on" => ["google_bigquery_dataset.#{name}"]
}
end

def routine_config
return nil if @vectors.empty? || @merge_config.nil?
def dimensions_merge_routine
return "" if @vectors.empty? || @merge_config.nil?

{
"merge_dimensions" => routine_attributes
}
source_sql = File.read(Pathname.pwd.join(@merge_config["source"]))
SQLBuilder.new(name, @manifold_config).build_dimensions_merge_sql(source_sql)
end

def routine_attributes
def manifold_routine_attributes
return nil unless valid_manifold_config?

{
"dataset_id" => name,
"project" => "${var.project_id}",
"routine_id" => "merge_dimensions",
"routine_id" => "merge_manifold",
"routine_type" => "PROCEDURE",
"language" => "SQL",
"definition_body" => merge_routine_definition,
"definition_body" => manifold_merge_routine,
"depends_on" => ["google_bigquery_dataset.#{name}"]
}
end

def merge_routine_definition
source_sql = read_source_sql(@merge_config["source"])
<<~SQL
MERGE #{name}.Dimensions AS TARGET
USING (
#{source_sql}
) AS source
ON source.id = target.id
WHEN MATCHED THEN UPDATE SET target.dimensions = source.dimensions
WHEN NOT MATCHED THEN INSERT ROW;
SQL
def manifold_merge_routine
metrics_builder = MetricsBuilder.new(@manifold_config)
sql_builder = SQLBuilder.new(name, @manifold_config)
sql_builder.build_manifold_merge_sql(metrics_builder) do
metrics_builder.build_metrics_struct
end
end

def valid_manifold_config?
return false unless @manifold_config

required_fields_present?
end

def read_source_sql(source_path)
File.read(Pathname.pwd.join(source_path))
def required_fields_present?
%w[source timestamp.field contexts metrics].all? do |field|
@manifold_config&.dig(*field.split("."))
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/manifold/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Manifold
VERSION = "0.0.15"
VERSION = "0.0.16"
end
Loading

0 comments on commit 2513c4a

Please sign in to comment.