diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml
index c41ceaf..80261e5 100644
--- a/.github/workflows/unit-tests.yml
+++ b/.github/workflows/unit-tests.yml
@@ -12,10 +12,7 @@ jobs:
fail-fast: false
matrix:
os: [windows-latest, ubuntu-latest, macos-latest]
- python-version: ['3.6', '3.7', '3.8', '3.9', '3.10', '3.11']
- exclude:
- - os: ubuntu-latest
- python-version: '3.6'
+ python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
env:
OS: ${{ matrix.os }}
@@ -25,25 +22,26 @@ jobs:
steps:
- - uses: actions/setup-python@v3
+ - uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- run: python -m pip install --upgrade pip
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- run: python -m pip install .[test]
- - run: python -m pytest ./tests/ --cov=ebmlite --cov-report=xml --flake8 -n auto
-
- - uses: actions/upload-artifact@v3
- with:
- name: multi-file-stuff
- path: |
- ./tests/SSX46714-doesnot.IDE
- ./tests/SSX46714-new.IDE
- ./tests/ssx-1.xml
- ./tests/ssx-2.xml
+ - run: python -m pytest ./tests/ --cov=ebmlite --cov-report=xml -n auto
+
+# - uses: actions/upload-artifact@v4
+# with:
+# name: multi-file-stuff
+# overwrite: true
+# path: |
+# ./tests/SSX46714-doesnot.IDE
+# ./tests/SSX46714-new.IDE
+# ./tests/ssx-1.xml
+# ./tests/ssx-2.xml
- uses: codecov/codecov-action@v3
with:
diff --git a/.gitignore b/.gitignore
index f7c881c..b5fa74b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,6 @@ venv?
*.egg*
.mypy_cache
+
+# Sphinx documentation
+docs/_build/
diff --git a/.readthedocs.yml b/.readthedocs.yml
new file mode 100644
index 0000000..77e4b4d
--- /dev/null
+++ b/.readthedocs.yml
@@ -0,0 +1,22 @@
+# Read the Docs configuration file
+# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
+
+# Required
+version: 2
+
+# Set the OS, Python version, and other tools you might need
+build:
+ os: ubuntu-24.04
+ tools:
+ python: "3.13"
+
+# Build documentation in the "docs/" directory with Sphinx
+sphinx:
+ configuration: docs/conf.py
+
+# Optionally, but recommended,
+# declare the Python requirements required to build your documentation
+# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
+python:
+ install:
+ - requirements: docs/requirements.txt
\ No newline at end of file
diff --git a/README.md b/README.md
index 9d9bad5..76a8a52 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
- [](https://codecov.io/gh/MideTechnology/ebmlite)
+[](https://pypi.org/project/ebmlite/)  [](https://codecov.io/gh/MideTechnology/ebmlite)
diff --git a/codecov.yml b/codecov.yml
index cc1be98..1ae145b 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -1,4 +1,2 @@
coverage:
- range: 50..90
-ignore:
- - "ebmlite/tools"
\ No newline at end of file
+ range: 50..90
\ No newline at end of file
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..d4bb2cb
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS ?=
+SPHINXBUILD ?= sphinx-build
+SOURCEDIR = .
+BUILDDIR = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+ @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+ @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/docs/_static/endaq-favicon.ico b/docs/_static/endaq-favicon.ico
new file mode 100644
index 0000000..c67cb04
Binary files /dev/null and b/docs/_static/endaq-favicon.ico differ
diff --git a/docs/_static/endaq-logo-300x121.svg b/docs/_static/endaq-logo-300x121.svg
new file mode 100644
index 0000000..3f62b15
--- /dev/null
+++ b/docs/_static/endaq-logo-300x121.svg
@@ -0,0 +1,61 @@
+
+
+
diff --git a/docs/_static/endaq-style.css b/docs/_static/endaq-style.css
new file mode 100644
index 0000000..6e30390
--- /dev/null
+++ b/docs/_static/endaq-style.css
@@ -0,0 +1,136 @@
+@charset "utf-8";
+/* CSS Document */
+
+@charset "utf-8";
+/* CSS Document */
+
+
+/*****************************************************************************
+ * Color
+ *
+ * Colors are defined in rgb string way, "red, green, blue"
+ **/
+ --pst-color-primary: #f26722;
+ --pst-color-success: 40, 167, 69;
+ --pst-color-info: 0, 123, 255; /*23, 162, 184;*/
+ --pst-color-warning: 255, 193, 7;
+ --pst-color-danger: 220, 53, 69;
+ --pst-color-text-base: 51, 51, 51;
+
+ --pst-color-h1: var(--pst-color-primary);
+ --pst-color-h2: var(--pst-color-primary);
+ --pst-color-h3: var(--pst-color-text-base);
+ --pst-color-h4: var(--pst-color-text-base);
+ --pst-color-h5: var(--pst-color-text-base);
+ --pst-color-h6: var(--pst-color-text-base);
+ --pst-color-paragraph: var(--pst-color-text-base);
+ --pst-color-link: 231, 112, 37;
+ --pst-color-link-hover: 227, 46, 0;
+ --pst-color-headerlink: 198, 15, 15;
+ --pst-color-headerlink-hover: 255, 255, 255;
+ --pst-color-preformatted-text: 34, 34, 34;
+ --pst-color-preformatted-background: 250, 250, 250;
+ --pst-color-inline-code: 156, 35, 233;
+
+ --pst-color-active-navigation: 231, 112, 37;
+ --pst-color-navbar-link: 77, 77, 77;
+ --pst-color-navbar-link-hover: var(--pst-color-active-navigation);
+ --pst-color-navbar-link-active: var(--pst-color-active-navigation);
+ --pst-color-sidebar-link: 77, 77, 77;
+ --pst-color-sidebar-link-hover: var(--pst-color-active-navigation);
+ --pst-color-sidebar-link-active: var(--pst-color-active-navigation);
+ --pst-color-sidebar-expander-background-hover: 244, 244, 244;
+ --pst-color-sidebar-caption: 77, 77, 77;
+ --pst-color-toc-link: 119, 117, 122;
+ --pst-color-toc-link-hover: var(--pst-color-active-navigation);
+ --pst-color-toc-link-active: var(--pst-color-active-navigation);
+
+
+cite {
+ font-style: normal!important;
+}
+
+ /*****************************************************************************
+ * Icon
+ **/
+
+ /* font awesome icons*/
+ --pst-icon-check-circle: '\f058';
+ --pst-icon-info-circle: '\f05a';
+ --pst-icon-exclamation-triangle: '\f071';
+ --pst-icon-exclamation-circle: '\f06a';
+ --pst-icon-times-circle: '\f057';
+ --pst-icon-lightbulb: '\f0eb';
+
+
+.wy-side-nav-search {
+ display: block;
+ width: 300px;
+ padding: .809em;
+ margin-bottom: .809em;
+ z-index: 200;
+ background-color: #e77025;
+ text-align: center;
+ color: #fcfcfc;
+}
+
+ul.task-bullet > li:before {
+ content: "";
+ height: 2em;
+ width: 2em;
+ display: block;
+ float: left;
+ margin-left: -2em;
+ background-position: center;
+ background-repeat: no-repeat;
+ background-color: #e77025;
+ border-radius: 50%;
+ background-size: 100%;
+ background-image: url(https://info.endaq.com/hubfs/readthedocs/question_mark_noback.svg);
+}
+
+.bd-search {
+ background-color: #f4eeea; /* hex colors require the leading '#' */
+}
+
+h1 {
+ font-weight: bold;
+ color: #f26722;
+}
+
+dl.field-list {
+ background-color: #f3f3f3;
+ padding: 7px;
+}
+
+--pst-color-link: #e77025;
+
+.sig-name {
+ color: #9c23e9;
+}
+
+.prev-next-area a p.prev-next-title {
+ color: #e77025;
+ font-weight: 600;
+ font-size: 1.1em;
+}
+
+span.sig-name.descname > span {
+ color: #9c23e9;
+}
+
+h1 > cite, h2 > cite {
+ font-style: normal;
+ color: #e77025;
+}
+
+ h2 > cite {
+ font-size: 32px;
+}
+
+a.headerlink {
+ color: #00a9a4;
+ font-size: .8em;
+ padding: 0 4px;
+ text-decoration: none;
+}
\ No newline at end of file
diff --git a/docs/conf.py b/docs/conf.py
new file mode 100644
index 0000000..894a969
--- /dev/null
+++ b/docs/conf.py
@@ -0,0 +1,87 @@
+# Configuration file for the Sphinx documentation builder.
+#
+# For the full list of built-in configuration values, see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+import codecs
+import os.path
+import sys
+
+# Put the repository root (one directory up) on sys.path so `ebmlite` is importable
+p = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+sys.path.insert(0, p)
+
+
+# -- Project information -----------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
+
+def get_version(rel_path):
+    """ Read the ``__version__`` string directly from the source file at
+        `rel_path`. Raises `RuntimeError` if no line starts with it. """
+    with open(rel_path, 'r', encoding='utf-8') as fp:  # codecs.open() is deprecated (3.14)
+        for line in fp:
+            if line.startswith('__version__'):
+                delim = '"' if '"' in line else "'"  # accept either quote style
+                return line.split(delim)[1]
+    raise RuntimeError("Unable to find version string.")
+
+
+project = 'ebmlite'
+copyright = '2025, Midé Technology Corp.'
+author = 'David R. Stokes'
+
+# The full version, including alpha/beta/rc tags
+release = get_version(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'ebmlite', '__init__.py')))
+# The short X.Y version
+version = '.'.join(release.split(".")[:2])
+
+
+# -- General configuration ---------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
+
+extensions = [
+ 'sphinx.ext.autodoc',
+ 'sphinx.ext.intersphinx',
+ 'sphinx.ext.githubpages',
+ 'sphinx_autodoc_typehints',
+]
+
+templates_path = ['_templates']
+exclude_patterns = ['_build', 'venv', 'Thumbs.db', '.DS_Store']
+
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+
+html_theme = 'pydata_sphinx_theme'
+html_logo = '_static/endaq-logo-300x121.svg'
+html_favicon = '_static/endaq-favicon.ico'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#
+html_theme_options = {
+ "logo": {
+ "link": "index"
+ },
+ "github_url": "https://github.com/MideTechnology/ebmlite",
+ "twitter_url": "https://twitter.com/enDAQ_sensors",
+ "collapse_navigation": True,
+ "analytics": {
+ "google_analytics_id": "G-E9QXH4H5LP",
+ }
+}
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# Appends custom .css file
+# https://docs.readthedocs.io/en/stable/guides/adding-custom-css.html#overriding-or-replacing-a-theme-s-stylesheet
+html_style = "https://info.endaq.com/hubfs/docs/css/endaq-docs-style.css"
+
+intersphinx_mapping = {
+ 'python': ('https://docs.python.org/3', None),
+}
diff --git a/docs/ebmlite/core.rst b/docs/ebmlite/core.rst
new file mode 100644
index 0000000..c24939b
--- /dev/null
+++ b/docs/ebmlite/core.rst
@@ -0,0 +1,5 @@
+core
+===================
+
+.. automodule:: ebmlite.core
+ :members:
\ No newline at end of file
diff --git a/docs/ebmlite/decoding.rst b/docs/ebmlite/decoding.rst
new file mode 100644
index 0000000..089c162
--- /dev/null
+++ b/docs/ebmlite/decoding.rst
@@ -0,0 +1,5 @@
+decoding
+===================
+
+.. automodule:: ebmlite.decoding
+ :members:
\ No newline at end of file
diff --git a/docs/ebmlite/encoding.rst b/docs/ebmlite/encoding.rst
new file mode 100644
index 0000000..0174572
--- /dev/null
+++ b/docs/ebmlite/encoding.rst
@@ -0,0 +1,5 @@
+encoding
+===================
+
+.. automodule:: ebmlite.encoding
+ :members:
\ No newline at end of file
diff --git a/docs/ebmlite/util.rst b/docs/ebmlite/util.rst
new file mode 100644
index 0000000..ee66e36
--- /dev/null
+++ b/docs/ebmlite/util.rst
@@ -0,0 +1,5 @@
+util
+===================
+
+.. automodule:: ebmlite.util
+ :members:
\ No newline at end of file
diff --git a/docs/ebmlite/xml_codecs.rst b/docs/ebmlite/xml_codecs.rst
new file mode 100644
index 0000000..17faf2a
--- /dev/null
+++ b/docs/ebmlite/xml_codecs.rst
@@ -0,0 +1,5 @@
+xml_codecs
+===================
+
+.. automodule:: ebmlite.xml_codecs
+ :members:
\ No newline at end of file
diff --git a/docs/index.rst b/docs/index.rst
new file mode 100644
index 0000000..68c5efa
--- /dev/null
+++ b/docs/index.rst
@@ -0,0 +1,200 @@
+.. ebmlite documentation master file, created by
+ sphinx-quickstart on Wed Jul 8 15:45:54 2020.
+ You can adapt this file completely to your liking, but it should at least
+ contain the root `toctree` directive.
+.. default-domain:: py
+.. currentmodule:: ebmlite.core
+
+`ebmlite`
+=========
+
+*ebmlite* is a lightweight, "pure Python" library for parsing EBML (Extensible
+Binary Markup Language) data. It is designed to crawl through EBML files quickly
+and efficiently, and that's about it. *ebmlite* can also do basic EBML encoding,
+but more advanced EBML manipulation (e.g. with a proper `DOM <https://en.wikipedia.org/wiki/Document_Object_Model>`_)
+is beyond its scope, and is better left to other libraries.
+
+
+EBML Overview (the short version)
+=================================
+
+`EBML <https://matroska-org.github.io/libebml/>`_ (Extensible Binary Markup
+Language) is a hierarchical tagged binary format, originally created for the
+`Matroska <https://www.matroska.org/>`_ project. The hierarchical structure of
+EBML bears some conceptual/functional similarity to XML, although the actual
+structure differs significantly.
+
+In the raw, EBML elements consist of a numeric ID, the size of the element, and
+a payload. It is space-efficient; the lengths of the ID and size descriptors are
+variable, using prefix bits to indicate their lengths, a system similar to UTF-8.
+The mapping of IDs to names and payload data types is done via an external schema.
+
+See the `official specification <https://www.rfc-editor.org/rfc/rfc8794>`_
+for more information.
+
+EBML Schemata
+=============
+
+An EBML file is largely meaningless without a schema that defines its elements.
+The schema maps element IDs to names and data types; it also describes the
+structure (e.g. what elements can be children of other elements) and provides
+additional metadata. *Note: ebmlite currently uses the structure for decoding
+only, and does not stringently enforce it.*
+
+The :class:`Schema` class is a factory used to encode and decode EBML files.
+When it's initialized, it scans through the schema file and creates a new class
+for each element defined in the schema file; then, when encoding or decoding EBML
+files, it references these classes in order to encapsulate everything safely.
+
+*ebmlite* schemata are defined in XML. From these XML files, a :class:`Schema`
+instance is created; within the :class:`Schema` are :class:`Element` subclasses
+for each element defined in the XML. Since the interpretation of an EBML file is
+almost entirely dependent on a schema, importing of EBML files is done through a
+:class:`Schema` instance.
+
+:class:`Schema` instances are typically created from an XML file through :func:`loadSchema`,
+or from a byte string using :func:`parseSchema`.
+
+.. code-block:: python
+
+ from ebmlite import loadSchema
+ schema = loadSchema('mide_ide.xml')
+ doc = schema.load('test_file.ebml')
+
+Loading an EBML file creates an instance of a :class:`Document` subclass, created
+by the schema, which acts as the root node of the EBML tree. :class:`Document`
+instances are typically created by reading an EBML file with
+:meth:`Schema.load`, or a byte string via :meth:`Schema.loads`.
+
+Schema Format
+-------------
+*ebmlite* uses its own Schema definition syntax. It does not (currently) use the `official schema format
+<https://www.rfc-editor.org/rfc/rfc8794#section-11.1>`_.
+
+Here is an example of an *ebmlite* schema, showing a simplified version of the
+definition of the standard EBML header elements:
+
+.. code-block:: xml
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Element types
+'''''''''''''
+Each element defined in the schema is a subclass of one of 8 Element base classes:
+
+* **MasterElement:** An element containing other elements.
+* **IntegerElement:** Contains a signed integer value of variable length.
+* **UIntegerElement:** Contains an unsigned integer value of variable length.
+* **FloatElement:** Contains a 32 or 64 bit floating point value.
+* **StringElement:** Contains printable US-ASCII characters (0x20 to 0x7E).
+* **UnicodeElement:** Contains UTF-8 string data.
+* **DateElement:** Contains a timestamp, stored as nanoseconds since
+ 2001-01-01T00:00:00 UTC as a 64 bit integer. *ebmlite* automatically translates
+ this into a Python :py:class:`datetime.datetime` object.
+* **BinaryElement:** Contains binary data.
+
+Element definitions have several attributes:
+
+* :code:`name` (string): The Element subclass' name.
+* :code:`id` (integer): The Element subclass' EBML ID.
+* :code:`global` (bool, optional): If "true" (e.g. :code:`1` or :code:`True`),
+ the element may appear in any location in an EBML file, not just where it
+ appears in the schema.
+* :code:`length` (integer, optional): A fixed size to use when encoding the
+ element, overriding the EBML variable length encoding. Use to create
+ byte-aligned structures.
+* :code:`multiple` (bool, optional, default=1): Indicates that the element can
+ appear more than once within the same parent.
+ *Currently partially enforced for encoding.*
+* :code:`mandatory` (bool, optional, default=0): Indicates that the element
+ *must* be present. *Not currently enforced.*
+* :code:`precache` (bool, optional, default varies by type): Indicates that the
+ element's value should be read and cached when the element is parsed, rather
+ than 'lazy-loaded' when explicitly accessed. Can be used to reduce the number
+ of seeks when working with an EBML file after it has been imported. Simple
+ numeric element types have this enabled by default; master, binary, and
+ string/Unicode elements do not.
+
+There are two additional, special-case Element subclasses which are not subclassed:
+
+* **UnknownElement:** Instantiated for elements with IDs that do not appear in
+ the schema. Its payload is treated as binary data. The UnknownElement itself
+ does not appear in the Schema. Unlike other Element subclasses, its ID can
+ vary from instance to instance.
+* **VoidElement:** "Void" (ID :code:`0xEC`) is a standard EBML element,
+ typically used for padding. If the Schema defines the Void element, it is
+ replaced by this special-case element. The contents of its payload are ignored.
+
+Schema XML Structure
+--------------------
+The structure of the schema's XML defines the structure of the EBML document;
+children of a MasterElement in the schema are valid child element types in the EBML.
+An Element type can appear multiple times in a schema; i.e. if its type can
+appear as a child of different parent types. Only the first definition requires
+both :code:`name` and :code:`id` attributes. Successive definitions can be
+abbreviated to just the :code:`name` and/or :code:`id`; they will inherit all
+the other attributes of the first definition. Successive definitions must *not*
+have contradictory attributes, however.
+
+.. code-block:: xml
+
+
+ ...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+**Note:** As seen in the example above, *ebmlite* allows an EBML document to
+have multiple elements at its root level. Several other EBML libraries do this
+as well, but this is apparently counter to the official spec. Officially, an EBML
+document should have only a single root element, similar to an XML file.
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/docs/make.bat b/docs/make.bat
new file mode 100644
index 0000000..954237b
--- /dev/null
+++ b/docs/make.bat
@@ -0,0 +1,35 @@
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=.
+set BUILDDIR=_build
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.https://www.sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/docs/requirements.txt b/docs/requirements.txt
new file mode 100644
index 0000000..96a9bed
--- /dev/null
+++ b/docs/requirements.txt
@@ -0,0 +1,4 @@
+# Documentation requirements
+Sphinx==8.2.3
+pydata-sphinx-theme==0.16.1
+sphinx-autodoc-typehints==1.18.1
diff --git a/ebmlite/__init__.py b/ebmlite/__init__.py
index 63bf32a..b6ecb9f 100644
--- a/ebmlite/__init__.py
+++ b/ebmlite/__init__.py
@@ -1,4 +1,10 @@
+"""
+EBMLite: A lightweight EBML parsing library. It is designed to crawl through
+EBML files quickly and efficiently, and that's about it.
+"""
+
from .core import *
from .core import SCHEMA_PATH, SCHEMATA, __all__
name = "ebmlite"
+__version__ = "3.4.0"
diff --git a/ebmlite/core.py b/ebmlite/core.py
index ae605ed..79bce58 100644
--- a/ebmlite/core.py
+++ b/ebmlite/core.py
@@ -1,40 +1,40 @@
-"""'''
+"""
EBMLite: A lightweight EBML parsing library. It is designed to crawl through
EBML files quickly and efficiently, and that's about it.
-
-@todo: Complete EBML encoding. Specifically, make 'master' elements write
- directly to the stream, rather than build bytearrays, so huge 'master'
- elements can be handled. It appears that the official spec may prohibit
- (or at least counter-indicate) multiple root elements. Possible
- compromise until proper fix: handle root 'master' elements differently
- than deeper ones, more like the current `Document`.
-@todo: Validation. Enforce the hierarchy defined in each schema.
-@todo: Optimize 'infinite' master elements (i.e `size` is `None`). See notes
- in `MasterElement` class' method definitions.
-@todo: Improved `MasterElement.__eq__()` method, possibly doing a recursive
- crawl of both elements and comparing the actual contents, or iterating
- over chunks of the raw binary data. Current implementation doesn't check
- element contents, just ID and payload size (for speed).
-@todo: Document-wide caching, for future handling of streamed data. Affects
- the longer-term streaming to-do (listed below) and optimization of
- 'infinite' elements (listed above).
-@todo: Clean up and standardize usage of the term 'size' versus 'length.'
-@todo: General documentation (more detailed than the README) and examples.
-@todo: Document the best way to load schemata in a PyInstaller executable.
-
-@todo: (longer term) Consider making schema loading automatic based on the EBML
- DocType, DocTypeVersion, and DocTypeReadVersion. Would mean a refactoring
- of how schemata are loaded.
-@todo: (longer term) Refactor to support streaming data. This will require
- modifying the indexing and iterating methods of `Document`. Also affects
- the document-wide caching to-do item, listed above.
-@todo: (longer term) Support the official Schema definition format. Start by
- adopting some of the attributes, specifically ``minOccurs`` and
- ``maxOccurs`` (they serve the function provided by the current
- ``mandatory`` and ``multiple`` attributes). Add ``range`` later.
- Eventually, recognize official schemata when loading, like the system
- currently handles legacy ``python-ebml`` schemata.
"""
+# :todo: Complete EBML encoding. Specifically, make 'master' elements write
+# directly to the stream, rather than build bytearrays, so huge 'master'
+# elements can be handled. It appears that the official spec may prohibit
+# (or at least counter-indicate) multiple root elements. Possible
+# compromise until proper fix: handle root 'master' elements differently
+# than deeper ones, more like the current `Document`.
+# :todo: Validation. Enforce the hierarchy defined in each schema.
+# :todo: Optimize 'infinite' master elements (i.e `size` is `None`). See notes
+# in `MasterElement` class' method definitions.
+# :todo: Improved `MasterElement.__eq__()` method, possibly doing a recursive
+# crawl of both elements and comparing the actual contents, or iterating
+# over chunks of the raw binary data. Current implementation doesn't check
+# element contents, just ID and payload size (for speed).
+# :todo: Document-wide caching, for future handling of streamed data. Affects
+# the longer-term streaming to-do (listed below) and optimization of
+# 'infinite' elements (listed above).
+# :todo: Clean up and standardize usage of the term 'size' versus 'length.'
+# :todo: General documentation (more detailed than the README) and examples.
+# :todo: Document the best way to load schemata in a PyInstaller executable.
+#
+# :todo: (longer term) Consider making schema loading automatic based on the EBML
+# DocType, DocTypeVersion, and DocTypeReadVersion. Would mean a refactoring
+# of how schemata are loaded.
+# :todo: (longer term) Refactor to support streaming data. This will require
+# modifying the indexing and iterating methods of `Document`. Also affects
+# the document-wide caching to-do item, listed above.
+# :todo: (longer term) Support the official Schema definition format. Start by
+# adopting some of the attributes, specifically ``minOccurs`` and
+# ``maxOccurs`` (they serve the function provided by the current
+# ``mandatory`` and ``multiple`` attributes). Add ``range`` later.
+# Eventually, recognize official schemata when loading, like the system
+# currently handles legacy ``python-ebml`` schemata.
+
__author__ = "David Randall Stokes, Connor Flanigan"
__copyright__ = "Copyright 2022, Mide Technology Corporation"
__credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt"
@@ -47,13 +47,13 @@
from ast import literal_eval
from datetime import datetime
import errno
-import importlib
+import importlib.resources as importlib_resources
from io import BytesIO, StringIO, IOBase
import os.path
from pathlib import Path
import re
-import sys
import types
+from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Tuple, Union
from xml.etree import ElementTree as ET
from .decoding import readElementID, readElementSize
@@ -62,20 +62,6 @@
from . import encoding
from . import schemata
-# Dictionaries in Python 3.7+ are explicitly insert-ordered in all
-# implementations. If older, continue to use `collections.OrderedDict`.
-if sys.hexversion < 0x03070000:
- from collections import OrderedDict as Dict
-else:
- Dict = dict
-
-# Additionally, `importlib.resources.files` is new to 3.9 as well; this is
-# part of a work-around.
-if sys.hexversion < 0x03090000:
- importlib_resources = None
-else:
- import importlib.resources as importlib_resources
-
# ==============================================================================
#
# ==============================================================================
@@ -102,21 +88,22 @@ class Element(object):
""" Base class for all EBML elements. Each data type has its own subclass,
and these subclasses get subclassed when a Schema is read.
- @cvar id: The element's EBML ID.
- @cvar name: The element's name.
- @cvar schema: The `Schema` to which this element belongs.
- @cvar multiple: Can this element be appear multiple times? Note:
+ :var id: The element's EBML ID.
+ :var name: The element's name.
+ :var schema: The `Schema` to which this element belongs.
+ :var multiple: Can this element appear multiple times? Note:
Currently only enforced for encoding.
- @cvar mandatory: Must this element appear in all EBML files using
+ :var mandatory: Must this element appear in all EBML files using
this element's schema? Note: Not currently enforced.
- @cvar children: A list of valid child element types. Only applicable to
- `Document` and `Master` subclasses. Note: Not currently enforced.
- @cvar dtype: The element's native Python data type.
- @cvar precache: If `True`, the Element's value is read when the Element
+ :var children: A list of valid child element types. Only applicable to
+ `Document` and `Master` subclasses. Note: Not currently enforced;
+ only used when decoding 'infinite' length elements.
+ :var dtype: The element's native Python data type.
+ :var precache: If `True`, the Element's value is read when the Element
is parsed. if `False`, the value is lazy-loaded when needed.
Numeric element types default to `True`. Can be used to reduce
the number of file seeks, potentially speeding things up.
- @cvar length: An explicit length (in bytes) of the element when
+ :var length: An explicit length (in bytes) of the element when
encoding. `None` will use standard EBML variable-length encoding.
"""
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
@@ -124,6 +111,12 @@ class Element(object):
# Parent `Schema`
schema = None
+ # Element name
+ name = None
+
+ # Element EBML ID
+ id = None
+
# Python native data type.
dtype = bytearray
@@ -142,22 +135,25 @@ class Element(object):
# For python-ebml compatibility; not currently used.
children = None
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int):
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
# Document-wide caching could be implemented here.
return bytearray(stream.read(size))
- def __init__(self, stream=None, offset=0, size=0, payloadOffset=0):
+ def __init__(self, stream: BinaryIO = None,
+ offset: int = 0,
+ size: int = 0,
+ payloadOffset: int = 0):
""" Constructor. Instantiate a new Element from a file. In most cases,
elements should be created when a `Document` is loaded, rather
than instantiated explicitly.
- @keyword stream: A file-like object containing EBML data.
- @keyword offset: The element's starting location in the file.
- @keyword size: The size of the whole element.
- @keyword payloadOffset: The starting location of the element's
+ :param stream: A file-like object containing EBML data.
+ :param offset: The element's starting location in the file.
+ :param size: The size of the whole element.
+ :param payloadOffset: The starting location of the element's
payload (i.e. immediately after the element's header).
"""
self.stream = stream
@@ -166,11 +162,11 @@ def __init__(self, stream=None, offset=0, size=0, payloadOffset=0):
self.payloadOffset = payloadOffset
self._value = None
- def __repr__(self):
+ def __repr__(self) -> str:
return "<%s (ID:0x%02X), offset %s, size %s>" % \
(self.__class__.__name__, self.id, self.offset, self.size)
- def __eq__(self, other):
+ def __eq__(self, other) -> bool:
""" Equality check. Elements are considered equal if they are the same
type and have the same ID, size, offset, and schema. Note: element
value is not considered! Check for value equality explicitly
@@ -196,13 +192,13 @@ def value(self):
self._value = self.parse(self.stream, self.size)
return self._value
- def getRaw(self):
+ def getRaw(self) -> bytes:
""" Get the element's raw binary data, including EBML headers.
"""
self.stream.seek(self.offset)
return self.stream.read(self.size + (self.payloadOffset - self.offset))
- def getRawValue(self):
+ def getRawValue(self) -> bytes:
""" Get the raw binary of the element's value.
"""
self.stream.seek(self.payloadOffset)
@@ -212,7 +208,7 @@ def getRawValue(self):
# Caching (experimental)
# ==========================================================================
- def gc(self, recurse=False):
+ def gc(self, recurse=False) -> int:
""" Clear any cached values. To save memory and/or force values to be
re-read from the file. Returns the number of cached values cleared.
"""
@@ -227,23 +223,32 @@ def gc(self, recurse=False):
# ==========================================================================
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: Any, length: Optional[int] = None) -> bytes:
""" Type-specific payload encoder. """
return encoding.encodeBinary(data, length)
+
@classmethod
- def encode(cls, value, length=None, lengthSize=None, infinite=False):
+ def encode(cls,
+ value: Any,
+ length: Optional[int] = None,
+ lengthSize: Optional[int] = None,
+ infinite: bool = False) -> bytes:
""" Encode an EBML element.
- @param value: The value to encode, or a list of values to encode.
+ :param value: The value to encode, or a list of values to encode.
If a list is provided, each item will be encoded as its own
element.
- @keyword length: An explicit length for the encoded data,
+ :param length: An explicit length for the encoded data,
overriding the variable length encoding. For producing
byte-aligned structures.
- @keyword lengthSize: An explicit length for the encoded element
+ :param lengthSize: An explicit length for the encoded element
size, overriding the variable length encoding.
- @return: A bytearray containing the encoded EBML data.
+ :param infinite: If `True`, the element will be marked as being
+ 'infinite'. Infinite elements are read until an element is
+ encountered that is not defined as a valid child in the
+ schema.
+ :return: A bytearray containing the encoded EBML data.
"""
if infinite and not issubclass(cls, MasterElement):
raise ValueError("Only Master elements can have 'infinite' lengths")
@@ -285,14 +290,14 @@ def __eq__(self, other):
return False
return self.value == other.value
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> int:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readInt(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: int, length: int = None) -> bytes:
""" Type-specific payload encoder for signed integer elements. """
return encoding.encodeInt(data, length)
@@ -308,14 +313,14 @@ class UIntegerElement(IntegerElement):
dtype = int
precache = True
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> int:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readUInt(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: int, length: int = None) -> bytes:
""" Type-specific payload encoder for unsigned integer elements. """
return encoding.encodeUInt(data, length)
@@ -336,14 +341,14 @@ def __eq__(self, other):
return False
return self.value == other.value
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> float:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readFloat(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: float, length: int = None) -> bytes:
""" Type-specific payload encoder for floating point elements. """
return encoding.encodeFloat(data, length)
@@ -366,14 +371,14 @@ def __eq__(self, other):
def __len__(self):
return self.size
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> str:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readString(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: str, length: int = None) -> bytes:
""" Type-specific payload encoder for ASCII string elements. """
return encoding.encodeString(data, length)
@@ -388,18 +393,18 @@ class UnicodeElement(StringElement):
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
dtype = str
- def __len__(self):
+ def __len__(self) -> int:
# Value may be multiple bytes per character
return len(self.value)
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> str:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readUnicode(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: str, length: int = None) -> bytes:
""" Type-specific payload encoder for Unicode string elements. """
return encoding.encodeUnicode(data, length)
@@ -414,14 +419,14 @@ class DateElement(IntegerElement):
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
dtype = datetime
- def parse(self, stream, size):
+ def parse(self, stream: BinaryIO, size: int) -> datetime:
""" Type-specific helper function for parsing the element's payload.
It is assumed the file pointer is at the start of the payload.
"""
return readDate(stream, size)
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls, data: datetime, length: Optional[int] = None) -> bytes:
""" Type-specific payload encoder for date elements. """
return encoding.encodeDate(data, length)
@@ -450,11 +455,13 @@ class VoidElement(BinaryElement):
"""
__slots__ = ("stream", "offset", "size", "sizeLength", "payloadOffset", "_value")
- def parse(self, stream, size):
+ def parse(self,
+ stream: BinaryIO,
+ size: Optional[int]) -> bytearray:
return bytearray()
@classmethod
- def encodePayload(cls, data, length=0):
+ def encodePayload(cls, data: Any, length: int = 0) -> bytearray:
""" Type-specific payload encoder for Void elements. """
length = 0 if length is None else length
return bytearray(b'\xff' * length)
@@ -463,6 +470,7 @@ def encodePayload(cls, data, length=0):
# ==============================================================================
+# noinspection PyDunderSlots
class UnknownElement(BinaryElement):
""" Special case ``Unknown`` element, used for elements with IDs not
present in a schema. Unlike other elements, each instance has its own
@@ -473,21 +481,26 @@ class UnknownElement(BinaryElement):
name = "UnknownElement"
precache = False
- def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None,
- schema=None):
+ def __init__(self,
+ stream: Optional[BinaryIO] = None,
+ offset: int = 0,
+ size: int = 0,
+ payloadOffset: int = 0,
+ eid: Optional[int] = None,
+ schema: Optional["Schema"] = None):
""" Constructor. Instantiate a new `UnknownElement` from a file. In
most cases, elements should be created when a `Document` is loaded,
rather than instantiated explicitly.
- @keyword stream: A file-like object containing EBML data.
- @keyword offset: The element's starting location in the file.
- @keyword size: The size of the whole element.
- @keyword payloadOffset: The starting location of the element's
+ :param stream: A file-like object containing EBML data.
+ :param offset: The element's starting location in the file.
+ :param size: The size of the whole element.
+ :param payloadOffset: The starting location of the element's
payload (i.e. immediately after the element's header).
- @keyword id: The unknown element's ID. Unlike 'normal' elements,
+ :param eid: The unknown element's ID. Unlike 'normal' elements,
in which ID is a class attribute, each UnknownElement instance
explicitly defines this.
- @keyword schema: The schema used to load the element. Specified
+ :param schema: The schema used to load the element. Specified
explicitly because `UnknownElement`s are not part of any
schema.
"""
@@ -496,7 +509,7 @@ def __init__(self, stream=None, offset=0, size=0, payloadOffset=0, eid=None,
self.id = eid
self.schema = schema
- def __eq__(self, other):
+ def __eq__(self, other) -> bool:
""" Equality check. Unknown elements are considered equal if they have
the same ID and value. Note that this differs from the criteria
used for other element classes!
@@ -522,24 +535,30 @@ class MasterElement(Element):
"_size", "_length")
dtype = list
- def parse(self):
+ _childIds = None
+
+ def parse(self, *args) -> List[Element]:
""" Type-specific helper function for parsing the element's payload.
+ This is a special case; parameters `stream` and `size` are not
+ used.
"""
# Special case; unlike other elements, value() property doesn't call
# parse(). Used only when pre-caching.
return self.value
- def parseElement(self, stream, nocache=False):
+ def parseElement(self,
+ stream: BinaryIO,
+ nocache: bool = False) -> Tuple[Element, int]:
""" Read the next element from a stream, instantiate a `MasterElement`
object, and then return it and the offset of the next element
(this element's position + size).
- @param stream: The source file-like stream.
- @keyword nocache: If `True`, the parsed element's `precache`
+ :param stream: The source file-like stream.
+ :param nocache: If `True`, the parsed element's `precache`
attribute is ignored, and the element's value will not be
cached. For faster iteration when the element value doesn't
matter (e.g. counting child elements).
- @return: The parsed element and the offset of the next element
+ :return: The parsed element and the offset of the next element
(i.e. the end of the parsed element).
"""
offset = stream.tell()
@@ -561,7 +580,7 @@ def parseElement(self, stream, nocache=False):
return el, payloadOffset + el.size
@classmethod
- def _isValidChild(cls, elId):
+ def _isValidChild(cls, elId: int) -> bool:
""" Is the given element ID represent a valid sub-element, i.e.
explicitly specified as a child element or a 'global' in the
schema?
@@ -569,17 +588,11 @@ def _isValidChild(cls, elId):
if not cls.children:
return False
- try:
- return elId in cls._childIds
- except AttributeError:
- # The set of valid child IDs hasn't been created yet.
- cls._childIds = set(cls.children)
- if cls.schema is not None:
- cls._childIds.update(cls.schema.globals)
- return elId in cls._childIds
+ return elId in cls.children or elId in cls.schema.globals
+
@property
- def size(self):
+ def size(self) -> int:
""" The element's size. Master elements can be instantiated with this
as `None`; this denotes an 'infinite' EBML element, and its size
will be determined by iterating over its contents until an invalid
@@ -589,7 +602,7 @@ def size(self):
return self._size
except AttributeError:
# An "infinite" element (size specified in file is all 0xFF)
- pos = end = self.payloadOffset
+ pos = self.payloadOffset
numChildren = 0
while True:
self.stream.seek(pos)
@@ -613,13 +626,13 @@ def size(self):
return self._size
@size.setter
- def size(self, esize):
+ def size(self, esize: Optional[int]):
if esize is not None:
# Only create the `_size` attribute for a real value. Don't
# define it if it's `None`, so `size` will get calculated.
self._size = esize
- def __iter__(self, nocache=False):
+ def __iter__(self, nocache: bool = False):
""" x.__iter__() <==> iter(x)
"""
# TODO: Better support for 'infinite' elements (getting the size of
@@ -637,7 +650,7 @@ def __iter__(self, nocache=False):
break
raise
- def __len__(self):
+ def __len__(self) -> int:
""" x.__len__() <==> len(x)
"""
try:
@@ -653,7 +666,7 @@ def __len__(self):
return self._length
@property
- def value(self):
+ def value(self) -> List[Element]:
""" Parse and cache the element's value.
"""
if self._value is not None:
@@ -661,7 +674,7 @@ def value(self):
self._value = list(self)
return self._value
- def __getitem__(self, *args):
+ def __getitem__(self, *args) -> Element:
# TODO: Parse only the requested item(s), like `Document`
return self.value.__getitem__(*args)
@@ -669,7 +682,7 @@ def __getitem__(self, *args):
# Caching (experimental!)
# ==========================================================================
- def gc(self, recurse=False):
+ def gc(self, recurse: bool = False) -> int:
""" Clear any cached values. To save memory and/or force values to be
re-read from the file.
"""
@@ -685,7 +698,9 @@ def gc(self, recurse=False):
# ==========================================================================
@classmethod
- def encodePayload(cls, data, length=None):
+ def encodePayload(cls,
+ data: Union[Dict[str, Any], List[Tuple[str, Any]], None],
+ length: Optional[int] = None):
""" Type-specific payload encoder for 'master' elements.
"""
result = bytearray()
@@ -705,17 +720,26 @@ def encodePayload(cls, data, length=None):
return result
@classmethod
- def encode(cls, data, length=None, lengthSize=None, infinite=False):
+ def encode(cls,
+ data: Union[Dict[str, Any], List[Tuple[str, Any]]],
+ length: Optional[int] = None,
+ lengthSize: Optional[int] = None,
+ infinite: bool = False) -> bytes:
""" Encode an EBML master element.
- @param data: The data to encode, provided as a dictionary keyed by
+ :param data: The data to encode, provided as a dictionary keyed by
element name, a list of two-item name/value tuples, or a list
of either. Note: individual items in a list of name/value
pairs *must* be tuples!
- @keyword infinite: If `True`, the element will be written with an
+ :param length: An explicit length for the encoded data,
+ overriding the variable length encoding. For producing
+ byte-aligned structures.
+ :param lengthSize: An explicit length for the encoded element
+ size, overriding the variable length encoding.
+ :param infinite: If `True`, the element will be written with an
undefined size. When parsed, its end will be determined by the
occurrence of an invalid child element (or end-of-file).
- @return: A bytearray containing the encoded EBML binary.
+ :return: A bytearray containing the encoded EBML binary.
"""
# TODO: Use 'length' to automatically generate `Void` element?
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
@@ -734,18 +758,18 @@ def encode(cls, data, length=None, lengthSize=None, infinite=False):
lengthSize=lengthSize,
infinite=infinite)
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
""" Dump this element's value as nested dictionaries, keyed by
element name. The values of 'multiple' elements return as lists.
Note: The order of 'multiple' elements relative to other elements
will be lost; a file containing elements ``A1 B1 A2 B2 A3 B3`` will
result in``[A1 A2 A3][B1 B2 B3]``.
- @todo: Decide if this should be in the `util` submodule. It is
+ :todo: Decide if this should be in the `util` submodule. It is
very specific, and it isn't totally necessary for the core
library.
"""
- result = Dict()
+ result = {}
for el in self:
if el.multiple:
result.setdefault(el.name, []).append(el.dump())
@@ -764,24 +788,28 @@ class Document(MasterElement):
Loading a `Schema` generates a subclass.
"""
- def __init__(self, stream, name=None, size=None, headers=True):
+ def __init__(self,
+ stream: BinaryIO,
+ name: Optional[str] = None,
+ size: Optional[int] = None,
+ headers: bool = True):
""" Constructor. Instantiate a `Document` from a file-like stream.
In most cases, `Schema.load()` should be used instead of
explicitly instantiating a `Document`.
- @param stream: A stream object (e.g. a file) from which to read
+ :param stream: A stream object (e.g. a file) from which to read
the EBML content.
- @keyword name: The name of the document. Defaults to the filename
+ :param name: The name of the document. Defaults to the filename
(if applicable).
- @keyword size: The size of the document, in bytes. Use if the
- stream is neither a file or a `BytesIO` object.
- @keyword headers: If `False`, the file's ``EBML`` header element
+ :param size: The size of the document, in bytes. Use if the
+ stream is neither a file nor a `BytesIO` object.
+ :param headers: If `False`, the file's ``EBML`` header element
(if present) will not appear as a root element in the document.
The contents of the ``EBML`` element will always be read,
regardless, and stored in the Document's `info` attribute.
"""
self._ownsStream = False
- if isinstance(stream, (str, bytes, bytearray)):
+ if isinstance(stream, (str, Path)):
stream = open(stream, 'rb')
self._ownsStream = True
@@ -825,12 +853,12 @@ def __init__(self, stream, name=None, size=None, headers=True):
self.info = el.dump()
if not headers:
self.payloadOffset = pos
- except:
+ except Exception:
# Failed to read the first element. Don't raise here; do that when
# the Document is actually used.
pass
- def __repr__(self):
+ def __repr__(self) -> str:
""" "x.__repr__() <==> repr(x) """
if self.name == self.__class__.__name__:
return object.__repr__(self)
@@ -855,7 +883,7 @@ def close(self):
if self._ownsStream:
self.stream.close()
- def __len__(self):
+ def __len__(self) -> int:
""" x.__len__() <==> len(x)
Not recommended for huge documents.
"""
@@ -868,7 +896,7 @@ def __len__(self):
self._length = n
return self._length
- def __iter__(self, nocache=False):
+ def __iter__(self, nocache: bool = False):
""" Iterate root elements.
"""
# TODO: Cache root elements, prevent unnecessary duplicates. Maybe a
@@ -894,7 +922,7 @@ def value(self):
# 'value' not really applicable to a document; return an iterator.
return iter(self)
- def __getitem__(self, idx):
+ def __getitem__(self, idx: int) -> Element:
""" Get one of the document's root elements by index.
"""
# TODO: Cache parsed root elements, handle indexing dynamically.
@@ -908,19 +936,19 @@ def __getitem__(self, idx):
if n is None:
# If object being enumerated is empty, `n` is never set.
raise IndexError("Document contained no readable data")
- raise IndexError("list index out of range (0-%d)" % n)
+ raise IndexError("list index out of range (0-{})".format(n))
elif isinstance(idx, slice):
raise IndexError("Document root slicing not (yet) supported")
else:
raise TypeError("list indices must be integers, not %s" % type(idx))
@property
- def version(self):
+ def version(self) -> int:
""" The document's type version (i.e. the EBML ``DocTypeVersion``). """
return self.info.get('DocTypeVersion')
@property
- def type(self):
+ def type(self) -> str:
""" The document's type name (i.e. the EBML ``DocType``). """
return self.info.get('DocType')
@@ -928,7 +956,7 @@ def type(self):
# Caching (experimental!)
# ==========================================================================
- def gc(self, recurse=False):
+ def gc(self, recurse: bool = False) -> int:
# TODO: Implement this if/when caching of root elements is implemented.
return 0
@@ -937,18 +965,18 @@ def gc(self, recurse=False):
# ==========================================================================
@classmethod
- def _createHeaders(cls):
+ def _createHeaders(cls) -> Dict[str, Any]:
""" Create the default EBML 'header' elements for a Document, using
the default values in the schema.
- @return: A dictionary containing a single key (``EBML``) with a
+ :return: A dictionary containing a single key (``EBML``) with a
dictionary as its value. The child dictionary contains
element names and values.
"""
if 'EBML' not in cls.schema:
return {}
- headers = Dict()
+ headers = {}
for elName, elType in (('EBMLVersion', int),
('EBMLReadVersion', int),
('DocType', str),
@@ -959,17 +987,23 @@ def _createHeaders(cls):
if v is not None:
headers[elName] = v
- return Dict(EBML=headers)
+ return dict(EBML=headers)
@classmethod
- def encode(cls, stream, data, headers=False, **kwargs):
+ def encode(cls,
+ stream: BinaryIO,
+ data: Union[Dict[str, Any], List[Tuple[str, Any]]],
+ headers: bool = False, **kwargs):
""" Encode an EBML document.
- @param value: The data to encode, provided as a dictionary keyed
+ :param stream: The file-like stream to which the encoded EBML is written.
+ :param data: The data to encode, provided as a dictionary keyed
by element name, or a list of two-item name/value tuples.
Note: individual items in a list of name/value pairs *must*
be tuples!
- @return: A bytearray containing the encoded EBML binary.
+ :param headers: If `True`, include the standard ``EBML`` header
+ element.
+ :return: A bytearray containing the encoded EBML binary.
"""
if headers is True:
stream.write(cls.encodePayload(cls._createHeaders()))
@@ -996,23 +1030,26 @@ class Schema(object):
the document and element types, this is not a base class; all schemata
are actual instances of this class.
- @ivar document: The schema's Document subclass.
- @ivar elements: A dictionary mapping element IDs to the schema's
+ Schema instances are typically created by loading an XML schema file
+ using :func:`loadSchema` or a byte string using :func:`parseSchema`.
+
+ :ivar document: The schema's Document subclass.
+ :ivar elements: A dictionary mapping element IDs to the schema's
corresponding `Element` subclasses.
- @ivar elementsByName: A dictionary mapping element names to the
+ :ivar elementsByName: A dictionary mapping element names to the
schema's corresponding `Element` subclasses.
- @ivar elementInfo: A dictionary mapping IDs to the raw schema
+ :ivar elementInfo: A dictionary mapping IDs to the raw schema
attribute data. It may have additional items not present in the
created element class' attributes.
- @ivar UNKNOWN: A class/function that handles unknown element IDs. By
+ :ivar UNKNOWN: A class/function that handles unknown element IDs. By
default, this is the `UnknownElement` class. Special-case handling
can be done by substituting a different class, or an
element-producing factory function.
- @ivar source: The source from which the Schema was loaded; either a
+ :ivar source: The source from which the Schema was loaded; either a
filename or a file-like stream.
- @ivar filename: The absolute path of the source file, if the source
+ :ivar filename: The absolute path of the source file, if the source
was a file or a filename.
"""
@@ -1046,18 +1083,20 @@ class Schema(object):
# factory function.
UNKNOWN = UnknownElement
- def __init__(self, source, name=None):
+ def __init__(self,
+ source: Union[str, Path, TextIO],
+ name: Optional[str] = None):
""" Constructor. Creates a new Schema from a schema description XML.
- @param source: The Schema's source, either a string with the full
+ :param source: The Schema's source, either a string with the full
path and name of the schema XML file, or a file-like stream.
- @keyword name: The schema's name. Defaults to the document type
+ :param name: The schema's name. Defaults to the document type
element's default value (if defined) or the base file name.
"""
self.source = source
self.filename = None
- if isinstance(source, (str, bytes, bytearray)):
+ if isinstance(source, (str, Path)):
self.filename = os.path.realpath(source)
elif hasattr(source, "name"):
self.filename = os.path.realpath(source.name)
@@ -1067,7 +1106,7 @@ def __init__(self, source, name=None):
self.elementInfo = {} # Raw element schema attributes, keyed by ID
self.globals = {} # Elements valid for any parent, by ID
- self.children = {} # Valid root elements, by ID
+ self.children = set() # Valid root elements, by ID
# Parse, using the correct method for the schema format.
schema = ET.parse(source)
@@ -1158,8 +1197,13 @@ def _parseSchema(self, el, parent=None):
for chEl in el:
self._parseSchema(chEl, cls)
- def addElement(self, eid, ename, baseClass, attribs={}, parent=None,
- docs=None):
+ def addElement(self,
+ eid: int,
+ ename: str,
+ baseClass,
+ attribs: Optional[Dict[str, Any]] = None,
+ parent=None,
+ docs: Optional[str] = None):
""" Create a new `Element` subclass and add it to the schema.
Duplicate elements are permitted (e.g. if one kind of element can
@@ -1168,23 +1212,16 @@ def addElement(self, eid, ename, baseClass, attribs={}, parent=None,
schema must contain the required ID, name, and type; successive
appearances only need the ID and/or name.
- @param eid: The element's EBML ID.
- @param ename: The element's name.
- @keyword multiple: If `True`, an EBML document can contain more
- than one of this element. Not currently enforced.
- @keyword mandatory: If `True`, a valid EBML document requires one
- (or more) of this element. Not currently enforced.
- @keyword length: A fixed length to use when writing the element.
- `None` will use the minimum length required.
- @keyword precache: If `True`, the element's value will be read
- when the element is parsed, rather than when the value is
- explicitly accessed. Can save time for small elements.
- @keyword attribs: A dictionary of raw element attributes, as read
+ :param eid: The element's EBML ID.
+ :param ename: The element's name.
+ :param baseClass: The base `Element` class.
+ :param attribs: A dictionary of raw element attributes, as read
from the schema file.
- @keyword parent: The new element's parent element class.
- @keyword docs: The new element's docstring (e.g. the defining XML
+ :param parent: The new element's parent element class.
+ :param docs: The new element's docstring (e.g. the defining XML
element's text content).
"""
+ attribs = {} if attribs is None else attribs
def _getBool(d, k, default):
""" Helper function to get a dictionary value cast to bool. """
@@ -1265,7 +1302,7 @@ def _getInt(d, k, default):
{'id': eid, 'name': ename, 'schema': self,
'mandatory': mandatory, 'multiple': multiple,
'precache': precache, 'length': length,
- 'children': dict(), '__doc__': docs,
+ 'children': set(), '__doc__': docs,
'__slots__': baseClass.__slots__})
self.elements[eid] = eclass
@@ -1277,8 +1314,8 @@ def _getInt(d, k, default):
parent = parent or self
if parent.children is None:
- parent.children = {}
- parent.children[eid] = eclass
+ parent.children = set()
+ parent.children.add(eid)
return eclass
@@ -1293,7 +1330,7 @@ def __repr__(self):
except AttributeError:
return object.__repr__(self)
- def __eq__(self, other):
+ def __eq__(self, other) -> bool:
""" Equality check. Schemata are considered equal if the attributes of
their elements match.
"""
@@ -1302,53 +1339,57 @@ def __eq__(self, other):
except AttributeError:
return False
- def __contains__(self, key):
+ def __contains__(self, key: Union[str, int]):
""" Does the Schema contain a given element name or ID? """
return (key in self.elementsByName) or (key in self.elements)
- def __getitem__(self, key):
+ def __getitem__(self, key: Union[str, int]):
""" Get an Element class from the schema, by name or by ID. """
try:
return self.elements[key]
except KeyError:
return self.elementsByName[key]
- def get(self, key, default=None):
+ def get(self, key: Union[str, int, None], default=None):
if key in self:
return self[key]
return default
- def load(self, fp, name=None, headers=False, **kwargs):
+ def load(self,
+ fp: BinaryIO,
+ name: Optional[str] = None,
+ headers: bool = False,
+ **kwargs) -> Document:
""" Load an EBML file using this Schema.
- @param fp: A file-like object containing the EBML to load, or the
+ :param fp: A file-like object containing the EBML to load, or the
name of an EBML file.
- @keyword name: The name of the document. Defaults to filename.
- @keyword headers: If `False`, the file's ``EBML`` header element
+ :param name: The name of the document. Defaults to filename.
+ :param headers: If `False`, the file's ``EBML`` header element
(if present) will not appear as a root element in the
document. The contents of the ``EBML`` element will always be
read.
"""
return self.document(fp, name=name, headers=headers, **kwargs)
- def loads(self, data, name=None):
+ def loads(self, data: bytes, name: Optional[str] = None) -> Document:
""" Load EBML from a string using this Schema.
- @param data: A string or bytearray containing raw EBML data.
- @keyword name: The name of the document. Defaults to the Schema's
+ :param data: A bytes or bytearray object containing raw EBML data.
+ :param name: The name of the document. Defaults to the Schema's
document class name.
"""
return self.load(BytesIO(data), name=name)
- def __call__(self, fp, name=None):
+ def __call__(self, fp: BinaryIO, name: Optional[str] = None):
""" Load an EBML file using this Schema. Same as `Schema.load()`.
- @todo: Decide if this is worth keeping. It exists for historical
+ :todo: Decide if this is worth keeping. It exists for historical
reasons that may have been refactored out.
- @param fp: A file-like object containing the EBML to load, or the
+ :param fp: A file-like object containing the EBML to load, or the
name of an EBML file.
- @keyword name: The name of the document. Defaults to filename.
+ :param name: The name of the document. Defaults to filename.
"""
return self.load(fp, name=name)
@@ -1364,12 +1405,12 @@ def _getInfo(self, eid, dtype):
return None
@property
- def version(self):
+ def version(self) -> int:
""" Schema version, extracted from EBML ``DocTypeVersion`` default. """
return self._getInfo(0x4287, int) # ID of EBML 'DocTypeVersion'
@property
- def type(self):
+ def type(self) -> str:
""" Schema type name, extracted from EBML ``DocType`` default. """
return self._getInfo(0x4282, str) # ID of EBML 'DocType'
@@ -1377,32 +1418,42 @@ def type(self):
# Encoding
# ==========================================================================
- def encode(self, stream, data, headers=False):
+ def encode(self,
+ stream: BinaryIO,
+ data: Union[Dict[str, Any], List[Tuple[str, Any]]],
+ headers: bool = False):
""" Write an EBML document using this Schema to a file or file-like
stream.
- @param stream: The file (or ``.write()``-supporting file-like
+ :param stream: The file (or ``.write()``-supporting file-like
object) to which to write the encoded EBML.
- @param data: The data to encode, provided as a dictionary keyed by
+ :param data: The data to encode, provided as a dictionary keyed by
element name, or a list of two-item name/value tuples. Note:
individual items in a list of name/value pairs *must* be tuples!
+ :param headers: If `True`, include the standard ``EBML`` header
+ element.
"""
self.document.encode(stream, data, headers=headers)
return stream
- def encodes(self, data, headers=False):
+ def encodes(self,
+ data: Union[Dict[str, Any], List[Tuple[str, Any]]],
+ headers: bool = False) -> bytes:
""" Create an EBML document using this Schema, returned as a string.
- @param data: The data to encode, provided as a dictionary keyed by
- element name, or a list of two-item name/value tuples. Note:
- individual items in a list of name/value pairs *must* be tuples!
- @return: A string containing the encoded EBML binary.
+ :param data: The data to encode, provided as a dictionary keyed
+ by element name, or a list of two-item name/value tuples.
+ Note: individual items in a list of name/value pairs *must*
+ be tuples!
+ :param headers: If `True`, include the standard ``EBML`` header
+ element.
+ :return: A bytes object containing the encoded EBML binary.
"""
stream = BytesIO()
self.encode(stream, data, headers=headers)
return stream.getvalue()
- def verify(self, data):
+ def verify(self, data: bytes) -> bool:
""" Perform basic tests on EBML binary data, ensuring it can be parsed
using this `Schema`. Failure will raise an expression.
"""
@@ -1426,17 +1477,18 @@ def _crawl(el):
#
# ==============================================================================
-def _expandSchemaPath(path, name=''):
+def _expandSchemaPath(path: Union[str, Path, types.ModuleType],
+ name: Union[str, Path] = '') -> Path:
""" Helper function to process a schema path or name, converting module
references to Paths.
- @param path: The schema path. May be a directory name, a module
+ :param path: The schema path. It may be a directory name, a module
name in braces (e.g., `{idelib.schemata}`), or a module
instance. Directory and module names may contain schema
filenames.
- @param name: An optional schema base filename. Will get appended
+ :param name: An optional schema base filename. Will get appended
to the resulting `Path`/`Traversable`.
- @return: A `Path`/`Traversable` object.
+ :return: A `Path`/`Traversable` object.
"""
strpath = str(path)
subdir = ''
@@ -1447,34 +1499,25 @@ def _expandSchemaPath(path, name=''):
if '}' not in strpath:
raise IOError(errno.ENOENT, 'Malformed module path', strpath)
- m = re.match(r'(\{.+\})[/\\](.+)', strpath)
+ m = re.match(r'(\{.+})[/\\](.+)', strpath)
if m:
path, subdir = m.groups()
strpath = path
- if importlib_resources:
- if isinstance(path, types.ModuleType):
- return importlib_resources.files(path) / subdir / name
- elif '{' in strpath:
- return importlib_resources.files(strpath.strip('{} ')) / subdir / name
- else:
- # Pre-3.9: Use naive means of finding the module path. Won't work in
- # some cases (module is a zip, etc.); it's just a fallback. To be
- # deprecated.
- if isinstance(path, types.ModuleType):
- path = os.path.dirname(path.__file__)
- elif '{' in strpath:
- path = os.path.dirname(importlib.import_module(strpath.strip('{}')).__file__)
+ if isinstance(path, types.ModuleType):
+ return importlib_resources.files(path) / subdir / name
+ elif '{' in strpath:
+ return importlib_resources.files(strpath.strip('{} ')) / subdir / name
return Path(path) / subdir / name
-def listSchemata(*paths, absolute=True):
+def listSchemata(*paths, absolute: bool = True) -> Dict[str, List[Schema]]:
""" Gather all EBML schemata. `ebmlite.SCHEMA_PATH` is used by default;
alternatively, one or more paths or modules can be supplied as
arguments.
- @returns: A dictionary of schema files. Keys are the base name of the
+ :returns: A dictionary of schema files. Keys are the base name of the
schema XML, values are lists of full paths to the XML. The first
filename in the list is what will load if the base name is used
with `loadSchema()`.
@@ -1506,24 +1549,27 @@ def listSchemata(*paths, absolute=True):
return schemata
-def loadSchema(filename, reload=False, paths=None, **kwargs):
+def loadSchema(filename: Union[str, Path],
+ reload: bool = False,
+ paths: Optional[str] = None,
+ **kwargs) -> Schema:
""" Import a Schema XML file. Loading the same file more than once will
return the initial instantiation, unless `reload` is `True`.
- @param filename: The name of the Schema XML file. If the file cannot
+ :param filename: The name of the Schema XML file. If the file cannot
be found and file's path is not absolute, the paths listed in
`SCHEMA_PATH` will be searched (similar to `sys.path` when
importing modules).
- @param reload: If `True`, the resulting Schema is guaranteed to be
+ :param reload: If `True`, the resulting Schema is guaranteed to be
new. Note: existing references to previous instances of the
Schema and/or its elements will not update.
- @param paths: A list of paths to search for schemata, an alternative
+ :param paths: A list of paths to search for schemata, an alternative
to `ebmlite.SCHEMA_PATH`
Additional keyword arguments are sent verbatim to the `Schema`
constructor.
- @raises: IOError, ModuleNotFoundError
+ :raises: IOError, ModuleNotFoundError
"""
global SCHEMATA
@@ -1564,16 +1610,19 @@ def loadSchema(filename, reload=False, paths=None, **kwargs):
return schema
-def parseSchema(src, name=None, reload=False, **kwargs):
+def parseSchema(src: str,
+ name: Optional[str] = None,
+ reload: bool = False,
+ **kwargs) -> Schema:
""" Read Schema XML data from a string or stream. Loading one with the
same `name` will return the initial instantiation, unless `reload`
is `True`. Calls to `loadSchema()` using a name previously used with
`parseSchema()` will also return the previously instantiated Schema.
- @param src: The XML string, or a stream containing XML.
- @param name: The name of the schema. If none is supplied,
+ :param src: The XML string, or a stream containing XML.
+ :param name: The name of the schema. If none is supplied,
the name defined within the schema will be used.
- @param reload: If `True`, the resulting Schema is guaranteed to be
+ :param reload: If `True`, the resulting Schema is guaranteed to be
new. Note: existing references to previous instances of the
Schema and/or its elements will not update.
diff --git a/ebmlite/decoding.py b/ebmlite/decoding.py
index 2322997..bb2dbb0 100644
--- a/ebmlite/decoding.py
+++ b/ebmlite/decoding.py
@@ -15,6 +15,7 @@
from datetime import datetime, timedelta
import struct
+from typing import BinaryIO, Optional, Tuple
import warnings
# ==============================================================================
@@ -42,10 +43,10 @@
# --- Reading and Decoding
# ==============================================================================
-def decodeIntLength(byte):
+def decodeIntLength(byte: int) -> Tuple[int, int]:
""" Extract the encoded size from an initial byte.
- @return: The size, and the byte with the size removed (it is the first
+ :return: The size, and the byte with the size removed (it is the first
byte of the value).
"""
# An inelegant implementation, but it's fast.
@@ -67,11 +68,11 @@ def decodeIntLength(byte):
return 8, 0
-def decodeIDLength(byte):
+def decodeIDLength(byte: int) -> Tuple[int, int]:
""" Extract the encoded ID size from an initial byte.
- @return: The size and the original byte (it is part of the ID).
- @raise IOError: raise if the length of an ID is invalid.
+ :return: The size and the original byte (it is part of the ID).
+ :raise IOError: raise if the length of an ID is invalid.
"""
if byte >= 128:
return 1, byte
@@ -86,12 +87,12 @@ def decodeIDLength(byte):
raise IOError('Invalid length for ID: %d' % length)
-def readElementID(stream):
+def readElementID(stream: BinaryIO) -> Tuple[int, int]:
""" Read an element ID from a file (or file-like stream).
- @param stream: The source file-like object.
- @return: The decoded element ID and its length in bytes.
- @raise IOError: raised if the length of the ID of an element is greater than 4 bytes.
+ :param stream: The source file-like object.
+ :return: The decoded element ID and its length in bytes.
+ :raise IOError: raised if the length of the ID of an element is greater than 4 bytes.
"""
ch = stream.read(1)
length, eid = decodeIDLength(ord(ch))
@@ -104,11 +105,11 @@ def readElementID(stream):
return eid, length
-def readElementSize(stream):
+def readElementSize(stream: BinaryIO) -> Tuple[Optional[int], int]:
""" Read an element size from a file (or file-like stream).
- @param stream: The source file-like object.
- @return: The decoded size (or `None`) and the length of the
+ :param stream: The source file-like object.
+ :return: The decoded size (or `None`) and the length of the
descriptor in bytes.
"""
ch = stream.read(1)
@@ -126,12 +127,12 @@ def readElementSize(stream):
return size, length
-def readUInt(stream, size):
+def readUInt(stream: BinaryIO, size: int) -> int:
""" Read an unsigned integer from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value.
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value.
"""
if size == 0:
@@ -141,12 +142,12 @@ def readUInt(stream, size):
return _struct_uint64_unpack_from(data.rjust(8, b'\x00'))[0]
-def readInt(stream, size):
+def readInt(stream: BinaryIO, size: int) -> int:
""" Read a signed integer from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value.
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value.
"""
if size == 0:
@@ -160,13 +161,13 @@ def readInt(stream, size):
return _struct_int64_unpack_from(data.rjust(8, pad))[0]
-def readFloat(stream, size):
- """ Read an floating point value from a file (or file-like stream).
+def readFloat(stream: BinaryIO, size: int) -> float:
+ """ Read a floating point value from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value.
- @raise IOError: raised if the length of this floating point number is not
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value.
+ :raise IOError: raised if the length of this floating point number is not
valid (0, 4, 8 bytes)
"""
if size == 4:
@@ -180,12 +181,12 @@ def readFloat(stream, size):
"only lengths of 0, 4, or 8 bytes supported." % size)
-def readString(stream, size):
+def readString(stream: BinaryIO, size: int) -> str:
""" Read an ASCII string from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value.
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value.
"""
if size == 0:
return u''
@@ -200,12 +201,12 @@ def readString(stream, size):
return str(value, 'ascii', 'replace')
-def readUnicode(stream, size):
- """ Read an UTF-8 encoded string from a file (or file-like stream).
+def readUnicode(stream: BinaryIO, size: int) -> str:
+ """ Read a UTF-8 encoded string from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value.
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value.
"""
if size == 0:
@@ -216,14 +217,14 @@ def readUnicode(stream, size):
return str(data, 'utf_8')
-def readDate(stream, size=8):
+def readDate(stream: BinaryIO, size: int = 8) -> datetime:
""" Read an EBML encoded date (nanoseconds since UTC 2001-01-01T00:00:00)
from a file (or file-like stream).
- @param stream: The source file-like object.
- @param size: The number of bytes to read from the stream.
- @return: The decoded value (as `datetime.datetime`).
- @raise IOError: raised if the length of the date is not 8 bytes.
+ :param stream: The source file-like object.
+ :param size: The number of bytes to read from the stream.
+ :return: The decoded value (as `datetime.datetime`).
+ :raise IOError: raised if the length of the date is not 8 bytes.
"""
if size != 8:
raise IOError("Cannot read date value of length %d, only 8." % size)
diff --git a/ebmlite/encoding.py b/ebmlite/encoding.py
index ebe367a..f7b9e53 100644
--- a/ebmlite/encoding.py
+++ b/ebmlite/encoding.py
@@ -14,6 +14,7 @@
import datetime
import struct
import sys
+from typing import AnyStr, Optional
import warnings
from .decoding import _struct_uint64, _struct_int64
@@ -45,11 +46,11 @@
# ==============================================================================
-def getLength(val):
+def getLength(val: int) -> int:
""" Calculate the encoded length of a value.
- @param val: A value to be encoded, generally either an ID or a size for
+ :param val: A value to be encoded, generally either an ID or a size for
an EBML element
- @return The minimum length, in bytes, that can be used to represent val
+ :return: The minimum length, in bytes, that can be used to represent val
"""
# Brute force it. Ugly but faster than calculating it.
if val <= 126:
@@ -70,15 +71,15 @@ def getLength(val):
return 8
-def encodeSize(val, length=None):
+def encodeSize(val: Optional[int], length: Optional[int] = None) -> bytes:
""" Encode an element size.
- @param val: The size to encode. If `None`, the EBML 'unknown' size
+ :param val: The size to encode. If `None`, the EBML 'unknown' size
will be returned (1 or `length` bytes, all bits 1).
- @keyword length: An explicit length for the encoded size. If `None`,
+ :param length: An explicit length for the encoded size. If `None`,
the size will be encoded at the minimum length required.
- @return: an encoded size for an EBML element.
- @raise ValueError: raised if the length is invalid, or the length cannot
+ :return: an encoded size for an EBML element.
+ :raise ValueError: raised if the length is invalid, or the length cannot
be encoded.
"""
if val is None:
@@ -98,16 +99,16 @@ def encodeSize(val, length=None):
# --- Encoding
# ==============================================================================
-def encodeId(eid, length=None):
+def encodeId(eid: int, length: Optional[int] = None) -> bytes:
""" Encode an element ID.
- @param eid: The EBML ID to encode.
- @keyword length: An explicit length for the encoded data. A `ValueError`
+ :param eid: The EBML ID to encode.
+ :param length: An explicit length for the encoded data. A `ValueError`
will be raised if the length is too short to encode the value.
- @return: The binary representation of ID, left-padded with ``0x00`` if
+ :return: The binary representation of ID, left-padded with ``0x00`` if
`length` is not `None`.
- @return: The encoded version of the ID.
- @raise ValueError: raised if length is less than one or more than 4.
+ :return: The encoded version of the ID.
+ :raise ValueError: raised if length is less than one or more than 4.
"""
if length is not None:
if length < 1 or length > 4:
@@ -119,15 +120,15 @@ def encodeId(eid, length=None):
raise TypeError('Cannot encode {} {!r} as ID'.format(type(eid).__name__, eid))
-def encodeUInt(val, length=None):
+def encodeUInt(val: int, length: Optional[int] = None) -> bytes:
""" Encode an unsigned integer.
- @param val: The unsigned integer value to encode.
- @keyword length: An explicit length for the encoded data. A `ValueError`
+ :param val: The unsigned integer value to encode.
+ :param length: An explicit length for the encoded data. A `ValueError`
will be raised if the length is too short to encode the value.
- @return: The binary representation of val as an unsigned integer,
+ :return: The binary representation of val as an unsigned integer,
left-padded with ``0x00`` if `length` is not `None`.
- @raise ValueError: raised if val is longer than length.
+ :raise ValueError: raised if val is longer than length.
"""
if isinstance(val, float):
fval, val = val, int(val)
@@ -155,16 +156,16 @@ def encodeUInt(val, length=None):
return packed.rjust(length, pad)
-def encodeInt(val, length=None):
+def encodeInt(val: int, length: Optional[int] = None) -> bytes:
""" Encode a signed integer.
- @param val: The signed integer value to encode.
- @keyword length: An explicit length for the encoded data. A `ValueError`
+ :param val: The signed integer value to encode.
+ :param length: An explicit length for the encoded data. A `ValueError`
will be raised if the length is too short to encode the value.
- @return: The binary representation of val as a signed integer,
+ :return: The binary representation of val as a signed integer,
left-padded with either ```0x00`` (for positive values) or ``0xFF``
(for negative) if `length` is not `None`.
- @raise ValueError: raised if val is longer than length.
+ :raise ValueError: raised if val is longer than length.
"""
if isinstance(val, float):
fval, val = val, int(val)
@@ -194,15 +195,15 @@ def encodeInt(val, length=None):
raise TypeError('Cannot encode {} {!r} as integer'.format(type(val).__name__, val))
-def encodeFloat(val, length=None):
+def encodeFloat(val: float, length: Optional[int] = None) -> bytes:
""" Encode a floating point value.
- @param val: The floating point value to encode.
- @keyword length: An explicit length for the encoded data. Must be
+ :param val: The floating point value to encode.
+ :param length: An explicit length for the encoded data. Must be
`None`, 0, 4, or 8; otherwise, a `ValueError` will be raised.
- @return: The binary representation of val as a float, left-padded with
+ :return: The binary representation of val as a float, left-padded with
``0x00`` if `length` is not `None`.
- @raise ValueError: raised if val not length 0, 4, or 8
+ :raise ValueError: raised if val not length 0, 4, or 8
"""
if length is None:
if val is None or val == 0.0:
@@ -224,16 +225,16 @@ def encodeFloat(val, length=None):
raise TypeError('Cannot encode {} {!r} as float'.format(type(val).__name__, val))
-def encodeBinary(val, length=None):
+def encodeBinary(val: AnyStr, length: Optional[int] = None) -> bytes:
""" Encode binary data.
- @param val: A string, bytes, or bytearray containing the data to encode.
- @keyword length: An explicit length for the encoded data. A
+ :param val: A string, bytes, or bytearray containing the data to encode.
+ :param length: An explicit length for the encoded data. A
`ValueError` will be raised if `length` is shorter than the
actual length of the binary data.
- @return: The binary representation of value as binary data, left-padded
+ :return: The binary representation of value as binary data, left-padded
with ``0x00`` if `length` is not `None`.
- @raise ValueError: raised if val is longer than length.
+ :raise ValueError: raised if val is longer than length.
"""
if val is None:
val = b''
@@ -251,13 +252,13 @@ def encodeBinary(val, length=None):
(len(val), length))
-def encodeString(val, length=None):
+def encodeString(val: AnyStr, length: Optional[int] = None) -> bytes:
""" Encode an ASCII string.
- @param val: The string (or bytearray) to encode.
- @keyword length: An explicit length for the encoded data. The result
+ :param val: The string (or bytearray) to encode.
+ :param length: An explicit length for the encoded data. The result
will be truncated if the original string is longer.
- @return: The binary representation of val as a string, truncated or
+ :return: The binary representation of val as a string, truncated or
left-padded with ``0x00`` if `length` is not `None`.
"""
if isinstance(val, str):
@@ -271,13 +272,13 @@ def encodeString(val, length=None):
return encodeBinary(val.translate(STRING_CHARACTERS), length)
-def encodeUnicode(val, length=None):
+def encodeUnicode(val: str, length: Optional[int] = None) -> bytes:
""" Encode a Unicode string.
- @param val: The Unicode string to encode.
- @keyword length: An explicit length for the encoded data. The result
+ :param val: The Unicode string to encode.
+ :param length: An explicit length for the encoded data. The result
will be truncated if the original string is longer.
- @return: The binary representation of val as a string, truncated or
+ :return: The binary representation of val as a string, truncated or
left-padded with ``0x00`` if `length` is not `None`.
"""
if not isinstance(val, (bytearray, bytes, str)):
@@ -291,15 +292,15 @@ def encodeUnicode(val, length=None):
return encodeBinary(val, length)
-def encodeDate(val, length=None):
+def encodeDate(val: datetime.datetime, length: Optional[int] = None) -> bytes:
""" Encode a `datetime` object as an EBML date (i.e. nanoseconds since
2001-01-01T00:00:00).
- @param val: The `datetime.datetime` object value to encode.
- @keyword length: An explicit length for the encoded data. Must be
+ :param val: The `datetime.datetime` object value to encode.
+ :param length: An explicit length for the encoded data. Must be
`None` or 8; otherwise, a `ValueError` will be raised.
- @return: The binary representation of val as an 8-byte dateTime.
- @raise ValueError: raised if the length of the input is not 8 bytes.
+ :return: The binary representation of val as an 8-byte dateTime.
+ :raise ValueError: raised if the length of the input is not 8 bytes.
"""
if length is None:
length = 8
diff --git a/ebmlite/threaded_file.py b/ebmlite/threaded_file.py
index 0fae09d..20609b0 100644
--- a/ebmlite/threaded_file.py
+++ b/ebmlite/threaded_file.py
@@ -1,4 +1,4 @@
-'''
+"""
A special-case, drop-in 'replacement' for a standard read-only file stream
that supports simultaneous access by multiple threads without (explicit)
blocking. Each thread actually gets its own stream, so it can perform its
@@ -6,7 +6,7 @@
functionality is transparent.
@author: dstokes
-'''
+"""
__author__ = "David Randall Stokes, Connor Flanigan"
__copyright__ = "Copyright 2021, Mide Technology Corporation"
__credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt"
@@ -16,6 +16,8 @@
import io
import platform
from threading import currentThread, Event
+from typing import BinaryIO, TextIO, Union
+
class ThreadAwareFile(io.FileIO):
""" A 'replacement' for a standard read-only file stream that supports
@@ -28,7 +30,7 @@ class ThreadAwareFile(io.FileIO):
the standard attributes and properties. Most of these affect only
the current thread.
- @var timeout: A value (in seconds) for blocking operations to wait.
+ :var timeout: A value (in seconds) for blocking operations to wait.
Very few operations block; specifically, only those that do
(or depend upon) internal housekeeping. Timeout should only occur
in certain extreme conditions (e.g. filesystem-related file
@@ -44,7 +46,7 @@ def __init__(self, *args, **kwargs):
"""
# Ensure the file mode, if specified, is "read."
mode = args[1] if len(args) > 1 else 'r'
- if isinstance(mode, (str, bytes, bytearray)):
+ if isinstance(mode, str):
if 'a' in mode or 'w' in mode or '+' in mode:
raise IOError("%s is read-only" % self.__class__.__name__)
@@ -71,7 +73,7 @@ def __init__(self, *args, **kwargs):
self._mode = mode
- def __repr__(self):
+ def __repr__(self) -> str:
# Format the object's ID appropriately for the architecture (32b/64b)
if '32' in platform.architecture()[0]:
fmt = "<%s %s %r, mode %r at 0x%08X>"
@@ -86,7 +88,7 @@ def __repr__(self):
@classmethod
- def makeThreadAware(cls, fileStream):
+ def makeThreadAware(cls, fileStream: Union[TextIO, BinaryIO]) -> "ThreadAwareFile":
""" Create a new `ThreadAwareFile` from an already-open file. If the
object is a `ThreadAwareFile`, it is returned verbatim.
"""
@@ -100,7 +102,7 @@ def makeThreadAware(cls, fileStream):
return f
- def getThreadStream(self):
+ def getThreadStream(self) -> Union[TextIO, BinaryIO]:
""" Get (or create) the file stream for the current thread.
"""
self._ready.wait(self.timeout)
@@ -143,7 +145,7 @@ def cleanup(self):
@property
- def closed(self):
+ def closed(self) -> bool:
""" Is the file not open? Note: A thread that never accessed the file
will get `True`.
"""
@@ -153,56 +155,50 @@ def closed(self):
return True
- def close(self, *args, **kwargs):
+ def close(self):
""" Close the file for the current thread. The file will remain
open for other threads.
"""
- result = self.getThreadStream().close(*args, **kwargs)
+ result = self.getThreadStream().close()
self.cleanup()
return result
# Standard file methods, overridden
- def __format__(self, *args, **kwargs):
- return self.getThreadStream().__format__(*args, **kwargs)
-
- def __hash__(self, *args, **kwargs):
- return self.getThreadStream().__hash__(*args, **kwargs)
+ def __format__(self, *args):
+ return self.getThreadStream().__format__(*args)
- def __iter__(self, *args, **kwargs):
- return self.getThreadStream().__iter__(*args, **kwargs)
+ def __hash__(self):
+ return self.getThreadStream().__hash__()
- def __reduce__(self, *args, **kwargs):
- return self.getThreadStream().__reduce__(*args, **kwargs)
+ def __iter__(self):
+ return self.getThreadStream().__iter__()
- def __reduce_ex__(self, *args, **kwargs):
- return self.getThreadStream().__reduce_ex__(*args, **kwargs)
+ def __reduce__(self):
+ return self.getThreadStream().__reduce__()
- def __sizeof__(self, *args, **kwargs):
- return self.getThreadStream().__sizeof__(*args, **kwargs)
+ def __reduce_ex__(self, *args):
+ return self.getThreadStream().__reduce_ex__(*args)
- def __str__(self, *args, **kwargs):
- return self.getThreadStream().__str__(*args, **kwargs)
+ def __sizeof__(self):
+ return self.getThreadStream().__sizeof__()
- def fileno(self, *args, **kwargs):
- return self.getThreadStream().fileno(*args, **kwargs)
+ def __str__(self):
+ return self.getThreadStream().__str__()
- def flush(self, *args, **kwargs):
- return self.getThreadStream().flush(*args, **kwargs)
+ def fileno(self):
+ return self.getThreadStream().fileno()
- def isatty(self, *args, **kwargs):
- return self.getThreadStream().isatty(*args, **kwargs)
+ def flush(self):
+ return self.getThreadStream().flush()
- def next(self, *args, **kwargs):
- return self.getThreadStream().next(*args, **kwargs)
+ def isatty(self):
+ return self.getThreadStream().isatty()
def read(self, *args, **kwargs):
return self.getThreadStream().read(*args, **kwargs)
- def readinto(self, *args, **kwargs):
- return self.getThreadStream().readinto(*args, **kwargs)
-
def readline(self, *args, **kwargs):
return self.getThreadStream().readline(*args, **kwargs)
@@ -212,8 +208,8 @@ def readlines(self, *args, **kwargs):
def seek(self, *args, **kwargs):
return self.getThreadStream().seek(*args, **kwargs)
- def tell(self, *args, **kwargs):
- return self.getThreadStream().tell(*args, **kwargs)
+ def tell(self):
+ return self.getThreadStream().tell()
def truncate(self, *args, **kwargs):
raise IOError("Can't truncate(); %s is read-only" %
@@ -227,11 +223,8 @@ def writelines(self, *args, **kwargs):
raise IOError("Can't writelines(); %s is read-only" %
self.__class__.__name__)
- def xreadlines(self, *args, **kwargs):
- return self.getThreadStream().xreadlines(*args, **kwargs)
-
def __enter__(self, *args, **kwargs):
- return self.getThreadStream().__enter__(*args, **kwargs)
+ return self.getThreadStream().__enter__()
def __exit__(self, *args, **kwargs):
return self.getThreadStream().__exit__(*args, **kwargs)
@@ -259,11 +252,3 @@ def name(self):
@property
def newlines(self):
return self.getThreadStream().newlines
-
- @property
- def softspace(self):
- return self.getThreadStream().softspace
-
- @softspace.setter
- def softspace(self, val):
- self.getThreadStream().softspace = val
diff --git a/ebmlite/tools/list_schemata.py b/ebmlite/tools/list_schemata.py
index 3aef585..be39c5b 100644
--- a/ebmlite/tools/list_schemata.py
+++ b/ebmlite/tools/list_schemata.py
@@ -34,4 +34,3 @@ def main():
if __name__ == "__main__":
main()
-
diff --git a/ebmlite/tools/utils.py b/ebmlite/tools/utils.py
index 136412a..7169429 100644
--- a/ebmlite/tools/utils.py
+++ b/ebmlite/tools/utils.py
@@ -17,6 +17,7 @@ def load_files(args, binary_output=False):
sys.stderr.write("Input file does not exist: %s\n" % args.input)
exit(1)
+ schema = None
try:
schema_file = args.schema
if os.path.splitext(schema_file.strip())[1] == '':
@@ -26,11 +27,11 @@ def load_files(args, binary_output=False):
errPrint("Error loading schema: %s\n" % err)
if not args.output:
- yield (schema, sys.stdout)
+ yield schema, sys.stdout
return
output = os.path.realpath(os.path.expanduser(args.output))
if os.path.exists(output) and not args.clobber:
errPrint("Error: Output file already exists: %s" % args.output)
with open(output, ('wb' if binary_output else 'w')) as out:
- yield (schema, out)
+ yield schema, out
diff --git a/ebmlite/util.py b/ebmlite/util.py
index 6fe77da..8dd6b97 100644
--- a/ebmlite/util.py
+++ b/ebmlite/util.py
@@ -4,10 +4,10 @@
Created on Aug 11, 2017
-@todo: Clean up and standardize usage of the term 'size' versus 'length.'
-@todo: Modify (or create an alternate version of) `toXml()` that writes
+:todo: Clean up and standardize usage of the term 'size' versus 'length.'
+:todo: Modify (or create an alternate version of) `toXml()` that writes
directly to a file, allowing the conversion of huge EBML files.
-@todo: Add other options to command-line utility for the other arguments of
+:todo: Add other options to command-line utility for the other arguments of
`toXml()` and `xml2ebml()`.
"""
__author__ = "David Randall Stokes, Connor Flanigan"
@@ -15,16 +15,17 @@
__credits__ = "David Randall Stokes, Connor Flanigan, Becker Awqatty, Derek Witt"
__all__ = ['createID', 'validateID', 'toXml', 'xml2ebml', 'loadXml', 'pprint',
- 'printSchemata']
+ 'printSchemata', 'flatiter']
import ast
-from base64 import b64encode, b64decode
-from io import StringIO
+from io import BytesIO
import pathlib
import struct
import sys
import tempfile
+from typing import BinaryIO, Callable, IO, List, Optional, Tuple, Union
from xml.etree import ElementTree as ET
+from pathlib import Path
from . import core, encoding, decoding
from . import xml_codecs
@@ -34,22 +35,27 @@
# ==============================================================================
-def createID(schema, idClass, exclude=(), minId=0x81, maxId=0x1FFFFFFE, count=1):
+def createID(schema: core.Schema,
+ idClass: str,
+ exclude: Tuple[int] = (),
+ minId: int = 0x81,
+ maxId: int = 0x1FFFFFFE,
+ count: int = 1) -> List[int]:
""" Generate unique EBML IDs. Primarily intended for use 'offline' by
humans creating EBML schemata.
- @param schema: The `Schema` in which the new IDs must coexist.
- @param idClass: The EBML class of ID, one of (case-insensitive):
+ :param schema: The `Schema` in which the new IDs must coexist.
+ :param idClass: The EBML class of ID, one of (case-insensitive):
* `'a'`: Class A (1 octet, base 0x8X)
* `'b'`: Class B (2 octets, base 0x4000)
* `'c'`: Class C (3 octets, base 0x200000)
* `'d'`: Class D (4 octets, base 0x10000000)
- @param exclude: A list of additional IDs to avoid.
- @param minId: The minimum ID value, within the ID class' range.
- @param maxId: The maximum ID value, within the ID class' range.
- @param count: The maximum number of IDs to generate. The result may be
+ :param exclude: A list of additional IDs to avoid.
+ :param minId: The minimum ID value, within the ID class' range.
+ :param maxId: The maximum ID value, within the ID class' range.
+ :param count: The maximum number of IDs to generate. The result may be
fewer than specified if too few meet the given criteria.
- @return: A list of EBML IDs that match the given criteria.
+ :return: A list of EBML IDs that match the given criteria.
"""
ranges = dict(A=(0x81, 0xFE),
B=(0x407F, 0x7FFE),
@@ -75,7 +81,7 @@ def createID(schema, idClass, exclude=(), minId=0x81, maxId=0x1FFFFFFE, count=1)
return result
-def validateID(elementId):
+def validateID(elementId: int) -> bool:
""" Verify that a number is a valid EBML element ID. A `ValueError`
will be raised if the element ID is invalid.
@@ -85,8 +91,8 @@ def validateID(elementId):
* C: 0x203FFF to 0x3FFFFE
* D: 0x101FFFFF to 0x1FFFFFFE
- @param elementId: The element ID to validate
- @raises: `ValueError`, although certain edge cases may raise
+ :param elementId: The element ID to validate
+ :raises: `ValueError`, although certain edge cases may raise
another type.
"""
ranges = ((0x81, 0xFE), (0x407F, 0x7FFE), (0x203FFF, 0x3FFFFE), (0x101FFFFF, 0x1FFFFFFE))
@@ -123,30 +129,36 @@ def validateID(elementId):
# ==============================================================================
-def toXml(el, parent=None, offsets=True, sizes=True, types=True, ids=True,
- binary_codec='base64', void_codec='ignore'):
+def toXml(el: core.Element,
+ parent=None,
+ offsets: bool = True,
+ sizes: bool = True,
+ types: bool = True,
+ ids: bool = True,
+ binary_codec: Union[Callable, str] = 'base64',
+ void_codec: Union[Callable, str] = 'ignore'):
""" Convert an EBML Document to XML. Binary elements will contain
base64-encoded data in their body. Other non-master elements will
contain their value in a ``value`` attribute.
- @param el: An instance of an EBML Element or Document subclass.
- @keyword parent: The resulting XML element's parent element, if any.
- @keyword offsets: If `True`, create a ``offset`` attributes for each
+ :param el: An instance of an EBML Element or Document subclass.
+ :param parent: The resulting XML element's parent element, if any.
+ :param offsets: If `True`, create ``offset`` attributes for each
generated XML element, containing the corresponding EBML element's
offset.
- @keyword sizes: If `True`, create ``size`` attributes containing the
+ :param sizes: If `True`, create ``size`` attributes containing the
corresponding EBML element's size.
- @keyword types: If `True`, create ``type`` attributes containing the
+ :param types: If `True`, create ``type`` attributes containing the
name of the corresponding EBML element type.
- @keyword ids: If `True`, create ``id`` attributes containing the
+ :param ids: If `True`, create ``id`` attributes containing the
corresponding EBML element's EBML ID.
- @keyword binary_codec: The name of an XML codec class from
+ :param binary_codec: The name of an XML codec class from
`ebmlite.xml_codecs`, or an instance of a codec, for rendering
binary elements as text.
- @keyword void_codec: The name of an XML codec class from
+ :param void_codec: The name of an XML codec class from
`ebmlite.xml_codecs`, or an instance of a codec, for rendering
the contents of Void elements as text.
- @return The root XML element of the file.
+ :return: The root XML element of the file.
"""
if isinstance(binary_codec, str):
binary_codec = xml_codecs.BINARY_CODECS[binary_codec]()
@@ -194,26 +206,30 @@ def toXml(el, parent=None, offsets=True, sizes=True, types=True, ids=True,
return xmlEl
-#===============================================================================
+# ===========================================================================
#
-#===============================================================================
+# ===========================================================================
-def xmlElement2ebml(xmlEl, ebmlFile, schema, sizeLength=None, unknown=True):
+def xmlElement2ebml(xmlEl,
+ ebmlFile: BinaryIO,
+ schema: core.Schema,
+ sizeLength: Optional[int] = None,
+ unknown: bool = True):
""" Convert an XML element to EBML, recursing if necessary. For converting
an entire XML document, use `xml2ebml()`.
- @param xmlEl: The XML element. Its tag must match an element defined
+ :param xmlEl: The XML element. Its tag must match an element defined
in the `schema`.
- @param ebmlFile: An open file-like stream, to which the EBML data will
+ :param ebmlFile: An open file-like stream, to which the EBML data will
be written.
- @param schema: An `ebmlite.core.Schema` instance to use when
+ :param schema: An `ebmlite.core.Schema` instance to use when
writing the EBML document.
- @keyword sizeLength:
- @param unknown: If `True`, unknown element names will be allowed,
+ :param sizeLength: The default length of each element's size descriptor (see `xml2ebml()`).
+ :param unknown: If `True`, unknown element names will be allowed,
provided their XML elements include an ``id`` attribute with the
EBML ID (in hexadecimal).
- @return The length of the encoded element, including header and children.
- @raise NameError: raised if an xml element is not present in the schema and unknown is False, OR if the xml
+ :return: The length of the encoded element, including header and children.
+ :raise NameError: raised if an XML element is not present in the schema and unknown is False, OR if the XML
element does not have an ID.
"""
if not isinstance(xmlEl.tag, (str, bytes, bytearray)):
@@ -284,32 +300,36 @@ def xmlElement2ebml(xmlEl, ebmlFile, schema, sizeLength=None, unknown=True):
return len(encoded)
-def xml2ebml(xmlFile, ebmlFile, schema, sizeLength=None, headers=True,
- unknown=True):
+def xml2ebml(xmlFile,
+ ebmlFile: BinaryIO,
+ schema: Union[str, Path, core.Schema],
+ sizeLength: Optional[int] = None,
+ headers: bool = True,
+ unknown: bool = True):
""" Convert an XML file to EBML.
- @todo: Convert XML on the fly, rather than parsing it first, allowing
+ :todo: Convert XML on the fly, rather than parsing it first, allowing
for the conversion of arbitrarily huge files.
- @param xmlFile: The XML source. Can be a filename, an open file-like
+ :param xmlFile: The XML source. Can be a filename, an open file-like
stream, or a parsed XML document.
- @param ebmlFile: The EBML file to write. Can be a filename or an open
+ :param ebmlFile: The EBML file to write. Can be a filename or an open
file-like stream.
- @param schema: The EBML schema to use. Can be a filename or an
+ :param schema: The EBML schema to use. Can be a filename or an
instance of a `Schema`.
- @keyword sizeLength: The default length of each element's size
+ :param sizeLength: The default length of each element's size
descriptor. Must be large enough to store the largest 'master'
element. If an XML element has a ``sizeLength`` attribute, it will
override this.
- @keyword headers: If `True`, generate the standard ``EBML`` EBML
+ :param headers: If `True`, generate the standard ``EBML`` EBML
element if the XML document does not contain one.
- @param unknown: If `True`, unknown element names will be allowed,
+ :param unknown: If `True`, unknown element names will be allowed,
provided their XML elements include an ``id`` attribute with the
EBML ID (in hexadecimal).
- @return: the size of the ebml file in bytes.
- @raise NameError: raises if an xml element is not present in the schema.
+ :return: The size of the EBML file in bytes.
+ :raise NameError: raised if an XML element is not present in the schema.
"""
- if isinstance(ebmlFile, (str, bytes, bytearray)):
+ if isinstance(ebmlFile, (str, Path)):
ebmlFile = open(ebmlFile, 'wb')
openedEbml = True
else:
@@ -354,25 +374,27 @@ def xml2ebml(xmlFile, ebmlFile, schema, sizeLength=None, headers=True,
return numBytes
-#===============================================================================
+# ===========================================================================
#
-#===============================================================================
+# ===========================================================================
-def loadXml(xmlFile, schema, ebmlFile=None):
+def loadXml(xmlFile,
+ schema: core.Schema,
+ ebmlFile: Union[BinaryIO, str, None] = None):
""" Helpful utility to load an EBML document from an XML file.
- @param xmlFile: The XML source. Can be a filename, an open file-like
+ :param xmlFile: The XML source. Can be a filename, an open file-like
stream, or a parsed XML document.
- @param schema: The EBML schema to use. Can be a filename or an
+ :param schema: The EBML schema to use. Can be a filename or an
instance of a `Schema`.
- @keyword ebmlFile: The name of the temporary EBML file to write, or
+ :param ebmlFile: The name of the temporary EBML file to write, or
``:memory:`` to use RAM (like `sqlite3`). Defaults to an
automatically-generated temporary file.
- @return The root node of the specified EBML file.
+ :return: The root node of the specified EBML file.
"""
if ebmlFile == ":memory:":
- ebmlFile = StringIO()
+ ebmlFile = BytesIO()
xml2ebml(xmlFile, ebmlFile, schema)
ebmlFile.seek(0)
else:
@@ -382,23 +404,28 @@ def loadXml(xmlFile, schema, ebmlFile=None):
return schema.load(ebmlFile)
-#===============================================================================
+# ===========================================================================
#
-#===============================================================================
-
-def pprint(el, values=True, out=sys.stdout, indent=" ", binary_codec="ignore",
- void_codec="ignore", _depth=0):
+# ===========================================================================
+
+def pprint(el: core.Element,
+ values: bool = True,
+ out: IO = sys.stdout,
+ indent: str = " ",
+ binary_codec: Union[Callable, str] = "ignore",
+ void_codec: Union[Callable, str] = "ignore",
+ _depth: int = 0):
""" Test function to recursively crawl an EBML document or element and
print its structure, with child elements shown indented.
- @param el: An instance of a `Document` or `Element` subclass.
- @keyword values: If `True`, show elements' values.
- @keyword out: A file-like stream to which to write.
- @keyword indent: The string containing the character(s) used for each
+ :param el: An instance of a `Document` or `Element` subclass.
+ :param values: If `True`, show elements' values.
+ :param out: A file-like stream to which to write.
+ :param indent: The string containing the character(s) used for each
indentation.
- @keyword binary_codec: The name of a class from `ebmlite.xml_codecs`,
+ :param binary_codec: The name of a class from `ebmlite.xml_codecs`,
or an instance of a codec, for rendering binary elements as text.
- @keyword void_codec: The name of a class from `ebmlite.xml_codecs`,
+ :param void_codec: The name of a class from `ebmlite.xml_codecs`,
or an instance of a codec, for rendering the contents of Void
elements as text.
"""
@@ -444,18 +471,24 @@ def pprint(el, values=True, out=sys.stdout, indent=" ", binary_codec="ignore",
out.flush()
-#===============================================================================
+# ===========================================================================
#
-#===============================================================================
+# ===========================================================================
-def printSchemata(paths=None, out=sys.stdout, absolute=True):
+def printSchemata(paths: Optional[List[Union[str, Path]]] = None,
+ out: Union[str, Path, IO] = sys.stdout,
+ absolute: bool = True):
""" Display a list of schemata in `SCHEMA_PATH`. A thin wrapper for the
core `listSchemata()` function.
- @param out: A file-like stream to which to write.
+ :param paths: A list of paths to search for schemata, in addition to
+ those in `SCHEMA_PATH`.
+ :param out: A file-like stream or filename to which to write.
+ :param absolute: If `True`, use absolute paths in the schema
+ filenames.
"""
out = out or sys.stdout
- newfile = isinstance(out, (str, pathlib.Path))
+ newfile = isinstance(out, (str, Path))
if newfile:
out = open(out, 'w')
@@ -473,3 +506,30 @@ def printSchemata(paths=None, out=sys.stdout, absolute=True):
finally:
if newfile:
out.close()
+
+
+# ===========================================================================
+#
+# ===========================================================================
+
+
+def flatiter(element, depth=None):
+ """ Recursively crawl an EBML document or element, depth-first,
+ yielding all elements (or elements down to a given depth).
+
+ :param element: The EBML `Document` or `Element` to iterate.
+ :param depth: The maximum recursion depth. `None` or a value less
+ than zero will fully recurse without limit.
+ """
+ depth = -1 if depth is None else depth
+
+ def _flatiter(el, d, first):
+ if not first:
+ yield el
+ if abs(d) > 0 and isinstance(el, core.MasterElement):
+ for ch in el:
+ for grandchild in _flatiter(ch, d-1, False):
+ yield grandchild
+
+ for child in _flatiter(element, depth, True):
+ yield child
diff --git a/ebmlite/xml_codecs.py b/ebmlite/xml_codecs.py
index 225bc28..62407ca 100644
--- a/ebmlite/xml_codecs.py
+++ b/ebmlite/xml_codecs.py
@@ -6,6 +6,7 @@
import base64
from io import BytesIO, StringIO
+from typing import BinaryIO, Optional, Union
# ==============================================================================
@@ -30,7 +31,12 @@ def __init__(self, **kwargs):
"""
pass
- def encode(self, data, stream=None, indent='', offset=0, **kwargs):
+ def encode(self,
+ data: bytes,
+ stream: Optional[BinaryIO] = None,
+ indent: Union[str, bytes] = '',
+ offset: int = 0,
+ **kwargs):
""" Convert binary data to text. Typical arguments:
:param data: The binary data from an EBML `BinaryElement`.
@@ -69,7 +75,7 @@ class Base64Codec(BinaryCodec):
"""
NAME = "base64"
- def __init__(self, cols=76, **kwargs):
+ def __init__(self, cols=76, **_kwargs):
""" Constructor.
:param cols: The length of each line of base64 data, excluding
@@ -80,10 +86,15 @@ def __init__(self, cols=76, **kwargs):
Additional keyword arguments will be accepted (to maintain
compatibility with other codecs) but ignored.
"""
+ super().__init__()
self.cols = cols
- def encode(self, data, stream=None, indent='', **kwargs):
+ def encode(self,
+ data: bytes,
+ stream: Optional[BinaryIO] = None,
+ indent: Union[str, bytes] = '',
+ **kwargs) -> Union[str, int]:
""" Convert binary data to base64 text.
:param data: The binary data from an EBML `BinaryElement`.
@@ -176,7 +187,11 @@ class HexCodec(BinaryCodec):
# The name shown in the encoded XML element's `encoding` attribute
NAME = "hex"
- def __init__(self, width=2, cols=32, offsets=True, **kwargs):
+ def __init__(self,
+ width: int = 2,
+ cols: int = 32,
+ offsets: bool = True,
+ **_kwargs):
""" Constructor.
:param width: The number of bytes displayed per column when
@@ -187,12 +202,18 @@ def __init__(self, width=2, cols=32, offsets=True, **kwargs):
:param offsets: If `True`, each line will start with its offset
(in decimal). Applicable if `cols` is a non-zero number.
"""
+ super().__init__()
self.width = width
self.cols = cols
self.offsets = bool(offsets and cols)
- def encode(self, data, stream=None, offset=0, indent='', **kwargs):
+ def encode(self,
+ data: bytes,
+ stream: Optional[BinaryIO] = None,
+ offset: int = 0,
+ indent='',
+ **kwargs) -> Union[str, int]:
""" Convert binary data to hexadecimal text.
:param data: The binary data from an EBML `BinaryElement`.
@@ -233,7 +254,9 @@ def encode(self, data, stream=None, offset=0, indent='', **kwargs):
@classmethod
- def decode(cls, data, stream=None):
+ def decode(cls,
+ data: bytes,
+ stream: Optional[BinaryIO] = None) -> Union[bytes, int]:
""" Decode binary data in hexadecimal (e.g., from an XML file). Note:
this is a `classmethod`, and works regardles of how the encoded
data was formatted (e.g., number of columns, with or without
@@ -281,13 +304,17 @@ class IgnoreCodec(BinaryCodec):
NAME = "ignore"
@staticmethod
- def encode(data, stream=None, **kwargs):
+ def encode(data: bytes,
+ stream: Optional[BinaryIO] = None,
+ **kwargs) -> Union[str, int]:
if stream:
return 0
return ''
@staticmethod
- def decode(data, stream=None, **kwargs):
+ def decode(data: bytes,
+ stream: Optional[BinaryIO] = None,
+ **kwargs) -> Union[bytes, int]:
if stream:
return 0
return b''
diff --git a/setup.py b/setup.py
index 8fc674e..1d89395 100644
--- a/setup.py
+++ b/setup.py
@@ -1,11 +1,27 @@
+import codecs
+import os.path
import setuptools
+def read(rel_path):
+ here = os.path.abspath(os.path.dirname(__file__))
+ with codecs.open(os.path.join(here, rel_path), 'r') as fp:
+ return fp.read()
+
+
+def get_version(rel_path):
+ for line in read(rel_path).splitlines():
+ if line.startswith('__version__'):
+ delim = '"' if '"' in line else "'"
+ return line.split(delim)[1]
+ else:
+ raise RuntimeError("Unable to find version string.")
+
+
with open('README.md', 'r') as fh:
long_description = fh.read()
INSTALL_REQUIRES = [
-# 'numpy',
]
TEST_REQUIRES = [
@@ -21,7 +37,7 @@
setuptools.setup(
name='ebmlite',
- version='3.3.1',
+ version=get_version('ebmlite/__init__.py'),
author='Mide Technology',
author_email='help@mide.com',
description='A lightweight, "pure Python" library for parsing EBML (Extensible Binary Markup Language) data.',
@@ -32,12 +48,11 @@
classifiers=['Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
- 'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7',
- 'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
+ 'Programming Language :: Python :: 3.12',
+ 'Programming Language :: Python :: 3.13',
],
keywords='ebml binary matroska webm',
packages=setuptools.find_packages(exclude="tests"),
diff --git a/tests/test_general.py b/tests/test_general.py
index c108083..3114ef3 100644
--- a/tests/test_general.py
+++ b/tests/test_general.py
@@ -4,6 +4,7 @@
@author: dstokes
"""
+from itertools import zip_longest
import os.path
import unittest
from xml.dom.minidom import parseString
@@ -66,18 +67,11 @@ def testMkv(self):
xmlDoc2 = util.loadXml(xmlFile2, schema)
# Compare each element from the XML
- xmlEls1 = [xmlDoc1]
- xmlEls2 = [xmlDoc2]
- while len(xmlEls1) > 0:
- self.assertEqual(xmlEls1[0], xmlEls2[0], 'Element '
- + repr(xmlEls1[0])
- + ' was not converted properly')
- for x in list(xmlEls1.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls1.append(x)
- for x in list(xmlEls2.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls2.append(x)
+ for el1, el2 in zip_longest(util.flatiter(xmlDoc1),
+ util.flatiter(xmlDoc2),
+ fillvalue=None):
+ self.assertEqual(el1, el2,
+ 'Element {!r} was not converted properly'.format(el1))
def testIde(self):
@@ -121,19 +115,11 @@ def testIde(self):
xmlDoc2 = util.loadXml(xmlFile2, schema)
# Compare each element from the XML
- xmlEls1 = [xmlDoc1]
- xmlEls2 = [xmlDoc2]
- while len(xmlEls1) > 0:
- self.assertEqual(xmlEls1[0], xmlEls2[0], 'Element '
- + repr(xmlEls1[0])
- + ' was not converted properly')
- for x in list(xmlEls1.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls1.append(x)
- for x in list(xmlEls2.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls2.append(x)
-
+ for el1, el2 in zip_longest(util.flatiter(xmlDoc1),
+ util.flatiter(xmlDoc2),
+ fillvalue=None):
+ self.assertEqual(el1, el2,
+ 'Element {!r} was not converted properly'.format(el1))
def testPPrint(self):
@@ -298,18 +284,11 @@ def testMkv(self):
xmlDoc2 = util.loadXml(xmlFile2, schema)
# Compare each element from the XML
- xmlEls1 = [xmlDoc1]
- xmlEls2 = [xmlDoc2]
- while len(xmlEls1) > 0:
- self.assertEqual(xmlEls1[0], xmlEls2[0], 'Element '
- + repr(xmlEls1[0])
- + ' was not converted properly')
- for x in list(xmlEls1.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls1.append(x)
- for x in list(xmlEls2.pop(0).children.values()):
- if issubclass(x, core.Element):
- xmlEls2.append(x)
+ for el1, el2 in zip_longest(util.flatiter(xmlDoc1),
+ util.flatiter(xmlDoc2),
+ fillvalue=None):
+ self.assertEqual(el1, el2,
+ 'Element {!r} was not converted properly'.format(el1))
if __name__ == "__main__":
diff --git a/tests/test_tools.py b/tests/test_tools.py
index 396fd92..f1f293a 100644
--- a/tests/test_tools.py
+++ b/tests/test_tools.py
@@ -12,6 +12,13 @@
@pytest.mark.script_launch_mode('subprocess')
def test_ebml2xml(script_runner):
+
+ # This test can only run if the library has been installed,
+ # e.g., in a GitHub action. Bail if not.
+ # TODO: This is a hack and should be redone.
+ if os.getenv("GITHUB_ACTIONS") != "true":
+ return
+
path_base = os.path.join(".", "tests", "video-4{ext}")
path_in = path_base.format(ext=".ebml")
path_out = path_base.format(ext=".ebml.xml")
@@ -56,6 +63,13 @@ def assert_elements_are_equiv(e1, e2):
@pytest.mark.script_launch_mode('subprocess')
def test_xml2ebml(script_runner):
+
+ # This test can only run if the library has been installed,
+ # e.g., in a GitHub action. Bail if not.
+ # TODO: This is a hack and should be redone.
+ if os.getenv("GITHUB_ACTIONS") != "true":
+ return
+
path_base = os.path.join(".", "tests", "video-4{ext}")
path_in = path_base.format(ext=".xml")
path_out = path_base.format(ext=".xml.ebml")
@@ -84,6 +98,13 @@ def test_xml2ebml(script_runner):
@pytest.mark.script_launch_mode('subprocess')
def test_view(script_runner):
+
+ # This test can only run if the library has been installed,
+ # e.g., in a GitHub action. Bail if not.
+ # TODO: This is a hack and should be redone.
+ if os.getenv("GITHUB_ACTIONS") != "true":
+ return
+
path_base = os.path.join(".", "tests", "video-4{ext}")
path_in = path_base.format(ext=".ebml")
path_out = path_base.format(ext=".xml.txt")
@@ -112,6 +133,13 @@ def test_view(script_runner):
@pytest.mark.script_launch_mode('subprocess')
def test_list_schemata(script_runner):
+
+ # This test can only run if the library has been installed,
+ # e.g., in a GitHub action. Bail if not.
+ # TODO: This is a hack and should be redone.
+ if os.getenv("GITHUB_ACTIONS") != "true":
+ return
+
core.SCHEMA_PATH = [os.path.dirname(schemata.__file__)]
path_out = os.path.join(".", "tests", "list-schemata.txt")