from kelvin.application import KelvinApp
from kelvin.krn import KRNAssetDataStream
from kelvin.message import Number

app = KelvinApp()


# Create and Publish a Number every 10 seconds
@app.timer(interval=10)
async def publish_data() -> None:
    """Publish a Number to the asset-1 "motor_temperature_fahrenheit" data stream."""
    await app.publish(
        Number(
            resource=KRNAssetDataStream("asset-1", "motor_temperature_fahrenheit"),
            payload=97.3,
        )
    )


app.run()
from kelvin.application import KelvinApp
from kelvin.krn import KRNAssetDataStream
from kelvin.message import Boolean

app = KelvinApp()


# Create and Publish a Boolean every 10 seconds
@app.timer(interval=10)
async def publish_data() -> None:
    """Publish a Boolean to the asset-1 "motor_error" data stream."""
    await app.publish(
        Boolean(
            resource=KRNAssetDataStream("asset-1", "motor_error"),
            payload=True,
        )
    )


app.run()
from kelvin.application import KelvinApp
from kelvin.krn import KRNAssetDataStream
from kelvin.message import String

app = KelvinApp()


# Create and Publish a String every 10 seconds
@app.timer(interval=10)
async def publish_data() -> None:
    """Publish a String to the asset-1 "motor_error_description" data stream."""
    await app.publish(
        String(
            resource=KRNAssetDataStream("asset-1", "motor_error_description"),
            payload="Temperature is too high",
        )
    )


app.run()
For objects, the payload is a dict with an arbitrary structure.
from kelvin.application import KelvinApp
from kelvin.krn import KRNAssetDataStream
from kelvin.message import KMessageTypeData, Message

app = KelvinApp()

# Payload for the object message; the name must match the one used in
# publish_data() below (the original defined "gpsd_dict" but read "gps_dict").
gps_dict = {"latitude": 90, "longitude": 100}


# Create and Publish an Object every 10 seconds
@app.timer(interval=10)
async def publish_data() -> None:
    """Publish the GPS dict as an object message to the asset-1 "gps_data" data stream."""
    await app.publish(
        Message(
            # Object payloads use the generic Message class with an explicit
            # data type instead of a typed helper such as Number or String.
            type=KMessageTypeData(primitive="object", icd="gps"),
            resource=KRNAssetDataStream("asset-1", "gps_data"),
            payload=gps_dict,
        )
    )


app.run()