From 5a728209c9442a964efa5e86cbaf7fb4e1b8b4a6 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Mon, 12 May 2025 13:37:14 -0600 Subject: [PATCH 01/16] Adding in new environment variable for configuring Kafka Segment Length --- docker-compose-kafka.yml | 1 + sample.env | 1 + 2 files changed, 2 insertions(+) diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 44937f1..faeee35 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -37,6 +37,7 @@ services: KAFKA_CFG_DELETE_TOPIC_ENABLE: "true" KAFKA_CFG_LOG_RETENTION_HOURS: ${KAFKA_LOG_RETENTION_HOURS} KAFKA_CFG_LOG_RETENTION_BYTES: ${KAFKA_LOG_RETENTION_BYTES} + KAFKA_CFG_LOG_SEGMENT_BYTES: ${KAFKA_LOG_SEGMENT_BYTES} KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" logging: options: diff --git a/sample.env b/sample.env index aaa9fd5..0c8075b 100644 --- a/sample.env +++ b/sample.env @@ -37,6 +37,7 @@ COMPOSE_PROFILES=all KAFKA_BOOTSTRAP_SERVERS=${DOCKER_HOST_IP}:9092 KAFKA_LOG_RETENTION_HOURS=3 KAFKA_LOG_RETENTION_BYTES=10737418240 # 10GB +KAFKA_LOG_SEGMENT_BYTES=1073741824 # 1GB. Must be less than KAFKA_LOG_RETENTION_BYTES # Variables for creating kafka topics: KAFKA_TOPIC_PARTITIONS=1 From de33706161cec413fc0291538af71735b7f5fe99 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Mon, 12 May 2025 13:37:33 -0600 Subject: [PATCH 02/16] Adding some descriptive text on Kafka Log rotation principles --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 06c3daf..719dab2 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,10 @@ An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be depl - `kafka_schema_registry` - deploys a `kafka-schema-registry` service that can be used to manage schemas for kafka topics - `kafka_ui` - deploys a [web interface](https://github.com/kafbat/kafka-ui) to interact with the kafka cluster +### Kafka Topic Log Retention and Rotation + +During operation, Kafka stores all the messages published to topics in files called logs. These logs are stored on the host system and can quickly become quite large for deployments with real CV volumes of data. A kafka log may be split across multiple files called segments. Each segment is limited in size based upon the KAFKA_LOG_SEGMENT_BYTES environment variable. The number of segments stored is variable but collectively, the total volume of all segments for 1 partition will not exceed the value specified in KAFKA_LOG_RETENTION_BYTES. When Kafka needs to delete data, either because the total KAFKA_LOG_RETENTION_BYTES is exceeded or because data in the oldest segment exceeds the KAFKA_LOG_RETENTION_HOURS an entire segment file will be deleted. Please note, Kafka will never delete the active log segment. So even if the data is far older than the value specified in KAFKA_LOG_RETENTION_HOURS the data may not be deleted if there is not enough data to fill up the segment and cause the creation of a new log segment. Additionally, please note that the values specified in KAFKA_LOG_RETENTION_BYTES and KAFKA_LOG_SEGMENT_BYTES are on a per topic per partition basis. So for a deployment with multiple partitions and topics, the total used storage will likely become quite large even for what may seem like small individual log and segment sizes. When configuring the KAFKA_LOG_SEGMENT_BYTES and KAFKA_LOG_RETENTION_BYTES variables. Make sure that the KAFKA_LOG_RETENTION_BYTES is larger than the KAFKA_LOG_SEGMENT_BYTES. If the value is not larger, Kafka will not be able to generate a new log segment before needing to rotate the logs which will lead to unpredictable behavior. + ### Configure Topic Creation The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](jikkou/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`). From 0af05033fb834fab20b674d76114cab7f53f87d5 Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Wed, 28 May 2025 16:49:10 -0600 Subject: [PATCH 03/16] Add topics for revocable lane events --- jikkou/kafka-topics-values.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index fa8265f..52a3fcc 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -130,6 +130,8 @@ apps: - topic.CmBsmMessageCountProgressionEventAggregation - topic.CmMapMessageCountProgressionEventAggregation - topic.CmSpatMessageCountProgressionEventAggregation + - topic.CmRevocableEnabledLaneAlignment + - topic.CmRevocableEnabledLaneAlignmentEventAggregation tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification From 91176a701fa89df54f2dd07ee68274faffe8d06c Mon Sep 17 00:00:00 2001 From: john-wiens Date: Thu, 5 Jun 2025 10:04:00 -0600 Subject: [PATCH 04/16] Adding in RSU status Connector to Jikkou --- jikkou/kafka-connectors-values.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 69d3421..6a1bd90 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -115,6 +115,13 @@ apps: - topicName: topic.ProcessedBsm collectionName: ProcessedBsm generateTimestamp: true + intersection_api: + name: intersection_api + connectors: + topicName: IntersectionApiRsuStatus + collectionName: IntersectionApiRsuStatus + useTimestamp: true + timestampField: timestamp deduplicator: name: deduplicator connectors: From 140350fbbcccd1b2db11310a3bd7b9cdd5ee70ef Mon Sep 17 00:00:00 2001 From: John-Wiens Date: Thu, 5 Jun 2025 16:16:32 -0600 Subject: [PATCH 05/16] Adding in RSU Status connector --- docker-compose-connect.yml | 1 + docker-compose-kafka.yml | 1 + docker-compose-mongo.yml | 4 +++- jikkou/kafka-connectors-template.jinja | 4 ++++ jikkou/kafka-connectors-values.yaml | 8 ++++---- jikkou/kafka-topics-template.jinja | 4 ++++ jikkou/kafka-topics-values.yaml | 5 +++++ mongo/create_indexes.js | 9 +++++++++ mongo/setup_mongo.sh | 1 + sample.env | 3 +++ 10 files changed, 35 insertions(+), 5 deletions(-) diff --git a/docker-compose-connect.yml b/docker-compose-connect.yml index ba59536..5cd5a22 100644 --- a/docker-compose-connect.yml +++ b/docker-compose-connect.yml @@ -78,6 +78,7 @@ services: CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true} CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true} CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true} + CONNECT_CREATE_INTERSECTION_API: ${CONNECT_CREATE_INTERSECTION_API:-true} CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false} CONNECT_CREATE_MECDEPOSIT: ${CONNECT_CREATE_MECDEPOSIT:-false} MONGO_CONNECTOR_USERNAME: ${MONGO_ADMIN_DB_USER:-admin} diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 105c0c8..986f968 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -70,6 +70,7 @@ services: KAFKA_TOPIC_CREATE_CONFLICTMONITOR: ${KAFKA_TOPIC_CREATE_CONFLICTMONITOR:-true} KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false} KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false} + KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true} KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1} diff --git a/docker-compose-mongo.yml b/docker-compose-mongo.yml index a1a6f98..b39b825 100644 --- a/docker-compose-mongo.yml +++ b/docker-compose-mongo.yml @@ -64,6 +64,7 @@ services: - kafka_connect_standalone - mongo_full - mongo + - mongo_setup image: mongo:8 hostname: mongo_setup depends_on: @@ -96,6 +97,7 @@ services: MONGO_INDEX_CREATE_GEOJSONCONVERTER: ${MONGO_INDEX_CREATE_GEOJSONCONVERTER:-true} MONGO_INDEX_CREATE_CONFLICTMONITOR: ${MONGO_INDEX_CREATE_CONFLICTMONITOR:-true} MONGO_INDEX_CREATE_DEDUPLICATOR: ${MONGO_INDEX_CREATE_DEDUPLICATOR:-false} + MONGO_INDEX_CREATE_INTERSECTION_API: ${MONGO_INDEX_CREATE_INTERSECTION_API:-true} entrypoint: ["/bin/bash", "-c", "./setup_mongo.sh && ./restore_mongo.sh"] volumes: - ${MONGO_SETUP_SCRIPT_RELATIVE_PATH:-./mongo/setup_mongo.sh}:/setup_mongo.sh @@ -137,4 +139,4 @@ services: retries: 4 volumes: - mongo_data: \ No newline at end of file + mongo_data: diff --git a/jikkou/kafka-connectors-template.jinja b/jikkou/kafka-connectors-template.jinja index eb9a95d..cfb4d08 100644 --- a/jikkou/kafka-connectors-template.jinja +++ b/jikkou/kafka-connectors-template.jinja @@ -60,6 +60,10 @@ spec: {{ create_connector(values.apps.conflictmonitor) }} {% endif %} +{% if system.env.CONNECT_CREATE_INTERSECTION_API %} +{{ create_connector(values.apps.intersection_api) }} +{% endif %} + {% if system.env.CONNECT_CREATE_DEDUPLICATOR %} {{ create_connector(values.apps.deduplicator) }} {% else %} diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 3c81445..c4706e6 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -124,10 +124,10 @@ apps: intersection_api: name: intersection_api connectors: - topicName: IntersectionApiRsuStatus - collectionName: IntersectionApiRsuStatus - useTimestamp: true - timestampField: timestamp + - topicName: topic.IntersectionApiRsuStatus + collectionName: IntersectionApiRsuStatus + useTimestamp: true + timestampField: timestamp deduplicator: name: deduplicator connectors: diff --git a/jikkou/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja index 66fd363..dd411f4 100644 --- a/jikkou/kafka-topics-template.jinja +++ b/jikkou/kafka-topics-template.jinja @@ -72,6 +72,10 @@ spec: {{ create_topics(values.apps.conflictmonitor) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_INTERSECTION_API %} +{{ create_topics(values.apps.intersection_api) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_DEDUPLICATOR %} {{ create_topics(values.apps.deduplicator) }} {% endif %} diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index cf8ece1..8574db2 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -166,6 +166,11 @@ apps: - topic.CmEventStateProgressionNotificationAggregation customTopics: {} + intersection_api: + name: intersection_api + customTopics: + - topicName: topic.IntersectionApiRsuStatus + retentionMs: 43200000 deduplicator: name: jpo-deduplicator streamTopics: {} diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index b48ce28..49ce2e1 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -38,6 +38,7 @@ const CONNECT_CREATE_ODE = process.env['CONNECT_CREATE_ODE'] || true; const CONNECT_CREATE_GEOJSONCONVERTER = process.env['CONNECT_CREATE_GEOJSONCONVERTER'] || true; const CONNECT_CREATE_CONFLICTMONITOR = process.env['CONNECT_CREATE_CONFLICTMONITOR'] || true; const CONNECT_CREATE_DEDUPLICATOR = process.env['CONNECT_CREATE_DEDUPLICATOR'] || true; +const CONNECT_INDEX_CREATE_INTERSECTION_API = process.env['CONNECT_CREATE_INTERSECTION_API'] || true; const users = [ @@ -169,6 +170,10 @@ const conflictMonitorCollections = [ ]; +let intersectionAPICollections = [ + { name: "IntersectionApiRsuStatus", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, +]; + let collections = []; if(CONNECT_CREATE_ODE){ @@ -183,6 +188,10 @@ if(CONNECT_CREATE_CONFLICTMONITOR){ collections = collections.concat(conflictMonitorCollections); } +if(CONNECT_INDEX_CREATE_INTERSECTION_API){ + collections = collections.concat(intersectionAPICollections); +} + try{ diff --git a/mongo/setup_mongo.sh b/mongo/setup_mongo.sh index c6f0a77..7ac783a 100755 --- a/mongo/setup_mongo.sh +++ b/mongo/setup_mongo.sh @@ -1,6 +1,7 @@ #!/bin/bash until mongosh --host mongo:27017 --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do + echo "Waiting for valid Ping from MongoDB" sleep 1 done diff --git a/sample.env b/sample.env index b54fb70..d5b610e 100644 --- a/sample.env +++ b/sample.env @@ -55,6 +55,7 @@ KAFKA_TOPIC_DELETE_RETENTION_MS=3600000 KAFKA_TOPIC_CREATE_ODE=true # Create topics for ODE KAFKA_TOPIC_CREATE_GEOJSONCONVERTER=true # Create topics for GeoJSON Converter KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor +KAFKA_TOPIC_CREATE_INTERSECTION_API=true # Create topics for Intersection API KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit KAFKA_TOPIC_CREATE_OTHER=false # Create topics for other applications @@ -113,6 +114,7 @@ MONGO_INDEX_CREATE_ODE=true # Create indexes for ODE MONGO_INDEX_CREATE_GEOJSONCONVERTER=true # Create indexes for GeoJSON Converter MONGO_INDEX_CREATE_CONFLICTMONITOR=true # Create indexes for Conflict Monitor MONGO_INDEX_CREATE_DEDUPLICATOR=false # Create indexes for Deduplicator +MONGO_INDEX_CREATE_INTERSECTION_API=true # Create indexes for Intersection API # Relative path to the MongoDB init script, upper level directories are supported MONGO_SETUP_SCRIPT_RELATIVE_PATH="./mongo/setup_mongo.sh" @@ -140,6 +142,7 @@ CONNECT_TASKS_MAX=1 # Number of concurrent tasks to configur CONNECT_CREATE_ODE=true # Create kafka connectors to MongoDB for ODE CONNECT_CREATE_GEOJSONCONVERTER=true # Create kafka connectors to MongoDB for GeoJSON Converter CONNECT_CREATE_CONFLICTMONITOR=true # Create kafka connectors to MongoDB for Conflict Monitor +CONNECT_CREATE_INTERSECTION_API=true # Create kafka Connectors to MongoDB for Intersection API CONNECT_CREATE_DEDUPLICATOR=false # Create kafka connectors to MongoDB for Deduplicator CONNECT_CREATE_MECDEPOSIT=false # Create kafka connectors to MongoDB for MecDeposit CONNECT_CREATE_OTHER=false # Create kafka connectors to MongoDB for other applications From 9d2b0ab72b4f8fcd2280c8ee0ec3c2ee94552126 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 13 Jun 2025 13:05:31 -0600 Subject: [PATCH 06/16] Removing Mongo Index Env vars --- docker-compose-mongo.yml | 8 ++++---- sample.env | 5 ----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/docker-compose-mongo.yml b/docker-compose-mongo.yml index a1a6f98..4dbba34 100644 --- a/docker-compose-mongo.yml +++ b/docker-compose-mongo.yml @@ -92,10 +92,10 @@ services: MONGO_SAMPLE_DATA_RELATIVE_PATH: ${MONGO_SAMPLE_DATA_RELATIVE_PATH:-./mongo/dump} - MONGO_INDEX_CREATE_ODE: ${MONGO_INDEX_CREATE_ODE:-true} - MONGO_INDEX_CREATE_GEOJSONCONVERTER: ${MONGO_INDEX_CREATE_GEOJSONCONVERTER:-true} - MONGO_INDEX_CREATE_CONFLICTMONITOR: ${MONGO_INDEX_CREATE_CONFLICTMONITOR:-true} - MONGO_INDEX_CREATE_DEDUPLICATOR: ${MONGO_INDEX_CREATE_DEDUPLICATOR:-false} + CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true} + CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true} + CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true} + CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false} entrypoint: ["/bin/bash", "-c", "./setup_mongo.sh && ./restore_mongo.sh"] volumes: - ${MONGO_SETUP_SCRIPT_RELATIVE_PATH:-./mongo/setup_mongo.sh}:/setup_mongo.sh diff --git a/sample.env b/sample.env index b54fb70..d868b35 100644 --- a/sample.env +++ b/sample.env @@ -109,11 +109,6 @@ MONGO_DATABASE_COMPACTION_TRIGGER_PERCENT=MONGO_DATABASE_COMPACTION_TRIGGER_PERC MONGO_ENABLE_STORAGE_RECORD=true MONGO_ENABLE_DYNAMIC_TTL=true -MONGO_INDEX_CREATE_ODE=true # Create indexes for ODE -MONGO_INDEX_CREATE_GEOJSONCONVERTER=true # Create indexes for GeoJSON Converter -MONGO_INDEX_CREATE_CONFLICTMONITOR=true # Create indexes for Conflict Monitor -MONGO_INDEX_CREATE_DEDUPLICATOR=false # Create indexes for Deduplicator - # Relative path to the MongoDB init script, upper level directories are supported MONGO_SETUP_SCRIPT_RELATIVE_PATH="./mongo/setup_mongo.sh" MONGO_RESTORE_SCRIPT_RELATIVE_PATH="./mongo/restore_mongo.sh" From ba15f48e159144b91f922d63cde999a7b2926f28 Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Thu, 19 Jun 2025 08:47:36 -0600 Subject: [PATCH 07/16] Add notification topics --- jikkou/kafka-topics-values.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 52a3fcc..960b95e 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -159,11 +159,13 @@ apps: - topic.CmTimestampDeltaNotification - topic.CmSpatTransitionNotification - topic.CmEventStateProgressionNotification + - topic.CmRevocableEnabledLaneAlignmentNotification - topic.CmIntersectionReferenceAlignmentNotificationAggregation - topic.CmSignalGroupAlignmentNotificationAggregation - topic.CmSignalStateConflictNotificationAggregation - topic.CmSpatTimeChangeDetailsNotificationAggregation - topic.CmEventStateProgressionNotificationAggregation + - topic.CmRevocableEnabledLaneAlignmentNotificationAggregation customTopics: {} deduplicator: From d8a65b891e0e0eda10733384bfe04c82338caa7f Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Fri, 11 Jul 2025 10:53:59 -0600 Subject: [PATCH 08/16] Update Kafka and Kafka connectors with RTCM --- jikkou/kafka-connectors-values.yaml | 6 ++++++ jikkou/kafka-topics-values.yaml | 14 ++------------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 3e994eb..6651ef3 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -94,6 +94,12 @@ apps: - topicName: topic.OdeSdsmJson collectionName: OdeSdsmJson generateTimestamp: true + - topicName: topic.OdeRawEncodedRTCMJson + collectionName: OdeRawEncodedRTCMJson + generateTimestamp: true + - topicName: topic.OdeRtcmJson + collectionName: OdeRtcmJson + generateTimestamp: true ode_duplicated: name: ode-duplicated connectors: diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index cf0e9bb..f254c9e 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -46,19 +46,12 @@ apps: ode: name: jpo-ode streamTopics: - - topic.OdeBsmPojo - - topic.OdeSpatTxPojo - - topic.OdeSpatPojo - topic.OdeSpatJson - topic.FilteredOdeSpatJson - - topic.OdeSpatRxJson - - topic.OdeSpatRxPojo - topic.OdeBsmJson - topic.FilteredOdeBsmJson - topic.OdeTimJson - topic.OdeTimJsonTMCFiltered - - topic.OdeTimBroadcastJson - - topic.J2735TimBroadcastJson - topic.OdeTimKTableJson - topic.FilteredOdeTimJson - topic.OdeDriverAlertJson @@ -72,20 +65,17 @@ apps: - topic.OdeRawEncodedSPATJson - topic.OdeRawEncodedTIMJson - topic.OdeRawEncodedMAPJson - - topic.OdeMapTxPojo - topic.OdeMapJson - topic.OdeRawEncodedSSMJson - - topic.OdeSsmPojo - topic.OdeSsmJson - topic.OdeRawEncodedSRMJson - - topic.OdeSrmTxPojo - topic.OdeSrmJson - topic.OdeRawEncodedPSMJson - - topic.OdePsmTxPojo - topic.OdePsmJson - - topic.OdeTimRxJson - topic.OdeRawEncodedSDSMJson - topic.OdeSdsmJson + - topic.OdeRawEncodedRTCMJson + - topic.OdeRtcmJson tableTopics: {} customTopics: {} geojsonconverter: From 776d955a93fe20d1616bda07ff39104f517cbf63 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 18 Jul 2025 10:21:28 -0600 Subject: [PATCH 09/16] Adding Connectors and Indexes for Revocable Lanes --- jikkou/kafka-connectors-values.yaml | 19 +++++++++++++++++++ jikkou/kafka-topics-values.yaml | 2 +- mongo/create_indexes.js | 8 ++++---- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 0a720a0..3580661 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -239,6 +239,10 @@ apps: collectionName: CmBsmMessageCountProgressionEvents useTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentEvent + collectionName: CmRevocableEnabledLaneAlignmentEvent + useTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmSpatMinimumDataEventAggregation collectionName: CmSpatMinimumDataEventAggregation generateTimestamp: true @@ -279,6 +283,12 @@ apps: collectionName: CmSpatMessageCountProgressionEventAggregation generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentEventAggregation + collectionName: CmRevocableEnabledLaneAlignmentEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + + # Record BSM events: - topicName: topic.CmBsmEvents @@ -363,6 +373,10 @@ apps: collectionName: CmEventStateProgressionNotification generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotification + collectionName: CmRevocableEnabledLaneAlignmentNotification + generateTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmIntersectionReferenceAlignmentNotificationAggregation collectionName: CmIntersectionReferenceAlignmentNotificationAggregation generateTimestamp: true @@ -383,6 +397,11 @@ apps: collectionName: CmEventStateProgressionNotificationAggregation generateTimestamp: true timestampField: notificationGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotificationAggregation + collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation + generateTimestamp: true + timestampField: notificationGeneratedAt + mecdeposit: name: mecdeposit connectors: diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index fff14fd..528915d 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -113,6 +113,7 @@ apps: - topic.CmMapMessageCountProgressionEvents - topic.CmSpatMessageCountProgressionEvents - topic.CmEventStateProgressionEvent + - topic.CmRevocableEnabledLaneAlignmentEvent - topic.CmSpatMinimumDataEventAggregation - topic.CmMapMinimumDataEventAggregation - topic.CmIntersectionReferenceAlignmentEventAggregation @@ -123,7 +124,6 @@ apps: - topic.CmBsmMessageCountProgressionEventAggregation - topic.CmMapMessageCountProgressionEventAggregation - topic.CmSpatMessageCountProgressionEventAggregation - - topic.CmRevocableEnabledLaneAlignment - topic.CmRevocableEnabledLaneAlignmentEventAggregation tableTopics: - topic.CmLaneDirectionOfTravelNotification diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index 49ce2e1..82b8bea 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -119,6 +119,7 @@ const conflictMonitorCollections = [ { name: "CmSpatMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmBsmMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -131,7 +132,7 @@ const conflictMonitorCollections = [ { name: "CmBsmMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - + { name: "CmRevocableEnabledLaneAlignmentEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Conflict Monitor Assessments @@ -153,6 +154,7 @@ const conflictMonitorCollections = [ { name: "CmTimestampDeltaNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTransitionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -161,9 +163,7 @@ const conflictMonitorCollections = [ { name: "CmSignalStateConflictNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - - - + { name: "CmRevocableEnabledLaneAlignmentNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Reports { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, From c53e873a3979b4cc0aed81d16e84a42ebd190022 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Tue, 22 Jul 2025 09:10:13 -0600 Subject: [PATCH 10/16] Adding docs and scripts for how to manage data in mongoDB --- docs/MongoDB_Configuration.md | 0 docs/Storage_Management.md | 60 ++++++++++++++++++++++++++++++ kafka-connect/delete_connectors.sh | 15 ++++++++ mongo/storage_checker.js | 22 +++++++++++ 4 files changed, 97 insertions(+) create mode 100644 docs/MongoDB_Configuration.md create mode 100644 docs/Storage_Management.md create mode 100644 kafka-connect/delete_connectors.sh create mode 100644 mongo/storage_checker.js diff --git a/docs/MongoDB_Configuration.md b/docs/MongoDB_Configuration.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/Storage_Management.md b/docs/Storage_Management.md new file mode 100644 index 0000000..1502fb1 --- /dev/null +++ b/docs/Storage_Management.md @@ -0,0 +1,60 @@ +# Storage Management + +The default configuration of the CV-Manager system is very data intensive and stores multiple forms of CV-Data redundantly in the database. While this provides some benefit for compliance and debugging purposes, it is also often a strain on limited cloud and storage budgets. Below are some tips and tricks for reducing the storage footprint of CV-Data + +## Stop syncing unused collections +By Default JPO-utils syncs a number of topics created by the ode, geojson-converter, Conflict Monitor and CV-Manager. However, many of these collections are unused by the CV-Manager system and therefore not required to be saved to the database. In particular, data formats such as the OdeSpatJson format which have lots of traffic can quickly become a storage burden on the system. + +1) To stop a topic from syncing to mongoDB users can modify the `kafka-connectors-values.yaml` file and remove entries. Make sure to remove the entire entry for the given topic. For example, to stop the OdeSpatJson topic from syncing remove the following: +``` +- topicName: topic.OdeSpatJson + collectionName: OdeSpatJson + generateTimestamp: true +``` + +Please note, that none of the Ode collections are required for running the GeoJson Converter, Conflict Monitor, or CV-Manager systems. The OdeSpatJson and OdeMapJson collections in particular should be deleted first as they generally contains significant data. + +2) Once a topic is deleted from the `kafka-connectors-values.yaml` file. Users can bring up the remaining connectors by rebuilding the kafka-setup container (see step 3). For existing deployments that already have the connectors created. The removed connectors will need to be removed manually. This can be done by manually deleting connectors using curl. +``` +curl -X DELETE "http://localhost:8083/connectors/sink.OdeSpatJson" >/dev/null 2>&1 +``` +Alternatively, users may delete all existing topics are recreate them. This may be faster if updating a large number of connectors. The [delete_connectors.sh](../kafka-connect/delete_connectors.sh) script will automatically delete all existing connectors. +``` +cd kafka-connect +./delete_connectors.sh +``` + +3) Once all the unused connectors have been deleted, reapply the connectors scripts to the cluster by rebuilding the kafka-connect-setup container. Make sure kafka_connect, kafka_connect_standalone, or kafka_connect_setup is specified in the COMPOSE_PROFILES variable of your .env file before rebuilding to ensure the setup container is rerun +``` +docker-compose up --build -d +``` + + + + +## Change MongoDB Data Retention Time +By default all collections in the default mongoDB are only retained for a set time window. Depending on deployment requirements, a shorter retention window may be desirable to reduce storage costs and improve database query speeds. Data retention is managed by adjusting TTL's on collections in mongoDB. Below are instructions to adjust the data retention of an existing mongoDB deployment. + +1) There are currently two environment variables that configure the data retention duration in mongoDB. The `MONGO_DATA_RETENTION_SECONDS` variable configures the number of seconds that data generated within the system should be retained for. Similarly, he `MONGO_ASN_RETENTION_SECONDS` variable configures the number of seconds that raw ASN.1 data should be retained for. By default Mongo Data is retained for 60 days, while ASN.1 data is configured for 24 hours. To change the data retention window, modify the .env file in the jpo-utils repo, or parent module and provide new values for these variables +``` +MONGO_DATA_RETENTION_SECONDS=604800 # 1 week +MONGO_ASN_RETENTION_SECONDS=3600 # 1 Hour +``` + +2) Once the variables are changed, rebuild the docker-setup container and let it run its setup procedure. Make sure that one of the following is specified in the `COMPOSE_PROFILES` variable to ensure the mongo setup container runs mongo, mongo_full, mongo_setup. Then run the following to reboot + +``` +docker-compose up --build -d +``` + +3) Once the indexes have been updated, verify that the change worked properly by checking the TTL index for the adjusted collections. This can be done by logging into the database using + +``` +mongosh -u -p --authenticationDatabase admin +use CV +db.ProcessedSpat.getIndexes(); +``` + +Alternatively, this may be checked in MongoDB compass + + diff --git a/kafka-connect/delete_connectors.sh b/kafka-connect/delete_connectors.sh new file mode 100644 index 0000000..149cc89 --- /dev/null +++ b/kafka-connect/delete_connectors.sh @@ -0,0 +1,15 @@ +CONNECT_URL="http://localhost:8083" +# Get the list of connectors +connectors=$(curl -s "$CONNECT_URL/connectors") + +# Remove brackets from the JSON array +connectors=${connectors:1:-1} +# Split the connectors into an array +IFS=', ' read -r -a connector_array <<< "$connectors" +# Loop through each connector and delete it +for connector in "${connector_array[@]}"; do + connector_name=$(echo "$connector" | tr -d '"') + echo "Deleting connector: $connector_name" + curl -X DELETE "$CONNECT_URL/connectors/$connector_name" >/dev/null 2>&1 +done +echo "All connectors deleted." else echo "No connectors found." \ No newline at end of file diff --git a/mongo/storage_checker.js b/mongo/storage_checker.js new file mode 100644 index 0000000..b56981c --- /dev/null +++ b/mongo/storage_checker.js @@ -0,0 +1,22 @@ +var collections = db.getCollectionNames(); +let totalAllocatedStorage = 0; +let totalFreeSpace = 0; + +for (var i = 0; i < collections.length; i++) { + let stats = db.getCollection(collections[i]).stats(); + let colStats = db.runCommand({"collstats": collections[i]}); + let blockManager = colStats["wiredTiger"]["block-manager"]; + let freeSpace = Number(blockManager["file bytes available for reuse"]); + let allocatedStorage = Number(blockManager["file size in bytes"]); + let indexSize = Number(stats.totalIndexSize); + + totalAllocatedStorage += allocatedStorage + indexSize; + totalFreeSpace += freeSpace; + + print(collections[i], allocatedStorage/1024/1024/1024, freeSpace/1024/1024/1024, indexSize/1024/1024/1024); + +} + +print("Total On Disk Storage: " + (totalAllocatedStorage + totalFreeSpace) / 1024 / 1024 / 1024 + " GB"); +print("Total Free Space: " + totalFreeSpace / 1024 / 1024 / 1024 + " GB"); +print("Effective Database Size: " + totalAllocatedStorage/ 1024 / 1024 / 1024 + " GB"); \ No newline at end of file From 056c86e802f0850d1270ef0b41119bc8860118ee Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Fri, 25 Jul 2025 04:32:12 -0600 Subject: [PATCH 11/16] Add jpo-ode RSM topics to Jikkou --- jikkou/kafka-connectors-values.yaml | 6 ++++++ jikkou/kafka-topics-values.yaml | 2 ++ 2 files changed, 8 insertions(+) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 3580661..94e8861 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -100,6 +100,12 @@ apps: - topicName: topic.OdeRtcmJson collectionName: OdeRtcmJson generateTimestamp: true + - topicName: topic.OdeRawEncodedRSMJson + collectionName: OdeRawEncodedRSMJson + generateTimestamp: true + - topicName: topic.OdeRsmJson + collectionName: OdeRsmJson + generateTimestamp: true ode_duplicated: name: ode-duplicated connectors: diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 528915d..8c9732a 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -76,6 +76,8 @@ apps: - topic.OdeSdsmJson - topic.OdeRawEncodedRTCMJson - topic.OdeRtcmJson + - topic.OdeRawEncodedRSMJson + - topic.OdeRsmJson tableTopics: {} customTopics: {} geojsonconverter: From 5cbdb095271710823eeb8024874fef913d7506a7 Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Sun, 27 Jul 2025 16:26:22 -0600 Subject: [PATCH 12/16] rtcm topics --- jikkou/kafka-connectors-values.yaml | 12 ++++++++++++ jikkou/kafka-topics-values.yaml | 1 + mongo/create_indexes.js | 2 +- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 94e8861..1ae0fbd 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -249,6 +249,14 @@ apps: collectionName: CmRevocableEnabledLaneAlignmentEvent useTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRtcmBroadcastRateEvent + collectionName: CmRtcmBroadcastRateEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmRtcmMinimumDataEvent + collectionName: CmRtcmMinimumDataEvent + useTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmSpatMinimumDataEventAggregation collectionName: CmSpatMinimumDataEventAggregation generateTimestamp: true @@ -293,6 +301,10 @@ apps: collectionName: CmRevocableEnabledLaneAlignmentEventAggregation generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRtcmMinimumDataEventAggregation + collectionName: CmRtcmMinimumDataEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 8c9732a..625969b 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -87,6 +87,7 @@ apps: - topic.ProcessedMap - topic.ProcessedMapWKT - topic.ProcessedBsm + - topic.ProcessedRtcm tableTopics: {} customTopics: {} conflictmonitor: diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index 82b8bea..5d0cf41 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -89,7 +89,7 @@ const odeCollections = [ const geoJsonConverterCollections = [ {name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId", expireTime: expireSeconds}, {name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId", expireTime: expireSeconds}, - {name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "geometry.coordinates", expireTime: expireSeconds}, + {name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "geometry.coordinates", expireTime: expireSeconds} ]; From 6ddc0a2526c5a0cef841a3ad3df245f7010133a6 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Fri, 1 Aug 2025 15:23:56 -0600 Subject: [PATCH 13/16] Remove RawEncodedTimJson from MongoDB and the Deduplicator connectors --- jikkou/kafka-connectors-values.yaml | 7 ------- jikkou/kafka-topics-values.yaml | 2 -- mongo/create_indexes.js | 8 -------- 3 files changed, 17 deletions(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 1ae0fbd..e970341 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -118,9 +118,6 @@ apps: - topicName: topic.OdeBsmJson collectionName: OdeBsmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedTIMJson - collectionName: OdeRawEncodedTIMJson - generateTimestamp: true geojsonconverter_duplicated: name: geojsonconverter-duplicated connectors: @@ -155,10 +152,6 @@ apps: collectionName: OdeTimJson generateTimestamp: true connectorName: DeduplicatedOdeTimJson - - topicName: topic.DeduplicatedOdeRawEncodedTIMJson - collectionName: OdeRawEncodedTIMJson - generateTimestamp: true - connectorName: DeduplicatedOdeRawEncodedTIMJson - topicName: topic.DeduplicatedOdeBsmJson collectionName: OdeBsmJson generateTimestamp: true diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 625969b..21a60c2 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -182,8 +182,6 @@ apps: retentionMs: 43200000 - topicName: topic.DeduplicatedOdeTimJson retentionMs: 43200000 - - topicName: topic.DeduplicatedOdeRawEncodedTIMJson - retentionMs: 43200000 - topicName: topic.DeduplicatedOdeBsmJson retentionMs: 43200000 - topicName: topic.DeduplicatedProcessedBsm diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index 5d0cf41..ad6c91d 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -75,14 +75,6 @@ const odeCollections = [ {name: "OdeTimJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeTimBroadcastJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeTIMCertExpirationTimeJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, - - // Ode Raw ASN - {name: "OdeRawEncodedBSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedMAPJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSPATJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSRMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedTIMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, ]; // GeoJson Converter Data From 87fbe05072f1ce3d5c1468e3a596a170e9cef0ff Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Fri, 1 Aug 2025 15:28:19 -0600 Subject: [PATCH 14/16] Remove OdeRawEncoded connectors --- jikkou/kafka-connectors-values.yaml | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index e970341..e0b5f6b 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -46,15 +46,6 @@ apps: ode: name: jpo-ode connectors: - - topicName: topic.OdeRawEncodedBSMJson - collectionName: OdeRawEncodedBSMJson - generateTimestamp: true - - topicName: topic.OdeRawEncodedMAPJson - collectionName: OdeRawEncodedMAPJson - generateTimestamp: true - - topicName: topic.OdeRawEncodedSPATJson - collectionName: OdeRawEncodedSPATJson - generateTimestamp: true - topicName: topic.OdeSpatJson collectionName: OdeSpatJson generateTimestamp: true @@ -67,42 +58,24 @@ apps: - topicName: topic.OdeTIMCertExpirationTimeJson collectionName: OdeTIMCertExpirationTimeJson generateTimestamp: true - - topicName: topic.OdeRawEncodedPSMJson - collectionName: OdeRawEncodedPSMJson - generateTimestamp: true - topicName: topic.OdePsmJson collectionName: OdePsmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSRMJson - collectionName: OdeRawEncodedSRMJson - generateTimestamp: true - topicName: topic.OdeSrmJson collectionName: OdeSrmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSSMJson - collectionName: OdeRawEncodedSSMJson - generateTimestamp: true - topicName: topic.OdeSsmJson collectionName: OdeSsmJson generateTimestamp: true - topicName: topic.OdeDriverAlertJson collectionName: OdeDriverAlertJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSDSMJson - collectionName: OdeRawEncodedSDSMJson - generateTimestamp: true - topicName: topic.OdeSdsmJson collectionName: OdeSdsmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedRTCMJson - collectionName: OdeRawEncodedRTCMJson - generateTimestamp: true - topicName: topic.OdeRtcmJson collectionName: OdeRtcmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedRSMJson - collectionName: OdeRawEncodedRSMJson - generateTimestamp: true - topicName: topic.OdeRsmJson collectionName: OdeRsmJson generateTimestamp: true From af2224a3d6523b12711aa724836e2bca38677866 Mon Sep 17 00:00:00 2001 From: jacob6838 Date: Fri, 8 Aug 2025 07:29:29 -0800 Subject: [PATCH 15/16] Adding additionalIndexes generator and OdeBsmJson index --- mongo/create_indexes.js | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index ad6c91d..0344f5d 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -62,7 +62,7 @@ const odeCollections = [ // ODE Json data {name: "OdeDriverAlertJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeBsmJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, - {name: "OdeBsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeBsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds, additionalIndexes: [{"metadata.odeReceivedAt": -1, "payload.data.coreData.position.latitude": 1, "payload.data.coreData.position.longitude": 1}]}, {name: "OdeMapJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeMapJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeSpatJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, @@ -232,6 +232,7 @@ do { createTimeIntersectionIndex(collection); createTimeRsuIpIndex(collection); createGeoSpatialIndex(collection); + createAdditionalIndexes(collection); }else{ missing_collection_count++; console.log("Collection " + collection['name'] + " does not exist yet"); @@ -440,6 +441,24 @@ function createGeoSpatialIndex(collection){ } +function createAdditionalIndexes(collection){ + for (const additionalIndex of collection['additionalIndexes'] || []) { + console.log("Creating additional index for " + collectionName + " with index: " + JSON.stringify(additionalIndex)); + const index_name = Object.entries(additionalIndex).map(([key, value]) => `${key}_${value}`).join("_"); + if (indexExists(collection['name'], index_name)) { + continue; + } + try { + db[collectionName].createIndex(additionalIndex); + console.log("Created additional index for " + collectionName + " with index: " + JSON.stringify(additionalIndex)); + } catch (err) { + console.log("Failed to create additional index for " + collectionName + " with index: " + JSON.stringify(additionalIndex)); + console.log(err); + } + } + +} + function ttlIndexExists(collection) { return db[collection['name']].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined; } @@ -459,3 +478,7 @@ function timeIndexExists(collection){ function geoSpatialIndexExists(collection){ return db[collection['name']].getIndexes().find((idx) => idx.name == collection['geoSpatialField'] + "_2dsphere_timeStamp_-1") !== undefined; } + +function indexExists(collectionName, indexName){ + return db[collectionName].getIndexes().find((idx) => idx.name == indexName) !== undefined; +} From 0c4c405190441d222915d705e014ee344a9009b2 Mon Sep 17 00:00:00 2001 From: Drew Johnston Date: Tue, 2 Sep 2025 12:04:04 -0600 Subject: [PATCH 16/16] Add topics for RsuStatusMonitor --- docker-compose-kafka.yml | 1 + jikkou/kafka-topics-template.jinja | 4 ++++ jikkou/kafka-topics-values.yaml | 7 +++++++ sample.env | 1 + 4 files changed, 13 insertions(+) diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 986f968..9eb1f91 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -71,6 +71,7 @@ services: KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false} KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false} KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true} + KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR: ${KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR:-false} KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1} diff --git a/jikkou/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja index dd411f4..bb1b736 100644 --- a/jikkou/kafka-topics-template.jinja +++ b/jikkou/kafka-topics-template.jinja @@ -84,6 +84,10 @@ spec: {{ create_topics(values.apps.mecdeposit) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR %} +{{ create_topics(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_OTHER %} {{ create_topics(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 21a60c2..1717dd6 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -208,6 +208,13 @@ apps: - topic.MecDepositMetrics tableTopics: {} customTopics: {} + rsustatusmonitor: + name: rsu-status-monitor + streamTopics: + - topic.RmNearestNeighborUnresponsiveEvent + tableTopics: + - topic.RmIntersectionStatusRecords + customTopics: {} other: name: other-topics streamTopics: {} diff --git a/sample.env b/sample.env index 43f79c7..eaafb11 100644 --- a/sample.env +++ b/sample.env @@ -58,6 +58,7 @@ KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor KAFKA_TOPIC_CREATE_INTERSECTION_API=true # Create topics for Intersection API KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit +KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR=false # Create topics for RsuStatusMonitor KAFKA_TOPIC_CREATE_OTHER=false # Create topics for other applications # Relative path to the Kafka topics values file, upper level directories are supported