Skip to content
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
xmlschema[codegen]~=4.1.0
xmlschema[codegen]~=4.3.1
112 changes: 74 additions & 38 deletions src/cbexigen/SchemaAnalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ def __get_particle_from_attribute(self, attribute: XsdAttribute):

particle.name = attribute.local_name

ns = tools.extract_namespace_uri(getattr(attribute, 'name', ''))
if ns:
particle.namespace = ns
elif hasattr(attribute, 'default_namespace') and attribute.default_namespace:
particle.namespace = attribute.default_namespace

particle.type = self.__get_type_name(attribute)
particle.type_short = self.__get_type_name_short(attribute)
particle.base_type = self.__get_base_type_name(attribute)
Expand Down Expand Up @@ -393,6 +399,12 @@ def __get_particle(self, element: XsdElement):

particle.name = element.local_name

ns = tools.extract_namespace_uri(element.name)
if ns:
particle.namespace = ns
elif hasattr(element, 'default_namespace') and element.default_namespace:
particle.namespace = element.default_namespace

particle.type = self.__get_type_name(element)
particle.type_short = self.__get_type_name_short(element)
particle.base_type = self.__get_base_type_name(element)
Expand Down Expand Up @@ -456,6 +468,12 @@ def __get_abstract_particle(self, element: XsdElement, substitute: XsdElement):

particle.name = substitute.local_name

ns = tools.extract_namespace_uri(substitute.name)
if ns:
particle.namespace = ns
elif hasattr(substitute, 'default_namespace') and substitute.default_namespace:
particle.namespace = substitute.default_namespace

particle.type = self.__get_type_name(substitute)
particle.type_short = self.__get_type_name_short(substitute)
particle.base_type = self.__get_base_type_name(substitute)
Expand Down Expand Up @@ -572,7 +590,7 @@ def __get_particle_list(self, element: XsdElement, subst_list):
if self.__is_abstract(child):
# get substituted particles for child
qname = self.__get_name(child)
subst_group = self.__current_schema.substitution_groups._target_dict.get(qname)
subst_group = self.__current_schema.maps.substitution_groups.get(qname)
if subst_group:
for elem in subst_group:
particle = self.__get_abstract_particle(child, elem)
Expand All @@ -590,7 +608,7 @@ def __get_particle_list(self, element: XsdElement, subst_list):
if self.__is_abstract_type(element):
# get substituted particles for element
qname = self.__get_name(element)
subst_group = self.__current_schema.substitution_groups._target_dict.get(qname)
subst_group = self.__current_schema.maps.substitution_groups.get(qname)
if subst_group:
for elem in subst_group:
particle = self.__get_abstract_particle(element, elem)
Expand Down Expand Up @@ -850,7 +868,7 @@ def __get_child_tree(self, element: XsdElement, level, recursive=True):
msg_write((level + 1) * " " + "ABSTRACT TYPE is extension")

qname = self.__get_name(child)
sg = self.__current_schema.substitution_groups._target_dict.get(qname)
sg = self.__current_schema.maps.substitution_groups.get(qname)
if sg:
for substitute in sg:
substitute_type_name = self.__get_type_name(substitute)
Expand Down Expand Up @@ -925,7 +943,7 @@ def analyze_schema_elements(self):
self.__build_schema_builtin_types_list()

if self.__is_iso20:
for element in self.__current_schema.elements._target_dict.values():
for element in self.__current_schema.maps.elements.values():
if element.prefixed_name.startswith('xs:'):
continue

Expand Down Expand Up @@ -1010,7 +1028,7 @@ def analyze_schema_elements(self):

def __build_schema_builtin_types_list(self):
xs_namespace = self.__current_schema.namespaces['xs']
for value in self.__current_schema.types._target_dict.values():
for value in self.__current_schema.maps.types.values():
if value.target_namespace == xs_namespace:
if value.__class__.__name__ == 'XsdAtomicBuiltin':
if value.simple_type.base_type is not None:
Expand Down Expand Up @@ -1063,7 +1081,7 @@ def __print_child_recursive(element_list, child_element: XsdElement):
self.__known_fragments.clear()
fragments = {}

for element in self.__current_schema.elements._target_dict.values():
for element in self.__current_schema.maps.elements.values():
if element.default_namespace:
if element.name not in fragments.keys():
fragments[element.name] = __get_fragment(element)
Expand Down Expand Up @@ -1109,7 +1127,7 @@ def __build_namespace_element_lists(self):
current_namespace = self.__current_schema.get_schema('')
for ele in current_namespace.elements.values():
items = []
for value in current_namespace.elements._target_dict.values():
for value in current_namespace.maps.elements.values():
if value.default_namespace:
name = self.__get_type_name_short(value)
if name == '' or name in ['AnonType', 'string']:
Expand Down Expand Up @@ -1143,6 +1161,14 @@ def __build_namespace_element_lists(self):

for gen_elem in self.__generate_elements:
if gen_elem.type == '{' + imp.default_namespace + '}' + name:
# Only replace particles for empty/abstract container types.
# Concrete types (with their own fields) must keep their particles.
if gen_elem.content_type != 'empty' and len(gen_elem.particles) > 0:
continue
# Skip substitution group heads – their particles come from
# the substitution group expansion, not from namespace imports.
if self.__current_schema.maps.substitution_groups.get(gen_elem.name):
continue
gen_elem.particles = items
gen_elem.is_in_namespace_elements = True
self.__namespace_elements[name] = items
Expand All @@ -1151,7 +1177,7 @@ def __build_namespace_element_lists(self):
def __build_generate_elements_types_list(self):
xs_namespace = self.__current_schema.namespaces['xs']
type_list = []
for value in self.__current_schema.types._target_dict.values():
for value in self.__current_schema.maps.types.values():
if value.target_namespace != xs_namespace and value.content_type_label == 'element-only':
type_list.append(value.local_name)

Expand Down Expand Up @@ -1244,7 +1270,7 @@ def __replace_particle_list_in_parent(parent_element: ElementData, particle_list
with the sorted particles from replacement_list.
"""
# the replacements need to be sorted alphabetically
replacement_list.sort(key=lambda particle_key: particle_key.name)
replacement_list.sort(key=lambda p: (p.name or '', p.namespace or ''))

# the particles need to be sorted by index, for proper removal
particle_list.sort(key=lambda x: x[0])
Expand All @@ -1259,8 +1285,10 @@ def __replace_particle_list_in_parent(parent_element: ElementData, particle_list
# drop all the particles listed in particle_list
p_index: int
for p_index, p in reversed(particle_list):
if parent_element.particles[p_index] != p:
log_write(f"particle '{p.name}' not found in '{parent_element.name_short}'")
if p_index >= len(parent_element.particles) or parent_element.particles[p_index] is not p:
raise RuntimeError(
f"particle '{p.name}' at index {p_index} does not match in "
f"'{parent_element.name_short}' -- particle list is stale")
del parent_element.particles[p_index]

# insert the replacements at the original position, assuming the lowest original particle
Expand Down Expand Up @@ -1301,19 +1329,25 @@ def __copy_particles_from_empty_content_elements(self, element: ElementData, par
else:
log_write(f' Add to list and remove particle {particle.name}.')
replacement_list.append(particle)
if particle in parent.particles:
particles_to_remove.append((parent.particles.index(particle), particle))
# Use name-based lookup (not object identity) to find the
# matching particle in parent for removal.
for idx, pp in enumerate(parent.particles):
if pp.name == particle.name and (idx, pp) not in particles_to_remove:
particles_to_remove.append((idx, pp))
break

for particle in parent.particles:
for idx, particle in enumerate(parent.particles):
if particle.name != element.name_short:
continue
if (idx, particle) in particles_to_remove:
continue

log_write(f' Add to list and remove particle {particle.name}.')
p_min_occurs = particle.min_occurs
p_max_occurs = particle.max_occurs
particle.min_occurs = 0
replacement_list.append(particle)
particles_to_remove.append((parent.particles.index(particle), particle))
particles_to_remove.append((idx, particle))

if len(replacement_list) > 0:
self.__replace_particle_list_in_parent(parent, particles_to_remove, replacement_list,
Expand Down Expand Up @@ -1355,6 +1389,9 @@ def __copy_particles_from_empty_content_elements_particle(self, element: Element
abstract=True,
min_occurs=0,
max_occurs=1)
ns = tools.extract_namespace_uri(element.name)
if ns:
part_new.namespace = ns
replacement_list.append(part_new)

self.__replace_particle_list_in_parent(parent, particles_to_remove, replacement_list,
Expand Down Expand Up @@ -1440,10 +1477,13 @@ def __scan_elements_for_empty_content(self):
type_short=element.type_short,
min_occurs=0,
max_occurs=1)
ns = tools.extract_namespace_uri(element.name)
if ns:
part.namespace = ns
re_list.append(part)

if len(re_list) > 0:
re_list.sort(key=lambda particle_key: particle_key.name)
re_list.sort(key=lambda p: (p.name or '', p.namespace or ''))
abstract_seq = []
for part in re_list:
log_write(f' Add particle from list {part.name}.')
Expand All @@ -1457,19 +1497,15 @@ def __scan_for_derived_and_extended_elements(self):
log_write('')
log_write('Scan for derived and extended elements')

def find_base_type(base_type_name):
result = None
for item in self.__current_schema.elements._target_dict.values():
if item.prefixed_name.startswith('xs:'):
continue
if item.type.base_type is None:
continue

if item.type.base_type.local_name == base_type_name:
result = item
break

return result
# Pre-build lookup: base_type_name -> [elements derived from it]
base_type_map = {}
for item in self.__current_schema.maps.elements.values():
if item.prefixed_name.startswith('xs:'):
continue
if item.type.base_type is None:
continue
key = item.type.base_type.local_name
base_type_map.setdefault(key, []).append(item)

element: ElementData
particle: Particle
Expand All @@ -1483,20 +1519,20 @@ def find_base_type(base_type_name):
list_with_missing.append(particle)
log_write(f' Adding abstract particle {particle.name} to missing list.')

# get the base type, add it as particle
missing_element = find_base_type(particle.type_short)
if missing_element is not None:
part = self.__get_particle(missing_element)

# get all elements derived from this type, add them as particles
missing_elements = base_type_map.get(particle.type_short, [])
if len(missing_elements) > 0:
particle.min_occurs_old = particle.min_occurs
particle.min_occurs = 0
list_with_missing.append(particle)
log_write(f' Adding particle {particle.name} to missing list.')

part.min_occurs_old = part.min_occurs
part.min_occurs = 0
list_with_missing.append(part)
log_write(f' Adding missing particle {part.name} to missing list.')
for missing_element in missing_elements:
part = self.__get_particle(missing_element)
part.min_occurs_old = part.min_occurs
part.min_occurs = 0
list_with_missing.append(part)
log_write(f' Adding missing particle {part.name} to missing list.')

if len(list_with_missing) > 0:
first = -1
Expand Down
11 changes: 5 additions & 6 deletions src/cbexigen/base_coder_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ def generate_element_grammars(self, element: ElementData):
choice_options = self.ChoiceOptions(element, particle)
combined_min_occurs_from_choice = \
choice_options.min_occurs if choice_options.particles else particle.min_occurs
if combined_min_occurs_from_choice == 1:
if combined_min_occurs_from_choice >= 1:
index_last_nonoptional_particle = particle_index

def _particle_is_in_choice(element: ElementData, particle: Particle):
Expand Down Expand Up @@ -631,11 +631,10 @@ def _add_particle_or_choice_list_to_details(
if m < _max:
# flag=Grammar.LOOP indicates that the _next_ grammar will be the same as this one
#
# Note: We do not loop on the mandatory elements of an array, it would require extra
# logic, and we have no min_occurs larger than 2 (i.e. no large repetition of mandatory
# elements) anyway in our existing standards.

if m > 1 and m > part.min_occurs - 1 and (part.max_occurs_old is None or m < part.max_occurs_old):
# Do not enter the self-loop until all mandatory occurrences have been consumed.
# Before minOccurs is reached, the grammar must only accept another START event;
# allowing END or LOOP too early changes the event-code width and misaligns EXI.
if m > 1 and m > part.min_occurs and (part.max_occurs_old is None or m < part.max_occurs_old):
flag = GrammarFlag.LOOP
skip_to_end = True
else:
Expand Down
6 changes: 3 additions & 3 deletions src/cbexigen/datatype_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def __generate_functions_enum(self):
comment = '// enum for function numbers'
enum_type = self.parameters['prefix'] + 'generatedFunctionNumbersType'
items = []
for value in self.scheme.elements._target_dict.values():
for value in self.scheme.maps.elements.values():
if value.default_namespace:
items.append(self.parameters['prefix'] + value.local_name)
items.sort()
Expand Down Expand Up @@ -538,7 +538,7 @@ def generate_file(self):
else:
element_type = self.scheme.types.get(element.type_short)
if element_type is None:
element_type = self.scheme.types._target_dict.get(element.type)
element_type = self.scheme.maps.types.get(element.type)
if element_type is None:
continue

Expand Down Expand Up @@ -580,7 +580,7 @@ def generate_file(self):

if skip_element:
curr_idx += 1
if curr_idx > len(self.__generate):
if curr_idx >= len(self.__generate):
log_write_error('Module datatypes: Generator loop aborted! Index larger than existing elements.')
break
else:
Expand Down
2 changes: 1 addition & 1 deletion src/cbexigen/decoder_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ def generate_file(self):

if skip_element:
curr_idx += 1
if curr_idx > len(self.elements_to_generate):
if curr_idx >= len(self.elements_to_generate):
log_write_error('Module decoder: Generator loop aborted! Index larger than existing elements.')
break
else:
Expand Down
1 change: 1 addition & 0 deletions src/cbexigen/elementData.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Particle:
integer_bit_size: int = -1
integer_base_type: str = None
integer_is_unsigned: bool = False
namespace: str = ''
# additional info for the anyType particle
process_content: str = None

Expand Down
2 changes: 1 addition & 1 deletion src/cbexigen/encoder_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ def generate_file(self):

if skip_element:
curr_idx += 1
if curr_idx > len(self.elements_to_generate):
if curr_idx >= len(self.elements_to_generate):
log_write_error('Module encoder: Generator loop aborted! Index larger than existing elements.')
break
else:
Expand Down
7 changes: 7 additions & 0 deletions src/cbexigen/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def get_indent(level: int = 1):
return level * (' ' * CONFIG_PARAMS['c_code_indent_chars'])


def extract_namespace_uri(qualified_name: str) -> str:
"""Extract namespace URI from XSD qualified name like '{uri}localName'."""
if qualified_name and qualified_name.startswith('{') and '}' in qualified_name:
return qualified_name[1:qualified_name.index('}')]
return ''


''' generator tools '''


Expand Down
Loading