Skip to content

Kelvin Messages

Messages are the primary interface for exchanging data with the Kelvin platform. Applications send and receive messages to communicate with assets and other components.

Message Types

The SDK provides pre-built message types for common data:

Message Type Description
Number Handles all numeric data types including integers, floats, and unsigned values, etc.
String Represents any type of text or character data.
Boolean Represents only true or false logical values.
Messaging Types Python Example
from kelvin.message import Number, String, Boolean
from kelvin.krn import KRNAssetDataStream

# Create primitive data messages

number_msg = Number(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=42.5
)

string_msg = String(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload="Hello Kelvin"
)

boolean_msg = Boolean(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=True
)

Message Builders

For more complex messages, use the provided MessageBuilder helpers:

Message Type Description
ControlChange Requests a change in asset behavior by publishing new control values to a target resource such as a data stream.
ControlAck Acknowledges a received control change and reports its result or applied state.
Recommendation Suggests one or more control changes, typically for optimization or advisory purposes, without immediate execution.
DataTag Attaches metadata or annotations to an asset or data stream, often used to flag events or anomalies.
AppParameter Defines a single app configuration value such as a threshold or limit.
AppParameters Groups multiple app configuration parameters into a list to be included in a single update message.
StateEnum Enumerates possible states for control acknowledgment, such as pending, applied, or failed.
Message Builders Python Example
from kelvin.message import (
    ControlChange,
    ControlAck,
    Recommendation,
    DataTag,
    AssetParameter,
    AssetParameters,
    StateEnum
)

from kelvin.krn import KRNAsset, KRNAssetDataStream, KRNAssetParameter
from datetime import datetime, timedelta

# Control Change - request a change in asset behavior and stating the last_value used to process the new value
control_change = ControlChange(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    from_value={
                "value": last_value,
                "timestamp": datetime.now(timezone.utc)
                },
    payload=75.0,
    expiration_date=timedelta(minutes=10),
    timeout=60,
    retries=3
)

# Control Acknowledgement - respond to control changes
ack = ControlAck(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    state=StateEnum.applied,
    message="Control change successfully applied"
)

# Recommendation - suggest multiple control changes
recommendation = Recommendation(
    resource=KRNAsset("asset-name"),
    type="optimization",
    control_changes=[control_change],
    expiration_date=timedelta(hours=1),
    auto_accepted=False
)

# Data Tag - add metadata to data
data_tag = DataTag(
    resource=KRNAsset("asset-name"),
    start_date=datetime.now(),
    tag_name="anomaly_detected",
    description="Temperature spike detected"
)

# App Parameters - update asset specific App Parameter configuration
param = AppParameter(
    resource=KRNAssetParameter("asset-name", "threshold"),
    value=100
)

params = AppParameters(parameters=[param, param1, param2 ...])

Message Handling

Stream Decorators

Stream decorators are the recommended way to handle incoming messages. They allow you to process messages based on specific criteria:

Use decorators to process messages based on specific criteria:

Message Stream Decorators Python Example
from kelvin.application import KelvinApp
from kelvin.message.typing import AssetDataMessage

app = KelvinApp()

# Process all messages
@app.stream()
async def handle_all(msg: AssetDataMessage):
    print(f"All messages: {msg.payload}")

# Filter by asset
@app.stream(assets=["asset-1", "asset-2"])
async def handle_specific_assets(msg: AssetDataMessage):
    print(f"From specific assets: {msg.payload}")

# Filter by input datastream
@app.stream(inputs=["temperature", "pressure"])
async def handle_specific_inputs(msg: AssetDataMessage):
    print(f"From specific inputs: {msg.payload}")

# Can also register functions directly
def my_handler(msg: AssetDataMessage):
    print(f"Handler: {msg.payload}")

app.stream(my_handler, inputs=["humidity"])

app.run()

Filters

Filters allow you to selectively process messages using queues or async streams:

Message Filters Python Example
from kelvin.application import KelvinApp, filters

app = KelvinApp()

async def main():
    await app.connect()

    # Using a queue with filters
    queue = app.filter(filters.input_equals("temperature"))
    while True:
        msg = await queue.get()
        print(f"Temperature: {msg.payload}")

    # Using async stream with filters
    stream = app.stream_filter(filters.asset_equals("asset-1"))
    async for msg in stream:
        print(f"Asset 1 data: {msg.payload}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Built-in Filters

Message Built-in Filters Python Example
from kelvin.application import filters

# Filter by input datastream name(s)
filters.input_equals("temperature")
filters.input_equals(["temperature", "pressure"])

# Filter by asset name(s)
filters.asset_equals("my-asset")
filters.asset_equals(["asset-1", "asset-2"])

# Filter by resource KRN
filters.resource_equals(krn_instance)
filters.resource_equals([krn1, krn2])

# Filter by message type
filters.is_data_message(msg)
filters.is_asset_data_message(msg)
filters.is_control_status_message(msg)
filters.is_custom_action(msg)
filters.is_data_quality_message(msg)

Custom Filters

You can create custom filter functions:

Custom Message Filters Python Example
1
2
3
4
5
def custom_filter(msg: Message) -> bool:
    """Filter for high-value readings"""
    return msg.payload > 100

queue = app.filter(custom_filter)

Callbacks

For advanced scenarios, you can define callbacks for specific lifecycle events. However, stream decorators are generally preferred for message processing:

Message Callbacks Python Example
from kelvin.application import KelvinApp, AssetInfo
from kelvin.message.typing import AssetDataMessage
from typing import Optional

app = KelvinApp()

async def on_connect():
    """Called when the app connects to Kelvin"""
    print("Connected to Kelvin platform")
    print(f"Configuration: {app.app_configuration}")
    print(f"Assets: {app.assets}")

async def on_asset_input(msg: AssetDataMessage):
    """Called for data messages from asset inputs"""
    print(f"Data from {msg.resource}: {msg.payload}")

async def on_control_change(msg: AssetDataMessage):
    """Called when control changes are received"""
    print(f"Control change for {msg.resource}: {msg.payload}")

async def on_asset_change(new_asset: Optional[AssetInfo], old_asset: Optional[AssetInfo]):
    """Called when assets are added, removed, or modified"""
    if new_asset is None:
        print(f"Asset removed: {old_asset.name}")
    else:
        print(f"Asset changed: {new_asset.name}")

async def on_app_configuration(config: dict):
    """Called when app configuration changes"""
    print(f"New configuration: {config}")

# Assign callbacks
app.on_connect = on_connect
app.on_asset_input = on_asset_input
app.on_control_change = on_control_change
app.on_asset_change = on_asset_change
app.on_app_configuration = on_app_configuration

app.run()

Publishing Messages

Use app.publish() to send messages to the Kelvin platform:

Publishing Messages Python Example
from kelvin.application import KelvinApp
from kelvin.message import Number, ControlChange, Recommendation
from kelvin.krn import KRNAssetDataStream, KRNAsset
from datetime import timedelta

app = KelvinApp()

@app.timer(interval=10)
async def publish_data():
    # Publish data
    await app.publish(
        Number(
            resource=KRNAssetDataStream("asset-1", "output"),
            payload=42.0
        )
    )

    # Publish control change
    await app.publish(
        ControlChange(
            resource=KRNAssetDataStream("asset-1", "setpoint"),
            payload=75.0,
            expiration_date=timedelta(minutes=5)
        )
    )

    # Publish recommendation
    await app.publish(
        Recommendation(
            resource=KRNAsset("asset-1"),
            type="optimization",
            control_changes=[...],
            expiration_date=timedelta(hours=1)
        )
    )

app.run()