ExceptionFactory

Producing content that a reasonable developer might want to read

Bringing Kubernetes Clustering to Apache NiFi

2024-08-10 • 8 minute read • David Handermann

Background

Apache ZooKeeper has provided leader election and shared state management for Apache NiFi clustering since the NiFi project released version 1.0.0 in 2016. Building on Apache Curator libraries, NiFi clustering with ZooKeeper has enabled scalable deployment scenarios and fault-tolerant processing pipelines. Delegating these clustering concerns to ZooKeeper has enabled NiFi to support a wide range of installation strategies, from bare metal to virtual machines. With the rise of containerization and the popularity of Kubernetes for managing services at scale, integrating NiFi with native Kubernetes capabilities presents multiple potential benefits. Although ZooKeeper provides service coordination regardless of the deployment platform, several Kubernetes features offer better alternatives when running NiFi as a set of scalable containers.

Introduction

NiFi 2.0.0 introduces new capabilities leveraging Kubernetes ConfigMaps and Leases to support clustering without the need for ZooKeeper. This strategy reduces the deployment complexity required for NiFi clustering on Kubernetes, eliminating both the configuration and resources required for ZooKeeper service coordination. Integrating with Kubernetes services also enables cluster leader tracking and shared state monitoring using standard Kubernetes features. NiFi has supported extensible shared state tracking since version 0.5.0, but relied on ZooKeeper as the sole solution for leader election until version 2.0.0. Extending an initial abstraction, NiFi 2.0.0-M1 moved leader election interfaces to the nifi-framework-api library. Although the framework does not have the same set of compatibility constraints as the public nifi-api, promoting leader election to a framework extension decoupled clustering from ZooKeeper, providing the basis for a new implementation with Kubernetes.

Framework Libraries

The Kubernetes API provides mediated access to container orchestration services through a REST API over HTTP. With an OpenAPI specification defining the API contract, several Java libraries are available for client integration.

Evaluating Kubernetes Clients

The official Java client library for Kubernetes provides complete support for control plane operations, tracking new Kubernetes server versions with incremented major version numbers. This versioning strategy follows the principles of Semantic Versioning, maintaining support for older versions of Kubernetes while introducing the potential for breaking changes. The approach results in frequent major version updates, with breaking changes such as existing methods gaining new parameters.

The Fabric8 Kubernetes Client library is a popular alternative to the official client library, also incorporating full support for the Kubernetes API. As an alternative, Fabric8 provides a fluent API for Kubernetes operations, which lends itself to fewer breaking changes across client library versions. The structure of Fabric8 includes the kubernetes-client-api, which abstracts core client interfaces from implementation concerns. Based on this approach, the Fabric8 library has frequent minor version updates, but has fewer major version changes than the official client library.
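As an illustration of the fluent style, the following sketch reads a ConfigMap through the Fabric8 client. The namespace and ConfigMap names are placeholders, and the snippet assumes a recent Fabric8 release in which KubernetesClientBuilder resolves cluster configuration from the environment.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class FluentConfigMapRead {
    public static void main(final String[] args) {
        // The builder resolves connection details from the environment, including
        // mounted service account credentials when running inside a Pod
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Fluent chain: resource type, namespace, name, operation
            final ConfigMap configMap = client.configMaps()
                    .inNamespace("default")
                    .withName("example-config")
                    .get();

            if (configMap != null && configMap.getData() != null) {
                configMap.getData().forEach((key, value) -> System.out.printf("%s=%s%n", key, value));
            }
        }
    }
}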

Although both libraries provide the features necessary to integrate with Kubernetes ConfigMaps and Leases, the NiFi clustering implementation selected the Fabric8 library based on its cleaner interface abstractions and stronger compatibility guarantees. Multiple milestone releases of NiFi 2.0.0 have incorporated Fabric8 library upgrades, and none of the upgrades have required code changes, highlighting the benefits of the semantic versioning and interface decoupling that the Fabric8 library provides.

NiFi Integration Libraries

Building on the Fabric8 Kubernetes Client library, the nifi-kubernetes-client library provides shared components for general access to Kubernetes. In addition to wrapping access to the Kubernetes Client itself, the NiFi library includes a namespace provider abstraction with an implementation capable of reading standard service account information. This implementation strategy makes use of common Kubernetes conventions, reducing the number of configuration properties required.
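The convention in question is the namespace file that Kubernetes mounts into Pods running with a service account. The following sketch shows one way to resolve the namespace from that standard mount path; the class name and fallback value are illustrative rather than the NiFi implementation.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative namespace provider based on the standard service account mount
public class MountedNamespaceProvider {
    private static final Path NAMESPACE_PATH =
            Path.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");

    private static final String DEFAULT_NAMESPACE = "default";

    public String getNamespace() {
        try {
            return Files.readString(NAMESPACE_PATH, StandardCharsets.UTF_8).trim();
        } catch (final IOException e) {
            // Fall back to the default namespace when running outside a Pod
            return DEFAULT_NAMESPACE;
        }
    }
}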

The common NiFi client library for Kubernetes supports both the leader election and shared state components, which implement clustering features. The NiFi state provider implementation integrates with Kubernetes ConfigMaps to store and retrieve component state information, such as timestamps, counters, or references required to maintain consistent flow behavior. The nifi-framework-kubernetes-state-provider library contains the components implementing storage and retrieval operations.

The NiFi leader election implementation for Kubernetes also builds on the common client library, and uses extended elector abstractions from Fabric8. The nifi-framework-kubernetes-leader-election library includes the components implementing NiFi leader election using Kubernetes Leases. The Fabric8 library itself supports leader election tracking through either ConfigMaps or Leases, but the strategy based on Kubernetes Leases provides a more natural implementation for centralized locking. The leader election implementation has several timeout and retry properties that influence behavior for cluster members. Rather than providing an additional layer of configuration complexity, the initial implementation uses lease duration and retry period settings derived from the defaults for Kubernetes Scheduling. This approach sets the lease duration to 15 seconds and the retry period to 2 seconds, with the expectation that a cluster coordinator failing to renew a lease in a short timeframe should be replaced sooner rather than later.

The nifi-framework-kubernetes-nar brings together these implementation modules and provides a single bundle for packaging Kubernetes clustering capabilities. Isolating cluster components for Kubernetes in a single NAR simplifies the process required to build or exclude these capabilities.

System Configuration

In addition to promoting leader election to a framework-level extension, supporting native clustering on Kubernetes required introducing new configuration properties and adapting NiFi concepts to Kubernetes capabilities. With a focus on usability, clustering with Kubernetes relies on service account defaults and convention over configuration.

Application Configuration

Extensible leader election required a new application property indicating the implementation class.

nifi.cluster.leader.election.implementation

The default value is CuratorLeaderElectionManager, which supports clustering with ZooKeeper and uses the existing ZooKeeper application properties for configuration.

Setting the value to KubernetesLeaderElectionManager instructs NiFi to use Kubernetes Leases for leader election.

nifi.cluster.leader.election.implementation=KubernetesLeaderElectionManager

The default behavior of the Kubernetes implementation creates and maintains Kubernetes Leases with the following names:

cluster-coordinator

primary-node

The Kubernetes implementation also supports an optional property for prefixing Lease names.

nifi.cluster.leader.election.kubernetes.lease.prefix

The prefix property allows for unique lease names for each NiFi cluster in a Kubernetes namespace. Best practices for NiFi deployments on Kubernetes include separating clusters in distinct namespaces, so the lease prefix property is not required.

State Management Configuration

Clustering on Kubernetes requires shared state tracking in addition to leader election for standard operations. The KubernetesConfigMapStateProvider implementation persists shared state information in Kubernetes ConfigMaps for each component that requires cluster state tracking.

The default state-management.xml configuration includes a cluster-provider definition named kubernetes-provider that can be configured in application properties. The default configuration of the kubernetes-provider uses the Kubernetes service account to manage ConfigMaps in the namespace where NiFi is running.
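The definition in the default configuration has the following general shape. The provider class shown here reflects the default packaging, but the exact fully qualified class name and optional properties should be confirmed against the state-management.xml file shipped with the release.

<cluster-provider>
    <id>kubernetes-provider</id>
    <class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
    <property name="ConfigMap Name Prefix"></property>
</cluster-provider>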

With the default state management configuration in place, the cluster state provider property can be set to kubernetes-provider for shared state storage in Kubernetes.

nifi.state.management.provider.cluster=kubernetes-provider

The Kubernetes state provider supports an optional property named ConfigMap Name Prefix that is similar to the prefix property for Leases. Without the ConfigMap Name Prefix configured, the Kubernetes state provider creates ConfigMaps named nifi-component-{id}, where {id} is the unique identifier for the component. The prefix property follows the same convention as the Lease prefix, prepending the configured value to the standard ConfigMap name.

Implementation Highlights

Kubernetes client libraries provide useful abstractions around API communication, but clustering support for NiFi required several important design choices for integrating with Kubernetes resources.

Leader Election with Kubernetes Leases

The Fabric8 kubernetes-client-api library provides a set of classes for leader election, supporting configurable implementation strategies. The LeaderElectorBuilder is accessible from the core KubernetesClient interface, streamlining the configuration process. The builder class supports ConfigMaps or Leases for centralized locking, which is essential for consistent cluster behavior. Although either resource would support leader election, Kubernetes Leases have standard specification properties that identify the current holder and track acquisition times. These properties integrate with common tools for Kubernetes, avoiding potential confusion related to ConfigMap properties. The Fabric8 LeaseLock class provides the Lease implementation.
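The following sketch outlines the configuration flow with the Fabric8 builder classes. The namespace, Lease name, and callback behavior are illustrative, the lease duration and retry period mirror the defaults described earlier, and the renew deadline is an assumed value taken from the Kubernetes scheduler defaults.

import java.time.Duration;
import java.util.UUID;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;

public class LeaseElectionConfiguration {
    public static LeaderElector buildElector(final KubernetesClient client) {
        final String identity = UUID.randomUUID().toString();

        // Lease-based lock: namespace, Lease name, and holder identity
        final LeaseLock lock = new LeaseLock("default", "cluster-coordinator", identity);

        final LeaderElectionConfig config = new LeaderElectionConfigBuilder()
                .withName("cluster-coordinator")
                .withLock(lock)
                .withLeaseDuration(Duration.ofSeconds(15)) // default derived from Kubernetes Scheduling
                .withRenewDeadline(Duration.ofSeconds(10)) // assumed value between retry period and lease duration
                .withRetryPeriod(Duration.ofSeconds(2))    // default derived from Kubernetes Scheduling
                .withLeaderCallbacks(new LeaderCallbacks(
                        () -> System.out.println("Leadership acquired"),
                        () -> System.out.println("Leadership revoked"),
                        leader -> System.out.printf("Current leader: %s%n", leader)))
                .build();

        return client.leaderElector().withConfig(config).build();
    }

    public static void main(final String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            final LeaderElector elector = buildElector(client);
            elector.run(); // blocks while participating in the election
        }
    }
}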

The Fabric8 LeaderElector has several methods to control participation in a leader election, and it is important to differentiate between the run() method and the start() method. The start() method begins participation in a separate thread, returning to the caller. The run() method also begins participation, but blocks the caller and continues the loop until a separate action interrupts the calling thread. The start() method is helpful in some scenarios, but it does not provide a straightforward strategy for restarting election participation in the event of communication failures. For this reason, the NiFi implementation uses the run() method and triggers the method in a dedicated command. This approach provides greater control over failure conditions, ensuring continued leader election participation as needed.
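A minimal sketch of that approach wraps run() in a command submitted to a dedicated thread, so that participation resumes after a failure. The class name and restart policy shown here are illustrative, not the NiFi implementation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;

// Illustrative command that keeps election participation alive on a dedicated thread
public class ElectionCommand implements Runnable {
    private final LeaderElector elector;

    public ElectionCommand(final LeaderElector elector) {
        this.elector = elector;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                elector.run(); // blocks until participation ends or leadership is lost
            } catch (final RuntimeException e) {
                // Communication failure: log and rejoin the election on the next iteration
                System.err.printf("Leader election participation failed: %s%n", e.getMessage());
            }
        }
    }

    public static Future<?> submit(final LeaderElector elector) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        return executor.submit(new ElectionCommand(elector));
    }
}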

State Management with Kubernetes ConfigMaps

Kubernetes ConfigMaps serve as a general resource for storing pairs of keys and values. This resource provides a natural fit for NiFi cluster state maps, which follow the same key-value structure. Kubernetes ConfigMaps have a maximum storage limit of 1 MiB, and NiFi relies on the Kubernetes API to enforce this limitation. ZooKeeper has similar size limits on stored information, so standard NiFi components should not have issues with maximum storage constraints.

NiFi state tracking associates stored information with individual components using a unique identifier. The Kubernetes implementation creates separate ConfigMaps for each component that requires cluster state information. ConfigMaps must be named according to the conventions for DNS Subdomain Names, and the NiFi state implementation meets this requirement by combining a standard prefix with component UUID strings for ConfigMap names.
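As a sketch of that naming convention, the following example creates a ConfigMap for a component identifier using the Fabric8 client. The namespace and state entry are placeholders, and keys are written directly here for readability; the NiFi provider additionally encodes keys and values as described below.

import java.util.UUID;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ComponentStateConfigMap {
    public static void main(final String[] args) {
        final String componentId = UUID.randomUUID().toString();
        // Standard prefix plus component UUID satisfies DNS Subdomain Name requirements
        final String configMapName = "nifi-component-" + componentId;

        final ConfigMap configMap = new ConfigMapBuilder()
                .withNewMetadata().withName(configMapName).endMetadata()
                .addToData("last.timestamp", "1723248000000") // placeholder state entry
                .build();

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            client.configMaps().inNamespace("default").resource(configMap).createOrReplace();
        }
    }
}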

ConfigMaps have additional constraints on the naming of keys within the map, limiting valid keys to alphanumeric characters along with hyphens, underscores, and periods. NiFi itself places fewer restrictions on state map keys, allowing characters such as spaces, which raises compatibility concerns with ConfigMap keys. For this reason, the Kubernetes state provider applies Base64 encoding without padding before storing ConfigMap keys and values. Reading state information requires decoding the stored keys and values before returning them to components.
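A minimal sketch of the round trip uses the JDK encoder without padding; the exact encoder configuration in the NiFi provider may differ, and the sample key is only for illustration.

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StateKeyEncoding {
    // Unpadded encoding avoids '=' characters, which are not valid in ConfigMap keys
    private static final Base64.Encoder ENCODER = Base64.getEncoder().withoutPadding();
    private static final Base64.Decoder DECODER = Base64.getDecoder();

    public static void main(final String[] args) {
        final String stateKey = "last processed timestamp"; // spaces are valid in NiFi state maps
        final String encoded = ENCODER.encodeToString(stateKey.getBytes(StandardCharsets.UTF_8));
        final String decoded = new String(DECODER.decode(encoded), StandardCharsets.UTF_8);
        System.out.printf("encoded=%s decoded=%s%n", encoded, decoded);
    }
}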

Conclusion

Cluster deployment with Apache NiFi calls for a strong understanding of infrastructure requirements, including processing and memory considerations as well as network and security settings. Kubernetes brings declarative configuration and simplified scaling in return for an additional layer of abstraction. With the introduction of native clustering on Kubernetes, NiFi deployments gain the benefits of clustering without the resource and configuration burden of ZooKeeper. For deployments on Kubernetes, direct integration simplifies configuration, unlocking the potential for advanced features such as enterprise management and automatic scaling.