# ethernetip_example.py
"""Example: EtherNet/IP feature showcase.

Demonstrates config-driven EtherNet/IP features against a PLC:
- Connection separation (connection passed to constructor, not in config)
- Optional backplane route path from environment configuration
- Timing config for background polling
- Typed scalar tags (bool, integers, floats)
- Write limits (write_min / write_max)
- Background daemon polling via get_channel()

Configure the PLC endpoint with environment variables:
    export ETHERNETIP_HOST=192.168.1.10
    export ETHERNETIP_PORT=44818
    export ETHERNETIP_SLOT=0

Writes are disabled by default so this can be pointed at a real PLC safely.
To run the write section:
    export ETHERNETIP_ENABLE_WRITES=1

Then run:
    python examples/ethernetip/ethernetip_example.py
"""

import os
import time
from pathlib import Path
from typing import Any

from instro.unstable.ethernetip import EtherNetIPDevice

# Device configuration file that lives in the same directory as this script.
CONFIG_PATH = Path(__file__).with_name("sample_device.json")
# Prefix used when addressing channels, e.g. "cell_controller.line_speed".
# NOTE(review): presumably matches the device name declared in the JSON config — confirm.
DEVICE_NAME = "cell_controller"


def connection_from_env() -> dict[str, Any]:
    """Build the PLC connection settings from ``ETHERNETIP_*`` environment variables.

    Variables read:
        ETHERNETIP_HOST: PLC host; defaults to ``192.168.1.10``.
        ETHERNETIP_PORT: Port number; defaults to ``44818``.
        ETHERNETIP_SLOT: Optional backplane slot. When provided, a single-hop
            backplane ``route_path`` is added to the result.

    A variable that is unset, empty, or whitespace-only counts as "not
    provided" for all three settings, so ``ETHERNETIP_PORT=`` falls back to
    the default instead of failing inside ``int("")``.

    Returns:
        A dict with ``host`` (str) and ``port`` (int), plus ``route_path``
        when a slot is configured.

    Raises:
        ValueError: If ETHERNETIP_PORT or ETHERNETIP_SLOT is not an integer.
            The message names the offending variable.
    """

    def setting(name: str) -> str:
        # Strip so that blank values behave exactly like unset ones.
        return os.environ.get(name, "").strip()

    def as_int(name: str, raw: str) -> int:
        try:
            return int(raw)
        except ValueError:
            # Re-raise with the variable name so a bad export is easy to locate.
            raise ValueError(f"{name} must be an integer, got {raw!r}") from None

    connection: dict[str, Any] = {
        "host": setting("ETHERNETIP_HOST") or "192.168.1.10",
        "port": as_int("ETHERNETIP_PORT", setting("ETHERNETIP_PORT") or "44818"),
    }

    # "0" is a non-empty string, so slot 0 still produces a route path.
    if slot := setting("ETHERNETIP_SLOT"):
        hop = {"type": "backplane", "slot": as_int("ETHERNETIP_SLOT", slot)}
        connection["route_path"] = {"hops": [hop]}

    return connection


def main() -> None:
    """Walk through typed reads, a background-polled channel fetch, and optional writes.

    The device is created from ``CONFIG_PATH`` with the connection supplied
    separately by ``connection_from_env()``. The write section runs only when
    ``ETHERNETIP_ENABLE_WRITES`` is exactly ``"1"``. The device is closed on
    every exit path, including Ctrl-C.
    """
    plc = EtherNetIPDevice(str(CONFIG_PATH), connection=connection_from_env(), autostart=True)
    writes_enabled = os.environ.get("ETHERNETIP_ENABLE_WRITES") == "1"

    try:
        # --- Typed reads: print the latest value of each aliased tag ---
        print("=== Typed Reads ===")
        typed_aliases = ("line_speed", "speed_setpoint", "running", "faulted", "batch_count")
        for alias in typed_aliases:
            reading = plc.read_tag(alias)
            print(f"  {alias}: {reading.latest}")

        # --- Background daemon: data is polled automatically at poll_interval ---
        print("\n=== Background Daemon ===")
        print("  Waiting for daemon to collect samples...")
        # Pause so the background poller has time to gather samples before the fetch.
        time.sleep(3)
        speed_channel = f"{DEVICE_NAME}.line_speed"
        window = plc.get_channel(speed_channel, length=2)
        print(f"  line_speed (2 samples): {window.channel_data[speed_channel]}")

        # --- Writes: disabled by default for real PLC safety ---
        print("\n=== Writes ===")
        if writes_enabled:
            plc.write_tag("mode", 2)
            mode_echo = plc.read_tag("mode").latest
            print(f"  Wrote mode=2 -> read back: {mode_echo}")

            plc.write_tag("speed_setpoint", 120.0)
            setpoint_echo = plc.read_tag("speed_setpoint").latest
            print(f"  Wrote speed_setpoint=120.0 -> read back: {setpoint_echo}")

            plc.write_tag("run_command", True)
            print("  Wrote run_command=True")

            # Deliberately large value: a ValueError here is reported, not fatal.
            # NOTE(review): presumably rejected by the configured write_max limit — confirm.
            try:
                plc.write_tag("speed_setpoint", 999.0)
            except ValueError as rejection:
                print(f"  Rejected speed_setpoint=999.0: {rejection}")
        else:
            print("  Skipping writes; set ETHERNETIP_ENABLE_WRITES=1 to run this section.")

    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # Always release the device, whether the demo finished, was skipped, or was interrupted.
        plc.close()


# Script entry point: run the showcase only when executed directly, not on import.
if __name__ == "__main__":
    main()