Kelvin Messages
Messages are the primary interface for exchanging data with the Kelvin platform. Applications send and receive messages to communicate with assets and other components.
Message Types
The SDK provides pre-built message types for common data:
| Message Type | Description |
|---|---|
| Number | Handles all numeric data, including integers, floats, and unsigned values. |
| String | Represents text or character data. |
| Boolean | Represents a logical true or false value. |
| Messaging Types Python Example |
|---|
| from kelvin.message import Number, String, Boolean
from kelvin.krn import KRNAssetDataStream
# Create primitive data messages
number_msg = Number(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=42.5
)
string_msg = String(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload="Hello Kelvin"
)
boolean_msg = Boolean(
    resource=KRNAssetDataStream("asset-name", "datastream-name"),
    payload=True
)
|
Message Builders
For more complex messages, use the provided MessageBuilder helpers:
| Message Type | Description |
|---|---|
| ControlChange | Requests a change in asset behavior by publishing new control values to a target resource such as a data stream. |
| ControlAck | Acknowledges a received control change and reports its result or applied state. |
| Recommendation | Suggests one or more control changes, typically for optimization or advisory purposes, without immediate execution. |
| DataTag | Attaches metadata or annotations to an asset or data stream, often used to flag events or anomalies. |
| AppParameter | Defines a single app configuration value such as a threshold or limit. |
| AppParameters | Groups multiple app configuration parameters into a list to be included in a single update message. |
| StateEnum | Enumerates possible states for control acknowledgment, such as pending, applied, or failed. |
| Message Builders Python Example |
|---|
| from kelvin.message import (
    ControlChange,
    ControlAck,
    Recommendation,
    DataTag,
    AssetParameter,
    AssetParameters,
    StateEnum
)
from kelvin.krn import KRNAsset, KRNAssetDataStream, KRNAssetParameter
from datetime import datetime, timedelta, timezone
# Placeholder for the last value the app read before computing the new setpoint
last_value = 70.0
# Control Change - request a change in asset behavior, stating the last value used to compute the new one
control_change = ControlChange(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    from_value={
        "value": last_value,
        "timestamp": datetime.now(timezone.utc)
    },
    payload=75.0,
    expiration_date=timedelta(minutes=10),
    timeout=60,
    retries=3
)
# Control Acknowledgement - respond to control changes
ack = ControlAck(
    resource=KRNAssetDataStream("asset-name", "setpoint"),
    state=StateEnum.applied,
    message="Control change successfully applied"
)
# Recommendation - suggest multiple control changes
recommendation = Recommendation(
    resource=KRNAsset("asset-name"),
    type="optimization",
    control_changes=[control_change],
    expiration_date=timedelta(hours=1),
    auto_accepted=False
)
# Data Tag - add metadata to data
data_tag = DataTag(
    resource=KRNAsset("asset-name"),
    start_date=datetime.now(timezone.utc),
    tag_name="anomaly_detected",
    description="Temperature spike detected"
)
# App Parameters - update asset-specific app parameter configuration.
# The classes are used under the names imported above (AssetParameter, AssetParameters);
# the table lists them as AppParameter and AppParameters, so use whichever your SDK version exports.
param = AssetParameter(
    resource=KRNAssetParameter("asset-name", "threshold"),
    value=100
)
# Add further parameters to the list as needed
params = AssetParameters(parameters=[param])
|
Message Handling
Stream Decorators
Stream decorators are the recommended way to handle incoming messages. They let you process messages based on specific criteria, such as the source asset or the input data stream:
| Message Stream Decorators Python Example |
|---|
| from kelvin.application import KelvinApp
from kelvin.message.typing import AssetDataMessage
app = KelvinApp()
# Process all messages
@app.stream()
async def handle_all(msg: AssetDataMessage):
    print(f"All messages: {msg.payload}")
# Filter by asset
@app.stream(assets=["asset-1", "asset-2"])
async def handle_specific_assets(msg: AssetDataMessage):
    print(f"From specific assets: {msg.payload}")
# Filter by input datastream
@app.stream(inputs=["temperature", "pressure"])
async def handle_specific_inputs(msg: AssetDataMessage):
    print(f"From specific inputs: {msg.payload}")
# Can also register functions directly
def my_handler(msg: AssetDataMessage):
    print(f"Handler: {msg.payload}")
app.stream(my_handler, inputs=["humidity"])
app.run()
|
Filters
Filters allow you to selectively process messages using queues or async streams:
| Message Filters Python Example |
|---|
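| import asyncio
from kelvin.application import KelvinApp, filters
app = KelvinApp()
async def consume_queue():
    # Using a queue with filters
    queue = app.filter(filters.input_equals("temperature"))
    while True:
        msg = await queue.get()
        print(f"Temperature: {msg.payload}")
async def consume_stream():
    # Using an async stream with filters
    stream = app.stream_filter(filters.asset_equals("asset-1"))
    async for msg in stream:
        print(f"Asset 1 data: {msg.payload}")
async def main():
    await app.connect()
    # Run both consumers concurrently: placed one after the other in a single
    # coroutine, the endless queue loop would keep the stream loop from ever starting
    await asyncio.gather(consume_queue(), consume_stream())
if __name__ == "__main__":
    asyncio.run(main())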
|
Built-in Filters
| Message Built-in Filters Python Example |
|---|
| from kelvin.application import filters
# Filter by input datastream name(s)
filters.input_equals("temperature")
filters.input_equals(["temperature", "pressure"])
# Filter by asset name(s)
filters.asset_equals("my-asset")
filters.asset_equals(["asset-1", "asset-2"])
# Filter by resource KRN
filters.resource_equals(krn_instance)
filters.resource_equals([krn1, krn2])
# Filter by message type
filters.is_data_message(msg)
filters.is_asset_data_message(msg)
filters.is_control_status_message(msg)
filters.is_custom_action(msg)
filters.is_data_quality_message(msg)
|
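The built-in filters above appear in two shapes: factories such as `input_equals(...)`, which are called with one or more names and whose result is passed to `app.filter`, and checks such as `is_data_message(msg)`, which are called with the message itself. As an illustration only, and not the SDK's implementation, a factory-style filter can be sketched as a closure that returns a predicate. `FakeResource`, `FakeMessage`, and `input_in` are hypothetical stand-ins, and the `data_stream` attribute is an assumption about how a message exposes its source.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Union


@dataclass
class FakeResource:
    """Hypothetical stand-in for a KRN that points at an asset data stream."""
    asset: str
    data_stream: str


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an SDK message."""
    resource: FakeResource
    payload: Any


def input_in(names: Union[str, Iterable[str]]) -> Callable[[FakeMessage], bool]:
    """Build a predicate that accepts messages from the named data stream(s)."""
    # Accept a single name or any iterable of names, as the built-in factories do
    allowed = {names} if isinstance(names, str) else set(names)

    def predicate(msg: FakeMessage) -> bool:
        return msg.resource.data_stream in allowed

    return predicate


is_temperature = input_in("temperature")
is_temp_or_pressure = input_in(["temperature", "pressure"])

temp = FakeMessage(FakeResource("asset-1", "temperature"), 21.5)
hum = FakeMessage(FakeResource("asset-1", "humidity"), 40.0)
```

Because the factory returns a plain callable that takes a message and returns a bool, it has the same shape as the custom filter functions described in the next section.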
Custom Filters
You can create custom filter functions:
| Custom Message Filters Python Example |
|---|
| from kelvin.message import Message
def custom_filter(msg: Message) -> bool:
    """Filter for high-value readings"""
    return msg.payload > 100
# `app` is the KelvinApp instance created earlier
queue = app.filter(custom_filter)
|
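A comparison such as `msg.payload > 100` raises a `TypeError` if a string payload reaches the filter, so a custom filter may want to check the payload type first. The following is a minimal sketch under that assumption; `high_value_filter` and its `threshold` parameter are hypothetical names, and `SimpleNamespace` objects stand in for SDK messages so the example runs without the SDK installed.

```python
from numbers import Real
from types import SimpleNamespace


def high_value_filter(msg, threshold: float = 100.0) -> bool:
    """Return True only for numeric payloads strictly above the threshold."""
    payload = getattr(msg, "payload", None)
    # bool is a subclass of int in Python, so exclude it explicitly
    if isinstance(payload, bool) or not isinstance(payload, Real):
        return False
    return payload > threshold


# Stand-ins for Number, String, and Boolean messages
reading = SimpleNamespace(payload=150.0)
text = SimpleNamespace(payload="Hello Kelvin")
flag = SimpleNamespace(payload=True)
```

Since `threshold` has a default, the function can still be passed as a one-argument callable, for example `app.filter(high_value_filter)`.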
Callbacks
For advanced scenarios, you can define callbacks for specific lifecycle events. However, stream decorators are generally preferred for message processing:
| Message Callbacks Python Example |
|---|
| from kelvin.application import KelvinApp, AssetInfo
from kelvin.message.typing import AssetDataMessage
from typing import Optional
app = KelvinApp()
async def on_connect():
    """Called when the app connects to Kelvin"""
    print("Connected to Kelvin platform")
    print(f"Configuration: {app.app_configuration}")
    print(f"Assets: {app.assets}")
async def on_asset_input(msg: AssetDataMessage):
    """Called for data messages from asset inputs"""
    print(f"Data from {msg.resource}: {msg.payload}")
async def on_control_change(msg: AssetDataMessage):
    """Called when control changes are received"""
    print(f"Control change for {msg.resource}: {msg.payload}")
async def on_asset_change(new_asset: Optional[AssetInfo], old_asset: Optional[AssetInfo]):
    """Called when assets are added, removed, or modified"""
    if new_asset is None:
        print(f"Asset removed: {old_asset.name}")
    else:
        print(f"Asset changed: {new_asset.name}")
async def on_app_configuration(config: dict):
    """Called when app configuration changes"""
    print(f"New configuration: {config}")
# Assign callbacks
app.on_connect = on_connect
app.on_asset_input = on_asset_input
app.on_control_change = on_control_change
app.on_asset_change = on_asset_change
app.on_app_configuration = on_app_configuration
app.run()
|
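The `on_asset_change` callback receives a `(new_asset, old_asset)` pair in which either side may be `None`. The sketch below shows one way to tell the three cases apart. It assumes that a missing old asset means the asset was added, which the example above does not state explicitly; `FakeAssetInfo` and `describe_asset_change` are hypothetical names, with `FakeAssetInfo` standing in for the SDK's `AssetInfo`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeAssetInfo:
    """Hypothetical stand-in for the SDK's AssetInfo; only `name` is used."""
    name: str


def describe_asset_change(
    new_asset: Optional[FakeAssetInfo],
    old_asset: Optional[FakeAssetInfo],
) -> str:
    """Classify a change from the (new, old) pair passed to the callback."""
    if new_asset is None and old_asset is None:
        return "no change"
    if new_asset is None:
        return f"removed: {old_asset.name}"
    if old_asset is None:
        # Assumption: no previous asset means the asset was just added
        return f"added: {new_asset.name}"
    return f"modified: {new_asset.name}"
```

Keeping the classification in a plain function like this makes it easy to unit test separately from the async callback that calls it.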
Publishing Messages
Use app.publish() to send messages to the Kelvin platform:
| Publishing Messages Python Example |
|---|
| from kelvin.application import KelvinApp
from kelvin.message import Number, ControlChange, Recommendation
from kelvin.krn import KRNAssetDataStream, KRNAsset
from datetime import timedelta
app = KelvinApp()
@app.timer(interval=10)
async def publish_data():
    # Publish data
    await app.publish(
        Number(
            resource=KRNAssetDataStream("asset-1", "output"),
            payload=42.0
        )
    )
    # Publish control change
    await app.publish(
        ControlChange(
            resource=KRNAssetDataStream("asset-1", "setpoint"),
            payload=75.0,
            expiration_date=timedelta(minutes=5)
        )
    )
    # Publish recommendation
    await app.publish(
        Recommendation(
            resource=KRNAsset("asset-1"),
            type="optimization",
            control_changes=[...],
            expiration_date=timedelta(hours=1)
        )
    )
app.run()
|