--- title: CASD : Conversion de l'extraction SAS keywords: fastai sidebar: home_sidebar nb_path: "notebooks/extractions_base_des_impots/20_Convert_Merge_and_sort.ipynb" ---
{% raw %}
{% endraw %} {% raw %}
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
{% endraw %} {% raw %}
year = "2019"
year = "2018"
SAS_FILE = (
    "C:/Users/Public/Documents/TRAVAIL/csg/data_in/assiette_csg_pote_"
    + year
    + ".sas7bdat"
)
OUT_PARQUET_PATH = (
    "C:/Users/Public/Documents/TRAVAIL/csg/data_in/assiettes_csg" + year + ".parquet"
)
IN_PATH = "C:/Users/Public/Documents/TRAVAIL/csg/data_in/"
OUT_PATH = "C:/Users/Public/Documents/TRAVAIL/csg/data_out/"
ARROW_OUT = IN_PATH + "assiettes_pote_brutes_" + year + "-arrow/"
taille_chunk = 1_048_576
{% endraw %}

Conversion du fichier SAS

{% raw %}
from time import time

import pandas as pd
import vaex
from tqdm import tqdm
{% endraw %}

Lecture du fichier SAS

On va lire le fichier par morceau de 1 million de lignes, pour ne pas saturer la mémoire. Il y a 39 millions de lignes.

{% raw %}
%%time
# Temps sur CASD : moins de 3 minutes.
st = time()
taille = 1_048_576
dfi = pd.read_sas(SAS_FILE, chunksize=taille_chunk, iterator=True)

dd_values = None
dfs = []
i = 0

for chunk in tqdm(dfi):
    #### DEBUG
    #     i+=1
    #     if i>5:
    #         break
    #### DEBUG
    chunk = chunk.fillna(0)
    if dd_values is None:
        dd_values = vaex.from_pandas(chunk, copy_index=False)
    else:
        dd_values = dd_values.concat(vaex.from_pandas(chunk, copy_index=False))
{% endraw %}

Description des sources

Chômage perçu de l'étranger

Voir https://www.impots.gouv.fr/portail/files/formulaires/2041-gg/2021/2041-gg_3495.pdf

  • 8SW : chômage CSG à 6.2%
  • 8SX : chômage CSG à 3.8%

1AP (déclarant 1) et 1BP (déclarant 2): Autres revenus

  • Allocations chômage

Sont à déclarer toutes les sommes versées par « Pôle emploi » :

l'allocation d'aide au retour à l'emploi (ARE) ;
allocation temporaire d'attente (ATA) ;
l'allocation de solidarité spécifique (ASS) ;
l'allocation équivalent retraite (AER)...

L’allocation de retour à l’emploi formation (AREF) doit être déclarée avec les revenus d’activité cases 1AJ à 1DJ.

  • Allocations de préretraite

Sont à déclarer :

l'allocation perçue dans le cadre d'une convention de coopération du Fonds national de l'emploi (allocation spéciale FNE) ;
l'allocation de « préretraite progressive » ;
l'allocation de remplacement pour l'emploi (ARPE) ;
l'allocation mensuelle versée dans le cadre des dispositifs de cessation d'activité de certains travailleurs salariés (« CATS ») ;
l'allocation de préretraite amiante ;
l'allocation versée dans le cadre d'un dispositif de préretraite d'entreprises (« préretraite maison »).

  • Rémunération des membres du Gouvernement, du Conseil économique, social et environnemental et du Conseil constitutionnel.

  • Indemnités parlementaires (de base et de résidence), y compris pour les députés européens.

  • Indemnités de fonction des élus locaux en cas d'option pour le régime d'imposition des traitements et salaires.

{% raw %}
assert dd_values.get_column_names() == [
    "rnsgbd",
    "rnsgld",
    "revkire",
    "Z1aj",
    "Z1ap",
    "Z1as",
    "Z1bj",
    "Z1bp",
    "Z1bs",
    "Z1cj",
    "Z1cw",
    "Z1dw",
    "Z2ch",
    "Z2dc",
    "Z2dh",
    "Z2tr",
    "Z3ua",
    "Z3vg",
    "Z3vz",
    "Z4ba",
    "Z4bb",
    "Z4bc",
    "Z4bd",
    "Z4be",
    "Z6de",
    "Z8sc",
    "Z8sw",
    "Z8sx",
    "CICS",
    "MNIMQG",
]
{% endraw %} {% raw %}
def sum_columns(df):

    # Pour assiette de CSG
    df["revenus_capitaux_prelevement_bareme"] = df.Z2ch
    df["revenus_capitaux_prelevement_liberatoire"] = df.Z2dh
    df["revenus_capitaux_prelevement_forfaitaire_unique_ir"] = df.Z2dc + df.Z2tr

    df["rente_viagere_titre_onereux_net"] = df.Z1cw + df.Z1dw
    # Micro-foncier : on applique l'abattement de 30%
    df["revenu_categoriel_foncier"] = (
        df.Z4ba + (df.Z4be * 0.7) - (df.Z4bb + df.Z4bc + df.Z4bd)
    )
    # df['rev_categ_foncier4ba']=df.Z4ba
    df["assiette_csg_plus_values"] = df.Z3vg + df.Z3ua + df.Z3vz
    df["assiette_csg_revenus_capital"] = (
        df["revenus_capitaux_prelevement_bareme"]
        + df["revenus_capitaux_prelevement_liberatoire"]
        + df["revenus_capitaux_prelevement_forfaitaire_unique_ir"]
        + df["rente_viagere_titre_onereux_net"]
        + df["revenu_categoriel_foncier"]
        + df["assiette_csg_plus_values"]
    )

    # Autres
    df["retraites"] = df.Z1as + df.Z1bs + df.MNIMQG
    df[
        "pre_retraites_etranger"
    ] = df.Z8sc  # Attention, seulement de l'étranger, sinon c'est df.Z1ap + df.Z1bp

    df["chomage_et_indemnites"] = df.Z8sw + df.Z8sx + df.Z1ap + df.Z1bp
    df["rev_salaire"] = df.Z1aj + df.Z1bj + df.Z1cj

    # Il faudrait aussi interets_pel_moins_12_ans_cel, mais où le trouver ?
    return df
{% endraw %} {% raw %}
dd_values = sum_columns(dd_values)
{% endraw %}

Calcul d'agregats

{% raw %}
def compute_agg(vdf):
    sub_total = []
    ldf = vdf.shape[0]
    for col in tqdm(vdf.get_column_names()):
        name = f"{col}_non_zero"
        vdf.select(f"{col} > 0", name=name)
        lenzero = ldf - int(vdf.count("*", selection=name))
        dict_col = {
            "name": col,
            "nb_line": ldf,
            "lenzero": lenzero,
            "sum": int(vdf.sum(col)),
            "mean": int(vdf.mean(col)),
            "pct_zero": lenzero / ldf * 100,
        }
        sub_total.append(dict_col)
    return pd.DataFrame(sub_total)
{% endraw %} {% raw %}
%%time
# Temps sur CASD pour 41 colonnes : 2min 10s
df = compute_agg(dd_values)
{% endraw %} {% raw %}
(579282 + 330413) / (28041960914 + 6726148004) * 100
{% endraw %} {% raw %}
pd.set_option("display.float_format", "{:,}".format)
# Export dans un fichier
df.to_csv(OUT_PATH + "/agregats_des_variables_csg-POTE_" + year + ".csv", index=False)
df
{% endraw %} {% raw %}
%%time
if year == "2019":
    assert dd_values.rev_salaire.sum() == 650_855_163_531
if year == "2018":
    assert dd_values.rev_salaire.sum() == 649_078_215_971
{% endraw %} {% raw %}
col_to_keep = [
    "revkire",
    "revenus_capitaux_prelevement_bareme",
    "revenus_capitaux_prelevement_liberatoire",
    "revenus_capitaux_prelevement_forfaitaire_unique_ir",
    "rente_viagere_titre_onereux_net",
    "revenu_categoriel_foncier",
    "assiette_csg_plus_values",
    "assiette_csg_revenus_capital",
    "retraites",
    "pre_retraites_etranger",
    "chomage_et_indemnites",
    "rev_salaire",
]
{% endraw %} {% raw %}
vdf = dd_values[col_to_keep]
# vdf
{% endraw %} {% raw %}
assert vdf.get_column_names() == [
    "revkire",
    "revenus_capitaux_prelevement_bareme",
    "revenus_capitaux_prelevement_liberatoire",
    "revenus_capitaux_prelevement_forfaitaire_unique_ir",
    "rente_viagere_titre_onereux_net",
    "revenu_categoriel_foncier",
    "assiette_csg_plus_values",
    "assiette_csg_revenus_capital",
    "retraites",
    "pre_retraites_etranger",
    "chomage_et_indemnites",
    "rev_salaire",
]
{% endraw %}

Enregistrement en Arrow

{% raw %}
%%time
# Temps sur CASD : 1min 31s
from pathlib import Path

Path(ARROW_OUT).mkdir(parents=True, exist_ok=True)
# rfrs_sorted.export_arrow(ARROW_OUT)
vdf.export_many(
    ARROW_OUT + "assiettes_pote_brutes_" + year + "_chunk-{i:02}.arrow",
    chunk_size=taille_chunk,
)
{% endraw %} {% raw %}
del vdf
del dd_values
del chunk
{% endraw %}

Test de lecture

{% raw %}
%%time
vdf = vaex.open(ARROW_OUT + "assiettes_pote_brutes_" + year + "_chunk-*.arrow")
vdf.columns
{% endraw %}

Tri par RFR

{% raw %}
%%time
# Temps sur CASD : 15s mais c'est du lazy : ce n'est pas fait.
vdf_sorted = vdf.sort(by="revkire")
{% endraw %}

Enregistrement en Arrow

{% raw %}
%%time
# Temps sur CASD : 3min 44s
from pathlib import Path

Path(ARROW_OUT).mkdir(parents=True, exist_ok=True)
# rfrs_sorted.export_arrow(ARROW_OUT)
vdf_sorted.export_many(
    ARROW_OUT + "assiettes_pote_sorted_" + year + "_chunk-{i:02}.arrow",
    chunk_size=taille_chunk,
)
{% endraw %}

Vérifications

{% raw %}
%%time
nbff = vdf_sorted.shape[0]
print(f"Nombre de lignes = Nombre de foyer fiscaux : {nbff:,}")
if year == "fake":
    assert nbff == 39_000_000
if year == "2018":
    assert nbff == 38_487_937
if year == "2019":
    assert nbff == 39_264_696
# On vérifie le tri : 1 m
assert vdf_sorted[0][0] == vdf_sorted.min("revkire")
assert vdf_sorted[-1][0] == vdf_sorted.max("revkire")
assert vdf_sorted[nbff - 9][0] > vdf_sorted[nbff - 10][0]
assert vdf_sorted[nbff - 10][0] > vdf_sorted[nbff - 11][0]
{% endraw %} {% raw %}
del vdf_sorted
import gc

gc.collect()
{% endraw %}

Enregistrement en HDF

{% raw %}
# from pathlib import Path
# Path(OUT_PATH+'/assiettes_pote2019').mkdir(parents=True, exist_ok=True)

# files = dd_values.to_hdf(OUT_PATH+'/assiettes_pote2019.*.hdf', '/pote2019')
{% endraw %}

Enregistrement en parquet

{% raw %}
# Path(OUT_PARQUET).mkdir(parents=True, exist_ok=True)
{% endraw %} {% raw %}
# files = dd_values.export_parquet(OUT_PARQUET_PATH)
{% endraw %} {% raw %}
if year == "2019":
    assert rfrs.shape[0] == 39264696
if year == "2018":
    assert rfrs.shape[0] == 38487937
{% endraw %} {% raw %}
# input_directory = PATH+'extraction_assiettes_csg/extract-pote.*.hdf'
# rfrs = dd.read_hdf(input_directory, '/pote2019')
# rfrs.columns
{% endraw %} {% raw %}
 
{% endraw %}