This is a very simple PoC (Proof of Concept) of anomaly detection using Apache Kafka® Streams.
The aims are:
- Be minimalistic - don't try for anything other than a minimal concept of "anomaly". Real anomaly detection can be quite complex, including use of windowing techniques, cross comparison between different fields, and many other things which are determined by the particular requirements.
- Use Apache Kafka® Streams to read messages from one topic, and write anomalous messages to another.
- Message values are encoded using Avro and the Confluent convention on binding a schema id to the start of each message (see the Confluent Wire format documentation for details of how this works).
- A compatible schema registry is assumed.
- A single field in the message value (specified by name) is compared to maximum and minimum bounds.
- Only `int` (32-bit signed) and `long` (64-bit signed) fields are checkable, and the bounds are specified as `int`s.
- Messages where the value is out of range (less than the lower bound or more than the higher bound) will be sent to the output topic.
- Messages where the field is missing will be regarded as anomalies, and sent to the output topic.
- The application is designed to be run in a container. A `Dockerfile` and associated run scripts (`run.sh` and `setup_auth.sh`) are provided.
- A script is provided to generate sample data.
- Instructions are given for using the app with an Aiven for Kafka service and the Karapace schema registry.
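The anomaly rule described above (a field is anomalous if it is missing, below the minimum, or above the maximum) can be sketched as a plain Java predicate. This is only an illustrative sketch, not the actual code from `AnomalyDetectorApp.java`; the class and method names here are invented for the example.

```java
// Illustrative sketch of the anomaly rule (not the real app code).
// A field value is anomalous if it is missing, below the minimum, or above the maximum.
public class AnomalyCheck {
    private final int minBound;
    private final int maxBound;

    public AnomalyCheck(int minBound, int maxBound) {
        this.minBound = minBound;
        this.maxBound = maxBound;
    }

    // fieldValue is null when the field is missing from the message value.
    public boolean isAnomalous(Long fieldValue) {
        if (fieldValue == null) {
            return true; // a missing field counts as an anomaly
        }
        return fieldValue < minBound || fieldValue > maxBound;
    }
}
```

In the real application this predicate would be applied inside a Kafka Streams `filter`, with matching records forwarded to the output topic.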
The main code is [AnomalyDetectorApp.java](app/src/main/java/org/example/AnomalyDetectorApp.java).
The project uses Gradle and Groovy for configuration and building.
Download the URL and the certificates for the Kafka service.

Set an environment variable for the Kafka service URL - something like

```shell
export KAFKA_BOOTSTRAP_SERVERS=<service uri>
```

or in the Fish shell

```shell
set -x KAFKA_BOOTSTRAP_SERVERS <service uri>
```

Do the same for the schema registry URL and password (the program defaults to the standard Karapace username of `avnadmin`, so we don't need to specify that).

```shell
export SCHEMA_REGISTRY_URL=<schema registry url>
export SCHEMA_REGISTRY_PASSWORD=<schema registry password>
```

(The Fish shell equivalents are left as an exercise for Fish shell users.)
Set an environment variable to the content of each certificate file. Typically:

- Download the certificate files for the Kafka service (`ca.pem`, `service.cert` and `service.key`). For an Aiven for Kafka service you can do this from the Connection information in the service Overview.
- Put the files into a directory called `certs` and use one of the convenience shell scripts to read the content of those files and set the environment variables:

  ```shell
  source prep.sh
  ```

  or for Fish

  ```shell
  source prep.fish
  ```
Build the container image:

```shell
docker build -t appimage .
```

Run the container image - for instance
```shell
docker run -d --name kafka-streams-container -p 3000:3000 \
    -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
    -e KAFKA_CA_CERT="$KAFKA_CA_CERT" \
    -e KAFKA_ACCESS_CERT="$KAFKA_ACCESS_CERT" \
    -e KAFKA_ACCESS_KEY="$KAFKA_ACCESS_KEY" \
    -e SCHEMA_REGISTRY_URL=$SCHEMA_REGISTRY_URL \
    -e SCHEMA_REGISTRY_USERNAME=$SCHEMA_REGISTRY_USERNAME \
    -e SCHEMA_REGISTRY_PASSWORD=$SCHEMA_REGISTRY_PASSWORD \
    -e INPUT_TOPIC=metric_data \
    -e OUTPUT_TOPIC=anomaly_data \
    -e FIELD_NAME=temperature \
    -e MIN_BOUND=-50 \
    -e MAX_BOUND=-30 \
    -e EXACTLY_ONCE=false \
    appimage
```
...or if you have environment variables for the topic names, the field name and the bounds:
```shell
docker run -d --name kafka-streams-container -p 3000:3000 \
    -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
    -e KAFKA_CA_CERT="$KAFKA_CA_CERT" \
    -e KAFKA_ACCESS_CERT="$KAFKA_ACCESS_CERT" \
    -e KAFKA_ACCESS_KEY="$KAFKA_ACCESS_KEY" \
    -e SCHEMA_REGISTRY_URL=$SCHEMA_REGISTRY_URL \
    -e SCHEMA_REGISTRY_USERNAME=$SCHEMA_REGISTRY_USERNAME \
    -e SCHEMA_REGISTRY_PASSWORD=$SCHEMA_REGISTRY_PASSWORD \
    -e INPUT_TOPIC=$INPUT_TOPIC \
    -e OUTPUT_TOPIC=$OUTPUT_TOPIC \
    -e FIELD_NAME=$FIELD_NAME \
    -e MIN_BOUND=$MIN_BOUND \
    -e MAX_BOUND=$MAX_BOUND \
    -e EXACTLY_ONCE=false \
    appimage
```

We don't actually use the port for anything at the moment.
Some of those environment variable arguments have defaults, so you can leave them off if you're happy with the default:

- `SCHEMA_REGISTRY_USERNAME` - both `run.sh` and the Java code default this to `avnadmin`
- `EXACTLY_ONCE` - `run.sh` defaults this to `false`.
All variants of the Java app take the following arguments (of course
`OUTPUT_TOPIC` is not used by the Log app). Common code to handle these is in
`Config.java`. The names chosen
match the environment variables used by the container file and `run.sh`.

- `-DKAFKA_BOOTSTRAP_SERVERS` - the URL for the Kafka service.
- `-DKAFKA_CA_CERT` - the contents of the `ca.pem` file
- `-DKAFKA_ACCESS_CERT` - the contents of the `service.cert` file
- `-DKAFKA_ACCESS_KEY` - the contents of the `service.key` file
- `-DSCHEMA_REGISTRY_URL` - the URL for the schema registry.
- `-DSCHEMA_REGISTRY_USERNAME` - the user name for accessing the schema registry. This defaults to `avnadmin`, which is the default user name for Karapace.
- `-DSCHEMA_REGISTRY_PASSWORD` - the password for accessing the schema registry
- `-DINPUT_TOPIC` - the input topic name.
- `-DOUTPUT_TOPIC` - the output topic name, for anomalous messages.
- `-DFIELD_NAME` - the name of the field to check in the message value.
- `-DMIN_BOUND` - the minimum `int` value for that field - any messages with a value below this are anomalous.
- `-DMAX_BOUND` - the maximum `int` value for that field - any messages with a value above this are anomalous.
- `-DEXACTLY_ONCE` - request exactly once semantics. A value of `true` requests exactly once semantics; a value of `false`, an empty string or the absence of this property does not. The value is case insensitive. Any other value is an error.
It is arguable that the topic names, field names, bounds and exactly once flag should be "proper" command line (switch) arguments, instead of being set by environment variables, but since this app is only intended to be run via a container file, the distinction doesn't seem worth pursuing.
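The `EXACTLY_ONCE` rules described above (case-insensitive `true`, with `false`, empty or absent meaning no, and anything else an error) can be sketched like this. The real parsing lives in `Config.java`; the class and method names here are invented for the illustration.

```java
// Illustrative sketch of the EXACTLY_ONCE parsing rules (the real code is in Config.java).
// "true" (any case) -> true; "false", "" or absent (null) -> false; anything else is an error.
public class ExactlyOnceFlag {
    public static boolean parse(String value) {
        if (value == null || value.isEmpty() || value.equalsIgnoreCase("false")) {
            return false;
        }
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        throw new IllegalArgumentException(
            "EXACTLY_ONCE must be true, false or empty, not: " + value);
    }
}
```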
This is a two-stage container file.

The `APP_NAME` variable determines which app is being built and run. It
is set to `AnomalyDetectorApp`. Using this saves repeating the app name
throughout the container file.
The first stage builds a fat (uber) JAR for the program. This minimises the size of the executable to be passed to the second stage.
It uses `jdeps` and `jlink` to work out the dependencies that are not in the
JAR file, and to extract a minimal JRE from the larger JRE provided by the
operating system used in that first stage.
The second stage then downloads rocksdb (used by Kafka Streams).
It then copies over the minimal JRE prepared in the first stage, and the fat
JAR itself, as well as the `run.sh` and `setup_auth.sh` files, and
finally runs the `run.sh` script.
The `run.sh` file expects the following environment variables as input
(you'll recognise all but `APP_NAME` from the instructions on running the
container and the Java app itself). The "For instance" examples work if you're
using the `produce_avro.py` script to produce sample data.
- `KAFKA_BOOTSTRAP_SERVERS` - the URL of the Kafka service we're using
- `KAFKA_CA_CERT` - the contents of the `ca.pem` file
- `KAFKA_ACCESS_CERT` - the contents of the `service.cert` file
- `KAFKA_ACCESS_KEY` - the contents of the `service.key` file
- `SCHEMA_REGISTRY_URL` - the URL for the schema registry
- `SCHEMA_REGISTRY_USERNAME` - the user name for accessing the schema registry. This is optional and if it is not given, a value of `avnadmin` will be assumed
- `SCHEMA_REGISTRY_PASSWORD` - the password for accessing the schema registry
- `INPUT_TOPIC` - the input topic name. This has a sensible default, `metric_data`.
- `OUTPUT_TOPIC` - the output topic name, for anomalous messages. For instance, `anomaly_data`.
- `FIELD_NAME` - the name of the field to check in the message value. For instance, `temperature`.
- `MIN_BOUND` - the minimum `int` value for that field - any messages with a value below this are anomalous. For instance, `-50`.
- `MAX_BOUND` - the maximum `int` value for that field - any messages with a value above this are anomalous. For instance, `-30`.
- `EXACTLY_ONCE` - whether exactly once semantics is wanted. This is optional and if it is not given, defaults to `false`. Request `true` if you want exactly once semantics.
- `APP_NAME` - the name of the application to run. This will be set to `AnomalyDetectorApp` by the container file.
It sources the `setup_auth.sh` script, which makes sure that the
`KAFKA_CA_CERT`, `KAFKA_ACCESS_CERT` and `KAFKA_ACCESS_KEY`
environment variables contain data that is correctly split into lines.
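A common reason such certificate variables need fixing is that the PEM content arrives with literal `\n` sequences rather than real newlines. The snippet below is a hypothetical Java illustration of that kind of normalisation; it is not what `setup_auth.sh` actually does (that script is shell, and its exact logic may differ).

```java
// Hypothetical illustration: turn literal "\n" sequences in an environment
// variable's value back into real newlines, so a PEM block has one entry per line.
public class CertNormaliser {
    public static String normalise(String raw) {
        // Replace the two-character sequence backslash + 'n' with a newline character.
        return raw.replace("\\n", "\n");
    }
}
```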
Finally, the `run.sh` script runs the fat Java JAR with the necessary
arguments.
We use a fat (uber) JAR in the container, so that all of the program's non-standard dependencies (the ones not provided by the JRE) are frozen into the final executable.
You can build that fat JAR file with

```shell
gradle AnomalyDetectorAppUberJar
```

(See `app/build.gradle` for the definition of the UberJar task.)

If you want to run the app using the provided `run.sh` script, then you'll
also need to copy the result to the top-level directory:

```shell
cp app/build/libs/AnomalyDetectorApp-uber.jar .
```

With a normal Kafka Streams application, it is possible that a message might be processed once, more than once, or never at all (networks are unreliable, services can crash, and so on).
Exactly once semantics (EOS) in Kafka Streams guarantees that each message will be processed once, no more and no less. It's been available since 2017.
There is quite a lot of good documentation about exactly once semantics for Kafka Streams - the following is by no means an exhaustive list.
See Confluent's Exactly-Once Semantics Are Possible: Here’s How Kafka Does It (2017/2025) for a good introduction to how this works.
The Kafka Streams Core Concepts document (this link is for Apache Kafka 4.1.x which is current at time of writing) is also pretty good. It talks about exactly once in the Processing Guarantees section.
Zeinab Dashti's Developer Guide to Achieve Transactional Processing in Kafka Streams (2025) is good at the pragmatics, and notes that:
> When `processing.guarantee=exactly_once_v2` is set, Kafka Streams automatically enforces the required producer and consumer configurations:
>
> - `enable.idempotence=true` (on the Kafka producer)
> - `isolation.level=read_committed` (on the Kafka consumer)
>
> You don’t need to set these manually — doing so isn’t harmful if you match the required values, but Kafka Streams will log a warning or ignore conflicting settings.
and
> Any external consumer reading from Kafka output topics that are written transactionally must configure:
>
> `isolation.level=read_committed`
>
> Kafka Streams enforces this by default within its topology, but it must be set explicitly for standalone consumers (e.g., Kafka Connect, other microservices). Without it, consumers could read uncommitted or aborted records, which may result in data duplication or inconsistency.
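Putting those two quotes together: requesting EOS in the Streams app, and reading its output safely from a standalone consumer, comes down to one setting on each side. A minimal sketch, using plain `Properties` with the standard Kafka configuration key names (the `application.id` value here is just a placeholder):

```java
import java.util.Properties;

// Sketch only: minimal property fragments, not a complete Kafka configuration.
public class EosConfigSketch {
    // In the Streams application itself: request exactly-once semantics.
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "anomaly-detector"); // placeholder application id
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    // In any standalone consumer of the transactional output topic:
    // only read records from committed transactions.
    public static Properties outputConsumerProperties() {
        Properties props = new Properties();
        props.put("isolation.level", "read_committed");
        return props;
    }
}
```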
There are minimal unit tests for the application.
Run them with, for instance:
```shell
gradle clean cleanTest test
```

It's possible to run the `run.sh` script locally, and indeed this is useful
for testing. It's important to remember to:

- Copy the built app into the same directory as the `run.sh` script
- Set the various required environment variables first - these are also
  documented at the top of the `run.sh` file.
For instance

```shell
./run.sh
```

or, to override the `OUTPUT_TOPIC` environment variable for this run

```shell
OUTPUT_TOPIC=bad_messages ./run.sh
```

The program `produce_avro.py` can be used to write sample data to Kafka.
It expects `uv` to be installed.
Assuming the environment variables needed to run the container file / `run.sh` / Java application are all set up, and the certificate files are in
the `certs` directory, you can generate 500 sample messages with

```shell
./produce_avro.py
```

To find out more about how the program can be used, try

```shell
./produce_avro.py --help
```

**Note** For trying out this Kafka Streams app, a free Aiven for Kafka service will work just fine. The instructions below show how to use that, as well as how to use a paid service if that's more suitable.
It's possible to do everything in this section using the Aiven web
console, but for documentation purposes here I
shall use the `avn` command line tool.

Since `avn` is a Python tool, make sure you're in a virtual environment and
download it:

```shell
python -m venv venv
source venv/bin/activate  # If you're using fish, activate.fish
pip install aiven-client
```

Retrieve an Aiven session token (see the documentation) and login, using the email address you logged in to the console with, and pasting the token when prompted:
```shell
avn user login <your-email> --token
```

For convenience, set the project to your current project - this means you don't have to specify it on every command:

```shell
avn project switch <project-name>
```

Set an environment variable for the service name - perhaps something like "kafka-streams-example"

```shell
export KAFKA_SERVICE_NAME=<service name>
```

or for Fish shell

```shell
set -x KAFKA_SERVICE_NAME <service name>
```

Create the Aiven for Kafka service. We'll show how to create a free or paid service. There are notes about each command after the command.
- For trying out this app, a free Aiven for Kafka service will work just fine. Create the service using the following command:

  ```shell
  avn service create $KAFKA_SERVICE_NAME \
      --service-type kafka \
      --cloud do-ams \
      --plan free-0 \
      -c schema_registry=true \
      -c kafka.auto_create_topics_enable=true
  ```

  Notes

  - The details of how the free cloud and plan are specified at the command line may change. This is one case where it's actually simpler to do this in the Aiven web console, as there you just choose the free Kafka tier and then what part of the world you want.
  - `-c schema_registry=true` says we want to enable the Karapace schema registry. This is also free, and we need it to handle Avro messages.
  - `-c kafka.auto_create_topics_enable=true` says we want producers to be able to create topics. You don't want this in production, but it's often a good idea in development, and it means the output topics will get created as we need them.
- If you prefer (or if you're already using your free Aiven for Kafka service for something else and don't want to add new topics to it), you can instead create a paid service. For that, use a command like the following:

  ```shell
  avn service create $KAFKA_SERVICE_NAME \
      --service-type kafka \
      --cloud aws-eu-west-1 \
      --plan startup-4 \
      --no-project-vpc \
      -c schema_registry=true \
      -c kafka.auto_create_topics_enable=true
  ```

  Notes

  - Choose a cloud and plan that match your needs. There's no need to go for anything above the minimum plan (`startup-4` in this case).
  - In the case of this cloud and region, I knew there was a VPC (virtual private cloud) available to my organization, so I needed to tell the command I did not want to use it. It doesn't hurt to specify the switch even if there is no VPC.
  - The last two switches are the same as in the free example above.
While that's running, get the service URL for the new service

```shell
export KAFKA_BOOTSTRAP_SERVERS=$(avn service get $KAFKA_SERVICE_NAME --format '{service_uri}')
```

or for Fish shell

```shell
set -x KAFKA_BOOTSTRAP_SERVERS (avn service get $KAFKA_SERVICE_NAME --format '{service_uri}')
```

Get the schema registry (Karapace) URL

```shell
export SCHEMA_REGISTRY_URL=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.connection_info.schema_registry_uri')
```

or for Fish shell

```shell
set -x SCHEMA_REGISTRY_URL (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.connection_info.schema_registry_uri')
```

Get the schema registry password

```shell
export SCHEMA_REGISTRY_PASSWORD=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].password')
```

or for Fish shell

```shell
set -x SCHEMA_REGISTRY_PASSWORD (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].password')
```

We assume the default username for the schema registry, so don't need to look that up, but if you do need it then you can get it with

```shell
export SCHEMA_REGISTRY_USERNAME=$(avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].username')
```

or for Fish shell

```shell
set -x SCHEMA_REGISTRY_USERNAME (avn service get $KAFKA_SERVICE_NAME --json | jq -r '.users[0].username')
```
Wait for it to reach Running state

```shell
avn service wait $KAFKA_SERVICE_NAME
```

Once the Kafka service is running, you can create the two topics.

**Note** In fact, the `-c kafka.auto_create_topics_enable=true` specified when creating the service means the topics will get created when first written to, so this is not necessarily required.

The `produce_avro.py` script assumes that the "input" topic is called `metric_data`.
```shell
avn service topic-create \
    --partitions 1 \
    --replication 2 \
    $KAFKA_SERVICE_NAME metric_data
```

```shell
avn service topic-create \
    --partitions 1 \
    --replication 2 \
    $KAFKA_SERVICE_NAME anomaly_data
```

Download the certificate files (it will create the directory if necessary)

```shell
avn service user-creds-download $KAFKA_SERVICE_NAME --username avnadmin -d certs
```

`ls certs` should report

```
ca.pem service.cert service.key
```
Set the environment variables for the certificate file contents

```shell
source prep.sh
```

or for Fish shell

```shell
source prep.fish
```

And now you're ready to run the program, either via `./run.sh`
or via Docker.