ethernetip_example.py
"""Example: EtherNet/IP feature showcase.
Demonstrates config-driven EtherNet/IP features against a PLC:
- Connection separation (connection passed to constructor, not in config)
- Optional backplane route path from environment configuration
- Timing config for background polling
- Typed scalar tags (bool, integers, floats)
- Write limits (write_min / write_max)
- Background daemon polling via get_channel()
Configure the PLC endpoint with environment variables:
export ETHERNETIP_HOST=192.168.1.10
export ETHERNETIP_PORT=44818
export ETHERNETIP_SLOT=0
Writes are disabled by default so this can be pointed at a real PLC safely.
To run the write section:
export ETHERNETIP_ENABLE_WRITES=1
Then run:
python examples/ethernetip/ethernetip_example.py
"""
import os
import time
from pathlib import Path
from typing import Any
from instro.unstable.ethernetip import EtherNetIPDevice
CONFIG_PATH = Path(__file__).parent / "sample_device.json"
DEVICE_NAME = "cell_controller"
def connection_from_env() -> dict[str, Any]:
connection: dict[str, Any] = {
"host": os.environ.get("ETHERNETIP_HOST", "192.168.1.10"),
"port": int(os.environ.get("ETHERNETIP_PORT", "44818")),
}
if slot := os.environ.get("ETHERNETIP_SLOT"):
connection["route_path"] = {"hops": [{"type": "backplane", "slot": int(slot)}]}
return connection
def main() -> None:
device = EtherNetIPDevice(str(CONFIG_PATH), connection=connection_from_env(), autostart=True)
enable_writes = os.environ.get("ETHERNETIP_ENABLE_WRITES") == "1"
try:
# --- Typed reads ---
print("=== Typed Reads ===")
for alias in ("line_speed", "speed_setpoint", "running", "faulted", "batch_count"):
print(f" {alias}: {device.read_tag(alias).latest}")
# --- Background daemon: data is polled automatically at poll_interval ---
print("\n=== Background Daemon ===")
print(" Waiting for daemon to collect samples...")
time.sleep(3)
measurement = device.get_channel(f"{DEVICE_NAME}.line_speed", length=2)
print(f" line_speed (2 samples): {measurement.channel_data[f'{DEVICE_NAME}.line_speed']}")
# --- Writes: disabled by default for real PLC safety ---
print("\n=== Writes ===")
if not enable_writes:
print(" Skipping writes; set ETHERNETIP_ENABLE_WRITES=1 to run this section.")
return
device.write_tag("mode", 2)
print(f" Wrote mode=2 -> read back: {device.read_tag('mode').latest}")
device.write_tag("speed_setpoint", 120.0)
print(f" Wrote speed_setpoint=120.0 -> read back: {device.read_tag('speed_setpoint').latest}")
device.write_tag("run_command", True)
print(" Wrote run_command=True")
try:
device.write_tag("speed_setpoint", 999.0)
except ValueError as e:
print(f" Rejected speed_setpoint=999.0: {e}")
except KeyboardInterrupt:
print("\nStopping...")
finally:
device.close()
if __name__ == "__main__":
main()