@ThreadSafe public class AtomicArrayKVStore extends AbstractKVStore implements AtomicKVStore
An AtomicKVStore based on ArrayKVStore plus a write-ahead log and background compaction.
This implementation is designed to maximize the speed of reads and minimize the amount of memory overhead per key/value pair. It is optimized for relatively infrequent writes.
A (read-only) ArrayKVStore is the basis for the database; the array files are mapped into memory. As mutations are applied, they are added to an in-memory change set and appended to a mutation log file for persistence. On restart, the mutation log file (if any) is read to reconstruct the in-memory change set.
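A minimal sketch of the read path and restart recovery described above, under simplifying assumptions: String keys and values instead of byte[], a read-only sorted map standing in for the array files, and a List<String> standing in for the mutation log file. The class ChangeSetSketch and its "P key value" / "R key" record format are hypothetical and are not this class's on-disk format.

```java
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model: "base" stands in for the read-only array files and
// "changes" for the in-memory change set. An empty Optional records a removal
// that hides the base value.
class ChangeSetSketch {
    private final NavigableMap<String, String> base;
    private final TreeMap<String, Optional<String>> changes = new TreeMap<>();
    private final List<String> log;    // stands in for the mutation log file

    ChangeSetSketch(NavigableMap<String, String> base, List<String> log) {
        this.base = Collections.unmodifiableNavigableMap(base);
        this.log = log;
        // Restart path: replay the existing log to rebuild the change set
        for (String record : log)
            this.applyRecord(record);
    }

    void put(String key, String value) {
        this.append("P " + key + " " + value);
    }

    void remove(String key) {
        this.append("R " + key);
    }

    String get(String key) {
        // A pending change takes precedence over the base data
        Optional<String> change = this.changes.get(key);
        if (change != null)
            return change.orElse(null);
        return this.base.get(key);
    }

    private void append(String record) {
        this.log.add(record);          // persist first (write-ahead)
        this.applyRecord(record);      // then update the in-memory change set
    }

    // Hypothetical record format: "P key value" or "R key"; keys must not contain spaces
    private void applyRecord(String record) {
        String[] parts = record.split(" ", 3);
        if (parts[0].equals("P"))
            this.changes.put(parts[1], Optional.of(parts[2]));
        else
            this.changes.put(parts[1], Optional.empty());
    }
}
```

Because every change is appended to the log before it reaches the change set, constructing a new instance over the same log reproduces the change set, which mirrors the restart behavior described above. Compaction (not shown) would fold the change set into new base data and truncate the log.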
Compaction
Instances periodically compact outstanding changes into new array files (and truncate the mutation log file) in a background thread. A compaction is scheduled whenever the scheduleCompaction() method is invoked.
In order to prevent compaction from falling hopelessly behind when there is high write volume, a compaction space high-water mark is also used. When the size of the mutation log file exceeds the half-way point between the low-water and high-water marks, new write attempts start being artificially delayed. This delay increases without bound as the high-water mark is approached, so that while the high-water mark is exceeded, new writes are blocked completely until the current compaction cycle completes. Use getTotalMillisWaiting() to assess the total amount of time spent in these artificial delays.
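The following sketch shows how the mutation log size might be mapped onto the window between the low-water and high-water marks, including the rule (documented under setCompactHighWater()) that a high-water mark below the low-water mark is treated as equal to it. The class CompactionWindow, its method names, and the clamping to [0, 1] are assumptions for illustration.

```java
// Hypothetical helper: where does the current mutation log size fall
// between the compaction low-water and high-water marks?
class CompactionWindow {

    // Returns 0.0 at or below the low-water mark, 1.0 at or above the
    // high-water mark, and a linear interpolation in between.
    static double windowRatio(long logSize, long lowWater, long highWater) {
        // A high-water mark below the low-water mark is treated as equal to it
        long high = Math.max(highWater, lowWater);
        if (logSize <= lowWater)
            return 0.0;
        if (logSize >= high)
            return 1.0;
        return (double) (logSize - lowWater) / (high - lowWater);
    }

    // Writes start being delayed once the log passes the half-way point
    static boolean writesDelayed(long logSize, long lowWater, long highWater) {
        return windowRatio(logSize, lowWater, highWater) > 0.5;
    }
}
```

Note that when both marks are equal, any log size above them maps straight to 1.0, so no division by zero can occur.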
The number of bytes occupied by the in-memory change set and the length of the outstanding mutation log file are related, since both grow as uncompacted mutations accumulate. Therefore, the compaction high-water mark also loosely corresponds to a maximum amount of memory required by the in-memory change set.
Hot Backups
"Hot" backups may be created in parallel with normal operation via hotCopy(). Hard links are used to make this operation fast; only the mutation log file (if any) is actually copied.
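A minimal sketch of the hard-link approach using only java.nio.file. The class HotCopySketch and the mutationLogName parameter are hypothetical (this page does not name the on-disk files). It hard-links every regular file except the mutation log, falls back to a plain copy where hard links are unsupported, and copies the log in full. Unlike the documented hotCopy(), it does not fsync() the copied files or the target directory, and it does not coordinate with compaction.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class HotCopySketch {

    // Hypothetical helper: copy the database directory 'source' into 'target'.
    // 'mutationLogName' (an assumed parameter) names the uncompacted mutation log.
    static void hotCopy(Path source, Path target, String mutationLogName) throws IOException {
        if (Files.exists(target) && !Files.isDirectory(target))
            throw new IllegalArgumentException("target is not a directory: " + target);
        Files.createDirectories(target);
        try (DirectoryStream<Path> existing = Files.newDirectoryStream(target)) {
            if (existing.iterator().hasNext())
                throw new IllegalArgumentException("target is not empty: " + target);
        }
        try (DirectoryStream<Path> files = Files.newDirectoryStream(source)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file))
                    continue;
                Path dest = target.resolve(file.getFileName());
                if (file.getFileName().toString().equals(mutationLogName)) {
                    // Linking the live log would share its later appends, so copy it instead
                    Files.copy(file, dest, StandardCopyOption.COPY_ATTRIBUTES);
                    continue;
                }
                try {
                    Files.createLink(dest, file);    // constant time: both names share the same data
                } catch (UnsupportedOperationException | IOException e) {
                    Files.copy(file, dest, StandardCopyOption.COPY_ATTRIBUTES);  // fallback
                }
            }
        }
    }
}
```

The cost of the operation is then dominated by the size of the mutation log, which is why compacting just before a hot copy keeps it fast.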
The database directory is a required configuration property.
Key and value data must not exceed 2GB (each separately).
Instances may be stopped and (re)started multiple times.
Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_COMPACTION_HIGH_WATER: Default compaction space high-water mark in bytes (1073741824 bytes). |
static int | DEFAULT_COMPACTION_LOW_WATER: Default compaction space low-water mark in bytes (65536 bytes). |
static int | DEFAULT_COMPACTION_MAX_DELAY: Default compaction maximum delay in seconds (86400 seconds). |
Constructor and Description |
---|
AtomicArrayKVStore() |
Modifier and Type | Method and Description |
---|---|
void | adjustCounter(byte[] key, long amount): Adjust the counter at the given key by the given amount. |
void | apply(Mutations mutations): Apply all the given Mutations to this instance. |
protected long | calculateCompactionPressureDelay(float windowRatio): Calculate the maximum amount of time that a thread attempting to mutate() must wait for a background compaction to complete as we start nearing the high-water mark. |
long | decodeCounter(byte[] bytes): Decode a counter value previously encoded by encodeCounter(). |
byte[] | encodeCounter(long value): Encode a counter value into a byte[] value suitable for use with decodeCounter() and/or adjustCounter(). |
protected void | finalize(): Finalize this instance. |
byte[] | get(byte[] key): Get the value associated with the given key, if any. |
KVPair | getAtLeast(byte[] minKey, byte[] maxKey): Get the key/value pair having the smallest key greater than or equal to the given minimum, if any. |
KVPair | getAtMost(byte[] maxKey, byte[] minKey): Get the key/value pair having the largest key strictly less than the given maximum, if any. |
File | getDirectory(): Get the filesystem directory containing the database. |
CloseableIterator<KVPair> | getRange(byte[] minKey, byte[] maxKey, boolean reverse): Iterate the key/value pairs in the specified range. |
long | getTotalMillisWaiting(): Get the total number of milliseconds spent in artificial delays caused by waiting for compaction. |
void | hotCopy(File target): Create a filesystem atomic snapshot, or "hot copy", of this instance in the specified destination directory. |
void | mutate(Mutations mutations, boolean sync): Apply a set of mutations to this instance atomically. |
void | put(byte[] key, byte[] value): Set the value associated with the given key. |
void | remove(byte[] key): Remove the key/value pair with the given key, if it exists. |
void | removeRange(byte[] minKey, byte[] maxKey): Remove all key/value pairs whose keys are in a given range. |
Future<?> | scheduleCompaction(): Schedule a new compaction cycle, unless there is one already scheduled or running, or there are no outstanding uncompacted modifications. |
void | setCompactHighWater(int compactHighWater): Configure the compaction space high-water mark in bytes. |
void | setCompactLowWater(int compactLowWater): Configure the compaction space low-water mark in bytes. |
void | setCompactMaxDelay(int compactMaxDelay): Configure the compaction time maximum delay. |
void | setDirectory(File directory): Configure the filesystem directory containing the database. |
void | setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService): Configure the ScheduledExecutorService used to schedule background compaction. |
CloseableKVStore | snapshot(): Acquire a read-only, snapshot view of this instance based on the current state. |
void | start(): Start this instance. |
void | stop(): Stop this instance. |
String | toString() |
Methods inherited from class java.lang.Object: clone, equals, getClass, hashCode, notify, notifyAll, wait, wait, wait
Other inherited methods: getRange, getRange, removeRange
public static final int DEFAULT_COMPACTION_MAX_DELAY
public static final int DEFAULT_COMPACTION_LOW_WATER
public static final int DEFAULT_COMPACTION_HIGH_WATER
public File getDirectory()
Get the filesystem directory containing the database.
public void setDirectory(File directory)
Configure the filesystem directory containing the database.
Parameters: directory - database directory, or null for none
Throws: IllegalStateException - if this instance is already started
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
Configure the ScheduledExecutorService used to schedule background compaction.
If not explicitly configured, a ScheduledExecutorService will be created automatically during start() using Executors.newSingleThreadScheduledExecutor() and shut down by stop(). If one is explicitly configured here, it will not be shut down by stop().
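The ownership rule above, where start() creates an executor only when none was configured and stop() shuts down only an executor it created, can be sketched as follows. CompactionScheduler is a hypothetical class that models only this lifecycle (including idempotent start() and stop()), not compaction itself.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical lifecycle holder mirroring the documented executor ownership rules
class CompactionScheduler {
    private ScheduledExecutorService configured;   // set via setScheduledExecutorService(), may be null
    private ScheduledExecutorService executor;     // executor in use while started
    private boolean createdExecutor;               // true if start() created the executor

    synchronized void setScheduledExecutorService(ScheduledExecutorService service) {
        if (this.executor != null)
            throw new IllegalStateException("already started");
        this.configured = service;
    }

    synchronized void start() {
        if (this.executor != null)
            return;                                 // idempotent
        this.createdExecutor = this.configured == null;
        this.executor = this.createdExecutor ?
          Executors.newSingleThreadScheduledExecutor() : this.configured;
    }

    synchronized void stop() {
        if (this.executor == null)
            return;                                 // idempotent
        if (this.createdExecutor)
            this.executor.shutdown();               // only shut down what we created
        this.executor = null;
    }

    synchronized ScheduledExecutorService executor() {
        return this.executor;
    }
}
```

Leaving a caller-supplied executor running lets one executor be shared by several stores or other components whose lifetimes differ from this instance's.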
Parameters: scheduledExecutorService - scheduled executor service, or null to have one created automatically
Throws: IllegalStateException - if this instance is already started
public void setCompactMaxDelay(int compactMaxDelay)
Configure the compaction time maximum delay.
Parameters: compactMaxDelay - compaction time maximum delay in seconds
Throws:
IllegalArgumentException - if compactMaxDelay is negative
IllegalStateException - if this instance is already started
public void setCompactLowWater(int compactLowWater)
Configure the compaction space low-water mark in bytes.
This value is applied to the size of the on-disk modifications file.
Parameters: compactLowWater - compaction space low-water mark in bytes
Throws:
IllegalArgumentException - if compactLowWater is negative
IllegalStateException - if this instance is already started
public void setCompactHighWater(int compactHighWater)
Configure the compaction space high-water mark in bytes.
This value is applied to the size of the on-disk modifications file.
If the compaction space high-water mark is set smaller than the compaction space low-water mark, then it is treated as if it were equal to the compaction space low-water mark.
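Parameters: compactHighWater - compaction space high-water mark in bytes
Throws:
IllegalArgumentException - if compactHighWater is negative
IllegalStateException - if this instance is already started
public long getTotalMillisWaiting()
Get the total number of milliseconds spent in artificial delays caused by waiting for compaction.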
@PostConstruct public void start()
Description copied from interface: AtomicKVStore
Start this instance.
This method is idempotent: if this instance is already started, nothing happens.
Whether an instance that has been started and stopped can be restarted is implementation-dependent.
Specified by: start in interface AtomicKVStore
@PreDestroy public void stop()
Description copied from interface: AtomicKVStore
Stop this instance.
Any open snapshots obtained from AtomicKVStore.snapshot() should be closed via close() before invoking this method; the behavior of those that are not is undefined.
This method is idempotent: if this instance has not been started, or is already stopped, nothing happens.
Specified by: stop in interface AtomicKVStore
public byte[] get(byte[] key)
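Description copied from interface: KVStore
Get the value associated with the given key, if any.
Modifications to the returned byte[] array do not affect this instance.
Specified by: get in interface KVStore
Overrides: get in class AbstractKVStore
Parameters: key - key
public KVPair getAtLeast(byte[] minKey, byte[] maxKey)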
Description copied from interface: KVStore
Get the key/value pair having the smallest key greater than or equal to the given minimum, if any.
An optional (exclusive) maximum key may also be specified; if maxKey is null, there is no upper bound; if maxKey <= minKey, null is always returned.
If keys starting with 0xff are not supported by this instance, and minKey starts with 0xff, then this method returns null.
Modifications to the returned byte[] arrays do not affect this instance.
Specified by: getAtLeast in interface KVStore
Overrides: getAtLeast in class AbstractKVStore
Parameters:
minKey - minimum key (inclusive), or null for no minimum (get the smallest key)
maxKey - maximum key (exclusive), or null for no maximum (no upper bound)
Returns: the smallest key/value pair with key >= minKey and key < maxKey, or null if none exists
public KVPair getAtMost(byte[] maxKey, byte[] minKey)
Description copied from interface: KVStore
Get the key/value pair having the largest key strictly less than the given maximum, if any.
An optional (inclusive) minimum key may also be specified; if minKey is null, there is no lower bound (equivalent to a lower bound of the empty byte array); if minKey >= maxKey, null is always returned.
If keys starting with 0xff are not supported by this instance, and maxKey starts with 0xff, then this method behaves as if maxKey were null.
Modifications to the returned byte[] arrays do not affect this instance.
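The bound semantics of getAtLeast() and getAtMost() map directly onto java.util.NavigableMap. This sketch assumes keys are ordered as unsigned bytes, compared lexicographically (a plausible reading of the 0xff remarks, not stated on this page), and uses Arrays.compareUnsigned (Java 9+). NeighborLookup is hypothetical; unlike the real methods, it returns the stored arrays rather than copies.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of getAtLeast()/getAtMost(); null bounds mean "unbounded"
class NeighborLookup {
    // Keys compare as unsigned bytes, lexicographically, so 0xff sorts last
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    static NavigableMap<byte[], byte[]> newMap() {
        return new TreeMap<>(UNSIGNED);
    }

    // Smallest entry with minKey <= key < maxKey, or null
    static Map.Entry<byte[], byte[]> getAtLeast(NavigableMap<byte[], byte[]> map, byte[] minKey, byte[] maxKey) {
        Map.Entry<byte[], byte[]> entry = minKey != null ? map.ceilingEntry(minKey) : map.firstEntry();
        if (entry == null || (maxKey != null && UNSIGNED.compare(entry.getKey(), maxKey) >= 0))
            return null;
        return entry;
    }

    // Largest entry with minKey <= key < maxKey, or null
    static Map.Entry<byte[], byte[]> getAtMost(NavigableMap<byte[], byte[]> map, byte[] maxKey, byte[] minKey) {
        Map.Entry<byte[], byte[]> entry = maxKey != null ? map.lowerEntry(maxKey) : map.lastEntry();
        if (entry == null || (minKey != null && UNSIGNED.compare(entry.getKey(), minKey) < 0))
            return null;
        return entry;
    }
}
```

The "null is always returned" cases fall out naturally: when maxKey <= minKey, any key found at or above minKey is also at or above maxKey, and symmetrically for getAtMost().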
Specified by: getAtMost in interface KVStore
Overrides: getAtMost in class AbstractKVStore
Parameters:
maxKey - maximum key (exclusive), or null for no maximum (get the largest key)
minKey - minimum key (inclusive), or null for no minimum (no lower bound)
Returns: the largest key/value pair with key < maxKey and key >= minKey, or null if none exists
public CloseableIterator<KVPair> getRange(byte[] minKey, byte[] maxKey, boolean reverse)
Description copied from interface: KVStore
Iterate the key/value pairs in the specified range. The returned Iterator's remove() method must be supported and should have the same effect as invoking remove() on the corresponding key.
If keys starting with 0xff are not supported by this instance, and minKey starts with 0xff, then this method returns an empty iteration.
If keys starting with 0xff are not supported by this instance, and maxKey starts with 0xff, then this method behaves as if maxKey were null.
The returned Iterator must not throw ConcurrentModificationException; however, whether or not a "live" Iterator reflects any modifications made after its creation is implementation dependent. Implementations that do make post-creation updates visible in the Iterator, even if the update occurs after some delay, must preserve the order in which the modifications actually occurred.
The returned Iterator itself is not guaranteed to be thread safe.
Invokers of this method are encouraged to close() the returned iterators, though this is not required for correct behavior.
Modifications to the returned KVPair key and value byte[] arrays do not affect this instance.
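A sketch of the bounds and reverse flag of getRange() using NavigableMap views, under the same assumption that keys order as unsigned bytes. RangeView is hypothetical, and treating an inverted range as empty is an assumption. Its iterator supports remove() as required, but a plain TreeMap iterator does throw ConcurrentModificationException if the map is modified by other means during iteration, so this models only ordering and bounds, not the concurrency contract above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of getRange(minKey, maxKey, reverse) over a sorted map
class RangeView {
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    static NavigableMap<byte[], byte[]> newMap() {
        return new TreeMap<>(UNSIGNED);
    }

    // minKey inclusive, maxKey exclusive; null means unbounded on that side
    static Iterator<Map.Entry<byte[], byte[]>> getRange(NavigableMap<byte[], byte[]> map,
      byte[] minKey, byte[] maxKey, boolean reverse) {
        // Assumption: an empty or inverted range yields an empty iteration
        if (minKey != null && maxKey != null && UNSIGNED.compare(minKey, maxKey) >= 0)
            return Collections.emptyIterator();
        NavigableMap<byte[], byte[]> range = map;
        if (minKey != null)
            range = range.tailMap(minKey, true);
        if (maxKey != null)
            range = range.headMap(maxKey, false);
        if (reverse)
            range = range.descendingMap();
        // Iterator.remove() on this view deletes the entry from the backing map
        return range.entrySet().iterator();
    }
}
```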
Specified by: getRange in interface KVStore
Parameters:
minKey - minimum key (inclusive), or null for no minimum (start at the smallest key)
maxKey - maximum key (exclusive), or null for no maximum (end at the largest key)
reverse - true to return key/value pairs in reverse order (i.e., keys descending)
Returns: iteration of key/value pairs in the range minKey (inclusive) to maxKey (exclusive)
public void put(byte[] key, byte[] value)
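Description copied from interface: KVStore
Set the value associated with the given key.
Specified by: put in interface KVStore
Overrides: put in class AbstractKVStore
Parameters:
key - key
value - value
public void remove(byte[] key)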
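Description copied from interface: KVStore
Remove the key/value pair with the given key, if it exists.
Specified by: remove in interface KVStore
Overrides: remove in class AbstractKVStore
Parameters: key - key
public void removeRange(byte[] minKey, byte[] maxKey)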
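Description copied from interface: KVStore
Remove all key/value pairs whose keys are in a given range.
The minKey must be less than or equal to maxKey; if they are equal (and not null), then nothing happens; if they are both null, then all entries are deleted.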
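The null and equal-bound cases of removeRange() can be sketched with NavigableMap views. RangeRemoval is hypothetical, keys are again assumed to order as unsigned bytes, and rejecting minKey > maxKey with IllegalArgumentException is an assumption (the contract only says minKey must not exceed maxKey). The 0xff rules are not modeled.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of removeRange(minKey, maxKey) on a sorted map
class RangeRemoval {
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    static NavigableMap<byte[], byte[]> newMap() {
        return new TreeMap<>(UNSIGNED);
    }

    // minKey inclusive, maxKey exclusive; null means unbounded on that side
    static void removeRange(NavigableMap<byte[], byte[]> map, byte[] minKey, byte[] maxKey) {
        if (minKey == null && maxKey == null) {
            map.clear();                                   // both null: delete everything
            return;
        }
        if (minKey != null && maxKey != null) {
            int diff = UNSIGNED.compare(minKey, maxKey);
            if (diff > 0)                                  // assumption: reject inverted ranges
                throw new IllegalArgumentException("minKey > maxKey");
            if (diff == 0)
                return;                                    // equal bounds: nothing happens
            map.subMap(minKey, true, maxKey, false).clear();
        } else if (minKey != null)
            map.tailMap(minKey, true).clear();
        else
            map.headMap(maxKey, false).clear();
    }
}
```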
If keys starting with 0xff are not supported by this instance, then: if minKey starts with 0xff, no change occurs; if maxKey starts with 0xff, this method behaves as if maxKey were null.
Specified by: removeRange in interface KVStore
Overrides: removeRange in class AbstractKVStore
Parameters:
minKey - minimum key (inclusive), or null for no minimum
maxKey - maximum key (exclusive), or null for no maximum
public void adjustCounter(byte[] key, long amount)
Description copied from interface: KVStore
Adjust the counter at the given key by the given amount.
Ideally this operation should behave in a lock-free manner, so that concurrent transactions can invoke it without conflict. However, when lock-free behavior occurs (if at all) depends on the implementation.
If there is no value associated with key, or key's value is not a valid counter encoding as would be acceptable to decodeCounter(), then how this operation affects key's value is undefined.
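This page does not specify the counter encoding. The sketch below assumes, purely for illustration, an 8-byte big-endian two's-complement long (the default byte order of java.nio.ByteBuffer) and builds an adjustCounter() from encode/decode over a plain Map with String keys. CounterSketch is hypothetical, and its read-modify-write is not atomic, whereas a real store would want this to be atomic and ideally lock-free, as noted above.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical counter codec: 8-byte big-endian long (an assumption, not the documented format)
class CounterSketch {
    static byte[] encodeCounter(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeCounter(byte[] bytes) {
        if (bytes == null || bytes.length != Long.BYTES)
            throw new IllegalArgumentException("invalid counter encoding");
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Non-atomic read-modify-write, for illustration only
    static void adjustCounter(Map<String, byte[]> store, String key, long amount) {
        byte[] current = store.get(key);
        if (current == null || current.length != Long.BYTES)
            return;    // behavior is undefined by the contract; this sketch leaves the value alone
        store.put(key, encodeCounter(decodeCounter(current) + amount));
    }
}
```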
Specified by: adjustCounter in interface KVStore
Overrides: adjustCounter in class AbstractKVStore
Parameters:
key - key
amount - amount to adjust counter value by
public byte[] encodeCounter(long value)
Description copied from interface: KVStore
Encode a counter value into a byte[] value suitable for use with decodeCounter() and/or adjustCounter().
Specified by: encodeCounter in interface KVStore
Overrides: encodeCounter in class AbstractKVStore
Parameters: value - desired counter value
public long decodeCounter(byte[] bytes)
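Description copied from interface: KVStore
Decode a counter value previously encoded by encodeCounter().
Specified by: decodeCounter in interface KVStore
Overrides: decodeCounter in class AbstractKVStore
Parameters: bytes - encoded counter value
public void apply(Mutations mutations)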
Description copied from interface: KVStore
Apply all the given Mutations to this instance.
Mutations are always to be applied in this order: removes, puts, counter adjustments.
The implementation in KVStore simply iterates over the individual changes and applies them via remove() (for removals of a single key), removeRange(), put(), and/or adjustCounter(). Implementations that can process batch updates more efficiently are encouraged to override this method.
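The required order (removes, then puts, then counter adjustments) matters when a single batch touches the same key more than once. The sketch below uses a hypothetical SimpleMutations class in place of Mutations (whose accessors are not shown on this page), supports only single-key removes, and stores counters as String keys mapped to long values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Mutations: single-key removes, puts, and counter adjustments
class SimpleMutations {
    final List<String> removes = new ArrayList<>();
    final Map<String, Long> puts = new LinkedHashMap<>();
    final Map<String, Long> adjusts = new LinkedHashMap<>();
}

class ApplySketch {
    // Apply in the documented order: removes, then puts, then counter adjustments
    static void apply(TreeMap<String, Long> store, SimpleMutations mutations) {
        for (String key : mutations.removes)
            store.remove(key);
        store.putAll(mutations.puts);
        for (Map.Entry<String, Long> adjust : mutations.adjusts.entrySet())
            store.computeIfPresent(adjust.getKey(), (k, v) -> v + adjust.getValue());
    }
}
```

With key x present, a batch that removes x, puts x = 100, and adjusts x by 5 always ends with x = 105, regardless of the order in which the caller added those changes to the batch.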
public CloseableKVStore snapshot()
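Description copied from interface: AtomicKVStore
Acquire a read-only, snapshot view of this instance based on the current state.
The returned KVStore view should remain constant even if this instance is subsequently mutated.
Note: callers are required to close the returned instance when no longer in use.
Specified by: snapshot in interface AtomicKVStore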
public void mutate(Mutations mutations, boolean sync)
Description copied from interface: AtomicKVStore
Apply a set of mutations to this instance atomically.
If this method returns normally, all of the given mutations will have been applied. If this method returns abnormally, then none of the given mutations will have been applied.
In any case, other threads observing this instance will never see a partial application of the given mutations.
This method is required to apply the mutations in this order: removes, puts, adjusts.
If sync is true, the implementation must durably persist the changes before returning.
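One simple technique that provides both guarantees described here (no partially applied batch is ever visible to other threads, and snapshots stay constant) is copy-on-write with a single atomic reference swap. This illustrates the guarantees only; it is not how AtomicArrayKVStore is implemented (it uses array files plus a change set), CopyOnWriteStore is hypothetical, and the sync durability requirement is not modeled.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical copy-on-write store illustrating all-or-nothing mutation and constant snapshots
class CopyOnWriteStore {
    private final AtomicReference<NavigableMap<String, String>> current =
      new AtomicReference<>(Collections.unmodifiableNavigableMap(new TreeMap<String, String>()));

    // Applies all changes or none: if 'changes' throws, the private working copy is discarded.
    // Under contention 'changes' may run more than once, so it should only touch the map.
    void mutate(Consumer<NavigableMap<String, String>> changes) {
        while (true) {
            NavigableMap<String, String> before = this.current.get();
            TreeMap<String, String> working = new TreeMap<>(before);
            changes.accept(working);
            if (this.current.compareAndSet(before, Collections.unmodifiableNavigableMap(working)))
                return;   // published atomically: readers see either the old map or the new one
        }
    }

    // A snapshot is just the current immutable map; later mutations never change it
    NavigableMap<String, String> snapshot() {
        return this.current.get();
    }
}
```

Copying the whole map on every mutation is only reasonable for small data sets; it is used here because it makes the atomicity argument easy to see.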
Specified by: mutate in interface AtomicKVStore
Parameters:
mutations - the mutations to apply
sync - if true, caller requires that the changes be durably persisted
public void hotCopy(File target) throws IOException
Create a filesystem atomic snapshot, or "hot copy", of this instance in the specified destination directory.
The target directory will be created if it does not exist; otherwise, it must be empty.
All files except the uncompacted modifications file are copied into target in constant time using hard links if possible.
The hot copy operation can proceed in parallel with normal database activity, with the exception that the compaction operation cannot start while there are concurrent hot copies in progress.
Therefore, for best performance, consider performing a compaction immediately prior to this operation.
Note: when this method returns, all copied files, and the target directory, have been fsync()'d and therefore may be considered durable in case of a system crash.
Parameters: target - destination directory
Throws:
IOException - if an I/O error occurs
IllegalArgumentException - if target exists and is not a directory or is non-empty
IllegalArgumentException - if target is null
protected long calculateCompactionPressureDelay(float windowRatio)
Calculate the maximum amount of time that a thread attempting to mutate() must wait for a background compaction to complete as we start nearing the high-water mark.
The implementation in AtomicArrayKVStore returns -1 for all values below 50%, and then scales according to an inverse power function returning zero at 50%, 100ms at 75%, and Long.MAX_VALUE at 100%.
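The description pins down only a few points of the curve: -1 below 50%, zero at 50%, 100 ms at 75%, and Long.MAX_VALUE at 100%. One hypothetical curve through those points rescales the upper half of the window to x in [0, 1) and returns 100 * x / (1 - x) milliseconds; the actual exponent and constants used by AtomicArrayKVStore may differ, and PressureDelay is a hypothetical class.

```java
// Hypothetical pressure-delay curve matching the documented sample points
class PressureDelay {
    static long calculateCompactionPressureDelay(float windowRatio) {
        if (windowRatio < 0.5f)
            return -1;                        // below half-way: no artificial delay
        double x = (windowRatio - 0.5) * 2.0; // rescale [0.5, 1.0] to [0.0, 1.0]
        if (x >= 1.0)
            return Long.MAX_VALUE;            // at the high-water mark: block entirely
        // Grows without bound as x approaches 1; the cast saturates at Long.MAX_VALUE
        return (long) (100.0 * x / (1.0 - x));
    }
}
```

At 75% the rescaled value is x = 0.5, giving 100 * 0.5 / 0.5 = 100 ms, which matches the documented midpoint.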
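Parameters: windowRatio - the uncompacted mutation size as a fraction of the interval between the low-water and high-water marks; always a value between zero and one (exclusive)
public Future<?> scheduleCompaction()
Schedule a new compaction cycle, unless there is one already scheduled or running, or there are no outstanding uncompacted modifications.
Throws: IllegalStateException - if this instance is not started
protected void finalize() throws Throwable
Finalize this instance. Invokes stop() to close any unclosed iterators.
Copyright © 2019. All rights reserved.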