diff --git a/.github/workflows/pypi_publish.yml b/.github/workflows/pypi_publish.yml index 0224af3..3a71b43 100644 --- a/.github/workflows/pypi_publish.yml +++ b/.github/workflows/pypi_publish.yml @@ -66,7 +66,7 @@ jobs: # Linux: Build for x86_64 and aarch64 (ARM64), skip 32-bit (i686) CIBW_ARCHS_LINUX: x86_64 aarch64 # Windows: Build only for AMD64 (64-bit), skip 32-bit (x86) to avoid build issues - # CIBW_ARCHS_WINDOWS: AMD64 + CIBW_ARCHS_WINDOWS: AMD64 # Skip PyPy CIBW_SKIP: pp* *musllinux* # Ensure cmake is available in the build environment diff --git a/README.md b/README.md index 614a5c3..8e60a7e 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ A C++ local database library with cross language bindings. Aiming to be a fast, lightweight, and easy-to-use data communication solution for RPC and coupled modeling in scientific computing. ## What's new +- **2026-02-28 (Release Improvement)**: Fix bugs related to build process in Windows. (PR #20) - **2025-12-31(Bug Fix)**: Fixed an issue where shared memory segments were not being properly unregistered from the resource tracker upon closing, which could lead to resource leaks. (PR #17) - **2025-12-15 (Release Improvement)**: Enabled distribution of pre-compiled binary wheels for macOS (Intel/Apple Silicon) and Linux (x86_64/aarch64), eliminating the need for local compilation tools during installation. (PR #15) - **2025-12-10 (Bug Fix)**: Fixed the data type mapping for `U32` fields in Python bindings to ensure correct representation as unsigned 32-bit integers in NumPy arrays. 
(PR #13) diff --git a/debug_core.py b/debug_core.py new file mode 100644 index 0000000..82d093d --- /dev/null +++ b/debug_core.py @@ -0,0 +1,15 @@ +from fastdb4py import core +try: + print(f"core.gtAny: {core.gtAny}") +except: + print("core.gtAny not found") + +try: + print(f"core.cfDefault: {core.cfDefault}") +except: + print("core.cfDefault not found") + +try: + help(core.WxLayerTableBuild.set_geometry_type) +except: + print("Help not available") diff --git a/pyproject.toml b/pyproject.toml index fb2f762..66e7cf0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fastdb4py" -version = "0.1.9" +version = "0.1.10" description = "FastCarto database bindings" readme = "README.md" requires-python = ">=3.10" diff --git a/python/fastdb4py/__init__.py b/python/fastdb4py/__init__.py index 4c9f50b..39269af 100644 --- a/python/fastdb4py/__init__.py +++ b/python/fastdb4py/__init__.py @@ -3,4 +3,5 @@ F32, F64, STR, WSTR, REF, BYTES ) from .feature import Feature -from .orm import ORM, Table, TableDefn \ No newline at end of file +from .orm import ORM, Table, TableDefn +from .serializer import FastSerializer \ No newline at end of file diff --git a/python/fastdb4py/feature/feature.py b/python/fastdb4py/feature/feature.py index 06df9e9..c17d88b 100644 --- a/python/fastdb4py/feature/feature.py +++ b/python/fastdb4py/feature/feature.py @@ -14,14 +14,26 @@ class Feature(BaseFeature): def __init__(self, **kwargs): + # Local cache for Python-side fields and serializer-hydrated values. self._cache: Dict[str, Any] = {} + # Origin feature mapped from fastdb layer (None means pure Python object). self._origin: core.WxFeature | None = None + # Database handle used when the feature is mapped from fastdb. self._db: core.WxDatabase | core.WxDatabaseBuild | None = None + # Full Python type hints declared on this Feature subclass. 
self._type_hints: Dict[str, Any] = _get_feature_hints(self.__class__) + # Parsed fastdb field definitions: name -> (field_type, field_index). self._origin_hints: Dict[str, tuple[OriginFieldType, int]] = parse_defns(self.__class__) + # Constructor fast-path: + # kwargs are applied directly to cache to avoid __setattr__ dispatch overhead. + # This object is not fixed yet (_origin is None), so cache assignment is equivalent + # to the non-fixed path in __setattr__. for key, value in kwargs.items(): - setattr(self, key, value) + if key.startswith('_'): + object.__setattr__(self, key, value) + else: + self._cache[key] = value @property def fixed(self) -> bool: @@ -41,116 +53,144 @@ def map_from( return feature def __getattr__(self, name: str): - # Try to get origin type definition with the given name - defn = self._origin_hints.get(name, None) + # Cache-first access: serializer-populated values and dynamic fields live here. + if name in self._cache: + return self._cache[name] + + # Resolve field metadata from parsed feature definitions. + defn = self._origin_hints.get(name) + + # Unknown field behavior: + # - If it is a typed Python field (e.g. List[T]), return None by default. + # - Otherwise, follow Python protocol and raise AttributeError. if defn is None or defn[0] is OriginFieldType.unknown: - warnings.warn(f'Field "{name}" not found in feature "{self.__class__.__name__}".', UserWarning) - return None - + if name in self._type_hints: + return None + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + ft, fid = defn - - # Case for not mapping from database ############################################## - - # If not on mapping, return cached value or default value + + # Case 1: not mapped from database yet (pure Python object). + # Return cached default values and persist them into cache. 
if not self.fixed: - if name in self._cache: - return self._cache[name] - else: - if ft == OriginFieldType.ref: - ref_feature_type = self._type_hints[name] - default_ref_feature = ref_feature_type() - self._cache[name] = default_ref_feature - return default_ref_feature - else: - default_value = FIELD_TYPE_DEFAULTS.get(ft, None) - self._cache[name] = default_value - return default_value - - # Case for mapping from database ################################################## - - # Type Bytes is specially stored in fastdb as geometry-like chunk - # Return it directly from table feature + if ft == OriginFieldType.ref: + ref_feature_type = self._type_hints[name] + default_ref_feature = ref_feature_type() + self._cache[name] = default_ref_feature + return default_ref_feature + + default_value = FIELD_TYPE_DEFAULTS.get(ft, None) + self._cache[name] = default_value + return default_value + + # Case 2: mapped from database. + # Bytes field is stored as geometry-like chunk in fastdb. if ft == OriginFieldType.bytes: return self._origin.get_geometry_like_chunk() - - # Type Ref requires special handling to get referenced feature - elif ft == OriginFieldType.ref: - # Get feature referencing - ref = self._origin.get_field_as_ref(fid) - - # Return as Feature object - ref_feature_type: Feature = self._type_hints[name] - feature_origin: core.WxFeature = self._db.tryGetFeature(ref) - return ref_feature_type.map_from(self._db, feature_origin) - - # Other types: map to corresponding get_field_as_* method - elif ft == OriginFieldType.u8: + + # Ref field handling strategy: + # 1) Try native fastdb ref lookup when origin/db are available. + # 2) Return None when native ref is unavailable/invalid. + # 3) Cache resolved feature instance for subsequent fast access. 
+ if ft == OriginFieldType.ref: + try: + ref = self._origin.get_field_as_ref(fid) + except Exception: + return None + + if not ref or self._db is None: + return None + + ref_feature_origin = self._db.tryGetFeature(ref) + if not ref_feature_origin: + return None + + ref_feature_type = self._type_hints.get(name, Feature) + feature = ref_feature_type.map_from(self._db, ref_feature_origin) + self._cache[name] = feature + return feature + + # Scalar field mapping to fastdb getters. + if ft == OriginFieldType.u8: return self._origin.get_field_as_int(fid) - elif ft == OriginFieldType.u16: + if ft == OriginFieldType.u16: return self._origin.get_field_as_int(fid) - elif ft == OriginFieldType.u32: + if ft == OriginFieldType.u32: return self._origin.get_field_as_int(fid) - elif ft == OriginFieldType.i32: + if ft == OriginFieldType.i32: return self._origin.get_field_as_int(fid) - elif ft == OriginFieldType.f32: + if ft == OriginFieldType.f32: return self._origin.get_field_as_float(fid) - elif ft == OriginFieldType.f64: + if ft == OriginFieldType.f64: return self._origin.get_field_as_float(fid) - elif ft == OriginFieldType.str: + if ft == OriginFieldType.str: return self._origin.get_field_as_string(fid) - elif ft == OriginFieldType.wstr: + if ft == OriginFieldType.wstr: return self._origin.get_field_as_wstring(fid) + + return None def __setattr__(self, name: str, value): - # Allow setting internal attributes directly + # Internal runtime attributes bypass field mapping. if name.startswith('_'): object.__setattr__(self, name, value) return - - # Try to get origin type definition with the given name - defn = self._origin_hints.get(name, None) + + # Resolve field metadata from parsed feature definitions. + defn = self._origin_hints.get(name) + + # Unknown or non-fastdb-mapped fields are kept in local cache. 
if defn is None or defn[0] is OriginFieldType.unknown: - warnings.warn(f'Field "{name}" not found in feature "{self.__class__.__name__}".', UserWarning) + self._cache[name] = value return - + ft, fid = defn - - # Case for not mapping from database ############################################## - - # Cache the value for later use + + # Pure Python object path: assign to cache only. if not self.fixed: self._cache[name] = value return - - # Case for mapping from database ################################################## - - # Directly set field value to database according to its type - if ft == OriginFieldType.u8 \ - or ft == OriginFieldType.u16 \ - or ft == OriginFieldType.u32 \ - or ft == OriginFieldType.i32 \ - or ft == OriginFieldType.f32 \ - or ft == OriginFieldType.f64 \ - or ft == OriginFieldType.u8n \ - or ft == OriginFieldType.u16n: + + # Database-mapped numeric fields are written directly to fastdb origin. + if ft in ( + OriginFieldType.u8, + OriginFieldType.u16, + OriginFieldType.u32, + OriginFieldType.i32, + OriginFieldType.f32, + OriginFieldType.f64, + OriginFieldType.u8n, + OriginFieldType.u16n, + ): self._origin.set_field(fid, value) - elif ft == OriginFieldType.ref: - # Get referenced feature type + return + + # Ref field handling strategy: + # - Accept None as a nullable ref and keep it in cache. + # - Validate Python type against annotation. + # - Try native fastdb ref assignment when referenced origin exists. + # - Always cache Python-side value for serializer compatibility. 
+ if ft == OriginFieldType.ref: + if value is None: + self._cache[name] = None + return + ref_feature_type: Feature = self._type_hints[name] if not isinstance(value, ref_feature_type): warnings.warn(f'Field "{name}" expects a reference to type "{ref_feature_type.__name__}", but got "{type(value).__name__}".', UserWarning) return - - self._origin.set_field(fid, value._origin) - - # Get the origin ref feature and set all its fields with the given feature - # Note: this is a deep copy operation, performance may be affected for feature with many fields - # origin_feature: Feature = getattr(self, name) - # for ref_field_name in origin_feature._type_hints.keys(): - # setattr(origin_feature, ref_field_name, getattr(value, ref_field_name)) - - else: - warnings.warn(f'Fastdb only support features to set numeric field for a scale-known block.', UserWarning) + + try: + if value._origin is not None: + self._origin.set_field(fid, value._origin) + except Exception: + pass + + self._cache[name] = value + return + + # Non-numeric writes are not supported by direct fastdb set_field API. 
+ warnings.warn(f'Fastdb only support features to set numeric field for a scale-known block.', UserWarning) # Helpers ################################################## diff --git a/python/fastdb4py/feature/utils.py b/python/fastdb4py/feature/utils.py index 0fa3f5c..4a60f8d 100644 --- a/python/fastdb4py/feature/utils.py +++ b/python/fastdb4py/feature/utils.py @@ -25,7 +25,11 @@ def parse_defns(cls): try: origin_type = get_origin_type(hint) if origin_type == OriginFieldType.unknown: - if issubclass(hint, BaseFeature): + # Check if hint is a Feature subclass or forward reference + if hasattr(hint, '__mro__') and issubclass(hint, BaseFeature): + origin_type = OriginFieldType.ref + # Heuristic for string/ForwardRef: Assume it's a Feature ref if it's not a basic type + elif isinstance(hint, str) or hasattr(hint, '__forward_arg__'): origin_type = OriginFieldType.ref except Exception as e: origin_type = OriginFieldType.unknown diff --git a/python/fastdb4py/serializer.py b/python/fastdb4py/serializer.py new file mode 100644 index 0000000..56e78a5 --- /dev/null +++ b/python/fastdb4py/serializer.py @@ -0,0 +1,710 @@ +import struct +import numpy as np +import ctypes +from threading import Lock +from weakref import WeakKeyDictionary +from typing import Type, List, Dict, Any, get_type_hints, Tuple, get_origin, get_args +from .feature import Feature, get_all_defns +from .type import OriginFieldType, U32, F64 +from . import core + +_NUMERIC_LIST_LAYER_PREFIX = "__fastser_list__|" +_CLASS_SCHEMA_CACHE_LOCK = Lock() +_CLASS_SCHEMA_CACHE: WeakKeyDictionary = WeakKeyDictionary() + +# FastSerializer blob protocol (Mini spec) +# +# The serializer uses a hybrid layout: +# - Scalar fields are stored as fastdb columns. +# - Numeric lists List[U32]/List[F64] are stored in dedicated columnar auxiliary layers. +# - Other complex fields (list/ref/bytes/unknown) are stored in one geometry-like raw blob. 
+# +# Object reference encoding used by this serializer: +# - Ref = [layer_idx:u16][feature_idx:u32] +# - Null ref sentinel = [0xFFFF:u16][0xFFFFFFFF:u32] +# +# List encoding: +# - List header = [count:u32] +# - List[int] payload = count * i32 +# - List[float] payload = count * f64 +# - List[str] payload = repeated [byte_len:u32][utf8_bytes] +# - List[Feature] payload = count * Ref +# +# Field order contract: +# - Blob payload is written and read strictly by class field definition order. +# - Encoder and decoder must keep identical traversal order for compatibility. +# +# Numeric-list auxiliary layer schema: +# - owner_fid:u32, geometry_raw = packed numeric list payload +# - Layer name format: __fastser_list__|{ClassName}|{FieldName}|{u32|f64} + +class FastSerializer: + """ + High-performance serializer for fastdb Feature objects, supporting nested types and lists. + Builds on top of fastdb's direct memory database capabilities. + """ + + @staticmethod + def dumps(obj: Feature) -> bytes: + if not isinstance(obj, Feature): + raise TypeError("Only fastdb4py.Feature objects can be serialized.") + + ctx = _DumpContext() + # Pass 1: Object graph traversal and ID assignment + ctx.register(obj) + + # Pass 2: Build database + db = core.WxDatabaseBuild() + db.begin("") + + # Create layers for all discovered types + layer_builders = {} + for cls, l_idx in ctx.type_to_layer.items(): + lb = db.create_layer_begin(cls.__name__) + layer_builders[l_idx] = lb + + # Define fields (add only scalar types as columns) + lb.set_geometry_type(0, core.cfDefault, False) # 0 is unknown/any which allows blob storage + + schema = _get_class_schema(cls) + defns = schema["defns"] + for field_name, origin_type in defns: + if origin_type in (OriginFieldType.u8, OriginFieldType.u16, OriginFieldType.u32, + OriginFieldType.i32, OriginFieldType.f32, OriginFieldType.f64, + OriginFieldType.str, OriginFieldType.wstr): + lb.add_field(field_name, origin_type.value) + + # Create auxiliary layers for numeric 
lists (List[U32] / List[F64]) + numeric_list_layers = {} + for cls in ctx.type_to_layer.keys(): + schema = _get_class_schema(cls) + for field_name, kind in schema["numeric_fields"]: + aux_name = _make_numeric_list_layer_name(cls.__name__, field_name, kind) + aux_lb = db.create_layer_begin(aux_name) + aux_lb.set_geometry_type(0, core.cfDefault, False) + aux_lb.add_field("owner_fid", OriginFieldType.u32.value) + numeric_list_layers[(cls, field_name)] = (kind, aux_lb) + + # Write all objects ordered by layer + sorted_objects = sorted(ctx.objects, key=lambda x: (x.layer_idx, x.feature_idx)) + + current_layer_idx = -1 + current_lb = None + + for obj_wrapper in sorted_objects: + obj = obj_wrapper.obj + l_idx = obj_wrapper.layer_idx + + # Switch Layer + if l_idx != current_layer_idx: + current_layer_idx = l_idx + current_lb = layer_builders[l_idx] + + lb = current_lb + lb.add_feature_begin() + + # Prepare Blob for storing non-scalar data + blob_buffer = bytearray() + + schema = _get_class_schema(obj.__class__) + defns = schema["defns"] + hints = schema["hints"] + numeric_field_kinds = schema["numeric_field_kinds"] + + for idx, (fn, ft) in enumerate(defns): + val = getattr(obj, fn) + numeric_kind = numeric_field_kinds.get(fn) if ft == OriginFieldType.list else None + + # Strategy: Scalar -> Column, Complex -> Blob + if ft == OriginFieldType.list and numeric_kind is not None: + if isinstance(val, list): + _, aux_lb = numeric_list_layers[(obj.__class__, fn)] + _write_numeric_list_chunk(aux_lb, obj_wrapper.feature_idx, val, numeric_kind) + elif ft in (OriginFieldType.list, OriginFieldType.unknown): + if isinstance(val, list): + _pack_list(blob_buffer, val, hints.get(fn, Any), ctx) + elif isinstance(val, Feature): + # Treat single feature as list of 1 element for unknown fields + # This allows forward compatibility if fields become lists + # Or simply to store it. + _pack_feature_ref(blob_buffer, val, ctx) + # Wait, pack_feature_ref writes 6 bytes. 
+ # But unpack_list expects count first. + # So we must wrap in list structure: count=1 + ref + temp_buf = bytearray() + temp_buf.extend(struct.pack(' Feature: + # Zero-copy load database + db = core.WxDatabase.load_xbuffer(data) + if db.get_layer_count() == 0: + return None + + ctx = _LoadContext(db) + + # Discover and register related types + _discover_types(root_type, ctx.type_map) + + # Root object is always at Layer 0, Feature 0 + root_layer = db.get_layer(0) + if root_layer.get_feature_count() == 0: + return None + + return ctx.get_object(0, 0, root_type) + +# --- Internal Helpers --- + +class _DumpContext: + def __init__(self): + self.objects = [] # List[_ObjectWrapper] + self.obj_to_id = {} # id(obj) -> (layer_idx, feature_idx) + self.type_to_layer = {} # Type -> layer_idx + self.layer_counters = {} # layer_idx -> feature_count + + def get_hints(self, cls): + return _get_class_schema(cls)["hints"] + + def get_db_field_index(self, cls, schema_idx): + return _get_class_schema(cls)["db_field_index_by_schema"].get(schema_idx, -1) + + def register(self, obj): + if id(obj) in self.obj_to_id: + return + cls = obj.__class__ + if cls not in self.type_to_layer: + l_idx = len(self.type_to_layer) + self.type_to_layer[cls] = l_idx + self.layer_counters[l_idx] = 0 + + l_idx = self.type_to_layer[cls] + f_idx = self.layer_counters[l_idx] + self.layer_counters[l_idx] += 1 + + self.obj_to_id[id(obj)] = (l_idx, f_idx) + self.objects.append(_ObjectWrapper(obj, l_idx, f_idx)) + + # Recursive registration + hints = self.get_hints(cls) + defns = _get_class_schema(cls)["defns"] + for fn, _ in defns: + type_hint = hints.get(fn) + val = getattr(obj, fn) + + if val is None: + continue + + if isinstance(val, Feature): + self.register(val) + elif isinstance(val, list): + # Check hint + args = get_args(type_hint) if type_hint else None + inner = args[0] if args else None + + if inner and hasattr(inner, '__mro__') and issubclass(inner, Feature): + for item in val: + if isinstance(item, 
Feature): + self.register(item) + # Fallback check content + elif val and isinstance(val[0], Feature): + for item in val: + if isinstance(item, Feature): + self.register(item) + + def get_ref(self, obj): + return self.obj_to_id.get(id(obj)) + +class _ObjectWrapper: + def __init__(self, obj, l_idx, f_idx): + self.obj = obj + self.layer_idx = l_idx + self.feature_idx = f_idx + +class _LoadContext: + def __init__(self, db): + self.db = db + self.obj_cache = {} # (layer_idx, feature_idx) -> obj + self.type_map = {} # class_name -> Type + self.numeric_list_values = _load_numeric_list_values(db) + + def get_object(self, l_idx, f_idx, expected_type): + key = (l_idx, f_idx) + if key in self.obj_cache: + return self.obj_cache[key] + + layer = self.db.get_layer(l_idx) + if layer is None: + # Fallback: maybe layer name mapping? + # But here use index. + # If layer is None, something is critically wrong. + return None + + feature_data = layer.tryGetFeature(f_idx) + if not feature_data: return None + + cls_name = layer.name() + cls = self.type_map.get(cls_name, expected_type) + + obj = cls() + self.obj_cache[key] = obj # Cache to solve cyclic references + + # Fill data + obj._origin = feature_data + obj._db = self.db + + schema = _get_class_schema(cls) + defns = schema["defns"] + hints = schema["hints"] + numeric_field_kinds = schema["numeric_field_kinds"] + + # Read Blob data + blob = feature_data.get_geometry_like_chunk() + blob_view = None + if blob.size > 0: + # Safe access to memory view + addr = int(blob.pdata) if hasattr(blob.pdata, '__int__') else blob.pdata + if not isinstance(addr, int): # Fallback + try: addr = int(addr) + except: pass + BlobType = ctypes.c_ubyte * blob.size + blob_array = BlobType.from_address(addr) + blob_view = memoryview(blob_array) + + curr_blob_offset = 0 + + for idx, (fn, ft) in enumerate(defns): + numeric_kind = numeric_field_kinds.get(fn) if ft == OriginFieldType.list else None + + # Recover numeric lists from dedicated auxiliary layers 
(columnar fast-path) + if numeric_kind is not None: + obj._cache[fn] = self.numeric_list_values.get((cls.__name__, fn, f_idx), []) + continue + + # Recover complex types from Blob + if ft in (OriginFieldType.list, OriginFieldType.unknown): + if blob_view: + # Debug print + # print(f"DEBUG: Unpacking list field {fn} at offset {curr_blob_offset}") + val, new_offset = _unpack_list(blob_view, curr_blob_offset, hints.get(fn, Any), self) + obj._cache[fn] = val + # print(f"DEBUG: New offset {new_offset}") + curr_blob_offset = new_offset + elif ft == OriginFieldType.bytes: + if blob_view: + cnt = struct.unpack_from(' 0xFFFFFFFF: + raise ValueError(f"List[U32] item out of range: {iv}") + normalized.append(iv) + packed = struct.pack(f'<{len(normalized)}I', *normalized) + else: + normalized = [float(value) for value in values] + packed = struct.pack(f'<{len(normalized)}d', *normalized) + + aux_lb.set_geometry_raw(packed) + aux_lb.add_feature_end() + +def _load_numeric_list_values(db): + out = {} + + for layer_idx in range(db.get_layer_count()): + layer = db.get_layer(layer_idx) + if layer is None: + continue + + parsed = _parse_numeric_list_layer_name(layer.name()) + if parsed is None: + continue + + class_name, field_name, kind = parsed + + for row_idx in range(layer.get_feature_count()): + row = layer.tryGetFeature(row_idx) + if not row: + continue + + owner_fid = int(row.get_field_as_int(0)) + chunk = row.get_geometry_like_chunk() + values = _decode_numeric_list_chunk(chunk, kind) + out[(class_name, field_name, owner_fid)] = values + + return out + +def _decode_numeric_list_chunk(chunk, kind): + if chunk.size <= 0: + return [] + + addr = int(chunk.pdata) if hasattr(chunk.pdata, '__int__') else chunk.pdata + if not isinstance(addr, int): + addr = int(addr) + + BlobType = ctypes.c_ubyte * chunk.size + blob_array = BlobType.from_address(addr) + view = memoryview(blob_array) + + if kind == "u32": + count = chunk.size // 4 + if count == 0: + return [] + return 
list(struct.unpack_from(f'<{count}I', view, 0)) + + count = chunk.size // 8 + if count == 0: + return [] + return list(struct.unpack_from(f'<{count}d', view, 0)) + +def _pack_list(buffer, lst, type_hint, ctx): + count = len(lst) + buffer.extend(struct.pack(' 0: + first = lst[0] + if isinstance(first, int): inner = int + elif isinstance(first, float): inner = float + elif isinstance(first, str): inner = str + elif isinstance(first, Feature): inner = Feature + + if inner == int: + buffer.extend(struct.pack(f'<{count}i', *lst)) + elif inner == float: + buffer.extend(struct.pack(f'<{count}d', *lst)) + elif inner == str: + for item in lst: + encoded = item.encode('utf-8') + buffer.extend(struct.pack(' len(view): + # Not feature refs, or out of bounds. + # Should we try unpacking as ints? + # But we already did 'inner == int' check. + break + l_idx, f_idx = struct.unpack_from(' OriginFieldType: + if get_origin(type_var) is list or type_var is list: + return OriginFieldType.list return FIELD_TYPE_MAP.get(type_var, OriginFieldType.unknown) diff --git a/tests/python/benchmark_fast_serializer.py b/tests/python/benchmark_fast_serializer.py new file mode 100644 index 0000000..eaa999a --- /dev/null +++ b/tests/python/benchmark_fast_serializer.py @@ -0,0 +1,304 @@ +import argparse +import gc +import multiprocessing as mp +import pickle +import statistics +import time +import traceback +from dataclasses import dataclass +from typing import List + +from fastdb4py import FastSerializer, Feature, U32, F64, I32, STR + + +class Point(Feature): + x: F64 + y: F64 + + +class NumericPayload(Feature): + ids: List[U32] + values: List[F64] + + +class MixedPayload(Feature): + name: STR + score: F64 + tags: List[str] + points: List[Point] + + +class RecursiveNode(Feature): + val: I32 + next: 'RecursiveNode' + + +@dataclass +class PPoint: + x: float + y: float + + +@dataclass +class PNumericPayload: + ids: List[int] + values: List[float] + + +@dataclass +class PMixedPayload: + name: str + 
score: float + tags: List[str] + points: List[PPoint] + + +class PRecursiveNode: + def __init__(self, val: int): + self.val = val + self.next = None + + +def _timeit(func, iterations: int, warmup: int) -> list[float]: + for _ in range(warmup): + func() + + samples = [] + for _ in range(iterations): + t0 = time.perf_counter() + func() + t1 = time.perf_counter() + samples.append(t1 - t0) + + return samples + + +def _summary(samples: list[float]) -> tuple[float, float]: + mean_us = statistics.mean(samples) * 1_000_000 + p50_us = statistics.median(samples) * 1_000_000 + return mean_us, p50_us + + +def _build_numeric_payload_pair(n: int): + ids = list(range(n)) + values = [float(i) * 0.125 for i in range(n)] + + fast_obj = NumericPayload(ids=ids, values=values) + pickle_obj = PNumericPayload(ids=ids, values=values) + return fast_obj, pickle_obj + + +def _build_mixed_payload_pair(n: int): + points = [Point(x=float(i), y=float(i) * 0.5) for i in range(n)] + ppoints = [PPoint(x=float(i), y=float(i) * 0.5) for i in range(n)] + tags = [f"tag_{i}" for i in range(64)] + ["你好", "emoji🙂", "line1\nline2"] + fast_obj = MixedPayload( + name="mixed_payload", + score=123.456, + tags=tags, + points=points, + ) + + pickle_obj = PMixedPayload( + name="mixed_payload", + score=123.456, + tags=tags, + points=ppoints, + ) + + return fast_obj, pickle_obj + + +def _build_recursive_payload_pair(n: int): + f_head = RecursiveNode(val=0) + f_current = f_head + + p_head = PRecursiveNode(val=0) + p_current = p_head + + for i in range(1, n): + f_nxt = RecursiveNode(val=i) + f_current.next = f_nxt + f_current = f_nxt + + p_nxt = PRecursiveNode(val=i) + p_current.next = p_nxt + p_current = p_nxt + + f_current.next = f_head + p_current.next = p_head + return f_head, p_head + + +def benchmark_case(name: str, fast_obj, root_type, pickle_obj, iterations: int, warmup: int): + gc.collect() + + fast_bytes = FastSerializer.dumps(fast_obj) + pickle_bytes = pickle.dumps(pickle_obj, 
protocol=pickle.HIGHEST_PROTOCOL) + + fast_dump_samples = _timeit(lambda: FastSerializer.dumps(fast_obj), iterations, warmup) + fast_load_samples = _timeit(lambda: FastSerializer.loads(fast_bytes, root_type), iterations, warmup) + + pickle_dump_samples = _timeit( + lambda: pickle.dumps(pickle_obj, protocol=pickle.HIGHEST_PROTOCOL), + iterations, + warmup, + ) + pickle_load_samples = _timeit(lambda: pickle.loads(pickle_bytes), iterations, warmup) + + fast_dump_mean, fast_dump_p50 = _summary(fast_dump_samples) + fast_load_mean, fast_load_p50 = _summary(fast_load_samples) + + pickle_dump_mean, pickle_dump_p50 = _summary(pickle_dump_samples) + pickle_load_mean, pickle_load_p50 = _summary(pickle_load_samples) + + return { + "name": name, + "fast_size": len(fast_bytes), + "pickle_size": len(pickle_bytes), + "fast_dump_mean": fast_dump_mean, + "fast_dump_p50": fast_dump_p50, + "fast_load_mean": fast_load_mean, + "fast_load_p50": fast_load_p50, + "pickle_dump_mean": pickle_dump_mean, + "pickle_dump_p50": pickle_dump_p50, + "pickle_load_mean": pickle_load_mean, + "pickle_load_p50": pickle_load_p50, + } + + +def _build_case(case_name: str, numeric_size: int, mixed_points: int, cyclic_nodes: int): + if case_name == "numeric": + fast_obj, pickle_obj = _build_numeric_payload_pair(numeric_size) + return fast_obj, NumericPayload, pickle_obj + if case_name == "mixed": + fast_obj, pickle_obj = _build_mixed_payload_pair(mixed_points) + return fast_obj, MixedPayload, pickle_obj + if case_name == "cyclic": + fast_obj, pickle_obj = _build_recursive_payload_pair(cyclic_nodes) + return fast_obj, RecursiveNode, pickle_obj + raise ValueError(f"Unknown case: {case_name}") + + +def _worker(queue: mp.Queue, case_name: str, iterations: int, warmup: int, numeric_size: int, mixed_points: int, cyclic_nodes: int): + try: + fast_obj, root_type, pickle_obj = _build_case(case_name, numeric_size, mixed_points, cyclic_nodes) + result = benchmark_case(case_name, fast_obj, root_type, pickle_obj, 
iterations, warmup) + queue.put({"ok": True, "result": result}) + except Exception as exc: + queue.put({"ok": False, "error": repr(exc), "traceback": traceback.format_exc()}) + + +def run_case_isolated(case_name: str, iterations: int, warmup: int, numeric_size: int, mixed_points: int, cyclic_nodes: int, timeout: int): + queue: mp.Queue = mp.Queue() + proc = mp.Process( + target=_worker, + args=(queue, case_name, iterations, warmup, numeric_size, mixed_points, cyclic_nodes), + ) + proc.start() + proc.join(timeout=timeout) + + if proc.is_alive(): + proc.terminate() + proc.join() + return { + "name": case_name, + "status": "timeout", + } + + if proc.exitcode != 0: + return { + "name": case_name, + "status": f"crash({proc.exitcode})", + } + + if queue.empty(): + return { + "name": case_name, + "status": "no-result", + } + + msg = queue.get() + if not msg.get("ok"): + return { + "name": case_name, + "status": f"error({msg.get('error')})", + "traceback": msg.get("traceback", ""), + } + + out = msg["result"] + out["status"] = "ok" + return out + + +def print_report(rows: list[dict], iterations: int, warmup: int): + print("FastSerializer vs pickle benchmark") + print(f"iterations={iterations}, warmup={warmup}\n") + + header = ( + f"{'Case':<14} | {'Status':<12} | {'Size(Fast)':>10} | {'Size(Pickle)':>12} | " + f"{'Dump Fast μs':>12} | {'Dump Pickle μs':>14} | " + f"{'Load Fast μs':>12} | {'Load Pickle μs':>14}" + ) + print(header) + print("-" * len(header)) + + for row in rows: + if row.get("status") != "ok": + print( + f"{row['name']:<14} | {row.get('status', 'unknown'):<12} | {'-':>10} | {'-':>12} | {'-':>12} | {'-':>14} | {'-':>12} | {'-':>14}" + ) + continue + + print( + f"{row['name']:<14} | {row.get('status', 'ok'):<12} | {row['fast_size']:>10} | {row['pickle_size']:>12} | " + f"{row['fast_dump_p50']:>12.2f} | {row['pickle_dump_p50']:>14.2f} | " + f"{row['fast_load_p50']:>12.2f} | {row['pickle_load_p50']:>14.2f}" + ) + + print("\nDetail (mean μs):") + for row in 
rows: + if row.get("status") != "ok": + print(f"- {row['name']}: {row.get('status', 'unknown')}") + tb = row.get("traceback", "") + if tb: + print(tb.rstrip()) + continue + print( + f"- {row['name']}: " + f"dump fast={row['fast_dump_mean']:.2f}, pickle={row['pickle_dump_mean']:.2f}; " + f"load fast={row['fast_load_mean']:.2f}, pickle={row['pickle_load_mean']:.2f}" + ) + + +def main(): + parser = argparse.ArgumentParser(description="Benchmark FastSerializer against pickle") + parser.add_argument("--iterations", type=int, default=200) + parser.add_argument("--warmup", type=int, default=20) + parser.add_argument("--numeric-size", type=int, default=8, help="List length for numeric payload") + parser.add_argument("--mixed-points", type=int, default=4, help="Point count for mixed payload") + parser.add_argument("--cyclic-nodes", type=int, default=8, help="Node count for cyclic payload") + parser.add_argument("--timeout", type=int, default=30, help="Per-case timeout in seconds") + args = parser.parse_args() + + print("Note: Cases run in isolated subprocesses to tolerate native-level crashes.") + print(" Increase sizes gradually with --numeric-size/--mixed-points/--cyclic-nodes for stress tests.\n") + + rows = [] + for case_name in ["numeric", "mixed", "cyclic"]: + row = run_case_isolated( + case_name=case_name, + iterations=args.iterations, + warmup=args.warmup, + numeric_size=args.numeric_size, + mixed_points=args.mixed_points, + cyclic_nodes=args.cyclic_nodes, + timeout=args.timeout, + ) + rows.append(row) + + print_report(rows, args.iterations, args.warmup) + + +if __name__ == "__main__": + main() diff --git a/tests/python/test_fast_serializer.py b/tests/python/test_fast_serializer.py new file mode 100644 index 0000000..75a2d8a --- /dev/null +++ b/tests/python/test_fast_serializer.py @@ -0,0 +1,182 @@ +import unittest +import struct +from typing import List, Optional +from fastdb4py import FastSerializer, Feature, I32, U32, F64, STR, REF + +class Point(Feature): + x: 
import unittest
import struct
from typing import List, Optional
from fastdb4py import FastSerializer, Feature, I32, U32, F64, STR, REF


class Point(Feature):
    # 2-D point with double-precision coordinates.
    x: F64
    y: F64


class Line(Feature):
    # Polyline holding nested Point features plus a scalar id.
    points: List[Point]
    id: I32


class RecursiveNode(Feature):
    # Self-referential node used to exercise cyclic references.
    val: I32
    next: 'RecursiveNode'


class TreeNode(Feature):
    # N-ary tree node: recursion through a list of the same type.
    val: I32
    children: List['TreeNode']


class User(Feature):
    # Mix of string, int and plain-float list fields.
    name: STR
    age: I32
    scores: List[float]


class MultiListPayload(Feature):
    # Scalar, string and feature lists combined in one payload.
    ints: List[int]
    names: List[str]
    points: List[Point]


class StringListOnly(Feature):
    # Single string-list field for encoding edge cases.
    names: List[str]


class NumericColumnarLists(Feature):
    # Homogeneous numeric lists expected to take the columnar path.
    ids: List[U32]
    values: List[F64]


class TestFastSerializer(unittest.TestCase):
    """Round-trip tests for FastSerializer.dumps / FastSerializer.loads."""

    def test_simple_object(self):
        """A flat feature survives a dump/load round trip."""
        p = Point(x=1.0, y=2.0)
        data = FastSerializer.dumps(p)
        self.assertTrue(len(data) > 0)

        p2 = FastSerializer.loads(data, Point)
        self.assertAlmostEqual(p2.x, 1.0)
        self.assertAlmostEqual(p2.y, 2.0)

    def test_nested_list(self):
        """A feature containing a list of sub-features round-trips."""
        p1 = Point(x=1.0, y=2.0)
        p2 = Point(x=3.0, y=4.0)
        line = Line(id=100, points=[p1, p2])

        data = FastSerializer.dumps(line)
        line2 = FastSerializer.loads(data, Line)

        self.assertEqual(line2.id, 100)
        self.assertEqual(len(line2.points), 2)
        self.assertAlmostEqual(line2.points[0].x, 1.0)
        self.assertAlmostEqual(line2.points[1].y, 4.0)

    def test_scalar_list(self):
        """Scalar fields and a plain-float list round-trip."""
        u = User(name="Alice", age=30, scores=[90.5, 80.0, 95.5])
        data = FastSerializer.dumps(u)

        u2 = FastSerializer.loads(data, User)
        self.assertEqual(u2.name, "Alice")
        self.assertEqual(u2.age, 30)
        self.assertEqual(len(u2.scores), 3)
        self.assertAlmostEqual(u2.scores[0], 90.5)

    def test_cyclic_reference(self):
        """A two-node cycle (A -> B -> A) keeps identity after loading."""
        n1 = RecursiveNode(val=1)
        n2 = RecursiveNode(val=2)
        n1.next = n2
        n2.next = n1

        data = FastSerializer.dumps(n1)

        check_n1 = FastSerializer.loads(data, RecursiveNode)
        self.assertEqual(check_n1.val, 1)
        self.assertIsNotNone(check_n1.next)
        self.assertEqual(check_n1.next.val, 2)

        # FastSerializer guarantees identity preservation within one load,
        # so following the cycle must land back on the same object.
        self.assertIs(check_n1.next.next, check_n1)

    def test_tree_structure(self):
        """Recursive list fields (a small tree) round-trip."""
        root = TreeNode(val=0, children=[])
        child1 = TreeNode(val=1, children=[])
        child2 = TreeNode(val=2, children=[])
        root.children.append(child1)
        root.children.append(child2)

        subchild = TreeNode(val=3, children=[])
        child1.children.append(subchild)

        data = FastSerializer.dumps(root)

        f_root = FastSerializer.loads(data, TreeNode)
        self.assertEqual(f_root.val, 0)
        self.assertEqual(len(f_root.children), 2)
        self.assertEqual(f_root.children[0].val, 1)
        self.assertEqual(f_root.children[0].children[0].val, 3)

    def test_multi_list_and_string_list(self):
        """Scalar, unicode-string and feature lists coexist in one payload."""
        payload = MultiListPayload(
            ints=[1, 2, 3, 5, 8],
            names=["alpha", "beta", "你好", "emoji🙂"],
            points=[Point(x=10.0, y=20.0), Point(x=30.0, y=40.0)]
        )

        data = FastSerializer.dumps(payload)
        payload2 = FastSerializer.loads(data, MultiListPayload)

        self.assertEqual(payload2.ints, [1, 2, 3, 5, 8])
        self.assertEqual(payload2.names, ["alpha", "beta", "你好", "emoji🙂"])
        self.assertEqual(len(payload2.points), 2)
        self.assertAlmostEqual(payload2.points[0].x, 10.0)
        self.assertAlmostEqual(payload2.points[1].y, 40.0)

    def test_string_list_edge_cases(self):
        """Empty, multi-byte, multi-line and very long strings round-trip."""
        long_text = "x" * 10000
        payload = StringListOnly(
            names=["", "ascii", "你好", "emoji🙂", "line1\nline2", long_text]
        )

        data = FastSerializer.dumps(payload)
        payload2 = FastSerializer.loads(data, StringListOnly)

        self.assertEqual(
            payload2.names,
            ["", "ascii", "你好", "emoji🙂", "line1\nline2", long_text]
        )

        # An empty list is a valid payload too.
        empty_payload = StringListOnly(names=[])
        empty_data = FastSerializer.dumps(empty_payload)
        empty_payload2 = FastSerializer.loads(empty_data, StringListOnly)
        self.assertEqual(empty_payload2.names, [])

    def test_numeric_list_columnar_path(self):
        """Boundary numeric values (incl. U32 max) survive the columnar path."""
        payload = NumericColumnarLists(
            ids=[0, 1, 2, 1024, 65535, 4294967295],
            values=[0.0, 1.5, -3.25, 1e-6, 1e6]
        )

        data = FastSerializer.dumps(payload)
        payload2 = FastSerializer.loads(data, NumericColumnarLists)

        self.assertEqual(payload2.ids, [0, 1, 2, 1024, 65535, 4294967295])
        self.assertEqual(len(payload2.values), 5)
        self.assertAlmostEqual(payload2.values[0], 0.0)
        self.assertAlmostEqual(payload2.values[1], 1.5)
        self.assertAlmostEqual(payload2.values[2], -3.25)
        self.assertAlmostEqual(payload2.values[3], 1e-6)
        self.assertAlmostEqual(payload2.values[4], 1e6)


if __name__ == '__main__':
    unittest.main()