From 6121b282a33ca64fe42de46e6020a212096b8c6e Mon Sep 17 00:00:00 2001 From: RV Date: Thu, 13 Mar 2025 22:50:07 +0100 Subject: [PATCH 1/6] alembic: create columns and init dim_vessel & dim_zone --- ..._scd_implementation_for_dim_vessel_dim_.py | 68 +++++++++++++++++++ backend/bloom/config.py | 19 +++++- backend/bloom/infra/database/sql_model.py | 9 +++ 3 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py diff --git a/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py b/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py new file mode 100644 index 00000000..b4dfdb35 --- /dev/null +++ b/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py @@ -0,0 +1,68 @@ +"""SCD implementation for dim_vessel/dim_zone + +Revision ID: c02d2b79eab2 +Revises: 5801cb8f1af5 +Create Date: 2025-03-13 21:04:11.148024 + +""" +from alembic import op +import sqlalchemy as sa +from bloom.config import settings + + +# revision identifiers, used by Alembic. +revision = 'c02d2b79eab2' +down_revision = '5801cb8f1af5' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create SCD columns for dim_vessel and dim_zone with existing data (nullable) + op.add_column("dim_vessel",sa.Column("scd_start",sa.DateTime(timezone=True),nullable=True)) + op.add_column("dim_vessel",sa.Column("scd_end",sa.DateTime(timezone=True),nullable=True)) + op.add_column("dim_vessel",sa.Column("scd_active",sa.Boolean,nullable=True)) + op.add_column("dim_vessel",sa.Column("key",sa.String,nullable=True)) + + op.add_column("dim_zone",sa.Column("scd_start",sa.DateTime(timezone=True),nullable=True)) + op.add_column("dim_zone",sa.Column("scd_end",sa.DateTime(timezone=True),nullable=True)) + op.add_column("dim_zone",sa.Column("scd_active",sa.Boolean,nullable=True)) + op.add_column("dim_zone",sa.Column("key",sa.String,nullable=True)) + + # Initialize scd_column values + op.execute( (f"update dim_vessel set scd_start = '{settings.scd_past_limit.isoformat()}'," + f"scd_end = '{settings.scd_future_limit.isoformat()}'," + f"scd_active = true," + f"key = (CASE WHEN imo IS NOT NULL THEN imo::varchar(255) ELSE cfr END)" + )) + + op.execute( (f"update dim_zone set scd_start = '{settings.scd_past_limit.isoformat()}'," + f"scd_end = '{settings.scd_future_limit.isoformat()}'," + f"scd_active = true," + f"key = COALESCE(category,'')||'/'||COALESCE(sub_category,'')||'/'||name" + )) + + # Set scd columns not nullable after init + op.alter_column("dim_vessel","scd_start",nullable=False) + op.alter_column("dim_vessel","scd_end",nullable=False) + op.alter_column("dim_vessel","scd_active",nullable=False) + op.alter_column("dim_vessel","key",nullable=False) + + op.alter_column("dim_zone","scd_start",nullable=False) + op.alter_column("dim_zone","scd_end",nullable=False) + op.alter_column("dim_zone","scd_active",nullable=False) + op.alter_column("dim_zone","key",nullable=False) + pass + + +def downgrade() -> None: + op.drop_column("dim_zone","scd_start") + op.drop_column("dim_zone","scd_end") + op.drop_column("dim_zone","scd_active") + op.drop_column("dim_zone","key") + + op.drop_column("dim_vessel","scd_start") + op.drop_column("dim_vessel","scd_end") + op.drop_column("dim_vessel","scd_active") + op.drop_column("dim_vessel","key") + pass diff --git a/backend/bloom/config.py b/backend/bloom/config.py index 5ec6cdf5..962b8261 100644 --- a/backend/bloom/config.py +++ b/backend/bloom/config.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from datetime import timedelta +from datetime import datetime,timedelta, timezone from pydantic_settings import BaseSettings, SettingsConfigDict from typing import Any @@ -59,6 +59,23 @@ class Settings(BaseSettings): ) api_key:str = Field(min_length=4,default='bloom') + scd_past_limit: datetime = Field(default=datetime(year=1900, + month=1, + day=1, + hour=0, + minute=0, + second=0, + microsecond=0, + tzinfo=timezone.utc)) + scd_future_limit: datetime = Field(default=datetime(year=2999, + month=12, + day=31, + hour=23, + minute=59, + second=59, + microsecond=999999, + tzinfo=timezone.utc)) + @model_validator(mode='after') def update_db_url(self)->dict: new_url= f"postgresql://{self.postgres_user}:"\ diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py index 42896ab7..54ccde14 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -20,6 +20,7 @@ from datetime import timedelta + class Vessel(Base): __tablename__ = "dim_vessel" id = Column("id", Integer, primary_key=True) @@ -43,6 +44,10 @@ class Vessel(Base): "created_at", DateTime(timezone=True), nullable=False, server_default=func.now(), ) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + scd_start = Column("scd_start",DateTime(timezone=True)) + scd_end = Column("scd_end",DateTime(timezone=True)) + scd_active = Column("scd_active",Boolean) + key = Column("key",String) class Alert(Base): @@ -117,6 +122,10 @@ class Zone(Base): json_data = Column("json_data", JSONB) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) enable = Column("enable",Boolean(), server_default="True") + scd_start = Column("scd_start",DateTime(timezone=True)) + scd_end = Column("scd_end",DateTime(timezone=True)) + scd_active = Column("scd_active",Boolean) + key = Column("key",String) class WhiteZone(Base): From 5928cab02c1508814cbf6529d17aef2c1f625734 Mon Sep 17 00:00:00 2001 From: RV Date: Sun, 16 Mar 2025 21:43:45 +0100 Subject: [PATCH 2/6] feat: dim_vessel add scd filters to vessel repository: +scd_date +scd_enable --- backend/bloom/domain/vessel.py | 4 + backend/bloom/domain/zone.py | 8 ++ backend/bloom/infra/database/sql_model.py | 3 +- .../repositories/repository_spire_ais_data.py | 12 +- .../infra/repositories/repository_vessel.py | 107 ++++++++++++++++-- .../repository_vessel_position.py | 5 +- .../infra/repositories/repository_zone.py | 100 ++++++++++++++-- backend/bloom/tasks/clean_positions.py | 1 + .../bloom/tasks/load_dim_vessel_from_csv.py | 21 ++-- .../bloom/tasks/load_dim_zone_amp_from_csv.py | 5 + 10 files changed, 237 insertions(+), 29 deletions(-) diff --git a/backend/bloom/domain/vessel.py b/backend/bloom/domain/vessel.py index 286bd642..0a2e7708 100644 --- a/backend/bloom/domain/vessel.py +++ b/backend/bloom/domain/vessel.py @@ -6,6 +6,7 @@ class Vessel(BaseModel): id: Union[int, None] = None + key: Union[str, None] = None mmsi: Union[int, None] ship_name: str width: Union[float, None] = None @@ -24,6 +25,9 @@ class Vessel(BaseModel): details: Union[str, None] = None check: Union[str, None] = None length_class: Union[str, None] = None + scd_start: Union[datetime, None] = None + scd_end: Union[datetime, None] = None + scd_active: Union[bool, None] = None class VesselListView(Vessel): details:ClassVar[Union[str, None]] = None diff --git a/backend/bloom/domain/zone.py b/backend/bloom/domain/zone.py index 737a6897..15bb419e 100644 --- a/backend/bloom/domain/zone.py +++ b/backend/bloom/domain/zone.py @@ -12,6 +12,7 @@ class Zone(BaseModel): Geometry: lambda p: mapping(p), },) id: Union[int, None] = None + key: Union[str, None] = None category: str sub_category: Union[str, None] = None name: str @@ -21,14 +22,21 @@ class Zone(BaseModel): centroid: Union[Point, None] = None json_data: Union[dict, None] = None enable: Union[bool, None] = None + scd_start: Union[datetime, None] = None + scd_end: Union[datetime, None] = None + scd_active: Union[bool, None] = None class ZoneSummary(BaseModel): id: Union[int, None] = None + key: Union[str, None] = None category: str sub_category: Union[str, None] = None name: str created_at: Union[datetime, None] = None enable: Union[bool, None] = None + scd_start: Union[datetime, None] = None + scd_end: Union[datetime, None] = None + scd_active: Union[bool, None] = None class ZoneListView(Zone): diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py index 54ccde14..58cf95d6 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -24,6 +24,7 @@ class Vessel(Base): __tablename__ = "dim_vessel" id = Column("id", Integer, primary_key=True) + key = Column("key",String,nullable=False) mmsi = Column("mmsi", Integer) ship_name = Column("ship_name", String, nullable=False) width = Column("width", Double) @@ -47,7 +48,6 @@ class Vessel(Base): scd_start = Column("scd_start",DateTime(timezone=True)) scd_end = Column("scd_end",DateTime(timezone=True)) scd_active = Column("scd_active",Boolean) - key = Column("key",String) class Alert(Base): @@ -114,6 +114,7 @@ class SpireAisData(Base): class Zone(Base): __tablename__ = "dim_zone" id = Column("id", Integer, primary_key=True) + key = Column("key", String, nullable=False) category = Column("category", String, nullable=False) sub_category = Column("sub_category", String) name = Column("name", String, nullable=False) diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py index 696ed98d..4b8c27b4 100644 --- a/backend/bloom/infra/repositories/repository_spire_ais_data.py +++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py @@ -3,7 +3,7 @@ from bloom.infra.database import sql_model from dependency_injector.providers import Callable from sqlalchemy.orm import Session -from sqlalchemy import select, and_ +from sqlalchemy import select, and_, between from datetime import datetime from bloom.logger import logger @@ -38,6 +38,7 @@ def get_all_data_after_date( sql_model.SpireAisData.spire_update_statement, sql_model.SpireAisData.vessel_mmsi, sql_model.Vessel.id, + sql_model.Vessel.key, sql_model.SpireAisData.position_accuracy, sql_model.SpireAisData.position_collection_type, sql_model.SpireAisData.position_course, @@ -66,6 +67,7 @@ def get_all_data_after_date( "spire_update_statement", "vessel_mmsi", "vessel_id", + "vessel_key", "position_accuracy", "position_collection_type", "position_course", @@ -90,6 +92,7 @@ def get_all_data_between_date( sql_model.SpireAisData.spire_update_statement, sql_model.SpireAisData.vessel_mmsi, sql_model.Vessel.id, + sql_model.Vessel.key, sql_model.SpireAisData.position_accuracy, sql_model.SpireAisData.position_collection_type, sql_model.SpireAisData.position_course, @@ -112,6 +115,12 @@ def get_all_data_between_date( sql_model.SpireAisData.created_at > created_updated_after, sql_model.SpireAisData.created_at <= created_updated_before ) + ).where( + # scd attribute management + # filter on spire_update_statement reference date + between(sql_model.SpireAisData.spire_update_statement, + sql_model.Vessel.scd_start, + sql_model.Vessel.scd_end) ).order_by(sql_model.SpireAisData.created_at.asc()) result = session.execute(stmt) return pd.DataFrame(result, columns=[ @@ -119,6 +128,7 @@ def get_all_data_between_date( "spire_update_statement", "vessel_mmsi", "vessel_id", + "vessel_key", "position_accuracy", "position_collection_type", "position_course", diff --git a/backend/bloom/infra/repositories/repository_vessel.py b/backend/bloom/infra/repositories/repository_vessel.py index a0ea4bf0..85fba70e 100644 --- a/backend/bloom/infra/repositories/repository_vessel.py +++ b/backend/bloom/infra/repositories/repository_vessel.py @@ -1,15 +1,17 @@ from contextlib import AbstractContextManager -from typing import Any, Generator, Union +from typing import Any, Generator, Union, Optional +from datetime import datetime from bloom.domain.vessel import Vessel from bloom.domain.metrics import VesselTimeInZone from bloom.infra.database import sql_model from dependency_injector.providers import Callable -from sqlalchemy import func, select, update, and_, asc, desc, literal_column +from sqlalchemy import func, select, update, and_, asc, desc, literal_column, between from sqlalchemy.orm import Session from bloom.routers.requests import (DatetimeRangeRequest, OrderByRequest, OrderByEnum) +from bloom.config import settings class VesselRepository: @@ -20,54 +22,128 @@ def __init__( self.session_factory = session_factory - def get_vessel_tracked_count(self, session: Session) -> int: + def get_vessel_tracked_count(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> int: stmt = select(func.count(sql_model.Vessel.id)).select_from(sql_model.Vessel)\ .distinct().where(sql_model.Vessel.tracking_activated == True) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) return session.execute(stmt).scalar() - def get_vessel_types(self, session: Session) -> list[str]: + def get_vessel_types(self, + session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[str]: stmt = select(sql_model.Vessel.type).select_from(sql_model.Vessel).distinct() + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) return [i for i in session.execute(stmt).scalars()] - def get_vessel_length_classes(self, session: Session) -> list[str]: + def get_vessel_length_classes(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[str]: stmt = select(sql_model.Vessel.length_class).select_from(sql_model.Vessel).distinct() + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) return [i for i in session.execute(stmt).scalars()] - def get_vessel_countries(self, session: Session) -> list[str]: + def get_vessel_countries(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[str]: stmt = select(sql_model.Vessel.country_iso3).select_from(sql_model.Vessel).distinct() + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) return [i for i in session.execute(stmt).scalars()] def get_vessel_by_id(self, session: Session, vessel_id: int) -> Union[Vessel, None]: return session.get(sql_model.Vessel, vessel_id) + + def get_vessel_by_key(self, session: Session, + key: str, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> Union[Vessel, list[Vessel],None]: + stmt=select(sql_model.Vessel).where(sql_model.Vessel.key == key) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) + return session.execute(stmt).scalar() + else: + return session.execute(stmt).scalars() - def get_activated_vessel_by_mmsi(self, session: Session, mmsi: int) -> Union[Vessel, None]: + def get_activated_vessel_by_mmsi(self, session: Session, + mmsi: int, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> Union[Vessel, None]: stmt = select(sql_model.Vessel).where( and_( sql_model.Vessel.tracking_activated == True, sql_model.Vessel.mmsi == mmsi ) ) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) vessel = session.execute(stmt).scalar() if not vessel: return None else: return VesselRepository.map_to_domain(vessel) - def get_vessels_list(self, session: Session) -> list[Vessel]: + def get_vessels_list(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[Vessel]: """ Liste l'ensemble des vessels actifs """ stmt = select(sql_model.Vessel).where(sql_model.Vessel.tracking_activated == True) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) e = session.execute(stmt).scalars() if not e: return [] return [VesselRepository.map_to_domain(vessel) for vessel in e] - def get_all_vessels_list(self, session: Session) -> list[Vessel]: + def get_all_vessels_list(self, + session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[Vessel]: """ Liste l'ensemble des vessels actifs ou inactifs """ stmt = select(sql_model.Vessel) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) + else: + stmt=stmt.where(sql_model.Vessel.scd_active) e = session.execute(stmt).scalars() if not e: @@ -137,8 +213,10 @@ def get_vessel_times_in_zones( self, return result - def batch_create_vessel(self, session: Session, vessels: list[Vessel]) -> list[Vessel]: - orm_list = [VesselRepository.map_to_sql(port) for port in vessels] + def batch_create_vessel(self, session: Session, + vessels: list[Vessel] + ) -> list[Vessel]: + orm_list = [VesselRepository.map_to_sql(vessel) for vessel in vessels] session.add_all(orm_list) return [VesselRepository.map_to_domain(orm) for orm in orm_list] @@ -187,12 +265,16 @@ def map_to_domain(sql_vessel: sql_model.Vessel) -> Vessel: details=sql_vessel.details, check=sql_vessel.check, length_class=sql_vessel.length_class, + scd_start=sql_vessel.scd_start, + scd_end=sql_vessel.scd_end, + scd_active=sql_vessel.scd_active, ) @staticmethod def map_to_sql(vessel: Vessel) -> sql_model.Vessel: return sql_model.Vessel( id=vessel.id, + key=vessel.key, mmsi=vessel.mmsi, ship_name=vessel.ship_name, width=vessel.width, @@ -211,4 +293,7 @@ def map_to_sql(vessel: Vessel) -> sql_model.Vessel: details=vessel.details, check=vessel.check, length_class=vessel.length_class, + scd_start=vessel.scd_start, + scd_end=vessel.scd_end, + scd_active=vessel.scd_active, ) diff --git a/backend/bloom/infra/repositories/repository_vessel_position.py b/backend/bloom/infra/repositories/repository_vessel_position.py index 65e340ac..54ecd9f5 100644 --- a/backend/bloom/infra/repositories/repository_vessel_position.py +++ b/backend/bloom/infra/repositories/repository_vessel_position.py @@ -23,7 +23,10 @@ def create_vessel_position(self, session: Session, position: VesselPosition) -> session.add(orm_position) return VesselPositionRepository.map_to_domain(orm_position) - def batch_create_vessel_position(self, session: Session, vessel_positions: list[VesselPosition]) -> list[ + def batch_create_vessel_position(self, + session: Session, + vessel_positions: list[VesselPosition] + ) -> list[ VesselPosition]: orm_list = [VesselPositionRepository.map_to_sql(vessel_position) for vessel_position in vessel_positions] session.add_all(orm_list) diff --git a/backend/bloom/infra/repositories/repository_zone.py b/backend/bloom/infra/repositories/repository_zone.py index 198c3f6f..7231625e 100644 --- a/backend/bloom/infra/repositories/repository_zone.py +++ b/backend/bloom/infra/repositories/repository_zone.py @@ -1,6 +1,8 @@ from contextlib import AbstractContextManager from typing import Any, List, Union, Optional +from datetime import datetime +from bloom.config import settings from bloom.domain.zone import Zone, ZoneListView, ZoneSummary from bloom.domain.zone_category import ZoneCategory from bloom.infra.database import sql_model @@ -8,7 +10,7 @@ from geoalchemy2.shape import from_shape, to_shape from sqlalchemy.orm import Session from bloom.routers.requests import RangeHeader, PaginatedResult, NonPaginatedResult -from sqlalchemy.sql.expression import ScalarSelect, and_, or_, func, text +from sqlalchemy.sql.expression import ScalarSelect, and_, or_, func, text, select, between class ZoneRepository: @@ -20,14 +22,41 @@ def __init__( def get_zone_by_id(self, session: Session, zone_id: int) -> Union[Zone, None]: return ZoneRepository.map_to_domain(session.get(sql_model.Zone, zone_id)) + + def get_zone_by_key(self, session: Session, + key: str, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + )-> Union[Zone,list[Zone],None]: + stmt=select(sql_model.Zone).where(sql_model.Zone.key == key) + if scd_enable: + if scd_date: + stmt=stmt.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + stmt=stmt.where(sql_model.Zone.scd_active) + return ZoneRepository.map_to_domain(session.execute(stmt).scalar()) + else: + return ZoneRepository.map_to_domain(session.execute(stmt).scalars()) - def get_all_zones(self, session: Session, range: Optional[RangeHeader | None] = None) -> PaginatedResult[ + def get_all_zones(self, session: Session, + range: Optional[RangeHeader | None] = None, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> PaginatedResult[ list[Zone]]: # q = session.execute(q) payload = [] if range is not None: base_query = session.query(sql_model.Zone, func.count().over().label('total')) + + # SCD attribute management + if scd_enable: + if scd_date: + base_query=base_query.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + base_query=base_query.where(sql_model.Zone.scd_active) + # Getting total count of model table to evaluate validity of ranges total_query = session.query(func.count().label('total')).select_from(sql_model.Zone) total_count = session.execute(total_query).scalar_one_or_none() @@ -44,11 +73,21 @@ def get_all_zones(self, session: Session, range: Optional[RangeHeader | None] = if spec.end == None: range.spec[i].end = total - 1 return PaginatedResult[list[Zone]](payload=payload, total=total, spec=range.spec, unit=range.unit) else: + # SCD attribute management + query=session.query(sql_model.Zone) + if scd_enable: + if scd_date: + query=query.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + query=query.where(sql_model.Zone.scd_active) payload = [ZoneRepository.map_to_domain(model).model_dump() - for model in session.execute(session.query(sql_model.Zone)).scalars()] + for model in session.execute(query).scalars()] return PaginatedResult[list[Zone]](payload=payload) - def get_all_zones_summary(self, session: Session) -> list[ZoneSummary]: + def get_all_zones_summary(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[ZoneSummary]: q = session.query( sql_model.Zone.id, sql_model.Zone.category, @@ -56,32 +95,65 @@ def get_all_zones_summary(self, session: Session) -> list[ZoneSummary]: sql_model.Zone.name, sql_model.Zone.created_at, sql_model.Zone.enable, - ).all() + ) + if scd_enable: + if scd_date: + q=q.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + q=q.where(sql_model.Zone.scd_active) + if not q: return [] - return [ZoneRepository.map_to_summary(entity) for entity in q] + return [ZoneRepository.map_to_summary(entity) for entity in q.all()] - def get_all_zone_categories(self, session: Session) -> list[ZoneCategory]: + def get_all_zone_categories(self, session: Session, + scd_date:Optional[datetime]=None, + scd_enable:bool=True) -> list[ZoneCategory]: q = session.query(sql_model.Zone.category, sql_model.Zone.sub_category).distinct() + # SCD attribute management + if scd_enable: + if scd_date: + q=q.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + q=q.where(sql_model.Zone.scd_active) q = session.execute(q) if not q: return [] return [ZoneRepository.map_zonecategory_to_domain(ZoneCategory(category=cat, sub_category=sub)) for cat, sub in q] - def get_all_zones_by_category(self, session: Session, category: str = None, sub: str = None) -> list[Zone]: + def get_all_zones_by_category(self, session: Session, + category: str = None, sub: str = None, + scd_date:Optional[datetime]=None, + scd_enable:bool=True + ) -> list[Zone]: q = session.query(sql_model.Zone) if category: q = q.filter(sql_model.Zone.category == category) if sub: q = q.filter(sql_model.Zone.sub_category == sub) + # SCD attribute management + if scd_enable: + if scd_date: + q=q.where(between(scd_date,sql_model.Zone.scd_start,sql_model.Zone.scd_end)) + else: + q=q.where(sql_model.Zone.scd_active) if not q: return [] return [ZoneRepository.map_to_domain(entity) for entity in session.execute(q).scalars()] def batch_create_zone(self, session: Session, zones: list[Zone]) -> list[Zone]: orm_list = [ZoneRepository.map_to_orm(zone) for zone in zones] + + # SCD attributes management initialization + # Consider new vessel valid and active on all scd interval + # from settings.scd_past_limit to settings.scd_future_limit + for zone in orm_list: + zone.scd_active = True + zone.scd_start = settings.scd_past_limit + zone.scd_end = settings.scd_future_limit + session.add_all(orm_list) return [ZoneRepository.map_to_domain(orm) for orm in orm_list] @@ -89,6 +161,7 @@ def batch_create_zone(self, session: Session, zones: list[Zone]) -> list[Zone]: def map_to_orm(zone: Zone) -> sql_model.Zone: return sql_model.Zone( id=zone.id, + key=zone.key, category=zone.category, sub_category=zone.sub_category, name=zone.name, @@ -97,12 +170,16 @@ def map_to_orm(zone: Zone) -> sql_model.Zone: json_data=zone.json_data, created_at=zone.created_at, enable=zone.enable, + scd_start=zone.scd_start, + scd_end=zone.scd_end, + scd_active=zone.scd_active, ) @staticmethod def map_to_domain(zone: sql_model.Zone) -> Zone: return Zone( id=zone.id, + key=zone.key, category=zone.category, sub_category=zone.sub_category, name=zone.name, @@ -111,17 +188,24 @@ def map_to_domain(zone: sql_model.Zone) -> Zone: json_data=zone.json_data, created_at=zone.created_at, enable=zone.enable, + scd_start=zone.scd_start, + scd_end=zone.scd_end, + scd_active=zone.scd_active, ) @staticmethod def map_to_summary(zone: sql_model.Zone) -> ZoneSummary: return ZoneSummary( id=zone.id, + key=zone.key, category=zone.category, sub_category=zone.sub_category, name=zone.name, created_at=zone.created_at, enable=zone.enable, + scd_start=zone.scd_start, + scd_end=zone.scd_end, + scd_active=zone.scd_active, ) @staticmethod diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py index 4e801f87..7ebee042 100644 --- a/backend/bloom/tasks/clean_positions.py +++ b/backend/bloom/tasks/clean_positions.py @@ -27,6 +27,7 @@ def get_distance(current_position: tuple, last_position: tuple): def map_vessel_position_to_domain(row: pd.Series) -> VesselPosition: return VesselPosition( vessel_id=row["vessel_id"], + vessel_key=row["vessel_id"], timestamp=row["position_timestamp"], accuracy=row["position_accuracy"], collection_type=row["position_collection_type"], diff --git a/backend/bloom/tasks/load_dim_vessel_from_csv.py b/backend/bloom/tasks/load_dim_vessel_from_csv.py index 1d0a1e96..050f303c 100644 --- a/backend/bloom/tasks/load_dim_vessel_from_csv.py +++ b/backend/bloom/tasks/load_dim_vessel_from_csv.py @@ -9,11 +9,15 @@ from bloom.logger import logger from pydantic import ValidationError +from bloom.config import settings +from datetime import datetime,timezone + def map_to_domain(row: pd.Series) -> Vessel: isna = row.isna() return Vessel( id=int(row["id"]) if not isna["id"] else None, + key=str(row['imo']) if not isna["imo"] else str(row['cfr']), mmsi=int(row["mmsi"]) if not isna["mmsi"] else None, ship_name=row["ship_name"], width=int(row["width"]) if not isna["width"] else None, @@ -29,6 +33,9 @@ def map_to_domain(row: pd.Series) -> Vessel: details=row["details"] if not isna["details"] else None, length_class=row["length_class"] if not isna["length_class"] else None, check=row["check"] if not isna["length_class"] else None, + scd_start=datetime.fromisoformat(row["scd_start"]) if "scd_start" in row else settings.scd_past_limit, + scd_end=datetime.fromisoformat(row["scd_end"]) if "scd_end" in row else settings.scd_future_limit, + scd_active=row["scd_active"] if "scd_end" in row else True, ) @@ -38,27 +45,27 @@ def run(csv_file_name: str) -> None: db = use_cases.db() inserted_ports = [] + updated_vessels = [] deleted_ports = [] try: df = pd.read_csv(csv_file_name, sep=",") vessels = df.apply(map_to_domain, axis=1) + with db.session() as session: - ports_inserts = [] - ports_updates = [] # Pour chaque enregistrement du fichier CSV for vessel in vessels: if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id): # si la valeur du champ id n'est pas vide: # rechercher l'enregistrement correspondant dans la table dim_vessel # mettre à jour l'enregistrement à partir des données CSV. - ports_updates.append(vessel) + updated_vessels.append(vessel) else: # sinon: # insérer les données CSV dans la table dim_vessel; - ports_inserts.append(vessel) + inserted_ports.append(vessel) # Insertions / MAJ en batch - inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts) - vessel_repository.batch_update_vessel(session, ports_updates) + inserted_vessels = vessel_repository.batch_create_vessel(session, inserted_ports) + vessel_repository.batch_update_vessel(session, updated_vessels) # En fin de traitement: # les enregistrements de la table dim_vessel pourtant un MMSI absent du fichier CSV sont mis à jour @@ -83,7 +90,7 @@ def run(csv_file_name: str) -> None: except DBException: logger.error("Erreur d'insertion en base") logger.info(f"{len(inserted_ports)} bateau(x) créés") - logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés") + logger.info(f"{len(updated_vessels)} bateau(x) mise à jour ou inchangés") logger.info(f"{len(deleted_ports)} bateau(x) désactivés") diff --git a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py index 3adbc2a1..46f82d39 100644 --- a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py +++ b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py @@ -3,6 +3,7 @@ import pandas as pd from shapely import wkb +from datetime import datetime from bloom.config import settings from bloom.container import UseCases @@ -25,12 +26,16 @@ def map_to_domain(row: pd.Series) -> Zone: pass return Zone( + key=f'{row["category"]}/{row["sub_category"]}/{row["name"]}' if 'key' not in row else row['key'], category=row["category"], sub_category=row["sub_category"] if not isna["sub_category"] else None, name=row["name"], geometry=row["geometry"], centroid=row["geometry"].centroid, json_data=json_data, + scd_start=datetime.fromisoformat(row["scd_start"]) if "scd_start" in row else settings.scd_past_limit, + scd_end=datetime.fromisoformat(row["scd_end"]) if "scd_end" in row else settings.scd_future_limit, + scd_active=row["scd_active"] if "scd_end" in row else True, ) From 2244f769215e9ae1e511835592175de4fc107bf3 Mon Sep 17 00:00:00 2001 From: RV Date: Sun, 16 Mar 2025 22:26:00 +0100 Subject: [PATCH 3/6] fix: load_dim_zone_amp_from_csv when file is empty ('str' has no attribute 'id') --- backend/bloom/tasks/load_dim_zone_amp_from_csv.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py index 46f82d39..497dbf97 100644 --- a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py +++ b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py @@ -4,7 +4,6 @@ import pandas as pd from shapely import wkb from datetime import datetime - from bloom.config import settings from bloom.container import UseCases from bloom.domain.zone import Zone @@ -52,9 +51,10 @@ def run(): total = 0 df = pd.read_csv(file_name, sep=",") df["geometry"] = df["geometry"].apply(wkb.loads) - zones = df.apply(map_to_domain, axis=1) - zones = zone_repository.batch_create_zone(session, list(zones)) - total = len(zones) + if not df.empty: + zones = df.apply(map_to_domain, axis=1) + zones = zone_repository.batch_create_zone(session, list(zones)) + total = len(zones) logger.info(f"{total} zone(s) créés") session.commit() From 5eaf6cf793ec05b32a7c403234f875b6e16a7adc Mon Sep 17 00:00:00 2001 From: RV Date: Sun, 16 Mar 2025 22:59:26 +0100 Subject: [PATCH 4/6] fix: "nan" instead of "" when sub_category is NULL --- backend/bloom/tasks/load_dim_zone_amp_from_csv.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py index 497dbf97..88bc9ae2 100644 --- a/backend/bloom/tasks/load_dim_zone_amp_from_csv.py +++ b/backend/bloom/tasks/load_dim_zone_amp_from_csv.py @@ -8,6 +8,7 @@ from bloom.container import UseCases from bloom.domain.zone import Zone from bloom.logger import logger +import numpy as np FIC_ZONE = ["french_metropolitan_mpas.csv","fishing_coastal_waters.csv", "territorial_seas.csv","clipped_territorial_seas.csv"] @@ -25,7 +26,7 @@ def map_to_domain(row: pd.Series) -> Zone: pass return Zone( - key=f'{row["category"]}/{row["sub_category"]}/{row["name"]}' if 'key' not in row else row['key'], + key=f'{row["category"]}/{row["sub_category"] if not isna["sub_category"] else "" }/{row["name"]}' if 'key' not in row else row['key'], category=row["category"], sub_category=row["sub_category"] if not isna["sub_category"] else None, name=row["name"], From 2598110632777cb8a0fe9c8920fb4d12a0dd91e7 Mon Sep 17 00:00:00 2001 From: RV Date: Thu, 20 Mar 2025 23:32:24 +0100 Subject: [PATCH 5/6] feat: transform VesselRepository dep injection by constructor + starting SCD methods --- backend/bloom/container.py | 3 +- backend/bloom/infra/ports/repository.py | 113 ++++++++++++++++++ .../infra/repositories/repository_vessel.py | 43 ++++--- backend/bloom/routers/v1/vessels.py | 15 ++- ...convert_spire_vessels_to_spire_ais_data.py | 4 +- .../bloom/tasks/load_dim_vessel_from_csv.py | 7 +- .../bloom/tasks/load_spire_data_from_api.py | 2 +- .../bloom/tasks/update_vessel_data_voyage.py | 2 +- 8 files changed, 159 insertions(+), 30 deletions(-) create mode 100644 backend/bloom/infra/ports/repository.py diff --git a/backend/bloom/container.py b/backend/bloom/container.py index 26efb6cf..092ef28a 100644 --- a/backend/bloom/container.py +++ b/backend/bloom/container.py @@ -35,8 +35,7 @@ class UseCases(containers.DeclarativeContainer): ) vessel_repository = providers.Factory( - VesselRepository, - session_factory=db.provided.session, + VesselRepository ) alert_repository = providers.Factory( diff --git a/backend/bloom/infra/ports/repository.py b/backend/bloom/infra/ports/repository.py new file mode 100644 index 00000000..2f3bd81d --- /dev/null +++ b/backend/bloom/infra/ports/repository.py @@ -0,0 +1,113 @@ +import abc +from typing import TypeVar, Generic, Optional, List +from datetime import datetime +from sqlalchemy.sql.expression import ScalarSelect, select, and_, or_, between + +DOMAIN=TypeVar("DOMAIN") + +class RepositoryError(Exception): ... +class ItemNotFoundError(RepositoryError): ... +class SCDRepositoryError(RepositoryError): ... +class SCDDateError(SCDRepositoryError): ... +class SCDDateStartEndInvertedError(SCDRepositoryError): ... +class SCDDateOverlapError(SCDRepositoryError): ... + +class AbstractRepositoryClient(): + pass + +def construct_findBy_statement(model_type, + offset=None, + limit=None, + **filters, + )-> ScalarSelect: + stmt = select(model_type) + where_clauses = [] + for c, v in filters.items(): + print(c,v) + if not hasattr(model_type, c): + raise ValueError(f"Invalid column name {c}") + where_clauses.append(getattr(model_type, c) == v) + + if offset: + stmt=stmt.offset(offset) + if limit: + stmt=stmt.limit(limit) + + if len(where_clauses) == 1: + stmt = stmt.where(where_clauses[0]) + elif len(where_clauses) > 1: + stmt = stmt.where(and_(*where_clauses)) + return stmt + +def construct_findBy_scd_statement(model_type, + offset:int=None, + limit:int=None, + scd_date:Optional[datetime]=None, + **filters)-> ScalarSelect: + stmt = select(model_type) + where_clauses = [] + for c, v in filters.items(): + if not hasattr(model_type, c): + raise ValueError(f"Invalid column name {c}") + where_clauses.append(getattr(model_type, c) == v) + + if offset: + stmt=stmt.offset(offset) + if limit: + stmt=stmt.limit(limit) + + if scd_date != None: + stmt=stmt.where(between(scd_date,model_type.scd_start, model_type.scd_end)) + else: + stmt=stmt.where(model_type.scd_active == True) + + if len(where_clauses) == 1: + stmt = stmt.where(where_clauses[0]) + elif len(where_clauses) > 1: + stmt = stmt.where(and_(*where_clauses)) + return stmt + +class AbstractRepository(abc.ABC, Generic[DOMAIN]): + @abc.abstractmethod + def findBy(self, offset=None, limit=None, **filters) -> List[DOMAIN]: + raise NotImplementedError() + """@abc.abstractmethod + def insert(self, item:DOMAIN) -> DOMAIN: + raise NotImplementedError() + @abc.abstractmethod + def update(self, item:DOMAIN) -> DOMAIN: + raise NotImplementedError() + @abc.abstractmethod + def delete(self, filters:List) -> None: + raise NotImplementedError()""" + +class AbstractSqlAlchemyMixIn(): + pass + +class AbstractScdRepositoryMixIn(Generic[DOMAIN]): + @abc.abstractmethod + def scd_insert(self, + item:DOMAIN, + scd_start:datetime, + scd_end:datetime) -> DOMAIN: + """ + Insert new item with SCD date check. + If date range in arguments overlaps existing date ranges then the + function raise an error of type SCDDateOverlapError + """ + raise NotImplementedError() + @abc.abstractmethod + def scd_update(self, + item:DOMAIN, + scd_start:datetime, + scd_end:datetime) -> DOMAIN: + """ + update existing items with SCD date check. + Item must already exist else raise an ItemNotFoundError error + If date range in arguments overlaps existing date ranges, then + existing date ranges are update in order to insert new date range + """ + raise NotImplementedError() + @abc.abstractmethod + def scd_delete(self, item:DOMAIN) -> DOMAIN: + raise NotImplementedError() \ No newline at end of file diff --git a/backend/bloom/infra/repositories/repository_vessel.py b/backend/bloom/infra/repositories/repository_vessel.py index 85fba70e..c470e58f 100644 --- a/backend/bloom/infra/repositories/repository_vessel.py +++ b/backend/bloom/infra/repositories/repository_vessel.py @@ -12,28 +12,41 @@ OrderByRequest, OrderByEnum) from bloom.config import settings +from bloom.infra.ports.repository import ( + AbstractRepository, + AbstractScdRepositoryMixIn, + AbstractSqlAlchemyMixIn +) +from bloom.infra.ports.repository import (AbstractRepository, + construct_findBy_scd_statement +) -class VesselRepository: - def __init__( - self, - session_factory: Callable, - ) -> Callable[..., AbstractContextManager]: - self.session_factory = session_factory +class VesselRepository(AbstractRepository[Vessel]): + session:Session + def __init__(self, session:Session): + self.session=session + def findBy(self, + offset=None, + limit=None, + scd_date:Optional[datetime]=None, + **filters): + stmt = construct_findBy_scd_statement(sql_model.Vessel, + offset=offset, + limit=limit, + scd_date=scd_date, + **filters) + return [v for v in self.session.execute(stmt).scalars()] - def get_vessel_tracked_count(self, session: Session, + def get_vessel_tracked_count(self, scd_date:Optional[datetime]=None, scd_enable:bool=True ) -> int: - stmt = select(func.count(sql_model.Vessel.id)).select_from(sql_model.Vessel)\ - .distinct().where(sql_model.Vessel.tracking_activated == True) - if scd_enable: - if scd_date: - stmt=stmt.where(between(scd_date,sql_model.Vessel.scd_start,sql_model.Vessel.scd_end)) - else: - stmt=stmt.where(sql_model.Vessel.scd_active) - return session.execute(stmt).scalar() + stmt = construct_findBy_scd_statement(sql_model.Vessel, + scd_date=scd_date, + tracking_activated=True) + return [v for v in self.session.execute(stmt).scalars()] def get_vessel_types(self, session: Session, diff --git a/backend/bloom/routers/v1/vessels.py b/backend/bloom/routers/v1/vessels.py index c33f11e7..a4b4a791 100644 --- a/backend/bloom/routers/v1/vessels.py +++ b/backend/bloom/routers/v1/vessels.py @@ -18,9 +18,9 @@ async def list_vessel_tracked(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) return vessel_repository.get_vessel_tracked_count(session) @router.get("/vessels/types") @@ -28,9 +28,9 @@ async def list_vessel_types(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) return vessel_repository.get_vessel_types(session) @@ -39,9 +39,9 @@ async def list_vessel_classes(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) return vessel_repository.get_vessel_length_classes(session) @router.get("/vessels/countries") @@ -49,9 +49,9 @@ async def list_vessel_countries(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) return vessel_repository.get_vessel_countries(session) @@ -62,9 +62,9 @@ async def list_vessels(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) return jsonable_encoder(vessel_repository.get_vessels_list(session)) @router.get("/vessels/{vessel_id}") @@ -75,10 +75,10 @@ async def get_vessel(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() json_data={} with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) data=vessel_repository.get_vessel_by_id(session,vessel_id) return json.loads(vessel_repository.map_to_domain(data).model_dump_json()) if data else {} @@ -89,7 +89,6 @@ async def list_all_vessel_last_position(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - use_cases = UseCases() segment_repository = use_cases.segment_repository() db = use_cases.db() json_data={} @@ -203,10 +202,10 @@ async def get_vessel_excursion_segment(request: Request, # used by @cache key: str = Depends(X_API_KEY_HEADER)): check_apikey(key) use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() json_data={} with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) result = vessel_repository.get_vessel_times_in_zones(session, vessel_id=vessel_id, datetime_range=datetime_range, diff --git a/backend/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py b/backend/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py index df120b9e..3d0bd939 100644 --- a/backend/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py +++ b/backend/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py @@ -10,7 +10,6 @@ from sqlalchemy.orm.session import Session use_cases = UseCases() -vessel_repo = use_cases.vessel_repository() spire_ais_data_repo = use_cases.spire_ais_data_repository() db = use_cases.db() batch_size = 1000 @@ -54,7 +53,8 @@ def map_to_ais_spire_data(vessel_position: VesselPositionSpire) -> SpireAisData: def batch_convert(session: Session) -> Generator[list[SpireAisData], None, None]: batch = [] - for vessel_position in vessel_repo.get_all_spire_vessels_position(session, batch_size): + vessel_repository = use_cases.vessel_repository(session=session) + for vessel_position in vessel_repository.get_all_spire_vessels_position(session, batch_size): batch.append(map_to_ais_spire_data(vessel_position)) if len(batch) >= batch_size: yield batch diff --git a/backend/bloom/tasks/load_dim_vessel_from_csv.py b/backend/bloom/tasks/load_dim_vessel_from_csv.py index 050f303c..eb0d3793 100644 --- a/backend/bloom/tasks/load_dim_vessel_from_csv.py +++ b/backend/bloom/tasks/load_dim_vessel_from_csv.py @@ -41,7 +41,6 @@ def map_to_domain(row: pd.Series) -> Vessel: def run(csv_file_name: str) -> None: use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() inserted_ports = [] @@ -52,6 +51,12 @@ def run(csv_file_name: str) -> None: vessels = df.apply(map_to_domain, axis=1) with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) + print("#######################################") + print(vessel_repository.findBy(scd_date=datetime(3850,1,1))) + print("#######################################") + print(vessel_repository.get_vessel_tracked_count(scd_date=datetime(3850,1,1))) + print("#######################################") # Pour chaque enregistrement du fichier CSV for vessel in vessels: if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id): diff --git a/backend/bloom/tasks/load_spire_data_from_api.py b/backend/bloom/tasks/load_spire_data_from_api.py index 4a271abd..9a853e33 100644 --- a/backend/bloom/tasks/load_spire_data_from_api.py +++ b/backend/bloom/tasks/load_spire_data_from_api.py @@ -17,7 +17,6 @@ def run(dump_path: str) -> None: use_cases = UseCases() spire_ais_data_repository = use_cases.spire_ais_data_repository() spire_traffic_usecase = use_cases.get_spire_data_usecase() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() orm_data = [] @@ -26,6 +25,7 @@ def run(dump_path: str) -> None: current_datetime=None position_count= None with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) currentTaskTime=TaskExecutionRepository.get_point_in_time(session,"load_spire_data_from_api") if(currentTaskTime <= datetime.now(timezone.utc) - settings.api_pooling_period): vessels: list[Vessel] = vessel_repository.get_vessels_list(session) diff --git a/backend/bloom/tasks/update_vessel_data_voyage.py b/backend/bloom/tasks/update_vessel_data_voyage.py index ef4886ff..5c1da201 100644 --- a/backend/bloom/tasks/update_vessel_data_voyage.py +++ b/backend/bloom/tasks/update_vessel_data_voyage.py @@ -49,9 +49,9 @@ def map_ais_data_to_vessel_voyage(ais_data: SpireAisData, vessel: Vessel) -> Uni def run() -> None: use_cases = UseCases() spire_ais_data_repository = use_cases.spire_ais_data_repository() - vessel_repository = use_cases.vessel_repository() db = use_cases.db() with db.session() as session: + vessel_repository = use_cases.vessel_repository(session=session) point_in_time = TaskExecutionRepository.get_point_in_time(session, "update_vessel_data_voyage") logger.info(f"Point in time={point_in_time}") now = datetime.now(timezone.utc) From 5d1dfec0649ad26c861362e55a0f37505aa583d2 Mon Sep 17 00:00:00 2001 From: RV Date: Mon, 24 Mar 2025 01:15:39 +0100 Subject: [PATCH 6/6] feat: add dim_vessel_mapping + repository + alembic revision --- ..._scd_implementation_for_dim_vessel_dim_.py | 327 +++++++++++++++++- backend/bloom/config.py | 4 +- backend/bloom/container.py | 5 + backend/bloom/domain/vessel_mapping.py | 29 ++ backend/bloom/infra/database/sql_model.py | 29 ++ backend/bloom/infra/interfaces/__init__.py | 2 + .../infra/{ports => interfaces}/repository.py | 3 + backend/bloom/infra/interfaces/vessel.py | 18 + .../repositories/repository_spire_ais_data.py | 9 + .../infra/repositories/repository_vessel.py | 29 +- .../repositories/repository_vessel_mapping.py | 90 +++++ 11 files changed, 526 insertions(+), 19 deletions(-) create mode 100644 backend/bloom/domain/vessel_mapping.py create mode 100644 backend/bloom/infra/interfaces/__init__.py rename backend/bloom/infra/{ports => interfaces}/repository.py (97%) create mode 100644 backend/bloom/infra/interfaces/vessel.py create mode 100644 backend/bloom/infra/repositories/repository_vessel_mapping.py diff --git a/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py b/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py index b4dfdb35..f7a0c4a2 100644 --- a/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py +++ b/backend/alembic/versions/c02d2b79eab2_scd_implementation_for_dim_vessel_dim_.py @@ -32,9 +32,13 @@ def upgrade() -> None: # Initialize scd_column values op.execute( (f"update dim_vessel set scd_start = '{settings.scd_past_limit.isoformat()}'," f"scd_end = '{settings.scd_future_limit.isoformat()}'," - f"scd_active = true," + f"scd_active = false," f"key = (CASE WHEN imo IS NOT NULL THEN imo::varchar(255) ELSE cfr END)" )) + op.execute( (f"update dim_vessel " + f"set scd_active = true " + f"where tracking_status = 'active'" + )) op.execute( (f"update dim_zone set scd_start = '{settings.scd_past_limit.isoformat()}'," f"scd_end = '{settings.scd_future_limit.isoformat()}'," @@ -52,10 +56,329 @@ def upgrade() -> None: op.alter_column("dim_zone","scd_end",nullable=False) op.alter_column("dim_zone","scd_active",nullable=False) op.alter_column("dim_zone","key",nullable=False) - pass + + # Adding vessel_mapping table + op.create_table("dim_vessel_mapping", + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("imo", sa.Integer), + sa.Column("mmsi", sa.Integer), + sa.Column("name", sa.String), + sa.Column("country", sa.String), + + sa.Column("same_imo", sa.ARRAY(sa.Integer)), + sa.Column("same_mmsi", sa.ARRAY(sa.Integer)), + sa.Column("same_name", sa.ARRAY(sa.Integer)), + sa.Column("same_country", sa.ARRAY(sa.Integer)), + + sa.Column("appearance_first",sa.DateTime(timezone=True)), + sa.Column("appearance_last",sa.DateTime(timezone=True)), + + sa.Column("mapping_auto", sa.Integer, sa.ForeignKey("dim_vessel.id"), nullable=True), + sa.Column("mapping_manual", sa.Integer, sa.ForeignKey("dim_vessel.id"), nullable=True), + sa.Column("vessel_id", sa.Integer,sa.ForeignKey("dim_vessel.id"), nullable=True), + + sa.Column("scd_start",sa.DateTime(timezone=True)), + sa.Column("scd_end",sa.DateTime(timezone=True)), + sa.Column("scd_active",sa.Boolean), + sa.Column( + "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now(), + ), + sa.Column("updated_at", sa.DateTime(timezone=True), onupdate=sa.func.now()) + ) + # Initialization of dim_vessel_mapping data from current spire_ais_data + op.execute(""" + insert into dim_vessel_mapping (imo,mmsi,name,country) + select distinct sad.vessel_imo , sad.vessel_mmsi, sad.vessel_name , + case sad.vessel_flag + WHEN 'AF' THEN 'AFG' + WHEN 'AL' THEN 'ALB' + WHEN 'AQ' THEN 'ATA' + WHEN 'DZ' THEN 'DZA' + WHEN 'AS' THEN 'ASM' + WHEN 'AD' THEN 'AND' + WHEN 'AO' THEN 'AGO' + WHEN 'AG' THEN 'ATG' + WHEN 'AZ' THEN 'AZE' + WHEN 'AR' THEN 'ARG' + WHEN 'AU' THEN 'AUS' + WHEN 'AT' THEN 'AUT' + WHEN 'BS' THEN 'BHS' + WHEN 'BH' THEN 'BHR' + WHEN 'BD' THEN 'BGD' + WHEN 'AM' THEN 'ARM' + WHEN 'BB' THEN 'BRB' + WHEN 'BE' THEN 'BEL' + WHEN 'BM' THEN 'BMU' + WHEN 'BT' THEN 'BTN' + WHEN 'BO' THEN 'BOL' + WHEN 'BA' THEN 'BIH' + WHEN 'BW' THEN 'BWA' + WHEN 'BV' THEN 'BVT' + WHEN 'BR' THEN 'BRA' + WHEN 'BZ' THEN 'BLZ' + WHEN 'IO' THEN 'IOT' + WHEN 'SB' THEN 'SLB' + WHEN 'VG' THEN 'VGB' + WHEN 'BN' THEN 'BRN' + WHEN 'BG' THEN 'BGR' + WHEN 'MM' THEN 'MMR' + WHEN 'BI' THEN 'BDI' + WHEN 'BY' THEN 'BLR' + WHEN 'KH' THEN 'KHM' + WHEN 'CM' THEN 'CMR' + WHEN 'CA' THEN 'CAN' + WHEN 'CV' THEN 'CPV' + WHEN 'KY' THEN 'CYM' + WHEN 'CF' THEN 'CAF' + WHEN 'LK' THEN 'LKA' + WHEN 'TD' THEN 'TCD' + WHEN 'CL' THEN 'CHL' + WHEN 'CN' THEN 'CHN' + WHEN 'TW' THEN 'TWN' + WHEN 'CX' THEN 'CXR' + WHEN 'CC' THEN 'CCK' + WHEN 'CO' THEN 'COL' + WHEN 'KM' THEN 'COM' + WHEN 'YT' THEN 'MYT' + WHEN 'CG' THEN 'COG' + WHEN 'CD' THEN 'COD' + WHEN 'CK' THEN 'COK' + WHEN 'CR' THEN 'CRI' + WHEN 'HR' THEN 'HRV' + WHEN 'CU' THEN 'CUB' + WHEN 'CY' THEN 'CYP' + WHEN 'CZ' THEN 'CZE' + WHEN 'BJ' THEN 'BEN' + WHEN 'DK' THEN 'DNK' + WHEN 'DM' THEN 'DMA' + WHEN 'DO' THEN 'DOM' + WHEN 'EC' THEN 'ECU' + WHEN 'SV' THEN 'SLV' + WHEN 'GQ' THEN 'GNQ' + WHEN 'ET' THEN 'ETH' + WHEN 'ER' THEN 'ERI' + WHEN 'EE' THEN 'EST' + WHEN 'FO' THEN 'FRO' + WHEN 'FK' THEN 'FLK' + WHEN 'GS' THEN 'SGS' + WHEN 'FJ' THEN 'FJI' + WHEN 'FI' THEN 'FIN' + WHEN 'AX' THEN 'ALA' + WHEN 'FR' THEN 'FRA' + WHEN 'GF' THEN 'GUF' + WHEN 'PF' THEN 'PYF' + WHEN 'TF' THEN 'ATF' + WHEN 'DJ' THEN 'DJI' + WHEN 'GA' THEN 'GAB' + WHEN 'GE' THEN 'GEO' + WHEN 'GM' THEN 'GMB' + WHEN 'PS' THEN 'PSE' + WHEN 'DE' THEN 'DEU' + WHEN 'GH' THEN 'GHA' + WHEN 'GI' THEN 'GIB' + WHEN 'KI' THEN 'KIR' + WHEN 'GR' THEN 'GRC' + WHEN 'GL' THEN 'GRL' + WHEN 'GD' THEN 'GRD' + WHEN 'GP' THEN 'GLP' + WHEN 'GU' THEN 'GUM' + WHEN 'GT' THEN 'GTM' + WHEN 'GN' THEN 'GIN' + WHEN 'GY' THEN 'GUY' + WHEN 'HT' THEN 'HTI' + WHEN 'HM' THEN 'HMD' + WHEN 'VA' THEN 'VAT' + WHEN 'HN' THEN 'HND' + WHEN 'HK' THEN 'HKG' + WHEN 'HU' THEN 'HUN' + WHEN 'IS' THEN 'ISL' + WHEN 'IN' THEN 'IND' + WHEN 'ID' THEN 'IDN' + WHEN 'IR' THEN 'IRN' + WHEN 'IQ' THEN 'IRQ' + WHEN 'IE' THEN 'IRL' + WHEN 'IL' THEN 'ISR' + WHEN 'IT' THEN 'ITA' + WHEN 'CI' THEN 'CIV' + WHEN 'JM' THEN 'JAM' + WHEN 'JP' THEN 'JPN' + WHEN 'KZ' THEN 'KAZ' + WHEN 'JO' THEN 'JOR' + WHEN 'KE' THEN 'KEN' + WHEN 'KP' THEN 'PRK' + WHEN 'KR' THEN 'KOR' + WHEN 'KW' THEN 'KWT' + WHEN 'KG' THEN 'KGZ' + WHEN 'LA' THEN 'LAO' + WHEN 'LB' THEN 'LBN' + WHEN 'LS' THEN 'LSO' + WHEN 'LV' THEN 'LVA' + WHEN 'LR' THEN 'LBR' + WHEN 'LY' THEN 'LBY' + WHEN 'LI' THEN 'LIE' + WHEN 'LT' THEN 'LTU' + WHEN 'LU' THEN 'LUX' + WHEN 'MO' THEN 'MAC' + WHEN 'MG' THEN 'MDG' + WHEN 'MW' THEN 'MWI' + WHEN 'MY' THEN 'MYS' + WHEN 'MV' THEN 'MDV' + WHEN 'ML' THEN 'MLI' + WHEN 'MT' THEN 'MLT' + WHEN 'MQ' THEN 'MTQ' + WHEN 'MR' THEN 'MRT' + WHEN 'MU' THEN 'MUS' + WHEN 'MX' THEN 'MEX' + WHEN 'MC' THEN 'MCO' + WHEN 'MN' THEN 'MNG' + WHEN 'MD' THEN 'MDA' + WHEN 'ME' THEN 'MNE' + WHEN 'MS' THEN 'MSR' + WHEN 'MA' THEN 'MAR' + WHEN 'MZ' THEN 'MOZ' + WHEN 'OM' THEN 'OMN' + WHEN 'NA' THEN 'NAM' + WHEN 'NR' THEN 'NRU' + WHEN 'NP' THEN 'NPL' + WHEN 'NL' THEN 'NLD' + WHEN 'CW' THEN 'CUW' + WHEN 'AW' THEN 'ABW' + WHEN 'SX' THEN 'SXM' + WHEN 'BQ' THEN 'BES' + WHEN 'NC' THEN 'NCL' + WHEN 'VU' THEN 'VUT' + WHEN 'NZ' THEN 'NZL' + WHEN 'NI' THEN 'NIC' + WHEN 'NE' THEN 'NER' + WHEN 'NG' THEN 'NGA' + WHEN 'NU' THEN 'NIU' + WHEN 'NF' THEN 'NFK' + WHEN 'NO' THEN 'NOR' + WHEN 'MP' THEN 'MNP' + WHEN 'UM' THEN 'UMI' + WHEN 'FM' THEN 'FSM' + WHEN 'MH' THEN 'MHL' + WHEN 'PW' THEN 'PLW' + WHEN 'PK' THEN 'PAK' + WHEN 'PA' THEN 'PAN' + WHEN 'PG' THEN 'PNG' + WHEN 'PY' THEN 'PRY' + WHEN 'PE' THEN 'PER' + WHEN 'PH' THEN 'PHL' + WHEN 'PN' THEN 'PCN' + WHEN 'PL' THEN 'POL' + WHEN 'PT' THEN 'PRT' + WHEN 'GW' THEN 'GNB' + WHEN 'TL' THEN 'TLS' + WHEN 'PR' THEN 'PRI' + WHEN 'QA' THEN 'QAT' + WHEN 'RE' THEN 'REU' + WHEN 'RO' THEN 'ROU' + WHEN 'RU' THEN 'RUS' + WHEN 'RW' THEN 'RWA' + WHEN 'BL' THEN 'BLM' + WHEN 'SH' THEN 'SHN' + WHEN 'KN' THEN 'KNA' + WHEN 'AI' THEN 'AIA' + WHEN 'LC' THEN 'LCA' + WHEN 'MF' THEN 'MAF' + WHEN 'PM' THEN 'SPM' + WHEN 'VC' THEN 'VCT' + WHEN 'SM' THEN 'SMR' + WHEN 'ST' THEN 'STP' + WHEN 'SA' THEN 'SAU' + WHEN 'SN' THEN 'SEN' + WHEN 'RS' THEN 'SRB' + WHEN 'SC' THEN 'SYC' + WHEN 'SL' THEN 'SLE' + WHEN 'SG' THEN 'SGP' + WHEN 'SK' THEN 'SVK' + WHEN 'VN' THEN 'VNM' + WHEN 'SI' THEN 'SVN' + WHEN 'SO' THEN 'SOM' + WHEN 'ZA' THEN 'ZAF' + WHEN 'ZW' THEN 'ZWE' + WHEN 'ES' THEN 'ESP' + WHEN 'SS' THEN 'SSD' + WHEN 'SD' THEN 'SDN' + WHEN 'EH' THEN 'ESH' + WHEN 'SR' THEN 'SUR' + WHEN 'SJ' THEN 'SJM' + WHEN 'SZ' THEN 'SWZ' + WHEN 'SE' THEN 'SWE' + WHEN 'CH' THEN 'CHE' + WHEN 'SY' THEN 'SYR' + WHEN 'TJ' THEN 'TJK' + WHEN 'TH' THEN 'THA' + WHEN 'TG' THEN 'TGO' + WHEN 'TK' THEN 'TKL' + WHEN 'TO' THEN 'TON' + WHEN 'TT' THEN 'TTO' + WHEN 'AE' THEN 'ARE' + WHEN 'TN' THEN 'TUN' + WHEN 'TR' THEN 'TUR' + WHEN 'TM' THEN 'TKM' + WHEN 'TC' THEN 'TCA' + WHEN 'TV' THEN 'TUV' + WHEN 'UG' THEN 'UGA' + WHEN 'UA' THEN 'UKR' + WHEN 'MK' THEN 'MKD' + WHEN 'EG' THEN 'EGY' + WHEN 'GB' THEN 'GBR' + WHEN 'GG' THEN 'GGY' + WHEN 'JE' THEN 'JEY' + WHEN 'IM' THEN 'IMN' + WHEN 'TZ' THEN 'TZA' + WHEN 'US' THEN 'USA' + WHEN 'VI' THEN 'VIR' + WHEN 'BF' THEN 'BFA' + WHEN 'UY' THEN 'URY' + WHEN 'UZ' THEN 'UZB' + WHEN 'VE' THEN 'VEN' + WHEN 'WF' THEN 'WLF' + WHEN 'WS' THEN 'WSM' + WHEN 'YE' THEN 'YEM' + WHEN 'ZM' THEN 'ZMB' + else '???' END + from spire_ais_data sad + """) + # Add vessel_mapping for past excursion without spire_ais_data (archive,...) + op.execute(""" + insert into dim_vessel_mapping (imo,mmsi,"name",country) + (select imo, mmsi, ship_name, country_iso3 + from fct_excursion + join dim_vessel on fct_excursion.vessel_id = dim_vessel.id + where dim_vessel.id not in (select vessel_id from dim_vessel_mapping)) + """) + op.execute(""" + update dim_vessel_mapping + set mapping_auto = (select id from dim_vessel where dim_vessel.mmsi = dim_vessel_mapping.mmsi and dim_vessel.tracking_status='active'), + vessel_id = (select id from dim_vessel where dim_vessel.mmsi = dim_vessel_mapping.mmsi and dim_vessel.tracking_status='active'), + scd_start = (select scd_start from dim_vessel where dim_vessel.mmsi = dim_vessel_mapping.mmsi and dim_vessel.tracking_status='active'), + scd_end = (select scd_end from dim_vessel where dim_vessel.mmsi = dim_vessel_mapping.mmsi and dim_vessel.tracking_status='active'), + scd_active = (select scd_active from dim_vessel where dim_vessel.mmsi = dim_vessel_mapping.mmsi and dim_vessel.tracking_status='active'); """) + # Adding vessel_mapping_id to fct_excursion + op.add_column("fct_excursion", + sa.Column('vessel_mapping_id', + sa.Integer, + sa.ForeignKey("dim_vessel_mapping.id"), + nullable=True)) + + # Initializing fct_excursion.vessel_mapping_id from dim_vessel_mapping.auto_mapping + op.execute(""" + update fct_excursion + set vessel_mapping_id = + ( select id from dim_vessel_mapping + where vessel_id = fct_excursion.vessel_id limit 1); + """) + + op.alter_column("fct_excursion","vessel_mapping_id",nullable=False) def downgrade() -> None: + op.drop_column("fct_excursion","vessel_mapping_id") + op.drop_table("dim_vessel_mapping") + op.drop_column("dim_zone","scd_start") op.drop_column("dim_zone","scd_end") op.drop_column("dim_zone","scd_active") diff --git a/backend/bloom/config.py b/backend/bloom/config.py index 962b8261..56d4a8f7 100644 --- a/backend/bloom/config.py +++ b/backend/bloom/config.py @@ -67,7 +67,9 @@ class Settings(BaseSettings): second=0, microsecond=0, tzinfo=timezone.utc)) - scd_future_limit: datetime = Field(default=datetime(year=2999, + # scd_future_limit doit être < 2262 + # Cf: https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units + scd_future_limit: datetime = Field(default=datetime(year=2199, month=12, day=31, hour=23, diff --git a/backend/bloom/container.py b/backend/bloom/container.py index 092ef28a..cb48e882 100644 --- a/backend/bloom/container.py +++ b/backend/bloom/container.py @@ -10,6 +10,7 @@ from bloom.infra.repositories.repository_segment import SegmentRepository from bloom.infra.repositories.repository_zone import ZoneRepository from bloom.infra.repositories.repository_metrics import MetricsRepository +from bloom.infra.repositories.repository_vessel_mapping import VesselMappingRepository from bloom.services.GetVesselsFromSpire import GetVesselsFromSpire from bloom.services.metrics import MetricsService @@ -38,6 +39,10 @@ class UseCases(containers.DeclarativeContainer): VesselRepository ) + vessel_mapping_repository = providers.Factory( + VesselMappingRepository + ) + alert_repository = providers.Factory( RepositoryAlert, session_factory=db.provided.session, diff --git a/backend/bloom/domain/vessel_mapping.py b/backend/bloom/domain/vessel_mapping.py new file mode 100644 index 00000000..ba9dda0f --- /dev/null +++ b/backend/bloom/domain/vessel_mapping.py @@ -0,0 +1,29 @@ +from datetime import datetime +from typing import Optional, ClassVar +from .vessel import VesselListView + +from pydantic import BaseModel + + +class VesselMapping(BaseModel): + id: Optional[int] = None + imo: Optional[int] = None + mmsi: Optional[int] = None + name: Optional[str] = None + country: Optional[str] = None + + same_imo: Optional[list[int]] = None + same_mmsi: Optional[list[int]] = None + same_name: Optional[list[int]] = None + same_country: Optional[list[int]] = None + + appearance_first:Optional[datetime] = None + appearance_last:Optional[datetime] = None + + mapping_auto: Optional[VesselListView] = None + mapping_manual: Optional[VesselListView] = None + vessel: Optional[VesselListView] = None + + scd_start: Optional[datetime] = None + scd_end: Optional[datetime] = None + scd_active: Optional[bool] = None diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py index 58cf95d6..02512728 100644 --- a/backend/bloom/infra/database/sql_model.py +++ b/backend/bloom/infra/database/sql_model.py @@ -2,6 +2,7 @@ from bloom.infra.database.database_manager import Base from geoalchemy2 import Geometry from sqlalchemy import ( + ARRAY, Boolean, Column, DateTime, @@ -50,6 +51,34 @@ class Vessel(Base): scd_active = Column("scd_active",Boolean) +class VesselMapping(Base): + __tablename__ = "dim_vessel_mapping" + id = Column("id", Integer, primary_key=True) + imo = Column("imo", Integer) + mmsi = Column("mmsi", Integer) + name = Column("ship_name", String, nullable=False) + country = Column("country", String, nullable=False) + + same_imo= Column("same_imo", ARRAY(Integer)) + same_mmsi= Column("same_mmsi", ARRAY(Integer)) + same_name= Column("same_name", ARRAY(Integer)) + same_country= Column("same_country", ARRAY(Integer)) + + appearance_first= Column("appearance_first",DateTime(timezone=True)) + appearance_last= Column("appearance_last",DateTime(timezone=True)) + + mapping_auto= Column("mapping_auto", Integer, ForeignKey("dim_vessel.id"), nullable=True) + mapping_manual= Column("mapping_manual", Integer, ForeignKey("dim_vessel.id"), nullable=True) + vessel= Column("vessel", Integer, ForeignKey("dim_vessel.id"), nullable=True) + + scd_start = Column("scd_start",DateTime(timezone=True)) + scd_end = Column("scd_end",DateTime(timezone=True)) + scd_active = Column("scd_active",Boolean) + created_at = Column( + "created_at", DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + class Alert(Base): __tablename__ = "alert" id = Column("id", Integer, primary_key=True, index=True) diff --git a/backend/bloom/infra/interfaces/__init__.py b/backend/bloom/infra/interfaces/__init__.py new file mode 100644 index 00000000..7acd3fd0 --- /dev/null +++ b/backend/bloom/infra/interfaces/__init__.py @@ -0,0 +1,2 @@ +from .repository import * +from .vessel import AbstractVesselRepository, AbstractVesselMappingRepository \ No newline at end of file diff --git a/backend/bloom/infra/ports/repository.py b/backend/bloom/infra/interfaces/repository.py similarity index 97% rename from backend/bloom/infra/ports/repository.py rename to backend/bloom/infra/interfaces/repository.py index 2f3bd81d..86dcb0e8 100644 --- a/backend/bloom/infra/ports/repository.py +++ b/backend/bloom/infra/interfaces/repository.py @@ -15,6 +15,9 @@ class SCDDateOverlapError(SCDRepositoryError): ... class AbstractRepositoryClient(): pass +def exclude_keys(d:dict,keys:list[str]): + return {x: d[x] for x in d if x not in keys} + def construct_findBy_statement(model_type, offset=None, limit=None, diff --git a/backend/bloom/infra/interfaces/vessel.py b/backend/bloom/infra/interfaces/vessel.py new file mode 100644 index 00000000..20657ddc --- /dev/null +++ b/backend/bloom/infra/interfaces/vessel.py @@ -0,0 +1,18 @@ +from .repository import AbstractRepository +import abc +from typing import TypeVar, Generic, Optional, List +from datetime import datetime + +DOMAIN=TypeVar("DOMAIN") + +class AbstractVesselRepository(AbstractRepository[DOMAIN],Generic[DOMAIN]): + pass + +class AbstractVesselMappingRepository(AbstractRepository[DOMAIN],Generic[DOMAIN]): + def get( + imo:Optional[int]=None, + mmsi:Optional[int]=None, + country:Optional[int]=None, + scd_date:Optional[datetime]=None + ): + pass \ No newline at end of file diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py index 4b8c27b4..ea95bec2 100644 --- a/backend/bloom/infra/repositories/repository_spire_ais_data.py +++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py @@ -84,6 +84,15 @@ def get_all_data_after_date( ], ) + def get_data_spire_updated_between(self, session: Session,spire_updated_after:datetime, + spire_updated_before:datetime)-> pd.DataFrame: + stmt = (select(sql_model.SpireAisData) + .where(sql_model.SpireAisData.spire_update_statement.between(spire_updated_after,spire_updated_before)) + .order_by(sql_model.SpireAisData.spire_update_statement.asc()) + ) + data= pd.DataFrame([v.__dict__ for v in session.execute(stmt).scalars()]) + return data + def get_all_data_between_date( self, session: Session, created_updated_after: datetime, created_updated_before: datetime ) -> pd.DataFrame: diff --git a/backend/bloom/infra/repositories/repository_vessel.py b/backend/bloom/infra/repositories/repository_vessel.py index c470e58f..71be9608 100644 --- a/backend/bloom/infra/repositories/repository_vessel.py +++ b/backend/bloom/infra/repositories/repository_vessel.py @@ -6,23 +6,20 @@ from bloom.domain.metrics import VesselTimeInZone from bloom.infra.database import sql_model from dependency_injector.providers import Callable -from sqlalchemy import func, select, update, and_, asc, desc, literal_column, between +from sqlalchemy import (func, select, update, and_, asc, desc, literal_column, + between, bindparam) from sqlalchemy.orm import Session from bloom.routers.requests import (DatetimeRangeRequest, OrderByRequest, OrderByEnum) from bloom.config import settings -from bloom.infra.ports.repository import ( - AbstractRepository, - AbstractScdRepositoryMixIn, - AbstractSqlAlchemyMixIn +from bloom.infra.interfaces.repository import (construct_findBy_scd_statement ) -from bloom.infra.ports.repository import (AbstractRepository, - construct_findBy_scd_statement +from bloom.infra.interfaces.vessel import (AbstractVesselRepository ) -class VesselRepository(AbstractRepository[Vessel]): +class VesselRepository(AbstractVesselRepository[Vessel]): session:Session def __init__(self, session:Session): self.session=session @@ -37,7 +34,7 @@ def findBy(self, limit=limit, scd_date=scd_date, **filters) - return [v for v in self.session.execute(stmt).scalars()] + return [self.map_to_domain(v) for v in self.session.execute(stmt).scalars()] def get_vessel_tracked_count(self, scd_date:Optional[datetime]=None, @@ -226,6 +223,7 @@ def get_vessel_times_in_zones( self, return result + def batch_create_vessel(self, session: Session, vessels: list[Vessel] ) -> list[Vessel]: @@ -234,13 +232,12 @@ def batch_create_vessel(self, session: Session, return [VesselRepository.map_to_domain(orm) for orm in orm_list] def batch_update_vessel(self, session: Session, vessels: list[Vessel]) -> None: - updates = [{"id": v.id, "mmsi": v.mmsi, "ship_name": v.ship_name, "width": v.width, "length": v.length, - "country_iso3": v.country_iso3, "type": v.type, "imo": v.imo, "cfr": v.cfr, - "external_marking": v.external_marking, - "ircs": v.ircs, "tracking_activated": v.tracking_activated, "tracking_status": v.tracking_status, - "home_port_id": v.home_port_id} for v in - vessels] - session.execute(update(sql_model.Vessel), updates) + # exclude_keys enlève la colonne "created_at" censée déjà exister + # en base de données puisque c'est une update + # de plus created est souvent non présente ou null dans les fichiers CSV + updates = [exclude_keys(v.__dict__,["created_at"]) for v in vessels] + session.execute(update(sql_model.Vessel), + updates) def set_tracking(self, session: Session, vessel_ids: list[int], tracking_activated: bool, tracking_status: str) -> None: diff --git a/backend/bloom/infra/repositories/repository_vessel_mapping.py b/backend/bloom/infra/repositories/repository_vessel_mapping.py new file mode 100644 index 00000000..7a8e9d60 --- /dev/null +++ b/backend/bloom/infra/repositories/repository_vessel_mapping.py @@ -0,0 +1,90 @@ +from contextlib import AbstractContextManager +from typing import Any, Generator, Union, Optional +from datetime import datetime + +from bloom.domain.vessel_mapping import VesselMapping +from bloom.domain.metrics import VesselTimeInZone +from bloom.infra.database import sql_model +from dependency_injector.providers import Callable +from sqlalchemy import (func, select, update, and_, asc, desc, literal_column, + between, bindparam) +from sqlalchemy.orm import Session +from bloom.routers.requests import (DatetimeRangeRequest, + OrderByRequest, + OrderByEnum) +from bloom.config import settings +from bloom.infra.interfaces.repository import (AbstractRepository, + construct_findBy_scd_statement, + exclude_keys +) +from bloom.infra.interfaces.vessel import (AbstractVesselMappingRepository) +from bloom.infra.repositories.repository_vessel import VesselRepository + + +class VesselMappingRepository(AbstractVesselMappingRepository[VesselMapping]): + session:Session + def __init__(self, session:Session): + self.session=session + + def findBy(self, + offset=None, + limit=None, + scd_date:Optional[datetime]=None, + **filters): + stmt = construct_findBy_scd_statement(sql_model.Vessel, + offset=offset, + limit=limit, + scd_date=scd_date, + **filters) + return [v for v in self.session.execute(stmt).scalars()] + + + @staticmethod + def map_to_domain(model: sql_model.VesselMapping) -> VesselMapping: + return VesselMapping( + id=model.id, + imo=model.imo, + mmsi=model.mmsi, + name=model.name, + country=model.country, + + same_imo=model.same_imo, + same_mmsi=model.same_mmsi, + same_name=model.same_name, + same_country=model.same_country, + + appearance_first=model.appearance_first, + appearance_last=model.appearance_last, + + mapping_auto=model.mapping_auto, + mapping_manual=model.mapping_manual, + vessel=VesselRepository.map_to_domain(model.vessel), + + scd_start=model.scd_start, + scd_end=model.scd_end, + scd_active=model.scd_active, + ) + + @staticmethod + def map_to_sql(vessel: VesselMapping) -> sql_model.VesselMapping: + return sql_model.VesselMapping( + id=vessel.id, + imo=vessel.imo, + mmsi=vessel.mmsi, + name=vessel.name, + country=vessel.country, + + same_imo=vessel.same_imo, + same_mmsi=vessel.same_mmsi, + same_name=vessel.same_name, + same_country=vessel.same_country, + + appearance_first=vessel.appearance_first, + appearance_last=vessel.appearance_last, + + vessel=VesselRepository.map_to_sql(vessel.vessel), + + scd_start=vessel.scd_start, + scd_end=vessel.scd_end, + scd_active=vessel.scd_active, + )