Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions backend/alembic/versions/c16b5dd42d2c_create_kpler_ais_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""create_kpler_ais_data

Revision ID: c16b5dd42d2c
Revises: 5801cb8f1af5
Create Date: 2026-03-31 11:43:44.681915

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import JSONB

# revision identifiers, used by Alembic.
revision = 'c16b5dd42d2c'
down_revision = '5801cb8f1af5'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"kpler_ais_data",
sa.Column("id", sa.Integer, sa.Identity(), primary_key=True),
sa.Column("position_id", sa.Integer),
sa.Column("vessel_uid", sa.Integer),
sa.Column("vessel_flag", sa.String),
sa.Column("vessel_name", sa.String),
sa.Column("vessel_callsign", sa.String),
sa.Column("vessel_mmsi", sa.Integer),
sa.Column("vessel_imo", sa.Integer),
sa.Column("vessel_marinetraffic_type", sa.String),
sa.Column("vessel_ais_type", sa.Integer),
sa.Column("vessel_width", sa.Double),
sa.Column("vessel_length", sa.Double),
sa.Column("vessel_grt", sa.Double),
sa.Column("vessel_dwt", sa.Double),
sa.Column("static_timestamp", sa.DateTime(timezone=True)),
sa.Column("static_source", sa.String),
sa.Column("static_message_type", sa.Integer),
sa.Column("position_message_type", sa.Integer),
sa.Column("position_source", sa.String),
sa.Column("position_course", sa.Double),
sa.Column("position_heading", sa.Double),
sa.Column("position_longitude", sa.Double),
sa.Column("position_latitude", sa.Double),
sa.Column("position_navigational_status", sa.Integer),
sa.Column("position_rot", sa.Double),
sa.Column("position_speed", sa.Double),
sa.Column("position_timestamp", sa.DateTime(timezone=True)),
sa.Column("position_kpler_insert_timestamp", sa.DateTime(timezone=True)),
sa.Column("voyage_destination", sa.String),
sa.Column("voyage_draught", sa.Double),
sa.Column("voyage_eta", sa.DateTime(timezone=True)),
sa.Column("payload", JSONB),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=func.now())
)

op.create_index("i_kpler_ais_data_static_timesamp", "kpler_ais_data", ["static_timestamp"])
op.create_index(
"i_kpler_ais_data_position_timesamp",
"kpler_ais_data",
["position_timestamp"],
)


def downgrade() -> None:
op.drop_table("kpler_ais_data")
4 changes: 4 additions & 0 deletions backend/bloom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class Settings(BaseSettings):
redis_password: str = Field(default='bloom',min_length=1)
redis_cache_expiration: int = Field(default=900)

kpler_api_root: str = Field(default='')
kpler_token: str = Field(default='')
messages_page_size: int = Field(default = 100)

api_pooling_period: timedelta = Field(default=timedelta(minutes=2))

logging_level:str=Field(
Expand Down
6 changes: 6 additions & 0 deletions backend/bloom/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from bloom.infra.database.database_manager import Database
from bloom.infra.repositories.repository_alert import RepositoryAlert
from bloom.infra.repositories.repository_excursion import ExcursionRepository
from bloom.infra.repositories.repository_kpler_ais_data import KplerAisDataRepository
from bloom.infra.repositories.repository_port import PortRepository
from bloom.infra.repositories.repository_raster import RepositoryRaster
from bloom.infra.repositories.repository_spire_ais_data import SpireAisDataRepository
Expand Down Expand Up @@ -74,6 +75,11 @@ class UseCases(containers.DeclarativeContainer):
session_factory=db.provided.session,
)

kpler_ais_data_repository = providers.Factory(
KplerAisDataRepository,
session_factory=db.provided.session,
)

segment_repository = providers.Factory(
SegmentRepository,
session_factory=db.provided.session,
Expand Down
80 changes: 80 additions & 0 deletions backend/bloom/domain/kpler_ais_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from datetime import datetime
from typing import Any

from pydantic import BaseModel
from functools import reduce

from typing import Union

class KplerAisData(BaseModel):
id: Union[int, None] = None # noqa: UP007
position_id: Union[int, None] = None # noqa: UP007
vessel_uid: Union[int, None] = None # noqa: UP007
vessel_flag: Union[str, None] = None # noqa: UP007
vessel_name: Union[str, None] = None # noqa: UP007
vessel_callsign: Union[str, None] = None # noqa: UP007
vessel_mmsi: Union[int] = None # noqa: UP007
vessel_imo: Union[int, None] = None # noqa: UP007
vessel_marinetraffic_type: Union[str, None] = None # noqa: UP007
vessel_ais_type: Union[int, None] = None # noqa: UP007
vessel_width: Union[float, None] = None # noqa: UP007
vessel_length: Union[float, None] = None # noqa: UP007
vessel_grt: Union[float, None] = None # noqa: UP007
vessel_dwt: Union[float, None] = None # noqa: UP007
static_timestamp: Union[datetime, None] = None # noqa: UP007
static_source: Union[str, None] = None # noqa: UP007
static_message_type: Union[int, None] = None # noqa: UP007
position_message_type: Union[int, None] = None # noqa: UP007
position_source: Union[str, None] = None # noqa: UP007
position_course: Union[float, None] = None # noqa: UP007
position_heading: Union[float, None] = None # noqa: UP007
position_longitude: Union[float, None] = None # noqa: UP007
position_latitude: Union[float, None] = None # noqa: UP007
position_navigational_status: Union[int, None] = None # noqa: UP007
position_rot: Union[float, None] = None # noqa: UP007
position_speed: Union[float, None] = None # noqa: UP007
position_timestamp: Union[datetime, None] = None # noqa: UP007
position_kpler_insert_timestamp: Union[datetime, None] = None # noqa: UP007
voyage_destination: Union[str, None] = None # noqa: UP007
voyage_draught: Union[float, None] = None # noqa: UP007
voyage_eta: Union[datetime, None] = None # noqa: UP007
payload: Union[dict, None] = None
created_at: Union[datetime, None] = None # noqa: UP007

def map_from_kpler(message: dict[str, Any]): # noqa: ANN201
def deep_get(dictionary: dict[str, Any], *keys) -> str:
return reduce(lambda d, key: d.get(key) if d else None, keys, dictionary)

return KplerAisData(
position_id=message["id"] if "id" in message.keys() else None,
vessel_uid=deep_get(message, "properties", "vesselUid") if "properties" in message.keys() and "vesselUid" in message["properties"].keys() else None,
vessel_flag=deep_get(message, "properties", "flag") if "properties" in message.keys() and "flag" in message["properties"].keys() else None,
vessel_name=deep_get(message, "properties", "vesselName") if "properties" in message.keys() and "vesselName" in message["properties"].keys() else None,
vessel_callsign=deep_get(message, "properties", "callsign") if "properties" in message.keys() and "callsign" in message["properties"].keys() else None,
vessel_mmsi=deep_get(message, "properties", "mmsi") if "properties" in message.keys() and "mmsi" in message["properties"].keys() else None,
vessel_imo=deep_get(message, "properties", "imo") if "properties" in message.keys() and "imo" in message["properties"].keys() else None,
vessel_marinetraffic_type=deep_get(message, "properties", "vesselType") if "properties" in message.keys() and "vesselType" in message["properties"].keys() else None,
vessel_ais_type=deep_get(message, "properties", "vesselTypeAis") if "properties" in message.keys() and "vesselTypeAis" in message["properties"].keys() else None,
vessel_width=deep_get(message, "properties", "width") if "properties" in message.keys() and "width" in message["properties"].keys() else None,
vessel_length=deep_get(message, "properties", "length") if "properties" in message.keys() and "length" in message["properties"].keys() else None,
vessel_grt=deep_get(message, "properties", "grt") if "properties" in message.keys() and "grt" in message["properties"].keys() else None,
vessel_dwt=deep_get(message, "properties", "dwt") if "properties" in message.keys() and "dwt" in message["properties"].keys() else None,
static_timestamp=deep_get(message, "properties", "staticDt") if "properties" in message.keys() and "staticDt" in message["properties"].keys() else None,
static_source=deep_get(message, "properties", "staticSrc") if "properties" in message.keys() and "staticSrc" in message["properties"].keys() else None,
static_message_type=deep_get(message, "properties", "staticMsgType") if "properties" in message.keys() and "staticMsgType" in message["properties"].keys() else None,
position_message_type=deep_get(message, "properties", "posMsgType") if "properties" in message.keys() and "posMsgType" in message["properties"].keys() else None,
position_source=deep_get(message, "properties", "posSrc") if "properties" in message.keys() and "posSrc" in message["properties"].keys() else None,
position_course=deep_get(message, "properties", "cog") if "properties" in message.keys() and "cog" in message["properties"].keys() else None,
position_heading=deep_get(message, "properties", "heading") if "properties" in message.keys() and "heading" in message["properties"].keys() else None,
position_longitude=deep_get(message, "properties", "longitude") if "properties" in message.keys() and "longitude" in message["properties"].keys() else None,
position_latitude=deep_get(message, "properties", "latitude") if "properties" in message.keys() and "latitude" in message["properties"].keys() else None,
position_navigational_status=deep_get(message, "properties", "navStatus") if "properties" in message.keys() and "navStatus" in message["properties"].keys() else None,
position_rot=deep_get(message, "properties", "rot") if "properties" in message.keys() and "rot" in message["properties"].keys() else None,
position_speed=deep_get(message, "properties", "sog") if "properties" in message.keys() and "sog" in message["properties"].keys() else None,
position_timestamp=deep_get(message, "properties", "posDt") if "properties" in message.keys() and "posDt" in message["properties"].keys() else None,
position_kpler_insert_timestamp=deep_get(message, "properties", "insertDt") if "properties" in message.keys() and "insertDt" in message["properties"].keys() else None,
voyage_destination=deep_get(message, "properties", "destination") if "properties" in message.keys() and "destination" in message["properties"].keys() else None,
voyage_draught=deep_get(message, "properties", "draught") if "properties" in message.keys() and "draught" in message["properties"].keys() else None,
voyage_eta=deep_get(message, "properties", "eta") if "properties" in message.keys() and "eta" in message["properties"].keys() else None,
payload=message
)
37 changes: 37 additions & 0 deletions backend/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@ class SpireAisData(Base):
voyage_update_timestamp = Column("voyage_update_timestamp", DateTime(timezone=True))
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())

class KplerAisData(Base):
__tablename__ = "kpler_ais_data"

id = Column("id", Integer, primary_key=True)
position_id = Column("position_id", Integer)
vessel_uid = Column("vessel_uid", Integer)
vessel_flag = Column("vessel_flag", String)
vessel_name = Column("vessel_name", String)
vessel_callsign = Column("vessel_callsign", String)
vessel_mmsi = Column("vessel_mmsi", Integer)
vessel_imo = Column("vessel_imo", Integer)
vessel_marinetraffic_type = Column("vessel_marinetraffic_type", String)
vessel_ais_type = Column("vessel_ais_type", Integer)
vessel_width = Column("vessel_width", Double)
vessel_length = Column("vessel_length", Double)
vessel_grt = Column("vessel_grt", Double)
vessel_dwt = Column("vessel_dwt", Double)
static_timestamp = Column("static_timestamp", DateTime(timezone=True))
static_source = Column("static_source", String)
static_message_type = Column("static_message_type", Integer)
position_message_type = Column("position_message_type", Integer)
position_source = Column("position_source", String)
position_course = Column("position_course", Double)
position_heading = Column("position_heading", Double)
position_longitude = Column("position_longitude", Double)
position_latitude = Column("position_latitude", Double)
position_navigational_status = Column("position_navigational_status", Integer)
position_rot = Column("position_rot", Double)
position_speed = Column("position_speed", Double)
position_timestamp = Column("position_timestamp", DateTime(timezone=True))
position_kpler_insert_timestamp = Column("position_kpler_insert_timestamp", DateTime(timezone=True))
voyage_destination = Column("voyage_destination", String)
voyage_draught = Column("voyage_draught", Double)
voyage_eta = Column("voyage_eta", DateTime(timezone=True))
payload = Column("payload", JSONB)
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())


class Zone(Base):
__tablename__ = "dim_zone"
Expand Down
11 changes: 11 additions & 0 deletions backend/bloom/infra/http/kpler_api_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Any
from bloom.domain.kpler_ais_data import KplerAisData
from gql import Client, gql

from bloom.logger import logger

def map_raw_messages_to_domain(raw_messages: list[dict[str, Any]]) -> list[KplerAisData]:
kpler_ais_data = []
for vessel in raw_messages:
kpler_ais_data.append(KplerAisData.map_from_kpler(vessel))
return kpler_ais_data
39 changes: 39 additions & 0 deletions backend/bloom/infra/kpler_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from bloom.config import settings
from typing import Any
import aiohttp
from urllib.parse import urljoin
from bloom.logger import logger


class KplerApiError(Exception):
pass

class KplerApiClient:
FORMAT = 'json'

def __init__(self):
self.api_root = settings.kpler_api_root
self.api_token = settings.kpler_token
conn = aiohttp.TCPConnector()
self.http_session = aiohttp.ClientSession(connector=conn)

async def get(self, endpoint: str, params=None, **kwargs) -> Any:
if params is None:
params = {}
params = params | {"format": KplerApiClient.FORMAT} | kwargs
headers={"Authorization": "Basic " + self.api_token,
"Accept": "application/json"}
path = urljoin(self.api_root, endpoint)

logger.debug(f"GET {path}, params={params}")
async with self.http_session.get(path, params=params, headers=headers) as resp:
if resp.status == 204:
return None
elif resp.ok:
response = await resp.json()
return response
else:
resp.raise_for_status()

async def close(self):
await self.http_session.close()
60 changes: 60 additions & 0 deletions backend/bloom/infra/repositories/repository_kpler_ais_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from bloom.domain.kpler_ais_data import KplerAisData
import pandas as pd
from bloom.infra.database import sql_model
from dependency_injector.providers import Callable
from sqlalchemy.orm import Session


class KplerAisDataRepository:

def __init__(self, session_factory: Callable) -> None:
self.session_factory = session_factory

def batch_create_ais_data(
self, ais_list: list[KplerAisData], session: Session
) -> list[KplerAisData]:
orm_list = [KplerAisDataRepository.map_to_orm(ais) for ais in ais_list]
session.add_all(orm_list)
return [KplerAisDataRepository.map_to_domain(orm) for orm in orm_list]

@staticmethod
def map_to_orm(data: KplerAisData) -> sql_model.KplerAisData:
return sql_model.KplerAisData(**data.__dict__)

@staticmethod
def map_to_domain(orm_data: sql_model.KplerAisData) -> KplerAisData:
return KplerAisData(
id = orm_data.id,
position_id = orm_data.position_id,
vessel_uid = orm_data.vessel_uid,
vessel_flag = orm_data.vessel_flag,
vessel_name = orm_data.vessel_name,
vessel_callsign = orm_data.vessel_callsign,
vessel_mmsi = orm_data.vessel_mmsi,
vessel_imo = orm_data.vessel_imo,
vessel_marinetraffic_type = orm_data.vessel_marinetraffic_type,
vessel_ais_type=orm_data.vessel_ais_type,
vessel_width = orm_data.vessel_width,
vessel_length = orm_data.vessel_length,
vessel_grt = orm_data.vessel_grt,
vessel_dwt= orm_data.vessel_dwt,
static_timestamp = orm_data.static_timestamp,
static_source= orm_data.static_source,
static_message_type= orm_data.static_message_type,
position_message_type= orm_data.position_message_type,
position_source = orm_data.position_source,
position_course = orm_data.position_course,
position_heading = orm_data.position_heading,
position_longitude = orm_data.position_longitude,
position_latitude = orm_data.position_latitude,
position_navigational_status = orm_data.position_navigational_status,
position_rot = orm_data.position_rot,
position_speed= orm_data.position_speed,
position_timestamp = orm_data.position_timestamp,
position_kpler_insert_timestamp = orm_data.position_kpler_insert_timestamp,
voyage_destination = orm_data.voyage_destination,
voyage_draught = orm_data.voyage_draught,
voyage_eta = orm_data.voyage_eta,
payload = orm_data.payload,
created_at = orm_data.created_at
)
7 changes: 0 additions & 7 deletions backend/bloom/infra/repositories/repository_task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ def get_point_in_time(session: Session, task_name: str) -> datetime:
else:
return e.point_in_time

def set_duration(session: Session, task_name: str, pit: datetime,duration:timedelta)->None:
stmt = (update(sql_model.TaskExecution)
.where(sql_model.TaskExecution.task_name==task_name)
.where(sql_model.TaskExecution.point_in_time==pit)
.values(duration=duration)
)
session.execute(stmt)
def set_position_count(session: Session, task_name: str, pit: datetime,count:int)->None:
stmt = (update(sql_model.TaskExecution)
.where(sql_model.TaskExecution.task_name==task_name)
Expand Down
Loading
Loading