Class AtomicArrayKVStore

java.lang.Object
io.permazen.kv.AbstractKVStore
io.permazen.kv.array.AtomicArrayKVStore
All Implemented Interfaces:
KVStore, AtomicKVStore

@ThreadSafe public class AtomicArrayKVStore extends AbstractKVStore implements AtomicKVStore
AtomicKVStore based on ArrayKVStore plus a write-ahead log and background compaction.

This implementation is designed to maximize the speed of reads and minimize the amount of memory overhead per key/value pair. It is optimized for relatively infrequent writes.

A (read-only) ArrayKVStore is the basis for the database; the array files are mapped into memory. As mutations are applied, they are added to an in-memory change set, and appended to a mutation log file for persistence. On restart, the mutation log file (if any) is read to reconstruct the in-memory change set.
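The layering described above can be sketched with plain collections. This is a minimal illustration and not the class's actual implementation: OverlayStoreSketch, LogEntry, and the use of String keys, a TreeMap base, and a List standing in for the mutation log file are all assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: a read-only sorted base overlaid by an in-memory change set,
// with every mutation also appended to a log that is replayed on restart.
final class OverlayStoreSketch {

    // One logged mutation; a null value records a removal.
    static final class LogEntry {
        final String key;
        final String value;
        LogEntry(String key, String value) { this.key = key; this.value = value; }
    }

    private TreeMap<String, String> base;                                        // stands in for the array files
    private final TreeMap<String, Optional<String>> changes = new TreeMap<>();   // in-memory change set
    private final List<LogEntry> log;                                            // stands in for the mutation log file

    OverlayStoreSketch(Map<String, String> base, List<LogEntry> log) {
        this.base = new TreeMap<>(base);
        this.log = log;
        for (LogEntry entry : log)          // "restart": rebuild the change set from the log
            changes.put(entry.key, Optional.ofNullable(entry.value));
    }

    void put(String key, String value) { record(new LogEntry(key, value)); }

    void remove(String key) { record(new LogEntry(key, null)); }

    private void record(LogEntry entry) {
        log.add(entry);                                             // append to the log first
        changes.put(entry.key, Optional.ofNullable(entry.value));   // then update the change set
    }

    // Reads consult the change set first and fall back to the base.
    String get(String key) {
        Optional<String> changed = changes.get(key);
        return changed != null ? changed.orElse(null) : base.get(key);
    }

    // Compaction: merge the change set into a new base, then truncate the log.
    void compact() {
        TreeMap<String, String> merged = new TreeMap<>(base);
        for (Map.Entry<String, Optional<String>> change : changes.entrySet()) {
            if (change.getValue().isPresent())
                merged.put(change.getKey(), change.getValue().get());
            else
                merged.remove(change.getKey());
        }
        base = merged;
        changes.clear();
        log.clear();
    }
}
```

Constructing a second OverlayStoreSketch over the same base and log list reproduces the first one's visible state, which is the role the mutation log file plays on restart.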

Compaction

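Instances periodically compact outstanding changes into new array files (and truncate the mutation log file) in a background thread. A compaction is scheduled whenever:

• The size of the mutation log file exceeds the compaction space low-water mark
• The oldest uncompacted modification is older than the compaction maximum delay
• scheduleCompaction() is invoked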

To prevent compaction from falling hopelessly behind when write volume is high, a compaction space high-water mark is also used. When the size of the mutation log file exceeds the half-way point between the low-water and high-water marks, new write attempts start being artificially delayed. The delay grows without bound as the high-water mark is approached, so once the high-water mark is reached, new writes are blocked completely until the current compaction cycle completes. Use getTotalMillisWaiting() to assess the total amount of time spent in these artificial delays.

The number of bytes occupied by the in-memory change set and the length of the outstanding mutations log file are related. Therefore, the compaction high-water mark loosely correlates to a maximum amount of memory required by the in-memory change set.

Hot Backups

"Hot" backups may be created in parallel with normal operation via hotCopy(). Hard links are used to make this operation fast; only the mutation log file (if any) is actually copied.
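The hard-link approach can be sketched with java.nio.file. This is only an illustration under stated assumptions: HardLinkCopySketch and the LOG_NAME file name are hypothetical, the source directory is assumed to be flat, and the sketch omits the locking and fsync() steps a real hot copy needs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class HardLinkCopySketch {

    // Hypothetical name for the mutation log file; the real file name is not given here.
    static final String LOG_NAME = "mutations.log";

    // Link every file from source into target, except the log file, which is copied.
    static void copy(Path source, Path target) throws IOException {
        Files.createDirectories(target);                 // create the target if it does not exist
        if (!list(target).isEmpty())
            throw new IllegalArgumentException("target is not empty: " + target);
        for (Path file : list(source)) {
            Path dest = target.resolve(file.getFileName());
            if (file.getFileName().toString().equals(LOG_NAME)) {
                Files.copy(file, dest);                  // the log may still grow, so copy its bytes
                continue;
            }
            try {
                Files.createLink(dest, file);            // constant time: no data is copied
            } catch (UnsupportedOperationException | IOException e) {
                Files.copy(file, dest);                  // fall back when hard links are unavailable
            }
        }
    }

    private static List<Path> list(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.collect(Collectors.toList());
        }
    }
}
```

Hard links are safe here only because the linked files are never modified in place; compaction writes new array files rather than rewriting old ones.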

The database directory is a required configuration property.

Key and value data must not exceed 2GB (each separately).

Instances may be stopped and (re)started multiple times.

  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    static final int
    DEFAULT_COMPACTION_HIGH_WATER
    Default compaction space high-water mark in bytes (1073741824 bytes).
    static final int
    DEFAULT_COMPACTION_LOW_WATER
    Default compaction space low-water mark in bytes (65536 bytes).
    static final int
    DEFAULT_COMPACTION_MAX_DELAY
    Default compaction maximum delay in seconds (86400 seconds).
  • Constructor Summary

    Constructors
    Constructor
    Description
    AtomicArrayKVStore()
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    adjustCounter(byte[] key, long amount)
    Adjust the counter at the given key by the given amount.
    void
    apply(Mutations mutations)
    Apply all the given Mutations to this instance.
    void
    apply(Mutations mutations, boolean sync)
    Apply a set of mutations to this instance atomically.
    protected long
    calculateCompactionPressureDelay(float windowRatio)
    Calculate the maximum amount of time that a thread attempting to apply() mutations must wait for a background compaction to complete as we start nearing the high water mark.
    long
    decodeCounter(byte[] bytes)
    Decode a counter value previously encoded by encodeCounter().
    byte[]
    encodeCounter(long value)
    Encode a counter value into a byte[] value suitable for use with decodeCounter() and/or adjustCounter().
    protected void
    finalize()
    Finalize this instance.
    byte[]
    get(byte[] key)
    Get the value associated with the given key, if any.
    KVPair
    getAtLeast(byte[] minKey, byte[] maxKey)
    Get the key/value pair having the smallest key greater than or equal to the given minimum, if any.
    KVPair
    getAtMost(byte[] maxKey, byte[] minKey)
    Get the key/value pair having the largest key strictly less than the given maximum, if any.
    File
    getDirectory()
    Get the filesystem directory containing the database.
    CloseableIterator<KVPair>
    getRange(byte[] minKey, byte[] maxKey, boolean reverse)
    Iterate the key/value pairs in the specified range.
    long
    getTotalMillisWaiting()
    Get the total number of milliseconds spent in artificial delays caused by waiting for compaction.
    void
    hotCopy(File target)
    Create a filesystem atomic snapshot, or "hot" copy, of this instance in the specified destination directory.
    void
    put(byte[] key, byte[] value)
    Set the value associated with the given key.
    CloseableKVStore
    readOnlySnapshot()
    Create a read-only "snapshot" view of this instance equal to its current state.
    void
    remove(byte[] key)
    Remove the key/value pair with the given key, if it exists.
    void
    removeRange(byte[] minKey, byte[] maxKey)
    Remove all key/value pairs whose keys are in a given range.
    Future<?>
    scheduleCompaction()
    Schedule a new compaction cycle, unless there is one already scheduled or running, or there are no outstanding uncompacted modifications.
    void
    setCompactHighWater(int compactHighWater)
    Configure the compaction space high-water mark in bytes.
    void
    setCompactLowWater(int compactLowWater)
    Configure the compaction space low-water mark in bytes.
    void
    setCompactMaxDelay(int compactMaxDelay)
    Configure the compaction time maximum delay.
    void
    setDirectory(File directory)
    Configure the filesystem directory containing the database.
    void
    setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
    Configure the ScheduledExecutorService used to schedule background compaction.
    void
    start()
    Start this instance.
    void
    stop()
    Stop this instance.
     

    Methods inherited from class java.lang.Object

    clone, equals, getClass, hashCode, notify, notifyAll, wait, wait, wait

    Methods inherited from interface io.permazen.kv.KVStore

    getRange, getRange, removeRange
  • Field Details

    • DEFAULT_COMPACTION_MAX_DELAY

      public static final int DEFAULT_COMPACTION_MAX_DELAY
      Default compaction maximum delay in seconds (86400 seconds).
    • DEFAULT_COMPACTION_LOW_WATER

      public static final int DEFAULT_COMPACTION_LOW_WATER
      Default compaction space low-water mark in bytes (65536 bytes).
    • DEFAULT_COMPACTION_HIGH_WATER

      public static final int DEFAULT_COMPACTION_HIGH_WATER
      Default compaction space high-water mark in bytes (1073741824 bytes).
  • Constructor Details

    • AtomicArrayKVStore

      public AtomicArrayKVStore()
  • Method Details

    • getDirectory

      public File getDirectory()
      Get the filesystem directory containing the database. If not set, this class functions as an in-memory store.
      Returns:
      database directory, or null for none
    • setDirectory

      public void setDirectory(File directory)
      Configure the filesystem directory containing the database. If not set, this class functions as an in-memory store.
      Parameters:
      directory - database directory, or null for none
      Throws:
      IllegalStateException - if this instance is already start()ed
    • setScheduledExecutorService

      public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
      Configure the ScheduledExecutorService used to schedule background compaction.

      If not explicitly configured, a ScheduledExecutorService will be created automatically during start() using Executors.newSingleThreadScheduledExecutor() and shut down by stop(). If one is explicitly configured here, it will not be shut down by stop().

      Parameters:
      scheduledExecutorService - scheduled executor service, or null to have one created automatically
      Throws:
      IllegalStateException - if this instance is already start()ed
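The ownership rule described for setScheduledExecutorService() (shut down only what was created internally) can be sketched as follows. ExecutorOwnershipSketch and its fields are hypothetical names; this shows the general pattern and is not the class's actual code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class ExecutorOwnershipSketch {

    private ScheduledExecutorService configured;   // supplied by the caller, or null
    private ScheduledExecutorService active;       // non-null only while started
    private boolean owned;                         // true if start() created the executor

    synchronized void setScheduledExecutorService(ScheduledExecutorService executor) {
        if (active != null)
            throw new IllegalStateException("already started");
        configured = executor;
    }

    synchronized void start() {
        if (active != null)
            return;                                // idempotent: already started
        owned = configured == null;
        active = owned ? Executors.newSingleThreadScheduledExecutor() : configured;
    }

    synchronized void stop() {
        if (active == null)
            return;                                // idempotent: not started
        if (owned)
            active.shutdown();                     // never shut down a caller-supplied executor
        active = null;
    }

    synchronized boolean isStarted() {
        return active != null;
    }
}
```

Leaving a caller-supplied executor running lets several components share one executor without any of them shutting it down for the others.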
    • setCompactMaxDelay

      public void setCompactMaxDelay(int compactMaxDelay)
      Configure the compaction time maximum delay. Compaction will be automatically triggered whenever there is any uncompacted modification older than this.
      Parameters:
      compactMaxDelay - compaction time maximum delay in seconds
      Throws:
      IllegalArgumentException - if compactMaxDelay is negative
      IllegalStateException - if this instance is already start()ed
    • setCompactLowWater

      public void setCompactLowWater(int compactLowWater)
      Configure the compaction space low-water mark in bytes.

      This value is applied to the size of the on-disk modifications file.

      Parameters:
      compactLowWater - compaction space low-water mark in bytes
      Throws:
      IllegalArgumentException - if compactLowWater is negative
      IllegalStateException - if this instance is already start()ed
    • setCompactHighWater

      public void setCompactHighWater(int compactHighWater)
      Configure the compaction space high-water mark in bytes.

      This value is applied to the size of the on-disk modifications file.

      If the compaction space high-water mark is set smaller than the compaction space low-water mark, then it is treated as if it were the same as the compaction space low-water mark.

      Parameters:
      compactHighWater - compaction space high-water mark in bytes
      Throws:
      IllegalArgumentException - if compactHighWater is negative
      IllegalStateException - if this instance is already start()ed
    • getTotalMillisWaiting

      public long getTotalMillisWaiting()
      Get the total number of milliseconds spent in artificial delays caused by waiting for compaction.
      Returns:
      total delay in milliseconds
    • start

      @PostConstruct public void start()
      Description copied from interface: AtomicKVStore
      Start this instance. This method must be called prior to creating any transactions.

      This method is idempotent: if this instance is already started, nothing happens.

      Whether an instance that has been started and stopped can be restarted is implementation-dependent.

      Specified by:
      start in interface AtomicKVStore
    • stop

      @PreDestroy public void stop()
      Description copied from interface: AtomicKVStore
      Stop this instance.

      Any open AtomicKVStore.readOnlySnapshot()'s should be close()'d before invoking this method; the behavior of those that are not is undefined.

      This method is idempotent: if this instance has not been started, or is already stopped, nothing happens.

      Specified by:
      stop in interface AtomicKVStore
    • get

      public byte[] get(byte[] key)
      Description copied from interface: KVStore
      Get the value associated with the given key, if any.

      Modifications to the returned byte[] array do not affect this instance.

      Specified by:
      get in interface KVStore
      Overrides:
      get in class AbstractKVStore
      Parameters:
      key - key
      Returns:
      value associated with key, or null if not found
    • getAtLeast

      public KVPair getAtLeast(byte[] minKey, byte[] maxKey)
      Description copied from interface: KVStore
      Get the key/value pair having the smallest key greater than or equal to the given minimum, if any.

      An optional (exclusive) maximum key may also be specified; if maxKey is null, there is no upper bound; if maxKey <= minKey, null is always returned.

      If keys starting with 0xff are not supported by this instance, and minKey starts with 0xff, then this method returns null.

      Modifications to the returned byte[] arrays do not affect this instance.

      Specified by:
      getAtLeast in interface KVStore
      Overrides:
      getAtLeast in class AbstractKVStore
      Parameters:
      minKey - minimum key (inclusive), or null for no minimum (get the smallest key)
      maxKey - maximum key (exclusive), or null for no maximum (no upper bound)
      Returns:
      smallest key/value pair with key >= minKey and key < maxKey, or null if none exists
    • getAtMost

      public KVPair getAtMost(byte[] maxKey, byte[] minKey)
      Description copied from interface: KVStore
      Get the key/value pair having the largest key strictly less than the given maximum, if any.

      An optional (inclusive) minimum key may also be specified; if minKey is null, there is no lower bound (equivalent to a lower bound of the empty byte array); if minKey >= maxKey, null is always returned.

      If keys starting with 0xff are not supported by this instance, and maxKey starts with 0xff, then this method behaves as if maxKey were null.

      Modifications to the returned byte[] arrays do not affect this instance.

      Specified by:
      getAtMost in interface KVStore
      Overrides:
      getAtMost in class AbstractKVStore
      Parameters:
      maxKey - maximum key (exclusive), or null for no maximum (get the largest key)
      minKey - minimum key (inclusive), or null for no minimum (no lower bound)
      Returns:
      largest key/value pair with key < maxKey and key >= minKey, or null if none exists
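The bound handling of getAtLeast() and getAtMost() can be illustrated with a TreeMap. This sketch assumes keys are ordered as unsigned bytes, lexicographically, and requires Java 9 or later for Arrays.compareUnsigned(); BoundedLookupSketch is a hypothetical name, and Map.Entry stands in for KVPair.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

final class BoundedLookupSketch {

    // Assumed key order: unsigned lexicographic comparison of byte arrays.
    static final Comparator<byte[]> ORDER = Arrays::compareUnsigned;

    private final TreeMap<byte[], byte[]> map = new TreeMap<>(ORDER);

    void put(byte[] key, byte[] value) {
        map.put(key.clone(), value.clone());
    }

    // Smallest entry with key >= minKey and key < maxKey; null bounds mean "unbounded".
    Map.Entry<byte[], byte[]> getAtLeast(byte[] minKey, byte[] maxKey) {
        Map.Entry<byte[], byte[]> entry = minKey == null ? map.firstEntry() : map.ceilingEntry(minKey);
        if (entry == null || (maxKey != null && ORDER.compare(entry.getKey(), maxKey) >= 0))
            return null;
        return copy(entry);
    }

    // Largest entry with key < maxKey and key >= minKey; null bounds mean "unbounded".
    Map.Entry<byte[], byte[]> getAtMost(byte[] maxKey, byte[] minKey) {
        Map.Entry<byte[], byte[]> entry = maxKey == null ? map.lastEntry() : map.lowerEntry(maxKey);
        if (entry == null || (minKey != null && ORDER.compare(entry.getKey(), minKey) < 0))
            return null;
        return copy(entry);
    }

    // Return copies so callers cannot modify the stored arrays.
    private static Map.Entry<byte[], byte[]> copy(Map.Entry<byte[], byte[]> entry) {
        return new AbstractMap.SimpleImmutableEntry<>(entry.getKey().clone(), entry.getValue().clone());
    }
}
```

Because the minimum is inclusive and the maximum exclusive in both methods, passing equal bounds always yields null.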
    • getRange

      public CloseableIterator<KVPair> getRange(byte[] minKey, byte[] maxKey, boolean reverse)
      Description copied from interface: KVStore
      Iterate the key/value pairs in the specified range. The returned CloseableIterator's remove() method must be supported and should have the same effect as invoking remove() on the corresponding key.

      If keys starting with 0xff are not supported by this instance, and minKey starts with 0xff, then this method returns an empty iteration.

      If keys starting with 0xff are not supported by this instance, and maxKey starts with 0xff, then this method behaves as if maxKey were null.

      The returned CloseableIterator is weakly consistent (see java.util.concurrent). In short, the returned CloseableIterator must not throw ConcurrentModificationException; however, whether or not a "live" CloseableIterator reflects any modifications made after its creation is implementation dependent. Implementations that do make post-creation updates visible in the CloseableIterator, even if the update occurs after some delay, must preserve the order in which the modifications actually occurred.

      The returned CloseableIterator itself is not guaranteed to be thread safe; it should only be used in the thread that created it.

      Invokers of this method are encouraged to close() the returned iterators, though this is not required for correct behavior.

      Modifications to the returned KVPair key and value byte[] arrays do not affect this instance.

      Specified by:
      getRange in interface KVStore
      Parameters:
      minKey - minimum key (inclusive), or null for no minimum (start at the smallest key)
      maxKey - maximum key (exclusive), or null for no maximum (end at the largest key)
      reverse - true to return key/value pairs in reverse order (i.e., keys descending)
      Returns:
      iteration of key/value pairs in the range minKey (inclusive) to maxKey (exclusive)
    • put

      public void put(byte[] key, byte[] value)
      Description copied from interface: KVStore
      Set the value associated with the given key.
      Specified by:
      put in interface KVStore
      Overrides:
      put in class AbstractKVStore
      Parameters:
      key - key
      value - value
    • remove

      public void remove(byte[] key)
      Description copied from interface: KVStore
      Remove the key/value pair with the given key, if it exists.
      Specified by:
      remove in interface KVStore
      Overrides:
      remove in class AbstractKVStore
      Parameters:
      key - key
    • removeRange

      public void removeRange(byte[] minKey, byte[] maxKey)
      Description copied from interface: KVStore
      Remove all key/value pairs whose keys are in a given range.

      The minKey must be less than or equal to maxKey; if they are equal (and not null) then nothing happens; if they are both null then all entries are deleted.

      If keys starting with 0xff are not supported by this instance, then:

      • If minKey starts with 0xff, then no change occurs
      • If maxKey starts with 0xff, then this method behaves as if maxKey were null
      Specified by:
      removeRange in interface KVStore
      Overrides:
      removeRange in class AbstractKVStore
      Parameters:
      minKey - minimum key (inclusive), or null for no minimum
      maxKey - maximum key (exclusive), or null for no maximum
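The range rules for removeRange() map onto NavigableMap views, as the following sketch shows. RemoveRangeSketch is a hypothetical helper; the unsigned lexicographic key order and the IllegalArgumentException for minKey > maxKey are assumptions, since the text above only says the bounds must be ordered.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;

final class RemoveRangeSketch {

    // Assumed key order: unsigned lexicographic comparison (Java 9+).
    static final Comparator<byte[]> ORDER = Arrays::compareUnsigned;

    // Remove all entries with minKey <= key < maxKey; the map must be sorted by ORDER.
    static void removeRange(NavigableMap<byte[], byte[]> map, byte[] minKey, byte[] maxKey) {
        if (minKey != null && maxKey != null && ORDER.compare(minKey, maxKey) > 0)
            throw new IllegalArgumentException("minKey > maxKey");   // assumed behavior
        NavigableMap<byte[], byte[]> range;
        if (minKey == null && maxKey == null)
            range = map;                                     // both null: delete everything
        else if (minKey == null)
            range = map.headMap(maxKey, false);              // no lower bound
        else if (maxKey == null)
            range = map.tailMap(minKey, true);               // no upper bound
        else
            range = map.subMap(minKey, true, maxKey, false); // empty when minKey equals maxKey
        range.clear();                                       // clearing the view removes from the map
    }
}
```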
    • adjustCounter

      public void adjustCounter(byte[] key, long amount)
      Description copied from interface: KVStore
      Adjust the counter at the given key by the given amount.

      Ideally this operation should behave in a lock-free manner, so that concurrent transactions can invoke it without conflict. However, when lock-free behavior occurs (if at all) depends on the implementation.

      If there is no value associated with key, or key's value is not a valid counter encoding as would be acceptable to decodeCounter(), then how this operation affects key's value is undefined.

      Specified by:
      adjustCounter in interface KVStore
      Overrides:
      adjustCounter in class AbstractKVStore
      Parameters:
      key - key
      amount - amount to adjust counter value by
    • encodeCounter

      public byte[] encodeCounter(long value)
      Description copied from interface: KVStore
      Encode a counter value into a byte[] value suitable for use with decodeCounter() and/or adjustCounter().
      Specified by:
      encodeCounter in interface KVStore
      Overrides:
      encodeCounter in class AbstractKVStore
      Parameters:
      value - desired counter value
      Returns:
      encoded counter value
    • decodeCounter

      public long decodeCounter(byte[] bytes)
      Description copied from interface: KVStore
      Decode a counter value previously encoded by encodeCounter().
      Specified by:
      decodeCounter in interface KVStore
      Overrides:
      decodeCounter in class AbstractKVStore
      Parameters:
      bytes - encoded counter value
      Returns:
      decoded counter value
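How encodeCounter(), decodeCounter(), and adjustCounter() relate can be shown with a simple codec. The encoding used here, eight bytes in big-endian order, is an assumption made for illustration; the format this class actually uses is not specified above. CounterCodecSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;

final class CounterCodecSketch {

    // Assumed encoding: the value as 8 bytes, most significant byte first.
    static byte[] encodeCounter(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeCounter(byte[] bytes) {
        if (bytes == null || bytes.length != Long.BYTES)
            throw new IllegalArgumentException("invalid counter encoding");
        return ByteBuffer.wrap(bytes).getLong();
    }

    // An adjustment is a decode, an addition, and a re-encode of the stored value.
    static byte[] adjust(byte[] current, long amount) {
        return encodeCounter(decodeCounter(current) + amount);
    }
}
```

A typical sequence would be to put() the result of encodeCounter(0) under a key, call adjustCounter() as events occur, and pass the bytes returned by get() to decodeCounter().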
    • apply

      public void apply(Mutations mutations)
      Description copied from interface: AtomicKVStore
      Apply all the given Mutations to this instance.

      Equivalent to: apply(mutations, false).

      Specified by:
      apply in interface AtomicKVStore
      Specified by:
      apply in interface KVStore
      Parameters:
      mutations - mutations to apply
    • readOnlySnapshot

      public CloseableKVStore readOnlySnapshot()
      Description copied from interface: AtomicKVStore
      Create a read-only "snapshot" view of this instance equal to its current state.

      The returned CloseableKVStore should be treated as read-only. It may not actually be read-only, but if it's not, then any changes should have no effect on this instance. The returned CloseableKVStore must be completely independent from this instance (subsequent changes to either one do not affect the other).

      The returned CloseableKVStore should be promptly close()'d when no longer needed to release any underlying resources.

      Specified by:
      readOnlySnapshot in interface AtomicKVStore
      Returns:
      read-only, snapshot view of this instance
    • apply

      public void apply(Mutations mutations, boolean sync)
      Description copied from interface: AtomicKVStore
      Apply a set of mutations to this instance atomically.

      If this method returns normally, all of the given mutations will have been applied. If this method returns abnormally, then none of the given mutations will have been applied.

      In any case, other threads observing this instance will never see a partial application of the given mutations.

      This method is required to apply the mutations in this order: removes, puts, adjusts.

      If sync is true, the implementation must durably persist the changes before returning.

      Specified by:
      apply in interface AtomicKVStore
      Parameters:
      mutations - the mutations to apply
      sync - if true, caller requires that the changes be durably persisted
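The ordering and all-or-nothing requirements of apply() can be sketched with a copy-and-swap over an in-memory map. ApplyOrderSketch and Batch are hypothetical stand-ins (Batch plays the role of Mutations, with String keys and Long values for brevity), and durability (the sync flag) is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ApplyOrderSketch {

    // Hypothetical stand-in for Mutations.
    static final class Batch {
        final List<String> removes = new ArrayList<>();
        final Map<String, Long> puts = new LinkedHashMap<>();
        final Map<String, Long> adjusts = new LinkedHashMap<>();
    }

    // Readers always see a fully applied state; published maps are never modified.
    private volatile TreeMap<String, Long> data = new TreeMap<>();

    synchronized void apply(Batch batch) {
        TreeMap<String, Long> next = new TreeMap<>(data);     // work on a private copy
        for (String key : batch.removes)                      // 1. removes
            next.remove(key);
        next.putAll(batch.puts);                              // 2. puts
        for (Map.Entry<String, Long> adjust : batch.adjusts.entrySet())   // 3. adjusts
            next.computeIfPresent(adjust.getKey(), (key, old) -> old + adjust.getValue());
        data = next;                                          // publish all changes at once
    }

    Long get(String key) {
        return data.get(key);
    }
}
```

If any step throws, the new map is discarded and the visible state is unchanged. The order matters: a batch that removes, puts, and adjusts the same key ends with the put value plus the adjustment.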
    • hotCopy

      public void hotCopy(File target) throws IOException
      Create a filesystem atomic snapshot, or "hot" copy, of this instance in the specified destination directory. The copy will be up-to-date as of the time this method is invoked.

      The target directory will be created if it does not exist; otherwise, it must be empty. All files except the uncompacted modifications file are copied into target in constant time using hard links if possible.

      The hot copy operation can proceed in parallel with normal database activity, with the exception that the compaction operation cannot start while there are concurrent hot copies in progress.

      Therefore, for best performance, consider performing a compaction immediately prior to this operation.

      Note: when this method returns, all copied files, and the target directory, have been fsync()'d and therefore may be considered durable in case of a system crash.

      Parameters:
      target - destination directory
      Throws:
      IOException - if an I/O error occurs
      IllegalArgumentException - if target exists and is not a directory or is non-empty
      IllegalArgumentException - if target is null
    • calculateCompactionPressureDelay

      protected long calculateCompactionPressureDelay(float windowRatio)
      Calculate the maximum amount of time that a thread attempting to apply() mutations must wait for a background compaction to complete as we start nearing the high water mark.

      The implementation in AtomicArrayKVStore returns -1 for all values below 50%, and then scales according to an inverse power function returning zero at 50%, 100ms at 75%, and Long.MAX_VALUE at 100%.

      Parameters:
      windowRatio - the uncompacted mutation size as a fraction of the interval between low and high water marks; always a value between zero and one (exclusive)
      Returns:
      negative value for no delay, zero or positive value to trigger a background compaction (if not already running) and wait up to that many milliseconds for it to complete
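The documented behavior (no delay below 50%, zero at 50%, 100ms at 75%, unbounded at 100%) can be met by many curves. The sketch below uses one such curve, 100·x/(1−x) with x rescaled from the upper half of the window. The formula, the PressureDelaySketch name, and the windowRatio() helper are assumptions, not the implementation in AtomicArrayKVStore.

```java
final class PressureDelaySketch {

    // Fraction of the low-to-high window consumed by the log; a high-water mark below
    // the low-water mark is treated as equal to it.
    static float windowRatio(long logSize, long lowWater, long highWater) {
        long high = Math.max(highWater, lowWater);
        if (logSize <= lowWater)
            return 0f;
        if (logSize >= high)
            return 1f;
        return (float) (logSize - lowWater) / (high - lowWater);
    }

    // Negative result: no delay. Otherwise: maximum wait in milliseconds.
    static long delayMillis(float windowRatio) {
        if (windowRatio < 0.5f)
            return -1;
        if (windowRatio >= 1.0f)
            return Long.MAX_VALUE;
        double x = (windowRatio - 0.5) * 2.0;       // 0 at 50%, 0.5 at 75%, approaches 1 at 100%
        double millis = 100.0 * x / (1.0 - x);      // 0 at 50%, 100 at 75%, unbounded near 100%
        return millis >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) millis;
    }
}
```

A subclass wanting gentler or harsher back-pressure could override calculateCompactionPressureDelay() with a different curve, keeping the convention that a negative return value means no delay.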
    • scheduleCompaction

      public Future<?> scheduleCompaction()
      Schedule a new compaction cycle, unless there is one already scheduled or running, or there are no outstanding uncompacted modifications.
      Returns:
      a future for the completion of the next compaction cycle, or null if there are no uncompacted modifications
      Throws:
      IllegalStateException - if this instance is not started
    • finalize

      protected void finalize() throws Throwable
      Finalize this instance. Invokes stop() to close any unclosed iterators.
      Overrides:
      finalize in class Object
      Throws:
      Throwable
    • toString

      public String toString()
      Overrides:
      toString in class Object