diff --git a/projects/pom.xml b/projects/pom.xml index 5d14966..e377333 100644 --- a/projects/pom.xml +++ b/projects/pom.xml @@ -30,6 +30,7 @@ dkhurtin ale3otik Pitovsky + sopilnyak diff --git a/projects/sopilnyak/pom.xml b/projects/sopilnyak/pom.xml new file mode 100644 index 0000000..e4a7ea7 --- /dev/null +++ b/projects/sopilnyak/pom.xml @@ -0,0 +1,68 @@ + + 4.0.0 + + + ru.mipt.diht.students + parent + 1.0-SNAPSHOT + + + ru.mipt.diht.students + sopilnyak + 1.0-SNAPSHOT + sopilnyak + http://maven.apache.org + + + UTF-8 + + + + + + maven-assembly-plugin + + + package + + single + + + + + + jar-with-dependencies + + + + ru.mipt.diht.students.sopilnyak.moduletests.Counter + + + + + + + + + + junit + junit + 3.8.1 + test + + + + + org.twitter4j + twitter4j-core + [4.0,) + + + junit + junit + 4.12 + test + + + \ No newline at end of file diff --git a/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueue.java b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueue.java new file mode 100644 index 0000000..2126e77 --- /dev/null +++ b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueue.java @@ -0,0 +1,156 @@ +package ru.mipt.diht.students.sopilnyak.threads; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class BlockingQueue { + + private int maxSize; + private Queue queue; + private Lock queueLock = new ReentrantLock(); + private Lock stateChanged = new ReentrantLock(); + private Condition popWait = stateChanged.newCondition(); + private Condition pushWait = stateChanged.newCondition(); + + BlockingQueue(int inputMaxSize) { + this.maxSize = inputMaxSize; + queue = new ArrayDeque<>(); + } + + // push_back to the queue + public final void offer(List e) { + try { + offer(e, 0, false); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + + // pop_front from the queue + public final List take(int n) { + try { + return take(n, 0, false); + } catch (InterruptedException e) { + e.printStackTrace(); + return null; + } + } + + public final void offer(List e, long timeout) throws InterruptedException { + offer(e, timeout, true); + } + + public final List take(int n, long timeout) throws InterruptedException { + return take(n, timeout, true); + } + + private long getTimeout(long timeLimit) throws InterruptedException { + long currTime = System.currentTimeMillis(); + if (currTime > timeLimit) { + throw new InterruptedException("Timeout"); + } + return timeLimit - currTime; + } + + public final void offer(List e, long timeout, boolean needTimeout) throws InterruptedException { + long timeLimit = System.currentTimeMillis() + timeout; + + if (needTimeout) { + if (!stateChanged.tryLock(getTimeout(timeLimit), TimeUnit.MILLISECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + stateChanged.lock(); + } + + try { + boolean added = false; + while (!added) { + try { + if (needTimeout) { + if (!queueLock.tryLock(getTimeout(timeLimit), TimeUnit.MILLISECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + queueLock.lock(); + } + + if (queue.size() + e.size() <= maxSize) { + queue.addAll(e); + added = true; + } + } finally { + queueLock.unlock(); + } + + if (!added) { + if (needTimeout) { + if (!popWait.await(getTimeout(timeLimit), TimeUnit.NANOSECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + popWait.await(); + } + } + } + } finally { + pushWait.signalAll(); + stateChanged.unlock(); + } + } + + public final List take(int n, long timeout, boolean needTimeout) throws InterruptedException { + long timeLimit = System.currentTimeMillis() + timeout; + if (needTimeout) { + if (!stateChanged.tryLock(getTimeout(timeLimit), TimeUnit.MILLISECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + stateChanged.lock(); + } + + try { + List answer = new ArrayList<>(); + while (answer.size() < n) { + try { + if (needTimeout) { + if (!queueLock.tryLock(getTimeout(timeLimit), TimeUnit.MILLISECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + queueLock.lock(); + } + if (queue.size() >= n) { + for (int i = 0; i < n; i++) { + answer.add(queue.poll()); + } + } + } finally { + queueLock.unlock(); + if (answer.size() == n) { + return answer; + } + } + if (answer.size() < n) { + if (needTimeout) { + if (!pushWait.await(getTimeout(timeLimit), TimeUnit.NANOSECONDS)) { + throw new InterruptedException("Timeout"); + } + } else { + pushWait.await(); + } + } + } + return answer; + } finally { + popWait.signalAll(); + stateChanged.unlock(); + } + } +} diff --git a/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Counter.java b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Counter.java new file mode 100644 index 0000000..b30666b --- /dev/null +++ b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Counter.java @@ -0,0 +1,54 @@ +package ru.mipt.diht.students.sopilnyak.threads; + +public class Counter { + + private static final Object MONITOR = new Object(); + + private static int currentId; + + private static class ThreadCount extends Thread { + + private int id, nextId; + + ThreadCount(int inputId, int inputNextId) { + this.id = inputId; + this.nextId = inputNextId; + } + + @Override + public void run() { + while (true) { + synchronized (MONITOR) { + while (id != currentId) { + try { + MONITOR.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + System.out.println("Thread-" + String.valueOf(id + 1)); + currentId = nextId; + MONITOR.notifyAll(); + } + } + } + } + + public static void main(String[] args) { + int n; + try { + n = Integer.valueOf(args[0]); + if (n <= 0) { + throw new NumberFormatException(); + } + } catch (Exception e) { + System.err.println("Wrong number of threads"); + return; + } + currentId = 0; + for (int i = 0; i < n; i++) { + ThreadCount thread = new ThreadCount(i, (i + 1) % n); + thread.start(); + } + } +} diff --git a/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Rollcall.java b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Rollcall.java new file mode 100644 index 0000000..d4aa352 --- /dev/null +++ b/projects/sopilnyak/src/main/java/ru/mipt/diht/students/sopilnyak/threads/Rollcall.java @@ -0,0 +1,122 @@ +package ru.mipt.diht.students.sopilnyak.threads; + +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +public class Rollcall { + + private static CyclicBarrier askBarrier; + private static CyclicBarrier waitForAnswerBarrier; + private static CyclicBarrier checkSuccessBarrier; + private static boolean success = false; + private static Object monitor = new Object(); + + private static class CountThread extends Thread { + + private final int probability = 10; + + private boolean result; + private Random random = new Random(); + + @Override + public void run() { + while (true) { + try { + askBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + result = random.nextInt(probability) != 0; + if (result) { + System.out.println("Yes"); + } else { + System.out.println("No"); + } + synchronized (monitor) { + success &= result; + } + try { + waitForAnswerBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + if (success) { + return; + } + try { + checkSuccessBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + } + + public static void main(String[] args) { + int n; + try { + n = new Integer(args[0]); + if (n <= 0) { + throw new NumberFormatException(""); + } + } catch (Exception e) { + System.err.println("Wrong number of threads"); + return; + } + + CountThread[] threads = new CountThread[n]; + askBarrier = new CyclicBarrier(n + 1); + waitForAnswerBarrier = new CyclicBarrier(n + 1); + checkSuccessBarrier = new CyclicBarrier(n + 1); + + for (int i = 0; i < n; i++) { + threads[i] = new CountThread(); + threads[i].start(); + } + + while (!success) { + System.out.println("Are you ready?"); + success = true; + try { + askBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + askBarrier.reset(); + try { + waitForAnswerBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + waitForAnswerBarrier.reset(); + if (success) { + break; + } + try { + checkSuccessBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + for (int i = 0; i < n; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/projects/sopilnyak/src/test/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueueTest.java b/projects/sopilnyak/src/test/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueueTest.java new file mode 100644 index 0000000..03ff1b3 --- /dev/null +++ b/projects/sopilnyak/src/test/java/ru/mipt/diht/students/sopilnyak/threads/BlockingQueueTest.java @@ -0,0 +1,77 @@ +package ru.mipt.diht.students.sopilnyak.threads; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; + +@RunWith(JUnit4.class) +public class BlockingQueueTest { + + BlockingQueue queue; + + @Before + public void setUp() { + queue = new BlockingQueue(10); + } + + @Test + public void testSimpleQueueSingleThread() { + List list = new ArrayList(); + + IntStream.range(0, 10).forEach(t -> list.add(t)); + queue.offer(list); + assertEquals(list, queue.take(10)); + + queue.offer(list); + assertEquals(list, queue.take(10)); + } + + @Test + public void testSimpleQueueMultiThread() { + List list = new ArrayList(); + List ans = new ArrayList(); + + for (int i = 0; i < 10; i++) { + list.add(i); + ans.add(i % 5); + } + + Thread t = new Thread() { + @Override + public void run() { + while (true) { + queue.offer(list.subList(0, 5)); + } + } + }; + + t.start(); + assertEquals(ans, queue.take(10)); + assertEquals(ans, queue.take(10)); + } + + @Test(expected = InterruptedException.class) + public void testTimeouts() throws InterruptedException { + List list = new ArrayList(); + + IntStream.range(0, 15).forEach(t -> list.add(t)); + queue.offer(list, 10); + queue.offer(list, 10); + } + + @Test(timeout = 1000) + public void testTimeoutsCorrect() throws InterruptedException { + List list = new ArrayList(); + + IntStream.range(0, 10).forEach(t -> list.add(t)); + queue.offer(list, 100); + assertEquals(list, queue.take(10, 100)); + } +}