#!/usr/bin/env python3 """ UAV Runner for AERPAW - aerpawlib BasicRunner implementation. Run via: python -m aerpawlib --script client.uav_runner --conn --vehicle drone Or use the auto-detecting wrapper: ./run_uav.sh Binary Protocol: Server sends: TARGET (1 byte) + x,y,z (24 bytes as 3 doubles) Client sends: ACK (1 byte) Client (after moving): READY (1 byte) Server sends: RTL (1 byte) Client sends: ACK (1 byte) Client (after returning home): READY (1 byte) Server sends: LAND (1 byte) Client sends: ACK (1 byte) Client (after landing): READY (1 byte) Server sends: READY (1 byte) - mission complete, disconnect """ from enum import IntEnum from pathlib import Path import asyncio import os import socket import struct import yaml from aerpawlib.runner import BasicRunner, entrypoint from aerpawlib.util import Coordinate, VectorNED from aerpawlib.vehicle import Drone # Message types - must match MESSAGE_TYPE.m enum class MessageType(IntEnum): TARGET = 1 ACK = 2 READY = 3 RTL = 4 LAND = 5 AERPAW_DIR = Path(__file__).parent.parent CONFIG_FILE = AERPAW_DIR / "config" / "config.yaml" def load_config(): """Load configuration from YAML file.""" with open(CONFIG_FILE, 'r') as f: return yaml.safe_load(f) def get_environment(): """Get environment from AERPAW_ENV variable. Fails if not set.""" env = os.environ.get('AERPAW_ENV') if env is None: raise RuntimeError( "AERPAW_ENV environment variable not set. " "Set to 'local' or 'testbed', or use: ./run_uav.sh [local|testbed]" ) if env not in ('local', 'testbed'): raise RuntimeError( f"Invalid AERPAW_ENV '{env}'. Must be 'local' or 'testbed'." ) return env def recv_exactly(sock: socket.socket, n: int) -> bytes: """Receive exactly n bytes from socket.""" data = b'' while len(data) < n: chunk = sock.recv(n - len(data)) if not chunk: raise ConnectionError("Connection closed while receiving data") data += chunk return data def recv_message_type(sock: socket.socket) -> MessageType: """Receive a single-byte message type.""" data = recv_exactly(sock, 1) return MessageType(data[0]) def send_message_type(sock: socket.socket, msg_type: MessageType): """Send a single-byte message type.""" sock.sendall(bytes([msg_type])) def recv_target(sock: socket.socket) -> tuple[float, float, float]: """Receive TARGET message (1 byte type + 3 doubles). Returns (x, y, z) ENU coordinates. """ # First byte is message type (already consumed or check it) msg_type = recv_message_type(sock) if msg_type != MessageType.TARGET: raise ValueError(f"Expected TARGET, got {msg_type.name}") # Next 24 bytes are 3 doubles (little-endian) data = recv_exactly(sock, 24) x, y, z = struct.unpack('