Skip to content

Commit abfded6

Browse files
committed
Merge branch 'feature/145_siret_empty_names' into dev
2 parents b56b014 + 40f8010 commit abfded6

15 files changed

Lines changed: 268 additions & 86 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ logs
66
.venv
77
*.egg-info
88
*.parquet
9+
!tests/data/sirene/*.parquet
910
!code_officiel_geographique.parquet
1011
*.gz
1112
*.zip

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ testpaths = [
4141
]
4242
env = [
4343
"DATASETS_REFERENCE_FILEPATH=tests/data/source_datasets_test.json",
44+
"SIRENE_DATA_DIR=tests/data/sirene",
4445
"PREFECT_API_URL=",
4546
"DECP_PROCESSING_PUBLISH=",
4647
"DECP_USE_CACHE=false"

src/config.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,15 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
9898

9999

100100
SIRENE_DATA_PARENT_DIR = make_path_from_env("SIRENE_DATA_PARENT_DIR", DATA_DIR)
101-
SIRENE_DATA_DIR = make_sirene_data_dir(SIRENE_DATA_PARENT_DIR)
101+
102+
# SIRENE_DATA_DIR ne doit être spécifié que pour les tests. Laisser vide dans .env et laisser make_sirene_data_dir
103+
# le déterminer
104+
SIRENE_DATA_DIR = os.getenv(
105+
"SIRENE_DATA_DIR", make_sirene_data_dir(SIRENE_DATA_PARENT_DIR)
106+
)
107+
if isinstance(SIRENE_DATA_DIR, str):
108+
SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR))
109+
102110
# SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py
103111
print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR)
104112
print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR)

src/flows/decp_processing.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ def decp_processing(enable_cache_removal: bool = True):
104104
# Preprocessing des données SIRENE si :
105105
# - le dossier n'existe pas encore (= les données n'ont pas déjà été preprocessed ce mois-ci)
106106
# - on est au moins le 5 du mois (pour être sûr que les données SIRENE ont été mises à jour sur data.gouv.fr)
107-
print(SIRENE_DATA_DIR)
108107
if not SIRENE_DATA_DIR.exists():
109108
sirene_preprocess()
110109

src/flows/sirene_preprocess.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
from src.config import SIRENE_DATA_DIR
55
from src.flows.get_cog import get_cog
6-
from src.tasks.get import get_etablissements
7-
from src.tasks.transform import get_prepare_unites_legales, prepare_etablissements
6+
from src.tasks.get import get_etablissements, get_unite_legales
7+
from src.tasks.transform import prepare_etablissements
88
from src.tasks.utils import create_sirene_data_dir
99

1010

@@ -26,7 +26,7 @@ def sirene_preprocess():
2626
processed_ul_parquet_path = SIRENE_DATA_DIR / "unites_legales.parquet"
2727
if not processed_ul_parquet_path.exists():
2828
print("Téléchargement et préparation des unités légales...")
29-
get_prepare_unites_legales(processed_ul_parquet_path)
29+
get_unite_legales(processed_ul_parquet_path)
3030
else:
3131
print(processed_ul_parquet_path, " existe, skipping.")
3232

@@ -35,7 +35,7 @@ def sirene_preprocess():
3535
if not processed_etab_parquet_path.exists():
3636
print("Téléchargement et préparation des établissements...")
3737
lf = get_etablissements()
38-
prepare_etablissements(lf, processed_etab_parquet_path)
38+
prepare_etablissements(lf).sink_parquet(processed_etab_parquet_path)
3939
else:
4040
print(processed_etab_parquet_path, " existe, skipping.")
4141

src/tasks/enrich.py

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import polars as pl
22
import polars.selectors as cs
3+
from polars_ds import haversine
34
from prefect import task
45

56
from src.config import SIRENE_DATA_DIR
@@ -20,6 +21,27 @@ def add_etablissement_data(
2021
lf_sirets = lf_sirets.join(
2122
lf_etablissements, how="inner", left_on=siret_column, right_on="siret"
2223
)
24+
25+
# On ne prend pas l'activité des acheteurs
26+
if type_siret == "acheteur":
27+
lf_sirets = lf_sirets.drop(cs.starts_with("activite_"))
28+
29+
# Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement),
30+
# on l'ajoute au nom de l'organisme, entre parenthèses
31+
lf_sirets = lf_sirets.with_columns(
32+
pl.when(pl.col("etablissement_nom").is_not_null())
33+
.then(
34+
pl.concat_str(
35+
pl.col(f"{type_siret}_nom"),
36+
pl.lit(" ("),
37+
pl.col("etablissement_nom"),
38+
pl.lit(")"),
39+
)
40+
)
41+
.otherwise(pl.col(f"{type_siret}_nom"))
42+
.alias(f"{type_siret}_nom")
43+
).drop("etablissement_nom")
44+
2345
lf_sirets = lf_sirets.rename(
2446
{
2547
"latitude": f"{type_siret}_latitude",
@@ -59,6 +81,7 @@ def enrich_from_sirene(lf: pl.LazyFrame):
5981
# Récupération des données SIRET/SIREN préparées dans sirene-preprocess()
6082
lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet")
6183
lf_unites_legales = pl.scan_parquet(SIRENE_DATA_DIR / "unites_legales.parquet")
84+
6285
lf_base = lf.clone()
6386

6487
# DONNÉES SIRENE ACHETEURS
@@ -133,29 +156,16 @@ def enrich_from_sirene(lf: pl.LazyFrame):
133156

134157

135158
def calculate_distance(lf: pl.LazyFrame) -> pl.LazyFrame:
136-
# Implémentation native de la formule de Haversine
137-
# R = 6371 # Rayon de la Terre en km
138-
139-
# Conversion en radians
140-
lat1 = pl.col("acheteur_latitude").radians()
141-
lon1 = pl.col("acheteur_longitude").radians()
142-
lat2 = pl.col("titulaire_latitude").radians()
143-
lon2 = pl.col("titulaire_longitude").radians()
144-
145-
# Différences
146-
dlat = lat2 - lat1
147-
dlon = lon2 - lon1
148-
149-
# Formule de Haversine
150-
a = (dlat / 2).sin().pow(2) + lat1.cos() * lat2.cos() * (dlon / 2).sin().pow(2)
151-
c = 2 * a.sqrt().arcsin()
152-
153-
# Distance en km
154-
distance = 6371 * c
155-
159+
# Utilisation de polars_ds.haversine
160+
# https://polars-ds-extension.readthedocs.io/en/latest/num.html#polars_ds.exprs.num.haversine
156161
lf = lf.with_columns(
157-
distance.round(1).alias(
158-
"distance"
159-
) # Arrondi à 1 décimale comme avant (mode="half_away_from_zero" n'est pas dispo direct mais round standard est ok)
162+
haversine(
163+
pl.col("acheteur_latitude"),
164+
pl.col("acheteur_longitude"),
165+
pl.col("titulaire_latitude"),
166+
pl.col("titulaire_longitude"),
167+
)
168+
.round(mode="half_away_from_zero")
169+
.alias("distance")
160170
)
161171
return lf

src/tasks/get.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from prefect import task
1515
from prefect.transactions import transaction
1616

17+
from config import SIRENE_UNITES_LEGALES_URL
1718
from src.config import (
1819
DECP_PROCESSING_PUBLISH,
1920
DECP_USE_CACHE,
@@ -35,6 +36,7 @@
3536
gen_artifact_row,
3637
stream_replace_bytestring,
3738
)
39+
from tasks.transform import prepare_unites_legales
3840

3941

4042
@task(retries=3, retry_delay_seconds=3)
@@ -314,10 +316,10 @@ def yield_modifications(row: dict, separator="_") -> Iterator[dict] or None:
314316
raw_mods = raw_mods["modification"]
315317
# Couvre le (non-)format dans lequel "modifications" ou "modification" mène
316318
# directement à un dict contenant les métadonnées liées à une modification.
317-
if isinstance(raw_mods, dict):
319+
elif isinstance(raw_mods, dict):
318320
raw_mods = [raw_mods]
319-
320-
raw_mods = [] if raw_mods is None else raw_mods
321+
elif isinstance(raw_mods, str) or raw_mods is None:
322+
raw_mods = []
321323

322324
mods = [{}] + raw_mods
323325
for i, mod in enumerate(mods):
@@ -369,6 +371,8 @@ def get_etablissements() -> pl.LazyFrame:
369371
"longitude": pl.Float64,
370372
"activitePrincipaleEtablissement": pl.String,
371373
"nomenclatureActivitePrincipaleEtablissement": pl.String,
374+
"enseigne1Etablissement": pl.String,
375+
"denominationUsuelleEtablissement": pl.String,
372376
}
373377

374378
columns = list(schema.keys())
@@ -387,7 +391,7 @@ def get_etablissements() -> pl.LazyFrame:
387391
hrefs.append(base_url + href)
388392

389393
# Fonction de traitement pour un fichier
390-
def process_file(_href: str):
394+
def get_process_file(_href: str):
391395
print(_href.split("/")[-1])
392396
try:
393397
response = http_client.get(
@@ -403,18 +407,12 @@ def process_file(_href: str):
403407
content = response.content
404408
lff = pl.scan_csv(content, schema_overrides=schema)
405409
lff = lff.select(columns)
406-
lff = lff.with_columns(
407-
[
408-
pl.col("codeCommuneEtablissement").str.pad_start(5, "0"),
409-
pl.col("siret").str.pad_start(14, "0"),
410-
]
411-
)
412410
return lff
413411

414412
# Traitement en parrallèle avec 8 threads
415413
lfs = []
416414
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
417-
futures = [executor.submit(process_file, href) for href in hrefs]
415+
futures = [executor.submit(get_process_file, href) for href in hrefs]
418416
for future in concurrent.futures.as_completed(futures):
419417
try:
420418
lf = future.result()
@@ -474,3 +472,13 @@ def get_clean(
474472
# Le fichier parquet est déjà disponible pour ce checksum
475473
print(f"👍 Ressource déjà en cache : {resource['dataset_code']}")
476474
return parquet_path.with_suffix(".parquet")
475+
476+
477+
@task
478+
def get_unite_legales(processed_parquet_path):
479+
print("Téléchargement des données unité légales et sélection des colonnes...")
480+
(
481+
pl.scan_parquet(SIRENE_UNITES_LEGALES_URL)
482+
.pipe(prepare_unites_legales)
483+
.sink_parquet(processed_parquet_path)
484+
)

src/tasks/transform.py

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33

44
import polars as pl
55
import polars.selectors as cs
6-
from prefect import task
76

8-
from src.config import DATA_DIR, DIST_DIR, SIRENE_UNITES_LEGALES_URL, DecpFormat
7+
from src.config import DATA_DIR, DIST_DIR, DecpFormat
98
from src.tasks.output import save_to_files
109
from src.tasks.utils import check_parquet_file
1110

@@ -205,33 +204,82 @@ def extract_unique_titulaires_siret(lf: pl.LazyFrame):
205204
return lf
206205

207206

208-
@task
209-
def get_prepare_unites_legales(processed_parquet_path):
210-
print("Téléchargement des données unité légales et sélection des colonnes...")
211-
(
212-
pl.scan_parquet(SIRENE_UNITES_LEGALES_URL)
213-
.select(["siren", "denominationUniteLegale"])
214-
.filter(pl.col("siren").is_not_null())
215-
.filter(pl.col("denominationUniteLegale").is_not_null())
216-
.unique()
217-
.sink_parquet(processed_parquet_path, engine="streaming")
207+
def prepare_unites_legales(lf: pl.LazyFrame) -> pl.LazyFrame:
208+
return (
209+
lf.select(
210+
[
211+
"siren",
212+
"denominationUniteLegale",
213+
"prenomUsuelUniteLegale",
214+
"nomUniteLegale", # toujours rempli pour personnes physique
215+
"nomUsageUniteLegale", # parfois rempli, a la priorité sur nomUniteLegale
216+
"statutDiffusionUniteLegale", # P = non-diffusible
217+
]
218+
)
219+
.filter(
220+
pl.col("siren").is_not_null()
221+
) # utilisation du fichier Stock, normalement pas de siren null
222+
.unique() # utilisation du fichier Stock, normalement pas de doublons
223+
.with_columns(
224+
pl.when(pl.col("nomUsageUniteLegale").is_not_null())
225+
.then(pl.col("nomUsageUniteLegale"))
226+
.otherwise(pl.col("nomUniteLegale"))
227+
.alias("nomUniteLegale")
228+
)
229+
.with_columns(
230+
pl.when(pl.col("nomUniteLegale").is_not_null())
231+
.then(
232+
pl.concat_str(
233+
pl.col("prenomUsuelUniteLegale"),
234+
pl.col("nomUniteLegale"),
235+
separator=" ",
236+
)
237+
)
238+
.otherwise(pl.col("denominationUniteLegale"))
239+
.alias("denominationUniteLegale")
240+
)
241+
.with_columns(
242+
pl.when(pl.col("statutDiffusionUniteLegale") == "P")
243+
.then(pl.lit("[Données personnelles non-diffusibles]"))
244+
.otherwise(pl.col("denominationUniteLegale"))
245+
.alias("denominationUniteLegale")
246+
)
247+
.drop(
248+
[
249+
"prenomUsuelUniteLegale",
250+
"statutDiffusionUniteLegale",
251+
"nomUniteLegale",
252+
"nomUsageUniteLegale",
253+
]
254+
)
218255
)
219256

220257

221-
def prepare_etablissements(lf: pl.LazyFrame, processed_parquet_path: Path) -> None:
222-
lf = lf.rename(
258+
def prepare_etablissements(lff: pl.LazyFrame) -> pl.LazyFrame:
259+
lff = lff.with_columns(
260+
[
261+
pl.col("codeCommuneEtablissement").str.pad_start(5, "0"),
262+
pl.col("siret").str.pad_start(14, "0"),
263+
# Si enseigne1Etablissement est null, on utilise denominationUsuelleEtablissement
264+
pl.coalesce(
265+
"enseigne1Etablissement", "denominationUsuelleEtablissement"
266+
).alias("etablissement_nom"),
267+
]
268+
)
269+
lff = lff.drop("denominationUsuelleEtablissement", "enseigne1Etablissement")
270+
lff = lff.rename(
223271
{
224272
"codeCommuneEtablissement": "commune_code",
225273
"activitePrincipaleEtablissement": "activite_code",
226274
"nomenclatureActivitePrincipaleEtablissement": "activite_nomenclature",
227275
}
228276
)
229277

230-
# Ajout des noms de départements, noms régions,
278+
# Ajout des noms de commune, départements, régions
231279
lf_cog = pl.scan_parquet(DATA_DIR / "code_officiel_geographique.parquet")
232-
lf = lf.join(lf_cog, on="commune_code", how="left")
280+
lff = lff.join(lf_cog, on="commune_code", how="left")
233281

234-
lf.sink_parquet(processed_parquet_path, engine="streaming")
282+
return lff
235283

236284

237285
def sort_columns(lf: pl.LazyFrame, config_columns):

tests/data/decp_test_2019.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
{
1515
"typeIdentifiant": "SIRET",
1616
"denominationSociale": "AMC FOLLIOT",
17-
"id": "65265021900023"
17+
"id": "12345678900022"
1818
}
1919
]
2020
]
@@ -28,7 +28,7 @@
2828
{
2929
"typeIdentifiant": "SIRET",
3030
"denominationSociale": "AMC FOLLIOT",
31-
"id": "65265021900023"
31+
"id": "12345678900022"
3232
}
3333
],
3434
"id": "2019_83935401",
@@ -68,7 +68,7 @@
6868
"titulaires": [
6969
{
7070
"typeIdentifiant": "SIRET",
71-
"id": "34027049500021",
71+
"id": "12345678900023",
7272
"denominationSociale": "FFF"
7373
},
7474
{

tests/data/decp_test_2022.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
{
3131
"titulaire": {
3232
"typeIdentifiant": "SIRET",
33-
"id": "34027049500021"
33+
"id": "12345678900023"
3434
}
3535
},
3636
{
@@ -184,7 +184,7 @@
184184
{
185185
"titulaire": {
186186
"typeIdentifiant": "SIRET",
187-
"id": "58211867500054"
187+
"id": "12345678900022"
188188
}
189189
},
190190
{

0 commit comments

Comments
 (0)