diff --git a/Project.toml b/Project.toml
index 9c7c056..94e5e07 100644
--- a/Project.toml
+++ b/Project.toml
@@ -4,10 +4,7 @@ authors = ["TheCedarPrince and contributors"]
 version = "0.0.1"
 
 [deps]
-Chain = "8be319e6-bccf-4806-a6f7-6fae938471bc"
-Combinatorics = "861a8166-3701-5b0c-9a16-15d98fcdc6aa"
 DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
-LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 OMOPCDMCohortCreator = "f525a15e-a73f-4eef-870f-f901257eae22"
 
 [compat]
diff --git a/src/fairness.jl b/src/fairness.jl
index 3dbe608..90c80ce 100644
--- a/src/fairness.jl
+++ b/src/fairness.jl
@@ -2,14 +2,49 @@ module Fairness
 
     using DataFrames
     using OMOPCDMCohortCreator
+    import Base:
+        Fix2
 
-    import Combinatorics:
-        powerset
-    import Chain:
-        @chain
+    """
+        _counter_reducer(sub, count_name, funcs)
+
+    Apply each function in `funcs` to `sub` in sequence, drop the `person_id`
+    column from the result, and count the rows of every distinct combination of
+    the remaining columns. The counts are stored in the column `count_name`.
+    """
+    function _counter_reducer(sub, count_name, funcs)
+        for fun in funcs
+            sub = fun(sub)
+        end
+        sub[:, Not(:person_id)] |>
+            x -> groupby(x, names(x)) |>
+            x -> combine(x, nrow => count_name)
+    end
+
+    """
+        _subset_subjects(vec, process_size)
+
+    Split `vec` into consecutive, non-overlapping chunks of at most
+    `process_size` elements and return them in order.
+    """
+    function _subset_subjects(vec, process_size)
+        process_size > 0 ||
+            throw(ArgumentError("process_size must be positive, got $process_size"))
+
+        total = size(vec)[1]
+        subsets = []
+        for i in 1:process_size:total
+            # Stop at `i + process_size - 1`: slicing up to `i + process_size` would
+            # repeat the first element of the next chunk and count it twice.
+            push!(subsets, vec[i:min(i + process_size - 1, total)])
+        end
+
+        return subsets
+    end
 
 
     function _overlapped_subjects(cohorts, conn)
+        # Requirement is based on majority of phenotypes provided
         required_overlapping_phenotypes = ceil(length(cohorts) / 2)
 
         subjects = GetCohortSubjects(cohorts, conn)
@@ -27,109 +62,282 @@ module Fairness
     end
 
 
-    function demographic_parity(cohorts, classes, conn)
+    """
+        demographic_parity(cohorts, funcs, conn; labels = false, silver = false,
+                           reference_subjects = "", process_size = 10000)
+
+    Compute the ratio `count_num / count_denom` for every combination of the
+    characteristics produced by `funcs`, where `count_num` counts cohort subjects
+    and `count_denom` counts reference subjects.
+
+    # Arguments
+    - `cohorts`: cohort identifiers handed to `GetCohortSubjects`.
+    - `funcs`: functions applied in sequence as `fun(x, conn)`; the final result
+      must contain a `person_id` column, which is dropped before counting.
+    - `conn`: passed through to every query helper and to each function in `funcs`.
+
+    # Keywords
+    - `labels`: when `true`, counts are kept per `cohort_definition_id`;
+      otherwise the subjects of all `cohorts` are pooled.
+    - `silver`: only read when `labels` is `true`; adds rows labelled `:silver`
+      built from the second value returned by `_overlapped_subjects`.
+    - `reference_subjects`: subjects used for `count_denom`; when empty,
+      `GetDatabasePersonIDs(conn)` is used instead.
+    - `process_size`: maximum number of subjects processed per chunk.
+    """
+    function demographic_parity(
+        cohorts,
+        funcs,
+        conn;
+        labels = false,
+        silver = false,
+        reference_subjects = "",
+        process_size = 10000
+    )
+        if labels == true
+            _demographic_parity(cohorts, funcs, conn,
+                                reference_subjects,
+                                process_size,
+                                silver)
+        else
+            _demographic_parity(cohorts, funcs, conn, reference_subjects,
+                                process_size)
+        end
+    end
 
-        study, PP, PN = _overlapped_subjects(cohorts, conn)
-
-        dps = DataFrame()
-        for class in classes
-            for cohort in cohorts
-                cohort_subjects = GetCohortSubjects(cohort, conn).subject_id
+    """
+        _demographic_parity(cohorts, funcs, conn, reference_subjects, process_size, silver)
+
+    Labelled variant of `demographic_parity`: numerator counts are computed per
+    `cohort_definition_id`. When `silver` is `true`, an extra group labelled
+    `:silver` is added from the second value returned by `_overlapped_subjects`.
+    """
+    function _demographic_parity(cohorts, funcs, conn, reference_subjects, process_size, silver)
 
-                S = class(study, conn)
+        _funcs = [Fix2(fun, conn) for fun in funcs]
 
-                feature_name = names(S)[2]
+        if isempty(reference_subjects)
+            reference_subjects = GetDatabasePersonIDs(conn)
+        end
 
-                for feature in unique(S[:, 2])
-                    C = class(cohort_subjects, conn)
-                    TP =
-                        filter(row -> row[2] == feature, C) |>
-                        filter(row -> in(row[1], PP))
+        cohorts_df = GetCohortSubjects(cohorts, conn)
 
-                    FP =
-                        filter(row -> row[2] == feature, C) |>
-                        filter(row -> in(row[1], PN))
-
-                    N = filter(row -> row[2] == feature, S)
+        subsets = _subset_subjects(reference_subjects, process_size)
 
-                    dp = (nrow(TP) + nrow(FP)) / nrow(N)
+        denom = DataFrame()
+        for sub in subsets
+            denom = vcat(denom, _counter_reducer(sub, :count_denom, _funcs))
+        end
 
-                    push!(dps, Dict(:cohort_definition_id => cohort, Symbol(feature_name) => feature, :dp => dp), cols = :union)
+        denom = groupby(denom, Not(:count_denom)) |>
+            x -> combine(x, :count_denom => sum => :count_denom)
+
+        num = DataFrame()
+        for cohort_idx in unique(cohorts_df.cohort_definition_id)
+            subjects = filter(row -> row.cohort_definition_id == cohort_idx, cohorts_df).subject_id
+            subsets = _subset_subjects(subjects, process_size)
+            for sub in subsets
+                vals = _counter_reducer(sub, :count_num, _funcs)
+                vals.cohort_definition_id .= cohort_idx
+                num = vcat(num, vals)
+            end
+        end
 
-                end
+        if silver == true
+            _, true_subjects, _ = _overlapped_subjects(cohorts, conn)
+
+            subsets = _subset_subjects(true_subjects, process_size)
+
+            # Accumulate in a separate variable so the Bool argument `silver` is not rebound
+            silver_counts = DataFrame()
+            for sub in subsets
+                silver_counts = vcat(silver_counts, _counter_reducer(sub, :count_num, _funcs))
             end
+
+            silver_counts.cohort_definition_id .= :silver
+            silver_counts = groupby(silver_counts, Not(:count_num)) |>
+                x -> combine(x, :count_num => sum => :count_num)
+
+            num = vcat(num, silver_counts)
+
         end
 
-        return dps
+        num = groupby(num, Not(:count_num)) |>
+            x -> combine(x, :count_num => sum => :count_num)
+
+        dps = outerjoin(num, denom; on = names(num)[1:end-2] .|>
+                        x -> Symbol(x) => Symbol(x)) |>
+            x -> coalesce.(x, 0)
+        dps.demographic_parity = dps.count_num ./ dps.count_denom
+
+        return dps
     end
 
 
-    function equality_of_opportunity(cohorts, classes, conn)
+    """
+        _demographic_parity(cohorts, funcs, conn, reference_subjects, process_size)
+
+    Pooled variant of `demographic_parity`: the subject ids of all `cohorts` are
+    counted together, without a `cohort_definition_id` column in the result.
+    """
+    function _demographic_parity(cohorts, funcs, conn, reference_subjects, process_size)
 
-        study, PP, PN = _overlapped_subjects(cohorts, conn)
-
-        eoos = DataFrame()
-        for class in classes
-            for cohort in cohorts
-                cohort_subjects = GetCohortSubjects(cohort, conn).subject_id
+        _funcs = [Fix2(fun, conn) for fun in funcs]
 
-                S = class(study, conn)
+        if isempty(reference_subjects)
+            reference_subjects = GetDatabasePersonIDs(conn)
+        end
 
-                feature_name = names(S)[2]
+        cohorts = GetCohortSubjects(cohorts, conn).subject_id
 
-                for feature in unique(S[:, 2])
-                    C = class(cohort_subjects, conn)
-                    TP =
-                        filter(row -> row[2] == feature, C) |>
-                        filter(row -> in(row[1], PP))
+        subsets = _subset_subjects(reference_subjects, process_size)
 
-                    P =
-                        filter(row -> row[2] == feature, C)
+        denom = DataFrame()
+        for sub in subsets
+            denom = vcat(denom, _counter_reducer(sub, :count_denom, _funcs))
+        end
 
-                    eoo = nrow(TP) / nrow(P)
+        denom = groupby(denom, names(denom)[1:end-1]) |>
+            x -> combine(x, :count_denom => sum => :count_denom)
 
-                    push!(eoos, Dict(:cohort_definition_id => cohort, Symbol(feature_name) => feature, :eoo => eoo), cols = :union)
+        subsets = _subset_subjects(cohorts, process_size)
 
-                end
-            end
+        num = DataFrame()
+        for sub in subsets
+            num = vcat(num, _counter_reducer(sub, :count_num, _funcs))
         end
 
-        return eoos
+        num = groupby(num, names(num)[1:end-1]) |>
+            x -> combine(x, :count_num => sum => :count_num)
+
+        dps = outerjoin(num, denom; on = names(num)[1:end-1] .|>
+                        x -> Symbol(x) => Symbol(x)) |>
+            x -> coalesce.(x, 0)
+        dps.demographic_parity = dps.count_num ./ dps.count_denom
+
+        return dps
     end
 
 
-    function predictive_rate_parity(cohorts, classes, conn)
+    """
+        equality_of_opportunity(cohorts, funcs, conn; reference_subjects = "",
+                                process_size = 10000)
+
+    For each cohort in `cohorts`, count the cohort subjects that also appear in
+    the second value returned by `_overlapped_subjects` (`count_num`) and divide
+    by the count of all subjects in that second value (`count_denom`), per
+    combination of the characteristics produced by `funcs`.
+
+    `reference_subjects` is accepted but not used by this function.
+    """
+    function equality_of_opportunity(cohorts, funcs, conn; reference_subjects = "", process_size = 10000)
 
-        study, PP, PN = _overlapped_subjects(cohorts, conn)
-
-        prps = DataFrame()
-        for class in classes
-            for cohort in cohorts
-                cohort_subjects = GetCohortSubjects(cohort, conn).subject_id
+        _funcs = [Fix2(fun, conn) for fun in funcs]
 
-                S = class(study, conn)
+        _, true_subjects, _ = _overlapped_subjects(cohorts, conn)
 
-                feature_name = names(S)[2]
+        subsets = _subset_subjects(true_subjects, process_size)
 
-                for feature in unique(S[:, 2])
-                    C = class(cohort_subjects, conn)
-                    TP =
-                        filter(row -> row[2] == feature, C) |>
-                        filter(row -> in(row[1], PP))
+        denom = DataFrame()
+        for sub in subsets
+            denom = vcat(denom, _counter_reducer(sub, :count_denom, _funcs))
+        end
 
-                    FP =
-                        filter(row -> row[2] == feature, C) |>
-                        filter(row -> in(row[1], PN))
+        denom = groupby(denom, names(denom)[1:end-1]) |>
+            x -> combine(x, :count_denom => sum => :count_denom)
 
-                    prp = nrow(TP) / (nrow(TP) + nrow(FP))
+        eoo = DataFrame()
+        for cohort_idx in cohorts
 
-                    push!(prps, Dict(:cohort_definition_id => cohort, Symbol(feature_name) => feature, :prp => prp), cols = :union)
+            cohort = GetCohortSubjects(cohort_idx, conn)
+            cohort = filter(row -> in(row.subject_id, true_subjects), cohort)
 
-                end
+            subsets = _subset_subjects(cohort.subject_id, process_size)
+
+            num = DataFrame()
+            for sub in subsets
+                num = vcat(num, _counter_reducer(sub, :count_num, _funcs))
             end
+
+            num = groupby(num, names(num)[1:end-1]) |>
+                x -> combine(x, :count_num => sum => :count_num)
+
+            cohort = outerjoin(num, denom; on = names(num)[1:end-1] .|>
+                               x -> Symbol(x) => Symbol(x)) |>
+                x -> coalesce.(x, 0)
+
+            cohort.equality_of_opportunity = cohort.count_num ./ cohort.count_denom
+
+            cohort.cohort_definition_id = ones(Int, nrow(cohort)) .* cohort_idx
+            eoo = vcat(eoo, cohort)
         end
 
 
-        return prps
+        return eoo
+    end
+
+    """
+        predictive_rate_parity(cohorts, funcs, conn; reference_subjects = "",
+                               process_size = 10000)
+
+    For each cohort in `cohorts`, split its subjects by membership in the second
+    and third values returned by `_overlapped_subjects`, and divide the count of
+    the former (`count_num`) by the count of both groups (`count_denom`), per
+    combination of the characteristics produced by `funcs`.
+
+    `reference_subjects` is accepted but not used by this function.
+    """
+    function predictive_rate_parity(cohorts, funcs, conn; reference_subjects = "", process_size = 10000)
+
+        _funcs = [Fix2(fun, conn) for fun in funcs]
+
+        _, true_subjects, false_subjects = _overlapped_subjects(cohorts, conn)
+
+        prp = DataFrame()
+        for cohort_idx in cohorts
+            cohort = GetCohortSubjects(cohort_idx, conn)
+            true_cohort = filter(row -> in(row.subject_id, true_subjects), cohort)
+            false_cohort = filter(row -> in(row.subject_id, false_subjects), cohort)
+
+            subsets = _subset_subjects(true_cohort.subject_id, process_size)
+
+            num = DataFrame()
+            for sub in subsets
+                num = vcat(num, _counter_reducer(sub, :count_num, _funcs))
+            end
+
+            subsets = _subset_subjects(false_cohort.subject_id, process_size)
+
+            if !isempty(subsets)
+                false_denom = DataFrame()
+                for sub in subsets
+                    false_denom = vcat(false_denom, _counter_reducer(sub, :count_num, _funcs))
+                end
+                denom = vcat(num, false_denom)
+                denom = groupby(denom, names(denom)[1:end-1]) |>
+                    x -> combine(x, :count_num => sum => :count_denom)
+            else
+                denom = num
+                denom = groupby(denom, names(denom)[1:end-1]) |>
+                    x -> combine(x, :count_num => sum => :count_denom)
+            end
+
+            num = groupby(num, names(num)[1:end-1]) |>
+                x -> combine(x, :count_num => sum => :count_num)
+
+            cohort = outerjoin(num, denom; on = names(num)[1:end-1] .|>
+                               x -> Symbol(x) => Symbol(x)) |>
+                x -> coalesce.(x, 0)
+
+            cohort.predictive_rate_parity = cohort.count_num ./ cohort.count_denom
+
+            cohort.cohort_definition_id = ones(Int, nrow(cohort)) .* cohort_idx
+            prp = vcat(prp, cohort)
+        end
+
+        return prp
     end
+
+    export demographic_parity, equality_of_opportunity, predictive_rate_parity
+
 end