Skip to content

Data Windows

Data windows aggregate incoming data over time or count, returning pandas DataFrames for analysis. All window operations require the ai optional dependency.

Note

The examples given here are just the basics.

For more detailed and specific examples on windowing in Kelvin, you can read about them in the Develop SmartApps section here.

Tumbling Window

Non-overlapping, fixed-size time windows:

Basic Tumbling Window Python Example
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()

    # Process data in 10-second non-overlapping windows
    window_start = datetime.now()
    async for asset_name, df in app.tumbling_window(
        window_size=timedelta(seconds=10)
    ).stream(window_start):
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with all data in window

if __name__ == "__main__":
    asyncio.run(main())

Use case: Aggregate data every N seconds for batch processing (e.g., computing averages every 10 seconds).

Hopping Window

Overlapping, fixed-size windows with a configurable hop interval:

Basic Hopping Window Python Example
import asyncio
from datetime import datetime, timedelta
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()

    # 10-second windows, moving forward by 5 seconds each time
    window_start = datetime.now()
    async for asset_name, df in app.hopping_window(
        window_size=timedelta(seconds=10),
        hop_size=timedelta(seconds=5)
    ).stream(window_start=window_start):
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with overlapping data

if __name__ == "__main__":
    asyncio.run(main())

Use case: Sliding window analysis where you want overlapping data (e.g., moving averages with 50% overlap).

Rolling Window

Count-based windows that slide with each new message:

Basic Rolling Window Python Example
import asyncio
from kelvin.application import KelvinApp

app = KelvinApp()

async def main():
    await app.connect()

    # Window of last 5 messages, slides by 2 messages
    async for asset_name, df in app.rolling_window(
        count_size=5,
        slide=2
    ).stream():
        print(f"Asset: {asset_name}")
        print(df)  # pandas DataFrame with last 5 messages

if __name__ == "__main__":
    asyncio.run(main())

Use case: Process the last N data points (e.g., calculate trend over the last 10 readings).