
Send or receive events from Azure Event Hub using Python

This article is a quickstart demo of how to send and receive events from Azure Event Hubs using Python scripts. If you are new to Event Hubs, please check my previous post, which explains the basics, before you continue. We will use two Python scripts, ‘send.py’ and ‘recv.py’, for sending and receiving test messages in this exercise.

I created an Azure VM with an Ubuntu image to test the send/receive Python scripts. Creating an Azure VM is cheap, done in a few steps, and you can connect to the new machine within minutes. Don’t forget the steps required for connectivity, like opening the port in the firewall; and if you want to connect to the VM with a remote desktop connection, there are certain commands that have to be run before you try to RDP. I wasted a few minutes before learning this.

Make sure the following prerequisites are completed in the VM before you start:

  • Python 3 (the scripts below use asyncio and async/await)
  • The Python package for Event Hubs (azure-eventhub)
  • The checkpoint store package, so the receiver can use ‘Azure Blob Storage’ for storing checkpoints while processing events from Azure Event Hubs
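Since the scripts below rely on Python 3’s asyncio, a quick guard you can drop at the top of either script will fail fast on an old interpreter (a minimal sketch; the exact minimum version depends on the azure-eventhub release you install):

```python
import sys

# send.py and recv.py use async/await, which requires Python 3.5+.
assert sys.version_info >= (3, 5), "Python 3.5 or later is required"
```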

I installed Python and the Python package with the following commands from PuTTY; you can also run these in a terminal directly on the VM after connecting via RDP.

sudo apt install python3-pip

pip install azure-eventhub

After completing the above, install the checkpoint store package with the command below:

pip install azure-eventhub-checkpointstoreblob-aio

Event Hub Namespace

The first step is to create an Event Hubs namespace, followed by an event hub, and to get the credentials your application can use to communicate with Event Hubs. If you want to learn how to create a namespace and event hub, please refer to my other post.
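The connection string you copy from the portal is a semicolon-separated list of key=value pairs. A small sketch of how to split it into its parts (the namespace and key values here are placeholders, not real credentials):

```python
def parse_conn_str(conn_str):
    """Split an Event Hubs connection string into its key=value parts."""
    parts = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        # partition on the FIRST '=' only, since keys can contain '=' padding
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

sample = ("Endpoint=sb://mynamespace.servicebus.windows.net/;"
          "SharedAccessKeyName=RootManageSharedAccessKey;"
          "SharedAccessKey=<your-key>")
print(parse_conn_str(sample)["Endpoint"])  # sb://mynamespace.servicebus.windows.net/
```

The SDK does this parsing for you; the sketch is only to show what the portal gives you and which piece is which.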

Sending the events using send.py

Open your favorite editor for the Python scripts we are going to use. I used the ‘Atom’ editor, which was readily available to me. If you don’t have an editor and don’t want to waste time downloading one, or if you have installation restrictions on your laptop, you can simply use ‘Notepad++’, which in my opinion serves our purpose here.

Save the below code as send.py

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
[Screenshot: getting the Event Hubs namespace and event hub name for the connection string]

More detailed source code is available on GitHub at the following link:

https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py
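In send.py, `create_batch()` and `add()` take care of the hub’s per-batch size limit for you (`add()` raises when an event would overflow the batch). The accumulate-then-send pattern behind that can be sketched in plain Python; the byte accounting and the cap here are simplifications for illustration, not the SDK’s exact rules:

```python
MAX_BATCH_BYTES = 1_048_576  # illustrative cap; the real limit comes from the service

def make_batches(messages, max_bytes=MAX_BATCH_BYTES):
    """Group messages into batches that stay under a byte budget,
    mirroring what EventDataBatch.add() enforces for you."""
    batches, current, size = [], [], 0
    for msg in messages:
        msg_bytes = len(msg.encode("utf-8"))
        if current and size + msg_bytes > max_bytes:
            batches.append(current)  # flush the full batch
            current, size = [], 0
        current.append(msg)
        size += msg_bytes
    if current:
        batches.append(current)
    return batches

print(make_batches(["First event", "Second event", "Third event"], max_bytes=25))
# → [['First event', 'Second event'], ['Third event']]
```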

Before creating the script to receive events, you have to create ‘Azure Blob Storage’ (a blob container), which will be used as the checkpoint store. The checkpoint store maintains checkpoints, i.e. the last read positions.
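The idea behind the checkpoint store can be illustrated with a plain in-memory dict: remember the last processed position per partition so a restart resumes from there instead of re-reading everything. BlobCheckpointStore persists the same idea into blobs; the class below is purely illustrative, not the SDK’s format:

```python
class InMemoryCheckpointStore:
    """Illustrative stand-in for a checkpoint store: remembers the last
    processed offset per partition so a restart can resume from there."""

    def __init__(self):
        self._checkpoints = {}  # partition_id -> last processed offset

    def update_checkpoint(self, partition_id, offset):
        self._checkpoints[partition_id] = offset

    def last_offset(self, partition_id):
        # None means "no checkpoint yet": start from the beginning ("-1").
        return self._checkpoints.get(partition_id)

store = InMemoryCheckpointStore()
store.update_checkpoint("0", 1024)
print(store.last_offset("0"))  # 1024
```

This is exactly why recv.py calls `partition_context.update_checkpoint(event)` after processing each event.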

Create blob container

You can create a blob container by first creating an Azure Storage Account.

Receiving the events using recv.py

Similar to the send.py script, copy and paste the script below and save it as recv.py in your preferred location.

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())    
[Screenshot: the storage account connection string]

Run the receive script

Open a terminal that has Python in its path and run recv.py. Once you run it, you won’t see any output initially, as we have not executed our send.py file yet.

python3 recv.py

Run the send script

Open another terminal window and run the command below to initiate the message flow:

python3 send.py

Once the send script starts sending the messages, you can see the received-event acknowledgment in the receiver terminal, along with the ID of the partition each event was saved to.

[Screenshot: recv.py output showing the received events]

The Overview page, available from the left navigation pane of the Azure portal, shows a graphical monitoring summary of the total messages, the number of requests, and their throughput.

If you need deeper insights, you can use the ‘Monitoring’ tab available in the same pane, which has loads of parameters you can select as metrics. This is one of the best options if you want a complete picture of how your events are doing, and the best part of this dashboard is that it is customizable based on your need, for hours or days of data.

Azure Monitor metrics data is available for 90 days. However, when creating charts only 30 days can be visualized. For example, if you want to visualize a 90 day period, you must break it into three charts of 30 days within the 90 day period.
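Splitting a 90-day period into 30-day charts is simple date arithmetic; a small sketch for computing the three chart windows:

```python
from datetime import date, timedelta

def chart_windows(start, total_days=90, window_days=30):
    """Break a date range into consecutive chart-sized windows."""
    windows = []
    cursor = start
    end = start + timedelta(days=total_days)
    while cursor < end:
        stop = min(cursor + timedelta(days=window_days), end)
        windows.append((cursor, stop))
        cursor = stop
    return windows

for lo, hi in chart_windows(date(2021, 1, 1)):
    print(lo, "to", hi)
```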

[Screenshot: 1k messages logged]

We can also see the events physically getting logged into the container inside the storage account.
