Class RaftKVDatabase

  • All Implemented Interfaces:
    KVDatabase

    public class RaftKVDatabase
    extends Object
    implements KVDatabase
    A distributed KVDatabase based on the Raft consensus algorithm.

    Raft Algorithm

    Raft defines a distributed consensus algorithm for maintaining a shared state machine. Each Raft node maintains a complete copy of the state machine. Cluster nodes elect a leader who collects and distributes updates and provides for consistent reads. As long as a node is part of a majority, the state machine is fully operational. Raft is described more fully in the reference listed under "See Also" below.

    Key/Value Database

    RaftKVDatabase turns this into a transactional, highly available clustered key/value database with linearizable (technically, strictly serializable) consistency. A RaftKVDatabase appears to each node in the cluster as a shared, ACID-compliant key/value database. As long as a node can communicate with at least half of the other nodes (so that, counting itself, it belongs to a majority of the cluster), the database is fully available.

    Concurrent Transactions

    RaftKVDatabase supports multiple simultaneous read/write transactions across all nodes. Simultaneous transactions will successfully commit as long as the database is able to meet its consistency obligations, which it verifies by performing conflict analysis at the key/value level. When two transactions do conflict, the loser receives a RetryTransactionException. Because it is based on the Raft algorithm, consistency is guaranteed even in the face of arbitrary network drops, delays, and reorderings.
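    Because the losing transaction receives a RetryTransactionException, application code usually wraps each transaction in a retry loop. The following is a minimal sketch under stated assumptions, not part of the RaftKVDatabase API: RetryTransactionException is declared locally as a stand-in for the real exception class, and TxRetry, runWithRetry(), maxAttempts, and the randomized backoff are all hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical stand-in for the database's conflict exception (not the real class). */
class RetryTransactionException extends RuntimeException {
    RetryTransactionException(String message) {
        super(message);
    }
}

/** Hypothetical helper that re-runs a whole transaction when it loses a conflict. */
final class TxRetry {
    private TxRetry() { }

    static <T> T runWithRetry(Supplier<T> transactionBody, int maxAttempts) {
        if (maxAttempts <= 0)
            throw new IllegalArgumentException("maxAttempts <= 0");
        for (int attempt = 1; ; attempt++) {
            try {
                return transactionBody.get();       // create, use, and commit one transaction
            } catch (RetryTransactionException e) {
                if (attempt >= maxAttempts)
                    throw e;                        // give up: propagate the last conflict
                try {
                    // Short randomized backoff so conflicting clients are less likely to collide again
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1, 10L * attempt + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

    The body passed to runWithRetry() is assumed to create, use, and commit a complete transaction on each attempt; retrying this way is only safe if the body has no side effects outside the transaction.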

    Persistence

    Each node maintains a complete copy of the database, and persistence is guaranteed even if up to half of the cluster is lost. Each node stores its private persistent state in an AtomicKVStore (see setKVStore()).
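    The "up to half" figure follows from majority arithmetic. A minimal sketch, assuming that every committed log entry is stored on at least a majority of the n nodes and that the cluster stays available only while a majority is alive; QuorumMath and its methods are hypothetical names used for illustration.

```java
/** Hypothetical majority arithmetic for an n-node Raft cluster. */
final class QuorumMath {
    private QuorumMath() { }

    /** Smallest number of nodes that forms a majority of n. */
    static int majority(int n) {
        if (n <= 0)
            throw new IllegalArgumentException("n <= 0");
        return n / 2 + 1;
    }

    /** Nodes that can be lost while some survivor still holds every committed entry. */
    static int maxLossForPersistence(int n) {
        // A committed entry lives on majority(n) nodes; it is lost only if all of them are lost
        return majority(n) - 1;             // equals n / 2, i.e., "up to half"
    }

    /** Nodes that can be lost while a live majority remains to elect a leader and commit. */
    static int maxLossForAvailability(int n) {
        return n - majority(n);             // equals (n - 1) / 2
    }
}
```

    For example, a five-node cluster has a majority of 3 and can lose 2 nodes without losing data or availability, while a four-node cluster still holds all committed data after losing 2 nodes but stays available only after losing at most 1.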

    Standalone Mode

    Optional support for falling back to a "standalone mode" based on the most recent copy of the database when a majority of nodes can't be reached is provided by FallbackKVDatabase.

    Raft Implementation Details

    • The Raft state machine is the key/value store data.
    • Unapplied log entries are stored on disk as serialized mutations, and also cached in memory.
    • Concurrent transactions are supported through a simple optimistic locking MVCC scheme (similar to that used by SnapshotKVDatabase):
      • Transactions execute locally until commit time, using a MutableView to collect mutations. The MutableView is based on the local node's last unapplied log entry, if any (whether committed or not), or else directly on the underlying key/value store; this defines the base term and index for the transaction.
      • Since the transaction's view incorporates all unapplied log entries down to the underlying compacted key/value store, transaction performance degrades as the number of unapplied log entries grows. Log entries are always applied as soon as possible, but they are also kept around on disk (up to a point) after being applied in case needed by a leader.
      • On commit, the transaction's Reads, Writes, base index and term, and any config change are sent to the leader.
      • The leader confirms that the log entry corresponding to the transaction's base index and term matches its log. If this is not the case, then the transaction is rejected with a RetryTransactionException.
      • The leader confirms that the Writes associated with log entries (if any) after the transaction's base log entry do not conflict with the transaction's Reads. If any conflict is found, the transaction is rejected with a RetryTransactionException.
      • The leader adds a new log entry consisting of the transaction's Writes (and any config change) to its log. The associated term and index become the transaction's commit term and index; the leader then replies to the follower with this information.
      • If/when the follower sees a committed (in the Raft sense) log entry appear in its log matching the transaction's commit term and index, then the transaction is complete.
      • As a small optimization, when the leader sends a log entry to the same follower who committed the corresponding transaction in the first place, only the transaction ID is sent, because the follower already has the data.
      • After adding a new log entry, both followers and leaders "rebase" any open Consistency.LINEARIZABLE transactions by checking for conflicts in the manner described above.
    • For transactions occurring on a leader, the logic is similar except of course no network communication occurs.
    • For read-only transactions, the leader does not create a new log entry; instead, the transaction's commit term and index are set to the base term and index. The leader also calculates its current "leader lease timeout", which is the earliest time at which it is possible for another leader to be elected. This is calculated as the time in the past at which the leader sent AppendRequests to a majority of followers who have since responded, plus the minimum election timeout, minus a small adjustment for possible clock drift (this assumes all nodes have the same minimum election timeout configured). If the current time is prior to the leader lease timeout, then the transaction may be committed as soon as the log entry corresponding to the commit term and index is committed (it may already be); otherwise, the current time is returned to the follower as the minimum leader lease timeout required before the transaction may be committed.
    • For read-only transactions, followers send the base term and index to the leader as soon as the transaction is set read-only, without any conflict information. This allows the leader to capture and return the lowest possible commit index to the follower while the transaction is still open, and lets followers stop rebasing the transaction (at the returned commit index) as soon as possible, minimizing conflicts.
    • Every AppendRequest includes the leader's current timestamp and leader lease timeout, so followers can commit any waiting read-only transactions. Leaders keep track of which followers are waiting on which leader lease timeout values, and when the leader lease timeout advances to allow a follower to commit a transaction, the follower is immediately notified.
    • Optional weaker consistency guarantees are available on a per-transaction basis; see OPTION_CONSISTENCY. Setting the consistency to any level other than Consistency.LINEARIZABLE implicitly sets the transaction to read-only.
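    The two leader-side commit checks above (base entry match, then Reads versus later Writes) can be sketched as follows. This is a deliberately simplified, hypothetical model rather than the actual implementation: reads and writes are sets of individual string keys instead of key ranges, the leader's log is a list starting at index 1 with nothing compacted, and SketchLogEntry, LeaderConflictCheck, and Outcome are illustrative names.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified log entry: its Raft term and the keys its mutations wrote. */
final class SketchLogEntry {
    final long term;
    final Set<String> writes;

    SketchLogEntry(long term, Set<String> writes) {
        this.term = term;
        this.writes = writes;
    }
}

/** Hypothetical model of the leader's commit-time checks for a follower's transaction. */
final class LeaderConflictCheck {
    enum Outcome { ACCEPT, RETRY_BASE_MISMATCH, RETRY_CONFLICT }

    private LeaderConflictCheck() { }

    /**
     * @param log       the leader's log; log.get(i) is the entry at index i + 1 (nothing compacted)
     * @param baseTerm  term of the entry the transaction was based on
     * @param baseIndex index of that entry, or 0 if based on the empty initial state
     * @param reads     keys the transaction read
     */
    static Outcome check(List<SketchLogEntry> log, long baseTerm, long baseIndex, Set<String> reads) {
        // Check 1: the transaction's base entry must exist in, and match, the leader's log
        if (baseIndex > log.size())
            return Outcome.RETRY_BASE_MISMATCH;
        if (baseIndex > 0 && log.get((int) baseIndex - 1).term != baseTerm)
            return Outcome.RETRY_BASE_MISMATCH;
        // Check 2: no entry after the base may have written a key the transaction read
        for (int i = (int) baseIndex; i < log.size(); i++) {
            if (!Collections.disjoint(log.get(i).writes, reads))
                return Outcome.RETRY_CONFLICT;
        }
        // Both checks passed: the leader would now append the transaction's Writes as a new entry
        return Outcome.ACCEPT;
    }
}
```

    On ACCEPT, the leader would append the transaction's Writes as a new log entry whose term and index become the commit term and index; in this model, "rebasing" an open linearizable transaction amounts to running the same Reads-versus-Writes check against each newly added entry.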

    Limitations

    • A transaction's mutations must fit in memory.
    • All nodes must be configured with the same minimum election timeout. This guarantees that the leader's lease timeout calculation is valid.
    • Due to the optimistic locking approach used, this implementation will perform poorly when there is a high rate of conflicting transactions; the result will be many transaction retries.
    • Performance will suffer when the amount of data associated with a typical transaction cannot be delivered quickly and reliably over the network.
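    The leader lease timeout calculation from the implementation details above, and the reason for the identical minimum election timeout requirement, can be illustrated with a small sketch. It is a hypothetical model, not the actual implementation: ackSendTimes holds, for each follower that has responded, the time at which the leader sent the AppendRequest being acknowledged; the leader is assumed to count its own vote toward the majority; and driftMillis stands in for the unspecified clock drift adjustment. LeaderLease and its methods are illustrative names.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical model of the leader lease timeout calculation. */
final class LeaderLease {
    private LeaderLease() { }

    /**
     * @param clusterSize        voting nodes in the current configuration, including the leader
     * @param ackSendTimes       for each follower that has responded: when the leader sent the
     *                           AppendRequest that follower acknowledged (epoch millis)
     * @param minElectionTimeout minimum election timeout in millis, assumed identical on all nodes
     * @param driftMillis        assumed allowance for clock drift in millis
     * @return earliest time another leader could be elected, or empty if no majority has responded
     */
    static OptionalLong leaseTimeout(int clusterSize, long[] ackSendTimes,
            long minElectionTimeout, long driftMillis) {
        int followersNeeded = clusterSize / 2;      // majority (n / 2 + 1) minus the leader's own vote
        if (followersNeeded == 0)                   // sketch assumption: a one-node cluster needs no acks
            return OptionalLong.of(Long.MAX_VALUE);
        if (ackSendTimes.length < followersNeeded)
            return OptionalLong.empty();
        long[] sorted = ackSendTimes.clone();
        Arrays.sort(sorted);                        // ascending
        // Latest time T such that enough followers acknowledged requests sent at or after T
        long majoritySendTime = sorted[sorted.length - followersNeeded];
        return OptionalLong.of(majoritySendTime + minElectionTimeout - driftMillis);
    }

    /** True if a read-only transaction need not wait for the lease to advance. */
    static boolean leaseStillValid(long now, OptionalLong lease) {
        return lease.isPresent() && now < lease.getAsLong();
    }
}
```

    For example, in a five-node cluster where followers acknowledged requests sent at times 100, 200, 300, and 400, the two most recent acknowledgements plus the leader itself form a majority, so the lease runs until 300 + minElectionTimeout - driftMillis. If some node used a shorter minimum election timeout, a new leader could be elected before that time, which is why all nodes must share the same setting.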

    In general, the algorithm should function correctly under all non-Byzantine conditions. The level of difficulty the system is experiencing, due to contention, network errors, etc., can be measured in terms of:

    Cluster Configuration

    Instances support dynamic cluster configuration changes at runtime.

    Initially, all nodes are in an unconfigured state, where nothing has been added to the Raft log yet and no cluster is defined. Unconfigured nodes are passive: they stay in follower mode (i.e., they will not start elections), and they disallow local transactions that make any changes other than the configuration change, described below, that creates a new cluster.

    An unconfigured node becomes configured when either:

    1. RaftKVTransaction.configChange() is invoked and committed within a local transaction, which creates a new single node cluster, with the current node as leader, and commits the cluster's first log entry; or
    2. An AppendRequest is received from a leader of some existing cluster, in which case the node records the cluster ID, thereby joining the cluster (see below), and applies the received cluster configuration.

    A node is configured if and only if it has recorded one or more log entries. The very first log entry always contains the initial cluster configuration (containing only the node that created it, whether local or remote), so any node that has a non-empty log is configured.

    Newly created clusters are assigned a random 32-bit cluster ID (option #1 above). This ID is included in all messages sent over the network, and adopted by unconfigured nodes that join the cluster (via option #2 above). Configured nodes discard incoming messages containing a cluster ID different from the one they have joined. This prevents data corruption that can occur if nodes from two different clusters are inadvertently mixed together on the same network.
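    The cluster ID rules above can be modeled with a small amount of state. This is a hypothetical sketch, not the actual message handling: ClusterIdGate and its methods are illustrative names, zero is assumed to be reserved for "no cluster yet" (in line with getClusterId() returning zero for an unconfigured node), and only AppendRequest handling is shown, although a configured node would apply the same equality check to every incoming message.

```java
import java.security.SecureRandom;

/** Hypothetical model of a node's cluster ID bookkeeping (0 means "unconfigured"). */
final class ClusterIdGate {
    private static final SecureRandom RANDOM = new SecureRandom();

    private int clusterId;

    /** Option #1: create a new cluster with a random, non-zero 32-bit ID. */
    int createCluster() {
        if (clusterId != 0)
            throw new IllegalStateException("already joined cluster " + clusterId);
        int id;
        do {
            id = RANDOM.nextInt();      // any 32-bit value; zero is reserved here for "unconfigured"
        } while (id == 0);
        clusterId = id;
        return id;
    }

    /**
     * Option #2 plus filtering: an unconfigured node adopts the ID carried by a
     * leader's AppendRequest; a configured node accepts only its own cluster's ID.
     */
    boolean acceptAppendRequest(int messageClusterId) {
        if (messageClusterId == 0)
            return false;
        if (clusterId == 0) {
            clusterId = messageClusterId;
            return true;
        }
        return messageClusterId == clusterId;
    }

    int getClusterId() {
        return clusterId;
    }
}
```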

    Once a node joins a cluster with a specific cluster ID, it cannot be reassigned to a different cluster without first returning it to the unconfigured state; to do that, it must be shut down and its persistent state deleted.

    Configuration Changes

    Once a node is configured, a separate issue is whether the node is included in its own configuration, i.e., whether the node is a member of its cluster according to the current cluster configuration. A node that is not a member of its own cluster does not count its own vote to determine committed log entries (if a leader), and does not start elections (if a follower). However, it will accept and respond to incoming AppendRequests and RequestVotes.

    In addition, leaders follow these rules with respect to configuration changes:

    • If a leader is removed from a cluster, it remains the leader until the corresponding configuration change is committed (not counting its own vote), and then steps down (i.e., reverts to follower).
    • If a follower is added to a cluster, the leader immediately starts sending that follower AppendRequests.
    • If a follower is removed from a cluster, the leader continues to send that follower AppendRequests until the follower acknowledges receipt of the log entry containing the configuration change.
    • Leaders defer configuration changes until they have committed at least one log entry in the current term (see this discussion).
    • Configuration changes that remove the last node in a cluster are disallowed.
    • Only one configuration change may take place at a time (i.e., be not yet committed).
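    The leader-side rules above can be gathered into one validation step. A minimal sketch with hypothetical inputs: the current configuration as a map from node identity to network address (the same shape getCurrentConfig() returns), a flag saying whether an earlier configuration change is still uncommitted, and a flag saying whether the leader has committed an entry in its current term. ConfigChangeRules and validateConfigChange() are illustrative names, and where a real leader defers a change, this sketch simply rejects it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical validation of a single-node configuration change on the leader. */
final class ConfigChangeRules {
    private ConfigChangeRules() { }

    /**
     * Applies one change (identity -> address to add or update, or address == null to remove)
     * to a copy of the current configuration, enforcing the leader rules listed above.
     */
    static Map<String, String> validateConfigChange(Map<String, String> current, String identity,
            String address, boolean previousChangeUncommitted, boolean committedInCurrentTerm) {
        if (!committedInCurrentTerm)
            throw new IllegalStateException("defer: no log entry committed in the current term yet");
        if (previousChangeUncommitted)
            throw new IllegalStateException("another configuration change is not yet committed");
        Map<String, String> next = new HashMap<>(current);
        if (address != null)
            next.put(identity, address);
        else
            next.remove(identity);
        if (next.isEmpty())
            throw new IllegalArgumentException("cannot remove the last node in the cluster");
        return next;
    }
}
```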

    Follower Probes

    This implementation includes a modification to the Raft state machine to avoid unnecessary, disruptive elections when one or more nodes are disconnected from, and then reconnected to, the majority.

    When a follower's election timeout fires, before converting into a candidate, the follower is required to verify communication with a majority of the cluster using PingRequest messages. Only when the follower has successfully done so may it become a candidate. While in this intermediate "probing" mode, the follower responds normally to incoming messages. In particular, if the follower receives a valid AppendRequest from the leader, it reverts to normal operation.

    This behavior is optional, but enabled by default; see setFollowerProbingEnabled().
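    The probing step can be sketched as an extra state between "election timeout fired" and "candidate". This hypothetical model assumes the follower counts itself toward the majority and records which peers have answered its PingRequests during the current probe; FollowerProbe, its State values, and its callback methods are illustrative names rather than the actual implementation.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of follower probing before becoming a candidate. */
final class FollowerProbe {
    enum State { FOLLOWER, PROBING, CANDIDATE }

    private final Set<String> cluster;          // identities in the current configuration
    private final String self;
    private final boolean probingEnabled;
    private final Set<String> responded = new HashSet<>();
    private State state = State.FOLLOWER;

    FollowerProbe(Set<String> cluster, String self, boolean probingEnabled) {
        this.cluster = new HashSet<>(cluster);
        this.self = self;
        this.probingEnabled = probingEnabled;
    }

    /** Election timeout fired: start probing, or go straight to candidate if probing is disabled. */
    void onElectionTimeout() {
        if (!cluster.contains(self))
            return;                             // non-members never start elections
        responded.clear();
        responded.add(self);                    // a node can always reach itself
        state = probingEnabled ? State.PROBING : State.CANDIDATE;
        checkMajority();
    }

    /** A reply to one of our PingRequests arrived from a peer. */
    void onPingResponse(String peer) {
        if (state == State.PROBING && cluster.contains(peer)) {
            responded.add(peer);
            checkMajority();
        }
    }

    /** A valid AppendRequest from the leader cancels probing. */
    void onAppendRequest() {
        state = State.FOLLOWER;
    }

    private void checkMajority() {
        if (state == State.PROBING && responded.size() >= cluster.size() / 2 + 1)
            state = State.CANDIDATE;
    }

    State state() {
        return state;
    }
}
```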

    Key Watches

    Key watches are supported.

    Mutable Snapshots

    Mutable snapshots are supported.

    Spring Isolation Levels

    In Spring applications, the transaction Consistency level may be configured through the Spring PermazenTransactionManager by (ab)using the transaction isolation level setting, for example, via the @Transactional annotation's isolation() property. All Raft consistency levels are made available this way, though the mapping from Spring's isolation levels to RaftKVDatabase's consistency levels is only semantically approximate:

    High Priority Transactions

    Transactions may be configured as high priority; see RaftKVTransaction.setHighPriority().

    See Also:
    The Raft Consensus Algorithm
    • Constructor Detail

      • RaftKVDatabase

        public RaftKVDatabase()
    • Method Detail

      • setKVStore

        public void setKVStore(AtomicKVStore kvstore)
        Configure the AtomicKVStore in which local persistent state is stored.

        Required property.

        Parameters:
        kvstore - local persistent data store
        Throws:
        IllegalStateException - if this instance is already started
      • setLogDirectory

        public void setLogDirectory(File directory)
        Configure the directory in which Raft log entries are stored.

        Required property.

        Parameters:
        directory - log directory
        Throws:
        IllegalStateException - if this instance is already started
      • getLogDirectory

        public File getLogDirectory()
        Get the directory in which Raft log entries are stored.
        Returns:
        configured log directory
      • setIdentity

        public void setIdentity(String identity)
        Configure the Raft identity.

        Required property.

        Parameters:
        identity - unique Raft identity of this node in its cluster
        Throws:
        IllegalStateException - if this instance is already started
      • getIdentity

        public String getIdentity()
        Get this node's Raft identity.
        Returns:
        the unique identity of this node in its cluster
      • setMinElectionTimeout

        public void setMinElectionTimeout(int timeout)
        Configure the minimum election timeout.

        This must be set to a value greater than the heartbeat timeout.

        Warning: currently all nodes must have the same configured minimum election timeout, otherwise read-only transactions are not guaranteed to be completely up-to-date.

        Parameters:
        timeout - minimum election timeout in milliseconds
        Throws:
        IllegalStateException - if this instance is already started
        IllegalArgumentException - if timeout <= 0
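        The constraints on election timeouts can be illustrated with a small holder class. This is a hypothetical sketch, not the RaftKVDatabase implementation: ElectionTimeouts is an illustrative name, and choosing each election timeout uniformly at random between the minimum and maximum is the textbook Raft technique for making split votes unlikely, assumed here rather than confirmed by this documentation. Requiring the minimum to exceed the heartbeat timeout means a healthy leader's heartbeats normally arrive before any follower's timer can expire.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical holder enforcing heartbeat < min election timeout <= max election timeout. */
final class ElectionTimeouts {
    final int heartbeatTimeout;
    final int minElectionTimeout;
    final int maxElectionTimeout;

    ElectionTimeouts(int heartbeatTimeout, int minElectionTimeout, int maxElectionTimeout) {
        if (heartbeatTimeout <= 0 || minElectionTimeout <= 0)
            throw new IllegalArgumentException("timeouts must be positive");
        if (minElectionTimeout <= heartbeatTimeout)
            throw new IllegalArgumentException("min election timeout must exceed heartbeat timeout");
        if (maxElectionTimeout < minElectionTimeout)
            throw new IllegalArgumentException("max election timeout < min election timeout");
        this.heartbeatTimeout = heartbeatTimeout;
        this.minElectionTimeout = minElectionTimeout;
        this.maxElectionTimeout = maxElectionTimeout;
    }

    /** Textbook Raft: pick a fresh random timeout in [min, max] each time the timer is reset. */
    int nextElectionTimeout() {
        return ThreadLocalRandom.current().nextInt(minElectionTimeout, maxElectionTimeout + 1);
    }
}
```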
      • getMinElectionTimeout

        public int getMinElectionTimeout()
        Get the configured minimum election timeout.
        Returns:
        minimum election timeout in milliseconds
      • getMaxElectionTimeout

        public int getMaxElectionTimeout()
        Get the configured maximum election timeout.
        Returns:
        maximum election timeout in milliseconds
      • getHeartbeatTimeout

        public int getHeartbeatTimeout()
        Get the configured heartbeat timeout.
        Returns:
        heartbeat timeout in milliseconds
      • setMaxTransactionDuration

        public void setMaxTransactionDuration(int duration)
        Configure the maximum supported duration for outstanding transactions.

        This value may be changed even while this instance is running.

        Parameters:
        duration - maximum supported duration for outstanding transactions in milliseconds
        Throws:
        IllegalArgumentException - if duration <= 0
      • getMaxTransactionDuration

        public int getMaxTransactionDuration()
        Get the configured maximum supported duration for outstanding transactions.
        Returns:
        maximum supported duration for outstanding transactions in milliseconds
      • getCommitTimeout

        public int getCommitTimeout()
        Get the configured default transaction commit timeout.
        Returns:
        transaction commit timeout in milliseconds, or zero for unlimited
      • setFollowerProbingEnabled

        public void setFollowerProbingEnabled(boolean followerProbingEnabled)
        Configure whether followers should be required to probe for network connectivity with a majority of the cluster after an election timeout prior to becoming a candidate.

        This value may be changed at any time.

        The default is enabled.

        Parameters:
        followerProbingEnabled - true to enable, false to disable
      • isFollowerProbingEnabled

        public boolean isFollowerProbingEnabled()
        Determine whether follower probing prior to becoming a candidate is enabled.
        Returns:
        true if follower probing is enabled, otherwise false
      • setDisableSync

        public void setDisableSync(boolean disableSync)
        Disable filesystem data sync.

        This gives higher performance in exchange for losing the guarantee of durability if the system crashes. Note: this feature is experimental and may violate consistency and/or durability guarantees.

        Default is false.

        Parameters:
        disableSync - true to disable data sync
      • isDisableSync

        public boolean isDisableSync()
        Determine whether filesystem sync is disabled.
        Returns:
        true if filesystem sync is disabled, otherwise false
      • setDumpConflicts

        public void setDumpConflicts(boolean dumpConflicts)
        Enable explicit logging of transaction conflicts.

        If enabled, when a transaction fails due to conflicts, the conflicting key ranges are logged.

        Default is false.

        Parameters:
        dumpConflicts - true to log transaction conflicts
      • isDumpConflicts

        public boolean isDumpConflicts()
        Determine whether explicit logging of transaction conflicts is enabled.
        Returns:
        true if explicit logging of transaction conflicts is enabled, otherwise false
      • setThreadPriority

        public void setThreadPriority(int threadPriority)
        Configure the priority of internal service threads.

        Default is -1, which means do not change thread priority from its default.

        Parameters:
        threadPriority - internal service thread priority, or -1 to leave thread priority unchanged
        Throws:
        IllegalStateException - if this instance is already started
        IllegalArgumentException - if threadPriority is not -1 and not in the range Thread.MIN_PRIORITY to Thread.MAX_PRIORITY
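        The -1 sentinel and the range check can be illustrated with a ThreadFactory that applies the configured priority only when one was given. PriorityThreadFactory and the thread name are hypothetical; this is a sketch of the documented contract, not the class's internal code.

```java
import java.util.concurrent.ThreadFactory;

/** Hypothetical factory that applies a configured priority, or leaves the default when -1. */
final class PriorityThreadFactory implements ThreadFactory {
    private final int threadPriority;   // -1 means "leave the default priority"

    PriorityThreadFactory(int threadPriority) {
        if (threadPriority != -1
                && (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY))
            throw new IllegalArgumentException("invalid thread priority " + threadPriority);
        this.threadPriority = threadPriority;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "raft-service");
        if (threadPriority != -1)
            thread.setPriority(threadPriority);
        return thread;
    }
}
```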
      • getThreadPriority

        public int getThreadPriority()
        Get the configured internal service thread priority.
        Returns:
        internal service thread priority, or -1 if not configured
      • setPerformanceLogging

        public void setPerformanceLogging(boolean performanceLogging)
        Configure whether to increase the log level for certain performance-related events (e.g., "info" instead of "debug").

        Performance-related events are events that affect performance and would be considered abnormal in a perfectly functioning Raft network, e.g., having to retransmit an acknowledgement.

        Default is false.

        Parameters:
        performanceLogging - true for higher level logging of performance-related events
      • getClusterId

        public int getClusterId()
        Retrieve the unique 32-bit ID for this node's cluster.

        A value of zero indicates an unconfigured system. Usually the reverse is also true, although an unconfigured system can have a non-zero cluster ID in the rare case where an error occurred while persisting the initial log entry.

        Returns:
        the unique ID of this node's cluster, or zero if this node is unconfigured
      • getCurrentConfig

        public Map<String,String> getCurrentConfig()
        Retrieve the current cluster configuration as understood by this node.

        Configuration changes are performed and committed in the context of a normal transaction; see RaftKVTransaction.configChange().

        If this system is unconfigured, an empty map is returned (and vice versa).

        The returned map is a copy; changes have no effect on this instance.

        Returns:
        current configuration mapping from node identity to network address, or empty if this node is not started or unconfigured
      • isConfigured

        public boolean isConfigured()
        Determine whether this instance is configured.

        A node is configured if and only if it has at least one log entry. The first log entry always includes a configuration change that adds the node that created it to the (previously empty) cluster.

        Returns:
        true if this instance is started and configured, otherwise false
      • isClusterMember

        public boolean isClusterMember()
        Determine whether this node thinks that it is part of its cluster, as determined by its current configuration.
        Returns:
        true if this instance is started and part of the cluster, otherwise false
      • isClusterMember

        public boolean isClusterMember(String node)
        Determine whether this node thinks that the specified node is part of the cluster, as determined by its current configuration.
        Parameters:
        node - node identity
        Returns:
        true if this instance is started and the specified node is part of the cluster, otherwise false
      • getCurrentRole

        public Role getCurrentRole()
        Get this instance's current role: leader, follower, or candidate.
        Returns:
        current Role, or null if not running
      • getCurrentTerm

        public long getCurrentTerm()
        Get this instance's current term.
        Returns:
        current term, or zero if not running
      • getCurrentTermStartTime

        public long getCurrentTermStartTime()
        Get the time at which this instance's current term advanced to its current value.
        Returns:
        current term's start time in milliseconds since the epoch, or zero if unknown
      • getCommitIndex

        public long getCommitIndex()
        Get this instance's current commit index.
        Returns:
        current commit index, or zero if not running
      • getLastAppliedTerm

        public long getLastAppliedTerm()
        Get this instance's last applied log entry term.
        Returns:
        last applied term, or zero if not running
      • getLastAppliedIndex

        public long getLastAppliedIndex()
        Get this instance's last applied log entry index.
        Returns:
        last applied index, or zero if not running
      • getUnappliedLog

        public List<LogEntry> getUnappliedLog()
        Get the unapplied LogEntry objects in this instance's Raft log.

        The returned list is a copy; changes have no effect on this instance.

        Returns:
        unapplied log entries, or null if this instance is not running
      • getUnappliedLogMemoryUsage

        public long getUnappliedLogMemoryUsage()
        Get the estimated total memory used by unapplied log entries.
        Returns:
        unapplied log entry memory usage, or zero if this instance is not running
      • getOpenTransactions

        public List<RaftKVTransaction> getOpenTransactions()
        Get the list of open transactions associated with this database.

        The returned list is a copy; changes have no effect on this instance.

        Returns:
        all open transactions
      • getLinearizableCommitTimestamp

        public Timestamp getLinearizableCommitTimestamp()
        Get the timestamp of the most recently committed linearizable transaction.

        This value can be used to confirm that the cluster is healthy.

        Returns:
        time of the most recent successful commit of a linearizable transaction, or null if none
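        Since the most recent linearizable commit time can be used to confirm that the cluster is healthy, a monitor might compare its age against a threshold. A minimal sketch, assuming the timestamp has already been converted to epoch milliseconds (that conversion is not shown), that null means no linearizable transaction has committed yet, and that maxAgeMillis is an operator-chosen threshold; ClusterHealth is a hypothetical name. An idle cluster would also look stale under this check, so a monitor would probably need to commit a periodic linearizable probe transaction of its own.

```java
/** Hypothetical health check based on the age of the last linearizable commit. */
final class ClusterHealth {
    private ClusterHealth() { }

    /**
     * @param lastLinearizableCommitMillis epoch millis of the last linearizable commit, or null if none
     * @param nowMillis                    current time in epoch millis
     * @param maxAgeMillis                 how stale the last commit may be before reporting unhealthy
     */
    static boolean isHealthy(Long lastLinearizableCommitMillis, long nowMillis, long maxAgeMillis) {
        if (lastLinearizableCommitMillis == null)
            return false;                   // nothing has committed linearizably yet
        return nowMillis - lastLinearizableCommitMillis <= maxAgeMillis;
    }
}
```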
      • start

        public void start()
        Description copied from interface: KVDatabase
        Start this instance. This method must be called prior to creating any transactions.

        This method is idempotent: if this instance is already started, nothing happens.

        Whether an instance that has been started and stopped can be restarted is implementation-dependent.

        Specified by:
        start in interface KVDatabase
      • stop

        public void stop()
        Description copied from interface: KVDatabase
        Stop this instance.

        This method is idempotent: if this instance has not been started, or is already stopped, nothing happens.

        Specified by:
        stop in interface KVDatabase
      • getLastInternalError

        public Throwable getLastInternalError()
        Get the exception most recently thrown by the internal service thread, if any. This is used mainly during testing.
        Returns:
        most recent service exception, or null if none
      • createTransaction

        public RaftKVTransaction createTransaction(Map<String,?> options)
        Description copied from interface: KVDatabase
        Create a new transaction with the specified options.
        Specified by:
        createTransaction in interface KVDatabase
        Parameters:
        options - optional transaction options; may be null
        Returns:
        newly created transaction
      • createTransaction

        public RaftKVTransaction createTransaction(Consistency consistency,
                                                   boolean highPriority)
        Create a new transaction with the specified consistency and with optional high priority.
        Parameters:
        consistency - consistency level
        highPriority - true to make transaction high priority
        Returns:
        newly created transaction
        Throws:
        IllegalArgumentException - if consistency is null
        IllegalStateException - if this instance is not started or in the process of shutting down