C#
Our C# SDK is designed for high-performance telemetry services that need to process high volumes of data with nanosecond-scale response times.
We also provide HTTP and WebSockets services to read and write real-time telemetry data for less performance-oriented use cases such as mobile or web applications.
Connecting to Quix
Streams are written and read using an instance of the StreamingClient class.
You can create a StreamingClient with a string containing your Quix Kafka server list and some SecurityOptions to access it. You can find your Kafka configuration parameters in the automatically generated Samples in the platform for your specific workspace.
var security = new SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD);
var client = new Quix.Sdk.Streaming.StreamingClient("kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093", security);
Writing Data to Quix
You need a Topic to write data to Quix. You can create one in the platform. An OutputTopic instance allows you to write new Streams into the specified Topic.
You can create an OutputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform or in the automatically generated Samples in the platform for your specific Workspace and Topic.
var outputTopic = client.OpenOutputTopic(TOPIC_ID);
Writing Streams
Once you have the OutputTopic instance, you can create as many streams as you want using the CreateStream method. The Stream Id is autogenerated, but you can also pass a StreamId to the method. If you do that, you can append data to an existing stream that is already closed.
var stream = outputTopic.CreateStream();
Stream Properties
Optionally, you can add context to your streams by attaching metadata to them.
You can add this metadata through the Properties of the generated stream instance.
stream.Properties.Name = "Hello World C# stream";
stream.Properties.Location = "/test/location";
stream.Properties.Metadata["meta"] = "is";
stream.Properties.Metadata["working"] = "well";
Stream Location
The stream location property is particularly important as it defines the hierarchy of your data in the data catalogue.
For example, the following location:
stream.Properties.Location = $"/Game/Codemasters/F1-2019/{track}";
would result in this hierarchy in the catalogue:
Any streams sent without a location property will be located under Root by default.
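As a rough illustration of how a location string maps to catalogue levels, the sketch below splits a location into its folder names. LocationHierarchy and its Split method are hypothetical helpers written for this example, not part of the Quix SDK, and the fallback to Root only mirrors the default described above.

```csharp
using System;

static class LocationHierarchy
{
    // Hypothetical helper: splits "/Game/Codemasters/F1-2019/Monza" into its levels.
    public static string[] Split(string location)
    {
        var levels = (location ?? string.Empty)
            .Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

        // Assumption: an empty location falls back to Root, as described in the text.
        return levels.Length == 0 ? new[] { "Root" } : levels;
    }
}

class Program
{
    static void Main()
    {
        var track = "Monza"; // example value only
        var levels = LocationHierarchy.Split($"/Game/Codemasters/F1-2019/{track}");

        // Print one level per line, indented by depth.
        for (var depth = 0; depth < levels.Length; depth++)
        {
            Console.WriteLine(new string(' ', depth * 2) + levels[depth]);
        }
    }
}
```

Each level printed corresponds to one folder in the hierarchy, with the track name as the deepest level.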
Writing Parameters
You can now start writing parameter data to your stream. We recommend that you do this using the built-in buffer feature.
Timestamps
Our SDK provides several helper functions to add new timestamps to Buffer, ParameterData and EventData instances, using several date and time formats.
Some of these functions use the default Epoch defined at Stream level. This Epoch is useful because it saves you from specifying the date part of each timestamp you add with the SDK.
These are the common helper functions:
- AddTimestamp(DateTime dateTime): Add a new timestamp in DateTime format. The default Epoch is never added to this.
- AddTimestamp(TimeSpan timeSpan): Add a new timestamp in TimeSpan format since the default Epoch defined in the stream.
- AddTimestampMilliseconds(long timeMilliseconds): Add a new timestamp in milliseconds since the default Epoch defined in the stream.
- AddTimestampNanoseconds(long timeNanoseconds): Add a new timestamp in nanoseconds since the default Epoch defined in the stream.
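To make the epoch arithmetic concrete, here is a minimal sketch of how an offset combined with an epoch resolves to an absolute time. EpochMath is a hypothetical helper that uses only .NET types; it does not call the SDK, and it is not a description of how the SDK stores timestamps internally.

```csharp
using System;

static class EpochMath
{
    // One .NET tick is 100 nanoseconds, so this sketch truncates finer precision.
    private const long NanosecondsPerTick = 100;

    public static DateTime FromTimeSpan(DateTime epoch, TimeSpan offset)
        => epoch.Add(offset);

    public static DateTime FromMilliseconds(DateTime epoch, long timeMilliseconds)
        => epoch.AddMilliseconds(timeMilliseconds);

    public static DateTime FromNanoseconds(DateTime epoch, long timeNanoseconds)
        => epoch.AddTicks(timeNanoseconds / NanosecondsPerTick);
}

class Program
{
    static void Main()
    {
        // Example epoch only; in the SDK the epoch is defined at Stream level.
        var epoch = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        // All three calls describe the same instant, one second after the epoch.
        Console.WriteLine(EpochMath.FromTimeSpan(epoch, TimeSpan.FromSeconds(1)).ToString("o"));
        Console.WriteLine(EpochMath.FromMilliseconds(epoch, 1000).ToString("o"));
        Console.WriteLine(EpochMath.FromNanoseconds(epoch, 1_000_000_000).ToString("o"));
    }
}
```

A DateTime passed directly already carries its own date, which is why the default Epoch is not applied to it.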
Adding data without epoch:
stream.Parameters.Buffer
.AddTimestamp(DateTime.UtcNow)
.AddValue("ParameterA", 10)
.AddValue("ParameterB", "hello")
.Write();
Or we can add a timestamp 1000 ms after the epoch:
stream.Epoch = DateTime.UtcNow;
stream.Parameters.Buffer
.AddTimestampMilliseconds(1000)
.AddValue("ParameterA", 10)
.AddValue("ParameterB", "hello")
.Write();
Buffer
Our SDK provides a built-in Buffer to help you achieve high-performance data streaming without the complexity of managing the underlying streaming technologies. Instead, you just have to configure the buffer with your requirements. For example, the following configuration means that the SDK will send a packet when the size of the buffer reaches 100 timestamps:
stream.Parameters.Buffer.PacketSize = 100;
Writing a parameter to that buffer is as simple as calling the AddTimestamp method, followed by AddValue for each Parameter value you want to write. At the end, call the Write method to write the timestamp to the buffer.
stream.Parameters.Buffer
.AddTimestamp(DateTime.UtcNow)
.AddValue("ParameterA", 10)
.AddValue("ParameterB", "hello")
.AddValue("ParameterC", Encoding.ASCII.GetBytes("Hello Quix!")) // Write binary data as a byte array.
.Write();
You can configure multiple conditions to determine when the Buffer has to release data. If any of these conditions becomes true, the buffer releases a new packet of data and that data is cleared from the buffer:
- Buffer.BufferTimeout: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed since the last data was received in the buffer.
- Buffer.PacketSize: The maximum packet size in terms of number of timestamps. Each time the buffer holds this number of timestamps, the packet of data is released.
- Buffer.TimeSpanInNanoseconds: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- Buffer.TimeSpanInMilliseconds: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: This is a millisecond converter on top of TimeSpanInNanoseconds. They both work with the same underlying value.
- Buffer.CustomTriggerBeforeEnqueue: Custom function which is invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before the timestamp is added to it.
- Buffer.CustomTrigger: Custom function which is invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- Buffer.Filter: Custom function to filter the incoming data before adding it to the buffer. If it returns true, the data is added; otherwise it is not.
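The release conditions above can be pictured with a small model. The sketch below is a simplified stand-in written for this page, not the SDK's implementation: SketchBuffer is a hypothetical class, it only models PacketSize, the time span pair and Filter, it assumes timestamps arrive in order, and releasing exactly at the configured span is an assumption about the boundary.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical model of a timestamp buffer (not the Quix SDK class).
class SketchBuffer
{
    private readonly List<long> timestampsNs = new List<long>();

    public int? PacketSize { get; set; }
    public long? TimeSpanInNanoseconds { get; set; }

    // Millisecond view over the same underlying nanosecond value.
    public long? TimeSpanInMilliseconds
    {
        get => TimeSpanInNanoseconds / 1_000_000;
        set => TimeSpanInNanoseconds = value * 1_000_000;
    }

    // Returns true to keep a timestamp, false to drop it before buffering.
    public Func<long, bool> Filter { get; set; }

    public event Action<IReadOnlyList<long>> OnRelease;

    public void Add(long timestampNs)
    {
        if (Filter != null && !Filter(timestampNs)) return;
        timestampsNs.Add(timestampNs);

        var sizeReached = PacketSize.HasValue && timestampsNs.Count >= PacketSize.Value;
        var spanReached = TimeSpanInNanoseconds.HasValue
            && timestampsNs[timestampsNs.Count - 1] - timestampsNs[0] >= TimeSpanInNanoseconds.Value;

        // Any single condition being true releases the packet and clears the buffer.
        if (sizeReached || spanReached)
        {
            var packet = timestampsNs.ToArray();
            timestampsNs.Clear();
            OnRelease?.Invoke(packet);
        }
    }
}

class Program
{
    static void Main()
    {
        var buffer = new SketchBuffer { PacketSize = 3, TimeSpanInMilliseconds = 100 };
        buffer.OnRelease += packet => Console.WriteLine($"Released {packet.Count} timestamp(s)");

        // Three timestamps 10 ms apart: released by PacketSize.
        buffer.Add(0);
        buffer.Add(10_000_000);
        buffer.Add(20_000_000);

        // Two timestamps 150 ms apart: released by the time span condition.
        buffer.Add(1_000_000_000);
        buffer.Add(1_150_000_000);
    }
}
```

BufferTimeout and the custom triggers are left out to keep the model short; they would be additional conditions combined in the same way.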
Examples
This buffer configuration will send data every 100 ms window or, if no data is buffered within the 1 second timeout period, it will empty the buffer anyway.
stream.Parameters.Buffer.TimeSpanInMilliseconds = 100;
stream.Parameters.Buffer.BufferTimeout = 1000;
This buffer configuration will send data every 100 ms window or, if critical data arrives, it will empty the buffer anyway.
stream.Parameters.Buffer.TimeSpanInMilliseconds = 100;
stream.Parameters.Buffer.CustomTrigger = data => data.Timestamps[0].Tags["is_critical"] == "True";
Parameter Definitions
The Quix SDK allows you to define visualization configuration and metadata for Parameters and Events. You can define things like human-readable names, descriptions, ranges, etc. Quix uses some of this configuration in the Visualise view of the platform, but you can also use it in your own models, bridges or visualization implementations.
We call these configurations Definitions, and all you need to do is use the AddDefinition helper function on either stream.Parameters or stream.Events:
Parameters.AddDefinition(string parameterId, string name = null, string description = null)
Events.AddDefinition(string eventId, string name = null, string description = null)
stream.Parameters
.AddLocation("vehicle/ecu")
.AddDefinition("vehicle-speed", "Vehicle speed", "Current vehicle speed measured using wheel sensor")
.SetRange(0, 400)
.SetUnit("kmh");
The Min and Max range definition sets the Y axis range in the waveform visualisation view. The following definition
.AddDefinition($"{playerPrefix}Speed").SetRange(0, 400)
will set this view in Visualise:
Adding additional Definitions for each parameter allows you to see data with different ranges on the same waveform view:
You can also define a Location before adding parameter and event definitions. Locations are used to organize the Parameters and Events in hierarchy groups in the data catalogue. To add a Location, use the AddLocation function before adding the definitions you want to include in that group.
For example, setting the following parameter location
stream.Parameters
.AddLocation("/Player/Motion/Car")
.AddDefinition("Pitch")
.AddDefinition("Roll")
.AddDefinition("Yaw");
will result in this parameter hierarchy in the parameter selection dialogue:
Once you have added a new definition, you can also attach some additional configuration to it. This is the full list of visualization and metadata options you can attach to a ParameterDefinition:
- SetRange(double minimumValue, double maximumValue): Set the minimum and maximum range of the parameter.
- SetUnit(string unit): Set the unit of the parameter.
- SetFormat(string format): Set the format of the parameter.
- SetCustomProperties(string customProperties): Set the custom properties of the parameter.
Writing Events
You can also write events to Quix.
Writing events to a stream is identical to writing parameters, although you can start without the buffer feature because events don't need high performance throughput.
stream.Events
.AddTimestamp(DateTime.UtcNow)
.AddValue("EventA", "Nice!")
.AddValue("EventB", "High Five")
.Write();
Event Definitions
Likewise, you can write Definitions for each event.
This is the full list of visualization and metadata options you can attach to an EventDefinition:
- SetLevel(EventLevel level): Set the severity level of the event.
- SetCustomProperties(string customProperties): Set the custom properties of the event.
For example, the following code defines a human-readable name and a severity level for EventA:
stream.Events.AddDefinition("EventA", "The Event A").SetLevel(EventLevel.Critical);
Tags
Using tags alongside parameters and events effectively indexes the persisted data in the database. This means you will be able to filter and group data by those tags in fast queries. Tags have to be chosen carefully, as excessive cardinality leads to performance degradation in the database.
Good example: this lets you later query the maximum speed for driver "Peter" per car.
stream.Parameters.Buffer
.AddTimestamp(DateTime.UtcNow)
.AddTag("vehicle-plate", "SL96 XCX")
.AddTag("driver-id", "Peter")
.AddValue("Speed", 53)
.AddValue("Gear", 4)
.Write();
Bad example: this leads to excessive cardinality, because the Speed tag will take a massive number of different values.
stream.Parameters.Buffer
.AddTimestamp(DateTime.UtcNow)
.AddTag("Speed", 53)
.AddValue("Gear", 4)
.Write();
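To see why the second example is a problem, the following sketch counts how many distinct values each tag takes over a batch of samples. TagCardinality and its sample data are hypothetical and only illustrate the idea of cardinality; nothing here calls the SDK or the database.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

static class TagCardinality
{
    // Counts the distinct values seen for one tag key across all samples.
    public static int CountDistinctValues(IEnumerable<Dictionary<string, string>> samples, string tagKey)
    {
        return samples
            .Where(tags => tags.ContainsKey(tagKey))
            .Select(tags => tags[tagKey])
            .Distinct()
            .Count();
    }

    // Builds made-up samples: three drivers and a constantly changing speed value.
    public static List<Dictionary<string, string>> BuildSamples(int count)
    {
        var drivers = new[] { "Peter", "Anna", "Luis" };
        return Enumerable.Range(0, count)
            .Select(i => new Dictionary<string, string>
            {
                ["driver-id"] = drivers[i % drivers.Length],
                ["Speed"] = (i * 0.37).ToString("F2", CultureInfo.InvariantCulture)
            })
            .ToList();
    }
}

class Program
{
    static void Main()
    {
        var samples = TagCardinality.BuildSamples(1000);

        // driver-id stays small and bounded; Speed grows with the number of samples.
        Console.WriteLine($"driver-id: {TagCardinality.CountDistinctValues(samples, "driver-id")} distinct values");
        Console.WriteLine($"Speed: {TagCardinality.CountDistinctValues(samples, "Speed")} distinct values");
    }
}
```

A tag whose distinct-value count keeps growing with the data volume is better written as a parameter value.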
Minimal example
This is the minimal code example needed to write data to a topic using the Quix SDK.
using System;
using System.Threading;
using Quix.Sdk.Streaming.Configuration;
namespace WriteHelloWorld
{
    class Program
    {
        /// <summary>
        /// Main will be invoked when you run the application
        /// </summary>
        static void Main()
        {
            // Create a client which holds generic details for creating input and output topics
            var client = new Quix.Sdk.Streaming.StreamingClient(
                BROKER_CLUSTER,
                new SecurityOptions(
                    "certificates/ca.cert",
                    WORKSPACE_ID,
                    PASSWORD));
            using var outputTopic = client.OpenOutputTopic(TOPIC_ID);
            var stream = outputTopic.CreateStream();
            stream.Properties.Name = "Hello World stream";
            Console.WriteLine("Sending values for 30 seconds");
            for (var index = 0; index < 3000; index++)
            {
                stream.Parameters.Buffer
                    .AddTimestamp(DateTime.UtcNow)
                    .AddValue("ParameterA", index)
                    .Write();
                Thread.Sleep(10);
            }
            Console.WriteLine("Closing stream");
            stream.Close();
            Console.WriteLine("Done!");
        }
    }
}
Reading from Quix
In order to read streams, you need an InputTopic instance. This instance allows you to read all the incoming streams on the specified Topic.
You can create an InputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform or in the automatically generated Samples in the platform for your specific Workspace and Topic.
var inputTopic = client.OpenInputTopic(TOPIC_ID);
Reading Streams
Once you have the InputTopic instance, you can start reading streams. For each stream received on the specified topic, InputTopic raises the OnStreamReceived event. For example, the following code prints the StreamId of each newStream received on that Topic:
inputTopic.OnStreamReceived += (s, newStream) =>
{
Console.WriteLine($"New stream read: {newStream.StreamId}");
};
inputTopic.StartReading();
Reading Parameters
You can also use buffers to read data. This helps you develop Models with high-performance throughput.
var buffer = newStream.Parameters.CreateBuffer();
You just have to configure these buffers with your input requirements using the available built in configuration. For example the following configuration means that the Buffer will release a packet when the Time Span between first and last timestamp reaches 100 milliseconds:
buffer.TimeSpanInMilliseconds = 100;
Reading parameter data from that buffer is as simple as using the OnRead event. For each parameter data packet released from the buffer, the SDK raises the OnRead event with the parameter data as its argument. For example, the following code prints the ParameterA value of the first timestamp of the packet received:
buffer.OnRead += (data) =>
{
var helloWorldValue = data.Timestamps[0].Parameters["ParameterA"].NumericValue;
Console.WriteLine($"ParameterA - {data.Timestamps[0].Timestamp}: {helloWorldValue}");
// Reading binary data into a byte array.
var bData = data.Timestamps[0].Parameters["ParameterC"].BinaryValue;
Console.WriteLine($"binary_param - {data.Timestamps[0].Timestamp}: {Encoding.ASCII.GetString(bData)}");
};
You can configure multiple conditions to determine when the Buffer has to release data. If any of these conditions becomes true, the buffer releases a new packet of data and that data is cleared from the buffer:
- Buffer.BufferTimeout: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed since the last data was received in the buffer.
- Buffer.PacketSize: The maximum packet size in terms of number of timestamps. Each time the buffer holds this number of timestamps, the packet of data is released.
- Buffer.TimeSpanInNanoseconds: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- Buffer.TimeSpanInMilliseconds: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: This is a millisecond converter on top of TimeSpanInNanoseconds. They both work with the same underlying value.
- Buffer.CustomTriggerBeforeEnqueue: Custom function which is invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before the timestamp is added to it.
- Buffer.CustomTrigger: Custom function which is invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- Buffer.Filter: Custom function to filter the incoming data before adding it to the buffer. If it returns true, the data is added; otherwise it is not.
Examples
This buffer configuration will release data every 100 ms window or, if no data is buffered within the 1 second timeout period, it will empty the buffer anyway.
buffer.TimeSpanInMilliseconds = 100;
buffer.BufferTimeout = 1000;
This buffer configuration will release data every 100 ms window or, if critical data arrives, it will empty the buffer anyway.
buffer.TimeSpanInMilliseconds = 100;
buffer.CustomTrigger = data => data.Timestamps[0].Tags["is_critical"] == "True";
Reading Events
Reading events from a stream is as easy as reading parameter data. In this case the Quix SDK does not use a Buffer because events don't need high-performance throughput, but the way you read Event Data from a newStream is identical.
newStream.Events.OnRead += (data) =>
{
Console.WriteLine($"Event read for stream {newStream.StreamId}, Event Id: {data.Id}");
};
Minimal example
This is the minimal code example needed to read data from a topic using the Quix SDK.
using System;
using System.Linq;
using System.Threading;
using Quix.Sdk.Streaming.Configuration;
using Quix.Sdk.Streaming.Models;
namespace ReadHelloWorld
{
    class Program
    {
        /// <summary>
        /// Main will be invoked when you run the application
        /// </summary>
        static void Main()
        {
            // Create a client which holds generic details for creating input and output topics
            var client = new Quix.Sdk.Streaming.StreamingClient(
                BROKER_CLUSTER,
                new SecurityOptions(
                    "certificates/ca.cert",
                    WORKSPACE_ID,
                    PASSWORD));
            using var inputTopic = client.OpenInputTopic(TOPIC_ID);
            // Hook up events before initiating read to avoid losing out on any data
            inputTopic.OnStreamReceived += (s, streamReader) =>
            {
                Console.WriteLine($"New stream read: {streamReader.StreamId}");
                var buffer = streamReader.Parameters.CreateBuffer();
                buffer.OnRead += parameterData =>
                {
                    Console.WriteLine(
                        $"ParameterA - {parameterData.Timestamps[0].Timestamp}: {parameterData.Timestamps.Average(a => a.Parameters["ParameterA"].NumericValue)}");
                };
            };
            inputTopic.StartReading(); // initiate read
            Console.WriteLine("Listening for streams");
            // Hook up to termination signal (for docker image) and CTRL-C
            var exitEvent = new ManualResetEventSlim();
            Console.CancelKeyPress += (s, e) =>
            {
                e.Cancel = true; // In order to allow the application to cleanly exit instead of terminating it
                exitEvent.Set();
            };
            // Wait for CTRL-C
            exitEvent.Wait();
            Console.WriteLine("Exiting");
        }
    }
}
WebSockets and Streaming Data
Applications can react to changes occurring inside the Quix platform by subscribing to events.
Each workspace has its own WebSockets service, allowing data to be read from any topic on that workspace.
The URL to access the WebSockets interface is:
https://reader-{workspaceID}.platform.quix.ai/hub
This URL points to a SignalR hub dedicated to serving your workspace.
Note that authentication uses the same token as the rest of the Quix APIs. This can be obtained in the portal under "Profile".
There are 2 steps to connecting to the WebSockets interface.
- Connect
- Subscribe
Connect
In C#, import or obtain the relevant SignalR implementation for your platform. For example, for .NET Core use "Microsoft.AspNetCore.SignalR.Client".
Using the 'HubConnectionBuilder' from the SignalR client, together with the URL and TOKEN specific to your workspace, build a connection object.
var connection = new HubConnectionBuilder()
.WithUrl(URL, options => { options.AccessTokenProvider = () => Task.FromResult(TOKEN); })
.Build();
Start the connection:
await connection.StartAsync();
Handle Message from the Hub
Listen for data being emitted from the interface. Once the connection has started successfully (you should check this by inspecting the connection's 'State' property), you can subscribe to messages coming from the WebSockets interface.
There are 2 stages to this.
- Tell Quix which messages you'd like to receive
- Handle messages being received from Quix
We will tackle the event handler first.
Use one of the following lines of code as required:
connection.On<ParameterData>("ParameterDataReceived", (data) => { /* handle the data here */ });
connection.On("ParameterDataReceived", (ParameterData data) => { /* handle the data here */ });
//or use JsonConvert
connection.On<object>("ParameterDataReceived", (data) => {
var paramData = JsonConvert.DeserializeObject<ParameterData>(data.ToString());
});
On Data Received Subscriptions
The complete list of events emitted by the SignalR hub that you can subscribe to with the .On method is:
- ParameterDataReceived: Raised when parameter data is received. Type: ParameterData
- ParameterDefinitionsUpdated: Raised when parameter definitions are updated. Type: ParameterDefinitions
- EventDataReceived: Raised when event data is received. Type: EventData
- EventDefinitionsUpdated: Raised when event definitions are updated. Type: EventDefinitions
Subscribe
Now that you're ready to receive the data, tell Quix what data you want.
You'll need to invoke the 'SubscribeToParameter' method on the WebSockets interface.
await connection.InvokeAsync("SubscribeToParameter", TOPIC, STREAM, PARAMETER);
SubscribeToParameter takes 3 parameters.
- Topic is the name of the topic you wish to listen to.
- Stream is the ID of the stream to listen to.
- Parameter is the ID of the parameter to listen to.
This combination uniquely identifies the Parameter to listen for.
When the specified parameter receives data, the 'On' event handler will fire, allowing your code to handle the fresh data.
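Putting the connect, handle and subscribe steps together, a minimal end-to-end sketch could look like the following. It assumes the Microsoft.AspNetCore.SignalR.Client package is referenced; the constant values are placeholders, and the payload is received as a plain object and printed rather than deserialized, because the exact ParameterData schema is not reproduced here.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;

class Program
{
    // Placeholders: replace with the values for your own workspace.
    const string WorkspaceId = "your-workspace-id";
    const string Token = "your-token";
    const string Topic = "your-topic";
    const string Stream = "your-stream-id";
    const string Parameter = "your-parameter-id";

    static async Task Main()
    {
        var url = $"https://reader-{WorkspaceId}.platform.quix.ai/hub";

        var connection = new HubConnectionBuilder()
            .WithUrl(url, options => { options.AccessTokenProvider = () => Task.FromResult(Token); })
            .Build();

        // Register the handler before subscribing so that no message is missed.
        connection.On<object>("ParameterDataReceived", data =>
        {
            Console.WriteLine($"Received: {data}");
        });

        await connection.StartAsync();
        if (connection.State != HubConnectionState.Connected)
        {
            Console.WriteLine($"Unexpected connection state: {connection.State}");
            return;
        }

        // Tell Quix which parameter to stream to this connection.
        await connection.InvokeAsync("SubscribeToParameter", Topic, Stream, Parameter);

        Console.WriteLine("Listening, press Enter to exit");
        Console.ReadLine();

        await connection.InvokeAsync("UnsubscribeFromParameter", Topic, Stream, Parameter);
        await connection.DisposeAsync();
    }
}
```

Registering the handler first and subscribing second follows the order described above, and unsubscribing before disposing keeps the shutdown explicit.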
WebSockets Subscriptions
The complete list of subscriptions allowed by the Quix WebSockets interface is as follows:
- SubscribeToParameter: Subscribe to a specific parameter, stream and topic combination. Parameters: TopicName, StreamId, ParameterId
- UnsubscribeFromParameter: Unsubscribe from a parameter. Parameters: TopicName, StreamId, ParameterId
- SubscribeToParameterDefinitions: Subscribe to updates to the parameters defined for a given topic and stream combination. Parameters: TopicName, StreamId
- SubscribeToEvent: Subscribe to a specific event, stream and topic combination. Parameters: TopicName, StreamId, EventId
- UnsubscribeFromEvent: Unsubscribe from an event. Parameters: TopicName, StreamId, EventId
- SubscribeToEventDefinitions: Subscribe to updates to the events defined for a given topic and stream combination. Parameters: TopicName, StreamId
- UnsubscribeFromStream: Unsubscribe from all subscriptions to a topic and stream combination.
Schemas
Schemas for the objects received from the WebSockets interface on a particular environment can be viewed and obtained from the following Swagger interface:
https://writer-quix-[YOUR-WORKSPACE-NAME].platform.quix.ai/index.html
This link can also be found in the Quix Platform's 'Write Topic Data' HTTP sample project.