Custom Extension for Azure Functions, Part 1: Triggers

NATS messaging system

Azure Function Custom Extension

Custom Trigger

  • Define a class that extends Attribute. This is our attribute class, in which we define all the parameters and configuration values for our trigger. In our case, we define the connection string and the NATS channel.
  • Define a class that implements the interface IListener. This class contains the logic to connect to our external event source and wait for events. In our case, we will connect to the NATS server and look for incoming messages. The IListener interface has the following functions:
    StartAsync:- The system calls this function to start our listener. It returns a Task that completes when our listener has successfully started.
    StopAsync:- The system calls this function to stop our listener. It returns a Task that completes when the listener has completely stopped.
    Cancel:- The system calls this function to cancel any ongoing listen operation.
    Dispose:- IDisposable's Dispose function.
  • Define a class that implements the interface ITriggerBinding. In this class, we create our listener and bind our trigger data. The ITriggerBinding interface has the following functions:
    CreateListenerAsync:- The system calls this function to create a listener. This function returns a Task object that has our listener.
    BindAsync:- The system calls this function to bind a specified value using a binding context. When our listener receives an event, we execute the function and pass it the event data, which is wrapped in a TriggeredFunctionData object and sent to the Azure Function. In BindAsync, we bind this value to our corresponding data type. The function returns a Task that contains a TriggerData object. The TriggerData constructor accepts an object that implements the IValueBinder interface and a read-only dictionary. We will revisit this later in this article.
    ToParameterDescriptor:- The system calls this function to get a description of the binding.
  • Define a class that implements the interface IValueBinder. As I explained in the BindAsync section, we use this class to bind the trigger data to our data class. The IValueBinder has three methods:
    GetValueAsync:- Returns a Task that contains the value object.
    SetValueAsync:- Returns a Task that completes when the value has been set on our data object.
    ToInvokeString:- Returns a string representation of the value.
  • Define a class that implements the interface ITriggerBindingProvider. This class is a provider class that returns a class that implements the ITriggerBinding interface. This class has the following function:
    TryCreateAsync:- The system calls this function to get a class that implements the ITriggerBinding interface. The system passes a TriggerBindingProviderContext object as a parameter. In this function, we check whether the parameter described by the TriggerBindingProviderContext carries our custom attribute. If the attribute is present, we create our trigger binding object and return it wrapped in a Task.
  • Create a class that implements the interface IExtensionConfigProvider. The IExtensionConfigProvider interface enables third-party extensions to register themselves. The interface has the following function:
    Initialize:- In this function, we will register all our triggers and bindings.
  • And finally, we create a class that implements the interface IWebJobsStartup. This interface defines the configuration actions to perform when the Function Host starts up. This interface has the following function:
    Configure:- The system calls this function when the function host initializes. In this function, we add our custom extension.
  • The system calls the Configure method, passing the IWebJobsBuilder object. We add the extension by calling the AddExtension method with the class that implements the IExtensionConfigProvider interface.
  • The system calls the Initialize method of the IExtensionConfigProvider, passing an ExtensionConfigContext as a parameter. Our implementation of Initialize adds the binding rule using the AddBindingRule method of the ExtensionConfigContext, which returns a binding rule object. We then call its BindToTrigger method to add our trigger, passing our TriggerBindingProvider as a parameter.
  • After that, the system calls the TryCreateAsync function of the TriggerBindingProvider, passing the TriggerBindingProviderContext as a parameter. In TryCreateAsync, we check whether our Attribute class is present on the parameter. If it is, we create our class that implements the ITriggerBinding interface and return a Task that contains the object.
  • The system then calls the CreateListenerAsync method of our class that implements the ITriggerBinding interface, passing a ListenerFactoryContext object. In our CreateListenerAsync, we return a class that implements the IListener interface. The ListenerFactoryContext object contains a class that implements the ITriggeredFunctionExecutor interface. The ITriggeredFunctionExecutor interface has a method called TryExecuteAsync. Using this method, we can execute the triggered function, passing the event data and a CancellationToken.
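
The provider, binding, and value binder described in the steps above could look roughly like the following. This is only a minimal sketch, not the article's actual implementation: the class names NatsTriggerBindingProvider, NatsTriggerBinding, and NatsValueBinder are hypothetical, and so is the createContext delegate, which stands in for whatever code builds a NatsTriggerContext (and its NATS client) from the attribute. The sketch also assumes the trigger value is always a string.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;

namespace WebJobs.Extension.Nats
{
    // Hypothetical provider: returns a binding only for parameters marked with [NatsTrigger].
    public class NatsTriggerBindingProvider : ITriggerBindingProvider
    {
        // Assumption: a delegate that knows how to build the context (and NATS client).
        private readonly Func<NatsTriggerAttribute, NatsTriggerContext> _createContext;

        public NatsTriggerBindingProvider(Func<NatsTriggerAttribute, NatsTriggerContext> createContext)
        {
            _createContext = createContext;
        }

        public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
        {
            var attribute = context.Parameter.GetCustomAttribute<NatsTriggerAttribute>(inherit: false);
            if (attribute == null)
            {
                // Not our parameter: a null result lets the host try other providers.
                return Task.FromResult<ITriggerBinding>(null);
            }

            return Task.FromResult<ITriggerBinding>(new NatsTriggerBinding(_createContext(attribute)));
        }
    }

    // Hypothetical binding: creates the listener and wraps each trigger value.
    public class NatsTriggerBinding : ITriggerBinding
    {
        private readonly NatsTriggerContext _context;

        public NatsTriggerBinding(NatsTriggerContext context)
        {
            _context = context;
        }

        public Type TriggerValueType => typeof(string);

        // This sketch exposes no extra binding data.
        public IReadOnlyDictionary<string, Type> BindingDataContract { get; } =
            new Dictionary<string, Type>();

        public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
        {
            var binder = new NatsValueBinder(value);
            return Task.FromResult<ITriggerData>(
                new TriggerData(binder, new Dictionary<string, object>()));
        }

        public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
        {
            // context.Executor is the ITriggeredFunctionExecutor the listener uses to run the function.
            return Task.FromResult<IListener>(new NatsListener(context.Executor, _context));
        }

        public ParameterDescriptor ToParameterDescriptor()
        {
            return new TriggerParameterDescriptor { Name = "NatsTrigger" };
        }
    }

    // Hypothetical value binder: hands the message payload to the function parameter.
    public class NatsValueBinder : IValueBinder
    {
        private object _value;

        public NatsValueBinder(object value)
        {
            _value = value;
        }

        public Type Type => typeof(string);

        public Task<object> GetValueAsync() => Task.FromResult(_value);

        public Task SetValueAsync(object value, CancellationToken cancellationToken)
        {
            _value = value;
            return Task.CompletedTask;
        }

        public string ToInvokeString() => _value?.ToString();
    }
}
```

Returning a null binding from TryCreateAsync is how a provider signals that the parameter belongs to some other trigger, so the check for the attribute comes first.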

Creating NATS Custom Extension

Microsoft.Azure.WebJobs.Extensions 
MyNatsClient

Create Trigger

using System;
using Microsoft.Azure.WebJobs.Description;

namespace WebJobs.Extension.Nats
{
    /// <summary>
    /// <c>Attribute</c> class for Trigger
    /// </summary>
    [AttributeUsage(AttributeTargets.Parameter)]
    [Binding]
    public class NatsTriggerAttribute : Attribute
    {
        /// <summary>
        /// Name of the environment variable that holds the connection string,
        /// in the form of nats://<username>:<password>@<host>:<port>
        /// </summary>
        public string Connection { get; set; }

        /// <summary>Channel string</summary>
        public string Channel { get; set; }

        /// <summary>QueueGroup string</summary>
        public string QueueGroup { get; set; }

        /// <summary>
        /// Helper method to get connection string from environment variables
        /// </summary>
        internal string GetConnectionString()
        {
            return Environment.GetEnvironmentVariable(Connection);
        }
    }
}

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using MyNatsClient.Rx;

namespace WebJobs.Extension.Nats
{
    /*
        The NatsListener class
        Implements the <c>IListener</c> interface. Contains the code to connect
        to a NATS server and subscribe to a channel.
    */
    public class NatsListener : IListener
    {
        private readonly ITriggeredFunctionExecutor _executor;
        private readonly NatsTriggerContext _context;

        /// <summary>
        /// NatsListener constructor
        /// </summary>
        /// <param name="executor"><c>ITriggeredFunctionExecutor</c> instance</param>
        /// <param name="context"><c>NatsTriggerContext</c> instance</param>
        public NatsListener(ITriggeredFunctionExecutor executor, NatsTriggerContext context)
        {
            _executor = executor;
            _context = context;
        }

        /// <summary>
        /// Cancel any pending operation
        /// </summary>
        public void Cancel()
        {
            if (_context == null || _context.Client == null) return;

            _context.Client.Disconnect();
        }

        /// <summary>
        /// Dispose method
        /// </summary>
        public void Dispose()
        {
            // Guard against a missing context or client, as Cancel does
            _context?.Client?.Dispose();
        }

        /// <summary>
        /// Start the listener asynchronously. Subscribe to the NATS channel and
        /// wait for messages. When a message is received, execute the function.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token</param>
        /// <returns>A Task returned from the Subscribe method</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            return _context.Client.Subscribe(_context.TriggerAttribute.Channel, stream => stream.Subscribe(msg =>
            {
                var triggerData = new TriggeredFunctionData
                {
                    TriggerValue = msg.GetPayloadAsString()
                };

                // Block until the triggered function has finished processing this message
                var task = _executor.TryExecuteAsync(triggerData, CancellationToken.None);
                task.Wait();
            }));
        }

        /// <summary>
        /// Stop current listening operation
        /// </summary>
        /// <param name="cancellationToken">Cancellation token</param>
        /// <returns>A Task that completes when the client has disconnected</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.Run(() =>
            {
                _context.Client.Disconnect();
            });
        }
    }
}
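
The listener above waits on TryExecuteAsync and ignores its result. As an illustration of what the executor hands back, here is a small sketch of a helper that awaits the call and inspects the returned FunctionResult. The NatsMessageDispatcher class and its DispatchAsync method are hypothetical names introduced only for this example, and writing to the console is a placeholder for whatever logging the extension would really use.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;

namespace WebJobs.Extension.Nats
{
    // Hypothetical helper, not part of the original listener.
    public static class NatsMessageDispatcher
    {
        // Runs the triggered function for one message payload and reports whether it succeeded.
        public static async Task<bool> DispatchAsync(
            ITriggeredFunctionExecutor executor,
            string payload,
            CancellationToken cancellationToken)
        {
            var triggerData = new TriggeredFunctionData
            {
                TriggerValue = payload
            };

            FunctionResult result = await executor.TryExecuteAsync(triggerData, cancellationToken);

            if (!result.Succeeded)
            {
                // Placeholder error handling: a real extension would likely use ILogger.
                Console.Error.WriteLine($"Function execution failed: {result.Exception?.Message}");
            }

            return result.Succeeded;
        }
    }
}
```

A listener could call this helper from its message callback, for example to decide whether a failed message should be retried or dropped.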

using MyNatsClient;

namespace WebJobs.Extension.Nats
{
    /// <summary>
    /// Trigger context class
    /// </summary>
    public class NatsTriggerContext
    {
        /// <summary>
        /// <c>Attribute</c> instance
        /// </summary>
        public NatsTriggerAttribute TriggerAttribute;

        /// <summary>
        /// <c>NatsClient</c> instance to connect and subscribe to NATS
        /// </summary>
        public NatsClient Client;

        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="attribute">Attribute instance</param>
        /// <param name="client">NatsClient instance</param>
        public NatsTriggerContext(NatsTriggerAttribute attribute, NatsClient client)
        {
            this.TriggerAttribute = attribute;
            this.Client = client;
        }
    }
}

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Hosting;
using WebJobs.Extension.Nats;

[assembly: WebJobsStartup(typeof(NatsBinding.Startup))]
namespace WebJobs.Extension.Nats
{
    /// <summary>
    /// Startup object
    /// </summary>
    public class NatsBinding
    {
        /// <summary>
        /// IWebJobsStartup startup class
        /// </summary>
        public class Startup : IWebJobsStartup
        {
            public void Configure(IWebJobsBuilder builder)
            {
                // Add NATS extensions
                builder.AddNatsExtension();
            }
        }
    }
}

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;

namespace WebJobs.Extension.Nats
{
    /// <summary>
    /// IWebJobsBuilder extension to add NATS extensions
    /// </summary>
    public static class NatsWebJobsBuilderExtensions
    {
        /// <summary>
        /// Extension method to add our custom extensions
        /// </summary>
        /// <param name="builder"><c>IWebJobsBuilder</c> instance</param>
        /// <returns><c>IWebJobsBuilder</c> instance</returns>
        /// <exception cref="ArgumentNullException">Thrown if builder is null</exception>
        public static IWebJobsBuilder AddNatsExtension(this IWebJobsBuilder builder)
        {
            if (builder == null)
            {
                throw new ArgumentNullException(nameof(builder));
            }

            builder.AddExtension<NatsExtensionConfigProvider>();

            builder.Services.AddSingleton<INatsServiceFactory, NatsServiceFactory>();

            return builder;
        }
    }
}
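
The NatsExtensionConfigProvider class registered above is not listed in this article. As a rough sketch under stated assumptions, it might look like the following: the extension name "Nats" and the constructor-injected ITriggerBindingProvider are assumptions made for this example (it presumes a trigger binding provider has been registered with the service collection), not the article's actual code.

```csharp
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Triggers;

namespace WebJobs.Extension.Nats
{
    // Sketch of the config provider; the "Nats" extension name is an assumption.
    [Extension("Nats")]
    public class NatsExtensionConfigProvider : IExtensionConfigProvider
    {
        // Assumption: our ITriggerBindingProvider implementation is resolved through DI.
        private readonly ITriggerBindingProvider _triggerBindingProvider;

        public NatsExtensionConfigProvider(ITriggerBindingProvider triggerBindingProvider)
        {
            _triggerBindingProvider = triggerBindingProvider;
        }

        public void Initialize(ExtensionConfigContext context)
        {
            // Register a binding rule for parameters marked with [NatsTrigger]
            // and attach our trigger binding provider to it.
            var rule = context.AddBindingRule<NatsTriggerAttribute>();
            rule.BindToTrigger(_triggerBindingProvider);
        }
    }
}
```

Because the host creates the config provider through dependency injection, its collaborators can be passed in through the constructor instead of being built inside Initialize.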

Create a sample to test the NATS Trigger

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using WebJobs.Extension.Nats;

namespace NatsTrigger.Sample
{
    public static class NatsTriggerSample
    {
        [FunctionName("NatsTriggerSample")]
        public static void Run(
            [NatsTrigger(Connection = "NatsConnection", Channel = "SampleChannel", QueueGroup = "SampleGroup")] string message,
            ILogger log)
        {
            log.LogInformation($"Message Received From SampleChannel {message}");
        }
    }
}

docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats

#!/usr/bin/env node

/* jslint node: true */
'use strict';

var nats = require('nats').connect("nats://<username>:<password>@localhost:4222");
var args = process.argv.slice(2);

nats.on('error', function(e) {
    console.log('Error [' + nats.options.url + ']: ' + e);
    process.exit();
});

var subject = args[0];
var msg = args[1];

if (!subject) {
    console.log('Usage: node-pub <subject> [msg]');
    process.exit();
}

nats.publish(subject, msg, function() {
    console.log('Published [' + subject + '] : "' + msg + '"');
    process.exit();
});

node publish.js SampleChannel "Azure Function and Nats are awesome."

Krishnaraj Varma

A Software Architect from Kerala, India, Open Source, Cloud Native enthusiast. Likes Golang, Rust, C/C++, Kubernetes, Kafka, etc.