Using Azure Stream Analytics

An example of using Azure Stream Analytics to query events in an Event Hub

A stream of event data flows from one point to another, changing its shape along the way

Azure Stream Analytics runs lightweight, SQL-like queries in the Azure cloud against event streams as they arrive. It can transform the input events into another data shape and send the result to other data sinks in Azure. The raw events going into Azure can be telemetry, IoT events, data logs or events originating from API calls, for example.

At the time of writing, Azure Stream Analytics can query input event data that arrives in Azure IoT Hub, Azure Event Hubs and Azure Blob Storage. After the query runs, the transformed data can be sent to a wider range of output data sinks such as Data Lake, Blob Storage, Functions, Power BI (for the DataViz wizards out there) and more.

In my experience, the main benefit of Azure Stream Analytics is that it saves the overhead of writing boilerplate SQL database connection code in cases where a client application would otherwise need its own SQL database to store events in and have to connect to it.

Here is an example setup:  

Components used

The setup above involves:

  • A browser - the calling client to make an HTTP call to an Azure Function
  • An Azure Function (using .NET Core 3.1, see also the package references XML below) with an HTTP trigger and an Event Hub output binding
  • A store of raw data to use as events - in this case some World Bank data retrieved by an API call
  • An Azure Event Hub - to initially store the incoming events; this Event Hub is the output binding on the Azure Function
  • An Azure Stream Analytics job - to listen for events on the Event Hub, run a pre-defined query over them and send the query results to Blob Storage
  • Azure Blob Storage - to store output results

The Azure Function Code:

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public static class FetchData
{
    // One shared HttpClient, reused across function invocations
    private static readonly HttpClient client = new HttpClient();

    [FunctionName("FetchData")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "get", Route = null)] HttpRequest req,
        [EventHub("clouddemo", Connection = "EventHubConnectionAppSetting")] IAsyncCollector<WBData> events,
        ILogger log)
    {
        // API call to get the data
        var uri = new Uri("https://api.worldbank.org/v2/country/all/indicator/SP.POP.TOTL?date=1980:2010&format=json");
        var request = new HttpRequestMessage(HttpMethod.Get, uri);
        var responseMessage = await client.SendAsync(request);

        var rawdata = await responseMessage.Content.ReadAsStringAsync();
        log.LogInformation("printed data .. " + rawdata);

        // The response has the shape [ {paging metadata}, [ ...records ] ].
        // Sanitise it before deserialising and sending to the hub: strip the
        // leading metadata object, then the outer brackets, leaving only the
        // array of records.
        var extraNavData = rawdata.Split("},")[0];
        rawdata = rawdata.Replace(extraNavData, "");
        rawdata = rawdata.Remove(rawdata.Length - 1).Substring(2);
        var result = JsonConvert.DeserializeObject<List<WBData>>(rawdata);
        foreach (var item in result)
        {
            // Add each record as an 'event' to the Event Hub
            await events.AddAsync(item);
        }

        // Respond to the browser with the sanitised data
        return new OkObjectResult(rawdata);
    }
}
Fetch some data from a source and send each item found as an event to Azure Event Hub
public class WBData
{
    public Indicator indicator { get; set; }
    public Country country { get; set; }
    public string countryiso3code { get; set; }
    public string date { get; set; }
    // long? because aggregate rows such as World exceed int.MaxValue and some observations are null
    public long? value { get; set; }
    public string unit { get; set; }
    public string obs_status { get; set; }
    public int @decimal { get; set; }
}

public class Indicator
{
    public string id { get; set; }
    public string value { get; set; }
}

public class Country
{
    public string id { get; set; }
    public string value { get; set; }
}
The custom WBData class above maps the response from the World Bank API used
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AzureFunctionsVersion>v3</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.30" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.EventHubs" Version="5.0.0" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.12" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
</Project>
Packages used. Important: use Microsoft.Azure.WebJobs.Extensions.EventHubs to get a working Event Hubs output binding!

In my quick example, I use an Azure Function that responds to an HTTP trigger and calls a data source (in this case some World Bank data) via an API. The results from the World Bank are then organised into easy-to-manage WBData objects. The WBData objects become the events that get sent to Azure Event Hub.

Once the Azure Function URL has been hit, the running Azure Stream Analytics job picks up the events as they arrive in the linked Azure Event Hub instance (not the namespace!), automatically runs the defined query and writes the results to the defined output data sink (Blob Storage in my example):

A simple query that selects the population count and the year (value and date) from the incoming input event data (in the clouddemo Event Hub) where the population was more than 350000000 and less than 420000000. The query results are sent to Blob Storage (dataresult) as JSON files
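Going by that description, the query would look roughly like the following. This is a sketch rather than the exact query used: it assumes the job's input alias is clouddemo and its output alias is dataresult, and the square brackets around value and date are only a precaution in case either name clashes with a keyword.

```sql
-- Sketch only: assumes input alias [clouddemo] and output alias [dataresult]
SELECT
    [value],    -- population count
    [date]      -- year of the observation
INTO
    [dataresult]
FROM
    [clouddemo]
WHERE
    [value] > 350000000
    AND [value] < 420000000
```

Both bounds are exclusive, which matches the results: every value falls strictly between 350000000 and 420000000.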
{"value":419223717,"date":"2002"}
{"value":408522129,"date":"2001"}
{"value":398113044,"date":"2000"}
{"value":387977990,"date":"1999"}
{"value":378098393,"date":"1998"}
{"value":368440591,"date":"1997"}
{"value":358953595,"date":"1996"}
{"value":350556886,"date":"2010"}
Output result

This is a simple example, but the idea is that a single job can have multiple inputs and multiple outputs, and the query can read from and write to any of them. This means you can have several Event Hubs being queried into several blob stores, several Functions, a single Cosmos DB instance or an Azure SQL Database. Powerful!
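As a rough sketch of what that could look like, one job query can hold several SELECT ... INTO statements, each reading from an input alias and writing to its own output alias. Only clouddemo and dataresult come from the example in this post. The aliases otherhub, sqloutput and otheroutput are hypothetical and would have to exist as inputs and outputs on the job.

```sql
-- Hypothetical aliases (assumptions for illustration only):
--   [otherhub]    a second Event Hub input
--   [sqloutput]   an Azure SQL Database output
--   [otheroutput] a second Blob Storage output

-- Shared step, so both statements below read the same filtered stream
WITH Filtered AS (
    SELECT countryiso3code, [value], [date]
    FROM [clouddemo]
    WHERE [value] > 350000000 AND [value] < 420000000
)

-- Output 1: the same result as before, written to Blob Storage
SELECT [value], [date]
INTO [dataresult]
FROM Filtered

-- Output 2: the same rows plus the country code, written to a second sink
SELECT countryiso3code, [value], [date]
INTO [sqloutput]
FROM Filtered

-- A second input feeding yet another output
SELECT *
INTO [otheroutput]
FROM [otherhub]
```

The WITH step is optional. It just avoids repeating the filter when more than one output needs the same rows.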

NOW REMEMBER TO DELETE ANY RESOURCES IN YOUR SUBSCRIPTION YOU NO LONGER NEED

Ref Cover Image by eberhard grossgasteiger from Pexels