Azure Event Hubs

Scale applications based on Azure Event Hubs.

Availability: v1.0+ | Maintainer: Microsoft

Trigger Specification

This specification describes the azure-eventhub trigger for Azure Event Hubs.

triggers:
- type: azure-eventhub
  metadata:
    connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
    storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
    consumerGroup: $Default
    unprocessedEventThreshold: '64'
    activationUnprocessedEventThreshold: '10'
    blobContainer: 'name_of_container'
    # Optional (Default: AzurePublicCloud)
    cloud: Private
    # Required when cloud = Private
    endpointSuffix: servicebus.airgap.example
    # Required when cloud = Private
    storageEndpointSuffix: airgap.example
    # Required when using pod identity authentication with blob storage
    storageAccountName: 'name_of_account'

Parameter list:

  • connectionFromEnv - Name of the environment variable your deployment uses to get the connection string appended with EntityPath=<event_hub_name>. If the connection string does not end with EntityPath=<event_hub_name>, then the parameters eventHubName / eventHubNameFromEnv must be used to provide the name of the Event Hub.
  • storageConnectionFromEnv - Name of the environment variable that provides the connection string for the Azure Storage Account used to store checkpoints. Currently, the Event Hub scaler only reads checkpoints from Azure Blob Storage. (Only required when not using pod identity)
  • consumerGroup - Consumer group of the Azure Event Hub consumer. (Default: $Default, Optional)
  • unprocessedEventThreshold - Average target value to trigger scaling actions. (Default: 64, Optional)
  • activationUnprocessedEventThreshold - Target value for activating the scaler. Learn more about activation in the KEDA scaling documentation. (Default: 0, Optional)
  • blobContainer - Container name to store checkpoints. This is needed for every checkpointStrategy except azureFunction. With Azure Functions the blobContainer is autogenerated and cannot be overridden.
  • eventHubNamespace - Name of the Event Hub namespace which has the Event Hub. (Optional)
  • eventHubNamespaceFromEnv - Name of the environment variable that provides the name of the Event Hub namespace, which has the Event Hub. (Optional)
  • eventHubName - Name of the Event Hub containing the messages. (Optional)
  • eventHubNameFromEnv - Name of the environment variable that provides the name of the Event Hub, containing the messages. (Optional)
  • storageAccountName - Account name for blob storage used for checkpoints. (Required when storageConnectionFromEnv is not specified. The storage account name is the first part in the hostname of a Blob Storage endpoint. E.g.: for examplename.blob.core.windows.net the account name is examplename.)
  • checkpointStrategy - Configures the checkpoint behaviour for the different Event Hub SDKs. (Values: azureFunction, blobMetadata, goSdk, dapr, default: "", Optional)
    • azureFunction - Suitable for Azure Functions & Azure WebJobs SDK. This is the default setting when blobContainer is not specified.
    • blobMetadata - For all implementations that store checkpoint information on blob metadata such as current C#, Python, Java and JavaScript Event Hub SDKs.
    • goSdk - For all implementations using the Golang SDK’s checkpointing.
    • dapr - Suitable for Dapr pubsub and bindings, depending on the used Dapr version:
      • pubsub components: >= Dapr 1.6 (older versions need the goSdk checkpointer)
      • binding components: >= Dapr 1.9 (older versions need the goSdk checkpointer)
    • When no checkpoint strategy is specified, the Event Hub scaler falls back to a backwards-compatible mode that can scale older implementations of the C#, Python or Java Event Hub SDKs (see “Legacy checkpointing”). When relying on this behaviour, blobContainer is also required.
  • cloud - Name of the cloud environment that the Event Hub belongs to. (Values: AzurePublicCloud, AzureUSGovernmentCloud, AzureChinaCloud, AzureGermanCloud, Private, Default: AzurePublicCloud, Optional)
  • endpointSuffix - Service Bus endpoint suffix of the cloud environment. (Required when cloud is set to Private, e.g. servicebus.cloudapi.de for AzureGermanCloud).
  • storageEndpointSuffix - Blob Storage endpoint suffix of the cloud environment. (Required when cloud is set to Private, e.g. airgap.example. Do not include the blob part of the endpoint.)
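
As a minimal sketch of how these parameters combine, the trigger below assumes a namespace-level connection string without EntityPath, so the Event Hub name is supplied separately through eventHubName. The hub name and container name are hypothetical placeholders, and the environment variable names simply reuse the ones from the specification above.

```yaml
triggers:
- type: azure-eventhub
  metadata:
    # Env var holding a connection string that does NOT end with EntityPath=<event_hub_name>
    connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
    # Needed because the connection string carries no EntityPath (hypothetical hub name)
    eventHubName: my-event-hub
    # Env var holding the Azure Storage connection string used for checkpoints
    storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
    # Hypothetical container name; required for every strategy except azureFunction
    blobContainer: my-checkpoints
    # Assumes a consumer built on a current SDK that stores checkpoints as blob metadata
    checkpointStrategy: blobMetadata
    consumerGroup: $Default
    unprocessedEventThreshold: '64'
    activationUnprocessedEventThreshold: '10'
```

With these values, the scaler is expected to stay inactive until more than 10 events are unprocessed, and then to target an average of 64 unprocessed events per replica.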

When using pod identity, the Microsoft Entra ID endpoint is read from the AZURE_AUTHORITY_HOST environment variable, which is provided by the Azure AD Workload Identity mutating admission webhook: https://azure.github.io/azure-workload-identity/docs/installation/mutating-admission-webhook.html

💡 Learn more about the checkpointing behaviour in the Checkpointing Behaviour section below.

💡 A connection string created from a Shared Access Signature is not supported as the Azure Storage connection string.

Authentication Parameters

You can authenticate by using pod identity or connection string authentication.

Connection String Authentication:

  • connection - Connection string for the Azure Event Hubs Namespace.

    The following formats are supported.

    • With SharedAccessKey - Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>;EntityPath=<hub-name>.
  • storageConnection - Connection string for the Azure Storage Account used to store checkpoint information.

💡 When providing connection, EntityPath is optional. If it is not provided, then eventHubName must be used to provide the name of the Azure Event Hub instance to use inside the namespace.
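
The following is a minimal sketch of how the connection and storageConnection parameters could be supplied through a TriggerAuthentication backed by a Kubernetes Secret. The resource names (eventhub-secrets, eventhub-connection-auth) and the Secret keys are hypothetical, and the placeholder values must be replaced with real connection strings.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: eventhub-secrets            # hypothetical Secret name
  namespace: default
type: Opaque
stringData:
  # SharedAccessKey format; EntityPath is optional if eventHubName is set on the trigger
  connection: "Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>;EntityPath=<hub-name>"
  storageConnection: "<storage account connection string>"
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: eventhub-connection-auth    # hypothetical name
  namespace: default
spec:
  secretTargetRef:
  # Maps each Secret key to the scaler's authentication parameter of the same name
  - parameter: connection
    name: eventhub-secrets
    key: connection
  - parameter: storageConnection
    name: eventhub-secrets
    key: storageConnection
```

A trigger would then point at this object through authenticationRef with name: eventhub-connection-auth instead of setting connectionFromEnv and storageConnectionFromEnv.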

Pod identity based authentication:

Azure AD Pod Identity or Azure AD Workload Identity providers can be used.

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: nameOfTriggerAuth
  namespace: default
spec:
  podIdentity:
    provider: azure | azure-workload

When you do so, the Event Hub scaler requires two settings that you must provide: eventHubNamespace and eventHubName. You can also set storageAccountName if you want to use Azure AD Pod / Workload Identity to authenticate to Azure Blob Storage instead of a connection string.

💡 When using Azure AD Pod Identity to authenticate, the identity must have appropriate RBAC role assignments for both the Event Hub and the Storage Account. Permissions covered by Azure Event Hubs Data Receiver and Storage Blob Data Reader are required.
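
As a sketch of the identity-based setup described above, the manifests below pair a TriggerAuthentication using the azure-workload provider with a ScaledObject that sets eventHubNamespace, eventHubName and storageAccountName instead of connection strings. All resource names, the namespace and hub names, and the container name are hypothetical, and the choice of blobMetadata assumes a consumer built on a current SDK.

```yaml
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: eventhub-pod-identity-auth   # hypothetical name
  namespace: default
spec:
  podIdentity:
    provider: azure-workload
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: eventhub-identity-scaledobject   # hypothetical name
  namespace: default
spec:
  scaleTargetRef:
    name: eventhub-consumer              # hypothetical Deployment name
  triggers:
  - type: azure-eventhub
    metadata:
      # Both are required with pod identity, since no connection string is available
      eventHubNamespace: my-eventhub-namespace
      eventHubName: my-event-hub
      # Lets the scaler reach Blob Storage with the same identity
      storageAccountName: examplename
      blobContainer: my-checkpoints
      checkpointStrategy: blobMetadata
    authenticationRef:
      name: eventhub-pod-identity-auth
```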

Checkpointing Behaviour

The list of available checkpointing strategies can be found in the trigger specification section. The way checkpoints are stored has changed with updates to the Event Hubs SDKs.

  • Legacy behaviour: The older implementations are based on the EventProcessorHost client, which stores the checkpoint information as contents of the storage blob. This is the default behaviour when no checkpointStrategy is specified. This is applicable for the following scenarios:

    • .NET applications using Microsoft.Azure.EventHubs NuGet package.
    • Java applications using azure-eventhubs-eph package.
    • Python applications using azure-eventhub package below v5.
  • Current behaviour: The newer implementations are based on the EventProcessorClient, which stores the checkpoint information as metadata on the storage blob. This is the behaviour when checkpointStrategy is set to blobMetadata. This is applicable for the following scenarios:

    • .NET applications using Azure.Messaging.EventHubs NuGet package.
    • Python applications using azure-eventhub v5.
    • .NET Azure Functions using Microsoft.Azure.WebJobs.Extensions.EventHubs v5.
    • Azure Functions in other languages using Microsoft.Azure.Functions.ExtensionBundle v3.

💡 blobContainer name is required for applications following legacy behaviour.

💡 Users should set blobContainer to azure-webjobs-eventhub for Azure Functions using blobMetadata as checkpointStrategy.
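
The two notes above can be sketched as trigger fragments: the first relies on the legacy behaviour by leaving checkpointStrategy unset, and the second targets Azure Functions with blobMetadata. The environment variable names are reused from the examples in this document, and my-legacy-checkpoints is a hypothetical container name.

```yaml
# Legacy checkpointing (EventProcessorHost-based consumers): no checkpointStrategy is set
triggers:
- type: azure-eventhub
  metadata:
    connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
    storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
    # Required for the legacy behaviour (hypothetical container name)
    blobContainer: my-legacy-checkpoints
---
# Azure Functions using blobMetadata as checkpointStrategy
triggers:
- type: azure-eventhub
  metadata:
    connectionFromEnv: EventHub
    storageConnectionFromEnv: AzureWebJobsStorage
    checkpointStrategy: blobMetadata
    blobContainer: azure-webjobs-eventhub
```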

Example

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: azure-eventhub-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: azureeventhub-function
  triggers:
  - type: azure-eventhub
    metadata:
      # Required
      storageConnectionFromEnv: AzureWebJobsStorage
      # Required if not using Pod Identity
      connectionFromEnv: EventHub
      # Required if using Pod Identity
      eventHubNamespace: AzureEventHubNameSpace
      eventHubName: NameOfTheEventHub
      # Optional
      consumerGroup: $Default # default: $Default
      unprocessedEventThreshold: '64' # default 64 events.
      blobContainer: ehcontainer