From 782f644a56d05556bcfe701538dd7aa335c94183 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Wed, 19 Feb 2025 11:09:43 +0100 Subject: [PATCH 1/5] add internal topic --- src/cascade/runtime/flink_runtime.py | 39 +++++++++++++++++++++------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index febfc83..dd090cb 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -317,7 +317,7 @@ def debug(x, msg=""): class FlinkRuntime(): """A Runtime that runs Dataflows on Flink.""" - def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_port: Optional[int] = None): + def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_port: Optional[int] = None, internal_topic="internal-topic"): self.env: Optional[StreamExecutionEnvironment] = None """@private""" @@ -328,7 +328,10 @@ def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_po """The number of events that were sent using `send()`.""" self.input_topic = input_topic - """The topic to use for internal communications.""" + """The topic to use read new events/requests.""" + + self.internal_topic = internal_topic + """The topic used for internal messages.""" self.output_topic = output_topic """The topic to use for external communications, i.e. when a dataflow is @@ -391,7 +394,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para "auto.offset.reset": "earliest", "group.id": "test_group_1", } - kafka_source = ( + kafka_external_source = ( KafkaSource.builder() .set_bootstrap_servers(kafka_broker) .set_topics(self.input_topic) @@ -400,12 +403,21 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_only_deserializer(deserialization_schema) .build() ) + kafka_internal_source = ( + KafkaSource.builder() + .set_bootstrap_servers(kafka_broker) + .set_topics(self.internal_topic) + .set_group_id("test_group_1") + .set_starting_offsets(KafkaOffsetsInitializer.earliest()) + .set_value_only_deserializer(deserialization_schema) + .build() + ) self.kafka_internal_sink = ( KafkaSink.builder() .set_bootstrap_servers(kafka_broker) .set_record_serializer( KafkaRecordSerializationSchema.builder() - .set_topic(self.input_topic) + .set_topic(self.internal_topic) .set_value_serialization_schema(deserialization_schema) .build() ) @@ -428,17 +440,26 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para ) """Kafka sink corresponding to outputs of calls (`EventResult`s).""" - event_stream = ( - self.env.from_source( - kafka_source, + self.env.from_source( + kafka_external_source, WatermarkStrategy.no_watermarks(), - "Kafka Source" + "Kafka External Source" ) .map(lambda x: deserialize_and_timestamp(x)) # .map(lambda x: debug(x, msg=f"entry: {x}")) - .name("DESERIALIZE") + .name("DESERIALIZE external") + .set_parallelism(2) # .filter(lambda e: isinstance(e, Event)) # Enforced by `send` type safety + ).union( + self.env.from_source( + kafka_internal_source, + WatermarkStrategy.no_watermarks(), + "Kafka External Source" + ) + .map(lambda x: deserialize_and_timestamp(x)) + .name("DESERIALIZE internal") + .set_parallelism(4) ) """REMOVE SELECT ALL NODES From 1665966628ee11096414fb58d419cf010091bd04 Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Sat, 22 Feb 2025 16:56:47 +0100 Subject: [PATCH 2/5] Run benchmarks on cluster --- 
Dockerfile.pyflink | 24 ++ README.md | 81 +++++- deathstar_movie_review/demo.py | 275 +++++------------- .../entities/compose_review.py | 2 +- deathstar_movie_review/entities/user.py | 4 +- deathstar_movie_review/start_benchmark.py | 207 +++++++++++++ docker-compose.yml | 28 ++ environment.yml | 94 ++++++ src/cascade/runtime/flink_runtime.py | 24 +- 9 files changed, 526 insertions(+), 213 deletions(-) create mode 100644 Dockerfile.pyflink create mode 100644 deathstar_movie_review/start_benchmark.py create mode 100644 environment.yml diff --git a/Dockerfile.pyflink b/Dockerfile.pyflink new file mode 100644 index 0000000..aa7356c --- /dev/null +++ b/Dockerfile.pyflink @@ -0,0 +1,24 @@ +FROM flink:1.20.0-scala_2.12 + +# Install Python 3.11.11 +RUN apt-get update && apt-get install -y \ + software-properties-common && \ + add-apt-repository ppa:deadsnakes/ppa && \ + apt-get update && \ + apt-get install -y python3.11 python3.11-venv python3.11-dev && \ + ln -sf /usr/bin/python3.11 /usr/bin/python && \ + ln -sf /usr/bin/python3.11 /usr/bin/python3 && \ + rm -rf /var/lib/apt/lists/* + +# Install pip +RUN python3.11 -m ensurepip --upgrade && \ + ln -sf /usr/local/bin/pip3 /usr/bin/pip && \ + ln -sf /usr/local/bin/pip3 /usr/bin/pip3 + +# Copy requirements file +COPY requirements.txt /requirements.txt + +# Install Python dependencies +RUN pip install --no-cache-dir -r /requirements.txt + +CMD ["bash"] diff --git a/README.md b/README.md index b9e673a..b7bd055 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,84 @@ # Cascade +## Benchmarking + +Requirements: +- Docker +- Conda +- Local flink client + +1. First create the conda environment with: + +``` +conda env create -f environment.yml +``` + +2. Activate the environment with: + +``` +conda activate cascade_env +``` + +3. Start the Kafka and Pyflink local clusters + +``` +docker compose up +``` + +This will launch: + +- a Kafka broker at `localhost:9092` (`kafka:9093` for inter-docker communication!) and, +- a [Kafbat UI](https://github.com/kafbat/kafka-ui) at http://localhost:8080 +- a local Flink cluster with `PyFlink` and all requirements, with a ui at http://localhost:8081 + +By default the flink cluster will run with 16 task slots. This can be changed +setting the `TASK_SLOTS` enviroment variable, for example: + +``` +TASK_SLOTS=32 docker compose up +``` + +You could also scale up the number of taskmanagers, each with the same defined +number of task slots (untested): + +``` +docker compose up --scale taskmanager=3 +``` + +Once everything has started (for example, you can see the web UIs running), you +can upload the benchmark job to the cluster. Note that the Kafka topics must be +emptied first, otherwise the job will immediately start consuming old events. +You can use the Kafbat UI for this, for example by deleting topics or purging +messages. To start the job, first navigate to the cascade repo directory e.g. +`cd /path/to/cascade`. Then run the following command, where `X` is the default +parallelism desired: + +``` +flink run --pyFiles /path/to/cascade/src,/path/to/cascade --pyModule deathstar_movie_review.demo -p X +``` + +Once the job is submitted, you can start the benchmark. Open another terminal in +the same directory and run: + +``` +python -m deathstar_movie_review.start_benchmark +``` + +This will start the benchmark by sending events to Kafka. The first phase will +initialise the state required for the benchmark, and is not measured. The second +phase starts the actual becnhmark. 
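+If you would rather reset the topics from a script than through the Kafbat UI,
+the sketch below is one way to do it with the same `confluent_kafka` AdminClient
+that `demo.py` uses to create topics. It only deletes the topics (deletion is
+asynchronous, so give Kafka a moment afterwards); the job recreates any missing
+topics when it starts. The broker address and topic names below are the defaults
+used in this repo; keep them in sync with `create_topics()` in `demo.py`.
+
+```
+from confluent_kafka.admin import AdminClient
+
+admin = AdminClient({"bootstrap.servers": "localhost:9092"})
+
+# Keep in sync with the topics created in demo.py
+topics = ["ds-movie-in", "ds-movie-internal", "ds-movie-out"]
+
+# Delete whichever of the benchmark topics currently exist, dropping old events
+existing = admin.list_topics(timeout=5).topics.keys()
+to_delete = [t for t in topics if t in existing]
+if to_delete:
+    for topic, future in admin.delete_topics(to_delete).items():
+        future.result()  # raises if the deletion failed
+        print(f"Deleted topic '{topic}'")
+```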
+ + +### Notes + +Currently trying to scale up higher than `-p 16`, however I ran into the +following issue on `-p 64` with `TASK_SLOTS=128`, more configuration might be required? + +``` +Caused by: java.io.IOException: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'. +``` + + ## Development Cascade should work with Python 3.10 / 3.11 although other versions could work. Dependencies should first be installed with: @@ -8,7 +87,7 @@ Cascade should work with Python 3.10 / 3.11 although other versions could work. pip install -r requirements.txt ``` -## Testing +## (old) Testing The `pip install` command should have installed a suitable version of `pytest`. diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index 6e546e7..72a2dc0 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -1,188 +1,61 @@ -import hashlib -import uuid +from cascade.runtime.flink_runtime import FlinkRuntime -from .movie_data import movie_data -from .workload_data import movie_titles, charset -import random -from timeit import default_timer as timer -import sys -import os +from .entities.user import user_op +from .entities.compose_review import compose_review_op +from .entities.frontend import frontend_op, text_op, unique_id_op +from .entities.movie import movie_id_op, movie_info_op, plot_op -# import cascade -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) -from cascade.dataflow.dataflow import Event, EventResult, InitClass, OpNode -from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime -from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination +from confluent_kafka.admin import AdminClient, NewTopic -from .entities.user import user_op, User -from .entities.compose_review import compose_review_op -from .entities.frontend import frontend_op, text_op, unique_id_op -from .entities.movie import MovieInfo, movie_id_op, movie_info_op, plot_op, Plot, MovieId - -import time -import pandas as pd - -def populate_user(client: FlinkRuntime): - init_user = OpNode(User, InitClass(), read_key_from="username") - for i in range(1000): - user_id = f'user{i}' - username = f'username_{i}' - password = f'password_{i}' - hasher = hashlib.new('sha512') - salt = uuid.uuid1().bytes - hasher.update(password.encode()) - hasher.update(salt) - - password_hash = hasher.hexdigest() - - user_data = { - "userId": user_id, - "FirstName": "firstname", - "LastName": "lastname", - "Username": username, - "Password": password_hash, - "Salt": salt - } - event = Event(init_user, {"username": username, "user_data": user_data}, None) - client.send(event) - - -def populate_movie(client: FlinkRuntime): - init_movie_info = OpNode(MovieInfo, InitClass(), read_key_from="movie_id") - init_plot = OpNode(Plot, InitClass(), read_key_from="movie_id") - init_movie_id = OpNode(MovieId, InitClass(), read_key_from="title") - - for movie in movie_data: - movie_id = movie["MovieId"] - - # movie info -> write `movie` - event = Event(init_movie_info, {"movie_id": movie_id, "info": movie}, None) - client.send(event) - - # plot -> write "plot" - event = Event(init_plot, {"movie_id": movie_id, "plot": "plot"}, None) - client.send(event) - - # movie_id_op -> register movie id - 
event = Event(init_movie_id, {"title": movie["Title"], "movie_id": movie_id}, None) - client.send(event) - - -def compose_review(req_id): - user_index = random.randint(0, 999) - username = f"username_{user_index}" - password = f"password_{user_index}" - title = random.choice(movie_titles) - rating = random.randint(0, 10) - text = ''.join(random.choice(charset) for _ in range(256)) - - return frontend_op.dataflow.generate_event({ - "review": req_id, - "user": username, - "title": title, - "rating": rating, - "text": text - }) - -def deathstar_workload_generator(): - c = 1 - while True: - yield compose_review(c) - c += 1 - -threads = 1 -messages_per_burst = 10 -sleeps_per_burst = 10 -sleep_time = 0.08 #0.0085 -seconds_per_burst = 1 -bursts = 100 - - -def benchmark_runner(proc_num) -> dict[int, dict]: - print(f'Generator: {proc_num} starting') - client = FlinkClientSync("ds-movie-in", "ds-movie-out") - deathstar_generator = deathstar_workload_generator() - start = timer() - - for _ in range(bursts): - sec_start = timer() - - # send burst of messages - for i in range(messages_per_burst): - - # sleep sometimes between messages - if i % (messages_per_burst // sleeps_per_burst) == 0: - time.sleep(sleep_time) - event = next(deathstar_generator) - client.send(event) - - client.flush() - sec_end = timer() - - # wait out the second - lps = sec_end - sec_start - if lps < seconds_per_burst: - time.sleep(1 - lps) - sec_end2 = timer() - print(f'Latency per burst: {sec_end2 - sec_start} ({seconds_per_burst})') + +def create_topics(): + # Kafka broker configuration + conf = { + "bootstrap.servers": "localhost:9092" # Replace with your Kafka broker(s) + } + + # Create an AdminClient + admin_client = AdminClient(conf) + + # Topics to ensure + required_topics = ["ds-movie-in", "internal-topic", "ds-movie-out"] + + # Fetch existing topics + existing_topics = admin_client.list_topics(timeout=5).topics.keys() + + # Find missing topics + missing_topics = [topic for topic in required_topics if topic not in existing_topics] + + if missing_topics: + print(f"Creating missing topics: {missing_topics}") - end = timer() - print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') - - done = False - while not done: - done = True - for event_id, fut in client._futures.items(): - result = fut["ret"] - if result is None: - done = False - time.sleep(0.5) - break - futures = client._futures - client.close() - return futures - - -def write_dict_to_pkl(futures_dict, filename): - """ - Writes a dictionary of event data to a pickle file. - - Args: - futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. - filename (str): The name of the pickle file to write to. 
- """ - - # Prepare the data for the DataFrame - data = [] - for event_id, event_data in futures_dict.items(): - ret: EventResult = event_data.get("ret") - row = { - "event_id": event_id, - "sent": str(event_data.get("sent")), - "sent_t": event_data.get("sent_t"), - "ret": str(event_data.get("ret")), - "ret_t": event_data.get("ret_t"), - "roundtrip": ret.metadata["roundtrip"] if ret else None, - "flink_time": ret.metadata["flink_time"] if ret else None, - "deser_times": ret.metadata["deser_times"] if ret else None, - "loops": ret.metadata["loops"] if ret else None, - "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None - } - data.append(row) - - # Create a DataFrame and save it as a pickle file - df = pd.DataFrame(data) - df.to_pickle(filename) + # Define new topics (default: 1 partition, replication factor 1) + new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in missing_topics] + + # Create topics + futures = admin_client.create_topics(new_topics) + + # Wait for topic creation to complete + for topic, future in futures.items(): + try: + future.result() # Block until the operation is complete + print(f"Topic '{topic}' created successfully") + except Exception as e: + print(f"Failed to create topic '{topic}': {e}") + else: + print("All required topics exist.") + def main(): - runtime = FlinkRuntime("ds-movie-in", "ds-movie-out", 8081) - runtime.init(bundle_time=5, bundle_size=10) + create_topics() + runtime = FlinkRuntime("ds-movie-in", "ds-movie-out") + runtime.init(kafka_broker="kafka:9093",bundle_time=5, bundle_size=10) - print(frontend_op.dataflow.to_dot()) + # print(frontend_op.dataflow.to_dot()) # dead_node_elimination([], [frontend_op]) print(frontend_op.dataflow.to_dot()) - input() + # input() runtime.add_operator(compose_review_op) @@ -194,30 +67,34 @@ def main(): runtime.add_stateless_operator(unique_id_op) runtime.add_stateless_operator(text_op) - runtime.run(run_async=True) - populate_user(runtime) - populate_movie(runtime) - runtime.producer.flush() - time.sleep(1) - - input() - - # with Pool(threads) as p: - # results = p.map(benchmark_runner, range(threads)) - - # results = {k: v for d in results for k, v in d.items()} - results = benchmark_runner(0) - - print("last result:") - print(list(results.values())[-1]) - t = len(results) - r = 0 - for result in results.values(): - if result["ret"] is not None: - print(result) - r += 1 - print(f"{r}/{t} results recieved.") - write_dict_to_pkl(results, "test2.pkl") + print("running now...") + runtime.run() + print("running") + # input("wait!") + # print() + # populate_user(runtime) + # populate_movie(runtime) + # runtime.producer.flush() + # time.sleep(1) + + # input() + + # # with Pool(threads) as p: + # # results = p.map(benchmark_runner, range(threads)) + + # # results = {k: v for d in results for k, v in d.items()} + # results = benchmark_runner(0) + + # print("last result:") + # print(list(results.values())[-1]) + # t = len(results) + # r = 0 + # for result in results.values(): + # if result["ret"] is not None: + # print(result) + # r += 1 + # print(f"{r}/{t} results recieved.") + # write_dict_to_pkl(results, "test2.pkl") if __name__ == "__main__": main() diff --git a/deathstar_movie_review/entities/compose_review.py b/deathstar_movie_review/entities/compose_review.py index 7423c4b..853e34b 100644 --- a/deathstar_movie_review/entities/compose_review.py +++ b/deathstar_movie_review/entities/compose_review.py @@ -1,6 +1,6 @@ from typing import Any -from 
src.cascade.dataflow.operator import StatefulOperator +from cascade.dataflow.operator import StatefulOperator class ComposeReview: diff --git a/deathstar_movie_review/entities/user.py b/deathstar_movie_review/entities/user.py index 2b244a9..e883277 100644 --- a/deathstar_movie_review/entities/user.py +++ b/deathstar_movie_review/entities/user.py @@ -1,7 +1,7 @@ from typing import Any from deathstar_movie_review.entities.compose_review import ComposeReview -from src.cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, OpNode -from src.cascade.dataflow.operator import StatefulOperator +from cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, OpNode +from cascade.dataflow.operator import StatefulOperator class User: diff --git a/deathstar_movie_review/start_benchmark.py b/deathstar_movie_review/start_benchmark.py new file mode 100644 index 0000000..169d326 --- /dev/null +++ b/deathstar_movie_review/start_benchmark.py @@ -0,0 +1,207 @@ +import hashlib +import time +import uuid +import pandas as pd +import random +from .movie_data import movie_data +from .workload_data import movie_titles, charset +import sys +import os +from timeit import default_timer as timer + + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.dataflow.dataflow import Event, EventResult, InitClass, OpNode +from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime +from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination + +from .entities.user import user_op, User +from .entities.compose_review import compose_review_op +from .entities.frontend import frontend_op, text_op, unique_id_op +from .entities.movie import MovieInfo, movie_id_op, movie_info_op, plot_op, Plot, MovieId + + +def populate_user(client: FlinkClientSync): + init_user = OpNode(User, InitClass(), read_key_from="username") + for i in range(1000): + user_id = f'user{i}' + username = f'username_{i}' + password = f'password_{i}' + hasher = hashlib.new('sha512') + salt = uuid.uuid1().bytes + hasher.update(password.encode()) + hasher.update(salt) + + password_hash = hasher.hexdigest() + + user_data = { + "userId": user_id, + "FirstName": "firstname", + "LastName": "lastname", + "Username": username, + "Password": password_hash, + "Salt": salt + } + event = Event(init_user, {"username": username, "user_data": user_data}, None) + client.send(event) + + +def populate_movie(client: FlinkClientSync): + init_movie_info = OpNode(MovieInfo, InitClass(), read_key_from="movie_id") + init_plot = OpNode(Plot, InitClass(), read_key_from="movie_id") + init_movie_id = OpNode(MovieId, InitClass(), read_key_from="title") + + for movie in movie_data: + movie_id = movie["MovieId"] + + # movie info -> write `movie` + event = Event(init_movie_info, {"movie_id": movie_id, "info": movie}, None) + client.send(event) + + # plot -> write "plot" + event = Event(init_plot, {"movie_id": movie_id, "plot": "plot"}, None) + client.send(event) + + # movie_id_op -> register movie id + event = Event(init_movie_id, {"title": movie["Title"], "movie_id": movie_id}, None) + client.send(event) + + +def compose_review(req_id): + user_index = random.randint(0, 999) + username = f"username_{user_index}" + password = f"password_{user_index}" + title = random.choice(movie_titles) + rating = random.randint(0, 10) + text = ''.join(random.choice(charset) for _ in range(256)) + + return frontend_op.dataflow.generate_event({ + "review": req_id, + "user": username, + "title": title, + 
"rating": rating, + "text": text + }) + +def deathstar_workload_generator(): + c = 1 + while True: + yield compose_review(c) + c += 1 + +threads = 1 +messages_per_burst = 10 +sleeps_per_burst = 10 +sleep_time = 0.08 #0.0085 +seconds_per_burst = 1 +bursts = 100 + + +def benchmark_runner(proc_num) -> dict[int, dict]: + print(f'Generator: {proc_num} starting') + client = FlinkClientSync("ds-movie-in", "ds-movie-out") + deathstar_generator = deathstar_workload_generator() + start = timer() + + for _ in range(bursts): + sec_start = timer() + + # send burst of messages + for i in range(messages_per_burst): + + # sleep sometimes between messages + if i % (messages_per_burst // sleeps_per_burst) == 0: + time.sleep(sleep_time) + event = next(deathstar_generator) + client.send(event) + + client.flush() + sec_end = timer() + + # wait out the second + lps = sec_end - sec_start + if lps < seconds_per_burst: + time.sleep(1 - lps) + sec_end2 = timer() + print(f'Latency per burst: {sec_end2 - sec_start} ({seconds_per_burst})') + + end = timer() + print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') + + done = False + while not done: + done = True + for event_id, fut in client._futures.items(): + result = fut["ret"] + if result is None: + done = False + time.sleep(0.5) + break + futures = client._futures + client.close() + return futures + + +def write_dict_to_pkl(futures_dict, filename): + """ + Writes a dictionary of event data to a pickle file. + + Args: + futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. + filename (str): The name of the pickle file to write to. + """ + + # Prepare the data for the DataFrame + data = [] + for event_id, event_data in futures_dict.items(): + ret: EventResult = event_data.get("ret") + row = { + "event_id": event_id, + "sent": str(event_data.get("sent")), + "sent_t": event_data.get("sent_t"), + "ret": str(event_data.get("ret")), + "ret_t": event_data.get("ret_t"), + "roundtrip": ret.metadata["roundtrip"] if ret else None, + "flink_time": ret.metadata["flink_time"] if ret else None, + "deser_times": ret.metadata["deser_times"] if ret else None, + "loops": ret.metadata["loops"] if ret else None, + "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None + } + data.append(row) + + # Create a DataFrame and save it as a pickle file + df = pd.DataFrame(data) + df.to_pickle(filename) + +def main(): + client = FlinkClientSync("ds-movie-in", "ds-movie-out") + + populate_user(client) + populate_movie(client) + client.producer.flush() + time.sleep(1) + + input() + + # with Pool(threads) as p: + # results = p.map(benchmark_runner, range(threads)) + + # results = {k: v for d in results for k, v in d.items()} + results = benchmark_runner(0) + + print("last result:") + print(list(results.values())[-1]) + t = len(results) + r = 0 + for result in results.values(): + if result["ret"] is not None: + print(result) + r += 1 + print(f"{r}/{t} results recieved.") + write_dict_to_pkl(results, "test2.pkl") + client.close() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index ce9e1b4..22b5bb8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,3 +40,31 @@ services: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 depends_on: - kafka + + # https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/standalone/docker/#flink-with-docker-compose + + jobmanager: + build: + context: . 
+ dockerfile: Dockerfile.pyflink + ports: + - "8081:8081" + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + + taskmanager: + build: + context: . + dockerfile: Dockerfile.pyflink + depends_on: + - jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: ${TASK_SLOTS:-16} \ No newline at end of file diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000..e31bd80 --- /dev/null +++ b/environment.yml @@ -0,0 +1,94 @@ +name: cascade_env +channels: + - defaults + - https://repo.anaconda.com/pkgs/main + - https://repo.anaconda.com/pkgs/r +dependencies: + - _libgcc_mutex=0.1=main + - _openmp_mutex=5.1=1_gnu + - bzip2=1.0.8=h5eee18b_6 + - ca-certificates=2024.12.31=h06a4308_0 + - ld_impl_linux-64=2.40=h12ee557_0 + - libffi=3.4.4=h6a678d5_1 + - libgcc-ng=11.2.0=h1234567_1 + - libgomp=11.2.0=h1234567_1 + - libstdcxx-ng=11.2.0=h1234567_1 + - libuuid=1.41.5=h5eee18b_0 + - ncurses=6.4=h6a678d5_0 + - openssl=3.0.15=h5eee18b_0 + - pip=25.0=py311h06a4308_0 + - python=3.11.11=he870216_0 + - readline=8.2=h5eee18b_0 + - setuptools=75.8.0=py311h06a4308_0 + - sqlite=3.45.3=h5eee18b_0 + - tk=8.6.14=h39e8969_0 + - wheel=0.45.1=py311h06a4308_0 + - xz=5.6.4=h5eee18b_1 + - zlib=1.2.13=h5eee18b_1 + - pip: + - apache-beam==2.48.0 + - apache-flink==1.20.0 + - apache-flink-libraries==1.20.0 + - astor==0.8.1 + - avro-python3==1.10.2 + - certifi==2024.8.30 + - charset-normalizer==3.4.0 + - cloudpickle==2.2.1 + - configargparse==1.7 + - confluent-kafka==2.6.1 + - contourpy==1.3.1 + - crcmod==1.7 + - cycler==0.12.1 + - dill==0.3.1.1 + - dnspython==2.7.0 + - docopt==0.6.2 + - exceptiongroup==1.2.2 + - fastavro==1.9.7 + - fasteners==0.19 + - find-libpython==0.4.0 + - fonttools==4.56.0 + - geographiclib==2.0 + - geopy==2.4.1 + - grpcio==1.68.1 + - hdfs==2.7.3 + - httplib2==0.22.0 + - idna==3.10 + - iniconfig==2.0.0 + - jinja2==3.1.4 + - kiwisolver==1.4.8 + - klara==0.6.3 + - markupsafe==3.0.2 + - matplotlib==3.9.2 + - networkx==3.2.1 + - numpy==1.24.4 + - objsize==0.6.1 + - orjson==3.10.12 + - packaging==24.2 + - pandas==2.2.3 + - pdoc==15.0.0 + - pemja==0.4.1 + - pillow==11.1.0 + - pluggy==1.5.0 + - proto-plus==1.25.0 + - protobuf==4.23.4 + - py4j==0.10.9.7 + - pyarrow==11.0.0 + - pydot==1.4.2 + - pygments==2.18.0 + - pymongo==4.10.1 + - pyparsing==3.2.0 + - pytest==8.3.4 + - python-dateutil==2.9.0.post0 + - pytz==2024.2 + - regex==2024.11.6 + - requests==2.32.3 + - ruamel-yaml==0.18.6 + - ruamel-yaml-clib==0.2.12 + - six==1.17.0 + - tomli==2.2.1 + - typed-ast==1.5.5 + - typing-extensions==4.12.2 + - tzdata==2024.2 + - urllib3==2.2.3 + - z3-solver==4.14.0.0 + - zstandard==0.23.0 diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index dd090cb..038ec66 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -371,22 +371,28 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para config.set_integer("python.fn-execution.bundle.size", bundle_size) # optimize for low latency - config.set_integer("taskmanager.memory.managed.size", 0) - config.set_integer("execution.buffer-timeout", 0) + # config.set_integer("taskmanager.memory.managed.size", 0) + config.set_integer("execution.buffer-timeout", 5) + + + kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') + serializer_jar = 
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') + + # https://issues.apache.org/jira/browse/FLINK-36457q + config.set_string("pipeline.jars",f"file:///home/lvanmol/flink-1.20.1/opt/flink-python-1.20.1.jar;file://{kafka_jar};file://{serializer_jar}") self.env = StreamExecutionEnvironment.get_execution_environment(config) if parallelism: self.env.set_parallelism(parallelism) logger.debug(f"FlinkRuntime: parellelism {self.env.get_parallelism()}") - kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), - 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') - serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') if os.name == 'nt': self.env.add_jars(f"file:///{kafka_jar}",f"file:///{serializer_jar}") else: - self.env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") + pass + # self.env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") deserialization_schema = ByteSerializer() properties: dict = { @@ -421,7 +427,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_serialization_schema(deserialization_schema) .build() ) - .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) + # .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink that will be ingested again by the Flink runtime.""" @@ -435,7 +441,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_serialization_schema(deserialization_schema) .build() ) - .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) + # .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink corresponding to outputs of calls (`EventResult`s).""" @@ -449,7 +455,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .map(lambda x: deserialize_and_timestamp(x)) # .map(lambda x: debug(x, msg=f"entry: {x}")) .name("DESERIALIZE external") - .set_parallelism(2) # .filter(lambda e: isinstance(e, Event)) # Enforced by `send` type safety ).union( self.env.from_source( @@ -459,7 +464,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para ) .map(lambda x: deserialize_and_timestamp(x)) .name("DESERIALIZE internal") - .set_parallelism(4) ) """REMOVE SELECT ALL NODES From 1ebe3d5c56679cae00b20aca3ce95ba3a763e861 Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Sat, 22 Feb 2025 18:02:09 +0100 Subject: [PATCH 3/5] Code cleanup --- README.md | 2 +- deathstar_movie_review/demo.py | 53 +++------- deathstar_movie_review/start_benchmark.py | 77 +++++++++----- display_results.ipynb | 5 +- src/cascade/runtime/flink_runtime.py | 122 +++++++--------------- 5 files changed, 103 insertions(+), 156 deletions(-) diff --git a/README.md b/README.md index b7bd055..01b1822 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ flink run --pyFiles /path/to/cascade/src,/path/to/cascade --pyModule deathstar_m ``` Once the job is submitted, you can start the benchmark. 
Open another terminal in -the same directory and run: +the same directory (and conda environment) and run: ``` python -m deathstar_movie_review.start_benchmark diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index 72a2dc0..889e623 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -8,19 +8,20 @@ from confluent_kafka.admin import AdminClient, NewTopic +KAFKA_BROKER = "localhost:9092" +KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different -def create_topics(): - # Kafka broker configuration +IN_TOPIC = "ds-movie-in" +OUT_TOPIC = "ds-movie-out" +INTERNAL_TOPIC = "ds-movie-internal" + +def create_topics(*required_topics): conf = { - "bootstrap.servers": "localhost:9092" # Replace with your Kafka broker(s) + "bootstrap.servers": KAFKA_BROKER } - # Create an AdminClient admin_client = AdminClient(conf) - # Topics to ensure - required_topics = ["ds-movie-in", "internal-topic", "ds-movie-out"] - # Fetch existing topics existing_topics = admin_client.list_topics(timeout=5).topics.keys() @@ -48,15 +49,14 @@ def create_topics(): def main(): - create_topics() - runtime = FlinkRuntime("ds-movie-in", "ds-movie-out") - runtime.init(kafka_broker="kafka:9093",bundle_time=5, bundle_size=10) + create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC) + + runtime = FlinkRuntime(IN_TOPIC, OUT_TOPIC, internal_topic=INTERNAL_TOPIC) + runtime.init(kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10) - # print(frontend_op.dataflow.to_dot()) # dead_node_elimination([], [frontend_op]) + print("Creating dataflow:") print(frontend_op.dataflow.to_dot()) - # input() - runtime.add_operator(compose_review_op) runtime.add_operator(user_op) @@ -67,34 +67,7 @@ def main(): runtime.add_stateless_operator(unique_id_op) runtime.add_stateless_operator(text_op) - print("running now...") runtime.run() - print("running") - # input("wait!") - # print() - # populate_user(runtime) - # populate_movie(runtime) - # runtime.producer.flush() - # time.sleep(1) - - # input() - - # # with Pool(threads) as p: - # # results = p.map(benchmark_runner, range(threads)) - - # # results = {k: v for d in results for k, v in d.items()} - # results = benchmark_runner(0) - - # print("last result:") - # print(list(results.values())[-1]) - # t = len(results) - # r = 0 - # for result in results.values(): - # if result["ret"] is not None: - # print(result) - # r += 1 - # print(f"{r}/{t} results recieved.") - # write_dict_to_pkl(results, "test2.pkl") if __name__ == "__main__": main() diff --git a/deathstar_movie_review/start_benchmark.py b/deathstar_movie_review/start_benchmark.py index 169d326..e381ff8 100644 --- a/deathstar_movie_review/start_benchmark.py +++ b/deathstar_movie_review/start_benchmark.py @@ -14,14 +14,20 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) from cascade.dataflow.dataflow import Event, EventResult, InitClass, OpNode -from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime -from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination - -from .entities.user import user_op, User -from .entities.compose_review import compose_review_op -from .entities.frontend import frontend_op, text_op, unique_id_op -from .entities.movie import MovieInfo, movie_id_op, movie_info_op, plot_op, Plot, MovieId - +from cascade.runtime.flink_runtime import FlinkClientSync + +from .entities.user import User +from .entities.frontend import frontend_op +from 
.entities.movie import MovieInfo, Plot, MovieId + +IN_TOPIC = "ds-movie-in" +OUT_TOPIC = "ds-movie-out" +# threads = 1 +messages_per_burst = 10 +sleeps_per_burst = 10 +sleep_time = 0.08 +seconds_per_burst = 1 +bursts = 100 def populate_user(client: FlinkClientSync): init_user = OpNode(User, InitClass(), read_key_from="username") @@ -91,21 +97,14 @@ def deathstar_workload_generator(): yield compose_review(c) c += 1 -threads = 1 -messages_per_burst = 10 -sleeps_per_burst = 10 -sleep_time = 0.08 #0.0085 -seconds_per_burst = 1 -bursts = 100 - def benchmark_runner(proc_num) -> dict[int, dict]: print(f'Generator: {proc_num} starting') - client = FlinkClientSync("ds-movie-in", "ds-movie-out") + client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) deathstar_generator = deathstar_workload_generator() start = timer() - for _ in range(bursts): + for b in range(bursts): sec_start = timer() # send burst of messages @@ -125,11 +124,15 @@ def benchmark_runner(proc_num) -> dict[int, dict]: if lps < seconds_per_burst: time.sleep(1 - lps) sec_end2 = timer() - print(f'Latency per burst: {sec_end2 - sec_start} ({seconds_per_burst})') + print(f'Latency per burst: {sec_end2 - sec_start} ({b+1}/{bursts})') end = timer() print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') - + futures = wait_for_futures(client) + client.close() + return futures + +def wait_for_futures(client: FlinkClientSync): done = False while not done: done = True @@ -140,7 +143,6 @@ def benchmark_runner(proc_num) -> dict[int, dict]: time.sleep(0.5) break futures = client._futures - client.close() return futures @@ -173,17 +175,25 @@ def write_dict_to_pkl(futures_dict, filename): # Create a DataFrame and save it as a pickle file df = pd.DataFrame(data) + + # Multiply flink_time by 1000 to convert to milliseconds + df['flink_time'] = df['flink_time'] * 1000 + df.to_pickle(filename) + return df def main(): - client = FlinkClientSync("ds-movie-in", "ds-movie-out") - - populate_user(client) - populate_movie(client) - client.producer.flush() + init_client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + + print("Populating...") + populate_user(init_client) + populate_movie(init_client) + init_client.producer.flush() + wait_for_futures(init_client) + print("Done.") time.sleep(1) - input() + input("Press enter to start benchmark") # with Pool(threads) as p: # results = p.map(benchmark_runner, range(threads)) @@ -197,11 +207,20 @@ def main(): r = 0 for result in results.values(): if result["ret"] is not None: - print(result) + # print(result) r += 1 + print(f"{r}/{t} results recieved.") - write_dict_to_pkl(results, "test2.pkl") - client.close() + print("Writing results to benchmark_results.pkl") + + df = write_dict_to_pkl(results, "benchmark_results.pkl") + + flink_time = df['flink_time'].median() + latency = df['latency'].median() + flink_prct = float(flink_time) * 100 / latency + print(f"Median latency : {latency:.2f} ms") + print(f"Median Flink time : {flink_time:.2f} ms ({flink_prct:.2f}%)") + init_client.close() if __name__ == "__main__": main() \ No newline at end of file diff --git a/display_results.ipynb b/display_results.ipynb index f47e1d5..548f528 100644 --- a/display_results.ipynb +++ b/display_results.ipynb @@ -1910,7 +1910,7 @@ }, { "cell_type": "code", - "execution_count": 33, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -2917,9 +2917,6 @@ " # Read the DataFrame from the pickle file\n", " df = pd.read_pickle(pickle_file_path)\n", "\n", - " # Multiply flink_time by 1000 to convert to milliseconds\n", - " 
df['flink_time'] = df['flink_time'] * 1000\n", - "\n", " # Calculate the additional Kafka overhead\n", " df['kafka_overhead'] = df['latency'] - df['flink_time']\n", "\n", diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 038ec66..5afd53f 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -7,7 +7,6 @@ from typing import Any, Literal, Optional, Type, Union from pyflink.common.typeinfo import Types, get_gateway from pyflink.common import Configuration, DeserializationSchema, SerializationSchema, WatermarkStrategy -from pyflink.datastream.connectors import DeliveryGuarantee from pyflink.datastream.data_stream import CloseableIterator from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, ValueState, ValueStateDescriptor from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink @@ -25,6 +24,9 @@ console_handler.setFormatter(formatter) logger.addHandler(console_handler) +# Required if SelectAll nodes are used +SELECT_ALL_ENABLED = False + @dataclass class FlinkRegisterKeyNode(Node): """A node that will register a key with the SelectAll operator. @@ -69,14 +71,15 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): result = self.operator.handle_init_class(*event.variable_map.values()) # Register the created key in FlinkSelectAllOperator - # register_key_event = Event( - # FlinkRegisterKeyNode(key, self.operator.entity), - # {}, - # None, - # _id = event._id - # ) - # logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") - # yield register_key_event + if SELECT_ALL_ENABLED: + register_key_event = Event( + FlinkRegisterKeyNode(key, self.operator.entity), + {}, + None, + _id = event._id + ) + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") + yield register_key_event self.state.update(pickle.dumps(result)) elif isinstance(event.target.method_type, InvokeMethod): @@ -92,6 +95,7 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): # TODO: check if state actually needs to be updated if state is not None: self.state.update(pickle.dumps(state)) + # Filter targets are used in cases of [hotel for hotel in Hotel.__all__() *if hotel....*] # elif isinstance(event.target.method_type, Filter): # state = pickle.loads(self.state.value()) # result = event.target.method_type.filter_fn(event.variable_map, state) @@ -321,9 +325,6 @@ def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_po self.env: Optional[StreamExecutionEnvironment] = None """@private""" - self.producer: Producer = None - """@private""" - self.sent_events = 0 """The number of events that were sent using `send()`.""" @@ -378,9 +379,10 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') + flink_jar = "/home/lvanmol/flink-1.20.1/opt/flink-python-1.20.1.jar" - # https://issues.apache.org/jira/browse/FLINK-36457q - config.set_string("pipeline.jars",f"file:///home/lvanmol/flink-1.20.1/opt/flink-python-1.20.1.jar;file://{kafka_jar};file://{serializer_jar}") + # Add the required jars 
https://issues.apache.org/jira/browse/FLINK-36457q + config.set_string("pipeline.jars",f"file://{flink_jar};file://{kafka_jar};file://{serializer_jar}") self.env = StreamExecutionEnvironment.get_execution_environment(config) if parallelism: @@ -388,18 +390,8 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para logger.debug(f"FlinkRuntime: parellelism {self.env.get_parallelism()}") - if os.name == 'nt': - self.env.add_jars(f"file:///{kafka_jar}",f"file:///{serializer_jar}") - else: - pass - # self.env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") - deserialization_schema = ByteSerializer() - properties: dict = { - "bootstrap.servers": kafka_broker, - "auto.offset.reset": "earliest", - "group.id": "test_group_1", - } + kafka_external_source = ( KafkaSource.builder() .set_bootstrap_servers(kafka_broker) @@ -427,7 +419,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_serialization_schema(deserialization_schema) .build() ) - # .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink that will be ingested again by the Flink runtime.""" @@ -441,7 +432,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .set_value_serialization_schema(deserialization_schema) .build() ) - # .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) """Kafka sink corresponding to outputs of calls (`EventResult`s).""" @@ -453,7 +443,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para "Kafka External Source" ) .map(lambda x: deserialize_and_timestamp(x)) - # .map(lambda x: debug(x, msg=f"entry: {x}")) .name("DESERIALIZE external") # .filter(lambda e: isinstance(e, Event)) # Enforced by `send` type safety ).union( @@ -466,42 +455,32 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .name("DESERIALIZE internal") ) - """REMOVE SELECT ALL NODES # Events with a `SelectAllNode` will first be processed by the select # all operator, which will send out multiple other Events that can # then be processed by operators in the same steam. - select_all_stream = ( - event_stream.filter(lambda e: - isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) - .key_by(lambda e: e.target.cls) - .process(FlinkSelectAllOperator()).name("SELECT ALL OP") - ) - # Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode` - not_select_all_stream = ( - event_stream.filter(lambda e: - not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) - ) + if SELECT_ALL_ENABLED: + select_all_stream = ( + event_stream.filter(lambda e: + isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) + .key_by(lambda e: e.target.cls) + .process(FlinkSelectAllOperator()).name("SELECT ALL OP") + ) + # Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode` + not_select_all_stream = ( + event_stream.filter(lambda e: + not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) + ) - operator_stream = select_all_stream.union(not_select_all_stream) - """ + event_stream = select_all_stream.union(not_select_all_stream) self.stateful_op_stream = event_stream self.stateless_op_stream = event_stream - # MOVED TO END OF OP STREAMS! 
- # self.merge_op_stream = ( - # event_stream.filter(lambda e: isinstance(e.target, CollectNode)) - # .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? - # .process(FlinkCollectOperator()) - # .name("Collect") - # ) - # """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" self.stateless_op_streams = [] self.stateful_op_streams = [] """List of stateful operator streams, which gets appended at `add_operator`.""" - self.producer = Producer({'bootstrap.servers': kafka_broker}) logger.debug("FlinkRuntime initialized") def add_operator(self, op: StatefulOperator): @@ -510,10 +489,8 @@ def add_operator(self, op: StatefulOperator): op_stream = ( self.stateful_op_stream.filter(lambda e: isinstance(e.target, OpNode) and e.target.entity == flink_op.operator.entity) - # .map(lambda x: debug(x, msg=f"filtered op: {op.entity}")) .key_by(lambda e: e.variable_map[e.target.read_key_from]) .process(flink_op) - # .map(lambda x: debug(x, msg=f"processed op: {op.entity}")) .name("STATEFUL OP: " + flink_op.operator.entity.__name__) ) self.stateful_op_streams.append(op_stream) @@ -530,17 +507,6 @@ def add_stateless_operator(self, op: StatelessOperator): ) self.stateless_op_streams.append(op_stream) - def send(self, event: Event, flush=False): - """Send an event to the Kafka source. - Once `run` has been called, the Flink runtime will start ingesting these - messages. Messages can always be sent after `init` is called - Flink - will continue ingesting messages after `run` is called asynchronously. - """ - self.producer.produce(self.input_topic, value=pickle.dumps(event)) - if flush: - self.producer.flush() - self.sent_events += 1 - def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="kafka") -> Union[CloseableIterator, None]: """Start ingesting and processing messages from the Kafka source. 
@@ -551,10 +517,16 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka logger.debug("FlinkRuntime merging operator streams...") # Combine all the operator streams - # operator_streams = self.merge_op_stream.union(*self.stateful_op_streams[1:], *self.stateless_op_streams)#.map(lambda x: debug(x, msg="combined ops")) - s1 = self.stateful_op_streams[0] - rest = self.stateful_op_streams[1:] - operator_streams = s1.union(*rest, *self.stateless_op_streams)#.map(lambda x: debug(x, msg="combined ops")) + if len(self.stateful_op_streams) >= 1: + s1 = self.stateful_op_streams[0] + rest = self.stateful_op_streams[1:] + operator_streams = s1.union(*rest, *self.stateless_op_streams) + elif len(self.stateless_op_streams) >= 1: + s1 = self.stateless_op_streams[0] + rest = self.stateless_op_streams[1:] + operator_streams = s1.union(*rest, *self.stateful_op_streams) + else: + raise RuntimeError("No operators found, were they added to the flink runtime with .add_*_operator()") merge_op_stream = ( operator_streams.filter(lambda e: isinstance(e, Event) and isinstance(e.target, CollectNode)) @@ -564,20 +536,6 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka ) """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" - - """ - # Add filtering for nodes with a `Filter` target - full_stream_filtered = ( - operator_streams - .filter(lambda e: isinstance(e, Event) and isinstance(e.target, Filter)) - .filter(lambda e: e.target.filter_fn()) - ) - full_stream_unfiltered = ( - operator_streams - .filter(lambda e: not (isinstance(e, Event) and isinstance(e.target, Filter))) - ) - ds = full_stream_filtered.union(full_stream_unfiltered) - """ # union with EventResults or Events that don't have a CollectNode target ds = merge_op_stream.union(operator_streams.filter(lambda e: not (isinstance(e, Event) and isinstance(e.target, CollectNode)))) From d997c40c1f57675e8eb15260a2f123e0f563e128 Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Sat, 22 Feb 2025 19:16:08 +0100 Subject: [PATCH 4/5] Benchmarking tweaks --- README.md | 6 ++++++ deathstar_movie_review/demo.py | 17 ++++++++++++++--- deathstar_movie_review/start_benchmark.py | 3 +-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 01b1822..78fe221 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,12 @@ parallelism desired: flink run --pyFiles /path/to/cascade/src,/path/to/cascade --pyModule deathstar_movie_review.demo -p X ``` +> This command runs `FlinkRuntime.init`, which requires the location of a +> flink-python jarfile. +> The location is currently hardcoded in `src/cascade/runtime/flink_runtime` and +> should be changed based on your environment. The jar file is included as part +> of the flink installation itself, at https://flink.apache.org/downloads/ (1.20.1). + Once the job is submitted, you can start the benchmark. 
Open another terminal in the same directory (and conda environment) and run: diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index 889e623..d5e83f3 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -1,8 +1,10 @@ +from typing import Literal +from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination from cascade.runtime.flink_runtime import FlinkRuntime from .entities.user import user_op from .entities.compose_review import compose_review_op -from .entities.frontend import frontend_op, text_op, unique_id_op +from .entities.frontend import frontend_df_parallel, frontend_df_serial, frontend_op, text_op, unique_id_op from .entities.movie import movie_id_op, movie_info_op, plot_op @@ -15,6 +17,8 @@ OUT_TOPIC = "ds-movie-out" INTERNAL_TOPIC = "ds-movie-internal" +EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = "baseline" + def create_topics(*required_topics): conf = { "bootstrap.servers": KAFKA_BROKER @@ -54,9 +58,16 @@ def main(): runtime = FlinkRuntime(IN_TOPIC, OUT_TOPIC, internal_topic=INTERNAL_TOPIC) runtime.init(kafka_broker=KAFKA_FLINK_BROKER,bundle_time=5, bundle_size=10) - # dead_node_elimination([], [frontend_op]) - print("Creating dataflow:") + if EXPERIMENT == "baseline": + frontend_op.dataflow = frontend_df_serial() + elif EXPERIMENT == "pipelined": + frontend_op.dataflow = frontend_df_serial() + dead_node_elimination([], [frontend_op]) + elif EXPERIMENT == "parallel": + frontend_op.dataflow = frontend_df_parallel() + print(frontend_op.dataflow.to_dot()) + print(f"Creating dataflow [{EXPERIMENT}]") runtime.add_operator(compose_review_op) runtime.add_operator(user_op) diff --git a/deathstar_movie_review/start_benchmark.py b/deathstar_movie_review/start_benchmark.py index e381ff8..8cf794c 100644 --- a/deathstar_movie_review/start_benchmark.py +++ b/deathstar_movie_review/start_benchmark.py @@ -9,7 +9,6 @@ import os from timeit import default_timer as timer - # import cascade sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) @@ -193,7 +192,7 @@ def main(): print("Done.") time.sleep(1) - input("Press enter to start benchmark") + print("Starting benchmark") # with Pool(threads) as p: # results = p.map(benchmark_runner, range(threads)) From 358c0b834e3ebd15b38ad55439b57c0c0dede087 Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Wed, 5 Mar 2025 10:45:38 +0100 Subject: [PATCH 5/5] Add experiments script --- deathstar_movie_review/demo.py | 4 +- deathstar_movie_review/start_benchmark.py | 30 ++++--- run_experiments.py | 96 +++++++++++++++++++++++ 3 files changed, 119 insertions(+), 11 deletions(-) create mode 100755 run_experiments.py diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index d5e83f3..60a623b 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -7,7 +7,7 @@ from .entities.frontend import frontend_df_parallel, frontend_df_serial, frontend_op, text_op, unique_id_op from .entities.movie import movie_id_op, movie_info_op, plot_op - +import os from confluent_kafka.admin import AdminClient, NewTopic KAFKA_BROKER = "localhost:9092" @@ -17,7 +17,7 @@ OUT_TOPIC = "ds-movie-out" INTERNAL_TOPIC = "ds-movie-internal" -EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = "baseline" +EXPERIMENT: Literal["baseline", "pipelined", "parallel"] = os.getenv("EXPERIMENT", "baseline") def create_topics(*required_topics): conf = { diff --git 
a/deathstar_movie_review/start_benchmark.py b/deathstar_movie_review/start_benchmark.py index 8cf794c..7664b86 100644 --- a/deathstar_movie_review/start_benchmark.py +++ b/deathstar_movie_review/start_benchmark.py @@ -8,6 +8,7 @@ import sys import os from timeit import default_timer as timer +import argparse # import cascade sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) @@ -22,11 +23,11 @@ IN_TOPIC = "ds-movie-in" OUT_TOPIC = "ds-movie-out" # threads = 1 -messages_per_burst = 10 -sleeps_per_burst = 10 -sleep_time = 0.08 -seconds_per_burst = 1 -bursts = 100 +# messages_per_burst = 10 +# sleeps_per_burst = 10 +# sleep_time = 0.08 +# seconds_per_burst = 1 +# bursts = 100 def populate_user(client: FlinkClientSync): init_user = OpNode(User, InitClass(), read_key_from="username") @@ -97,7 +98,7 @@ def deathstar_workload_generator(): c += 1 -def benchmark_runner(proc_num) -> dict[int, dict]: +def benchmark_runner(proc_num, messages_per_burst, sleeps_per_burst, sleep_time, seconds_per_burst, bursts) -> dict[int, dict]: print(f'Generator: {proc_num} starting') client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) deathstar_generator = deathstar_workload_generator() @@ -182,6 +183,17 @@ def write_dict_to_pkl(futures_dict, filename): return df def main(): + parser = argparse.ArgumentParser(description="Run the benchmark and save results.") + parser.add_argument("-o", "--output", type=str, default="benchmark_results.pkl", help="Output file name for the results") + parser.add_argument("--messages_per_burst", type=int, default=10, help="Number of messages per burst") + parser.add_argument("--sleeps_per_burst", type=int, default=10, help="Number of sleep cycles per burst") + parser.add_argument("--sleep_time", type=float, default=0.08, help="Sleep time between messages") + parser.add_argument("--seconds_per_burst", type=int, default=1, help="Seconds per burst") + parser.add_argument("--bursts", type=int, default=100, help="Number of bursts") + args = parser.parse_args() + + print(f"Starting with args:\n{args}") + init_client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) print("Populating...") @@ -198,7 +210,7 @@ def main(): # results = p.map(benchmark_runner, range(threads)) # results = {k: v for d in results for k, v in d.items()} - results = benchmark_runner(0) + results = benchmark_runner(0, args.messages_per_burst, args.sleeps_per_burst, args.sleep_time, args.seconds_per_burst, args.bursts) print("last result:") print(list(results.values())[-1]) @@ -210,9 +222,9 @@ def main(): r += 1 print(f"{r}/{t} results recieved.") - print("Writing results to benchmark_results.pkl") + print(f"Writing results to {args.output}") - df = write_dict_to_pkl(results, "benchmark_results.pkl") + df = write_dict_to_pkl(results, args.output) flink_time = df['flink_time'].median() latency = df['latency'].median() diff --git a/run_experiments.py b/run_experiments.py new file mode 100755 index 0000000..3cf327e --- /dev/null +++ b/run_experiments.py @@ -0,0 +1,96 @@ +import os +import subprocess +import time + +args = { + "messages_per_burst": 10, + "sleeps_per_burst": 10, + "sleep_time": 0.08, + "seconds_per_burst": 1, + "bursts": 100 +} + +mps_20 = { + **args, + "messages_per_burst": 20, + "sleeps_per_burst": 20, + "sleep_time": 0.08/2, +} + +mps_50 = { + **args, + "messages_per_burst": 50, + "sleeps_per_burst": 50, + "sleep_time": 0.08/5, +} + +# Define experiment parameters as a list of dictionaries +experiments = [ + {"parallelism": 16, "benchmark_args": {**args}}, + {"parallelism": 16, 
"benchmark_args": {**mps_20}}, + {"parallelism": 16, "benchmark_args": {**mps_50}}, + + {"parallelism": 8, "benchmark_args": {**args}}, + {"parallelism": 8, "benchmark_args": {**mps_20}}, + + {"parallelism": 4, "benchmark_args": {**mps_20}}, + {"parallelism": 4, "benchmark_args": {**args}}, + + {"parallelism": 2, "benchmark_args": {**args}}, + {"parallelism": 2, "benchmark_args": {**mps_20}}, + + {"parallelism": 1, "benchmark_args": {**args}}, + {"parallelism": 1, "benchmark_args": {**mps_20}}, + + {"parallelism": 8, "benchmark_args": {**mps_50}}, + {"parallelism": 4, "benchmark_args": {**mps_50}}, + {"parallelism": 2, "benchmark_args": {**mps_50}}, + {"parallelism": 1, "benchmark_args": {**mps_50}}, +] + + + + +print("Tearing down docker containers") +subprocess.run(["docker", "compose", "down"], check=True) + +for e in ["parallel", "base", "piplined"]: + for exp in experiments: + print(f"Starting experiment {exp}") + + # Start docker compose + subprocess.run(["docker", "compose", "up", "-d"], check=True) + + time.sleep(10) + + # Run Flink job + + flink_cmd = [ + "flink", "run", "--pyFiles", "/home/lvanmol/cascade/src,/home/lvanmol/cascade", + "--pyModule", "deathstar_movie_review.demo", "-d", "-p", str(exp['parallelism']) + ] + env = os.environ + env["EXPERIMENT"] = e + subprocess.run(flink_cmd, check=True, env=env) + + # Start benchmark + filename = f"{e}_p-{exp['parallelism']}_mps-{exp['benchmark_args']['messages_per_burst']}.plk" + benchmark_cmd = [ + "python", "-u", "-m", "deathstar_movie_review.start_benchmark", "--output", filename + ] + + for arg, val in exp['benchmark_args'].items(): + benchmark_cmd.append(f"--{arg}") + benchmark_cmd.append(str(val)) + subprocess.run(benchmark_cmd, check=True) + + # Sleep for experiment duration + # print(f"Sleeping for {exp['sleep']} seconds...") + # time.sleep(exp['sleep']) + + # Stop docker compose + subprocess.run(["docker", "compose", "down"], check=True) + + print(f"Experiment completed.") + +print("All experiments completed.")