ExceptionFactory

Producing content that a reasonable developer might want to read

Streamlining Apache NiFi Cluster State Migration

NiFi Clustering ZooKeeper Kubernetes

2023-07-01 • 6 minute read • David Handermann

Background

State tracking is a core framework feature of Apache NiFi, enabling Processors such as ListSFTP and ListS3 to enumerate new files without duplicating information from previous runs. Processors indicate state management requirements using the Stateful annotation, which declares one or more scopes, LOCAL or CLUSTER, along with a description. Standalone NiFi deployments store state information in a local directory, and clustered deployments depend on a configurable Cluster State Provider. Distributed and resilient state tracking is essential for clustered processing, enabling one node to pick up where another left off in the event of individual node failures. NiFi has depended on Apache ZooKeeper as the de facto solution for cluster state management, but also supports Redis through an extensible interface. The NiFi 2.0 release includes support for state management with Kubernetes, using ConfigMaps to store component information for clustered deployments.
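To illustrate why stored state matters for listing Processors, the following self-contained Java sketch keeps the newest modification timestamp seen on one run and emits only newer entries on the next. The `ListingStateSketch` and `RemoteFile` classes and the `last.timestamp` key are hypothetical, a plain `HashMap` stands in for the state that NiFi would persist through a State Provider, and real listing Processors handle edge cases such as files sharing a timestamp more carefully.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListingStateSketch {

    /** Hypothetical listing entry: file name and last-modified time in milliseconds. */
    static final class RemoteFile {
        final String name;
        final long lastModified;

        RemoteFile(String name, long lastModified) {
            this.name = name;
            this.lastModified = lastModified;
        }
    }

    // Hypothetical state key; real Processors define their own keys
    static final String LAST_TIMESTAMP_KEY = "last.timestamp";

    // Stand-in for component state that NiFi would persist between runs
    private final Map<String, String> state = new HashMap<>();

    /** Returns files newer than the timestamp recorded on the previous run, then records the newest timestamp. */
    List<RemoteFile> listNew(List<RemoteFile> listing) {
        long lastSeen = Long.parseLong(state.getOrDefault(LAST_TIMESTAMP_KEY, "-1"));
        long newest = lastSeen;
        List<RemoteFile> newFiles = new ArrayList<>();
        for (RemoteFile file : listing) {
            if (file.lastModified > lastSeen) {
                newFiles.add(file);
                newest = Math.max(newest, file.lastModified);
            }
        }
        state.put(LAST_TIMESTAMP_KEY, Long.toString(newest));
        return newFiles;
    }

    public static void main(String[] args) {
        ListingStateSketch sketch = new ListingStateSketch();
        List<RemoteFile> firstRun = List.of(new RemoteFile("a.csv", 100), new RemoteFile("b.csv", 200));
        System.out.println(sketch.listNew(firstRun).size()); // both files are new

        List<RemoteFile> secondRun = List.of(
                new RemoteFile("a.csv", 100), new RemoteFile("b.csv", 200), new RemoteFile("c.csv", 300));
        System.out.println(sketch.listNew(secondRun).size()); // only c.csv is new
    }
}
```

Losing this map between runs, or between nodes in a cluster, would cause every file to be emitted again, which is the duplication that cluster state tracking and migration aim to prevent.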

Introduction

Maintaining and upgrading NiFi deployments may require migrating cluster state under some circumstances. Whether moving from an embedded to an external ZooKeeper cluster, or switching cluster state providers, preserving and migrating cluster state information is an important part of ensuring reliable data processing. Earlier versions of NiFi included the ZooKeeper Migrator to support transitioning between ZooKeeper deployments, but the ZooKeeper Migrator does not support TLS for encrypted communication and requires temporary file storage of state information. NIFI-10976 introduced cluster state migration as a feature of the NiFi framework itself, streamlining the state migration process regardless of state provider implementation. The improved state migration feature simplifies the operational process while supporting the complete set of access options for state providers, including ZooKeeper, Redis, and Kubernetes ConfigMaps.

Migration Process

The cluster state migration process requires configuring the new state provider settings and updating application properties to reflect the current and previous providers. The process requires a clean destination provider, meaning that the new state provider service must not contain any component information prior to migration.

Cluster state migration involves the following steps:

  1. Add a new Cluster State Provider definition to state-management.xml
  2. Update the Cluster State Provider and Previous Provider identifiers in nifi.properties
  3. Stop all NiFi nodes
  4. Start all NiFi nodes
  5. Remove the Previous Cluster State Provider identifier from nifi.properties

Adding the provider definition to state-management.xml and updating nifi.properties can be completed while the NiFi cluster is running.

Adding a New Cluster State Provider

The standard binary distribution of NiFi includes state-management.xml with a ZooKeeperStateProvider defined as the Cluster State Provider. The Connect String property contains the comma-separated list of ZooKeeper host and port references, and the default Cluster State Provider identifier is zk-provider.
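The Connect String format can be illustrated with a small parser. This is a simplified sketch, not the ZooKeeper client's own parsing logic: the `ConnectStringSketch` class and `HostPort` type are hypothetical, and the code assumes every entry includes an explicit port, with no IPv6 literals and no chroot path suffix.

```java
import java.util.ArrayList;
import java.util.List;

public class ConnectStringSketch {

    /** Host and port parsed from one Connect String entry. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /**
     * Splits a comma-separated Connect String into host and port pairs.
     * Simplified: requires host:port for every entry and does not handle IPv6 literals or chroot suffixes.
     */
    static List<HostPort> parse(String connectString) {
        List<HostPort> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String trimmed = entry.trim();
            int separator = trimmed.lastIndexOf(':');
            if (separator <= 0 || separator == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but found [" + trimmed + "]");
            }
            String host = trimmed.substring(0, separator);
            int port = Integer.parseInt(trimmed.substring(separator + 1));
            servers.add(new HostPort(host, port));
        }
        return servers;
    }

    public static void main(String[] args) {
        // Example value for a three-node ZooKeeper cluster
        System.out.println(parse("node-1:2181,node-2:2181,node-3:2181"));
    }
}
```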

For migration to another ZooKeeper cluster, a new Cluster State Provider must be configured with a different id value for reference in nifi.properties. The existing provider must remain configured to support the migration process. The following Cluster State Provider definition uses new-provider as the id value for the new destination:

<cluster-provider>
  <id>new-provider</id>
  <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
  <property name="Connect String">node-1:2181,node-2:2181,node-3:2181</property>
  <property name="Root Node">/nifi</property>
  <property name="Session Timeout">10 seconds</property>
  <property name="Access Control">Open</property>
</cluster-provider>

The Connect String property in this example references a ZooKeeper cluster of three nodes. The value must list the DNS names or IP addresses of the destination ZooKeeper cluster nodes.
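Because the migration depends on both provider definitions being present with distinct id values, a pre-flight check can be sketched with the JDK's built-in DOM parser. The `ProviderDefinitionCheck` class and its methods are hypothetical helpers rather than part of NiFi, and the sample document in `main` is a trimmed example containing only the `id` elements.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class ProviderDefinitionCheck {

    /** Collects the id values of all cluster-provider elements in a state-management.xml document. */
    static Set<String> clusterProviderIds(String stateManagementXml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        // Reject DOCTYPE declarations to avoid external entity resolution
        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        Document document = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(stateManagementXml.getBytes(StandardCharsets.UTF_8)));

        Set<String> ids = new LinkedHashSet<>();
        NodeList providers = document.getElementsByTagName("cluster-provider");
        for (int i = 0; i < providers.getLength(); i++) {
            NodeList idElements = ((Element) providers.item(i)).getElementsByTagName("id");
            if (idElements.getLength() > 0) {
                ids.add(idElements.item(0).getTextContent().trim());
            }
        }
        return ids;
    }

    /** Fails unless the current and previous identifiers differ and both have definitions. */
    static void checkMigrationReady(Set<String> ids, String current, String previous) {
        if (current.equals(previous)) {
            throw new IllegalStateException("Current and previous provider identifiers must differ");
        }
        for (String id : new String[] {current, previous}) {
            if (!ids.contains(id)) {
                throw new IllegalStateException("No cluster-provider defined with id [" + id + "]");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Trimmed example: only the id elements of each cluster-provider are included
        String xml = "<stateManagement>"
                + "<cluster-provider><id>zk-provider</id></cluster-provider>"
                + "<cluster-provider><id>new-provider</id></cluster-provider>"
                + "</stateManagement>";
        Set<String> ids = clusterProviderIds(xml);
        checkMigrationReady(ids, "new-provider", "zk-provider");
        System.out.println("Cluster providers defined: " + ids);
    }
}
```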

Updating Cluster State Provider Identifiers

The nifi.properties configuration controls the selected Cluster State Provider using the following property:

nifi.state.management.provider.cluster

The default configuration specifies the ZooKeeper State Provider using the zk-provider identifier:

nifi.state.management.provider.cluster=zk-provider

Configuring cluster state migration requires updating the identifier property to reference the new provider, defined above using new-provider as the identifier:

nifi.state.management.provider.cluster=new-provider

The previous Cluster State Provider serves as the source of existing component state information. With zk-provider as the original value for the Cluster State Provider, that identifier can be set as the value for the previous provider:

nifi.state.management.provider.cluster.previous=zk-provider
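Both identifiers are ordinary entries in nifi.properties, so they can be read with `java.util.Properties`. The following sketch uses a hypothetical `StateProviderProperties` class and treats a missing or blank previous value as no migration configured; loading from an in-memory string is only for illustration, and an actual deployment would load its nifi.properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Optional;
import java.util.Properties;

public class StateProviderProperties {

    static final String CLUSTER_PROVIDER = "nifi.state.management.provider.cluster";
    static final String PREVIOUS_CLUSTER_PROVIDER = "nifi.state.management.provider.cluster.previous";

    /** Returns the identifier of the current Cluster State Provider, or an empty string when unset. */
    static String currentProvider(Properties properties) {
        return properties.getProperty(CLUSTER_PROVIDER, "").trim();
    }

    /** Returns the previous provider identifier, treating a missing or blank value as not configured. */
    static Optional<String> previousProvider(Properties properties) {
        String previous = properties.getProperty(PREVIOUS_CLUSTER_PROVIDER, "").trim();
        return previous.isEmpty() ? Optional.empty() : Optional.of(previous);
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // In-memory example; a deployment would load the nifi.properties file instead
        properties.load(new StringReader(
                CLUSTER_PROVIDER + "=new-provider\n" + PREVIOUS_CLUSTER_PROVIDER + "=zk-provider\n"));

        System.out.println("Current: " + currentProvider(properties));
        System.out.println("Previous: " + previousProvider(properties).orElse("none"));
    }
}
```

Clearing the value, as in `nifi.state.management.provider.cluster.previous=`, yields an empty `Optional` in this sketch.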

With both the new and previous Cluster State Providers defined and referenced in application properties, NiFi cluster nodes can be restarted to begin the migration.

Restart Cluster Nodes

All NiFi cluster nodes must be shut down to ensure that the source Cluster State Provider contains current information. Failing to stop all cluster nodes will leave a primary node processing and updating state information, resulting in inconsistent state and potential data duplication or other issues.

After starting NiFi cluster nodes with the new and previous Cluster State Providers configured, the node elected as primary will enable the Cluster State Provider and determine whether to proceed with state migration.

On initial startup, the primary node enumerates the contents of the current and previous Cluster State Providers. After confirming that the current provider is empty and the previous provider contains state information, the primary node logs the component identifiers selected for migration. Log messages contain the current and previous identifiers of Cluster State Providers for troubleshooting and status evaluation.

On subsequent restarts, the primary node enumerates the contents of the current Cluster State Provider and ignores the previous provider when finding current state information.
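The startup decision described above can be modeled in a few lines. This sketch uses a hypothetical `SimpleStateProvider` interface with an in-memory implementation rather than the NiFi StateProvider API, omits the enumeration support check, and prints a message in place of NiFi's log output. State is copied only when the current provider has no stored components and the previous provider has some, so a second run leaves the previous provider untouched.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MigrationDecisionSketch {

    /** Hypothetical, simplified provider view; not the NiFi StateProvider interface. */
    interface SimpleStateProvider {
        String getIdentifier();

        Collection<String> getStoredComponentIds() throws IOException;

        Map<String, String> getState(String componentId) throws IOException;

        void setState(String componentId, Map<String, String> state) throws IOException;
    }

    /** In-memory provider used only for this example. */
    static final class InMemoryProvider implements SimpleStateProvider {
        private final String identifier;
        private final Map<String, Map<String, String>> states = new HashMap<>();

        InMemoryProvider(String identifier) {
            this.identifier = identifier;
        }

        public String getIdentifier() {
            return identifier;
        }

        public Collection<String> getStoredComponentIds() {
            return new ArrayList<>(states.keySet());
        }

        public Map<String, String> getState(String componentId) {
            return states.getOrDefault(componentId, Map.of());
        }

        public void setState(String componentId, Map<String, String> state) {
            states.put(componentId, new HashMap<>(state));
        }
    }

    /** Copies state only when the current provider is empty; returns the migrated component identifiers. */
    static List<String> migrateIfRequired(SimpleStateProvider current, SimpleStateProvider previous) throws IOException {
        if (!current.getStoredComponentIds().isEmpty()) {
            // Current provider already holds state, so the previous provider is not enumerated
            return List.of();
        }
        List<String> migrated = new ArrayList<>(previous.getStoredComponentIds());
        if (migrated.isEmpty()) {
            return migrated; // Nothing to migrate
        }
        System.out.printf("Migrating components %s from [%s] to [%s]%n",
                migrated, previous.getIdentifier(), current.getIdentifier());
        for (String componentId : migrated) {
            current.setState(componentId, previous.getState(componentId));
        }
        return migrated;
    }

    public static void main(String[] args) throws IOException {
        InMemoryProvider previous = new InMemoryProvider("zk-provider");
        previous.setState("component-1", Map.of("last.timestamp", "200"));
        InMemoryProvider current = new InMemoryProvider("new-provider");

        System.out.println(migrateIfRequired(current, previous)); // first startup copies component-1
        System.out.println(migrateIfRequired(current, previous)); // later restart skips migration
    }
}
```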

Remove Previous Cluster State Provider

The NiFi cluster will continue to function without additional changes while a previous Cluster State Provider remains configured. As long as the current Cluster State Provider contains state information, NiFi will not attempt to enumerate the previous provider. The previous Cluster State Provider configuration should still be removed to keep the configuration current and avoid unexpected migration attempts, such as copying outdated state if the current provider is later emptied.

Removing the value of nifi.state.management.provider.cluster.previous from nifi.properties is sufficient to disable further cluster state migration. The previous Cluster State Provider definition can also be removed from state-management.xml for completeness, but NiFi ignores provider definitions that are not referenced in application properties.

StateProvider Interface Changes

The StateProvider interface can be implemented and provided in an extension NAR to support alternative cluster state services. Supporting cluster state migration requires implementing two new methods, both of which have default implementations to provide basic compatibility with existing providers:

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

public interface StateProvider {

    // Existing StateProvider methods and supertypes omitted for brevity

    /**
     * Indicates whether the State Provider supports enumerating stored state information
     *
     * @return Component enumeration supported status
     */
    default boolean isComponentEnumerationSupported() {
        return false;
    }

    /**
     * Get Component Identifiers with associated state stored in the Provider
     *
     * @return Collection of Component Identifiers with stored state defaults to empty
     * @throws IOException Thrown on failures to retrieve component identifiers
     */
    default Collection<String> getStoredComponentIds() throws IOException {
        return Collections.emptyList();
    }
}

As described in the default implementations, State Providers that do not override these methods do not support state migration. State Provider developers must implement both methods to support cluster state migration; otherwise, the NiFi framework will throw an exception identifying the unsupported State Provider. Custom State Providers do not need to implement these methods if migration is not required.
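The effect of the default methods can be sketched with a hypothetical `EnumerableProvider` interface that mirrors only those two methods. A provider relying on the defaults is rejected, while one overriding both methods reports its stored identifiers. The class names and exception message are illustrative and do not reproduce NiFi's classes or wording.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class EnumerationSupportSketch {

    /** Hypothetical stand-in mirroring only the two default methods from the StateProvider excerpt. */
    interface EnumerableProvider {
        String getIdentifier();

        default boolean isComponentEnumerationSupported() {
            return false;
        }

        default Collection<String> getStoredComponentIds() throws IOException {
            return Collections.emptyList();
        }
    }

    /** Existing provider relying on the default methods, so it cannot take part in migration. */
    static final class LegacyProvider implements EnumerableProvider {
        public String getIdentifier() {
            return "legacy-provider";
        }
    }

    /** Provider overriding both methods; a fixed set stands in for listing keys in a backing store. */
    static final class EnumeratingProvider implements EnumerableProvider {
        private final Set<String> componentIds = new TreeSet<>(List.of("component-1", "component-2"));

        public String getIdentifier() {
            return "enumerating-provider";
        }

        @Override
        public boolean isComponentEnumerationSupported() {
            return true;
        }

        @Override
        public Collection<String> getStoredComponentIds() {
            return Collections.unmodifiableSet(componentIds);
        }
    }

    /** Rejects providers that cannot enumerate stored component identifiers. */
    static void requireEnumeration(EnumerableProvider provider) {
        if (!provider.isComponentEnumerationSupported()) {
            throw new IllegalStateException(
                    "Cluster state migration not supported by provider [" + provider.getIdentifier() + "]");
        }
    }

    public static void main(String[] args) throws IOException {
        EnumerableProvider enumerating = new EnumeratingProvider();
        requireEnumeration(enumerating);
        System.out.println(enumerating.getStoredComponentIds());

        try {
            requireEnumeration(new LegacyProvider());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Checking `isComponentEnumerationSupported()` before relying on `getStoredComponentIds()` matters because the default implementation returns an empty collection, which would otherwise be indistinguishable from a provider that simply holds no state.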

Conclusion

Cluster state migration is not a common process for a stable NiFi deployment, but it can be a useful feature for containerized flows or upgrades that involve significant architectural adjustments. With minor changes to internal State Provider architecture, NiFi 2.0 incorporates a cluster state migration strategy that obviates the need for duplicative processes and external tools. Although alternative State Providers are not a common feature request, implementing framework support for cluster state migration highlights the importance of well-defined interfaces that enable architectures to adapt to technical changes.