Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5a72820
Adding in new environment variable for configuring Kafka Segment Length
John-Wiens May 12, 2025
de33706
Adding some descriptive text on Kafka Log rotation principles
John-Wiens May 12, 2025
509c83f
Merge branch 'develop' of github.com:CDOT-CV/jpo-utils into log-rotation
John-Wiens May 12, 2025
01421a6
Merge pull request #25 from CDOT-CV/log-rotation
John-Wiens May 12, 2025
3d802b9
Merge branch 'develop' of github.com:CDOT-CV/jpo-utils into develop
John-Wiens May 13, 2025
a29fd30
Merge branch 'usdot-jpo-ode:develop' into develop
jacob6838 May 15, 2025
ac6e146
Merge branch 'master' into develop
jacob6838 May 15, 2025
964a46b
Merge branch 'develop' of https://github.com/CDOT-CV/jpo-utils into d…
Michael7371 May 20, 2025
0af0503
Add topics for revocable lane events
iyourshaw May 28, 2025
9051ac8
Merge pull request #27 from usdot-jpo-ode/develop
drewjj Jun 4, 2025
91176a7
Adding in RSU status Connector to Jikkou
John-Wiens Jun 5, 2025
367e7cc
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-utils into rsu…
John-Wiens Jun 5, 2025
140350f
Adding in RSU Status connector
John-Wiens Jun 5, 2025
2d51983
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-utils into dev…
John-Wiens Jun 13, 2025
9d2b0ab
Removing Mongo Index Env vars
John-Wiens Jun 13, 2025
3fae7ab
Merge branch 'usdot-jpo-ode:develop' into develop
drewjj Jun 13, 2025
ba15f48
Add notification topics
iyourshaw Jun 19, 2025
bd9d91c
Merge pull request #30 from CDOT-CV/iyourshaw/revocable-lane-topics
iyourshaw Jun 24, 2025
aef16b5
Merge branch 'develop' of https://github.com/usdot-jpo-ode/jpo-utils …
iyourshaw Jun 29, 2025
d32a800
Merge branch 'develop' of https://github.com/CDOT-CV/jpo-utils into d…
iyourshaw Jun 29, 2025
41ecfcb
Merge branch 'develop' of https://github.com/CDOT-CV/jpo-utils into d…
Michael7371 Jul 7, 2025
52cfd92
Merge branch 'usdot-jpo-ode:develop' into develop
drewjj Jul 11, 2025
d8a65b8
Update Kafka and Kafka connectors with RTCM
drewjj Jul 11, 2025
386df26
Merge pull request #31 from CDOT-CV/rtcm
drewjj Jul 11, 2025
38d5c3d
Merge pull request #28 from CDOT-CV/rsu-status
John-Wiens Jul 18, 2025
5003f41
Merge branch 'develop' of github.com:CDOT-CV/jpo-utils into mongo-env…
John-Wiens Jul 18, 2025
92f7b26
Merge pull request #29 from CDOT-CV/mongo-env-fixes
John-Wiens Jul 18, 2025
776d955
Adding Connectors and Indexes for Revocable Lanes
John-Wiens Jul 18, 2025
c53e873
Adding docs and scripts for how to manage data in mongoDB
John-Wiens Jul 22, 2025
3d4f53f
Merge pull request #32 from CDOT-CV/revocable-lane-support
John-Wiens Jul 22, 2025
fd26b25
Merge pull request #33 from CDOT-CV/mongo-storage-management-document…
John-Wiens Jul 22, 2025
056c86e
Add jpo-ode RSM topics to Jikkou
drewjj Jul 25, 2025
49e53d6
Merge pull request #34 from CDOT-CV/rsm_topic_support
drewjj Jul 25, 2025
d3b3150
Merge branch 'develop' of https://github.com/CDOT-CV/jpo-utils into d…
iyourshaw Jul 27, 2025
5cbdb09
rtcm topics
iyourshaw Jul 27, 2025
45928ab
Merge branch 'develop' of https://github.com/CDOT-CV/jpo-utils into d…
Michael7371 Jul 29, 2025
74e79cc
Merge pull request #35 from CDOT-CV/rtcm-topics
drewjj Aug 1, 2025
6ddc0a2
Remove RawEncodedTimJson from MongoDB and the Deduplicator connectors
drewjj Aug 1, 2025
87fbe05
Remove OdeRawEncoded connectors
drewjj Aug 1, 2025
281b032
Merge pull request #36 from CDOT-CV/remove_rawtimdedup
drewjj Aug 1, 2025
af2224a
Adding additionalIndexes generator and OdeBsmJson index
jacob6838 Aug 8, 2025
f428642
Merge pull request #37 from CDOT-CV/bsm-indexes
jacob6838 Aug 8, 2025
0c4c405
Add topics for RsuStatusMonitor
drewjj Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be depl
- `kafka_schema_registry` - deploys a `kafka-schema-registry` service that can be used to manage schemas for kafka topics
- `kafka_ui` - deploys a [web interface](https://github.com/kafbat/kafka-ui) to interact with the kafka cluster

### Kafka Topic Log Retention and Rotation

During operation, Kafka stores all the messages published to topics in files called logs. These logs are stored on the host system and can quickly become quite large for deployments with real CV volumes of data. A kafka log may be split across multiple files called segments. Each segment is limited in size based upon the KAFKA_LOG_SEGMENT_BYTES environment variable. The number of segments stored is variable but collectively, the total volume of all segments for 1 partition will not exceed the value specified in KAFKA_LOG_RETENTION_BYTES. When Kafka needs to delete data, either because the total KAFKA_LOG_RETENTION_BYTES is exceeded or because data in the oldest segment exceeds the KAFKA_LOG_RETENTION_HOURS an entire segment file will be deleted. Please note, Kafka will never delete the active log segment. So even if the data is far older than the value specified in KAFKA_LOG_RETENTION_HOURS the data may not be deleted if there is not enough data to fill up the segment and cause the creation of a new log segment. Additionally, please note that the values specified in KAFKA_LOG_RETENTION_BYTES and KAFKA_LOG_SEGMENT_BYTES are on a per topic per partition basis. So for a deployment with multiple partitions and topics, the total used storage will likely become quite large even for what may seem like small individual log and segment sizes. When configuring the KAFKA_LOG_SEGMENT_BYTES and KAFKA_LOG_RETENTION_BYTES variables. Make sure that the KAFKA_LOG_RETENTION_BYTES is larger than the KAFKA_LOG_SEGMENT_BYTES. If the value is not larger, Kafka will not be able to generate a new log segment before needing to rotate the logs which will lead to unpredictable behavior.

### Configure Topic Creation

The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](jikkou/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`).
Expand Down
1 change: 1 addition & 0 deletions docker-compose-connect.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ services:
CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true}
CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true}
CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true}
CONNECT_CREATE_INTERSECTION_API: ${CONNECT_CREATE_INTERSECTION_API:-true}
CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false}
CONNECT_CREATE_MECDEPOSIT: ${CONNECT_CREATE_MECDEPOSIT:-false}
MONGO_CONNECTOR_USERNAME: ${MONGO_ADMIN_DB_USER:-admin}
Expand Down
3 changes: 3 additions & 0 deletions docker-compose-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ services:
KAFKA_CFG_DELETE_TOPIC_ENABLE: "true"
KAFKA_CFG_LOG_RETENTION_HOURS: ${KAFKA_LOG_RETENTION_HOURS:-3}
KAFKA_CFG_LOG_RETENTION_BYTES: ${KAFKA_LOG_RETENTION_BYTES:-10737418240}
KAFKA_CFG_LOG_SEGMENT_BYTES: ${KAFKA_LOG_SEGMENT_BYTES:-1073741824}
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
logging:
options:
Expand Down Expand Up @@ -69,6 +70,8 @@ services:
KAFKA_TOPIC_CREATE_CONFLICTMONITOR: ${KAFKA_TOPIC_CREATE_CONFLICTMONITOR:-true}
KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false}
KAFKA_TOPIC_CREATE_MECDEPOSIT: ${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false}
KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true}
KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR: ${KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR:-false}

KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092}
KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1}
Expand Down
11 changes: 6 additions & 5 deletions docker-compose-mongo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ services:
- kafka_connect_standalone
- mongo_full
- mongo
- mongo_setup
image: mongo:8
hostname: mongo_setup
depends_on:
Expand Down Expand Up @@ -92,10 +93,10 @@ services:

MONGO_SAMPLE_DATA_RELATIVE_PATH: ${MONGO_SAMPLE_DATA_RELATIVE_PATH:-./mongo/dump}

MONGO_INDEX_CREATE_ODE: ${MONGO_INDEX_CREATE_ODE:-true}
MONGO_INDEX_CREATE_GEOJSONCONVERTER: ${MONGO_INDEX_CREATE_GEOJSONCONVERTER:-true}
MONGO_INDEX_CREATE_CONFLICTMONITOR: ${MONGO_INDEX_CREATE_CONFLICTMONITOR:-true}
MONGO_INDEX_CREATE_DEDUPLICATOR: ${MONGO_INDEX_CREATE_DEDUPLICATOR:-false}
CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true}
CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true}
CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true}
CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false}
entrypoint: ["/bin/bash", "-c", "./setup_mongo.sh && ./restore_mongo.sh"]
volumes:
- ${MONGO_SETUP_SCRIPT_RELATIVE_PATH:-./mongo/setup_mongo.sh}:/setup_mongo.sh
Expand Down Expand Up @@ -137,4 +138,4 @@ services:
retries: 4

volumes:
mongo_data:
mongo_data:
Empty file added docs/MongoDB_Configuration.md
Empty file.
60 changes: 60 additions & 0 deletions docs/Storage_Management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Storage Management

The default configuration of the CV-Manager system is very data intensive and stores multiple forms of CV-Data redundantly in the database. While this provides some benefit for compliance and debugging purposes, it is also often a strain on limited cloud and storage budgets. Below are some tips and tricks for reducing the storage footprint of CV-Data

## Stop syncing unused collections
By Default JPO-utils syncs a number of topics created by the ode, geojson-converter, Conflict Monitor and CV-Manager. However, many of these collections are unused by the CV-Manager system and therefore not required to be saved to the database. In particular, data formats such as the OdeSpatJson format which have lots of traffic can quickly become a storage burden on the system.

1) To stop a topic from syncing to mongoDB users can modify the `kafka-connectors-values.yaml` file and remove entries. Make sure to remove the entire entry for the given topic. For example, to stop the OdeSpatJson topic from syncing remove the following:
```
- topicName: topic.OdeSpatJson
collectionName: OdeSpatJson
generateTimestamp: true
```

Please note, that none of the Ode collections are required for running the GeoJson Converter, Conflict Monitor, or CV-Manager systems. The OdeSpatJson and OdeMapJson collections in particular should be deleted first as they generally contains significant data.

2) Once a topic is deleted from the `kafka-connectors-values.yaml` file. Users can bring up the remaining connectors by rebuilding the kafka-setup container (see step 3). For existing deployments that already have the connectors created. The removed connectors will need to be removed manually. This can be done by manually deleting connectors using curl.
```
curl -X DELETE "http://localhost:8083/connectors/sink.OdeSpatJson" >/dev/null 2>&1
```
Alternatively, users may delete all existing topics are recreate them. This may be faster if updating a large number of connectors. The [delete_connectors.sh](../kafka-connect/delete_connectors.sh) script will automatically delete all existing connectors.
```
cd kafka-connect
./delete_connectors.sh
```

3) Once all the unused connectors have been deleted, reapply the connectors scripts to the cluster by rebuilding the kafka-connect-setup container. Make sure kafka_connect, kafka_connect_standalone, or kafka_connect_setup is specified in the COMPOSE_PROFILES variable of your .env file before rebuilding to ensure the setup container is rerun
```
docker-compose up --build -d
```




## Change MongoDB Data Retention Time
By default all collections in the default mongoDB are only retained for a set time window. Depending on deployment requirements, a shorter retention window may be desirable to reduce storage costs and improve database query speeds. Data retention is managed by adjusting TTL's on collections in mongoDB. Below are instructions to adjust the data retention of an existing mongoDB deployment.

1) There are currently two environment variables that configure the data retention duration in mongoDB. The `MONGO_DATA_RETENTION_SECONDS` variable configures the number of seconds that data generated within the system should be retained for. Similarly, he `MONGO_ASN_RETENTION_SECONDS` variable configures the number of seconds that raw ASN.1 data should be retained for. By default Mongo Data is retained for 60 days, while ASN.1 data is configured for 24 hours. To change the data retention window, modify the .env file in the jpo-utils repo, or parent module and provide new values for these variables
```
MONGO_DATA_RETENTION_SECONDS=604800 # 1 week
MONGO_ASN_RETENTION_SECONDS=3600 # 1 Hour
```

2) Once the variables are changed, rebuild the docker-setup container and let it run its setup procedure. Make sure that one of the following is specified in the `COMPOSE_PROFILES` variable to ensure the mongo setup container runs mongo, mongo_full, mongo_setup. Then run the following to reboot

```
docker-compose up --build -d
```

3) Once the indexes have been updated, verify that the change worked properly by checking the TTL index for the adjusted collections. This can be done by logging into the database using

```
mongosh -u <username> -p <password> --authenticationDatabase admin
use CV
db.ProcessedSpat.getIndexes();
```

Alternatively, this may be checked in MongoDB compass


4 changes: 4 additions & 0 deletions jikkou/kafka-connectors-template.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ spec:
{{ create_connector(values.apps.conflictmonitor) }}
{% endif %}

{% if system.env.CONNECT_CREATE_INTERSECTION_API %}
{{ create_connector(values.apps.intersection_api) }}
{% endif %}

{% if system.env.CONNECT_CREATE_DEDUPLICATOR %}
{{ create_connector(values.apps.deduplicator) }}
{% else %}
Expand Down
72 changes: 44 additions & 28 deletions jikkou/kafka-connectors-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,6 @@ apps:
ode:
name: jpo-ode
connectors:
- topicName: topic.OdeRawEncodedBSMJson
collectionName: OdeRawEncodedBSMJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedMAPJson
collectionName: OdeRawEncodedMAPJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedSPATJson
collectionName: OdeRawEncodedSPATJson
generateTimestamp: true
- topicName: topic.OdeSpatJson
collectionName: OdeSpatJson
generateTimestamp: true
Expand All @@ -67,33 +58,27 @@ apps:
- topicName: topic.OdeTIMCertExpirationTimeJson
collectionName: OdeTIMCertExpirationTimeJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedPSMJson
collectionName: OdeRawEncodedPSMJson
generateTimestamp: true
- topicName: topic.OdePsmJson
collectionName: OdePsmJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedSRMJson
collectionName: OdeRawEncodedSRMJson
generateTimestamp: true
- topicName: topic.OdeSrmJson
collectionName: OdeSrmJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedSSMJson
collectionName: OdeRawEncodedSSMJson
generateTimestamp: true
- topicName: topic.OdeSsmJson
collectionName: OdeSsmJson
generateTimestamp: true
- topicName: topic.OdeDriverAlertJson
collectionName: OdeDriverAlertJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedSDSMJson
collectionName: OdeRawEncodedSDSMJson
generateTimestamp: true
- topicName: topic.OdeSdsmJson
collectionName: OdeSdsmJson
generateTimestamp: true
- topicName: topic.OdeRtcmJson
collectionName: OdeRtcmJson
generateTimestamp: true
- topicName: topic.OdeRsmJson
collectionName: OdeRsmJson
generateTimestamp: true
ode_duplicated:
name: ode-duplicated
connectors:
Expand All @@ -106,9 +91,6 @@ apps:
- topicName: topic.OdeBsmJson
collectionName: OdeBsmJson
generateTimestamp: true
- topicName: topic.OdeRawEncodedTIMJson
collectionName: OdeRawEncodedTIMJson
generateTimestamp: true
geojsonconverter_duplicated:
name: geojsonconverter-duplicated
connectors:
Expand All @@ -121,6 +103,13 @@ apps:
- topicName: topic.ProcessedBsm
collectionName: ProcessedBsm
generateTimestamp: true
intersection_api:
name: intersection_api
connectors:
- topicName: topic.IntersectionApiRsuStatus
collectionName: IntersectionApiRsuStatus
useTimestamp: true
timestampField: timestamp
deduplicator:
name: deduplicator
connectors:
Expand All @@ -136,10 +125,6 @@ apps:
collectionName: OdeTimJson
generateTimestamp: true
connectorName: DeduplicatedOdeTimJson
- topicName: topic.DeduplicatedOdeRawEncodedTIMJson
collectionName: OdeRawEncodedTIMJson
generateTimestamp: true
connectorName: DeduplicatedOdeRawEncodedTIMJson
- topicName: topic.DeduplicatedOdeBsmJson
collectionName: OdeBsmJson
generateTimestamp: true
Expand Down Expand Up @@ -226,6 +211,18 @@ apps:
collectionName: CmBsmMessageCountProgressionEvents
useTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRevocableEnabledLaneAlignmentEvent
collectionName: CmRevocableEnabledLaneAlignmentEvent
useTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRtcmBroadcastRateEvent
collectionName: CmRtcmBroadcastRateEvent
useTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRtcmMinimumDataEvent
collectionName: CmRtcmMinimumDataEvent
useTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmSpatMinimumDataEventAggregation
collectionName: CmSpatMinimumDataEventAggregation
generateTimestamp: true
Expand Down Expand Up @@ -266,6 +263,16 @@ apps:
collectionName: CmSpatMessageCountProgressionEventAggregation
generateTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRevocableEnabledLaneAlignmentEventAggregation
collectionName: CmRevocableEnabledLaneAlignmentEventAggregation
generateTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRtcmMinimumDataEventAggregation
collectionName: CmRtcmMinimumDataEventAggregation
generateTimestamp: true
timestampField: eventGeneratedAt



# Record BSM events:
- topicName: topic.CmBsmEvents
Expand Down Expand Up @@ -350,6 +357,10 @@ apps:
collectionName: CmEventStateProgressionNotification
generateTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmRevocableEnabledLaneAlignmentNotification
collectionName: CmRevocableEnabledLaneAlignmentNotification
generateTimestamp: true
timestampField: eventGeneratedAt
- topicName: topic.CmIntersectionReferenceAlignmentNotificationAggregation
collectionName: CmIntersectionReferenceAlignmentNotificationAggregation
generateTimestamp: true
Expand All @@ -370,6 +381,11 @@ apps:
collectionName: CmEventStateProgressionNotificationAggregation
generateTimestamp: true
timestampField: notificationGeneratedAt
- topicName: topic.CmRevocableEnabledLaneAlignmentNotificationAggregation
collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation
generateTimestamp: true
timestampField: notificationGeneratedAt

mecdeposit:
name: mecdeposit
connectors:
Expand Down
8 changes: 8 additions & 0 deletions jikkou/kafka-topics-template.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ spec:
{{ create_topics(values.apps.conflictmonitor) }}
{% endif %}

{% if system.env.KAFKA_TOPIC_CREATE_INTERSECTION_API %}
{{ create_topics(values.apps.intersection_api) }}
{% endif %}

{% if system.env.KAFKA_TOPIC_CREATE_DEDUPLICATOR %}
{{ create_topics(values.apps.deduplicator) }}
{% endif %}
Expand All @@ -80,6 +84,10 @@ spec:
{{ create_topics(values.apps.mecdeposit) }}
{% endif %}

{% if system.env.KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR %}
{{ create_topics(values.apps.rsustatusmonitor) }}
{% endif %}

{% if system.env.KAFKA_TOPIC_CREATE_OTHER %}
{{ create_topics(values.apps.other) }}
{% endif %}
Loading
Loading