From 776d955a93fe20d1616bda07ff39104f517cbf63 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Fri, 18 Jul 2025 10:21:28 -0600 Subject: [PATCH 1/2] Adding Connectors and Indexes for Revocable Lanes --- jikkou/kafka-connectors-values.yaml | 19 +++++++++++++++++++ jikkou/kafka-topics-values.yaml | 2 +- mongo/create_indexes.js | 8 ++++---- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 0a720a0..3580661 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -239,6 +239,10 @@ apps: collectionName: CmBsmMessageCountProgressionEvents useTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentEvent + collectionName: CmRevocableEnabledLaneAlignmentEvent + useTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmSpatMinimumDataEventAggregation collectionName: CmSpatMinimumDataEventAggregation generateTimestamp: true @@ -279,6 +283,12 @@ apps: collectionName: CmSpatMessageCountProgressionEventAggregation generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentEventAggregation + collectionName: CmRevocableEnabledLaneAlignmentEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + + # Record BSM events: - topicName: topic.CmBsmEvents @@ -363,6 +373,10 @@ apps: collectionName: CmEventStateProgressionNotification generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotification + collectionName: CmRevocableEnabledLaneAlignmentNotification + generateTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmIntersectionReferenceAlignmentNotificationAggregation collectionName: CmIntersectionReferenceAlignmentNotificationAggregation generateTimestamp: true @@ -383,6 +397,11 @@ apps: collectionName: CmEventStateProgressionNotificationAggregation 
generateTimestamp: true timestampField: notificationGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotificationAggregation + collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation + generateTimestamp: true + timestampField: notificationGeneratedAt + mecdeposit: name: mecdeposit connectors: diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index fff14fd..528915d 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -113,6 +113,7 @@ apps: - topic.CmMapMessageCountProgressionEvents - topic.CmSpatMessageCountProgressionEvents - topic.CmEventStateProgressionEvent + - topic.CmRevocableEnabledLaneAlignmentEvent - topic.CmSpatMinimumDataEventAggregation - topic.CmMapMinimumDataEventAggregation - topic.CmIntersectionReferenceAlignmentEventAggregation @@ -123,7 +124,6 @@ apps: - topic.CmBsmMessageCountProgressionEventAggregation - topic.CmMapMessageCountProgressionEventAggregation - topic.CmSpatMessageCountProgressionEventAggregation - - topic.CmRevocableEnabledLaneAlignment - topic.CmRevocableEnabledLaneAlignmentEventAggregation tableTopics: - topic.CmLaneDirectionOfTravelNotification diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index 49ce2e1..82b8bea 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -119,6 +119,7 @@ const conflictMonitorCollections = [ { name: "CmSpatMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmBsmMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentEvent", ttlField: "eventGeneratedAt", 
timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -131,7 +132,7 @@ const conflictMonitorCollections = [ { name: "CmBsmMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - + { name: "CmRevocableEnabledLaneAlignmentEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Conflict Monitor Assessments @@ -153,6 +154,7 @@ const conflictMonitorCollections = [ { name: "CmTimestampDeltaNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTransitionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentNotification", ttlField: "notificationGeneratedAt", timeField: 
"notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -161,9 +163,7 @@ const conflictMonitorCollections = [ { name: "CmSignalStateConflictNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - - - + { name: "CmRevocableEnabledLaneAlignmentNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Reports { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, From c53e873a3979b4cc0aed81d16e84a42ebd190022 Mon Sep 17 00:00:00 2001 From: john-wiens Date: Tue, 22 Jul 2025 09:10:13 -0600 Subject: [PATCH 2/2] Adding docs and scripts for how to manage data in mongoDB --- docs/MongoDB_Configuration.md | 0 docs/Storage_Management.md | 60 ++++++++++++++++++++++++++++++ kafka-connect/delete_connectors.sh | 15 ++++++++ mongo/storage_checker.js | 22 +++++++++++ 4 files changed, 97 insertions(+) create mode 100644 docs/MongoDB_Configuration.md create mode 100644 docs/Storage_Management.md create mode 100644 
kafka-connect/delete_connectors.sh create mode 100644 mongo/storage_checker.js diff --git a/docs/MongoDB_Configuration.md b/docs/MongoDB_Configuration.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/Storage_Management.md b/docs/Storage_Management.md new file mode 100644 index 0000000..1502fb1 --- /dev/null +++ b/docs/Storage_Management.md @@ -0,0 +1,60 @@ +# Storage Management + +The default configuration of the CV-Manager system is very data intensive and stores multiple forms of CV-Data redundantly in the database. While this provides some benefit for compliance and debugging purposes, it is also often a strain on limited cloud and storage budgets. Below are some tips and tricks for reducing the storage footprint of CV-Data. + +## Stop syncing unused collections +By default, JPO-utils syncs a number of topics created by the ode, geojson-converter, Conflict Monitor and CV-Manager. However, many of these collections are unused by the CV-Manager system and therefore not required to be saved to the database. In particular, data formats such as the OdeSpatJson format which have lots of traffic can quickly become a storage burden on the system. + +1) To stop a topic from syncing to mongoDB, users can modify the `kafka-connectors-values.yaml` file and remove entries. Make sure to remove the entire entry for the given topic. For example, to stop the OdeSpatJson topic from syncing, remove the following: +``` +- topicName: topic.OdeSpatJson + collectionName: OdeSpatJson + generateTimestamp: true +``` + +Please note that none of the Ode collections are required for running the GeoJson Converter, Conflict Monitor, or CV-Manager systems. The OdeSpatJson and OdeMapJson collections in particular should be deleted first as they generally contain significant data. + +2) Once a topic is deleted from the `kafka-connectors-values.yaml` file, users can bring up the remaining connectors by rebuilding the kafka-setup container (see step 3).
For existing deployments that already have the connectors created, the removed connectors will need to be deleted manually. This can be done by manually deleting connectors using curl. +``` +curl -X DELETE "http://localhost:8083/connectors/sink.OdeSpatJson" >/dev/null 2>&1 +``` +Alternatively, users may delete all existing connectors and recreate them. This may be faster if updating a large number of connectors. The [delete_connectors.sh](../kafka-connect/delete_connectors.sh) script will automatically delete all existing connectors. +``` +cd kafka-connect +./delete_connectors.sh +``` + +3) Once all the unused connectors have been deleted, reapply the connectors scripts to the cluster by rebuilding the kafka-connect-setup container. Make sure kafka_connect, kafka_connect_standalone, or kafka_connect_setup is specified in the COMPOSE_PROFILES variable of your .env file before rebuilding to ensure the setup container is rerun. +``` +docker-compose up --build -d +``` + + + + +## Change MongoDB Data Retention Time +By default all collections in the default mongoDB are only retained for a set time window. Depending on deployment requirements, a shorter retention window may be desirable to reduce storage costs and improve database query speeds. Data retention is managed by adjusting TTLs on collections in mongoDB. Below are instructions to adjust the data retention of an existing mongoDB deployment. + +1) There are currently two environment variables that configure the data retention duration in mongoDB. The `MONGO_DATA_RETENTION_SECONDS` variable configures the number of seconds that data generated within the system should be retained for. Similarly, the `MONGO_ASN_RETENTION_SECONDS` variable configures the number of seconds that raw ASN.1 data should be retained for. By default Mongo Data is retained for 60 days, while ASN.1 data is configured for 24 hours.
To change the data retention window, modify the .env file in the jpo-utils repo, or parent module and provide new values for these variables +``` +MONGO_DATA_RETENTION_SECONDS=604800 # 1 week +MONGO_ASN_RETENTION_SECONDS=3600 # 1 Hour +``` + +2) Once the variables are changed, rebuild the docker-setup container and let it run its setup procedure. Make sure that one of the following is specified in the `COMPOSE_PROFILES` variable to ensure the mongo setup container runs mongo, mongo_full, mongo_setup. Then run the following to reboot + +``` +docker-compose up --build -d +``` + +3) Once the indexes have been updated, verify that the change worked properly by checking the TTL index for the adjusted collections. This can be done by logging into the database using + +``` +mongosh -u -p --authenticationDatabase admin +use CV +db.ProcessedSpat.getIndexes(); +``` + +Alternatively, this may be checked in MongoDB compass + +
diff --git a/kafka-connect/delete_connectors.sh b/kafka-connect/delete_connectors.sh
new file mode 100644
index 0000000..149cc89
--- /dev/null
+++ b/kafka-connect/delete_connectors.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Deletes every connector registered with the Kafka Connect REST API.
+# Set CONNECT_URL in the environment to target a non-default Connect host.
+CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
+
+# Get the list of connectors (the REST API returns a JSON array of names)
+if ! connectors=$(curl -sf "$CONNECT_URL/connectors"); then
+    echo "Unable to list connectors from $CONNECT_URL" >&2
+    exit 1
+fi
+
+# Remove brackets from the JSON array
+connectors=${connectors#\[}
+connectors=${connectors%\]}
+if [ -z "$connectors" ]; then
+    echo "No connectors found."
+    exit 0
+fi
+
+# Split the connectors into an array
+IFS=', ' read -r -a connector_array <<< "$connectors"
+# Loop through each connector and delete it
+for connector in "${connector_array[@]}"; do
+    connector_name=$(echo "$connector" | tr -d '"')
+    echo "Deleting connector: $connector_name"
+    curl -X DELETE "$CONNECT_URL/connectors/$connector_name" >/dev/null 2>&1
+done
+echo "All connectors deleted."
\ No newline at end of file
diff --git a/mongo/storage_checker.js b/mongo/storage_checker.js
new file mode 100644
index 0000000..b56981c
--- /dev/null
+++ b/mongo/storage_checker.js
@@ -0,0 +1,42 @@
+// Reports how much disk space each collection in the current database uses.
+// For every collection it prints: name, collection file size (GB), reusable
+// free space inside that file (GB), and total index size (GB); totals follow.
+// Relies on the WiredTiger block-manager section of the collStats output.
+const BYTES_PER_GB = 1024 * 1024 * 1024;
+const toGB = (bytes) => bytes / BYTES_PER_GB;
+
+let totalAllocatedStorage = 0;
+let totalFreeSpace = 0;
+
+for (const name of db.getCollectionNames()) {
+    // A single collStats call supplies both the block-manager figures and totalIndexSize.
+    let colStats;
+    try {
+        colStats = db.runCommand({ collStats: name });
+    } catch (err) {
+        // collStats is rejected for some namespaces (for example views); report and move on.
+        print(name, `skipped: ${err.message}`);
+        continue;
+    }
+    const blockManager = colStats.wiredTiger && colStats.wiredTiger["block-manager"];
+    if (!blockManager) {
+        print(name, "skipped: no WiredTiger block-manager statistics");
+        continue;
+    }
+
+    const freeSpace = Number(blockManager["file bytes available for reuse"]);
+    const allocatedStorage = Number(blockManager["file size in bytes"]);
+    const indexSize = Number(colStats.totalIndexSize);
+
+    totalAllocatedStorage += allocatedStorage + indexSize;
+    totalFreeSpace += freeSpace;
+
+    print(name, toGB(allocatedStorage), toGB(freeSpace), toGB(indexSize));
+}
+
+// "file size in bytes" is the size of the file on disk and already contains the
+// bytes available for reuse, so free space is subtracted to get the space that
+// holds live data rather than added on top of the on-disk total.
+print(`Total On Disk Storage: ${toGB(totalAllocatedStorage)} GB`);
+print(`Total Free Space: ${toGB(totalFreeSpace)} GB`);
+print(`Effective Database Size: ${toGB(totalAllocatedStorage - totalFreeSpace)} GB`);
\ No newline at end of file