diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index ed7a05027e8..5b39147f3e2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -73,6 +73,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException table.deleteWithBatch(batch, key); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + table.deleteRangeWithBatch(batch, beginKey, endKey); + } + @Override public final KeyValueIterator iterator(KEY prefix, IteratorType type) { throw new UnsupportedOperationException("Iterating tables directly is not" + diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index ea7f08edd82..27654930c41 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -307,6 +307,12 @@ test-jar test + + org.apache.ozone + hdds-managed-rocksdb + test-jar + test + org.apache.ozone hdds-test-utils diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java index 9d2944fab66..416cc8bb9c1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java @@ -39,12 +39,12 @@ */ public final class CodecBufferCodec implements Codec { - private static final Codec DIRECT_INSTANCE = new CodecBufferCodec(true); - private static final Codec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); + private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true); + private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); private final CodecBuffer.Allocator allocator; - public static Codec get(boolean direct) { + public static CodecBufferCodec get(boolean direct) { return direct ? 
DIRECT_INSTANCE : NON_DIRECT_INSTANCE; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 87d9ffc625c..808a34ca45f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,20 +17,30 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; - import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.AbstractSlice; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +52,12 @@ public final class RDBBatchOperation implements BatchOperation { static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); + private static final String PUT_OP = "PUT"; + private static final String DELETE_OP = "DELETE"; + private static final String DELETE_RANGE_OP = "DELETE_RANGE"; + private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); @@ -50,8 +65,6 @@ public final class RDBBatchOperation implements BatchOperation { private final OpCache opCache = new OpCache(); - private enum Op { DELETE } - public static RDBBatchOperation newAtomicOperation() { return newAtomicOperation(new ManagedWriteBatch()); } @@ -75,34 +88,37 @@ private static String countSize2String(int count, long size) { } /** - * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#ops}. + * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#batchOps}. * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes { - private final byte[] array; - private final CodecBuffer buffer; + static final class Bytes implements Comparable, Closeable { + private AbstractSlice slice; /** Cache the hash value. 
*/ - private final int hash; + private int hash; Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + Objects.requireNonNull(buffer, "buffer == null"); + if (buffer.isDirect()) { + initWithDirectByteBuffer(buffer.asReadOnlyByteBuffer()); + } else { + initWithByteArray(buffer.getArray()); + } } Bytes(byte[] array) { - this.array = array; - this.buffer = null; - this.hash = ByteBuffer.wrap(array).hashCode(); + Objects.requireNonNull(array, "array == null"); + initWithByteArray(array); } - byte[] array() { - return array; + private void initWithByteArray(byte[] array) { + this.slice = new ManagedSlice(array); + this.hash = ByteBuffer.wrap(array).hashCode(); } - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); + private void initWithDirectByteBuffer(ByteBuffer byteBuffer) { + this.slice = new ManagedDirectSlice(byteBuffer); + this.hash = byteBuffer.hashCode(); } @Override @@ -116,11 +132,7 @@ public boolean equals(Object obj) { if (this.hash != that.hash) { return false; } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); + return slice.equals(that.slice); } @Override @@ -130,8 +142,187 @@ public int hashCode() { @Override public String toString() { - return array != null ? bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); + return slice.toString(); + } + + // This method mimics the ByteWiseComparator in RocksDB. + @Override + public int compareTo(RDBBatchOperation.Bytes that) { + return this.slice.compare(that.slice); + } + + @Override + public void close() { + slice.close(); + } + } + + private abstract class Operation implements Closeable { + private final Bytes keyBytes; + + private Operation(Bytes keyBytes) { + this.keyBytes = keyBytes; + } + + abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; + + abstract int keyLen(); + + abstract int valLen(); + + Bytes getKey() { + return keyBytes; + } + + int totalLength() { + return keyLen() + valLen(); + } + + abstract String getOpType(); + + @Override + public void close() { + if (keyBytes != null) { + keyBytes.close(); + } + } + + @Override + public String toString() { + return getOpType() + ", key=" + keyBytes; + } + } + + /** + * Delete operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteOperation extends Operation { + private final CodecBuffer key; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private DeleteOperation(CodecBuffer key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); + this.key = Objects.requireNonNull(key, "key == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDelete(batch, this.key.asReadOnlyByteBuffer()); + } + + @Override + public int keyLen() { + return key.readableBytes(); + } + + @Override + public int valLen() { + return 0; + } + + @Override + public String getOpType() { + return DELETE_OP; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + } + super.close(); + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. 
+ */ + private final class PutOperation extends Operation { + private final CodecBuffer key; + private final CodecBuffer value; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private PutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); + this.key = key; + this.value = value; + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); + } + + @Override + public int keyLen() { + return key.readableBytes(); + } + + @Override + public int valLen() { + return value.readableBytes(); + } + + @Override + public String getOpType() { + return PUT_OP; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + value.release(); + } + super.close(); + } + } + + /** + * Delete range operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteRangeOperation extends Operation { + private final byte[] startKey; + private final byte[] endKey; + private final RangeQueryIndex.Range rangeEntry; + + private DeleteRangeOperation(byte[] startKey, byte[] endKey) { + super(null); + this.startKey = Objects.requireNonNull(startKey, "startKey == null"); + this.endKey = Objects.requireNonNull(endKey, "endKey == null"); + this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey)); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDeleteRange(batch, startKey, endKey); + } + + @Override + public int keyLen() { + return startKey.length + endKey.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public String getOpType() { + return DELETE_RANGE_OP; + } + + @Override + public void close() { + super.close(); + rangeEntry.getStartInclusive().close(); + rangeEntry.getEndExclusive().close(); + } + + @Override + public String toString() { + return getOpType() + ", rangeEntry=" + rangeEntry; } } @@ -144,12 +335,40 @@ private class OpCache { private class FamilyCache { private final ColumnFamily family; /** - * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes} - * and the dbValue type is {@link Object}. - * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op. - * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}). + * A mapping of operation keys to their respective indices in {@code FamilyCache}. + * + * Key details: + * - Maintains a mapping of unique operation keys to their insertion or processing order. + * - Used internally to manage and sort operations during batch writes. + * - Facilitates filtering, overwriting, or deletion of operations based on their keys. + * + * Constraints: + * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. + * - Each key is associated with a unique integer index to track insertion order. + * + * This field plays a critical role in managing the logical consistency and proper execution + * order of operations stored in the batch when interacting with a RocksDB-backed system. + */ + private final Map opsKeys = new HashMap<>(); + /** + * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. + * + * This map serves as the primary container for recording operations in preparation for a batch write + * within a RocksDB-backed system. 
Each operation is referenced by an integer index, which determines + * its insertion order and ensures correct sequencing during batch execution. + * + * Key characteristics: + * - Stores operations of type {@code Operation}. + * - Uses a unique integer key (index) for mapping each operation. + * - Serves as an intermediary structure during batch preparation and execution. + * + * Usage context: + * - This map is managed as part of the batch-writing process, which involves organizing, + * filtering, and applying multiple operations in a single cohesive batch. + * - Operations stored in this map are expected to define specific actions (e.g., put, delete, + * delete range) and their associated data (e.g., keys, values). */ - private final Map ops = new HashMap<>(); + private final Map batchOps = new HashMap<>(); private boolean isCommit; private long batchSize; @@ -157,32 +376,80 @@ private class FamilyCache { private int discardedCount; private int putCount; private int delCount; + private int delRangeCount; + private AtomicInteger opIndex; FamilyCache(ColumnFamily family) { this.family = family; + this.opIndex = new AtomicInteger(0); } - /** Prepare batch write for the entire family. */ + /** + * Prepares a batch write operation for a RocksDB-backed system. + * + * This method ensures the orderly execution of operations accumulated in the batch, + * respecting their respective types and order of insertion. + * + * Key functionalities: + * 1. Ensures that the batch is not already committed before proceeding. + * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order. + * 3. Filters and adapts operations to account for any delete range operations that might + * affect other operations in the batch: + * - Operations with keys that fall within the range specified by a delete range operation + * are discarded. + * - Delete range operations are executed in their correct order. + * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. + * 5. Logs a summary of the batch execution for debugging purposes. + * + * Throws: + * - RocksDatabaseException if any error occurs while applying operations to the write batch. + * + * Prerequisites: + * - The method assumes that the operations are represented by `Operation` objects, each of which + * encapsulates the logic for its specific type. + * - Delete range operations must be represented by the `DeleteRangeOperation` class. + */ void prepareBatchWrite() throws RocksDatabaseException { Preconditions.checkState(!isCommit, "%s is already committed.", this); isCommit = true; - for (Map.Entry op : ops.entrySet()) { - final Bytes key = op.getKey(); - final Object value = op.getValue(); - if (value instanceof byte[]) { - family.batchPut(writeBatch, key.array(), (byte[]) value); - } else if (value instanceof CodecBuffer) { - family.batchPut(writeBatch, key.asReadOnlyByteBuffer(), - ((CodecBuffer) value).asReadOnlyByteBuffer()); - } else if (value == Op.DELETE) { - family.batchDelete(writeBatch, key.array()); - } else { - throw new IllegalStateException("Unexpected value: " + value - + ", class=" + value.getClass().getSimpleName()); + // Sort Entries based on opIndex and flush the operation to the batch in the same order. 
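+      // A put/delete is flushed only when its key is not covered by a delete-range op recorded
+      // later in the batch: each DeleteRangeOperation is removed from the range index just before
+      // it is applied, so a delete range only masks the operations that were recorded before it.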
+ List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) + .map(Map.Entry::getValue).collect(Collectors.toList()); + Set> deleteRangeEntries = new HashSet<>(); + for (Operation op : ops) { + if (DELETE_RANGE_OP.equals(op.getOpType())) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + deleteRangeEntries.add(deleteRangeOp.rangeEntry); } } - - debug(this::summary); + try { + RangeQueryIndex rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries); + for (Operation op : ops) { + if (DELETE_RANGE_OP.equals(op.getOpType())) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry); + op.apply(family, writeBatch); + } else { + // Find a delete range op matching which would contain the key after the + // operation has occurred. If there is no such operation then perform the operation otherwise discard the + // op. + if (!rangeQueryIdx.containsIntersectingRange(op.getKey())) { + op.apply(family, writeBatch); + } else { + debug(() -> { + RangeQueryIndex.Range deleteRangeOp = rangeQueryIdx.getFirstIntersectingRange(op.getKey()); + return String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + op.getKey(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive()); + }); + discardedCount++; + discardedSize += op.totalLength(); + } + } + } + debug(this::summary); + } catch (IOException e) { + throw new RocksDatabaseException("Failed to prepare batch write", e); + } } private String summary() { @@ -194,48 +461,38 @@ void clear() { final boolean warn = !isCommit && batchSize > 0; String details = warn ? summary() : null; - for (Object value : ops.values()) { - if (value instanceof CodecBuffer) { - ((CodecBuffer) value).release(); // the key will also be released - } - } - ops.clear(); + IOUtils.close(LOG, batchOps.values()); + batchOps.clear(); if (warn) { LOG.warn("discarding changes {}", details); } } - void putOrDelete(Bytes key, int keyLen, Object val, int valLen) { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - batchSize += keyLen + valLen; + private void deleteIfExist(Bytes key) { // remove previous first in order to call release() - final Object previous = ops.remove(key); - if (previous != null) { - final boolean isPut = previous != Op.DELETE; - final int preLen; - if (!isPut) { - preLen = 0; - } else if (previous instanceof CodecBuffer) { - final CodecBuffer previousValue = (CodecBuffer) previous; - preLen = previousValue.readableBytes(); - previousValue.release(); // key will also be released - } else if (previous instanceof byte[]) { - preLen = ((byte[]) previous).length; - } else { - throw new IllegalStateException("Unexpected previous: " + previous - + ", class=" + previous.getClass().getSimpleName()); - } - discardedSize += keyLen + preLen; + if (opsKeys.containsKey(key)) { + int previousIndex = opsKeys.remove(key); + final Operation previous = batchOps.remove(previousIndex); + previous.close(); + discardedSize += previous.totalLength(); discardedCount++; - debug(() -> String.format("%s overwriting a previous %s", this, - isPut ? 
"put (value: " + byteSize2String(preLen) + ")" : "del")); + debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), + previous.valLen())); } - final Object overwritten = ops.put(key, val); - Preconditions.checkState(overwritten == null); + } + void overWriteOpIfExist(Bytes key, Operation operation) { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + deleteIfExist(key); + batchSize += operation.totalLength(); + int newIndex = opIndex.getAndIncrement(); + final Integer overwrittenOpKey = opsKeys.put(key, newIndex); + final Operation overwrittenOp = batchOps.put(newIndex, operation); + Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null); debug(() -> String.format("%s %s, %s; key=%s", this, - valLen == 0 ? delString(keyLen) : putString(keyLen, valLen), + DELETE_OP.equals(operation.getOpType()) ? delString(operation.totalLength()) : putString(operation.keyLen(), + operation.valLen()), batchSizeDiscardedString(), key)); } @@ -243,19 +500,35 @@ void put(CodecBuffer key, CodecBuffer value) { putCount++; // always release the key with the value - value.getReleaseFuture().thenAccept(v -> key.release()); - putOrDelete(new Bytes(key), key.readableBytes(), - value, value.readableBytes()); + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new PutOperation(key, value, keyBytes)); } void put(byte[] key, byte[] value) { putCount++; - putOrDelete(new Bytes(key), key.length, value, value.length); + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value); + Bytes keyBytes = new Bytes(keyBuffer); + overWriteOpIfExist(keyBytes, new PutOperation(keyBuffer, valueBuffer, keyBytes)); } void delete(byte[] key) { delCount++; - putOrDelete(new Bytes(key), key.length, Op.DELETE, 0); + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + Bytes keyBytes = new Bytes(keyBuffer); + + overWriteOpIfExist(keyBytes, new DeleteOperation(keyBuffer, keyBytes)); + } + + void delete(CodecBuffer key) { + delCount++; + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); + } + + void deleteRange(byte[] startKey, byte[] endKey) { + delRangeCount++; + batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); } String putString(int keySize, int valueSize) { @@ -295,6 +568,15 @@ void delete(ColumnFamily family, byte[] key) { .delete(key); } + void delete(ColumnFamily family, CodecBuffer key) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); + } + + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .deleteRange(startKey, endKey); + } + /** Prepare batch write for the entire cache. 
*/ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { for (Map.Entry e : name2cache.entrySet()) { @@ -316,6 +598,7 @@ String getCommitString() { int opSize = 0; int discardedCount = 0; int discardedSize = 0; + int delRangeCount = 0; for (FamilyCache f : name2cache.values()) { putCount += f.putCount; @@ -323,12 +606,13 @@ String getCommitString() { opSize += f.batchSize; discardedCount += f.discardedCount; discardedSize += f.discardedSize; + delRangeCount += f.delRangeCount; } final int opCount = putCount + delCount; return String.format( - "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s", - putCount, delCount, + "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", + putCount, delCount, delRangeCount, countSize2String(opCount, opSize), countSize2String(discardedCount, discardedSize), countSize2String(opCount - discardedCount, opSize - discardedSize)); @@ -371,6 +655,10 @@ public void delete(ColumnFamily family, byte[] key) { opCache.delete(family, key); } + public void delete(ColumnFamily family, CodecBuffer key) { + opCache.delete(family, key); + } + public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { opCache.put(family, key, value); } @@ -378,4 +666,8 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { public void put(ColumnFamily family, byte[] key, byte[] value) { opCache.put(family, key, value); } + + public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + opCache.deleteRange(family, startKey, endKey); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f732735cbe3..ec9d900a1d4 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce db.deleteRange(family, beginKey, endKey); } + void deleteWithBatch(BatchOperation batch, CodecBuffer key) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).delete(family, key); + } else { + throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName()); + } + } + @Override public void deleteWithBatch(BatchOperation batch, byte[] key) { if (batch instanceof RDBBatchOperation) { @@ -203,6 +211,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } + @Override + public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey); + } else { + throw new IllegalArgumentException("batch should be RDBBatchOperation"); + } + } + @Override public KeyValueIterator iterator(byte[] prefix, IteratorType type) throws RocksDatabaseException { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java new file mode 100644 index 00000000000..89c6d16905a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; + +/** + * An index for answering "does this point fall within any of these ranges?" efficiently. + * + *

The indexed ranges are half-open intervals of the form {@code [startInclusive, endExclusive)}.
+ *
+ * Core idea (sweep-line / prefix-sum over range boundaries): instead of scanning every range on
+ * each query, this index stores a sorted map from boundary points to a running count of "active"
+ * ranges at that point.
+ *
+ *   - For each range {@code [s, e)}, we add a delta {@code +1} at {@code s} and a delta
+ *     {@code -1} at {@code e}.
+ *   - We then convert the deltas into a prefix sum in key order, so every boundary key stores
+ *     the number of ranges active at that coordinate.
+ *   - For any query point {@code k}, the active count is {@code floorEntry(k).value}. If it is
+ *     {@code > 0}, then {@code k} intersects at least one range (see the worked example below).
+ *
+ * Update model: this index supports only removing ranges that were part of the initial set.
+ * Removal updates the prefix sums for keys in {@code [startInclusive, endExclusive)}
+ * (net effect of removing {@code +1} at start and {@code -1} at end).
+ *
+ * Complexities:
+ *   - Build: {@code O(R log B)} where {@code R} is #ranges and {@code B} is #distinct boundaries.
+ *   - {@link #containsIntersectingRange(Object)}: {@code O(log B)}.
+ *   - {@link #removeRange(Range)}: {@code O(log B + K)} where {@code K} is #boundaries in the range.
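+ *
+ * Worked example (illustrative numbers): for the ranges {@code [10, 20)} and {@code [15, 25)},
+ * the deltas are {@code +1} at 10, {@code +1} at 15, {@code -1} at 20 and {@code -1} at 25, and
+ * the prefix-sum pass stores {10=1, 15=2, 20=1, 25=0}. A query for 17 finds
+ * {@code floorEntry(17) = (15, 2)} with a count {@code > 0}, so it intersects; a query for 25
+ * finds {@code (25, 0)} and does not; a query for 5 finds no floor entry and does not.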
+ * + * @param boundary type (must be {@link Comparable} to be stored in a {@link TreeMap}) + */ +class RangeQueryIndex> { + + private final TreeMap rangeCountIndexMap; + private final Set> ranges; + + RangeQueryIndex(Set> ranges) { + this.rangeCountIndexMap = new TreeMap<>(); + this.ranges = ranges; + init(); + } + + private void init() { + // Phase 1: store boundary deltas (+1 at start, -1 at end). + for (Range range : ranges) { + rangeCountIndexMap.compute(range.startInclusive, (k, v) -> v == null ? 1 : v + 1); + rangeCountIndexMap.compute(range.endExclusive, (k, v) -> v == null ? -1 : v - 1); + } + + // Phase 2: convert deltas to prefix sums so each key holds the active range count at that coordinate. + int totalCount = 0; + for (Map.Entry entry : rangeCountIndexMap.entrySet()) { + totalCount += entry.getValue(); + entry.setValue(totalCount); + } + } + + /** + * Remove a range from the index. + * + *

This method assumes the range set is "popped" over time (ranges are removed but not added). + * Internally, removing {@code [s, e)} decreases the active count by 1 for all boundary keys in + * {@code [s, e)} and leaves counts outside the range unchanged. + * + * @throws IOException if the given {@code range} is not part of the indexed set + */ + void removeRange(Range range) throws IOException { + if (!ranges.contains(range)) { + throw new IOException(String.format("Range %s not found in index structure : %s", range, ranges)); + } + ranges.remove(range); + for (Map.Entry entry : rangeCountIndexMap.subMap(range.startInclusive, true, + range.endExclusive, false).entrySet()) { + entry.setValue(entry.getValue() - 1); + } + } + + /** + * @return true iff {@code key} is contained in at least one indexed range. + * + *

Implementation detail: uses {@link TreeMap#floorEntry(Object)} to find the last boundary + * at or before {@code key}, and checks the prefix-summed active count at that point.

+ */ + boolean containsIntersectingRange(T key) { + Map.Entry countEntry = rangeCountIndexMap.floorEntry(key); + if (countEntry == null) { + return false; + } + return countEntry.getValue() > 0; + } + + /** + * Returns an intersecting range containing {@code key}, if any. + * + *

This method first checks {@link #containsIntersectingRange(Comparable)} using the index; + * if the count indicates an intersection exists, it then scans the backing {@link #ranges} + * set to find a concrete {@link Range} that contains {@code key}.

+ * + *

Note that because {@link #ranges} is a {@link Set}, "first" refers to whatever iteration + * order that set provides (it is not guaranteed to be deterministic unless the provided set is).

+ * + * @return a containing range, or null if none intersect + */ + Range getFirstIntersectingRange(T key) { + Map.Entry countEntry = rangeCountIndexMap.floorEntry(key); + if (countEntry == null) { + return null; + } + for (Range range : ranges) { + if (range.contains(key)) { + return range; + } + } + return null; + } + + /** + * A half-open interval {@code [startInclusive, endExclusive)}. + * + *

For a value {@code k} to be contained, it must satisfy: + * {@code startInclusive <= k < endExclusive} (according to {@link Comparable#compareTo(Object)}).

+ */ + static final class Range> { + private final T startInclusive; + private final T endExclusive; + + Range(T startInclusive, T endExclusive) { + this.startInclusive = Objects.requireNonNull(startInclusive, "start == null"); + this.endExclusive = Objects.requireNonNull(endExclusive, "end == null"); + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return Objects.hash(startInclusive, endExclusive); + } + + T getStartInclusive() { + return startInclusive; + } + + T getEndExclusive() { + return endExclusive; + } + + /** + * @return true iff {@code key} is within {@code [startInclusive, endExclusive)}. + */ + public boolean contains(T key) { + return startInclusive.compareTo(key) <= 0 && key.compareTo(endExclusive) < 0; + } + + @Override + public String toString() { + return "Range{" + + "startInclusive=" + startInclusive + + ", endExclusive=" + endExclusive + + '}'; + } + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 659954a861b..621178f687d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() { return handle; } - public void batchDelete(ManagedWriteBatch writeBatch, byte[] key) + public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { writeBatch.delete(getHandle(), key); @@ -308,17 +308,13 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key) } } - public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value) + public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey) throws RocksDatabaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("batchPut array key {}", bytes2String(key)); - LOG.debug("batchPut array value {}", bytes2String(value)); - } - try (UncheckedAutoCloseable ignored = acquire()) { - writeBatch.put(getHandle(), key, value); + writeBatch.deleteRange(getHandle(), beginKey, endKey); } catch (RocksDBException e) { - throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e); + throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " + + bytes2String(endKey), e); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index fc049034406..6904f22d7d8 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException */ void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException; + /** + * Deletes a range of keys from the metadata store as part of a batch operation. + * @param batch Batch operation to perform the delete operation. + * @param beginKey start metadata key, inclusive. + * @param endKey end metadata key, exclusive. + */ + void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException; + /** * Deletes a range of keys from the metadata store. 
* diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 8000d48c618..bd0f6321b5b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -377,7 +377,24 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException { @Override public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { - rawTable.deleteWithBatch(batch, encodeKey(key)); + if (supportCodecBuffer) { + CodecBuffer keyBuffer = null; + try { + keyBuffer = keyCodec.toDirectCodecBuffer(key); + // The buffers will be released after commit. + rawTable.deleteWithBatch(batch, keyBuffer); + } catch (Exception e) { + IOUtils.closeQuietly(keyBuffer); + throw e; + } + } else { + rawTable.deleteWithBatch(batch, encodeKey(key)); + } + } + + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey)); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 1dbb5029713..7f2ce3fc3a5 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -90,6 +90,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) { throw new UnsupportedOperationException(); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) { + throw new UnsupportedOperationException(); + } + @Override public void deleteRange(KEY beginKey, KEY endKey) { map.subMap(beginKey, endKey).clear(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f286405..88eca2b02f6 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.primitives.Shorts; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; @@ -49,6 +51,7 @@ public final class TestCodec { static { CodecBuffer.enableLeakDetection(); + ManagedRocksObjectUtils.loadRocksDBLibrary(); } @Test @@ -295,14 +298,13 @@ public static void runTest(Codec codec, T original, static void runTestBytes(T object, Codec codec) throws IOException { final byte[] array = codec.toPersistedFormat(object); final Bytes fromArray = new Bytes(array); - - try (CodecBuffer buffer = codec.toCodecBuffer(object, - CodecBuffer.Allocator.HEAP)) 
{ - final Bytes fromBuffer = new Bytes(buffer); - - assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); - assertEquals(fromArray, fromBuffer); - assertEquals(fromBuffer, fromArray); + for (CodecBuffer.Allocator allocator : ImmutableList.of(CodecBuffer.Allocator.HEAP, CodecBuffer.Allocator.DIRECT)) { + try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { + final Bytes fromBuffer = new Bytes(buffer); + assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); + assertEquals(fromArray, fromBuffer); + assertEquals(fromBuffer, fromArray); + } } } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java new file mode 100644 index 00000000000..4f6db2a45bf --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.apache.hadoop.hdds.StringUtils.string2Bytes; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** + * Test class for verifying batch operations with delete ranges using the + * RDBBatchOperation and MockedConstruction of ManagedWriteBatch. + * + * This test class includes: + * - Mocking and tracking of operations including put, delete, and delete range + * within a batch operation. + * - Validation of committed operations using assertions on collected data. + * - Ensures that the batch operation interacts correctly with the + * RocksDatabase and ColumnFamilyHandle components. + * + * The test method includes: + * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily. + * 2. 
Mocking of methods to track operations performed on*/ +public class TestRDBBatchOperation { + + static { + ManagedRocksObjectUtils.loadRocksDBLibrary(); + } + + private static Operation getOperation(String key, String value, OpType opType) { + return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType); + } + + @Test + public void testBatchOperationWithDeleteRange() throws RocksDatabaseException, CodecException, RocksDBException { + try (TrackingUtilManagedWriteBatch writeBatch = new TrackingUtilManagedWriteBatch(); + RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) { + ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class); + RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class); + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2)); + return null; + }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class)); + + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2)); + return null; + }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class)); + + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1)); + return null; + }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class)); + + when(columnFamily.getHandle()).thenReturn(columnFamilyHandle); + when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test")); + when(columnFamily.getName()).thenReturn("test"); + Codec codec = StringCodec.get(); + // OP1 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01")); + // OP2 + batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02")); + // OP3 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03")); + // OP4 + batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04")); + // OP5 + batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05")); + // OP6 : This delete operation should get skipped because of OP11 + batchOperation.delete(columnFamily, codec.toPersistedFormat("key10")); + // OP7 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key01"), codec.toPersistedFormat("key02")); + // OP8 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("key03")); + // OP9 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04")); + // OP10 + batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05")); + // OP11 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("key12")); + // OP12 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key09"), codec.toPersistedFormat("key10")); + + RocksDatabase db = Mockito.mock(RocksDatabase.class); + doNothing().when(db).batchWrite(any()); + batchOperation.commit(db); + List expectedOps = ImmutableList.of( + getOperation("key03", "value04", OpType.PUT_DIRECT), + getOperation("key05", null, OpType.DELETE_DIRECT), + getOperation("key01", "key02", 
OpType.DELETE_RANGE_INDIRECT), + getOperation("key02", "key03", OpType.DELETE_RANGE_INDIRECT), + getOperation("key04", "value04", OpType.PUT_DIRECT), + getOperation("key06", "key12", OpType.DELETE_RANGE_INDIRECT), + getOperation("key09", "key10", OpType.DELETE_RANGE_INDIRECT)); + assertEquals(ImmutableMap.of("test", expectedOps), writeBatch.getOperations()); + } + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 2741834c9d7..cd155f27a96 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.hadoop.hdds.StringUtils.bytes2String; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -66,7 +67,7 @@ public class TestRDBTableStore { public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80; private static int count = 0; private final List families = - Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), + Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), "First", "Second", "Third", "Fourth", "Fifth", "Sixth", "Seventh", @@ -635,21 +636,21 @@ public void testPrefixedRangeKVs() throws Exception { // test start with a middle key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + "3"); + bytes2String(samplePrefix) + "3"); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix); assertEquals(2, rangeKVs.size()); // test with a filter - final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1"); + final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1"); startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix)); + bytes2String(samplePrefix)); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix, filter1); assertEquals(1, rangeKVs.size()); // test start with a non-exist key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + 123); + bytes2String(samplePrefix) + 123); rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix); assertEquals(0, rangeKVs.size()); } @@ -775,4 +776,77 @@ private void populateTable(Table table, } } } + + @Test + public void batchDeleteWithRange() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value = + RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8); + testTable.put(startKey, value); + testTable.put(keyInRange1, value); + testTable.put(keyInRange2, value); + testTable.put(endKey, value); + assertNotNull(testTable.get(startKey)); + assertNotNull(testTable.get(keyInRange1)); + assertNotNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + + //when + 
testTable.deleteRangeWithBatch(batch, startKey, endKey); + rdbStore.commitBatchOperation(batch); + + //then + assertNull(testTable.get(startKey)); + assertNull(testTable.get(keyInRange1)); + assertNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + } + } + + @Test + public void orderOfBatchOperations() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value1 = ("value1-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value2 = ("value2-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value3 = ("value3-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + + //when + testTable.putWithBatch(batch, startKey, value1); + testTable.putWithBatch(batch, keyInRange1, value1); + testTable.deleteWithBatch(batch, keyInRange1); + // ops map key should be <, 1> + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value2); + testTable.putWithBatch(batch, keyInRange1, value2); + // ops map key is <, 2>. + testTable.deleteRangeWithBatch(batch, startKey, keyInRange1); + testTable.putWithBatch(batch, endKey, value1); + testTable.putWithBatch(batch, endKey, value2); + // ops map key is <, 3>. + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value3); + + rdbStore.commitBatchOperation(batch); + + //then + assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey))); + assertNull(testTable.get(keyInRange1)); + assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey))); + } + } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java new file mode 100644 index 00000000000..aa75d053902 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; +import org.apache.hadoop.hdds.utils.db.RangeQueryIndex.Range; +import org.junit.jupiter.api.Test; + +/** + * Test class for validating the behavior and functionality of the {@code RangeQueryIndex} class. + * + *

This class contains a collection of unit tests to ensure correct behavior of the range + * indexing system under various scenarios, such as intersections, overlaps, boundary conditions, + * and removal of range objects. The tests leverage the {@code Range} class for defining + * half-open intervals and test different operations provided by the {@code RangeQueryIndex}. + * + *

The tested operations include: + * - Checking for intersecting ranges. + * - Retrieving the first intersecting range. + * - Handling overlaps and nested ranges. + * - Adjacency between ranges. + * - Behaviors when handling duplicate ranges or ranges with identical bounds but different instances. + * - Error conditions when attempting invalid removals of ranges. + */ +public class TestRangeQueryIndex { + + @Test + public void testContainsIntersectingRangeHalfOpenBoundaries() { + final Range r1 = new Range<>(10, 20); // [10, 20) + final Range r2 = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Before first range + assertFalse(index.containsIntersectingRange(0)); + assertFalse(index.containsIntersectingRange(9)); + + // Start is inclusive + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + + // End is exclusive + assertFalse(index.containsIntersectingRange(20)); + assertFalse(index.containsIntersectingRange(29)); + + // Second range + assertTrue(index.containsIntersectingRange(30)); + assertTrue(index.containsIntersectingRange(39)); + assertFalse(index.containsIntersectingRange(40)); + assertFalse(index.containsIntersectingRange(100)); + } + + @Test + public void testGetFirstIntersectingRangeAndRemovalWithOverlaps() throws Exception { + // Use LinkedHashSet to make iteration order deterministic for getFirstIntersectingRange(). + final Range r2 = new Range<>(5, 15); // overlaps with r1 for [5, 10) + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r2); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + assertTrue(index.containsIntersectingRange(7)); + final Range first = index.getFirstIntersectingRange(7); + assertNotNull(first); + assertSame(r2, first, "should return the first containing range in set iteration order"); + + index.removeRange(r2); + assertTrue(index.containsIntersectingRange(7), "still intersecting due to remaining overlapping range"); + assertSame(r1, index.getFirstIntersectingRange(7)); + + index.removeRange(r1); + assertFalse(index.containsIntersectingRange(7)); + assertNull(index.getFirstIntersectingRange(7)); + } + + @Test + public void testAdjacentRangesShareBoundary() { + final Range left = new Range<>(0, 10); // [0, 10) + final Range right = new Range<>(10, 20); // [10, 20) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(left); + ranges.add(right); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // End is exclusive for left; start is inclusive for right. + assertTrue(index.containsIntersectingRange(9)); + assertTrue(index.containsIntersectingRange(0)); + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + assertFalse(index.containsIntersectingRange(20)); + } + + @Test + public void testMultipleOverlapsAndNestedRangesRemovalOrder() throws Exception { + // rOuter covers everything; rMid overlaps partially; rInner is nested. 
+ final Range rOuter = new Range<>(0, 100); // [0, 100) + final Range rMid = new Range<>(20, 80); // [20, 80) + final Range rInner = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(rOuter); + ranges.add(rMid); + ranges.add(rInner); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Covered by outer only + assertTrue(index.containsIntersectingRange(10)); + assertSame(rOuter, index.getFirstIntersectingRange(10)); + + // Covered by all three + assertTrue(index.containsIntersectingRange(35)); + assertSame(rOuter, index.getFirstIntersectingRange(35)); + + // Remove the middle range first. + index.removeRange(rMid); + assertTrue(index.containsIntersectingRange(35), "still covered by outer + inner"); + + // Remove the inner range next. + index.removeRange(rInner); + assertTrue(index.containsIntersectingRange(35), "still covered by outer"); + + // Now remove the outer range; should become uncovered. + index.removeRange(rOuter); + assertFalse(index.containsIntersectingRange(35)); + assertNull(index.getFirstIntersectingRange(35)); + } + + @Test + public void testDuplicateSameBoundsDifferentInstances() throws Exception { + // Range.equals is identity-based, so two ranges with the same bounds can co-exist in the Set. + final Range r1 = new Range<>(0, 10); + final Range r2 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + assertTrue(index.containsIntersectingRange(5)); + + // Remove one instance: should still intersect due to the other. + index.removeRange(r1); + assertTrue(index.containsIntersectingRange(5)); + + // Remove the second instance: now it should not intersect. + index.removeRange(r2); + assertFalse(index.containsIntersectingRange(5)); + } + + @Test + public void testRemoveSameInstanceTwiceThrows() throws Exception { + final Range r = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + index.removeRange(r); + assertThrows(IOException.class, () -> index.removeRange(r)); + } + + @Test + public void testRemoveRangeNotFoundThrows() throws Exception { + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Range.equals is identity-based, so a different object with same bounds is not "found". + final Range sameBoundsDifferentInstance = new Range<>(0, 10); + assertThrows(IOException.class, () -> index.removeRange(sameBoundsDifferentInstance)); + + // Removing the original instance works. 
+ index.removeRange(r1); + assertFalse(index.containsIntersectingRange(0)); + } + + @Test + public void testRemoveRangeDifferentBoundsThrows() { + final Range<Integer> r1 = new Range<>(0, 10); + final Set<Range<Integer>> ranges = new LinkedHashSet<>(); + ranges.add(r1); + final RangeQueryIndex<Integer> index = new RangeQueryIndex<>(ranges); + + assertThrows(IOException.class, () -> index.removeRange(new Range<>(1, 2))); + assertTrue(index.containsIntersectingRange(1), "index should remain unchanged after failed remove"); + } +} + + diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml index 5e6976500f9..1a1fb3a82be 100644 --- a/hadoop-hdds/managed-rocksdb/pom.xml +++ b/hadoop-hdds/managed-rocksdb/pom.xml @@ -25,11 +25,6 @@ Apache Ozone HDDS Managed RocksDB Apache Ozone Managed RocksDB library - - - true - - com.google.guava @@ -63,6 +58,11 @@ org.slf4j slf4j-api + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> @@ -74,6 +74,18 @@ none + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>test-jar</id> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java index 52b8b5aabb1..1c2be3be36d 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java @@ -17,98 +17,34 @@ package org.apache.hadoop.hdds.utils.db.managed; -import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB.NOT_FOUND; - -import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; -import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; -import org.apache.ratis.util.function.CheckedConsumer; -import org.apache.ratis.util.function.CheckedFunction; import org.rocksdb.DirectSlice; -import org.rocksdb.RocksDBException; /** - * ManagedDirectSlice is a managed wrapper around the DirectSlice object. It ensures - * proper handling of native resources associated with DirectSlice, utilizing - * the ManagedObject infrastructure to prevent resource leaks. It works in tandem - * with a ByteBuffer, which acts as the data source for the managed slice. + * ManagedDirectSlice is a class that extends the {@link DirectSlice} class and provides additional + * management for slices of direct {@link ByteBuffer} memory. This class initializes the slice with + * the given ByteBuffer and sets its prefix and length properties based on the buffer's position + * and remaining capacity. + * + * The class is designed to handle specific memory slicing operations while ensuring that the + * provided ByteBuffer’s constraints are respected. ManagedDirectSlice leverages its parent + * {@link DirectSlice} functionalities to deliver optimized direct buffer handling. + * + * Constructor: + * - Initializes the ManagedDirectSlice instance with a provided ByteBuffer. + * - Sets the slice length to the buffer's remaining capacity. + * - Removes the prefix based on the buffer's position. * - * This class overrides certain operations to tightly control the lifecycle and - * behavior of the DirectSlice it manages. It specifically caters to use cases - * where the slice is used in RocksDB operations, providing methods for safely - * interacting with the slice for put-like operations.
+ * NOTE: This class should only be used with a ByteBuffer whose position and limit are immutable for the lifetime of + * this ManagedDirectSlice instance. This means that the ByteBuffer's position and limit should not be modified + * externally while the ManagedDirectSlice is in use. The data in the byte buffer should only be accessed via this + * instance. */ -public class ManagedDirectSlice extends ManagedObject<DirectSlice> { - - private final ByteBuffer data; +public class ManagedDirectSlice extends DirectSlice { public ManagedDirectSlice(ByteBuffer data) { - super(new DirectSlice(data)); - this.data = data; - } - - @Override - public DirectSlice get() { - throw new UnsupportedOperationException("get() is not supported."); - } - - /** - * Executes the provided consumer on the internal {@code DirectSlice} after - * adjusting the slice's prefix and length based on the current position and - * remaining data in the associated {@code ByteBuffer}. If the consumer throws - * a {@code RocksDBException}, it is wrapped and rethrown as a - * {@code RocksDatabaseException}. - * - * @param consumer the operation to perform on the managed {@code DirectSlice}. - * The consumer must handle a {@code DirectSlice} and may throw - * a {@code RocksDBException}. - * @throws RocksDatabaseException if the provided consumer throws a - * {@code RocksDBException}. - */ - public void putFromBuffer(CheckedConsumer<DirectSlice, RocksDBException> consumer) - throws RocksDatabaseException { - DirectSlice slice = super.get(); - slice.removePrefix(this.data.position()); - slice.setLength(this.data.remaining()); - try { - consumer.accept(slice); - } catch (RocksDBException e) { - throw new RocksDatabaseException("Error while performing put op with directSlice", e); - } - data.position(data.limit()); - } - - /** - * Retrieves data from the associated DirectSlice into the buffer managed by this instance. - * The supplied function is applied to the DirectSlice to process the data, and the method - * adjusts the buffer's position and limit based on the result. - * - * @param function a function that operates on a DirectSlice and returns the number - * of bytes written to the buffer, or a specific "not found" value - * if the operation fails. The function may throw a RocksDBException. - * @return the number of bytes written to the buffer if successful, or a specific - * "not found" value indicating the requested data was absent. - * @throws RocksDatabaseException if the provided function throws a RocksDBException, - * wrapping the original exception.
- */ - public int getToBuffer(CheckedFunction function) - throws RocksDatabaseException { - DirectSlice slice = super.get(); - slice.removePrefix(this.data.position()); - slice.setLength(this.data.remaining()); - try { - int lengthWritten = function.apply(slice); - if (lengthWritten != NOT_FOUND) { - this.data.limit(Math.min(data.limit(), data.position() + lengthWritten)); - } - return lengthWritten; - } catch (RocksDBException e) { - throw new RocksDatabaseException("Error while performing put op with directSlice", e); - } - } - - @VisibleForTesting - DirectSlice getDirectSlice() { - return super.get(); + super(data); + this.removePrefix(data.position()); + this.setLength(data.remaining()); } } diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java index c332d32704f..f13aba39a7b 100644 --- a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java +++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java @@ -22,11 +22,8 @@ import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.rocksdb.DirectSlice; /** * Tests for ManagedDirectSlice. @@ -39,48 +36,15 @@ public class TestManagedDirectSlice { @ParameterizedTest @CsvSource({"0, 1024", "1024, 1024", "512, 1024", "0, 100", "10, 512", "0, 0"}) - public void testManagedDirectSliceWithOffsetMovedAheadByteBuffer(int offset, int numberOfBytesWritten) - throws RocksDatabaseException { + public void testManagedDirectSliceWithOffset(int offset, int numberOfBytesWritten) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); byte[] randomBytes = RandomUtils.secure().nextBytes(numberOfBytesWritten); byteBuffer.put(randomBytes); byteBuffer.flip(); + byteBuffer.position(offset); try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer); ManagedSlice slice = new ManagedSlice(Arrays.copyOfRange(randomBytes, offset, numberOfBytesWritten))) { - byteBuffer.position(offset); - directSlice.putFromBuffer((ds) -> { - DirectSlice directSliceFromByteBuffer = directSlice.getDirectSlice(); - assertEquals(numberOfBytesWritten - offset, ds.size()); - assertEquals(0, directSliceFromByteBuffer.compare(slice)); - assertEquals(0, slice.compare(directSliceFromByteBuffer)); - }); - Assertions.assertEquals(numberOfBytesWritten, byteBuffer.position()); - } - } - - @ParameterizedTest - @CsvSource({"0, 1024, 512", "1024, 1024, 5", "512, 1024, 600", "0, 100, 80", "10, 512, 80", "0, 0, 10", - "100, 256, -1"}) - public void testManagedDirectSliceWithOpPutToByteBuffer(int offset, int maxNumberOfBytesWrite, - int numberOfBytesToWrite) throws RocksDatabaseException { - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); - byte[] randomBytes = RandomUtils.secure().nextBytes(offset); - byteBuffer.put(randomBytes); - try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer)) { - byteBuffer.position(offset); - byteBuffer.limit(Math.min(offset + maxNumberOfBytesWrite, 1024)); - assertEquals(numberOfBytesToWrite, directSlice.getToBuffer((ds) -> { - assertEquals(byteBuffer.remaining(), ds.size()); - return 
numberOfBytesToWrite; - })); - Assertions.assertEquals(offset, byteBuffer.position()); - if (numberOfBytesToWrite == -1) { - assertEquals(offset + maxNumberOfBytesWrite, byteBuffer.limit()); - } else { - Assertions.assertEquals(Math.min(Math.min(offset + numberOfBytesToWrite, 1024), maxNumberOfBytesWrite), - byteBuffer.limit()); - } - + assertEquals(slice, directSlice); } } } diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java new file mode 100644 index 00000000000..eca3977d1ef --- /dev/null +++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db.managed; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** + * The TrackingUtilManagedWriteBatch class extends ManagedWriteBatch to provide functionality + * for tracking operations in a managed write batch context. Operations such as put, delete, + * merge, and delete range are managed and tracked, along with their corresponding operation types. + * + * This class supports direct and indirect operation types, delineated in the OpType enumeration. + * Direct operations are created using ByteBuffers while indirect operations are created using + * byte arrays. + */ +public class TrackingUtilManagedWriteBatch extends ManagedWriteBatch { + + private final Map> operations = new HashMap<>(); + + /** + * The OpType enumeration defines the different types of operations performed in a batch. + */ + public enum OpType { + PUT_DIRECT, + DELETE_DIRECT, + MERGE_DIRECT, + DELETE_RANGE_INDIRECT, + PUT_INDIRECT, + DELETE_INDIRECT, + MERGE_INDIRECT, + } + + /** + * The Operation class represents an individual operation to be performed in the context of + * a batch operation, such as a database write, delete, or merge. Each operation is characterized + * by a key, value, and an operation type (OpType). + * + * Operations can be of different types, as defined in the OpType enumeration, which include + * actions such as put, delete, merge, and delete range, either direct or indirect. 
+ */ + public static class Operation { + private final byte[] key; + private final byte[] value; + private final OpType opType; + + public Operation(byte[] key, byte[] value, OpType opType) { + this.key = Arrays.copyOf(key, key.length); + this.value = value == null ? null : Arrays.copyOf(value, value.length); + this.opType = opType; + } + + public Operation(byte[] key, OpType opType) { + this(key, null, opType); + } + + @Override + public final boolean equals(Object o) { + if (!(o instanceof Operation)) { + return false; + } + + Operation operation = (Operation) o; + return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) && + opType == operation.opType; + } + + @Override + public final int hashCode() { + return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode(); + } + + @Override + public String toString() { + return "Operation{" + + "key=" + bytes2String(key) + + ", value=" + (value == null ? null : bytes2String(value)) + + ", opType=" + opType + + '}'; + } + } + + public Map> getOperations() { + return operations; + } + + public TrackingUtilManagedWriteBatch() { + super(); + } + + private byte[] convert(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, OpType.DELETE_INDIRECT)); + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void delete(byte[] key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, OpType.DELETE_INDIRECT)); + } + + @Override + public void delete(ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void deleteRange(byte[] beginKey, byte[] endKey) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey) + throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_INDIRECT)); + } + + @Override + public void merge(byte[] key, byte[] value) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_INDIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.PUT_INDIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer 
value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void put(byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, value, OpType.PUT_INDIRECT)); + } + + @Override + public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void close() { + super.close(); + } +} diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java index 14f553a9b18..a689e9fdea1 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java @@ -85,11 +85,9 @@ public void delete(byte[] key) throws RocksDatabaseException { public void delete(CodecBuffer key) throws RocksDatabaseException { try (ManagedDirectSlice slice = new ManagedDirectSlice(key.asReadOnlyByteBuffer())) { - slice.putFromBuffer(directSlice -> { - sstFileWriter.delete(directSlice); - keyCounter.incrementAndGet(); - }); - } catch (RocksDatabaseException e) { + sstFileWriter.delete(slice); + keyCounter.incrementAndGet(); + } catch (RocksDBException e) { closeOnFailure(); throw new RocksDatabaseException("Failed to delete key (length=" + key.readableBytes() + "), sstFile=" + sstFile.getAbsolutePath(), e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index b488997b522..77ed8a82487 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; @@ -584,11 +583,7 @@ private static void deleteKeysFromTableWithBucketPrefix(OMMetadataManager metada String endKey = getLexicographicallyHigherString(prefix); LOG.debug("Deleting key range from {} - startKey: {}, endKey: {}", table.getName(), prefix, endKey); - try (TableIterator itr = table.keyIterator(prefix)) { - while (itr.hasNext()) { - table.deleteWithBatch(batchOperation, itr.next()); - } - } + table.deleteRangeWithBatch(batchOperation, prefix, endKey); } @VisibleForTesting diff --git a/pom.xml b/pom.xml index 73ed23ae01f..d68b95b6982 100644 --- a/pom.xml +++ b/pom.xml @@ -1075,6 +1075,12 @@ hdds-managed-rocksdb ${hdds.version} + + org.apache.ozone + hdds-managed-rocksdb + ${hdds.version} + test-jar + org.apache.ozone hdds-rocks-native @@ -1965,6 +1971,7 @@ org.rocksdb.ColumnFamilyHandle org.rocksdb.Env org.rocksdb.Statistics + 
org.rocksdb.AbstractSlice org.rocksdb.RocksDB.*
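Reviewer note: a minimal sketch of how a caller can use the new Table#deleteRangeWithBatch, in the spirit of the OmSnapshotManager hunk above. Only the deleteRangeWithBatch(batch, beginKey, endKey) call shape and its exclusive end key come from this patch; the helper class, the Table<String, ?> signature and the prefix-bumping shortcut (a simplified stand-in for getLexicographicallyHigherString) are illustrative assumptions.

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;

/** Sketch only: delete every String key in [prefix, endKey) with a single batched range delete. */
public final class RangeDeleteSketch {
  private RangeDeleteSketch() {
  }

  public static void deleteKeysWithPrefix(Table<String, ?> table, BatchOperation batch, String prefix)
      throws IOException {
    // The end key is exclusive, so bump the last character of the prefix to get the
    // lexicographically next string (ignores the corner case of a trailing '\uffff').
    char last = prefix.charAt(prefix.length() - 1);
    String endKey = prefix.substring(0, prefix.length() - 1) + (char) (last + 1);
    // One range-delete entry in the batch replaces one delete per matching key.
    table.deleteRangeWithBatch(batch, prefix, endKey);
  }
}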
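Reviewer note: the new TrackingUtilManagedWriteBatch can be handed to code under test wherever a ManagedWriteBatch is expected and then inspected instead of reading back from RocksDB. A rough JUnit 5 sketch of that pattern follows; the Operation, OpType and getOperations() names come from the new file, while the test class and key values are made up for illustration.

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation;
import org.junit.jupiter.api.Test;

/** Sketch only: verify that the code under test issued a range delete on the default column family. */
class TrackingWriteBatchSketch {

  @Test
  void recordsDeleteRangeOnDefaultColumnFamily() {
    byte[] begin = "bucket1/".getBytes(StandardCharsets.UTF_8);
    byte[] end = "bucket2/".getBytes(StandardCharsets.UTF_8);
    try (TrackingUtilManagedWriteBatch batch = new TrackingUtilManagedWriteBatch()) {
      batch.deleteRange(begin, end);
      // Operations issued without a ColumnFamilyHandle are tracked under the "" key.
      assertEquals(new Operation(begin, end, OpType.DELETE_RANGE_INDIRECT),
          batch.getOperations().get("").get(0));
    }
  }
}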