--- title: Extraction de la fonction de répartition d'une variable en fonction du RFR : création de copules keywords: fastai sidebar: home_sidebar nb_path: "notebooks/calib_and_copule.ipynb" ---
{% raw %}
{% endraw %} {% raw %}
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
{% endraw %} {% raw %}
# year = "2018"
# IN_PATH = r"C:\Users\Public\Documents\TRAVAIL\csg\data_in/"
# ARROW_IN = (
#     IN_PATH
#     + "assiettes_pote_brutes_"
#     + year
#     + "-arrow/assiettes_pote_sorted_"
#     + year
#     + "*.arrow"
# )

# # IN_POTE="/media/data-nvme/dev/src/LEXIMPACT/fake_pote_partial.parquet"
# IN_POTE = "/media/data-nvme/dev/src/LEXIMPACT/fake_pote_full.parquet"
# # OUT_PATH = "/media/data-nvme/dev/src/LEXIMPACT/copules/"
# OUT_PATH = r"C:\Users\Public\Documents\TRAVAIL\csg\data_out/"
{% endraw %} {% raw %}
{% endraw %} {% raw %}
import json
import sys
import unittest

import numpy as np
import seaborn as sns
from tqdm import tqdm

tc = unittest.TestCase()
{% endraw %} {% raw %}

class DatasetNotSorted[source]

DatasetNotSorted() :: Exception

Common base class for all non-exit exceptions.

{% endraw %} {% raw %}
{% endraw %}

Méthode de découpage de frontière dynamique

Ajoute des frontières pour les hauts revenus et vérifie le respect du nombre de personne à l'intérieur de chaque frontière.

La découpe se fait en 100 tranches égales, en terme de personnes, auxquelles on ajoute des tranches plus fines sur les hauts revenus.

Cependant le découpage est limité par le respect du secret statistique :

- Pas moins de 12 personnes par tranche

La vérification qu'un foyer de la tranche ne représente pas plus que 85% de la valeur total des montants de la tranche est fait dans une autre fonction. Ici on découpe sans s'occuper du contenu, seulement du nombre d'éléments.

Pour comprendre le besoin de mieux détailler les hauts revenus, voici les foyers qui existaient en 2019 dans les trés hauts revenus fiscaux de références :

  • Entre 10 millions et 100 millions : 294 soit 7.538461538461539 7 personnes sur 1_000_000
  • Entre 1 million et 10 millions : 10 061 soit 2.5797435897435896 2 personnes sur 10_000
  • Entre 500 000 € et 1 million : 22 745 soit 5.832051282051282 sur 6 personnes sur 10_000
  • Entre 250 000 et 500 000 : 88 849 soit 2.278179487179487 sur 2 personnes sur 1_000
  • Entre 150 000 et 250 000 : 236 470 soit 6.063333333333333 sur 6 personnes sur 1_000
{% raw %}

get_frontieres[source]

get_frontieres(nb_lignes:int, nb_bucket:int, ajout_ratio_supp=[0.1, 0.01, 0.001, 0.0001, 1e-05, 1e-06], nb_respect_secret_statistique=12, debug=False)

{% endraw %} {% raw %}
{% endraw %}

Méthode de calcul des tranches de RFR

Objectif : Déterminer les tranches de RFR.

On appel la méthode de découpe en tranche, puis on retire celles où le revenu le plus élevé est zéro.

{% raw %}

prepare_tranche_rfr_vaex[source]

prepare_tranche_rfr_vaex(vdx_sort:DataFrameLocal, nb_bucket_rfr:int, nb_respect_secret_statistique=12, debug=False)

Objectif: Splitter le RFR en m buckets Dans chaque bucket on stocke toutes les valeurs non nulles de "variable" ::vdx_sort:: Le dataset, trié par RFR ::nb_bucket_rfr:: Nombre de tranches souhaitées ::debug:: Pour activer un mode debug, qui affiche des traces

{% endraw %} {% raw %}
{% endraw %}

Méthode de préparation des tranches de variables à analyser

Objectif : Pour chaque tranche de RFR :

  • Extraire les valeurs de la variable secondaire.
  • Retirer les valeurs à 0.
  • Les triers par ordre croissant.
  • Appeler la méthode de calcul des copules DistribDeVar.
{% raw %}

compute_copule_vaex[source]

compute_copule_vaex(vdf:DataFrameLocal, variable:str, nb_bucket_var:int, tranche_RFR:List[T], debug=False, nb_respect_secret_statistique=12)

On nous donne des tranches de RFR, en nombre de personne, et en valeur de RFR Pour chacune de ses tranches on doit extraire les valeurs de 'variable' On ne garde que celle supérieure à 0 et on les envoie à DistribDeVarVaex ::vdf:: Le jeux de données ::variable:: Nom de la variable secondaire. ::nb_bucket_var:: Nombre de tranches de variable secondaire souhaités. ::tranche_RFR:: La liste des tranches de RFR. ::debug:: Pour activer un mode debug, qui affiche des traces. ::nb_respect_secret_statistique:: Nombre minimal d'individus pour respecter le secret statistique.

{% endraw %} {% raw %}
{% endraw %}

Méthode de calcul des copules

Objectif :

  • Découper en tranche la variable secondaire, dans une tranche de RFR donné.
  • Vérifier le respect du secret statistique.
  • Sauvegarder le nombre de foyer et la somme de la variable pour chaque tranche.
{% raw %}

class DistribDeVarVaex[source]

DistribDeVarVaex(variable_values:List[T], variable:str, nb_ff:int, nb_bucket_var=10, lower_bound=0, upper_bound=5, nb_respect_secret_statistique=12, debug=False)

On créée une classe qui, pour un bucket de RFR donné [lower_bound, upper_bound], va générer la distribution des Rk (ou autre Variable) de ce bucket (que l'on a dans liste_des_rk). Cette distribution est retournée sous la forme: resultat = [ [Nb de gens1,Somme des Rk 1],[Nb2, Sum2], [], ... , [Nb N, Sum N]] avec N le nb de buckets de Rk

{% endraw %} {% raw %}
{% endraw %}

Fabrication d'un faux jeux de données

Ce faux jeux de données va nous permettre de tester notre solution sur un problème simplifié.

On va considérer que le RFR croit linéairement dans une population de 10 000 foyers. Et une variable va évoluer en fonction du RFR. Tout en pouvant être à zéro.

Nous pourrons ainsi constater facilement si notre distribution générée correspond à celle initiale.

{% raw %}

get_fake_data[source]

get_fake_data(nb_echantillon_zero=1000, nb_echantillon=10000, var_name='var', set_some_var_to_zero=False, exponent=1.5, divider=15)

Génération d'un faux jeu de données.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
sns.set(rc={"figure.figsize": (20, 8)})
df = get_fake_data(set_some_var_to_zero=True)
sns.scatterplot(data=df)
{% endraw %} {% raw %}

pandas_to_vaex[source]

pandas_to_vaex(df)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
rfrs_sorted = pandas_to_vaex(df)
{% endraw %}

Génération de calibration

Les calibrations sont des copules avec une seule tranche de RFR.

{% raw %}
une_tranche_rfr = prepare_tranche_rfr_vaex(rfrs_sorted, 1, debug=True)
une_tranche_rfr
{% endraw %} {% raw %}
%%time
variable = "revkire"
nb_bucket_var = 10
out = compute_copule_vaex(
    rfrs_sorted, variable, nb_bucket_var, une_tranche_rfr, debug=True
)
# out
{% endraw %} {% raw %}
out["copules"][0]["buckets"][5]
{% endraw %} {% raw %}
s = 0
for i in range(len(out["copules"][0]["buckets"])):
    s += out["copules"][0]["buckets"][i]["sum_tranche_var"]
assert s == rfrs_sorted[variable].sum()
{% endraw %} {% raw %}
del out
{% endraw %} {% raw %}

get_calib[source]

get_calib(vdf, variable, nb_bucket_var, nb_respect_secret_statistique=12)

::vdf:: Vaex DataFrame ::variable:: Column name to calibrate ::nb_bucket_var:: Number of bucket in wich to split the dataframe ::nb_respect_secret_statistique:: Minimal number of sample in a bucket

{% endraw %} {% raw %}
{% endraw %} {% raw %}
%%time
calib = get_calib(rfrs_sorted, variable, 100)
calib["buckets"][3]
{% endraw %}

Fusion de tranche

{% raw %}

bucket_merge_with_above[source]

bucket_merge_with_above(calib_in, id_rm:int)

This method merge two bucket together. ::calib:: The buckets list ::id_rm:: The index of the bucket to merge with the bucket above

{% endraw %} {% raw %}
{% endraw %}

Fusion automatique de tranches

{% raw %}

reduce_bucket_number[source]

reduce_bucket_number(calib, max_gap:int)

This method scans a bucket list and merges all buckets where ::calib:: The buckets list ::max_gap:: The ratio below which the bucket will be merged

{% endraw %} {% raw %}
{% endraw %}

Génération de copule

{% raw %}

get_copules[source]

get_copules(vdf, nb_bucket_rfr, variable, nb_bucket_var, nb_respect_secret_statistique=12)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
nb_bucket_rfr = 100
variable = "var"
copules = get_copules(rfrs_sorted, nb_bucket_rfr, variable, nb_bucket_var)
assert (
    copules["copules"][0]["nb_foyer"]["zero"]
    + copules["copules"][0]["nb_foyer"]["nonzero"]
    == 1100
)
{% endraw %} {% raw %}
rfrs_sorted
{% endraw %} {% raw %}
for cop in copules["copules"][-3:]:
    print(
        f"Nombre de personnes avec un VAR entre {cop['lower_bound']} et {cop['upper_bound']} : {cop['nb_foyer']}"
    )
    # assert 14 <= cop["nb_foyer"]["zero"] <= 28
{% endraw %} {% raw %}

compute_pop_copules[source]

compute_pop_copules(copules)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
assert compute_pop_copules(copules) == 11_000
{% endraw %}

Convertion copules JSON vers dataframe

{% raw %}

copules_to_df[source]

copules_to_df(copules)

{% endraw %} {% raw %}

calib_to_df[source]

calib_to_df(calib)

{% endraw %} {% raw %}
{% endraw %}

Affichage

{% raw %}
df_copules = copules_to_df(copules)
{% endraw %} {% raw %}
sns.scatterplot(data=df_copules, x=df_copules.index, y="lower_bound")
sns.scatterplot(data=df_copules, x=df_copules.index, y="mean_tranche_var")
{% endraw %}

On retrouve bien notre distribution initiale :

{% raw %}
ax = sns.scatterplot(data=df)
# copules
{% endraw %} {% raw %}
sns.scatterplot(data=df_copules, x=df_copules.index, y="ratio_nb_above_seuil")
{% endraw %}

TESTS

{% raw %}
nb_respect_secret_statistique = 12
{% endraw %}

Tests découpage de frontières

{% raw %}

get_ecart_frontiere[source]

get_ecart_frontiere(frontieres, nb_respect_secret_statistique=12)

{% endraw %} {% raw %}
{% endraw %}

Pas assez d'éléments

{% raw %}
nb_elements_a_decouper = nb_respect_secret_statistique - 1
nb_bucket = 3
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == []
{% endraw %}

Juste assez d'éléments

{% raw %}
nb_elements_a_decouper = nb_respect_secret_statistique
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12]
{% endraw %}

Pas assez d'éléments pour en faire deux

{% raw %}
nb_elements_a_decouper = nb_respect_secret_statistique + 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [13]
{% endraw %}

Juste assez d'éléments pour en faire deux

{% raw %}
nb_elements_a_decouper = nb_respect_secret_statistique * 2
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24]
{% endraw %}

Pas assez d'éléments pour en faire trois

{% raw %}
nb_elements_a_decouper = 3 * nb_respect_secret_statistique - 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [17, 35]
{% endraw %}

Juste assez d'éléments pour en faire trois

{% raw %}
nb_elements_a_decouper = 3 * nb_respect_secret_statistique
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 36]
{% endraw %}

Assez d'éléments pour en faire trois

{% raw %}
nb_elements_a_decouper = 3 * nb_respect_secret_statistique + 1
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 24, 37]
{% endraw %}

Pas assez d'éléments pour en faire 100

{% raw %}
nb_elements_a_decouper = 100
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=True)
assert frontieres == [12, 25, 37, 50, 62, 75, 87, 100]
{% endraw %}

Juste assez d'éléments pour en faire 100

{% raw %}
nb_elements_a_decouper = nb_respect_secret_statistique * 100
nb_bucket = 100
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == 100
assert get_ecart_frontiere(frontieres) != False
{% endraw %}

Assez d'éléments pour ajouter la tranche de 10%

{% raw %}
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 10) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 1
assert get_ecart_frontiere(frontieres) != False
{% endraw %}

Assez d'éléments pour ajouter la tranche de 1%

{% raw %}
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 100) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 2
assert get_ecart_frontiere(frontieres) != False
{% endraw %}

Assez d'éléments pour ajouter la tranche de 0.000001 (1 pour 1 million)

{% raw %}
nb_bucket = 10
nb_elements_a_decouper = (nb_respect_secret_statistique * 1_000_00) * nb_bucket
frontieres = get_frontieres(nb_elements_a_decouper, nb_bucket, debug=False)
assert len(frontieres) == nb_bucket + 5
assert get_ecart_frontiere(frontieres) != False
{% endraw %} {% raw %}
print(get_ecart_frontiere(frontieres)[-1])
print(nb_elements_a_decouper)
get_ecart_frontiere(frontieres)[-1] / nb_elements_a_decouper
{% endraw %} {% raw %}
print(f"{1e-6:2f}")
{% endraw %}

Test calcul des tranches de RFR

Test de tri du dataset

{% raw %}
test_dict = {"revkire": [0, 1, 2, 3]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(vdf_test, 1)
tranche_rfr_small_test
{% endraw %} {% raw %}
test_dict = {"revkire": [0, 0, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(vdf_test, 1)
{% endraw %} {% raw %}
test_dict = {"revkire": [0, 1, 0, 0]}
vdf_test = vaex.from_dict(test_dict)
with tc.assertRaises(DatasetNotSorted):
    prepare_tranche_rfr_vaex(vdf_test, 1, debug=True)
{% endraw %}

Tests des tranches retournées

{% raw %}
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 5
nb_bucket_var_small_test = 3
test_dict = {
    "revkire": [0 for i in range(500)] + [i + 1 for i in range(500)] + [500_000],
    variable_small_test: [0 for i in range(500)] + [i + 1 for i in range(500)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert (
    len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test - 2 + 1
)  # +1 car on ajoute les derniers 10%
assert (
    len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test - 1 + 1
)  # +1 car on ajoute les derniers 10%
assert tranche_rfr_small_test["frontieres_ff"] == [600, 800, 901, 1001]
{% endraw %} {% raw %}
vdf_test[["revkire"]][1000][0]
{% endraw %} {% raw %}
tranche_rfr_small_test
{% endraw %} {% raw %}
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
test_dict = {
    "revkire": [0 for i in range(5)] + [i + 1 for i in range(50)] + [500_000],
    variable_small_test: [0 for i in range(5)] + [i + 1 for i in range(50)] + [100],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test + 1
assert tranche_rfr_small_test["frontieres_ff"] == [18, 37, 56]
{% endraw %} {% raw %}
tranche_rfr_small_test
{% endraw %}

Test de vérification du tri

{% raw %}
variable_small_test = "ma_var"
nb_bucket_rfr_small_test = 3
nb_bucket_var_small_test = 3
var_1 = [random.randint(0, 100) for i in range(5 + 50)]
var_1.sort()
test_dict = {
    "revkire": var_1,
    variable_small_test: [random.randint(0, 100) for i in range(5 + 50)],
}
vdf_test = vaex.from_dict(test_dict)
tranche_rfr_small_test = prepare_tranche_rfr_vaex(
    vdf_test, nb_bucket_rfr_small_test, debug=True
)
assert tranche_rfr_small_test["frontieres_ff"][-1] == vdf_test.count()
assert len(tranche_rfr_small_test["frontieres_ff"]) == nb_bucket_rfr_small_test
assert len(tranche_rfr_small_test["frontieres_RFR"]) == nb_bucket_rfr_small_test + 1
{% endraw %}

Test de fusion de tranche

{% raw %}
calib = get_calib(rfrs_sorted, variable, 3)
# for b in calib["buckets"]:
#     print(b["seuil_var_inf"])
id_rm = 2
new_calib = bucket_merge_with_above(calib, id_rm)

calib["buckets"][id_rm]
calib["buckets"][id_rm + 1]

# for b in new_calib["buckets"]:
#     print(b["seuil_var_inf"])

tc.assertEqual(
    new_calib["buckets"][id_rm]["seuil_var_inf"],
    calib["buckets"][id_rm]["seuil_var_inf"],
)
tc.assertEqual(
    new_calib["buckets"][id_rm]["seuil_var_supp"],
    calib["buckets"][id_rm + 1]["seuil_var_supp"],
)
sum_pond = (
    calib["buckets"][id_rm]["mean_tranche_var"]
    * calib["buckets"][id_rm]["nombre_ff_tranche"]
    + calib["buckets"][id_rm + 1]["mean_tranche_var"]
    * calib["buckets"][id_rm + 1]["nombre_ff_tranche"]
)
sum_obs = (
    calib["buckets"][id_rm]["nombre_ff_tranche"]
    + calib["buckets"][id_rm + 1]["nombre_ff_tranche"]
)
tc.assertEqual(
    new_calib["buckets"][id_rm]["mean_tranche_var"],
    sum_pond / sum_obs,
)
{% endraw %}

Tests de réduction du nombre de tranche

{% raw %}
tc.assertEqual(len(new_calib["buckets"]), 5)
new_calib_reduce = reduce_bucket_number(new_calib, 0.8)
tc.assertEqual(len(new_calib_reduce["buckets"]), 5 - 1)
{% endraw %}

Tests de calcul des copules dans les tranches de RFR

{% raw %}
rfr = []
nb_foy = 16
for i in range(nb_foy):
    if i % 2:
        var = 5.0 if i <= nb_foy / 2 else 10.0
    else:
        var = 0.0
    un_rfr = {
        "revkire": i,
        "var": var,
    }
    rfr.append(un_rfr)
df = pd.DataFrame(rfr)
# df.describe()
{% endraw %} {% raw %}
vaex_df = pandas_to_vaex(df)

copules = get_copules(vaex_df, 1, "var", 2, nb_respect_secret_statistique=1)
assert len(copules["copules"]) == 1
assert len(copules["copules"][0]["buckets"]) == 3
tc.assertEqual(
    copules,
    {
        "controle": [],
        "copules": [
            {
                "lower_bound": 0,
                "upper_bound": 1000000000000000,
                "nb_foyer": {"zero": 8, "nonzero": 8},
                "buckets": [
                    {
                        "seuil_var_inf": 0,
                        "seuil_var_supp": 2.5,
                        "nombre_ff_tranche": 8,
                        "sum_tranche_var": 0,
                        "mean_tranche_var": 0,
                        "stdev_tranche_var": 0,
                        "nb_above_seuil": 8,
                        "sum_var_above_seuil": 60.0,
                        "ratio_nb_above_seuil": 0.5,
                        "mean_var_above_seuil": 7.5,
                    },
                    {
                        "seuil_var_inf": 2.5,
                        "seuil_var_supp": 7.5,
                        "nombre_ff_tranche": 4,
                        "sum_tranche_var": 20.0,
                        "mean_tranche_var": 5.0,
                        "stdev_tranche_var": 0.0,
                        "nb_above_seuil": 4,
                        "sum_var_above_seuil": 40.0,
                        "ratio_nb_above_seuil": 0.25,
                        "mean_var_above_seuil": 10.0,
                    },
                    {
                        "seuil_var_inf": 7.5,
                        "seuil_var_supp": 10.0,
                        "nombre_ff_tranche": 4,
                        "sum_tranche_var": 40.0,
                        "mean_tranche_var": 10.0,
                        "stdev_tranche_var": 0.0,
                        "nb_above_seuil": 0,
                        "sum_var_above_seuil": 0,
                        "ratio_nb_above_seuil": 0,
                        "mean_var_above_seuil": 0,
                    },
                ],
            }
        ],
    },
)
{% endraw %}

Test vérification du tri

{% raw %}
variable_values = [random.randint(1, 1000) for i in range(50)]

with tc.assertRaises(DatasetNotSorted):
    dis = DistribDeVarVaex(
        variable_values=variable_values,
        variable="variable",
        nb_ff=len(variable_values),
        nb_bucket_var=2,
        lower_bound=50,
        upper_bound=1e10,
        debug=False,
    )
{% endraw %}

Test deux buckets, sans 0

{% raw %}
variable_values = [1 for i in range(12)] + [1 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == sum(variable_values) / 2
assert res["buckets"][2]["sum_tranche_var"] == sum(variable_values) / 2
tc.assertEqual(res["buckets"][1]["stdev_tranche_var"], 0.0)
{% endraw %}

Test deux buckets, deux groupes identique, sans 0

{% raw %}
variable_values = [1 for i in range(12)] + [2 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == 12
assert res["buckets"][2]["sum_tranche_var"] == 24
{% endraw %}

Test deux buckets, valeurs différentes, sans 0

{% raw %}
variable_values = [1 for i in range(12)] + [i + 13 for i in range(12)]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values),
    nb_bucket_var=2,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][2]["nombre_ff_tranche"] == len(variable_values) / 2
assert res["buckets"][1]["sum_tranche_var"] == 12
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
{% endraw %}

Test trois buckets, sans 0

{% raw %}
variable_values = (
    [1 for i in range(12)]
    + [i + 13 for i in range(12)]
    + [i * 10 for i in range(12, 12 + 12)]
)
# variable_values.sort()
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values),
    nb_bucket_var=3,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][2]["nombre_ff_tranche"] == 12
assert res["buckets"][3]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(1 for i in range(12))
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
assert res["buckets"][3]["sum_tranche_var"] == sum(i * 10 for i in range(12, 12 + 12))
assert res["buckets"][1]["sum_var_above_seuil"] == sum(
    [i + 13 for i in range(12)] + [i * 10 for i in range(12, 12 + 12)]
)
assert res["buckets"][2]["sum_var_above_seuil"] == sum(
    i * 10 for i in range(12, 12 + 12)
)
assert res["buckets"][3]["sum_var_above_seuil"] == 0
{% endraw %}

Test trois buckets, plus un de 0

{% raw %}
variable_values = (
    [1 for i in range(12)]
    + [i + 13 for i in range(12)]
    + [30 + i * 10 for i in range(12)]
)
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values) + 12,
    nb_bucket_var=3,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
# res
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 12
assert res["nb_foyer"]["nonzero"] == len(variable_values)
assert res["buckets"][0]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][2]["nombre_ff_tranche"] == 12
assert res["buckets"][3]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(1 for i in range(12))
assert res["buckets"][2]["sum_tranche_var"] == sum(i + 13 for i in range(12))
assert res["buckets"][3]["sum_tranche_var"] == sum(30 + i * 10 for i in range(12))
assert res["buckets"][1]["sum_var_above_seuil"] == sum(
    [i + 13 for i in range(12)] + [30 + i * 10 for i in range(12)]
)
assert res["buckets"][2]["sum_var_above_seuil"] == sum(30 + i * 10 for i in range(12))
assert res["buckets"][3]["sum_var_above_seuil"] == 0
{% endraw %}

Test un seul bucket, sans 0

{% raw %}
variable_values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=4,
    nb_bucket_var=1,
    lower_bound=50,
    upper_bound=1e10,
    debug=False,
)
res = dis.to_dict()
assert res["lower_bound"] == 50
assert res["nb_foyer"]["zero"] == 0
assert res["nb_foyer"]["nonzero"] == 12
assert res["buckets"][0]["nombre_ff_tranche"] == 0
assert res["buckets"][1]["nombre_ff_tranche"] == 12
assert res["buckets"][1]["sum_tranche_var"] == sum(variable_values)
{% endraw %}

Test bucket trop petit

{% raw %}
dis = DistribDeVarVaex(
    variable_values=[1, 2, 3, 4],
    variable="variable",
    nb_ff=4,
    nb_bucket_var=1,
    lower_bound=0,
    upper_bound=10 ^ 15,
    debug=True,
)
res = dis.to_dict()
assert res["buckets"] == "Error, moins de 12 éléments !"
{% endraw %}

Tres peu de données

{% raw %}
variable_values = [i + 1 for i in range(13)]
variable = "revkire"
nb_ff = 100
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=4,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert result["buckets"][1]["nombre_ff_tranche"] == len(variable_values)
assert result["buckets"][1]["sum_tranche_var"] == sum(variable_values)
assert len(result["buckets"]) == 2
{% endraw %}

Nominal

{% raw %}
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert result["buckets"][1]["nombre_ff_tranche"] == len(expected_1_bucket)
assert result["buckets"][2]["nombre_ff_tranche"] == len(expected_2_bucket)
assert result["buckets"][3]["nombre_ff_tranche"] == len(expected_3_bucket)
assert result["buckets"][4]["nombre_ff_tranche"] == len(expected_4_bucket)
assert result["buckets"][1]["sum_tranche_var"] == sum(expected_1_bucket)
assert result["buckets"][2]["sum_tranche_var"] == sum(expected_2_bucket)
assert result["buckets"][3]["sum_tranche_var"] == sum(expected_3_bucket)
assert result["buckets"][4]["sum_tranche_var"] == sum(expected_4_bucket)
{% endraw %}

Cas d'exactement 12 foyers

{% raw %}
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(20)]
variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert len(result["buckets"]) == 4 + 1
assert result["buckets"][0]["nombre_ff_tranche"] == 20
assert result["buckets"][1]["nombre_ff_tranche"] == len(expected_2_bucket)
assert result["buckets"][2]["nombre_ff_tranche"] == len(expected_3_bucket)
assert result["buckets"][3]["nombre_ff_tranche"] == len(expected_4_bucket)
assert result["buckets"][4]["nombre_ff_tranche"] == len(expected_4_bucket)

assert result["buckets"][1]["sum_tranche_var"] == sum(expected_1_bucket)
assert result["buckets"][2]["sum_tranche_var"] == sum(expected_2_bucket)
assert result["buckets"][3]["sum_tranche_var"] == sum(expected_3_bucket)
assert result["buckets"][4]["sum_tranche_var"] == sum(expected_4_bucket)
{% endraw %}

Cas où il manque des foyers

{% raw %}
expected_1_bucket = [i + 1 for i in range(12)]
expected_2_bucket = [(i + 20) * 2 for i in range(12)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(12)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(10)]

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456
# print(f"{variable_values=} {len(variable_values)=}")
bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert len(result["buckets"]) == 3 + 1
assert result["buckets"][1]["nombre_ff_tranche"] == 15
assert result["buckets"][2]["nombre_ff_tranche"] == 15
assert result["buckets"][3]["nombre_ff_tranche"] == 16
assert result["buckets"][0]["sum_tranche_var"] == 0
assert result["buckets"][1]["sum_tranche_var"] == 204
assert result["buckets"][2]["sum_tranche_var"] == 1251
assert result["buckets"][3]["sum_tranche_var"] == 3453
# La somme des copules de la variable doit être égale à la somme de la variable
s = 0
for i in range(len(result["buckets"])):
    s += result["buckets"][i]["sum_tranche_var"]
assert s == sum(variable_values)
{% endraw %}

Cas où un foyer dépasse les autres, à la fin

{% raw %}
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = [(i + 20 * 3) * 4 for i in range(19)] + [30000]  # 0.851

print(30_000 / sum(variable_values))

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)

variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456

# print(f"{variable_values=} {len(variable_values)=}")

bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=True,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
assert len(result["buckets"]) == 4  # Et non 5 à cause du secret statistique
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# La somme des copules de la variable doit être égale à la somme de la variable
s = 0
for i in range(len(result["buckets"])):
    s += result["buckets"][i]["sum_tranche_var"]
assert s == sum(variable_values)
{% endraw %}

Cas où un foyer dépasse les autres, au milieu

{% raw %}
expected_1_bucket = [i + 1 for i in range(20)]
expected_2_bucket = [(i + 20) * 2 for i in range(20)]
expected_3_bucket = [(i + 20 * 2) * 3 for i in range(20)]
expected_4_bucket = (
    [(i + 20 * 3) * 4 for i in range(9)]
    + [30000]
    + [(i + 20 * 3) * 4 for i in range(10)]
)  # 0.851

print(30_000 / sum(variable_values))

variable_values = (
    expected_1_bucket + expected_2_bucket + expected_3_bucket + expected_4_bucket
)
variable = "revkire"
nb_ff = 100
nb_bucket_var = 4
prev_seuil = 5648
seuil = 897456

# print(f"{variable_values=} {len(variable_values)=}")

bdr = DistribDeVarVaex(
    variable_values=variable_values,
    variable=variable,
    nb_ff=nb_ff,
    nb_bucket_var=nb_bucket_var,
    lower_bound=prev_seuil,
    upper_bound=seuil,
    debug=False,
)
result = bdr.to_dict()
# result
assert result["lower_bound"] == prev_seuil
assert result["upper_bound"] == seuil
assert result["nb_foyer"]["zero"] == nb_ff - len(variable_values)
assert result["nb_foyer"]["nonzero"] == len(variable_values)
# assert result["buckets"] == ["SECRET STATISTIQUE NON RESPECTE"]
# La somme des copules de la variable doit être égale à la somme de la variable
s = 0
for i in range(len(result["buckets"])):
    s += result["buckets"][i]["sum_tranche_var"]
assert s == sum(variable_values)
{% endraw %}

Vérification du calcul de la variance

{% raw %}
expected_2_bucket = [2, 2, 2, 2, 2, 2]
expected_3_bucket = [4, 4, 4, 6, 6, 6]
expected_4_bucket = [100, 7, 7, 4000, 2456.654, 6.4658]
variable_values = expected_2_bucket + expected_3_bucket + expected_4_bucket
variable_values.sort()
dis = DistribDeVarVaex(
    variable_values=variable_values,
    variable="variable",
    nb_ff=len(variable_values),
    nb_bucket_var=3,
    nb_respect_secret_statistique=1,
    debug=False,
)
res = dis.to_dict()
tc.assertEqual(
    res["buckets"][1]["stdev_tranche_var"], statistics.stdev(expected_2_bucket)
)
tc.assertEqual(
    res["buckets"][2]["stdev_tranche_var"], statistics.stdev(expected_3_bucket)
)
tc.assertEqual(
    res["buckets"][3]["stdev_tranche_var"], statistics.stdev(expected_4_bucket)
)
{% endraw %}

Tests de la préparation des tranches de variables à analyser

Tests avec peu de données

{% raw %}
%%time
out = compute_copule_vaex(
    vdf_test,
    variable_small_test,
    nb_bucket_var_small_test,
    tranche_rfr_small_test,
    debug=True,
)
{% endraw %} {% raw %}
len(out["copules"][-1]["buckets"])
{% endraw %} {% raw %}
assert len(out["copules"]) == len(tranche_rfr_small_test["frontieres_ff"])
assert len(out["copules"][-1]["buckets"]) == 1 + 1
{% endraw %} {% raw %}
s = 0
for i in range(len(out["copules"])):
    for j in range(len(out["copules"][i]["buckets"])):
        s += out["copules"][i]["buckets"][j]["sum_tranche_var"]
assert s == int(vdf_test.sum(f"{variable_small_test}"))
{% endraw %} {% raw %}
s = 0
for i in range(3):
    s += (
        out["copules"][i]["nb_foyer"]["zero"] + out["copules"][i]["nb_foyer"]["nonzero"]
    )
assert s == int(vdf_test.count(variable_small_test))
{% endraw %} {% raw %}
s = 0
for i in range(3):
    s += out["copules"][i]["buckets"][1]["nombre_ff_tranche"]
assert s == int(
    vdf_test.count(variable_small_test, selection=[vdf_test[variable_small_test] > 0])
)
{% endraw %}

Test avec beaucoup de petites valeurs

{% raw %}
variable_small_test = "ma_var"
nb_bucket_rfr_small_test2 = 10
nb_bucket_var_small_test2 = 3
test_dict = {
    "revkire": [0 for i in range(50)] + [i + 1 for i in range(110)] + [500_000],
    variable_small_test: [0 for i in range(50)] + [i + 1 for i in range(110)] + [100],
}
vdf_test2 = vaex.from_dict(test_dict)
tranche_rfr_small_test2 = prepare_tranche_rfr_vaex(
    vdf_test2, nb_bucket_rfr_small_test2, debug=True
)
tranche_rfr_small_test2
{% endraw %} {% raw %}
assert tranche_rfr_small_test2["frontieres_ff"][-1] == vdf_test2.count()
assert len(tranche_rfr_small_test2["frontieres_ff"]) == 7
assert tranche_rfr_small_test2["frontieres_ff"] == [64, 80, 96, 112, 128, 144, 161]
{% endraw %} {% raw %}
out = compute_copule_vaex(
    vdf=vdf_test2,
    variable=variable_small_test,
    nb_bucket_var=nb_bucket_var_small_test2,
    tranche_RFR=tranche_rfr_small_test2,
    debug=False,
)
{% endraw %} {% raw %}
 
{% endraw %} {% raw %}
s = 0
for i in range(len(out["copules"])):
    s += out["copules"][i]["buckets"][1]["sum_tranche_var"]
assert s == int(vdf_test2.sum(f"{variable_small_test}"))
{% endraw %} {% raw %}
s = 0
for i in range(len(out["copules"])):
    s += (
        out["copules"][i]["nb_foyer"]["zero"] + out["copules"][i]["nb_foyer"]["nonzero"]
    )
assert s == int(vdf_test2.count(variable_small_test))
{% endraw %} {% raw %}
s = 0
for i in range(len(out["copules"])):
    s += out["copules"][i]["buckets"][1]["nombre_ff_tranche"]
assert s == int(
    vdf_test2.count(variable_small_test, selection=[vdf_test[variable_small_test] > 0])
)
{% endraw %} {% raw %}
from nbdev.export import notebook2script

notebook2script()
{% endraw %}