--- title: Extraction de la fonction de répartition d'une variable en fonction du RFR : création de copules keywords: fastai sidebar: home_sidebar nb_path: "notebooks/calib_and_copule.ipynb" ---
from IPython.core.interactiveshell import InteractiveShell
# Ask IPython to display every top-level expression of a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
# year = "2018"
# IN_PATH = r"C:\Users\Public\Documents\TRAVAIL\csg\data_in/"
# ARROW_IN = (
# IN_PATH
# + "assiettes_pote_brutes_"
# + year
# + "-arrow/assiettes_pote_sorted_"
# + year
# + "*.arrow"
# )
# # IN_POTE="/media/data-nvme/dev/src/LEXIMPACT/fake_pote_partial.parquet"
# IN_POTE = "/media/data-nvme/dev/src/LEXIMPACT/fake_pote_full.parquet"
# # OUT_PATH = "/media/data-nvme/dev/src/LEXIMPACT/copules/"
# OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\csg\data_out/"
import json
import sys
import unittest
import numpy as np
import seaborn as sns
from tqdm import tqdm
# Standalone TestCase instance: its assertEqual / assertRaises helpers are used directly in the cells below.
tc = unittest.TestCase()
Ajoute des frontières pour les hauts revenus et vérifie le respect du nombre de personnes à l'intérieur de chaque frontière.
La découpe se fait en 100 tranches égales, en termes de personnes, auxquelles on ajoute des tranches plus fines sur les hauts revenus.
Cependant le découpage est limité par le respect du secret statistique :
- Pas moins de 12 personnes par tranche
La vérification qu'un foyer de la tranche ne représente pas plus de 85 % de la valeur totale des montants de la tranche est faite dans une autre fonction. Ici, on découpe sans s'occuper du contenu, seulement du nombre d'éléments.
Pour comprendre le besoin de mieux détailler les hauts revenus, voici les foyers qui existaient en 2019 dans les très hauts revenus fiscaux de référence :
Ce faux jeu de données va nous permettre de tester notre solution sur un problème simplifié.
On va considérer que le RFR croît linéairement dans une population de 10 000 foyers, et qu'une variable évolue en fonction du RFR, tout en pouvant être à zéro.
Nous pourrons ainsi constater facilement si notre distribution générée correspond à celle initiale.
# Use a wide default figure size for the seaborn plots of this notebook.
sns.set(rc={"figure.figsize": (20, 8)})
# Build the fake dataset (with set_some_var_to_zero enabled) and plot it.
df = get_fake_data(set_some_var_to_zero=True)
sns.scatterplot(data=df)
# Convert the pandas frame to vaex, then prepare the RFR slicing.
# The second argument is presumably the number of RFR buckets (later calls pass nb_bucket_rfr_* there) — TODO confirm.
rfrs_sorted = pandas_to_vaex(df)
une_tranche_rfr = prepare_tranche_rfr_vaex(rfrs_sorted, 1, debug=True)
une_tranche_rfr
%%time
# Compute the copula of "revkire" over the prepared RFR slicing, asking for 10 variable buckets.
variable = "revkire"
nb_bucket_var = 10
out = compute_copule_vaex(
    rfrs_sorted,
    variable,
    nb_bucket_var,
    une_tranche_rfr,
    debug=True,
)
# out
# Display one bucket of the first copula as a sample.
out["copules"][0]["buckets"][5]
# The per-bucket sums of the first copula must add up to the total of the variable.
s = sum(bucket["sum_tranche_var"] for bucket in out["copules"][0]["buckets"])
assert s == rfrs_sorted[variable].sum()
del out
%%time
# Calibration of `variable` over the sorted dataset; the third argument (100) is presumably the bucket count — TODO confirm against get_calib.
calib = get_calib(rfrs_sorted, variable, 100)
# Display one bucket as a sample.
calib["buckets"][3]
# Copulas of "var" over 100 RFR buckets, reusing nb_bucket_var from the cell above.
nb_bucket_rfr = 100
variable = "var"
copules = get_copules(rfrs_sorted, nb_bucket_rfr, variable, nb_bucket_var)

# The first copula must account for 1100 households (zero + nonzero).
first_copule_pop = copules["copules"][0]["nb_foyer"]
assert first_copule_pop["zero"] + first_copule_pop["nonzero"] == 1100
rfrs_sorted

# Show the household counts of the last three copulas.
for cop in copules["copules"][-3:]:
    print(
        f"Nombre de personnes avec un VAR entre {cop['lower_bound']} et {cop['upper_bound']} : {cop['nb_foyer']}"
    )
    # assert 14 <= cop["nb_foyer"]["zero"] <= 28

# The population rebuilt from all copulas must be 11 000.
assert compute_pop_copules(copules) == 11_000

# Flatten the copulas to a frame and plot the lower bounds and the bucket means.
df_copules = copules_to_df(copules)
sns.scatterplot(data=df_copules, x=df_copules.index, y="lower_bound")
sns.scatterplot(data=df_copules, x=df_copules.index, y="mean_tranche_var")
On retrouve bien notre distribution initiale :
# Plot the initial fake data again, for visual comparison with the copula plots.
ax = sns.scatterplot(data=df)
# copules
# Plot the "ratio_nb_above_seuil" column of the copula frame against its index.
sns.scatterplot(data=df_copules, x=df_copules.index, y="ratio_nb_above_seuil")
# Minimum number of people per bucket imposed by statistical secrecy.
nb_respect_secret_statistique = 12
# 11 elements, one fewer than the minimum: no frontier is expected.
nb_elements_a_decouper = nb_respect_secret_statistique - 1
nb_bucket = 3
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == []
# Exactly 12 elements: a single frontier at 12.
nb_elements_a_decouper = nb_respect_secret_statistique
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12]
# 13 elements: still a single frontier, holding all 13.
nb_elements_a_decouper = nb_respect_secret_statistique + 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [13]
# 24 elements: two frontiers of 12 elements each.
nb_elements_a_decouper = nb_respect_secret_statistique * 2
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24]
# 35 elements: not enough for three buckets of 12, so two frontiers are expected.
nb_elements_a_decouper = 3 * nb_respect_secret_statistique - 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [17, 35]
# 36 elements: three frontiers of 12 elements each.
nb_elements_a_decouper = 3 * nb_respect_secret_statistique
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 36]
# 37 elements: three frontiers, the extra element goes to the last one.
nb_elements_a_decouper = 3 * nb_respect_secret_statistique + 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 37]
# 100 elements with 100 requested buckets: only 8 frontiers are expected.
nb_elements_a_decouper = 100
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 25, 37, 50, 62, 75, 87, 100]
# 1 200 elements in 100 buckets: exactly 100 frontiers are expected.
nb_elements_a_decouper = nb_respect_secret_statistique * 100
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == 100
# NOTE(review): get_ecart_frontiere presumably returns False on failure and the gaps between frontiers otherwise — confirm.
assert get_ecart_frontiere(frontieres) != False
# 1 200 elements in 10 buckets: one frontier more than nb_bucket is expected.
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 10) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 1
assert get_ecart_frontiere(frontieres) != False
# 12 000 elements in 10 buckets: two extra frontiers are expected.
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 100) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 2
assert get_ecart_frontiere(frontieres) != False
# 12 000 000 elements in 10 buckets (1_000_00 is 100 000): five extra frontiers are expected.
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 1_000_00) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 5
assert get_ecart_frontiere(frontieres) != False
# Print the last value returned by get_ecart_frontiere, the population size, and display their ratio.
print(get_ecart_frontiere(frontieres)[-1])
print(nb_elements_a_decouper)
get_ecart_frontiere(frontieres)[-1] / nb_elements_a_decouper
# NOTE(review): the format spec `2f` means minimum width 2 with the default 6 decimals; `.2f` may have been intended — confirm.
print(f"{1e-6:2f}")
# Case 1: strictly increasing RFR values; the slicing is computed and displayed.
test_dict = {"revkire": [0, 1, 2, 3]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(vdf_test, 1)
tranche_rfr_small_test

# Case 2: constant RFR values; the call is only expected to complete.
test_dict = {"revkire": [0, 0, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(vdf_test, 1)

# Case 3: RFR values that are not sorted must be rejected with DatasetNotSorted.
test_dict = {"revkire": [0, 1, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
with tc.assertRaises(DatasetNotSorted):
    prepare_tranche_rfr_vaex(vdf_test, 1, debug=True)
# Dataset of 1001 rows: 500 zeros, values 1..500, then one very high RFR (500 000).
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 5
nb_bucket_var_small_test = 3
test_dict = {
"revkire": [0 for i in range(500)] + [i + 1 for i in range(500)] + [500_000],
variable_small_test: [0 for i in range(500)] + [i + 1 for i in range(500)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
vdf_test, nb_bucket_rfr_small_test, debug=True
)
# The last frontier must cover the whole dataset.
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert (
len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test - 2 + 1
) # +1 because the last 10% are added
assert (
len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test - 1 + 1
) # +1 because the last 10% are added
assert tranche_rfr_small_test["frontieres_ff"] == [600, 800, 901, 1001]
# Display the RFR of the last row (index 1000), then the whole slicing.
vdf_test[["revkire"]][1000][0]
tranche_rfr_small_test
# Dataset of 56 rows: 5 zeros, values 1..50, then one very high RFR (500 000), sliced in 3 RFR buckets.
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
test_dict = {
"revkire": [0 for i in range(5)] + [i + 1 for i in range(50)] + [500_000],
variable_small_test: [0 for i in range(5)] + [i + 1 for i in range(50)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
vdf_test, nb_bucket_rfr_small_test, debug=True
)
# The last frontier must cover the whole dataset; here there is one frontier per requested bucket.
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test + 1
assert tranche_rfr_small_test["frontieres_ff"] == [18, 37, 56]
tranche_rfr_small_test
# Random dataset of 55 rows: the RFR column is sorted before use, the variable column is left in random order.
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
var_1 = [random.randint(0, 100) for i in range(5 + 50)]
var_1.sort()
test_dict = {
"revkire": var_1,
variable_small_test: [random.randint(0, 100) for i in range(5 + 50)],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
vdf_test, nb_bucket_rfr_small_test, debug=True
)
# Only structural properties are checked, since the values are random.
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test + 1
# Calibration used as input for the bucket-merge checks below.
calib = get_calib(rfrs_sorted, variable, 3)
# for b in calib["buckets"]:
# print(b["seuil_var_inf"])
# Merge bucket id_rm with the bucket above it, then display the two source buckets.
id_rm = 2
new_calib = bucket_merge_with_above(calib, id_rm)
calib["buckets"][id_rm]
calib["buckets"][id_rm + 1]
# for b in new_calib["buckets"]:
# print(b["seuil_var_inf"])
# The merged bucket keeps the lower threshold of the first bucket...
tc.assertEqual(
new_calib["buckets"][id_rm]["seuil_var_inf"],
calib["buckets"][id_rm]["seuil_var_inf"],
)
# ...and takes the upper threshold of the second one.
tc.assertEqual(
new_calib["buckets"][id_rm]["seuil_var_supp"],
calib["buckets"][id_rm + 1]["seuil_var_supp"],
)
# The merged mean must be the mean of the two source buckets weighted by their household counts.
sum_pond = (
calib["buckets"][id_rm]["mean_tranche_var"]
* calib["buckets"][id_rm]["nombre_ff_tranche"]
+ calib["buckets"][id_rm + 1]["mean_tranche_var"]
* calib["buckets"][id_rm + 1]["nombre_ff_tranche"]
)
sum_obs = (
calib["buckets"][id_rm]["nombre_ff_tranche"]
+ calib["buckets"][id_rm + 1]["nombre_ff_tranche"]
)
tc.assertEqual(
new_calib["buckets"][id_rm]["mean_tranche_var"],
sum_pond / sum_obs,
)
# 5 buckets are expected after the merge, and one fewer after reduce_bucket_number with 0.8.
tc.assertEqual(len(new_calib["buckets"]), 5)
new_calib_reduce = reduce_bucket_number(new_calib, 0.8)
tc.assertEqual(len(new_calib_reduce["buckets"]), 5 - 1)
# 16 households with RFR 0..15: even rows have var = 0, odd rows have var = 5 up to
# the middle of the population and var = 10 above it.
nb_foy = 16
rfr = [
    {
        "revkire": i,
        "var": (5.0 if i <= nb_foy / 2 else 10.0) if i % 2 else 0.0,
    }
    for i in range(nb_foy)
]
df = pd.DataFrame(rfr)
# df.describe()
vaex_df = pandas_to_vaex(df)

# One RFR bucket, two variable buckets, statistical-secrecy threshold lowered to 1.
copules = get_copules(vaex_df, 1, "var", 2, nb_respect_secret_statistique=1)
assert len(copules["copules"]) == 1
assert len(copules["copules"][0]["buckets"]) == 3

# Full expected structure: the zero bucket followed by the two non-zero buckets.
expected_zero_bucket = {
    "seuil_var_inf": 0,
    "seuil_var_supp": 2.5,
    "nombre_ff_tranche": 8,
    "sum_tranche_var": 0,
    "mean_tranche_var": 0,
    "stdev_tranche_var": 0,
    "nb_above_seuil": 8,
    "sum_var_above_seuil": 60.0,
    "ratio_nb_above_seuil": 0.5,
    "mean_var_above_seuil": 7.5,
}
expected_low_bucket = {
    "seuil_var_inf": 2.5,
    "seuil_var_supp": 7.5,
    "nombre_ff_tranche": 4,
    "sum_tranche_var": 20.0,
    "mean_tranche_var": 5.0,
    "stdev_tranche_var": 0.0,
    "nb_above_seuil": 4,
    "sum_var_above_seuil": 40.0,
    "ratio_nb_above_seuil": 0.25,
    "mean_var_above_seuil": 10.0,
}
expected_high_bucket = {
    "seuil_var_inf": 7.5,
    "seuil_var_supp": 10.0,
    "nombre_ff_tranche": 4,
    "sum_tranche_var": 40.0,
    "mean_tranche_var": 10.0,
    "stdev_tranche_var": 0.0,
    "nb_above_seuil": 0,
    "sum_var_above_seuil": 0,
    "ratio_nb_above_seuil": 0,
    "mean_var_above_seuil": 0,
}
tc.assertEqual(
    copules,
    {
        "controle": [],
        "copules": [
            {
                "lower_bound": 0,
                "upper_bound": 1000000000000000,
                "nb_foyer": {"zero": 8, "nonzero": 8},
                "buckets": [
                    expected_zero_bucket,
                    expected_low_bucket,
                    expected_high_bucket,
                ],
            }
        ],
    },
)
# 50 random values left unsorted: building the distribution must raise DatasetNotSorted.
variable_values = [random.randint(1, 1000) for _ in range(50)]
with tc.assertRaises(DatasetNotSorted):
    dis = DistribDeVarVaex(
        variable_values=variable_values,
        variable="variable",
        nb_ff=len(variable_values),
        nb_bucket_var=2,
        lower_bound=50,
        upper_bound=1e10,
        debug=False,
    )
# 24 identical values with 2 requested buckets: an empty zero bucket and two buckets of 12 values are expected.
variable_values = [1 for i in range(12)] + [1 for i in range(12)]
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values),
nb_bucket_var=2,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == sum(variable_values) / 2
assert res["buckets"][2]["sum_tranche_var"] == sum(variable_values) / 2
# All values of the bucket are equal, so its standard deviation must be zero.
tc.assertEqual(res["buckets"][1]["stdev_tranche_var"], 0.0)
# Twelve 1s followed by twelve 2s: each level is expected to land in its own bucket (sums 12 and 24).
variable_values = [1 for i in range(12)] + [2 for i in range(12)]
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values),
nb_bucket_var=2,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == 12
assert res["buckets"][2]["sum_tranche_var"] == 24
# Twelve 1s followed by the increasing values 13..24: two buckets of 12 values are expected.
variable_values = [1 for i in range(12)] + [i + 13 for i in range(12)]
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values),
nb_bucket_var=2,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == 12
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
# Three groups of 12 values (1s, 13..24, 120..230) with 3 requested buckets.
variable_values = (
[1 for i in range(12)]
+ [i + 13 for i in range(12)]
+ [i * 10 for i in range(12, 12 + 12)]
)
# variable_values.sort()
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values),
nb_bucket_var=3,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][2]["nombre_ff_tranche"] == 12
assert res["buckets"][3]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(1 for i in range(12))
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
assert res["buckets"][3]["sum_tranche_var"] == sum(i * 10 for i in range(12, 12 + 12))
# sum_var_above_seuil of a bucket must equal the total of all the buckets above it.
assert res["buckets"][1]["sum_var_above_seuil"] == sum(
[i + 13 for i in range(12)] + [i * 10 for i in range(12, 12 + 12)]
)
assert res["buckets"][2]["sum_var_above_seuil"] == sum(
i * 10 for i in range(12, 12 + 12)
)
assert res["buckets"][3]["sum_var_above_seuil"] == 0
# Same layout with 12 more households than values (nb_ff = len + 12): they must be counted in the zero bucket.
variable_values = (
[1 for i in range(12)]
+ [i + 13 for i in range(12)]
+ [30 + i * 10 for i in range(12)]
)
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values) + 12,
nb_bucket_var=3,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 12
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][2]["nombre_ff_tranche"] == 12
assert res["buckets"][3]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(1 for i in range(12))
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
assert res["buckets"][3]["sum_tranche_var"] == sum(30 + i * 10 for i in range(12))
# sum_var_above_seuil of a bucket must equal the total of all the buckets above it.
assert res["buckets"][1]["sum_var_above_seuil"] == sum(
[i + 13 for i in range(12)] + [30 + i * 10 for i in range(12)]
)
assert res["buckets"][2]["sum_var_above_seuil"] == sum(30 + i * 10 for i in range(12))
assert res["buckets"][3]["sum_var_above_seuil"] == 0
# nb_ff (4) is smaller than the number of values (12): the zero count is expected to be 0,
# and all 12 values are expected in the single non-zero bucket.
variable_values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=4,
nb_bucket_var=1,
lower_bound=50,
upper_bound=1e10,
debug=False,
)
res = dis.to_dict()
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == 12
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(variable_values)
# Only 4 values are supplied: to_dict() is expected to report the error message
# instead of a list of buckets.
dis = DistribDeVarVaex(
    variable_values=[1, 2, 3, 4],
    variable="variable",
    nb_ff=4,
    nb_bucket_var=1,
    lower_bound=0,
    # Power, not `10 ^ 15`: `^` is bitwise XOR in Python and evaluates to 5.
    upper_bound=10**15,
    debug=True,
)
res = dis.to_dict()
assert res["buckets"] == "Error, moins de 12 éléments !"
# 13 non-zero values among 100 households, with 4 buckets requested:
# a single non-zero bucket (plus the zero bucket) is expected.
variable_values = [i + 1 for i in range(13)]
variable = "revkire"
nb_ff = 100
bdr = DistribDeVarVaex(
variable_values=variable_values,
variable=variable,
nb_ff=nb_ff,
nb_bucket_var=4,
debug=False,
)
result = bdr.to_dict()
# result
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert result["buckets"][1]["nombre_ff_tranche"] == len(variable_values)
assert result["buckets"][1]["sum_tranche_var"] == sum(variable_values)
assert len(result["buckets"]) == 2
# Four increasing groups of 20 values each, with 4 requested buckets:
# every group is expected to come back as one bucket.
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]
variable_values = (
expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
variable_values=variable_values,
variable=variable,
nb_ff=nb_ff,
nb_bucket_var=nb_bucket_var,
lower_bound=prev_seuil,
upper_bound=seuil,
debug=False,
)
result = bdr.to_dict()
# result
# The bounds are passed through unchanged.
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
# Bucket 0 is the zero bucket, so bucket i matches expected_i_bucket.
assert result["buckets"][1]["nombre_ff_tranche"] == len(expected_1_bucket)
assert result["buckets"][2]["nombre_ff_tranche"] == len(expected_2_bucket)
assert result["buckets"][3]["nombre_ff_tranche"] == len(expected_3_bucket)
assert result["buckets"][4]["nombre_ff_tranche"] == len(expected_4_bucket)
assert result["buckets"][1]["sum_tranche_var"] == sum(expected_1_bucket)
assert result["buckets"][2]["sum_tranche_var"] == sum(expected_2_bucket)
assert result["buckets"][3]["sum_tranche_var"] == sum(expected_3_bucket)
assert result["buckets"][4]["sum_tranche_var"] == sum(expected_4_bucket)
# Four increasing groups of 20 values among 100 households; this cell also checks
# the zero bucket and the total number of buckets.
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
# One zero bucket plus one bucket per requested variable bucket.
assert len(result["buckets"]) == nb_bucket_var + 1
# Bucket 0 holds the households without the variable (20 here).
assert result["buckets"][0]["nombre_ff_tranche"] == nb_ff - len(variable_values)
# Bucket i is compared with expected_i_bucket. The original compared the counts with
# shifted lists, which only passed because all four lists have the same length.
assert result["buckets"][1]["nombre_ff_tranche"] == len(expected_1_bucket)
assert result["buckets"][2]["nombre_ff_tranche"] == len(expected_2_bucket)
assert result["buckets"][3]["nombre_ff_tranche"] == len(expected_3_bucket)
assert result["buckets"][4]["nombre_ff_tranche"] == len(expected_4_bucket)
assert result["buckets"][1]["sum_tranche_var"] == sum(expected_1_bucket)
assert result["buckets"][2]["sum_tranche_var"] == sum(expected_2_bucket)
assert result["buckets"][3]["sum_tranche_var"] == sum(expected_3_bucket)
assert result["buckets"][4]["sum_tranche_var"] == sum(expected_4_bucket)
# 46 values (12 + 12 + 12 + 10) with 4 requested buckets: only 3 non-zero buckets
# are expected, holding 15, 15 and 16 values.
expected_1_bucket = [i + 1 for i in range(12)]
expected_2_bucket = [(i + 20) * 2 for i in range(12)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(12)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(10)]
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
# print(f"{variable_values=} {len(variable_values)=}")
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert len(result["buckets"]) == 3 + 1
assert result["buckets"][1]["nombre_ff_tranche"] == 15
assert result["buckets"][2]["nombre_ff_tranche"] == 15
assert result["buckets"][3]["nombre_ff_tranche"] == 16
assert result["buckets"][0]["sum_tranche_var"] == 0
assert result["buckets"][1]["sum_tranche_var"] == 204
assert result["buckets"][2]["sum_tranche_var"] == 1251
assert result["buckets"][3]["sum_tranche_var"] == 3453
# The bucket sums must add up to the total of the variable.
s = sum(bucket["sum_tranche_var"] for bucket in result["buckets"])
assert s == sum(variable_values)
# Four groups of 20 values; the last value of the top group (30 000) is very large
# compared with the rest of its group.
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(19)] + [30000]
# Share of the single large value within its own group (about 0.851). The original
# printed 30_000 / sum(variable_values) before variable_values was rebuilt, so it
# used the previous cell's data.
print(30_000 / sum(expected_4_bucket))
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
# print(f"{variable_values=} {len(variable_values)=}")
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=True,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert len(result["buckets"]) == 4  # and not 5, because of statistical secrecy
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# The bucket sums must add up to the total of the variable.
s = sum(bucket["sum_tranche_var"] for bucket in result["buckets"])
assert s == sum(variable_values)
# Same layout as the previous cell, but the very large value (30 000) sits in the
# middle of the top group instead of at its end.
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = (
    [(i + 20 * 3) * 4 for i in range(9)]
    + [30000]
    + [(i + 20 * 3) * 4 for i in range(10)]
)
# Share of the single large value within its own group (about 0.86 here). The original
# printed 30_000 / sum(variable_values) before variable_values was rebuilt, so it
# used the previous cell's data.
print(30_000 / sum(expected_4_bucket))
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
# print(f"{variable_values=} {len(variable_values)=}")
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# The bucket sums must add up to the total of the variable.
s = sum(bucket["sum_tranche_var"] for bucket in result["buckets"])
assert s == sum(variable_values)
# Three groups of 6 values; the statistical-secrecy threshold is lowered to 1 so that small buckets are accepted.
expected_2_bucket = [2, 2, 2, 2, 2, 2]
expected_3_bucket = [4, 4, 4, 6, 6, 6]
expected_4_bucket = [100, 7, 7, 4000, 2456.654, 6.4658]
variable_values = expected_2_bucket + expected_3_bucket + expected_4_bucket
variable_values.sort()
dis = DistribDeVarVaex(
variable_values=variable_values,
variable="variable",
nb_ff=len(variable_values),
nb_bucket_var=3,
nb_respect_secret_statistique=1,
debug=False,
)
res = dis.to_dict()
# The standard deviation of each bucket must match statistics.stdev on the corresponding group.
tc.assertEqual(
res["buckets"][1]["stdev_tranche_var"], statistics.stdev(expected_2_bucket)
)
tc.assertEqual(
res["buckets"][2]["stdev_tranche_var"], statistics.stdev(expected_3_bucket)
)
tc.assertEqual(
res["buckets"][3]["stdev_tranche_var"], statistics.stdev(expected_4_bucket)
)
%%time
# Copulas of the small test dataset built in the cells above.
out = compute_copule_vaex(
    vdf_test,
    variable_small_test,
    nb_bucket_var_small_test,
    tranche_rfr_small_test,
    debug=True,
)
len(out["copules"][-1]["buckets"])
# One copula per RFR frontier; the last copula holds the zero bucket plus one bucket.
assert len(out["copules"]) == len(tranche_rfr_small_test["frontieres_ff"])
assert len(out["copules"][-1]["buckets"]) == 1 + 1

# The sums of all buckets of all copulas must add up to the total of the variable.
s = sum(
    bucket["sum_tranche_var"]
    for copule in out["copules"]
    for bucket in copule["buckets"]
)
assert s == int(vdf_test.sum(f"{variable_small_test}"))

# Every household must be counted exactly once. The loops iterate over all copulas
# instead of a hard-coded range(3).
s = sum(
    copule["nb_foyer"]["zero"] + copule["nb_foyer"]["nonzero"]
    for copule in out["copules"]
)
assert s == int(vdf_test.count(variable_small_test))

# The first non-zero bucket of each copula must account for every household with a
# strictly positive value (this assumes a single non-zero bucket per copula).
s = sum(copule["buckets"][1]["nombre_ff_tranche"] for copule in out["copules"])
assert s == int(
    vdf_test.count(variable_small_test, selection=[vdf_test[variable_small_test] > 0])
)
# Dataset of 161 rows: 50 zeros, values 1..110, then one very high RFR (500 000).
variable_small_test = "ma_var"
nb_bucket_rfr_small_test2 = 10
nb_bucket_var_small_test2 = 3
test_dict = {
    "revkire": [0 for i in range(50)] + [i + 1 for i in range(110)] + [500_000],
    variable_small_test: [0 for i in range(50)] + [i + 1 for i in range(110)] + [100],
}
vdf_test2 = vaex.from_dict(test_dict)
tranche_rfr_small_test2 = prepare_tranche_rfr_vaex(
    vdf_test2, nb_bucket_rfr_small_test2, debug=True
)
tranche_rfr_small_test2
# 10 buckets were requested, but only 7 frontiers are expected.
assert tranche_rfr_small_test2["frontieres_ff"][-1] == vdf_test2.count()
assert len(tranche_rfr_small_test2["frontieres_ff"]) == 7
assert tranche_rfr_small_test2["frontieres_ff"] == [64, 80, 96, 112, 128, 144, 161]

out = compute_copule_vaex(
    vdf=vdf_test2,
    variable=variable_small_test,
    nb_bucket_var=nb_bucket_var_small_test2,
    tranche_RFR=tranche_rfr_small_test2,
    debug=False,
)

# The sums of all buckets of all copulas must add up to the total of the variable
# (all buckets are summed, as in the previous cell, not only buckets[1]).
s = sum(
    bucket["sum_tranche_var"]
    for copule in out["copules"]
    for bucket in copule["buckets"]
)
assert s == int(vdf_test2.sum(f"{variable_small_test}"))

# Every household must be counted exactly once.
s = sum(
    copule["nb_foyer"]["zero"] + copule["nb_foyer"]["nonzero"]
    for copule in out["copules"]
)
assert s == int(vdf_test2.count(variable_small_test))

# The first non-zero bucket of each copula must account for every household with a
# strictly positive value. The selection is built from vdf_test2; the original used
# vdf_test, a different dataframe.
s = sum(copule["buckets"][1]["nombre_ff_tranche"] for copule in out["copules"])
assert s == int(
    vdf_test2.count(variable_small_test, selection=[vdf_test2[variable_small_test] > 0])
)
# nbdev export step; presumably regenerates the library modules from the notebooks — confirm against the nbdev version in use.
from nbdev.export import notebook2script
notebook2script()