diff --git a/Task_2_1_2/.gitignore b/Task_2_1_2/.gitignore new file mode 100644 index 0000000..b63da45 --- /dev/null +++ b/Task_2_1_2/.gitignore @@ -0,0 +1,42 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/Task_2_1_2/.idea/.gitignore b/Task_2_1_2/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/Task_2_1_2/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/Task_2_1_2/.idea/gradle.xml b/Task_2_1_2/.idea/gradle.xml new file mode 100644 index 0000000..2a65317 --- /dev/null +++ b/Task_2_1_2/.idea/gradle.xml @@ -0,0 +1,17 @@ + + + + + + + \ No newline at end of file diff --git a/Task_2_1_2/.idea/misc.xml b/Task_2_1_2/.idea/misc.xml new file mode 100644 index 0000000..1e34e43 --- /dev/null +++ b/Task_2_1_2/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/Task_2_1_2/.idea/vcs.xml b/Task_2_1_2/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/Task_2_1_2/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Task_2_1_2/build.gradle b/Task_2_1_2/build.gradle new file mode 100644 index 0000000..9c52dbd --- /dev/null +++ b/Task_2_1_2/build.gradle @@ -0,0 +1,27 @@ +plugins { + id 'java' + id 'jacoco' +} + +group = 'ru.nsu.ksadov.find' +version = '1.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.10.0') + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +test { + useJUnitPlatform() +} + +jacocoTestReport { + reports { + xml.required = true + } +} \ No newline at end of file diff --git a/Task_2_1_2/gradle/wrapper/gradle-wrapper.jar b/Task_2_1_2/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..249e583 Binary files /dev/null and b/Task_2_1_2/gradle/wrapper/gradle-wrapper.jar differ diff --git a/Task_2_1_2/gradle/wrapper/gradle-wrapper.properties b/Task_2_1_2/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..9b74290 --- /dev/null +++ b/Task_2_1_2/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Thu Apr 09 14:47:50 NOVT 2026 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/Task_2_1_2/gradlew b/Task_2_1_2/gradlew new file mode 100755 index 0000000..1b6c787 --- /dev/null +++ b/Task_2_1_2/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/Task_2_1_2/gradlew.bat b/Task_2_1_2/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/Task_2_1_2/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/Task_2_1_2/settings.gradle b/Task_2_1_2/settings.gradle new file mode 100644 index 0000000..891b549 --- /dev/null +++ b/Task_2_1_2/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'Task_2_1_2' \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Announcer.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Announcer.java new file mode 100644 index 0000000..ed2d163 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Announcer.java @@ -0,0 +1,51 @@ +package ru.nsu.ksadov.find; + +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +/** + * Implements announce of connection. + */ +public class Announcer implements Runnable { + private final int tcpPort; + private final String message; + private volatile boolean running = true; + + /** + * Instantiates a new Announcer. + */ + public Announcer(int tcpPort) { + this.tcpPort = tcpPort; + this.message = ProtocolConstants.MAGIC_NUMBER + tcpPort; + } + + /** + * . + */ + public void stop() { + running = false; + } + + /** + * Runs the announcer loop to broadcast server presence. + */ + @Override + public void run() { + try (MulticastSocket socket = new MulticastSocket()) { + InetAddress group = InetAddress.getByName(ProtocolConstants.MULTICAST_GROUP); + byte[] buf = message.getBytes(); + + System.out.println("MC started..."); + + while (running) { + DatagramPacket packet = new DatagramPacket(buf, buf.length, group, + ProtocolConstants.MULTICAST_PORT); + socket.send(packet); + Thread.sleep(1000); + } + } catch (Exception e) { + System.err.println("Announcer error: " + e.getMessage()); + } + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ConnectionHandler.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ConnectionHandler.java new file mode 100644 index 0000000..64eca5d --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ConnectionHandler.java @@ -0,0 +1,114 @@ +package ru.nsu.ksadov.find; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Class that implements mechanism of interaction Server with Workers. + */ +public class ConnectionHandler implements Runnable { + private Socket socket; + private long[] array; + private final ConcurrentLinkedQueue queue; + private final AtomicBoolean primeFound; + private final ConcurrentHashMap activeWorkers; + private String workerName; + + /** + * Instantiates a new ConnectionHandler. + */ + public ConnectionHandler(Socket socket, long[] array, + ConcurrentLinkedQueue queue, + AtomicBoolean primeFound, + ConcurrentHashMap activeWorkers, + String workerName) { + this.socket = socket; + this.array = array; + this.queue = queue; + this.primeFound = primeFound; + this.activeWorkers = activeWorkers; + this.workerName = workerName; + } + + /** + * . + */ + @Override + public void run() { + try { + handleConversation(socket); + } catch (IOException e) { + System.out.println("Error handling worker " + workerName + ": " + e.getMessage()); + } finally { + activeWorkers.remove(workerName); + try { + socket.close(); + } catch (IOException e) { + //ignore + } + } + } + + /** + * Handles conversation with the connected worker. + */ + private void handleConversation(Socket socket) throws IOException { + Task currTask = null; + try (DataInputStream in = new DataInputStream(socket.getInputStream()); + DataOutputStream out = new DataOutputStream(socket.getOutputStream())) { + + activeWorkers.put(workerName, "Working"); + + while (!primeFound.get()) { + currTask = queue.poll(); + + if (currTask == null) { + MessageSerializer.sendEndOfWork(out); + break; + } + + long[] chunk = getChunkForTask(currTask); + MessageSerializer.sendTaskChunk(out, currTask, chunk); + + boolean foundNonPrime = in.readBoolean(); + + if (foundNonPrime) { + primeFound.set(true); + activeWorkers.put(workerName, "Found non-prime in [" + + currTask.getStart() + ", " + currTask.getEnd() + "]"); + break; + } + + activeWorkers.put(workerName, "Completed [" + + currTask.getStart() + ", " + currTask.getEnd() + "]"); + + currTask = null; + } + + } catch (IOException e) { + if (currTask != null) { + System.out.println("Worker disconnected. Returning task to queue: [" + + currTask.getStart() + ", " + currTask.getEnd() + "]"); + queue.add(currTask); + } + throw e; + } finally { + activeWorkers.remove(workerName); + } + } + + /** + * Gets the chunk of array for the given task. + */ + private long[] getChunkForTask(Task task) { + int size = task.getEnd() - task.getStart(); + long[] chunk = new long[size]; + System.arraycopy(array, task.getStart(), chunk, 0, size); + return chunk; + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Discoverer.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Discoverer.java new file mode 100644 index 0000000..57957c4 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Discoverer.java @@ -0,0 +1,47 @@ +package ru.nsu.ksadov.find; + +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +/** + * Class that implements connection to the Server using socket. + */ +public class Discoverer { + private static final int mulPort = 8888; + private static final String mulGroup = "224.0.0.1"; + private static final String magicN = "server prime port here:"; + + /** + * Discovers the server via multicast. + */ + public static ServerInfo discover() { + try (MulticastSocket socket = new MulticastSocket(mulPort)) { + InetAddress gr = InetAddress.getByName(mulGroup); + + socket.joinGroup(gr); + byte[] buf = new byte[256]; + System.out.println("worker trying to find server "); + + while (true) { + DatagramPacket packet = new DatagramPacket(buf, buf.length); + socket.receive(packet); + + String received = new String(packet.getData(), 0, packet.getLength()); + + if (received.startsWith(magicN)) { + int tcpPort = Integer.parseInt(received.substring(magicN.length())); + InetAddress servAdr = packet.getAddress(); + System.out.println("connected!" + servAdr.getHostAddress() + + " TCP port:" + tcpPort); + socket.leaveGroup(gr); + return new ServerInfo(servAdr, tcpPort); + } + } + } catch (Exception e) { + System.out.println("cannot find server "); + e.printStackTrace(); + return null; + } + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/MessageSerializer.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/MessageSerializer.java new file mode 100644 index 0000000..19a74fa --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/MessageSerializer.java @@ -0,0 +1,54 @@ +package ru.nsu.ksadov.find; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Utility class for message serialization/deserialization. + */ +public class MessageSerializer { + + /** + * Sends a task chunk to the worker. + */ + public static void sendTaskChunk(DataOutputStream out, Task task, long[] chunk) + throws IOException { + out.writeInt(task.getStart()); + out.writeInt(task.getEnd()); + out.writeInt(chunk.length); + for (long value : chunk) { + out.writeLong(value); + } + out.flush(); + } + + /** + * Sends end of work signal. + */ + public static void sendEndOfWork(DataOutputStream out) throws IOException { + out.writeInt(ProtocolConstants.NO_MORE_TASKS); + out.flush(); + } + + /** + * Receives task chunk from server. + * Returns null if no more tasks. + */ + public static TaskChunk receiveTaskChunk(DataInputStream in) throws IOException { + int start = in.readInt(); + if (start == ProtocolConstants.NO_MORE_TASKS) { + return null; + } + + int end = in.readInt(); + int chunkSize = in.readInt(); + long[] chunk = new long[chunkSize]; + + for (int i = 0; i < chunkSize; i++) { + chunk[i] = in.readLong(); + } + + return new TaskChunk(new Task(start, end), chunk); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ProtocolConstants.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ProtocolConstants.java new file mode 100644 index 0000000..5f36687 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ProtocolConstants.java @@ -0,0 +1,14 @@ +package ru.nsu.ksadov.find; + +/** + * Protocol constants for client-server communication. + */ +public class ProtocolConstants { + public static final int NO_MORE_TASKS = -1; + public static final int CHECKSUM_ERROR = -2; + public static final int CHUNK_SIZE = 1000; + public static final String MAGIC_NUMBER = "server prime port here:"; + public static final int MULTICAST_PORT = 8888; + public static final String MULTICAST_GROUP = "224.0.0.1"; + public static final int STATUS_PRINT_INTERVAL_MS = 2000; +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Server.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Server.java new file mode 100644 index 0000000..97c5e83 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Server.java @@ -0,0 +1,150 @@ +package ru.nsu.ksadov.find; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implementation of Server that gives some tasks to the Workers. + */ +public class Server implements Runnable { + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final ServerSocket serverSocket; + private final long[] array; + private final AtomicBoolean primeFound = new AtomicBoolean(false); + private final ConcurrentHashMap activeWorkers = new ConcurrentHashMap<>(); + private final ExecutorService threadPool; + private volatile boolean serverRunning = true; + + /** + * Instantiates a new Server. + */ + public Server(long[] array, int port) throws IOException { + this.array = array; + this.serverSocket = new ServerSocket(port); + this.threadPool = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() * 2); + initQueue(ProtocolConstants.CHUNK_SIZE); + + Thread announcerThread = new Thread(new Announcer(port)); + announcerThread.setDaemon(true); + announcerThread.start(); + + Thread printerThread = new Thread(() -> printer(activeWorkers)); + printerThread.setDaemon(true); + printerThread.start(); + } + + /** + * Initializes the task queue with chunks. + */ + private void initQueue(int chunkSize) { + for (int i = 0; i < array.length; i += chunkSize) { + int end = Math.min(i + chunkSize, array.length); + queue.add(new Task(i, end)); + } + } + + /** + * Daemon thread that shows info about current state for every 2 seconds. + */ + private void printer(ConcurrentHashMap activeWorkers) { + while (!primeFound.get()) { + System.out.println("\n--- Curr state ---"); + if (activeWorkers.isEmpty()) { + System.out.println("No connected workers."); + } else { + for (String key : activeWorkers.keySet()) { + System.out.println(key + " -> " + activeWorkers.get(key)); + } + } + System.out.println("Tasks remaining: " + queue.size()); + + try { + Thread.sleep(ProtocolConstants.STATUS_PRINT_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + /** + * Runs the server logic. + */ + @Override + public void run() { + System.out.println("Server started..."); + int count = 0; + + try { + while (!primeFound.get() && serverRunning) { + String workerName = "worker" + count; + Socket socket = serverSocket.accept(); + System.out.println("New worker connected: " + workerName); + + ConnectionHandler handler = new ConnectionHandler( + socket, array, queue, primeFound, activeWorkers, workerName); + + threadPool.submit(handler); + count++; + } + } catch (IOException e) { + if (serverRunning && !primeFound.get()) { + System.err.println("Server error: " + e.getMessage()); + e.printStackTrace(); + } + } finally { + serverRunning = false; + shutdown(); + } + } + + /** + * Gracefully shuts down the server. + */ + private void shutdown() { + System.out.println("Shutting down server..."); + try { + serverSocket.close(); + } catch (IOException e) { + //ignore + } + threadPool.shutdown(); + } + + /** + * Gets the result indicating if a prime was found. + */ + public boolean getResult() { + return primeFound.get(); + } + + /** + * Stops the server gracefully. + */ + public void stop() { + serverRunning = false; + try { + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } catch (IOException e) { + //ignore + } + threadPool.shutdownNow(); + } + + /** + * Gets the map of active workers (useful for testing/monitoring). + */ + public ConcurrentHashMap getActiveWorkers() { + return activeWorkers; + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ServerInfo.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ServerInfo.java new file mode 100644 index 0000000..43e9ffd --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/ServerInfo.java @@ -0,0 +1,19 @@ +package ru.nsu.ksadov.find; + +import java.net.InetAddress; + +/** + * Class that implements info about the Server. + */ +public class ServerInfo { + public final InetAddress addr; + public final int tcpPort; + + /** + * Instantiates a new ServerInfo. + */ + public ServerInfo(InetAddress inet, int tcpPort) { + this.addr = inet; + this.tcpPort = tcpPort; + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Task.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Task.java new file mode 100644 index 0000000..ef0dcc0 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Task.java @@ -0,0 +1,31 @@ +package ru.nsu.ksadov.find; + +/** + * Class that implements configuration of the task. + */ +public class Task { + private final int start; + private final int end; + + /** + * Instantiates a new Task. + */ + public Task(int start, int end) { + this.end = end; + this.start = start; + } + + /** + * Gets the start index. + */ + public int getStart() { + return start; + } + + /** + * Gets the end index. + */ + public int getEnd() { + return end; + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/TaskChunk.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/TaskChunk.java new file mode 100644 index 0000000..799170c --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/TaskChunk.java @@ -0,0 +1,22 @@ +package ru.nsu.ksadov.find; + +/** + * Class that holds a task with its corresponding data chunk. + */ +public class TaskChunk { + private final Task task; + private final long[] chunk; + + public TaskChunk(Task task, long[] chunk) { + this.task = task; + this.chunk = chunk; + } + + public Task getTask() { + return task; + } + + public long[] getChunk() { + return chunk; + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Worker.java b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Worker.java new file mode 100644 index 0000000..6437d00 --- /dev/null +++ b/Task_2_1_2/src/main/java/ru/nsu/ksadov/find/Worker.java @@ -0,0 +1,87 @@ +package ru.nsu.ksadov.find; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; + +/** + * Worker is a client that tryes to connect to the Server. + */ +public class Worker { + private final ServerInfo serverInfo; + + /** + * Instantiates a new Worker. + */ + public Worker(ServerInfo info) { + this.serverInfo = info; + } + + /** + * Starts the worker process. + */ + public void start() { + try (Socket socket = new Socket(serverInfo.addr, serverInfo.tcpPort); + DataInputStream in = new DataInputStream(socket.getInputStream()); + DataOutputStream out = new DataOutputStream(socket.getOutputStream())) { + + System.out.println("Successful connect. Data waiting.."); + processTasks(in, out); + System.out.println("All tasks completed"); + + } catch (IOException e) { + System.out.println("Error. Could not connect to server : " + e.getMessage()); + } + } + + /** + * Checks if a number is prime. + */ + public static boolean isPrime(long n) { + if (n < 2) { + return false; + } + for (long i = 2; i * i <= n; i++) { + if (n % i == 0) { + return false; + } + } + return true; + } + + /** + * Processes tasks from the server. + */ + private void processTasks(DataInputStream in, DataOutputStream out) throws IOException { + while (true) { + TaskChunk taskChunk = MessageSerializer.receiveTaskChunk(in); + + if (taskChunk == null) { + break; + } + + Task task = taskChunk.getTask(); + long[] chunk = taskChunk.getChunk(); + + System.out.println("Working with range [" + task.getStart() + ", " + task.getEnd() + + "], chunk size: " + chunk.length); + + boolean found = false; + for (long value : chunk) { + if (!isPrime(value)) { + found = true; + break; + } + } + + out.writeBoolean(found); + out.flush(); + + if (found) { + System.out.println("Non-prime number found in range [" + + task.getStart() + ", " + task.getEnd() + "]"); + } + } + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/DiscovererTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/DiscovererTest.java new file mode 100644 index 0000000..2b9e77d --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/DiscovererTest.java @@ -0,0 +1,34 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.junit.jupiter.api.Test; + +/** + * . + */ +class DiscovererTest { + + @Test + void testAnnouncerAndDiscoverer() throws InterruptedException { + int testTcpPort = 9000; + Thread announcerThread = new Thread(new Announcer(testTcpPort)); + announcerThread.start(); + + Thread.sleep(500); + + final ServerInfo[] foundInfo = new ServerInfo[1]; + Thread discovererThread = new Thread(() -> { + foundInfo[0] = Discoverer.discover(); + }); + discovererThread.start(); + + discovererThread.join(3000); + + announcerThread.interrupt(); + + assertNotNull(foundInfo[0], "Server does not found with Multicast"); + assertEquals(testTcpPort, foundInfo[0].tcpPort, "Wrong port was found"); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/MessageSerializerTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/MessageSerializerTest.java new file mode 100644 index 0000000..7d3e81a --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/MessageSerializerTest.java @@ -0,0 +1,85 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class MessageSerializerTest { + + @Test + void testTaskChunkSerializationAndDeserialization() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + + Task task = new Task(10, 20); + long[] chunk = {17L, 19L, 23L, 29L, 31L, 37L, 41L, 43L, 47L, 53L}; + + MessageSerializer.sendTaskChunk(out, task, chunk); + out.flush(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream in = new DataInputStream(bais); + TaskChunk received = MessageSerializer.receiveTaskChunk(in); + + assertNotNull(received); + assertEquals(10, received.getTask().getStart()); + assertEquals(20, received.getTask().getEnd()); + assertArrayEquals(chunk, received.getChunk()); + } + + @Test + void testEndOfWorkSignal() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + + MessageSerializer.sendEndOfWork(out); + out.flush(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream in = new DataInputStream(bais); + + TaskChunk received = MessageSerializer.receiveTaskChunk(in); + assertNull(received); + } + + @Test + void testMultipleTasksSequentially() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + + Task task1 = new Task(0, 5); + long[] chunk1 = {2L, 3L, 5L, 7L, 11L}; + MessageSerializer.sendTaskChunk(out, task1, chunk1); + + Task task2 = new Task(5, 10); + long[] chunk2 = {13L, 17L, 19L, 23L, 29L}; + MessageSerializer.sendTaskChunk(out, task2, chunk2); + + MessageSerializer.sendEndOfWork(out); + out.flush(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream in = new DataInputStream(bais); + + TaskChunk r1 = MessageSerializer.receiveTaskChunk(in); + assertNotNull(r1); + assertEquals(0, r1.getTask().getStart()); + assertArrayEquals(chunk1, r1.getChunk()); + + TaskChunk r2 = MessageSerializer.receiveTaskChunk(in); + assertNotNull(r2); + assertEquals(5, r2.getTask().getStart()); + assertArrayEquals(chunk2, r2.getChunk()); + + TaskChunk r3 = MessageSerializer.receiveTaskChunk(in); + assertNull(r3); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerDisconnectTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerDisconnectTest.java new file mode 100644 index 0000000..2e9623e --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerDisconnectTest.java @@ -0,0 +1,53 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; +import org.junit.jupiter.api.Test; + +/** + * . + */ +public class ServerDisconnectTest { + + @Test + public void testWorkerDisconnectRequeuesTask() throws Exception { + long[] array = new long[3000]; + Arrays.fill(array, 7L); + + int port = 9090; + Server server = new Server(array, port); + Thread serverThread = new Thread(server); + serverThread.start(); + + Thread.sleep(500); + + try (Socket badSocket = new Socket("localhost", port); + DataInputStream in = new DataInputStream(badSocket.getInputStream()); + DataOutputStream out = new DataOutputStream(badSocket.getOutputStream())) { + + TaskChunk chunk = MessageSerializer.receiveTaskChunk(in); + assertNotNull(chunk); + } + + Thread.sleep(500); + Worker goodWorker = new Worker(new ServerInfo(java.net.InetAddress.getByName("localhost"), + port)); + Thread workerThread = new Thread(() -> goodWorker.start()); + workerThread.start(); + + Thread.sleep(2000); + boolean actualResult = server.getResult(); + server.stop(); + serverThread.join(); + workerThread.join(); + + assertFalse(server.getResult()); + + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerInfoTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerInfoTest.java new file mode 100644 index 0000000..7b5f56a --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerInfoTest.java @@ -0,0 +1,23 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.junit.jupiter.api.Test; + +/** + * . + */ +class ServerInfoTest { + @Test + void testServerInfoCreation() throws UnknownHostException { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 9000; + + ServerInfo info = new ServerInfo(address, port); + + assertEquals(address, info.addr, "IP should be the same"); + assertEquals(port, info.tcpPort, "Ports should be the same"); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerTest.java new file mode 100644 index 0000000..b9f4af7 --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/ServerTest.java @@ -0,0 +1,113 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Arrays; +import org.junit.jupiter.api.Test; + +/** + * . + */ +class ServerTest { + + @Test + void testServerWorkerCalculations() throws IOException, InterruptedException { + long[] testArray = new long[500]; + Arrays.fill(testArray, 17L); + testArray[499] = 10L; + + int testPort = 9005; + Server server = new Server(testArray, testPort); + Thread serverThread = new Thread(server); + serverThread.start(); + + Thread.sleep(500); + + ServerInfo info = new ServerInfo(InetAddress.getByName("127.0.0.1"), testPort); + Worker worker = new Worker(info); + Thread workerThread = new Thread(worker::start); + workerThread.start(); + + int maxWait = 50; + while (!server.getResult() && maxWait > 0) { + Thread.sleep(100); + maxWait--; + } + + assertTrue(server.getResult()); + assertEquals(0, server.getActiveWorkers().size()); + + workerThread.interrupt(); + server.stop(); + serverThread.join(1000); + } + + @Test + void testWorkerDisconnectionGracefulHandling() throws IOException, InterruptedException { + long[] largePrimeArray = new long[10000]; + Arrays.fill(largePrimeArray, 17L); + + int testPort = 9006; + Server server = new Server(largePrimeArray, testPort); + Thread serverThread = new Thread(server); + serverThread.start(); + Thread.sleep(500); + + Socket workerSocket = new Socket("127.0.0.1", testPort); + DataInputStream in = new DataInputStream(workerSocket.getInputStream()); + + Thread.sleep(1000); + + workerSocket.close(); + Thread.sleep(1000); + + assertTrue(serverThread.isAlive()); + assertFalse(server.getResult()); + + server.stop(); + serverThread.join(1000); + } + + @Test + void testServerContinuesAfterWorkerDisconnect() throws IOException, InterruptedException { + long[] testArray = new long[200]; + Arrays.fill(testArray, 17L); + testArray[150] = 4L; + + int testPort = 9006; + Server server = new Server(testArray, testPort); + Thread serverThread = new Thread(server); + serverThread.start(); + Thread.sleep(500); + + Socket worker1 = new Socket("127.0.0.1", testPort); + Thread.sleep(500); + worker1.close(); + Thread.sleep(500); + + ServerInfo info = new ServerInfo(InetAddress.getByName("127.0.0.1"), testPort); + Worker worker2 = new Worker(info); + Thread worker2Thread = new Thread(worker2::start); + worker2Thread.start(); + + int maxWait = 50; + while (!server.getResult() && maxWait > 0) { + Thread.sleep(100); + maxWait--; + } + + assertTrue(server.getResult()); + assertEquals(0, server.getActiveWorkers().size()); + + worker2Thread.interrupt(); + server.stop(); + serverThread.join(1000); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/TaskTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/TaskTest.java new file mode 100644 index 0000000..a9aebd5 --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/TaskTest.java @@ -0,0 +1,17 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +/** + * . + */ +class TaskTest { + @Test + void testTaskCreationAndGetters() { + Task task = new Task(10, 20); + assertEquals(10, task.getStart(), "Start equals 10"); + assertEquals(20, task.getEnd(), "End equals 20"); + } +} \ No newline at end of file diff --git a/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/WorkerTest.java b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/WorkerTest.java new file mode 100644 index 0000000..a36f41c --- /dev/null +++ b/Task_2_1_2/src/test/java/ru/nsu/ksadov/find/WorkerTest.java @@ -0,0 +1,26 @@ +package ru.nsu.ksadov.find; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * . + */ +class WorkerTest { + + @Test + void testIsPrime() { + assertTrue(Worker.isPrime(2)); + assertTrue(Worker.isPrime(17)); + assertTrue(Worker.isPrime(997)); + + assertFalse(Worker.isPrime(0)); + assertFalse(Worker.isPrime(1)); + assertFalse(Worker.isPrime(-5)); + assertFalse(Worker.isPrime(10)); + assertFalse(Worker.isPrime(25)); + } + +} \ No newline at end of file