diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/DBTableProvider.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/DBTableProvider.java index edfda2a76..54fa92cda 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/DBTableProvider.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/DBTableProvider.java @@ -21,6 +21,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; public class DBTableProvider implements TableProvider { @@ -65,10 +67,14 @@ public class DBTableProvider implements TableProvider { */ private final Map corruptTables; + /** + * Lock for getting/creating/removing tables access management. + */ + private final ReadWriteLock persistenceLock = new ReentrantReadWriteLock(true); + /** * Constructs a database table provider. - * @throws ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.exception - * .DatabaseIOException + * @throws ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.exception.DatabaseIOException * If failed to scan database directory. */ DBTableProvider(Path databaseRoot) throws DatabaseIOException { @@ -81,16 +87,22 @@ public class DBTableProvider implements TableProvider { @Override public StoreableTableImpl getTable(String name) throws IllegalArgumentException { Utility.checkTableNameIsCorrect(name); - if (tables.containsKey(name)) { - StoreableTableImpl table = tables.get(name); - if (table == null) { - DatabaseIOException corruptionReason = corruptTables.get(name); - throw new IllegalArgumentException( - corruptionReason.getMessage(), corruptionReason); + + persistenceLock.readLock().lock(); + try { + if (tables.containsKey(name)) { + StoreableTableImpl table = tables.get(name); + if (table == null) { + DatabaseIOException corruptionReason = corruptTables.get(name); + throw new IllegalArgumentException( + corruptionReason.getMessage(), corruptionReason); + } + return table; + } else { + return null; } - return table; - } else { - return null; + } finally { + persistenceLock.readLock().unlock(); } } @@ -109,13 +121,18 @@ public StoreableTableImpl createTable(String name, List> columnTypes) Path tablePath = databaseRoot.resolve(name); - if (tables.containsKey(name) && tables.get(name) != null) { - return null; - } + persistenceLock.writeLock().lock(); + try { + if (tables.containsKey(name) && tables.get(name) != null) { + return null; + } - StoreableTableImpl newTable = StoreableTableImpl.createTable(this, tablePath, columnTypes); - tables.put(name, newTable); - return newTable; + StoreableTableImpl newTable = StoreableTableImpl.createTable(this, tablePath, columnTypes); + tables.put(name, newTable); + return newTable; + } finally { + persistenceLock.writeLock().unlock(); + } } @Override @@ -124,31 +141,39 @@ public void removeTable(String name) Utility.checkTableNameIsCorrect(name); Path tablePath = databaseRoot.resolve(name); - if (!tables.containsKey(name)) { - throw new IllegalStateException(name + " not exists"); - } + persistenceLock.writeLock().lock(); + try { + if (!tables.containsKey(name)) { + throw new IllegalStateException(name + " not exists"); + } - StoreableTableImpl removed = tables.remove(name); - if (removed != null) { - removed.invalidate(); - } + StoreableTableImpl removed = tables.remove(name); + if (removed != null) { + // After invalidation all attempts to commit from other threads fail with + // IllegalStateException. Now we can delete the table without fear that it will be written + // to the file system again. + removed.invalidate(); + } - corruptTables.remove(name); + corruptTables.remove(name); - if (!Files.exists(tablePath)) { - return; - } + if (!Files.exists(tablePath)) { + return; + } - try { - Utility.rm(tablePath); - } catch (IOException exc) { - // Mark as corrupt. - tables.put(name, null); + try { + Utility.rm(tablePath); + } catch (IOException exc) { + // Mark as corrupt. + tables.put(name, null); - TableCorruptIOException corruptionReason = new TableCorruptIOException( - name, "Failed to drop table: " + exc.toString(), exc); - corruptTables.put(name, corruptionReason); - throw corruptionReason; + TableCorruptIOException corruptionReason = new TableCorruptIOException( + name, "Failed to drop table: " + exc.toString(), exc); + corruptTables.put(name, corruptionReason); + throw corruptionReason; + } + } finally { + persistenceLock.writeLock().unlock(); } } diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableImpl.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableImpl.java index 8a407bf51..9b5fa3520 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableImpl.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableImpl.java @@ -6,6 +6,10 @@ import java.util.Objects; +/** + * Implementation of Storeable that can be put to the table it is assigned to as a value.
+ * Not thread-safe. + */ public class StoreableImpl implements Storeable { private final Object[] values; @@ -120,16 +124,6 @@ public boolean equals(Object obj) { return false; } - if (host.getColumnsCount() != storeable.host.getColumnsCount()) { - return false; - } - - for (int col = 0; col < host.getColumnsCount(); col++) { - if (!host.getColumnType(col).equals(storeable.host.getColumnType(col))) { - return false; - } - } - for (int col = 0; col < host.getColumnsCount(); col++) { if (!Objects.equals(values[col], storeable.values[col])) { return false; diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableTableImpl.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableTableImpl.java index 3ebc0de7a..0c436737b 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableTableImpl.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StoreableTableImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Scanner; +import java.util.concurrent.atomic.AtomicBoolean; public class StoreableTableImpl implements Table { private static final Map, String> CLASSES_TO_NAMES_MAP = @@ -47,11 +48,11 @@ public class StoreableTableImpl implements Table { private final List> columnTypes; - private boolean invalidated; + private AtomicBoolean invalidated; private StoreableTableImpl(TableProvider provider, StringTableImpl store, List> columnTypes) { this.provider = provider; - this.invalidated = false; + this.invalidated = new AtomicBoolean(false); this.store = store; this.columnTypes = Collections.unmodifiableList(new ArrayList>(columnTypes)); } @@ -155,12 +156,13 @@ static StoreableTableImpl getTable(TableProvider provider, Path tablePath) throw /** * Checks whether the given storeable can be stored in the given table as a value. - * @throws ColumnFormatException + * @throws ru.fizteh.fivt.storage.structured.ColumnFormatException * If columns count differs or some column has wrong type. Note that if some column has null * value, its type cannot be determined. - * @throws java.lang.IllegalStateException + * @throws IllegalStateException * If the given storeable is already assigned to another table. This check can be performed only - * for instances of {@link StoreableTableImpl}. + * for instances of {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.db + * .StoreableTableImpl}. */ public static void checkStoreableAppropriate(Table table, Storeable storeable) throws ColumnFormatException, IllegalStateException { @@ -215,14 +217,21 @@ TableProvider getProvider() { } /** - * Mark this table as invalidated (all further operations throw {@link java.lang.IllegalStateException}). + * Mark this table as invalidated (all further operations throw {@link IllegalStateException}). */ void invalidate() { - invalidated = true; + // We need table's write lock here to sync with file system. + // Remember the table becomes invalidated before being deleted. + store.getPersistenceLock().writeLock().lock(); + try { + invalidated.set(true); + } finally { + store.getPersistenceLock().writeLock().unlock(); + } } private void checkValidity() throws IllegalStateException { - if (invalidated) { + if (invalidated.get()) { throw new IllegalStateException(store.getName() + " is invalidated"); } } diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StringTableImpl.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StringTableImpl.java index 3388c3384..037c3c400 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StringTableImpl.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/StringTableImpl.java @@ -14,8 +14,10 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map.Entry; +import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; /** @@ -33,11 +35,15 @@ public final class StringTableImpl { private final Path tableRoot; private final String tableName; + /** + * Lock for exploit of table and writing to the file system. + */ + private final ReadWriteLock persistenceLock = new ReentrantReadWriteLock(true); /** * Mapping between table parts and local hashes of keys that can be stored inside them. * @see #getHash(String) */ - private HashMap tableParts; + private Map tableParts; /** * Constructor for cloning and safe table creation/obtaining. @@ -138,6 +144,16 @@ public static StringTableImpl getTable(Path tableRoot, Predicate extraFile return table; } + /** + * Get read-write lock for the table.
+ * Write lock is acquired when something is going to be written to the file system (before this data is + * updated using the diff for the calling thread).
+ * In all other cases read lock is acquired. + */ + ReadWriteLock getPersistenceLock() { + return persistenceLock; + } + public Path getTableRoot() { return tableRoot; } @@ -214,50 +230,42 @@ private void checkFileSystem(Predicate filter) throws DatabaseIOException } public void readFromFileSystem() throws DBFileCorruptIOException, TableCorruptIOException { - StringTableImpl thisClone = clone(); - tableParts.clear(); - + persistenceLock.writeLock().lock(); try { - for (int dir = 0; dir < DIRECTORIES_COUNT; dir++) { - for (int file = 0; file < FILES_COUNT; file++) { - int partHash = buildHash(dir, file); - TablePart fmap = new TablePart(makeTablePartFilePath(partHash)); - if (Files.exists(fmap.getTablePartFilePath())) { - fmap.readFromFile(); - } + Map oldTableParts = tableParts; + tableParts = new HashMap<>(); + + try { + for (int dir = 0; dir < DIRECTORIES_COUNT; dir++) { + for (int file = 0; file < FILES_COUNT; file++) { + int partHash = buildHash(dir, file); - // checking keys' hashes - Set keySet = fmap.keySet(); - for (String key : keySet) { - int keyHash = getHash(key); - if (keyHash != partHash) { - throw new TableCorruptIOException( - tableName, "Some keys are stored in improper places"); + TablePart tablePart = new TablePart(makeTablePartFilePath(partHash)); + if (Files.exists(tablePart.getTablePartFilePath())) { + tablePart.readFromFile(); + } + + // checking keys' hashes + Set keySet = tablePart.keySet(); + for (String key : keySet) { + int keyHash = getHash(key); + if (keyHash != partHash) { + throw new TableCorruptIOException( + tableName, "Some keys are stored in improper places"); + } } - } - tableParts.put(partHash, fmap); + tableParts.put(partHash, tablePart); + } } + } catch (Exception exc) { + this.tableParts = oldTableParts; + throw exc; } - } catch (Exception exc) { - this.tableParts = thisClone.tableParts; - throw exc; - } - } - - /** - * Clones the whole table - */ - @Override - protected StringTableImpl clone() { - StringTableImpl cloneTable = new StringTableImpl(tableRoot); - - for (Entry entry : tableParts.entrySet()) { - cloneTable.tableParts.put(entry.getKey(), entry.getValue().clone()); + } finally { + persistenceLock.writeLock().unlock(); } - - return cloneTable; } public String getName() { @@ -265,16 +273,31 @@ public String getName() { } public String get(String key) { - return obtainTablePart(key).get(key); + persistenceLock.readLock().lock(); + try { + return obtainTablePart(key).get(key); + } finally { + persistenceLock.readLock().unlock(); + } } public String put(String key, String value) { Utility.checkNotNull(value, "Value"); - return obtainTablePart(key).put(key, value); + persistenceLock.readLock().lock(); + try { + return obtainTablePart(key).put(key, value); + } finally { + persistenceLock.readLock().unlock(); + } } public String remove(String key) { - return obtainTablePart(key).remove(key); + persistenceLock.readLock().lock(); + try { + return obtainTablePart(key).remove(key); + } finally { + persistenceLock.readLock().unlock(); + } } /** @@ -283,8 +306,13 @@ public String remove(String key) { public int size() { int rowsNumber = 0; - for (TablePart part : tableParts.values()) { - rowsNumber += part.size(); + persistenceLock.readLock().lock(); + try { + for (TablePart part : tableParts.values()) { + rowsNumber += part.size(); + } + } finally { + persistenceLock.readLock().unlock(); } return rowsNumber; @@ -292,16 +320,29 @@ public int size() { public int commit() throws DatabaseIOException { int diffsCount = 0; - for (TablePart part : tableParts.values()) { - diffsCount += part.commit(); + + persistenceLock.writeLock().lock(); + try { + for (TablePart part : tableParts.values()) { + diffsCount += part.commit(); + } + } finally { + persistenceLock.writeLock().unlock(); } return diffsCount; } public int rollback() { int diffsCount = 0; - for (TablePart part : tableParts.values()) { - diffsCount += part.rollback(); + + persistenceLock.readLock().lock(); + try { + for (TablePart part : tableParts.values()) { + diffsCount += part.rollback(); + } + + } finally { + persistenceLock.readLock().unlock(); } return diffsCount; } @@ -312,8 +353,13 @@ public int rollback() { public List list() { List keySet = new LinkedList<>(); - for (TablePart part : tableParts.values()) { - keySet.addAll(part.keySet()); + persistenceLock.readLock().lock(); + try { + for (TablePart part : tableParts.values()) { + keySet.addAll(part.keySet()); + } + } finally { + persistenceLock.readLock().unlock(); } return keySet; @@ -330,7 +376,8 @@ private Path makeTablePartFilePath(int hash) { } /** - * Gets {@link TablePart} instance assigned to this {@code hash} from memory + * Gets {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.db.TablePart} instance assigned to + * this {@code hash} from memory. Not thread-safe. * @param key * key that is hold by desired table. */ @@ -339,6 +386,9 @@ private TablePart obtainTablePart(String key) { return tableParts.get(getHash(key)); } + /** + * Counts number of uncommitted changes for this thread. + */ public int getNumberOfUncommittedChanges() { int diffsCount = 0; for (TablePart part : tableParts.values()) { diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/TablePart.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/TablePart.java index ae272f0eb..1dd810783 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/TablePart.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/db/TablePart.java @@ -15,25 +15,32 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; /** * This class represents a table part implemented as usual {@link java.util.HashMap} and stored in a separate - * file. + * file.
+ * This class is not thread-safe. * @author phoenix */ public class TablePart { public static final int READ_BUFFER_SIZE = 16 * 1024; - + /** + * A pair (key, value) describes put. A pair (key, null) describes removal. + */ + private final ThreadLocal> diffMap = + ThreadLocal.withInitial(() -> new HashMap()); private Path tablePartFilePath; - - private HashMap tablePartMap; - - private HashMap lastCommittedMap; + /** + * Map with last changes that are written to the file system.
+ */ + private Map lastCommittedMap; /** - * Private constructor for cloning + * Private constructor for cloning. */ private TablePart() { @@ -51,25 +58,16 @@ public TablePart(Path tablePartFilePath) { this.tablePartFilePath = tablePartFilePath; - tablePartMap = new HashMap<>(); lastCommittedMap = new HashMap<>(); } - /** - * Silently clones the object - no changes in file system are made. - */ - @SuppressWarnings("unchecked") - @Override - public TablePart clone() { - TablePart fmap = new TablePart(); - fmap.tablePartMap = (HashMap) this.tablePartMap.clone(); - fmap.lastCommittedMap = (HashMap) this.lastCommittedMap.clone(); - fmap.tablePartFilePath = this.tablePartFilePath; - return fmap; - } - public String get(String key) { - return tablePartMap.get(key); + if (diffMap.get().containsKey(key)) { + String value = diffMap.get().get(key); + return value; + } else { + return lastCommittedMap.get(key); + } } public Path getTablePartFilePath() { @@ -77,35 +75,33 @@ public Path getTablePartFilePath() { } public Set keySet() { - return tablePartMap.keySet(); + return makeNewActualVersion().keySet(); } public String put(String key, String value) { - return tablePartMap.put(key, value); + String oldValue = get(key); + diffMap.get().put(key, value); + return oldValue; } /** - * Reads database from file system (all previous data is purged).
If an error occurs the - * state before this operation is recovered. + * Reads database from file system (all previous data is purged).
+ * If an error occurs the state before this operation is recovered.
+ * Thread-local uncommitted diffs are not effected.
* @throws ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.exception.DBFileCorruptIOException */ @SuppressWarnings("unchecked") public void readFromFile() throws DBFileCorruptIOException { - /* - * if an exception occurs and database is cloned, recover if cloned - * object is null - no recover is performed. - */ - HashMap cloneDBMap = (HashMap) tablePartMap.clone(); - tablePartMap.clear(); + // For recover purposes. + Map oldLastCommittedMap = lastCommittedMap; + lastCommittedMap = new HashMap<>(); try (DataInputStream stream = new DataInputStream( new FileInputStream( tablePartFilePath.toString()))) { - /* - * structure: (no spaces or newlines) 00<4 - * bytes:offset> 00<4 bytes:offset> ... - * ... - */ + // Structure: (no spaces or newlines) 00<4 + // bytes:offset> 00<4 bytes:offset> ... + // ... byte[] buffer = new byte[1024]; int bufferSize = 0; @@ -170,23 +166,25 @@ public void readFromFile() throws DBFileCorruptIOException { } } - // empty map + // Empty map. if (offsets.isEmpty()) { return; } - // reading values - String currentKey = offsets.get(nextValue); // value matching this - // key is now being - // built - offsets.remove(nextValue); // next value start boundary + // Reading values. + + // Value matching this key is now being built. + String currentKey = offsets.get(nextValue); + + // Next value start boundary. + offsets.remove(nextValue); - // reading up to the last value (exclusive) + // Reading up to the last value (exclusive). while (!offsets.isEmpty()) { nextValue = offsets.firstKey(); String value = new String(buffer, bufferOffset, nextValue - bufferOffset); - tablePartMap.put(currentKey, value); + lastCommittedMap.put(currentKey, value); bufferOffset = nextValue; currentKey = offsets.get(nextValue); @@ -194,37 +192,78 @@ public void readFromFile() throws DBFileCorruptIOException { offsets.remove(nextValue); } - // putting the last value + // Putting the last value. String value = new String(buffer, bufferOffset, bufferSize - bufferOffset); - tablePartMap.put(currentKey, value); + lastCommittedMap.put(currentKey, value); } catch (IOException exc) { - // recover - if (cloneDBMap != null) { - tablePartMap = cloneDBMap; - } + // Recover. + lastCommittedMap = oldLastCommittedMap; throw new DBFileCorruptIOException( "Failed to read data from file: " + tablePartFilePath.toString(), exc); } - // if everything went ok - lastCommittedMap = new HashMap<>(tablePartMap); + // Everything went ok. } public String remove(String key) { - return tablePartMap.remove(key); + if (diffMap.get().containsKey(key)) { + String oldValue = diffMap.get().get(key); + // Already removed. + if (oldValue == null) { + return null; + } else { + // Postponed put will be cancelled. + diffMap.get().remove(key); + return oldValue; + } + } else { + diffMap.get().put(key, null); + return lastCommittedMap.get(key); + } + } + + /** + * Makes a separate up-to-date version which is a commit of the thread diff to the clone of {@link + * #lastCommittedMap}. + * @return Separate actual version. Changes in this instance have not effect on true database state. + */ + private Map makeNewActualVersion() { + return makeActualVersion(new HashMap(lastCommittedMap)); + } + + /** + * Convenience method for private actual version supplying. + * @param actualVersion + * Map to commit thread diffs to. + * @return actualVersion (the same instance) with committed diffs. + */ + private Map makeActualVersion(Map actualVersion) { + for (Entry e : diffMap.get().entrySet()) { + if (e.getValue() == null) { + actualVersion.remove(e.getKey()); + } else { + actualVersion.put(e.getKey(), e.getValue()); + } + } + + return actualVersion; } public int size() { - return tablePartMap.size(); + return makeNewActualVersion().size(); } - public void writeToFile() throws IOException { + /** + * Writes changes to the file. + * @throws IOException + */ + private void writeToFile() throws IOException { ByteArrayOutputStream stream = new ByteArrayOutputStream(1024); - Iterator keyIterator = tablePartMap.keySet().iterator(); + Iterator keyIterator = lastCommittedMap.keySet().iterator(); Charset charset = Charset.forName("UTF-8"); - int[] shiftPositions = new int[tablePartMap.size()]; + int[] shiftPositions = new int[lastCommittedMap.size()]; byte[] intZero = new byte[] {0, 0, 0, 0}; @@ -237,14 +276,14 @@ public void writeToFile() throws IOException { stream.write(intZero); } - int[] links = new int[tablePartMap.size()]; + int[] links = new int[lastCommittedMap.size()]; keyID = 0; - keyIterator = tablePartMap.keySet().iterator(); + keyIterator = lastCommittedMap.keySet().iterator(); while (keyIterator.hasNext()) { links[keyID] = stream.size(); keyID++; - stream.write(tablePartMap.get(keyIterator.next()).getBytes(charset)); + stream.write(lastCommittedMap.get(keyIterator.next()).getBytes(charset)); } byte[] bytes = stream.toByteArray(); @@ -285,7 +324,8 @@ public int commit() throws DatabaseIOException { int diffsCount = getUncommittedChangesCount(); if (diffsCount > 0) { - lastCommittedMap = new HashMap<>(tablePartMap); + makeActualVersion(lastCommittedMap); + diffMap.get().clear(); try { writeToFile(); } catch (IOException exc) { @@ -298,11 +338,11 @@ public int commit() throws DatabaseIOException { public int rollback() { int diffsCount = getUncommittedChangesCount(); - tablePartMap = new HashMap<>(lastCommittedMap); + diffMap.get().clear(); return diffsCount; } public int getUncommittedChangesCount() { - return Utility.countDifferences(lastCommittedMap, tablePartMap); + return diffMap.get().size(); } } diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/AbstractCommand.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/AbstractCommand.java index 70938bd33..5faced748 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/AbstractCommand.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/AbstractCommand.java @@ -20,7 +20,7 @@ public abstract class AbstractCommand implements Command DATABASE_ERROR_HANDLER = @@ -82,7 +82,9 @@ private static Class[] obtainExceptionsThrownByExecuteSafely() { } /** - * In implementation of {@link AbstractCommand} arguments number is checked first and then + * In implementation of {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.shell + * .AbstractCommand} + * arguments number is checked first and then * {@link #executeSafely(SingleDatabaseShellState, String[])} is invoked.
If you want to * disable forced arguments number checking, override this method without invocation super * method and put empty implementation inside {@link #executeSafely(SingleDatabaseShellState, diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/CommandContainer.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/CommandContainer.java index 348e75d0b..499ec99b9 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/CommandContainer.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/CommandContainer.java @@ -6,7 +6,7 @@ * Base interface for class that has a variety of commands suitable for given shell state. * @param * Some class extending ShellState - * @see ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.shell.ShellState + * @see ShellState */ public interface CommandContainer> { Map> getCommands(); diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/Shell.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/Shell.java index 261c14b65..616dccb85 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/Shell.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/shell/Shell.java @@ -61,7 +61,7 @@ public Shell(ShellStateImpl shellState) throws TerminalException { * Commands split by {@link #COMMAND_END_CHARACTER}. * @return List of commands, each command is an array of its parts (space splitters are excluded from * everywhere except quoted parts). - * @throws ParseException + * @throws java.text.ParseException * In case of bad format. */ public static List splitCommandsString(String commandsStr) throws ParseException { diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientCollection.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientCollection.java index 9b1fe3abc..64b490865 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientCollection.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientCollection.java @@ -8,7 +8,8 @@ import java.util.stream.Stream; /** - * Usual collection over the given base collection with extension {@link ConvenientCollection#addNext + * Usual collection over the given base collection with extension {@link ru.fizteh.fivt.students + * .fedorov_andrew.databaselibrary.support.ConvenientCollection#addNext * (Object)}. */ public class ConvenientCollection implements Collection { diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientMap.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientMap.java index 4fe389c04..0240c6248 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientMap.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/ConvenientMap.java @@ -8,7 +8,9 @@ import java.util.function.Function; /** - * Usual map extended with method {@link ConvenientMap#putNext(Object, Object)}. + * Usual map extended with method {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.support + * .ConvenientMap#putNext(Object, + * Object)}. */ public class ConvenientMap implements Map { private final Map baseMap; diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/Utility.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/Utility.java index 86cf21e16..b478cd8a9 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/Utility.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/support/Utility.java @@ -92,7 +92,7 @@ public static void removeEmptyFilesAndFolders(Path rootDirectory) throws IOExcep */ public static void rm(final Path removePath) throws IOException { if (Files.isDirectory(removePath)) { - Files.walkFileTree(removePath, new Utility.FileTreeRemover()); + Files.walkFileTree(removePath, new FileTreeRemover()); } else { Files.delete(removePath); } @@ -205,7 +205,7 @@ public static void checkNotNull(Object variable, String name) throws IllegalArgu * @param * Value type in the source map. * @return An inversed map. It is not guaranteed that it is instance of the same class as source map has. - * @throws java.lang.IllegalArgumentException + * @throws IllegalArgumentException * If there are two keys having the same values. * @see Object#equals(Object) */ @@ -246,7 +246,8 @@ public static String getQuotedStringRegex(String quotes, String escapeSequence) * @param escapeSequence * Escape sequence. Quotes and this sequence occurrences will be prepended by escape sequence. * @return Endcoded string inside quotes. Returns null for null string. - * @see Utility#unquoteString(String, String, String) + * @see ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.support.Utility#unquoteString(String, + * String, String) */ public static String quoteString(String s, String quoteSequence, String escapeSequence) { if (s == null) { @@ -260,7 +261,9 @@ public static String quoteString(String s, String quoteSequence, String escapeSe } /** - * Decodes a quoted via {@link Utility#quoteString(String, String, String)} method string. + * Decodes a quoted via {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.support + * .Utility#quoteString(String, + * String, String)} method string. * @param s * Quoted string (must start and end with quote sequence). * @param quoteSequence diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ControllableRunnerTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ControllableRunnerTest.java new file mode 100644 index 000000000..1b4243b58 --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ControllableRunnerTest.java @@ -0,0 +1,118 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableAgent; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunnable; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunner; + +import static org.junit.Assert.*; + +@RunWith(JUnit4.class) +public class ControllableRunnerTest extends DuplicatedIOTestBase { + private static final String NEW_LINE = System.lineSeparator(); + + @Test + public void testWaitForEndOfWork() throws Exception { + ControllableRunner runner = new ControllableRunner(); + ControllableRunnable runnable = runner.createControllable( + (ControllableAgent agent) -> { + try { + Thread.sleep(2000L); + } catch (InterruptedException exc) { + throw new AssertionError(exc); + } + System.err.println("Hello from runnable"); + }); + runner.assignRunnable(runnable); + + new Thread(runner, "Runnable").start(); + runner.waitUntilPause(); + + System.err.println("After execution"); + + assertEquals("Hello from runnable" + NEW_LINE + "After execution" + NEW_LINE, getOutput()); + } + + @Test + public void testWaitForEndOfWork1() throws Exception { + ControllableRunner runner = new ControllableRunner(); + ControllableRunnable runnable = runner.createControllable( + (ControllableAgent agent) -> { + System.err.println("Hello from runnable"); + }); + runner.assignRunnable(runnable); + + new Thread(runner, "Runnable").start(); + + // Possibly giving time to finish; + Thread.sleep(2000L); + runner.waitUntilPause(); + + System.err.println("After execution"); + + assertEquals("Hello from runnable" + NEW_LINE + "After execution" + NEW_LINE, getOutput()); + } + + @Test + public void testContinueInCheckpoint() throws Throwable { + ControllableRunner runner = new ControllableRunner(); + ControllableRunnable runnable = runner.createControllable( + (ControllableAgent agent) -> { + System.err.println("Before checkpoint"); + try { + agent.notifyAndWait(); + } catch (InterruptedException exc) { + throw new AssertionError(exc); + } + System.err.println("After checkpoint"); + }); + runner.assignRunnable(runnable); + + new Thread(runner, "Runnable").start(); + runner.waitUntilPause(); + System.err.println("In checkpoint"); + runner.continueWork(); + runner.waitUntilPause(); + System.err.println("After execution"); + + assertEquals( + String.join( + NEW_LINE, + "Before checkpoint", + "In checkpoint", + "After checkpoint", + "After execution", + ""), getOutput()); + } + + @Test + public void testInterruptInCheckpoint() throws InterruptedException, Exception { + ControllableRunner runner = new ControllableRunner(); + ControllableRunnable runnable = runner.createControllable( + (ControllableAgent agent) -> { + System.err.println("Before checkpoint"); + try { + agent.notifyAndWait(); + } catch (InterruptedException exc) { + System.err.println("Interrupted"); + return; + } + System.err.println("Must not be written"); + }); + runner.assignRunnable(runnable); + + new Thread(runner, "Runnable").start(); + runner.waitUntilPause(); + System.err.println("In checkpoint"); + runner.interruptWork(); + runner.waitUntilPause(); // Waiting until execution ends. + System.err.println("After execution"); + + assertEquals( + String.join( + NEW_LINE, "Before checkpoint", "In checkpoint", "Interrupted", "After execution", ""), + getOutput()); + } +} diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DatabaseShellTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DatabaseShellTest.java index 7f80fcde6..9f1d6a323 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DatabaseShellTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DatabaseShellTest.java @@ -230,9 +230,13 @@ public void testInteractiveMode1() throws TerminalException { createAndUseTable(table); runBatchExpectZero( - false, "put a [\"b\"]; put b [\"c\"]; put c [\"d\"]; put d [\"e\"]; put e [\"a\"]; exit"); + "put a [\"b\"]; put b [\"c\"]; put c [\"d\"]; put d [\"e\"]; put e [\"a\"]; exit"); runInteractiveExpectZero( - "use " + table, "show tables", "use " + fakeTable + "; list", "use " + table + "; list"); + true, + "use " + table, + "show tables", + "use " + fakeTable + "; list", + "use " + table + "; list"); String regex = makeTerminalExpectedRegex( GREETING_REGEX, diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DuplicatedIOTestBase.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DuplicatedIOTestBase.java new file mode 100644 index 000000000..22d2edbe9 --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/DuplicatedIOTestBase.java @@ -0,0 +1,94 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.BAOSDuplicator; + +import java.io.PrintStream; + +/** + * Test base for convenient output tracking. {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary + * .test.support.BAOSDuplicator} + * is used for output duplicating. + */ +@Ignore +public class DuplicatedIOTestBase { + protected static PrintStream stdErr; + // Standard out and error streams are stored here. + private static PrintStream stdOut; + + private static BAOSDuplicator out; + + /** + * Sets standard output and error stream as {@link ru.fizteh.fivt.students.fedorov_andrew + * .databaselibrary.test.support.BAOSDuplicator}. + */ + @BeforeClass + public static void globalPrepareDuplicatedIOTestBase() { + stdOut = System.out; + stdErr = System.err; + out = new BAOSDuplicator(stdOut); + + // Wrap over {@link #out} that is used as {@link System#out} and {@link System#err}. + PrintStream outAndErrPrintStream = new PrintStream(out); + System.setOut(outAndErrPrintStream); + System.setErr(outAndErrPrintStream); + } + + /** + * Recovers standard output and error streams. + */ + @AfterClass + public static void globalCleanupDuplicatedIOTestBase() { + System.setOut(stdOut); + System.setErr(stdErr); + } + + /** + * Obtains output from the buffer. + */ + public String getOutput() { + return out.toString(); + } + + public void printDirectlyToStdOut(Object obj) { + stdOut.print(obj); + } + + public void printlnDirectlyToStdOut(Object obj) { + stdOut.println(obj); + } + + public void printDirectlyToStdOut(String str) { + stdOut.print(str); + } + + public void printlnDirectlyToStdOut(String str) { + stdOut.println(str); + } + + public void printlnDirectlyToStdOut() { + stdOut.println(); + } + + /** + * Resets output in the buffer. + */ + @Before + public void prepare() { + out.reset(); + } + + /** + * Prints to the standard output test separator string. + */ + @After + public void cleanup() { + printlnDirectlyToStdOut(); + printlnDirectlyToStdOut("-------------------------------------------------"); + } + +} diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/InterpreterTestBase.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/InterpreterTestBase.java index b635ccc48..29119072b 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/InterpreterTestBase.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/InterpreterTestBase.java @@ -9,12 +9,9 @@ import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.exception.TerminalException; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.shell.Shell; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.shell.ShellState; -import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.BAOSDuplicator; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; import java.util.Arrays; import static org.junit.Assert.*; @@ -28,13 +25,8 @@ public abstract class InterpreterTestBase constructInterpreter() throws TerminalException; @@ -79,8 +61,7 @@ public void prepare() throws TerminalException { @After public void cleanup() throws IOException { interpreter = null; - stdOut.println(); - stdOut.println("-------------------------------------------------"); + IO_DUPLICATOR.cleanup(); } /** @@ -113,7 +94,7 @@ protected String makeTerminalExpectedRegex(String greetingRegex, String... repor * Obtains everything that was output by the interpreter.
*/ protected String getOutput() { - return out.toString(); + return IO_DUPLICATOR.getOutput(); } /** @@ -128,9 +109,9 @@ protected String getOutput() { */ protected int runBatch(boolean reinit, String... commands) throws TerminalException { // Clean what has been output before. - out.reset(); + IO_DUPLICATOR.prepare(); - stdOut.println(Arrays.toString(commands)); + IO_DUPLICATOR.printlnDirectlyToStdOut(Arrays.toString(commands)); for (int i = 0, len = commands.length; i < len; i++) { commands[i] = commands[i].trim(); if (!commands[i].endsWith(";")) { @@ -159,10 +140,10 @@ protected int runBatch(boolean reinit, String... commands) throws TerminalExcept * @throws TerminalException */ protected int runInteractive(boolean reinit, String... lines) throws TerminalException { - out.reset(); + IO_DUPLICATOR.prepare(); for (String cmd : lines) { - stdOut.println(cmd); + IO_DUPLICATOR.printlnDirectlyToStdOut(cmd); } StringBuilder sb = new StringBuilder(); for (String cmd : lines) { diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ReadWriteTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ReadWriteTest.java index 1a5d9a41a..5ebccace6 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ReadWriteTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/ReadWriteTest.java @@ -49,13 +49,15 @@ private void performReadWriteFileMapTest(int keysMin, } Path testPath = Paths.get(System.getProperty("user.home"), "test", "java_test.dat"); + Files.deleteIfExists(testPath); + TablePart testFileMap = new TablePart(testPath); for (Entry e : map.entrySet()) { testFileMap.put(e.getKey(), e.getValue()); } - testFileMap.writeToFile(); + testFileMap.commit(); testFileMap = new TablePart(testPath); diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/StoreableTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/StoreableTest.java index 479323169..a516ea20e 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/StoreableTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/StoreableTest.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.List; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -24,6 +25,8 @@ @RunWith(JUnit4.class) public class StoreableTest extends TestBase { private static final String TABLE_NAME = "table"; + private static final List> TABLE_COLUMN_TYPES = Arrays.asList( + String.class, Integer.class, Double.class, Float.class, Boolean.class, Byte.class, Long.class); private static TableProviderFactory factory; @Rule public ExpectedException exception = ExpectedException.none(); @@ -39,15 +42,7 @@ public static void globalPrepare() { @Before public void prepare() throws IOException { provider = factory.create(DB_ROOT.toString()); - table = provider.createTable( - TABLE_NAME, Arrays.asList( - String.class, - Integer.class, - Double.class, - Float.class, - Boolean.class, - Byte.class, - Long.class)); + table = provider.createTable(TABLE_NAME, TABLE_COLUMN_TYPES); storeable = provider.createFor(table); } @@ -58,6 +53,22 @@ public void cleanup() throws IOException { table = null; } + @Test + public void testStoreableEquals() throws IOException { + Table table2 = provider.createTable(TABLE_NAME + "2", TABLE_COLUMN_TYPES); + assertNotEquals(storeable, provider.createFor(table2)); + } + + @Test + public void testStoreableEquals1() throws IOException { + Storeable storeable2 = provider.createFor(table); + + storeable.setColumnAt(0, "string1"); + storeable2.setColumnAt(0, "string2"); + + assertNotEquals(storeable, storeable2); + } + @Test public void testPutStringToInt() { exception.expect(ColumnFormatException.class); diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderFactoryTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderFactoryTest.java index f13dd5462..b6cd80c05 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderFactoryTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderFactoryTest.java @@ -16,7 +16,7 @@ import static org.hamcrest.CoreMatchers.*; /** - * Tests {@link TableProviderFactory } mostly for error cases. + * Tests {@link ru.fizteh.fivt.storage.structured.TableProviderFactory } mostly for error cases. * @author phoenix */ @RunWith(org.junit.runners.JUnit4.class) diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderTest.java index 248c790a0..d84630bb2 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableProviderTest.java @@ -1,5 +1,6 @@ package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test; +import junit.framework.AssertionFailedError; import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; @@ -16,6 +17,10 @@ import ru.fizteh.fivt.storage.structured.TableProviderFactory; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.db.DBTableProviderFactory; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.db.StringTableImpl; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.TestUtils; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableAgent; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunnable; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunner; import java.io.IOException; import java.io.PrintWriter; @@ -30,6 +35,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -91,6 +97,71 @@ private void expectJSONRegexMatchFailure() { "Does not match JSON simple list regular expression"))); } + @Test + public void testConcurrentCreateTable() throws Exception { + final String tableName = "table"; + + class TableCreator extends ControllableRunnable { + volatile Table createdTable; + volatile Table gotTable; + + public TableCreator(ControllableRunner host) { + super(host); + } + + @Override + public void runWithFreedom(ControllableAgent agent) throws Exception, AssertionError { + TestUtils.consumeCPU(ThreadLocalRandom.current().nextInt(20, 40)); + System.err.println("Attempt to create table"); + createdTable = provider.createTable(tableName, DEFAULT_COLUMN_TYPES); + gotTable = provider.getTable(tableName); + } + } + + int threadsCount = 26; + + ControllableRunner[] runners = new ControllableRunner[threadsCount]; + + for (int i = 0; i < threadsCount; i++) { + runners[i] = new ControllableRunner(); + runners[i].assignRunnable(new TableCreator(runners[i])); + } + + for (int i = 0; i < threadsCount; i++) { + new Thread(runners[i], "Runner " + i).start(); + } + + for (int i = 0; i < threadsCount; i++) { + runners[i].waitUntilEndOfWork(); + } + + // All gotTable must be equal. + // One createdTable must be equal to any gotTable, all other createdTables must be null. + + Table gotTable = ((TableCreator) runners[0].getRunnable()).gotTable; + + for (int i = 1; i < threadsCount; i++) { + assertTrue( + "All links for gotTable must be the same", + ((TableCreator) runners[i].getRunnable()).gotTable == gotTable); + } + + boolean foundCreated = false; + + for (int i = 0; i < threadsCount; i++) { + Table createdTable = ((TableCreator) runners[i].getRunnable()).createdTable; + if (createdTable != null) { + if (foundCreated) { + throw new AssertionFailedError("More then one created table"); + } else { + foundCreated = true; + } + } + } + + assertTrue("Must be one created table", foundCreated); + } + @Test public void testDeserialize() throws IOException, ParseException { Table table = createTable(Long.class, Byte.class, String.class); diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableTest.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableTest.java index b067444ab..3426ba318 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableTest.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/TableTest.java @@ -14,12 +14,17 @@ import ru.fizteh.fivt.storage.structured.TableProviderFactory; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.db.StoreableTableImpl; import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.TestUtils; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableAgent; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunnable; +import ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel.ControllableRunner; import java.io.IOException; import java.io.PrintWriter; import java.text.ParseException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -153,7 +158,109 @@ public void testNumberOfUncommittedChanges4() throws ParseException, IOException remove("key"); put("key", "value"); - assertEquals(0, table.getNumberOfUncommittedChanges()); + assertEquals(1, table.getNumberOfUncommittedChanges()); + } + + @Test + public void testRollbackConcurrent() throws Throwable { + ControllableRunner runnerA = new ControllableRunner(); + ControllableRunner runnerB = new ControllableRunner(); + + // Scheme: + // runnerA: put values + // runnerB: put values, rollback + // runnerA: check own changes still exist + + ControllableRunnable contrA = runnerA.createAndAssign( + (ControllableAgent agent) -> { + // runnerA: put values + put("a", "b"); + put("b", "c"); + + agent.notifyAndWait(); + + // runnerA: check own changes still exist + int changes = table.getNumberOfUncommittedChanges(); + assertEquals("Changes of another thread must not have been rolled back", 2, changes); + + }); + + ControllableRunnable contrB = runnerB.createAndAssign( + (ControllableAgent agent) -> { + // runnerB: put values, rollback + put("a", "b"); + put("c", "d"); + int changes = table.rollback(); + assertEquals("Must have rolled back my changes", 2, changes); + }); + + new Thread(runnerA, "runnerA").start(); + new Thread(runnerB, "runnerB").start(); + + runnerA.waitUntilPause(); // Wait until pause. + runnerB.waitUntilEndOfWork(); // Wait until end of work. + runnerA.continueWork(); + runnerA.waitUntilEndOfWork(); // Wait until end of work. + } + + @Test + public void testCommitGlobalUpdate() throws Throwable { + ControllableRunner runnerA = new ControllableRunner(); + ControllableRunner runnerB = new ControllableRunner(); + + ControllableRunnable contrA = runnerA.createAndAssign( + (ControllableAgent agent) -> { + // runnerA: put values + put("a", "b"); + put("b", "c"); + + agent.notifyAndWait(); + + // runnerA: commit + // System.err.println("A: commit"); + int changes = table.commit(); + assertEquals("Must have committed my changes.", 2, changes); + + agent.notifyAndWait(); + + // runnerA: get values + assertNull("Must have been removed.", get("b")); + assertEquals("Committed from another thread.", "d", get("c")); + assertEquals("Committed from me earlier.", "b", get("a")); + }); + + ControllableRunnable contrB = runnerB.createAndAssign( + (ControllableAgent agent) -> { + // runnerB: get values, put values, remove values + assertNull("Must not have been committed.", get("a")); + assertNull("Must not have been committed.", get("b")); + + put("c", "d"); + remove("b"); + + agent.notifyAndWait(); + + // runnerB: get values, commit + assertNull("Committed but replaced", get("b")); + assertEquals("Committed from another thread.", "b", get("a")); + int changes = table.commit(); + assertEquals("Must have committed my changes.", 2, changes); + }); + + // runnerA: put values + // runnerB: get values, put values, remove values + // runnerA: commit + // runnerB: get values, commit + // runnerA: get values + + new Thread(runnerA, "runnerA").start(); + runnerA.waitUntilPause(); + new Thread(runnerB, "runnerB").start(); + runnerB.waitUntilPause(); + runnerA.continueWork(); + runnerA.waitUntilPause(); + runnerB.waitUntilEndOfWork(); + runnerA.waitUntilEndOfWork(); } @Test @@ -166,6 +273,69 @@ public void testNumberOfUncommittedChanges5() throws ParseException, IOException assertEquals(1, table.getNumberOfUncommittedChanges()); } + @Test + public void testManyConcurrentCommits() throws Exception { + int threadsNumber = TestUtils.ALPHABET.length; + + ControllableRunner[] runners = new ControllableRunner[threadsNumber]; + + for (int i = 0; i < threadsNumber; i++) { + final int id = i; + runners[i] = new ControllableRunner(); + runners[i].createAndAssign( + (ControllableAgent agent) -> { + final ThreadLocalRandom random = ThreadLocalRandom.current(); + + Consumer trashFiller = (actionsCount) -> { + try { + System.err.println( + Thread.currentThread().getName() + ": doing " + actionsCount + + " actions"); + for (int j = 0; j < actionsCount; j++) { + put( + String.valueOf(random.nextInt(100)), + String.valueOf(random.nextInt())); + if (random.nextInt(10) < 5) { + table.commit(); + } + } + } catch (IOException | ParseException exc) { + throw new AssertionError(exc); + } + }; + + trashFiller.accept(random.nextInt(20, 40)); + + String mainKey = "key" + TestUtils.ALPHABET[id]; + String mainValue = "value " + mainKey; + + put(mainKey, mainValue); + table.commit(); + assertEquals("Main key-value not matches", mainValue, get(mainKey)); + + trashFiller.accept(random.nextInt(20, 40)); + + agent.notifyAndWait(); + + assertEquals("Main key-value not matches", mainValue, get(mainKey)); + }); + } + + for (int i = 0; i < threadsNumber; i++) { + new Thread(runners[i], "runner " + i).start(); + } + + // Going to the finish line... + for (ControllableRunner runner : runners) { + runner.waitUntilPause(); + } + + // Total check in the end. + for (ControllableRunner runner : runners) { + runner.waitUntilEndOfWork(); + } + } + @Test public void testPutOneStoreableToAnotherTable() throws IOException { Table table2 = provider.createTable(TABLE_NAME + "2", DEFAULT_COLUMN_TYPES); diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/TestUtils.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/TestUtils.java index 0c8f8597c..9f1fb92ea 100644 --- a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/TestUtils.java +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/TestUtils.java @@ -10,6 +10,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; /** * This class provides some utility methods for testing.
Note that some methods are linking to @@ -18,7 +19,7 @@ * directly. */ public class TestUtils { - private static final char[] ALPHABET = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + public static final char[] ALPHABET = "abcdefghijklmnopqrstuvwxyz".toCharArray(); private static final Random RANDOM = new Random(); // Not for constructing @@ -41,6 +42,15 @@ public static String randString(int length) { return String.valueOf(data); } + public static void consumeCPU(int actionsCount) { + int sum = 0; + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < actionsCount; i++) { + sum += random.nextInt(123512); + } + random.nextInt(sum); + } + public static TableProviderFactory obtainFactory() { return new DBTableProviderFactory(); } diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableAgent.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableAgent.java new file mode 100644 index 000000000..0cd93277c --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableAgent.java @@ -0,0 +1,14 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel; + +/** + * Interface that lets the controllable runnable to notify all waiting threads that the pause has come and + * wait until any of the threads decides whether to continue or interrupt the execution of the runnable. + */ +@FunctionalInterface +public interface ControllableAgent { + /** + * Call this method if you want to make a pause. + * @throws InterruptedException + */ + void notifyAndWait() throws InterruptedException; +} diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunnable.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunnable.java new file mode 100644 index 000000000..04250cbbb --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunnable.java @@ -0,0 +1,51 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel; + +/** + * Base class for runnables served by this runner. + */ +public abstract class ControllableRunnable implements Runnable, ControllableAgent, ExceptionFreeRunnable { + private final ControllableRunner host; + + private volatile Throwable exception; + + public ControllableRunnable(ControllableRunner host) { + this.host = host; + } + + @Override + public final void run() { + synchronized (this) { + exception = null; + } + try { + runWithFreedom(this::notifyAndWait); + } catch (Exception | AssertionError exc) { + synchronized (this) { + exception = exc; + } + } + } + + /** + * Call this method after execution finishes. If an {@link java.lang.Exception} or {@link + * java.lang.AssertionError} has occurred during execution, it will + * be rethrown. + * @throws Exception + */ + public final synchronized void checkException() throws Exception, AssertionError { + if (exception != null) { + if (exception instanceof Exception) { + throw (Exception) exception; + } else if (exception instanceof AssertionError) { + throw (AssertionError) exception; + } else { + throw new Error("Some fatal exception occurred during thread execution", exception); + } + } + } + + @Override + public final void notifyAndWait() throws InterruptedException { + host.onControllablePause(this); + } +} diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunner.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunner.java new file mode 100644 index 000000000..16effbb9b --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ControllableRunner.java @@ -0,0 +1,230 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel; + +/** + * Runnable that consumes some collections of runnables that must be executed sequentially and executes them + * so that you can track whether some part has been executed or not. + */ +public class ControllableRunner implements Runnable { + private static final int ORDER_NOT_SET = -1; + private static final int ORDER_TERMINATE = 0; + private static final int ORDER_CONTINUE = 1; + + private static final int STATUS_NOT_STARTED = -1; + private static final int STATUS_FINISHED = 0; + private static final int STATUS_STARTED = 1; + + private volatile ControllableRunnable runnable; + + /** + * Order from the observer: continue or terminate. Can be also not set. + */ + private volatile int order; + + /** + * Current status: not started/started/finished. + */ + private volatile int status; + + /** + * Creates a new instance of this runner. + */ + public ControllableRunner() { + order = ORDER_TERMINATE; + status = STATUS_NOT_STARTED; + } + + /** + * Assign the given runnable to execute it once. You can perform this action if you have never assigned + * runnable to this runner before or the last assigned runnable has finished its execution. + * @throws java.lang.IllegalStateException + * If you cannot assign any runnables now. + */ + public synchronized void assignRunnable(ControllableRunnable runnable) throws IllegalStateException { + if (status == STATUS_STARTED) { + throw new IllegalStateException("Runnable has been already assigned and has not been finished"); + } + this.runnable = runnable; + status = STATUS_NOT_STARTED; + } + + /** + * Get the currently assigned controllable runnable. + */ + public ControllableRunnable getRunnable() { + return runnable; + } + + /** + * Creates and attempts to assign the alternate-kind runnable. + * @param runnable + * Alternate kind of runnable. + */ + public synchronized ControllableRunnable createAndAssign(ExceptionFreeRunnable runnable) { + ControllableRunnable controllable = createControllable(runnable); + assignRunnable(controllable); + return controllable; + } + + /** + * @throws IllegalStateException + * You can call run() only if status = unstarted. + */ + @Override + public synchronized void run() throws IllegalStateException { + checkRunnableAssigned(); + + if (status != STATUS_NOT_STARTED) { + throw new IllegalStateException("Can run only if status is: not started"); + } + + status = STATUS_STARTED; + order = ORDER_CONTINUE; + + // System.err.println("Starting"); + try { + runnable.run(); + } finally { + // System.err.println("Finishing"); + status = STATUS_FINISHED; + order = ORDER_TERMINATE; + notifyAll(); + } + } + + /** + * Creates a controllable runnable that will work this this runner. + * @param runnable + * alternate form of runnable - function that takes notification agent as an argument. + */ + public ControllableRunnable createControllable(ExceptionFreeRunnable runnable) { + return new ControllableRunnable(this) { + @Override + public void runWithFreedom(ControllableAgent agent) throws Exception { + runnable.runWithFreedom(agent); + } + }; + } + + /** + * Call this method if you want to wait until the next pause and play the role of observer.
+ * If there are no checkpoints expected in future, this method waits until execution ends. + * @throws InterruptedException + * @throws java.lang.Exception + * If thrown during runnable execution. + * @throws java.lang.AssertionError + * If thrown during runnable execution. + */ + public synchronized void waitUntilPause() throws InterruptedException, Exception, AssertionError { + // System.err.println(hashCode() + ": waiting until pause"); + while (order != ORDER_NOT_SET && status != STATUS_FINISHED) { + wait(); + } + runnable.checkException(); + } + + /** + * Call this method if you want to wait until execution ends. Returns immediately if the runnable is not + * set. All pauses that can be met in the future will be ignored with order {@link #continueWork()}.
+ * throws exception. + * @throws InterruptedException + * @throws java.lang.Exception + * If thrown during runnable execution. + * @throws java.lang.AssertionError + * If thrown during runnable execution. + */ + public synchronized void waitUntilEndOfWork() throws InterruptedException, Exception, AssertionError { + while (status != STATUS_FINISHED) { + if (order == ORDER_NOT_SET) { + continueWork(); + } + wait(); + } + runnable.checkException(); + } + + /** + * This method is called by {@link ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support + * .support.parallel.ControllableRunnable} + * when it wants to pause and wait until the observer decides whether to continue work or interrupt. + * @throws InterruptedException + */ + synchronized void onControllablePause(ControllableRunnable pausingRunnable) throws InterruptedException { + checkStatusIsStarted(); + if (runnable != pausingRunnable) { + throw new IllegalStateException( + "Controllable runner can handle only one runnable at a time. Do not execute more than " + + "one runnable assigned to the same controllable runner in parallel."); + } + + order = ORDER_NOT_SET; + notifyAll(); + while (order == ORDER_NOT_SET) { + wait(); + } + if (order == ORDER_TERMINATE) { + throw new InterruptedException("Terminated after pause because of user command."); + } + + // For all runners that want to wait did really wait + order = ORDER_NOT_SET; + } + + public synchronized boolean isRunnableAssigned() { + return runnable != null; + } + + private synchronized void checkRunnableAssigned() { + if (!isRunnableAssigned()) { + throw new IllegalStateException("Runnable has not been assigned"); + } + } + + private synchronized void checkStatusIsStarted() throws IllegalStateException { + if (status != STATUS_STARTED) { + throw new IllegalStateException("Can perform this only if status = started"); + } + } + + /** + * Call this method after waiting in {@link #waitUntilPause()} to tell the runnable to continue work. + * @throws IllegalStateException + * If execution has finished. + * @throws java.lang.Exception + * If thrown during runnable execution. + * @throws java.lang.AssertionError + * If thrown during runnable execution. + */ + public synchronized void continueWork() throws IllegalStateException { + checkRunnableAssigned(); + checkStatusIsStarted(); + + // System.err.println(hashCode() + ": continue work"); + + if (order == ORDER_NOT_SET) { + order = ORDER_CONTINUE; + notifyAll(); + } else { + throw new IllegalStateException( + "Cannot manage the runnable now, because it is not in paused state."); + } + } + + /** + * Call this method after waiting in {@link #waitUntilPause()} to tell the runnable to interrupt work. + * @throws IllegalStateException + * If execution has finished. + */ + public synchronized void interruptWork() throws IllegalStateException { + checkRunnableAssigned(); + checkStatusIsStarted(); + + if (order == ORDER_NOT_SET) { + order = ORDER_TERMINATE; + notifyAll(); + } else { + throw new IllegalStateException( + "Cannot manage the runnable now, because it is not in paused state."); + } + } + +} diff --git a/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ExceptionFreeRunnable.java b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ExceptionFreeRunnable.java new file mode 100644 index 000000000..abc9aedfb --- /dev/null +++ b/src/ru/fizteh/fivt/students/fedorov_andrew/databaselibrary/test/support/parallel/ExceptionFreeRunnable.java @@ -0,0 +1,15 @@ +package ru.fizteh.fivt.students.fedorov_andrew.databaselibrary.test.support.parallel; + +/** + * Interface for runnable that can throw any exceptions. + */ +@FunctionalInterface +public interface ExceptionFreeRunnable { + /** + * Place your implementation here and do not care of exceptions. The given throwables will be caught and + * become accessible. + * @throws Exception + * @throws java.lang.AssertionError + */ + void runWithFreedom(ControllableAgent agent) throws Exception, AssertionError; +}