Using Azure Video Analyzer for Media in Azure Functions

First, some clarification: 'Azure Video Analyzer for Media' is the new name for what used to be called Video Indexer. It is distinct from 'Azure Video Analyzer', a separate product that is currently in Preview.

With Azure Video Analyzer for Media, it's possible to analyse media input (video with audio, or audio only) and extract useful insights such as a transcription of the spoken word, timelines of certain events, key speaker recognition, and topics, plus more exotic items such as black frame detection (in videos) or audio events like dog barks and glass shattering. These insights can then be pushed into further downstream processes as inputs wherever an application requires them.

In this blog, I document extracting the insights from audio only, to try and keep this short and focussed. The audio is podcast audio, sourced from the known RSS feed link of a podcast. The RSS link returns an XML structure that includes nodes linking to the direct audio of each podcast episode. When the podcast creator has a new episode out, the XML is updated automatically as part of their distribution strategy, so any consumer of this XML is kept up to date.

For my use, a Timer Triggered Azure Function runs on a schedule, in close sync with releases of new episodes, to fetch the latest episode's audio link from the XML. The episode audio link is added to an Azure Storage Queue, which triggers an upload to a Video Indexer account that begins the analysis process, and a separate Durable Function is also started. This Durable Function uses a Monitor Pattern to check every 5 minutes whether the analysis for insights (known as indexing in this context) has completed. The result from the indexing process is a very large JSON document that can be deserialized to our liking.

The Process

In short: the Timer Triggered Function fetches the RSS XML and queues the latest episode link; the queue triggers the StartIndex Function, which uploads the audio to Video Indexer and starts a Durable orchestration; the orchestration then polls a CheckState Activity every 5 minutes until indexing completes.

The Code

Some Pre-Requisites:

  • An Azure Storage Queue to add the new podcast link to. In my case, this queue is called 'podcastlinks'.
  • An account for 'Video Indexer': sign up for a free tier or trial account at https://videoindexer.ai , then capture the AccountId under 'Account Settings' on the dashboard. Also capture the API Key by going to https://api-portal.videoindexer.ai/products and, under Authorization, selecting Subscribe. You should then see a section that can reveal your Primary Key and Secondary Key (we will use the Primary Key). You can always view the API Key again at https://api-portal.videoindexer.ai/profile. A quick sanity check of these values follows this list.
  • Visual Studio 2022 for developing the Azure Functions and deploying to Azure. The deployed Functions run on the Consumption tier.
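
Before wiring up any Functions, it can be worth sanity-checking the captured values. Below is a minimal sketch (my own, not part of the project code) that requests an account-level access token; it assumes a trial account, so Location is 'trial', and uses placeholder values for the AccountId and API Key:

using System;
using System.Net.Http;
using System.Threading.Tasks;

class CredentialCheck
{
    static async Task Main()
    {
        const string baseUrl = "https://api.videoindexer.ai";
        const string location = "trial"; // or your resource location
        const string accountId = "[YOUR_VIDEO_INDEXER_ACCOUNTID]"; // placeholder
        const string apiKey = "[YOUR_API_KEY]"; // placeholder

        using var client = new HttpClient();
        client.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", apiKey);

        // a 200 response with a quoted JWT string means the key and AccountId line up
        var response = await client.GetAsync(
            $"{baseUrl}/auth/{location}/Accounts/{accountId}/AccessToken?allowEdit=true");
        Console.WriteLine($"{(int)response.StatusCode}: {await response.Content.ReadAsStringAsync()}");
    }
}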

The Timer Triggered Function runs every Thursday at 13:40:

[FunctionName("ScheduledXml")]
public async  Task  ScheduledXml([TimerTrigger("0 40 13 * * THU")]TimerInfo myTimer, [Queue("podcastlinks")] ICollector<string> queueItems, ILogger log)
{
    //Fetch an Xml from an external web resource
    HttpClient client = new HttpClient();
    var result = await client.GetAsync("https://feeds.megaphone.fm/lewlater"); // choose your podcast RSS feed here :D

    var podcastXml = await result.Content.ReadAsStringAsync();

    //the xml structure manipulation needed will depend on the source,
    //but most RSS XML's from Megaphone will have an 'enclosure' tag containing the audio location
    XmlDocument xmlDoc = new XmlDocument();
    xmlDoc.LoadXml(podcastXml);
    var lastNode = xmlDoc.GetElementsByTagName("item")[0].ChildNodes.Count - 1;
    var tagData = xmlDoc.GetElementsByTagName("item")[0].ChildNodes[lastNode].OuterXml;

    xmlDoc.LoadXml(tagData);
    var audioFileLocation = xmlDoc.GetElementsByTagName("enclosure")[0].Attributes[0].Value;
    log.LogInformation($"found audio at {audioFileLocation}");
    
    //add the audio file location Url to the Queue
    queueItems.Add(audioFileLocation);
}
On a schedule, get the known RSS XML, read the value found in the latest episode's 'enclosure' tag, then add this value to the 'podcastlinks' Queue
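
For reference, a trimmed sketch of what the feed XML roughly looks like (element and attribute names follow the RSS 2.0 enclosure convention; the values here are placeholders):

<rss version="2.0">
  <channel>
    <title>Some Podcast</title>
    <item>
      <title>Latest Episode</title>
      <!-- the 'url' attribute holds the direct audio link we queue up -->
      <enclosure url="https://example.com/episode-42.mp3" length="123456789" type="audio/mpeg"/>
    </item>
    <!-- older items follow -->
  </channel>
</rss>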

The Queue Triggered Function then fires automatically when a new message arrives on the queue. The message that arrives is the podcast audio link. With this publicly available audio link, we can upload it to the Video Analyzer for Media (Video Indexer) account, which automatically starts an indexing process. I carry out these tasks in the StartIndex Function:

[FunctionName("StartIndex")]
public async Task StartIndex([QueueTrigger("podcastlinks")] string podcastlink, string DequeueCount, [DurableClient] IDurableOrchestrationClient durableclient, ILogger log)
{
    if (DequeueCount == "1")
    {
    	log.LogInformation($"Got the link: {podcastlink}");
        IndexerAccessDetail accessDetail = new IndexerAccessDetail() {
          BaseUrl = "https://api.videoindexer.ai",
          ApiKey = "[YOUR_API_KEY]",
          Location = "[YOUR_RESOURCE_LOCATION]", // if using trial account use 'trial'
          AccountId = "[YOUR_VIDEO_INDEXER_ACCOUNTID]"
        };

        HttpClient client = new HttpClient();
        client.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", accessDetail.apiKey);

        // account accessToken to use in the Upload api
        var accountAccessTokenRequest = await client.GetAsync($"{accessDetail.BaseUrl}/auth/{accessDetail.Location}/Accounts/{accessDetail.AccountId}/AccessToken?allowEdit=true");
        var accountAccessToken = (await accountAccessTokenRequest.Content.ReadAsStringAsync()).Replace("\"", "");

        // upload audio from the podcast link to video indexer
        var content = new MultipartFormDataContent();
        var uploadRequest = await client.PostAsync($"{accessDetail.BaseUrl}/{accessDetail.Location}/Accounts/{accessDetail.AccountId}/Videos?accessToken={accountAccessToken}&name=newaudio{Guid.NewGuid()}e&description=some_description&privacy=private&partition=newpt{Guid.NewGuid()}&videoUrl={podcastlink}", content);
        var uploadResult = await uploadRequest.Content.ReadAsStringAsync();
        accessDetail.UploadResult = uploadResult;

        // get the item id of the uploaded artefact from the upload result
        var podcastIndexerId = JsonConvert.DeserializeObject<dynamic>(uploadResult)["id"];
        accessDetail.PodcastIndexerId = podcastIndexerId;

        string instanceId = "InstanceId1"; //VERY IMPORTANT, keep the same instanceId in the following Orchestration Function, so that function can be re-called multiple times 

        //start durable Function orchestration and pass in collected details
        //that enable access to the uploaded artefact
        await durableclient.StartNewAsync("Orchestration", instanceId, accessDetail);

    }

}
Upon dequeue of the message, hand the audio link to Azure Video Analyzer for Media (Video Indexer) for upload, then start an orchestration to periodically check the status of the indexing process
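
As an aside, because the orchestration instanceId is fixed, its progress can also be peeked at from outside. Below is a minimal sketch of a hypothetical HTTP triggered helper (the 'CheckOrchestration' name and route are mine, not part of the original project):

[FunctionName("CheckOrchestration")]
public async Task<IActionResult> CheckOrchestration(
    [HttpTrigger(AuthorizationLevel.Function, "get", Route = "orchestration-status")] HttpRequest req,
    [DurableClient] IDurableOrchestrationClient durableclient)
{
    // query the same fixed instanceId used when starting the orchestration
    var status = await durableclient.GetStatusAsync("InstanceId1");
    return new OkObjectResult(status?.RuntimeStatus.ToString() ?? "No orchestration found");
}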

The custom IndexerAccessDetail class:

public class IndexerAccessDetail
{
    public string AccountId { get; set; }
    public string Location { get; set; }
    public string BaseUrl { get; set; }
    public string ApiKey { get; set; }
    public string PodcastIndexerId { get; set; }
    public string UploadResult { get; set; }
}

After the upload is done, I start a separate Durable Function. The Orchestration shown below makes a call to an Activity Function called CheckState every 5 minutes. This 'sleep' for 5 minutes is done through a Durable Timer: adding 5 minutes to the current time and waiting for that timer to fire first (DO NOT USE Thread.Sleep in Azure Functions!!!). Provided the Function instanceId remains the same, the Orchestration Function is then 'called again' through the use of ContinueAsNew on the durable orchestration context; in my case it also re-passes the existing input data, to be re-used in the next call via context.GetInput (see the Durable Functions documentation on Eternal orchestrations). Note that this continuous polling for a change of state is the Monitor Pattern in Azure Functions. The cycle continues until the result from the CheckState Activity changes:

[FunctionName("Orchestration")]
public static async Task Orchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    //useful to double check instanceId
    log.LogInformation($"running as instance: {context.InstanceId}");
    
    //get indexer access properties from input data
    IndexerAccessDetail accessDetail = context.GetInput<IndexerAccessDetail>();

    var stateResult = await context.CallActivityAsync<string>("CheckState", accessDetail);
    
    //'Sleep' for 5mins then call the orchestration again if not finished
    DateTime refreshTime = context.CurrentUtcDateTime.AddMinutes(5);
    await context.CreateTimer(refreshTime, CancellationToken.None);
    if (stateResult == "Not Finished")
    {
    	context.ContinueAsNew(accessDetail);
    }
    else
    {
    	log.LogInformation($"Completed with:{stateResult}");
    }

}
Orchestration to call a CheckState Activity every 5 mins. When stateResult switches the flow into the else branch, the orchestration function simply completes and exits

The CheckState Function looks at the state of the indexing process to see whether it has completed. Upon finishing, this function logs the large JSON indexing result with the insights found and informs the Orchestrator Function of the completed state/status:

[FunctionName("CheckState")]
public async Task<string> CheckState([ActivityTrigger] IndexerAccessDetail accessDetail, ILogger log)
{
    if (accessDetail != null && accessDetail?.PodcastIndexerId!=null && accessDetail?.UploadResult != null)
    {
        HttpClient client = new HttpClient();
        client.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", accessDetail.ApiKey);

        //use a new accessToken in each check and avoid re-using the same accessToken
        var itemAccessTokenRequest = await client.GetAsync($"{accessDetail.BaseUrl}/auth/{accessDetail.Location}/Accounts/{accessDetail.AccountId}/Videos/{accessDetail.PodcastIndexerId}/AccessToken?allowEdit=true");
        var itemAccessToken = (await itemAccessTokenRequest.Content.ReadAsStringAsync()).Replace("\"", "");

        //attempt to fetch the index result and check the current state
        var fetchIndex = await client.GetAsync($"{accessDetail.BaseUrl}/{accessDetail.Location}/Accounts/{accessDetail.AccountId}/Videos/{accessDetail.PodcastIndexerId}/Index?accessToken={itemAccessToken}&language=English");
        var indexResult = await fetchIndex.Content.ReadAsStringAsync();

        var processingState = JsonConvert.DeserializeObject<dynamic>(indexResult)["state"];

        if (processingState != "Uploaded" && processingState != "Processing" && processingState != null)
        {
        	log.LogInformation(indexResult); //json index result
        	return processingState;
        }
        log.LogInformation($"Current State is: {processingState}");

        return "Not Finished";
    }
return "Not Finished";
}
Check the state of the indexing process, log the indexing JSON result if indexing has finished

The output JSON produced in the CheckState Activity would be something like the following, produced after about 45 to 50 minutes of indexing for a 1 hour 50 minute podcast:
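
A heavily truncated sketch of its shape (placeholder values; the field names are as returned by the Video Indexer Get Video Index call, with the large sections elided):

{
  "accountId": "...",
  "id": "...",
  "name": "newaudio...",
  "state": "Processed",
  "videos": [
    {
      "id": "...",
      "state": "Processed",
      "insights": {
        "transcript": [
          { "id": 1, "text": "Welcome back to the show...", "confidence": 0.9, "instances": [ { "start": "0:00:00", "end": "0:00:04" } ] }
        ],
        "topics": [ "..." ],
        "brands": [ "..." ],
        "namedLocations": [ "..." ]
      }
    }
  ],
  "summarizedInsights": { "...": "..." }
}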

The full indexing JSON result produced is quite large (containing the spoken transcript, topics, timestamps, and named brands and locations), so deserializing it into a smaller application object would be a beneficial next step.
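
As a rough illustration of that next step, here is a minimal sketch that pulls out just the state and the transcript text. The class and property names are my own; only the JSON field names come from the index result, and Json.NET matches property names case-insensitively by default:

//minimal shapes for just the fields we care about; everything else in the JSON is ignored
public class PodcastIndex
{
    public string State { get; set; }
    public List<IndexedVideo> Videos { get; set; }
}

public class IndexedVideo
{
    public VideoInsights Insights { get; set; }
}

public class VideoInsights
{
    public List<TranscriptLine> Transcript { get; set; }
}

public class TranscriptLine
{
    public string Text { get; set; }
}

//usage, e.g. inside CheckState once indexing has finished:
var index = JsonConvert.DeserializeObject<PodcastIndex>(indexResult);
var fullTranscript = string.Join(" ", index.Videos[0].Insights.Transcript.Select(t => t.Text));

For completeness, here is the project file with the package references used: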
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
	  <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.6.1" />
	  <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="5.0.0" />
	  <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="4.0.1" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
</Project>
Package references

Conclusions and further comment

This blog captures a smaller part of a larger idea I had in mind, where a user could be presented with the discovered topics from the latest podcast through a web app or a Smart Display like a Google Nest Hub. They would pick their desired topics, say 3 to 5 of them, and the selected topics would be used to clip and mux the sections of the podcast where those topics are discussed into a refined, custom piece of audio for them to listen to. This would be a viable next step, and I'm still thinking around some ways to slice it 🤔, but it would most likely involve Azure Media Services, as I could not find a way of editing the uploaded audio through the APIs for Azure Video Analyzer for Media (Video Indexer).

N.B. the free/trial account for Azure Video Analyzer for Media has a 600-minute limit on the total uploaded content length held in the account.

Cover Image by Amr Taha on Unsplash