diff --git a/README.md b/README.md index c9a94d5..19df946 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,10 @@ An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be depl - `kafka_schema_registry` - deploys a `kafka-schema-registry` service that can be used to manage schemas for kafka topics - `kafka_ui` - deploys a [web interface](https://github.com/kafbat/kafka-ui) to interact with the kafka cluster +### Kafka Topic Log Retention and Rotation + +During operation, Kafka stores all the messages published to topics in files called logs. These logs are stored on the host system and can quickly become quite large for deployments with real-world volumes of CV data. A Kafka log may be split across multiple files called segments. Each segment is limited in size by the KAFKA_LOG_SEGMENT_BYTES environment variable. The number of segments stored varies, but collectively the total volume of all segments for one partition will not exceed the value specified in KAFKA_LOG_RETENTION_BYTES. When Kafka needs to delete data, either because the total KAFKA_LOG_RETENTION_BYTES is exceeded or because data in the oldest segment is older than KAFKA_LOG_RETENTION_HOURS, an entire segment file will be deleted. Please note that Kafka will never delete the active log segment. So even if the data is far older than the value specified in KAFKA_LOG_RETENTION_HOURS, it may not be deleted if there is not enough data to fill up the segment and cause the creation of a new log segment. Additionally, please note that the values specified in KAFKA_LOG_RETENTION_BYTES and KAFKA_LOG_SEGMENT_BYTES apply on a per-topic, per-partition basis. So for a deployment with multiple partitions and topics, the total used storage will likely become quite large even for what may seem like small individual log and segment sizes. Take care when configuring the KAFKA_LOG_SEGMENT_BYTES and KAFKA_LOG_RETENTION_BYTES variables. 
Make sure that the KAFKA_LOG_RETENTION_BYTES is larger than the KAFKA_LOG_SEGMENT_BYTES. If the value is not larger, Kafka will not be able to generate a new log segment before needing to rotate the logs which will lead to unpredictable behavior. + ### Configure Topic Creation The Kafka topics created by the `kafka-setup` service are configured in the [kafka-topics-values.yaml](jikkou/kafka-topics-values.yaml) file. The topics in that file are organized by the application, and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (with `cleanup.policy` = `compact`). diff --git a/docker-compose-connect.yml b/docker-compose-connect.yml index ba59536..5cd5a22 100644 --- a/docker-compose-connect.yml +++ b/docker-compose-connect.yml @@ -78,6 +78,7 @@ services: CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true} CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true} CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true} + CONNECT_CREATE_INTERSECTION_API: ${CONNECT_CREATE_INTERSECTION_API:-true} CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false} CONNECT_CREATE_MECDEPOSIT: ${CONNECT_CREATE_MECDEPOSIT:-false} MONGO_CONNECTOR_USERNAME: ${MONGO_ADMIN_DB_USER:-admin} diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 3f56d77..9eb1f91 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -37,6 +37,7 @@ services: KAFKA_CFG_DELETE_TOPIC_ENABLE: "true" KAFKA_CFG_LOG_RETENTION_HOURS: ${KAFKA_LOG_RETENTION_HOURS:-3} KAFKA_CFG_LOG_RETENTION_BYTES: ${KAFKA_LOG_RETENTION_BYTES:-10737418240} + KAFKA_CFG_LOG_SEGMENT_BYTES: ${KAFKA_LOG_SEGMENT_BYTES:-1073741824} KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" logging: options: @@ -69,6 +70,8 @@ services: KAFKA_TOPIC_CREATE_CONFLICTMONITOR: ${KAFKA_TOPIC_CREATE_CONFLICTMONITOR:-true} KAFKA_TOPIC_CREATE_DEDUPLICATOR: ${KAFKA_TOPIC_CREATE_DEDUPLICATOR:-false} KAFKA_TOPIC_CREATE_MECDEPOSIT: 
${KAFKA_TOPIC_CREATE_MECDEPOSIT:-false} + KAFKA_TOPIC_CREATE_INTERSECTION_API: ${KAFKA_TOPIC_CREATE_INTERSECTION_API:-true} + KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR: ${KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR:-false} KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092} KAFKA_TOPIC_PARTITIONS: ${KAFKA_TOPIC_PARTITIONS:-1} diff --git a/docker-compose-mongo.yml b/docker-compose-mongo.yml index a1a6f98..96b1b77 100644 --- a/docker-compose-mongo.yml +++ b/docker-compose-mongo.yml @@ -64,6 +64,7 @@ services: - kafka_connect_standalone - mongo_full - mongo + - mongo_setup image: mongo:8 hostname: mongo_setup depends_on: @@ -92,10 +93,10 @@ services: MONGO_SAMPLE_DATA_RELATIVE_PATH: ${MONGO_SAMPLE_DATA_RELATIVE_PATH:-./mongo/dump} - MONGO_INDEX_CREATE_ODE: ${MONGO_INDEX_CREATE_ODE:-true} - MONGO_INDEX_CREATE_GEOJSONCONVERTER: ${MONGO_INDEX_CREATE_GEOJSONCONVERTER:-true} - MONGO_INDEX_CREATE_CONFLICTMONITOR: ${MONGO_INDEX_CREATE_CONFLICTMONITOR:-true} - MONGO_INDEX_CREATE_DEDUPLICATOR: ${MONGO_INDEX_CREATE_DEDUPLICATOR:-false} + CONNECT_CREATE_ODE: ${CONNECT_CREATE_ODE:-true} + CONNECT_CREATE_GEOJSONCONVERTER: ${CONNECT_CREATE_GEOJSONCONVERTER:-true} + CONNECT_CREATE_CONFLICTMONITOR: ${CONNECT_CREATE_CONFLICTMONITOR:-true} + CONNECT_CREATE_DEDUPLICATOR: ${CONNECT_CREATE_DEDUPLICATOR:-false} entrypoint: ["/bin/bash", "-c", "./setup_mongo.sh && ./restore_mongo.sh"] volumes: - ${MONGO_SETUP_SCRIPT_RELATIVE_PATH:-./mongo/setup_mongo.sh}:/setup_mongo.sh @@ -137,4 +138,4 @@ services: retries: 4 volumes: - mongo_data: \ No newline at end of file + mongo_data: diff --git a/docs/MongoDB_Configuration.md b/docs/MongoDB_Configuration.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/Storage_Management.md b/docs/Storage_Management.md new file mode 100644 index 0000000..1502fb1 --- /dev/null +++ b/docs/Storage_Management.md @@ -0,0 +1,60 @@ +# Storage Management + +The default configuration of the CV-Manager system is very data intensive and stores multiple forms of 
CV-Data redundantly in the database. While this provides some benefit for compliance and debugging purposes, it is also often a strain on limited cloud and storage budgets. Below are some tips and tricks for reducing the storage footprint of CV-Data. + +## Stop syncing unused collections +By default, JPO-utils syncs a number of topics created by the ODE, geojson-converter, Conflict Monitor and CV-Manager. However, many of these collections are unused by the CV-Manager system and therefore do not need to be saved to the database. In particular, high-traffic data formats such as OdeSpatJson can quickly become a storage burden on the system. + +1) To stop a topic from syncing to MongoDB, users can modify the `kafka-connectors-values.yaml` file and remove entries. Make sure to remove the entire entry for the given topic. For example, to stop the OdeSpatJson topic from syncing, remove the following: +``` +- topicName: topic.OdeSpatJson + collectionName: OdeSpatJson + generateTimestamp: true +``` + +Please note that none of the Ode collections are required for running the GeoJson Converter, Conflict Monitor, or CV-Manager systems. The OdeSpatJson and OdeMapJson collections in particular should be removed first, as they generally contain a significant amount of data. + +2) Once a topic is deleted from the `kafka-connectors-values.yaml` file, users can bring up the remaining connectors by rebuilding the kafka-connect-setup container (see step 3). For existing deployments that already have the connectors created, the removed connectors will need to be deleted manually. This can be done by deleting the connectors using curl. +``` +curl -X DELETE "http://localhost:8083/connectors/sink.OdeSpatJson" >/dev/null 2>&1 +``` +Alternatively, users may delete all existing connectors and recreate them. This may be faster if updating a large number of connectors. 
The [delete_connectors.sh](../kafka-connect/delete_connectors.sh) script will automatically delete all existing connectors. +``` +cd kafka-connect +bash ./delete_connectors.sh +``` + +3) Once all the unused connectors have been deleted, reapply the connector scripts to the cluster by rebuilding the kafka-connect-setup container. Make sure `kafka_connect`, `kafka_connect_standalone`, or `kafka_connect_setup` is specified in the `COMPOSE_PROFILES` variable of your .env file before rebuilding to ensure the setup container is rerun. +``` +docker-compose up --build -d +``` + + + + +## Change MongoDB Data Retention Time +By default, all collections in the default MongoDB database only retain data for a set time window. Depending on deployment requirements, a shorter retention window may be desirable to reduce storage costs and improve database query speeds. Data retention is managed by adjusting TTLs on collections in MongoDB. Below are instructions to adjust the data retention of an existing MongoDB deployment. + +1) There are currently two environment variables that configure the data retention duration in MongoDB. The `MONGO_DATA_RETENTION_SECONDS` variable configures the number of seconds that data generated within the system should be retained for. Similarly, the `MONGO_ASN_RETENTION_SECONDS` variable configures the number of seconds that raw ASN.1 data should be retained for. By default, MongoDB data is retained for 60 days, while ASN.1 data is retained for 24 hours. To change the data retention window, modify the .env file in the jpo-utils repo (or parent module) and provide new values for these variables: +``` +MONGO_DATA_RETENTION_SECONDS=604800 # 1 week +MONGO_ASN_RETENTION_SECONDS=3600 # 1 Hour +``` + +2) Once the variables are changed, rebuild the mongo_setup container and let it run its setup procedure. Make sure that one of the following is specified in the `COMPOSE_PROFILES` variable to ensure the mongo setup container runs: `mongo`, `mongo_full`, or `mongo_setup`. 
Then run the following to reboot + +``` +docker-compose up --build -d +``` + +3) Once the indexes have been updated, verify that the change worked properly by checking the TTL index for the adjusted collections. This can be done by logging into the database using + +``` +mongosh -u -p --authenticationDatabase admin +use CV +db.ProcessedSpat.getIndexes(); +``` + +Alternatively, this may be checked in MongoDB compass + + diff --git a/jikkou/kafka-connectors-template.jinja b/jikkou/kafka-connectors-template.jinja index eb9a95d..cfb4d08 100644 --- a/jikkou/kafka-connectors-template.jinja +++ b/jikkou/kafka-connectors-template.jinja @@ -60,6 +60,10 @@ spec: {{ create_connector(values.apps.conflictmonitor) }} {% endif %} +{% if system.env.CONNECT_CREATE_INTERSECTION_API %} +{{ create_connector(values.apps.intersection_api) }} +{% endif %} + {% if system.env.CONNECT_CREATE_DEDUPLICATOR %} {{ create_connector(values.apps.deduplicator) }} {% else %} diff --git a/jikkou/kafka-connectors-values.yaml b/jikkou/kafka-connectors-values.yaml index 3e994eb..e0b5f6b 100644 --- a/jikkou/kafka-connectors-values.yaml +++ b/jikkou/kafka-connectors-values.yaml @@ -46,15 +46,6 @@ apps: ode: name: jpo-ode connectors: - - topicName: topic.OdeRawEncodedBSMJson - collectionName: OdeRawEncodedBSMJson - generateTimestamp: true - - topicName: topic.OdeRawEncodedMAPJson - collectionName: OdeRawEncodedMAPJson - generateTimestamp: true - - topicName: topic.OdeRawEncodedSPATJson - collectionName: OdeRawEncodedSPATJson - generateTimestamp: true - topicName: topic.OdeSpatJson collectionName: OdeSpatJson generateTimestamp: true @@ -67,33 +58,27 @@ apps: - topicName: topic.OdeTIMCertExpirationTimeJson collectionName: OdeTIMCertExpirationTimeJson generateTimestamp: true - - topicName: topic.OdeRawEncodedPSMJson - collectionName: OdeRawEncodedPSMJson - generateTimestamp: true - topicName: topic.OdePsmJson collectionName: OdePsmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSRMJson - 
collectionName: OdeRawEncodedSRMJson - generateTimestamp: true - topicName: topic.OdeSrmJson collectionName: OdeSrmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSSMJson - collectionName: OdeRawEncodedSSMJson - generateTimestamp: true - topicName: topic.OdeSsmJson collectionName: OdeSsmJson generateTimestamp: true - topicName: topic.OdeDriverAlertJson collectionName: OdeDriverAlertJson generateTimestamp: true - - topicName: topic.OdeRawEncodedSDSMJson - collectionName: OdeRawEncodedSDSMJson - generateTimestamp: true - topicName: topic.OdeSdsmJson collectionName: OdeSdsmJson generateTimestamp: true + - topicName: topic.OdeRtcmJson + collectionName: OdeRtcmJson + generateTimestamp: true + - topicName: topic.OdeRsmJson + collectionName: OdeRsmJson + generateTimestamp: true ode_duplicated: name: ode-duplicated connectors: @@ -106,9 +91,6 @@ apps: - topicName: topic.OdeBsmJson collectionName: OdeBsmJson generateTimestamp: true - - topicName: topic.OdeRawEncodedTIMJson - collectionName: OdeRawEncodedTIMJson - generateTimestamp: true geojsonconverter_duplicated: name: geojsonconverter-duplicated connectors: @@ -121,6 +103,13 @@ apps: - topicName: topic.ProcessedBsm collectionName: ProcessedBsm generateTimestamp: true + intersection_api: + name: intersection_api + connectors: + - topicName: topic.IntersectionApiRsuStatus + collectionName: IntersectionApiRsuStatus + useTimestamp: true + timestampField: timestamp deduplicator: name: deduplicator connectors: @@ -136,10 +125,6 @@ apps: collectionName: OdeTimJson generateTimestamp: true connectorName: DeduplicatedOdeTimJson - - topicName: topic.DeduplicatedOdeRawEncodedTIMJson - collectionName: OdeRawEncodedTIMJson - generateTimestamp: true - connectorName: DeduplicatedOdeRawEncodedTIMJson - topicName: topic.DeduplicatedOdeBsmJson collectionName: OdeBsmJson generateTimestamp: true @@ -226,6 +211,18 @@ apps: collectionName: CmBsmMessageCountProgressionEvents useTimestamp: true timestampField: eventGeneratedAt + - 
topicName: topic.CmRevocableEnabledLaneAlignmentEvent + collectionName: CmRevocableEnabledLaneAlignmentEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmRtcmBroadcastRateEvent + collectionName: CmRtcmBroadcastRateEvent + useTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmRtcmMinimumDataEvent + collectionName: CmRtcmMinimumDataEvent + useTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmSpatMinimumDataEventAggregation collectionName: CmSpatMinimumDataEventAggregation generateTimestamp: true @@ -266,6 +263,16 @@ apps: collectionName: CmSpatMessageCountProgressionEventAggregation generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentEventAggregation + collectionName: CmRevocableEnabledLaneAlignmentEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + - topicName: topic.CmRtcmMinimumDataEventAggregation + collectionName: CmRtcmMinimumDataEventAggregation + generateTimestamp: true + timestampField: eventGeneratedAt + + # Record BSM events: - topicName: topic.CmBsmEvents @@ -350,6 +357,10 @@ apps: collectionName: CmEventStateProgressionNotification generateTimestamp: true timestampField: eventGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotification + collectionName: CmRevocableEnabledLaneAlignmentNotification + generateTimestamp: true + timestampField: eventGeneratedAt - topicName: topic.CmIntersectionReferenceAlignmentNotificationAggregation collectionName: CmIntersectionReferenceAlignmentNotificationAggregation generateTimestamp: true @@ -370,6 +381,11 @@ apps: collectionName: CmEventStateProgressionNotificationAggregation generateTimestamp: true timestampField: notificationGeneratedAt + - topicName: topic.CmRevocableEnabledLaneAlignmentNotificationAggregation + collectionName: CmRevocableEnabledLaneAlignmentNotificationAggregation + generateTimestamp: true + timestampField: 
notificationGeneratedAt + mecdeposit: name: mecdeposit connectors: diff --git a/jikkou/kafka-topics-template.jinja b/jikkou/kafka-topics-template.jinja index 66fd363..bb1b736 100644 --- a/jikkou/kafka-topics-template.jinja +++ b/jikkou/kafka-topics-template.jinja @@ -72,6 +72,10 @@ spec: {{ create_topics(values.apps.conflictmonitor) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_INTERSECTION_API %} +{{ create_topics(values.apps.intersection_api) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_DEDUPLICATOR %} {{ create_topics(values.apps.deduplicator) }} {% endif %} @@ -80,6 +84,10 @@ spec: {{ create_topics(values.apps.mecdeposit) }} {% endif %} +{% if system.env.KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR %} +{{ create_topics(values.apps.rsustatusmonitor) }} +{% endif %} + {% if system.env.KAFKA_TOPIC_CREATE_OTHER %} {{ create_topics(values.apps.other) }} {% endif %} diff --git a/jikkou/kafka-topics-values.yaml b/jikkou/kafka-topics-values.yaml index 7db3a19..1717dd6 100644 --- a/jikkou/kafka-topics-values.yaml +++ b/jikkou/kafka-topics-values.yaml @@ -46,19 +46,12 @@ apps: ode: name: jpo-ode streamTopics: - - topic.OdeBsmPojo - - topic.OdeSpatTxPojo - - topic.OdeSpatPojo - topic.OdeSpatJson - topic.FilteredOdeSpatJson - - topic.OdeSpatRxJson - - topic.OdeSpatRxPojo - topic.OdeBsmJson - topic.FilteredOdeBsmJson - topic.OdeTimJson - topic.OdeTimJsonTMCFiltered - - topic.OdeTimBroadcastJson - - topic.J2735TimBroadcastJson - topic.OdeTimKTableJson - topic.FilteredOdeTimJson - topic.OdeDriverAlertJson @@ -72,20 +65,19 @@ apps: - topic.OdeRawEncodedSPATJson - topic.OdeRawEncodedTIMJson - topic.OdeRawEncodedMAPJson - - topic.OdeMapTxPojo - topic.OdeMapJson - topic.OdeRawEncodedSSMJson - - topic.OdeSsmPojo - topic.OdeSsmJson - topic.OdeRawEncodedSRMJson - - topic.OdeSrmTxPojo - topic.OdeSrmJson - topic.OdeRawEncodedPSMJson - - topic.OdePsmTxPojo - topic.OdePsmJson - - topic.OdeTimRxJson - topic.OdeRawEncodedSDSMJson - topic.OdeSdsmJson + - topic.OdeRawEncodedRTCMJson + 
- topic.OdeRtcmJson + - topic.OdeRawEncodedRSMJson + - topic.OdeRsmJson tableTopics: {} customTopics: {} geojsonconverter: @@ -95,6 +87,7 @@ apps: - topic.ProcessedMap - topic.ProcessedMapWKT - topic.ProcessedBsm + - topic.ProcessedRtcm tableTopics: {} customTopics: {} conflictmonitor: @@ -123,6 +116,7 @@ apps: - topic.CmMapMessageCountProgressionEvents - topic.CmSpatMessageCountProgressionEvents - topic.CmEventStateProgressionEvent + - topic.CmRevocableEnabledLaneAlignmentEvent - topic.CmSpatMinimumDataEventAggregation - topic.CmMapMinimumDataEventAggregation - topic.CmIntersectionReferenceAlignmentEventAggregation @@ -133,6 +127,7 @@ apps: - topic.CmBsmMessageCountProgressionEventAggregation - topic.CmMapMessageCountProgressionEventAggregation - topic.CmSpatMessageCountProgressionEventAggregation + - topic.CmRevocableEnabledLaneAlignmentEventAggregation tableTopics: - topic.CmLaneDirectionOfTravelNotification - topic.CmConnectionOfTravelNotification @@ -160,13 +155,20 @@ apps: - topic.CmTimestampDeltaNotification - topic.CmSpatTransitionNotification - topic.CmEventStateProgressionNotification + - topic.CmRevocableEnabledLaneAlignmentNotification - topic.CmIntersectionReferenceAlignmentNotificationAggregation - topic.CmSignalGroupAlignmentNotificationAggregation - topic.CmSignalStateConflictNotificationAggregation - topic.CmSpatTimeChangeDetailsNotificationAggregation - topic.CmEventStateProgressionNotificationAggregation + - topic.CmRevocableEnabledLaneAlignmentNotificationAggregation customTopics: {} + intersection_api: + name: intersection_api + customTopics: + - topicName: topic.IntersectionApiRsuStatus + retentionMs: 43200000 deduplicator: name: jpo-deduplicator streamTopics: {} @@ -180,8 +182,6 @@ apps: retentionMs: 43200000 - topicName: topic.DeduplicatedOdeTimJson retentionMs: 43200000 - - topicName: topic.DeduplicatedOdeRawEncodedTIMJson - retentionMs: 43200000 - topicName: topic.DeduplicatedOdeBsmJson retentionMs: 43200000 - topicName: 
topic.DeduplicatedProcessedBsm @@ -208,6 +208,13 @@ apps: - topic.MecDepositMetrics tableTopics: {} customTopics: {} + rsustatusmonitor: + name: rsu-status-monitor + streamTopics: + - topic.RmNearestNeighborUnresponsiveEvent + tableTopics: + - topic.RmIntersectionStatusRecords + customTopics: {} other: name: other-topics streamTopics: {} diff --git a/kafka-connect/delete_connectors.sh b/kafka-connect/delete_connectors.sh new file mode 100644 index 0000000..149cc89 --- /dev/null +++ b/kafka-connect/delete_connectors.sh @@ -0,0 +1,15 @@ +CONNECT_URL="http://localhost:8083" +# Get the list of connectors +connectors=$(curl -s "$CONNECT_URL/connectors") + +# Remove brackets from the JSON array +connectors=${connectors:1:-1} +# Split the connectors into an array +IFS=', ' read -r -a connector_array <<< "$connectors" +# Loop through each connector and delete it +for connector in "${connector_array[@]}"; do + connector_name=$(echo "$connector" | tr -d '"') + echo "Deleting connector: $connector_name" + curl -X DELETE "$CONNECT_URL/connectors/$connector_name" >/dev/null 2>&1 +done +if [ "${#connector_array[@]}" -gt 0 ]; then echo "All connectors deleted."; else echo "No connectors found."; fi 
\ No newline at end of file diff --git a/mongo/create_indexes.js b/mongo/create_indexes.js index b48ce28..0344f5d 100644 --- a/mongo/create_indexes.js +++ b/mongo/create_indexes.js @@ -38,6 +38,7 @@ const CONNECT_CREATE_ODE = process.env['CONNECT_CREATE_ODE'] || true; const CONNECT_CREATE_GEOJSONCONVERTER = process.env['CONNECT_CREATE_GEOJSONCONVERTER'] || true; const CONNECT_CREATE_CONFLICTMONITOR = process.env['CONNECT_CREATE_CONFLICTMONITOR'] || true; const CONNECT_CREATE_DEDUPLICATOR = process.env['CONNECT_CREATE_DEDUPLICATOR'] || true; +const CONNECT_INDEX_CREATE_INTERSECTION_API = process.env['CONNECT_CREATE_INTERSECTION_API'] || true; const users = [ @@ -61,7 +62,7 @@ const odeCollections = [ // ODE Json data {name: "OdeDriverAlertJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeBsmJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, - {name: "OdeBsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, + {name: "OdeBsmJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds, additionalIndexes: [{"metadata.odeReceivedAt": -1, "payload.data.coreData.position.latitude": 1, "payload.data.coreData.position.longitude": 1}]}, {name: "OdeMapJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeMapJson", "timeField": "recordGeneratedAt", rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeSpatJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, @@ -74,21 +75,13 @@ const odeCollections = [ {name: "OdeTimJson", "timeField": "recordGeneratedAt", 
rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeTimBroadcastJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, {name: "OdeTIMCertExpirationTimeJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: expireSeconds}, - - // Ode Raw ASN - {name: "OdeRawEncodedBSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedMAPJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSPATJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSRMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedSSMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, - {name: "OdeRawEncodedTIMJson", ttlField: "recordGeneratedAt", "timeField": "metadata.odeReceivedAt", intersectionField: null, rsuIP:"metadata.originIp", expireTime: ttlExpireSeconds}, ]; // GeoJson Converter Data const geoJsonConverterCollections = [ {name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId", expireTime: expireSeconds}, {name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId", expireTime: expireSeconds}, - {name: "ProcessedBsm", ttlField: "recordGeneratedAt", 
timeField: "timeStamp", geoSpatialField: "geometry.coordinates", expireTime: expireSeconds}, + {name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "geometry.coordinates", expireTime: expireSeconds} ]; @@ -118,6 +111,7 @@ const conflictMonitorCollections = [ { name: "CmSpatMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmBsmMessageCountProgressionEvents", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMinimumDataEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -130,7 +124,7 @@ const conflictMonitorCollections = [ { name: "CmBsmMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmMapMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatMessageCountProgressionEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, - + { name: 
"CmRevocableEnabledLaneAlignmentEventAggregation", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Conflict Monitor Assessments @@ -152,6 +146,7 @@ const conflictMonitorCollections = [ { name: "CmTimestampDeltaNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTransitionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, + { name: "CmRevocableEnabledLaneAlignmentNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmNotification", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmEventStateProgressionNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, @@ -160,15 +155,17 @@ const conflictMonitorCollections = [ { name: "CmSignalStateConflictNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, { name: "CmSpatTimeChangeDetailsNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: 
"intersectionID", expireTime: expireSeconds }, - - - - + { name: "CmRevocableEnabledLaneAlignmentNotificationAggregation", ttlField: "notificationGeneratedAt", timeField: "notificationGeneratedAt", intersectionField: "intersectionID", expireTime: expireSeconds }, // Reports { name: "CmReport", timeField: "reportGeneratedAt", intersectionField: "intersectionID"}, ]; +let intersectionAPICollections = [ + { name: "IntersectionApiRsuStatus", timeField: "timestamp", intersectionField: "intersectionID", rsuIP:"rsuIP"}, +]; + let collections = []; if(CONNECT_CREATE_ODE){ @@ -183,6 +180,10 @@ if(CONNECT_CREATE_CONFLICTMONITOR){ collections = collections.concat(conflictMonitorCollections); } +if(CONNECT_INDEX_CREATE_INTERSECTION_API){ + collections = collections.concat(intersectionAPICollections); +} + try{ @@ -231,6 +232,7 @@ do { createTimeIntersectionIndex(collection); createTimeRsuIpIndex(collection); createGeoSpatialIndex(collection); + createAdditionalIndexes(collection); }else{ missing_collection_count++; console.log("Collection " + collection['name'] + " does not exist yet"); @@ -439,6 +441,24 @@ function createGeoSpatialIndex(collection){ } +function createAdditionalIndexes(collection){ /* Creates every index listed in collection['additionalIndexes'] (if any), skipping those whose key_direction name already exists. */ + for (const additionalIndex of collection['additionalIndexes'] || []) { + console.log("Creating additional index for " + collection['name'] + " with index: " + JSON.stringify(additionalIndex)); + const index_name = Object.entries(additionalIndex).map(([key, value]) => `${key}_${value}`).join("_"); + if (indexExists(collection['name'], index_name)) { + continue; + } + try { + db[collection['name']].createIndex(additionalIndex); + console.log("Created additional index for " + collection['name'] + " with index: " + JSON.stringify(additionalIndex)); + } catch (err) { + console.log("Failed to create additional index for " + collection['name'] + " with index: " + JSON.stringify(additionalIndex)); + console.log(err); + } + } + +} + function ttlIndexExists(collection) { return 
db[collection['name']].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined; } @@ -458,3 +478,7 @@ function timeIndexExists(collection){ function geoSpatialIndexExists(collection){ return db[collection['name']].getIndexes().find((idx) => idx.name == collection['geoSpatialField'] + "_2dsphere_timeStamp_-1") !== undefined; } + +function indexExists(collectionName, indexName){ + return db[collectionName].getIndexes().find((idx) => idx.name == indexName) !== undefined; +} diff --git a/mongo/setup_mongo.sh b/mongo/setup_mongo.sh index c6f0a77..7ac783a 100755 --- a/mongo/setup_mongo.sh +++ b/mongo/setup_mongo.sh @@ -1,6 +1,7 @@ #!/bin/bash until mongosh --host mongo:27017 --eval 'quit(db.runCommand({ ping: 1 }).ok ? 0 : 2)' &>/dev/null; do + echo "Waiting for valid Ping from MongoDB" sleep 1 done diff --git a/mongo/storage_checker.js b/mongo/storage_checker.js new file mode 100644 index 0000000..b56981c --- /dev/null +++ b/mongo/storage_checker.js @@ -0,0 +1,22 @@ +var collections = db.getCollectionNames(); +let totalAllocatedStorage = 0; +let totalFreeSpace = 0; + +for (var i = 0; i < collections.length; i++) { + let stats = db.getCollection(collections[i]).stats(); + let colStats = db.runCommand({"collstats": collections[i]}); + let blockManager = colStats["wiredTiger"]["block-manager"]; + let freeSpace = Number(blockManager["file bytes available for reuse"]); + let allocatedStorage = Number(blockManager["file size in bytes"]); + let indexSize = Number(stats.totalIndexSize); + + totalAllocatedStorage += allocatedStorage + indexSize; + totalFreeSpace += freeSpace; + + print(collections[i], allocatedStorage/1024/1024/1024, freeSpace/1024/1024/1024, indexSize/1024/1024/1024); + +} + +print("Total On Disk Storage: " + totalAllocatedStorage / 1024 / 1024 / 1024 + " GB"); /* NOTE(review): per WiredTiger docs "file size in bytes" already includes the bytes available for reuse - confirm */ +print("Total Free Space: " + totalFreeSpace / 1024 / 1024 / 1024 + " GB"); +print("Effective Database Size: " + (totalAllocatedStorage - totalFreeSpace) / 1024 / 1024 / 1024 + 
" GB"); \ No newline at end of file diff --git a/sample.env b/sample.env index ea2d33d..eaafb11 100644 --- a/sample.env +++ b/sample.env @@ -43,6 +43,7 @@ COMPOSE_PROFILES=all KAFKA_BOOTSTRAP_SERVERS=${DOCKER_HOST_IP}:9092 KAFKA_LOG_RETENTION_HOURS=3 KAFKA_LOG_RETENTION_BYTES=10737418240 # 10GB +KAFKA_LOG_SEGMENT_BYTES=1073741824 # 1GB. Must be less than KAFKA_LOG_RETENTION_BYTES # Variables for creating kafka topics: KAFKA_TOPIC_PARTITIONS=1 @@ -54,8 +55,10 @@ KAFKA_TOPIC_DELETE_RETENTION_MS=3600000 KAFKA_TOPIC_CREATE_ODE=true # Create topics for ODE KAFKA_TOPIC_CREATE_GEOJSONCONVERTER=true # Create topics for GeoJSON Converter KAFKA_TOPIC_CREATE_CONFLICTMONITOR=true # Create topics for Conflict Monitor +KAFKA_TOPIC_CREATE_INTERSECTION_API=true # Create topics for Intersection API KAFKA_TOPIC_CREATE_DEDUPLICATOR=false # Create topics for Deduplicator KAFKA_TOPIC_CREATE_MECDEPOSIT=false # Create topics for MecDeposit +KAFKA_TOPIC_CREATE_RSUSTATUSMONITOR=false # Create topics for RsuStatusMonitor KAFKA_TOPIC_CREATE_OTHER=false # Create topics for other applications # Relative path to the Kafka topics values file, upper level directories are supported @@ -108,11 +111,6 @@ MONGO_DATABASE_COMPACTION_TRIGGER_PERCENT=MONGO_DATABASE_COMPACTION_TRIGGER_PERC MONGO_ENABLE_STORAGE_RECORD=true MONGO_ENABLE_DYNAMIC_TTL=true -MONGO_INDEX_CREATE_ODE=true # Create indexes for ODE -MONGO_INDEX_CREATE_GEOJSONCONVERTER=true # Create indexes for GeoJSON Converter -MONGO_INDEX_CREATE_CONFLICTMONITOR=true # Create indexes for Conflict Monitor -MONGO_INDEX_CREATE_DEDUPLICATOR=false # Create indexes for Deduplicator - # Relative path to the MongoDB init script, upper level directories are supported MONGO_SETUP_SCRIPT_RELATIVE_PATH="./mongo/setup_mongo.sh" MONGO_RESTORE_SCRIPT_RELATIVE_PATH="./mongo/restore_mongo.sh" @@ -139,6 +137,7 @@ CONNECT_TASKS_MAX=1 # Number of concurrent tasks to configur CONNECT_CREATE_ODE=true # Create kafka connectors to MongoDB for ODE 
CONNECT_CREATE_GEOJSONCONVERTER=true # Create kafka connectors to MongoDB for GeoJSON Converter CONNECT_CREATE_CONFLICTMONITOR=true # Create kafka connectors to MongoDB for Conflict Monitor +CONNECT_CREATE_INTERSECTION_API=true # Create kafka Connectors to MongoDB for Intersection API CONNECT_CREATE_DEDUPLICATOR=false # Create kafka connectors to MongoDB for Deduplicator CONNECT_CREATE_MECDEPOSIT=false # Create kafka connectors to MongoDB for MecDeposit CONNECT_CREATE_OTHER=false # Create kafka connectors to MongoDB for other applications