--- title: Injection de données à l'aide de Monte-Carlo keywords: fastai sidebar: home_sidebar nb_path: "notebooks/retraitement_erfs-fpr/modules/copules_01_algo_monte_carlo.ipynb" ---
import json
import math
from typing import List
# Import util seuleemnt au notebook
import seaborn as sns
from leximpact_prepare_data.calib_and_copules import (
get_calib,
get_copules,
get_fake_data,
pandas_to_vaex,
)
from leximpact_prepare_data.toolbase import create_simulation
sns.set(rc={"figure.figsize": (20, 8)})
Nous voulons affecter des nouveau revenus (nommé par le terme générique de variable) à notre jeux de données. Pour que ces revenus soient réalistes nous allons utiliser les statistiques extraites de POTE pour affecter des revenus en utilisant la méthode de Monte-Carlo. La méthode Monte-Carlo, désigne une famille de méthodes algorithmiques visant à calculer une valeur numérique approchée en utilisant des procédés aléatoires, c'est-à-dire des techniques probabilistes. Le nom de ces méthodes, qui fait allusion aux jeux de hasard pratiqués au casino de Monte-Carlo, a été inventé en 1947. Voir https://fr.wikipedia.org/wiki/M%C3%A9thode_de_Monte-Carlo
Description de l'algorithme :
Cette algorithme reposant sur l'aléatoire et notre échantillon étant petit il va falloir l'exécuter de nombreuses fois puis conserver la moyenne de tous ces essais pour approcher un résultat réaliste.
ATTENTION : comme on a une bascule entre 0 ou un nombre, la moyenne n'est pas forcément la bonne solution.
rfr = []
nb_foy = 16
for i in range(nb_foy):
# Probabilité de 0 : 0.5
if i % 2:
var = 5.0 if i <= nb_foy / 2 else 10.0
else:
var = 0.0
un_rfr = {
"rfr": i,
"var": var,
}
rfr.append(un_rfr)
df = pd.DataFrame(rfr)
df.describe()
df
vaex_df = pandas_to_vaex(df)
_ = vaex_df.rename("rfr", "revkire")
tiny_buckets = get_copules(vaex_df, 1, "var", 2, nb_respect_secret_statistique=1)
tiny_buckets["copules"][0]["buckets"][1]
monte_carlo_simulation(tiny_buckets["copules"][0], debug=True)
df["monte_carlo_fake_var"] = df["rfr"].apply(
lambda rfr: monte_carlo_simulation(tiny_buckets["copules"][0], debug=False)
)
df.describe()
On constate bien ci-dessus qu'avec uniquement 16 tirages aléatoires, il est difficile de trouver le bon résultat.
Il va donc falloir faire plusieurs tirages.
On pourrait imaginer faire la moyenne des tirages : mais ce n'est pas si simple car on lisserait toutes les données alors qu'il est intéressant de conserver des écarts. Surtout que l'on insert déjà une moyenne.
On va donc déterminer ce qu'est un meilleur tirage et ainsi conserver le meilleur.
La somme ou la moyenne ne sont pas une bonne mesure d'erreur. Car cela peut cacher des grosses disparités, or notre but est d'avoir une distribution la plus similaire possible à l'originale. Cependant on doit se contenter de ce que l'on peut extraire de la base POTE.
errors = []
min_error = 1e9
nb_tirage = 50
for i in tqdm(range(nb_tirage)):
df["monte_carlo_fake_var_" + str(i)] = df["rfr"].apply(
lambda rfr: monte_carlo_simulation(tiny_buckets["copules"][0], debug=False)
)
error = (df["var"].mean() - df["monte_carlo_fake_var_" + str(i)].mean()) ** 2
if error < min_error:
min_error = error
df["monte_carlo_fake_var"] = df["monte_carlo_fake_var_" + str(i)]
errors.append(error)
rmse = math.sqrt(sum(errors) / nb_tirage)
print(f"Erreur quadratic moyenne {rmse}, erreur minimale : {min_error} ")
df = df.copy() # Avoid fragmenting error
print(
f'Nb var à 0 : {df[df["var"].values == 0.0]["var"].count()} Nb var à 5 : {df[df["var"].values == 5.0]["var"].count()} Nb var à 10 : {df[df["var"].values == 10.0]["var"].count()}'
)
print(
f'Nb MC var à 0 : {df[df["monte_carlo_fake_var"].values == 0.0]["monte_carlo_fake_var"].count()} Nb MC var à 5 : {df[df["monte_carlo_fake_var"].values == 5.0]["monte_carlo_fake_var"].count()} Nb MC var à 10 : {df[df["monte_carlo_fake_var"].values == 10.0]["monte_carlo_fake_var"].count()}'
)
Avec 10 tirages seulement on augmente fortement les chances de tomber juste. Avec 50 on doit approcher les 100% de réussite pour notre cas.
Cette problématique d'estimation du nombre de tirage suffisant fait l'objet de recherches : https://apps.dtic.mil/dtic/tr/fulltext/u2/a621501.pdf
Il y a aussi d'autres pistes : https://fr.wikipedia.org/wiki/%C3%89chantillonnage_pr%C3%A9f%C3%A9rentiel
df["var"].mean()
df["monte_carlo_fake_var"].mean()
df.describe()
df["var"].hist()
df.monte_carlo_fake_var.hist(alpha=0.8)
On voit ici en quoi la moyenne n'est pas une bonne métrique : malgrès une erreur de 0.00% on voit bien que la distribution n'est pas la même.
Nous allons pourtant conserver se fonctionnement car nous allons l'appliquer à des sous-parties des données.
Ce faux jeux de données va nous permettre de tester notre solution sur un problème simplifié.
On va considérer que le RFR croit linéairement dans une population de 10 000 foyers. Et une variable va évoluer en fonction du RFR. Tout en pouvant être à zéro.
Nous pourrons ainsi constater facilement si notre distribution générée correspond à celle initiale.
df = get_fake_data(var_name="fake_var", set_some_var_to_zero=True)
df.plot(alpha=0.9)
print(f"Sum revkire : {df.revkire.sum():,}€ Sum rfr : {df.fake_var.sum():,}€")
vaex_df = pandas_to_vaex(df)
copules = get_copules(vaex_df, 5, "fake_var", 5)
bucket = copules["copules"][3]
seuil_inf = bucket["lower_bound"]
seuil_supp = bucket["upper_bound"]
df_bucket = df.query("@seuil_inf <= revkire < @seuil_supp").copy()
serie = df_bucket["revkire"].apply(lambda rfr: monte_carlo_simulation(bucket))
df_tmp = pd.DataFrame({"mc_var": serie})
bucket["buckets"][-2]
# df_copules.tail(10)
On ne peut pas appliquer la même méthode que précédemment.
Car même avec 1 000 tirages on n'arrive pas à avoir de bons résultats.
Essayons de faire les tirages à l'intérieur de chaque bucket de RFR.
Problème : POTE est beaucoup plus grand que l'ERFS, on ne peut donc pas reconstituer des copules de l'ERFS pour les comparers aux copules de POTE. Comment évaluer POTE vis à vis de l'ERFS ? Par la moyenne et la variance sur la tranche de RFR ? C'est très limitatif mais il va falloir faire avec.
mean_absolute_percentage_error([1], [1])
mean_absolute_percentage_error([769581, 0.001], [769582, 0.002])
tc.assertEqual(
get_calib(pandas_to_vaex(df_bucket), "fake_var", 5)["buckets"], bucket["buckets"]
)
tc.assertAlmostEqual(
eval_error(bucket["buckets"], df_bucket[["fake_var"]], debug=True), 0.0, places=3
)
tc.assertAlmostEqual(eval_error(bucket["buckets"], df_tmp, debug=True), 0.0, places=2)
df_test = pd.DataFrame(
[0, 1, 2, 3, 4, 10, 20, 30, 40, 50, 60], columns=["monte_carlo_fake_var"]
)
test_bucket = [
{"seuil_var_inf": 1, "seuil_var_supp": 10, "mean_tranche_var": 20},
{"seuil_var_inf": 10, "seuil_var_supp": 40, "mean_tranche_var": 15},
{"seuil_var_inf": 40, "seuil_var_supp": 80, "mean_tranche_var": 25},
]
df_result = force_mean_with_factor(
test_bucket, df_test, col_name="monte_carlo_fake_var", debug=True
)
tc.assertAlmostEqual(df_result["monte_carlo_fake_var"].mean(), 18.18, places=2)
df, errors = apply_Monte_Carlo(df, copules, debug=True)
print(f"Erreur minimale : {min(errors):.4f} Erreur maximale : {max(errors):.4f}")
# df = df.copy() # Avoid fragmenting error
print(
f'Nb var à 0 : {df[df["fake_var"].values == 0.0]["fake_var"].count()} Nb var inférieures à la médianne : {df[df["fake_var"].values <= df.fake_var.quantile(0.5)]["fake_var"].count()} Nb var supérieures à la médianne : {df[df["fake_var"].values > df.fake_var.quantile(0.5)]["fake_var"].count()}'
)
print(
f'Nb MC var à 0 : {df[df["monte_carlo_fake_var"].values == 0.0]["monte_carlo_fake_var"].count()} Nb MC var inférieures à la médianne : {df[df["monte_carlo_fake_var"].values <= df.fake_var.quantile(0.5)]["monte_carlo_fake_var"].count()} Nb MC var supérieures à la médianne de VAR : {df[df["monte_carlo_fake_var"].values > df.fake_var.quantile(0.5)]["monte_carlo_fake_var"].count()}'
)
print(
f"Somme fake_var : {df.fake_var.sum():,}, somme monte_carlo_fake_var : {df.monte_carlo_fake_var.sum():,}. Ecart {abs((df.monte_carlo_fake_var.sum()-df.fake_var.sum())*100)/df.fake_var.sum():.2f}%"
)
bins = 5
df.fake_var.hist(bins=bins)
df.monte_carlo_fake_var.hist(bins=bins, alpha=0.8)
bins = 100
df.fake_var.hist(bins=bins)
df.monte_carlo_fake_var.hist(bins=bins, alpha=0.8)
On constate des trous dans la distribution : c'est normal car dans chaque groupe, on affecte la valeur moyenne.
Comme on peut le constater sur le graphique suivant :
sns.scatterplot(data=df, x=df.index, y="monte_carlo_fake_var")
sns.scatterplot(data=df, x=df.index, y="fake_var")
En orange nous avons les "vrais" valeurs que nous avons générées au début. En bleu les valeurs affectée en utilisant les copules calculés à partir des données oranges.
df.describe()
df, errors = apply_Monte_Carlo(
df, copules, use_force_mean_with_factor=True, debug=False
)
print(f"Erreur minimale : {min(errors):.4f} Erreur maximale : {max(errors):.4f}")
# df = df.copy() # Avoid fragmenting error
print(
f'Nb var à 0 : {df[df["fake_var"].values == 0.0]["fake_var"].count()} Nb var inférieures à la médianne : {df[df["fake_var"].values <= df.fake_var.quantile(0.5)]["fake_var"].count()} Nb var supérieures à la médianne : {df[df["fake_var"].values > df.fake_var.quantile(0.5)]["fake_var"].count()}'
)
print(
f'Nb MC var à 0 : {df[df["monte_carlo_fake_var"].values == 0.0]["monte_carlo_fake_var"].count()} Nb MC var inférieures à la médianne : {df[df["monte_carlo_fake_var"].values <= df.fake_var.quantile(0.5)]["monte_carlo_fake_var"].count()} Nb MC var supérieures à la médianne de VAR : {df[df["monte_carlo_fake_var"].values > df.fake_var.quantile(0.5)]["monte_carlo_fake_var"].count()}'
)
print(
f"Somme fake_var : {df.fake_var.sum():,}, somme monte_carlo_fake_var : {df.monte_carlo_fake_var.sum():,}. Ecart {abs((df.monte_carlo_fake_var.sum()-df.fake_var.sum())*100)/df.fake_var.sum():.2f}%"
)
Il est temps de confronter notre modèle aux données de l'INSEE et de POTE.
from nbdev.export import notebook2script
notebook2script(fname="./copules_01_algo_monte_carlo.ipynb")