Azure Event Hubs
Scale applications based on Azure Event Hubs.
Trigger Specification
This specification describes the azure-eventhub trigger for Azure Event Hubs.
triggers:
- type: azure-eventhub
  metadata:
    connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
    storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
    consumerGroup: $Default
    unprocessedEventThreshold: '64'
    activationUnprocessedEventThreshold: '10'
    blobContainer: 'name_of_container'
    # Optional (Default: AzurePublicCloud)
    cloud: Private
    # Required when cloud = Private
    endpointSuffix: servicebus.airgap.example
    # Required when cloud = Private
    storageEndpointSuffix: airgap.example
    # Required when using pod identity authentication with blob storage
    storageAccountName: 'name_of_account'
Parameter list:
- connectionFromEnv - Name of the environment variable your deployment uses to get the connection string appended with EntityPath=<event_hub_name>. If the connection string does not end with EntityPath=<event_hub_name>, then the parameters eventHubName / eventHubNameFromEnv must be used to provide the name of the Event Hub.
- storageConnectionFromEnv - Name of the environment variable that provides the connection string for the Azure Storage Account used to store checkpoints. As of now the Event Hub scaler only reads from Azure Blob Storage. (Only required when not using pod identity)
- consumerGroup - Consumer group of the Azure Event Hub consumer. (Default: $Default, Optional)
- unprocessedEventThreshold - Average target value to trigger scaling actions. (Default: 64, Optional)
- activationUnprocessedEventThreshold - Target value for activating the scaler. Learn more about activation here. (Default: 0, Optional)
- blobContainer - Container name to store checkpoints. This is needed for every checkpointStrategy except azureFunction. With Azure Functions the blobContainer is autogenerated and cannot be overridden.
- eventHubNamespace - Name of the Event Hub namespace which has the Event Hub. (Optional)
- eventHubNamespaceFromEnv - Name of the environment variable that provides the name of the Event Hub namespace which has the Event Hub. (Optional)
- eventHubName - Name of the Event Hub containing the messages. (Optional)
- eventHubNameFromEnv - Name of the environment variable that provides the name of the Event Hub containing the messages. (Optional)
- storageAccountName - Account name for the blob storage used for checkpoints. (Required when storageConnectionFromEnv is not specified. The storage account name is the first part of the hostname of a Blob Storage endpoint. E.g.: for examplename.blob.core.windows.net the account name is examplename.)
- checkpointStrategy - Configures the checkpoint behaviour of different Event Hub SDKs. (Values: azureFunction, blobMetadata, goSdk, dapr, Default: "", Optional)
  - azureFunction - Suitable for Azure Functions & Azure WebJobs SDK. This is the default setting when blobContainer is not specified.
  - blobMetadata - For all implementations that store checkpoint information on blob metadata, such as the current C#, Python, Java and JavaScript Event Hub SDKs.
  - goSdk - For all implementations using the Golang SDK's checkpointing.
  - dapr - Suitable for older Dapr pubsub and bindings.
    - Compatible Dapr versions:
      - pubsub components: 1.6, 1.7, 1.8, 1.9
      - bindings: 1.9
    - For newer Dapr versions, use the blobMetadata strategy.
    - For older Dapr versions, use the goSdk strategy.
  - When no checkpoint strategy is specified, the Event Hub scaler falls back to a backwards-compatible mode that can scale older implementations of the C#, Python or Java Event Hub SDKs (see "Legacy checkpointing"). If this behaviour should be used, blobContainer is also required.
- cloud - Name of the cloud environment that the Event Hub belongs to. (Values: AzurePublicCloud, AzureUSGovernmentCloud, AzureChinaCloud, AzureGermanCloud, Private, Default: AzurePublicCloud, Optional)
- endpointSuffix - Service Bus endpoint suffix of the cloud environment. (Required when cloud is set to Private, e.g. servicebus.cloudapi.de for AzureGermanCloud.)
- storageEndpointSuffix - Blob Storage endpoint suffix of the cloud environment. (Required when cloud is set to Private, e.g. airgap.example. Do not include the blob part of the endpoint.)
When using pod identity, the Microsoft Entra ID endpoint is obtained from the AZURE_AUTHORITY_HOST environment variable, which is provided by the Azure Workload Identity mutating admission webhook: https://azure.github.io/azure-workload-identity/docs/installation/mutating-admission-webhook.html
💡 Learn more about the checkpointing behaviour in the Checkpointing Behaviour section.
💡 The Azure Storage connection string is not compatible with a connection string created from a Shared Access Signature.
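As an illustration of how the parameters above combine, the following is a minimal sketch of a trigger for an application that uses a current Event Hub SDK and a connection string without EntityPath. The hub name my-event-hub and the container name my-checkpoints are hypothetical placeholders, not values from this specification.

```yaml
triggers:
- type: azure-eventhub
  metadata:
    # Assumes this connection string does NOT end with EntityPath=<event_hub_name>,
    # so the Event Hub has to be named explicitly via eventHubName.
    connectionFromEnv: EVENTHUB_CONNECTIONSTRING_ENV_NAME
    eventHubName: my-event-hub              # hypothetical hub name
    storageConnectionFromEnv: STORAGE_CONNECTIONSTRING_ENV_NAME
    consumerGroup: $Default
    # Current SDKs store checkpoints as blob metadata.
    checkpointStrategy: blobMetadata
    blobContainer: my-checkpoints           # hypothetical container name
    unprocessedEventThreshold: '64'
    activationUnprocessedEventThreshold: '10'
```

The cloud, endpointSuffix and storageEndpointSuffix parameters are omitted here, so the sketch assumes the default AzurePublicCloud environment.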
Authentication Parameters
You can authenticate by using pod identity or connection string authentication.
Connection String Authentication:
- connection - Connection string for the Azure Event Hubs namespace. The following formats are supported:
  - With SharedAccessKey - Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>;EntityPath=<hub-name>
- storageConnection - Connection string for the Azure Storage Account used to store checkpoint information.

💡 When providing connection, EntityPath is optional. If it is not provided, then eventHubName must be used to provide the name of the Azure Event Hub instance to use inside the namespace.
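One possible way to supply connection and storageConnection is through a Kubernetes Secret referenced from a TriggerAuthentication. The sketch below assumes that approach. The resource names eventhub-secrets and eventhub-trigger-auth and the secret keys are hypothetical, and the placeholder values must be replaced with real connection strings.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: eventhub-secrets              # hypothetical name
  namespace: default
type: Opaque
stringData:
  # Placeholder in the SharedAccessKey format described above.
  eventhub-connection: "Endpoint=sb://<sb>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>;EntityPath=<hub-name>"
  # Placeholder; must not be a connection string created from a Shared Access Signature.
  storage-connection: "<storage account connection string>"
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: eventhub-trigger-auth         # hypothetical name
  namespace: default
spec:
  secretTargetRef:
  - parameter: connection             # maps to the connection parameter
    name: eventhub-secrets
    key: eventhub-connection
  - parameter: storageConnection      # maps to the storageConnection parameter
    name: eventhub-secrets
    key: storage-connection
```

A trigger would then point at this object through authenticationRef instead of setting connectionFromEnv and storageConnectionFromEnv.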
Pod identity based authentication:
The Azure AD Workload Identity provider can be used.
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: nameOfTriggerAuth
  namespace: default
spec:
  podIdentity:
    provider: azure-workload
When you do so, the Event Hub scaler requires two settings that you have to provide: eventHubNamespace and eventHubName. You can also configure storageAccountName if you wish to use Azure AD Workload Identity to authenticate to Azure Blob Storage instead of a connection string.
💡 When using pod identity to authenticate, the identity must have appropriate RBAC role assignments for both the Event Hub and the Storage Account. The permissions covered by Azure Event Hubs Data Receiver and Storage Blob Data Reader are required.
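To show how the pieces fit together, here is a minimal sketch of a ScaledObject that relies on the nameOfTriggerAuth TriggerAuthentication above for both Event Hubs and Blob Storage. It assumes the workload's identity already has the two roles mentioned, and that the application uses a current SDK (hence blobMetadata). The namespace, hub, account and container names are placeholders.

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: azure-eventhub-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: azureeventhub-function
  triggers:
  - type: azure-eventhub
    metadata:
      # Required with pod identity, since no connection string carries them.
      eventHubNamespace: AzureEventHubNameSpace
      eventHubName: NameOfTheEventHub
      # Lets the scaler reach Blob Storage with the identity instead of a connection string.
      storageAccountName: name_of_account
      blobContainer: name_of_container
      checkpointStrategy: blobMetadata    # assumption: current SDK checkpoints
    authenticationRef:
      name: nameOfTriggerAuth
```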
Checkpointing Behaviour
The list of available checkpointing strategies can be found in the trigger specification section. The way checkpoints are stored has changed with updates to the EventHub SDKs.
- Legacy behaviour: The older implementations are based on the EventProcessorHost client, which stores the checkpoint information as contents of the storage blob. This is the default behaviour when no checkpointStrategy is specified. This is applicable for the following scenarios:
  - .NET applications using the Microsoft.Azure.EventHubs NuGet package.
  - Java applications using the azure-eventhubs-eph package.
  - Python applications using the azure-eventhub package below v5.
- Current behaviour: The newer implementations are based on the EventProcessorClient, which stores the checkpoint information as metadata on the storage blob. This is the behaviour when checkpointStrategy is set to blobMetadata. This is applicable for the following scenarios:
  - .NET applications using the Azure.Messaging.EventHubs NuGet package.
  - Python applications using azure-eventhub v5.
  - .NET Azure Functions using Microsoft.Azure.WebJobs.Extensions.EventHubs v5.
  - Azure Functions in other languages using Microsoft.Azure.Functions.ExtensionBundle v3.

💡 blobContainer name is required for applications following legacy behaviour.
💡 Users should set blobContainer to azure-webjobs-eventhub for Azure Functions using blobMetadata as checkpointStrategy.
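For the Azure Functions case in the last note, a trigger might look like the following sketch. The environment variable names EventHub and AzureWebJobsStorage are assumptions borrowed from typical Azure Functions deployments and should match whatever the function app actually uses.

```yaml
triggers:
- type: azure-eventhub
  metadata:
    connectionFromEnv: EventHub                     # assumed env var name
    storageConnectionFromEnv: AzureWebJobsStorage   # assumed env var name
    # Functions on the v5 extension / v3 extension bundle checkpoint via blob metadata.
    checkpointStrategy: blobMetadata
    blobContainer: azure-webjobs-eventhub
```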
Example
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: azure-eventhub-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: azureeventhub-function
  triggers:
  - type: azure-eventhub
    metadata:
      # Required
      storageConnectionFromEnv: AzureWebJobsStorage
      # Required if not using Pod Identity
      connectionFromEnv: EventHub
      # Required if using Pod Identity
      eventHubNamespace: AzureEventHubNameSpace
      eventHubName: NameOfTheEventHub
      # Optional
      consumerGroup: $Default # default: $Default
      unprocessedEventThreshold: '64' # default 64 events.
      blobContainer: ehcontainer