added aerpawlib capabilities to uav script

This commit is contained in:
2026-01-29 20:56:54 -08:00
parent 6604928f8f
commit 338bb6c340
4 changed files with 165 additions and 18 deletions

3
.gitignore vendored
View File

@@ -52,3 +52,6 @@ sandbox/*
# Python Virtual Environment
aerpaw/venv/
aerpaw/venv/*
# Pycache
__pycache__

View File

@@ -3,21 +3,69 @@
UAV Client for AERPAW Target Location Protocol
Protocol:
Server sends: TARGET:x,y,z
Server sends: TARGET:x,y,z (ENU coordinates in meters)
Client sends: ACK:TARGET
Client (after moving): READY
Server sends: FINISHED
Coordinates:
- Server sends local ENU (East, North, Up) coordinates in meters
- Client converts to lat/lon using shared origin from config/origin.txt
- Origin must match between controller and UAV
Auto-detection:
- If flight controller is available, uses aerpawlib to move drone
- If no flight controller, runs in test mode with placeholder movement
"""
import asyncio
import socket
import sys
import time
from pathlib import Path
from typing import Optional, Tuple
# Get the aerpaw directory (parent of client/)
AERPAW_DIR = Path(__file__).parent.parent
CONFIG_DIR = AERPAW_DIR / "config"
# Server configuration
SERVER_IP = "127.0.0.1"
SERVER_PORT = 5000
def parse_target(message: str) -> tuple:
"""Parse TARGET:x,y,z message and return (x, y, z) coordinates."""
def load_origin(path: Path) -> Tuple[float, float, float]:
"""
Load ENU origin from config file.
Returns (lat, lon, alt) tuple.
"""
with open(path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split(',')
if len(parts) == 3:
return float(parts[0]), float(parts[1]), float(parts[2])
raise ValueError(f"No valid origin found in {path}")
def load_connection(path: Path) -> str:
"""
Load drone connection string from config file.
Returns connection string (e.g., 'udp:127.0.0.1:14550').
"""
with open(path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
return line
raise ValueError(f"No valid connection string found in {path}")
def parse_target(message: str) -> Tuple[float, float, float]:
"""Parse TARGET:x,y,z message and return (x, y, z) ENU coordinates."""
if not message.startswith("TARGET:"):
raise ValueError(f"Invalid message format: {message}")
@@ -30,21 +78,91 @@ def parse_target(message: str) -> tuple:
return (x, y, z)
def move_to_target(x: float, y: float, z: float):
def enu_to_coordinate(origin_lat: float, origin_lon: float, origin_alt: float,
enu_x: float, enu_y: float, enu_z: float):
"""
Placeholder for AERPAW API integration.
In the future, this will command the UAV to move to (x, y, z).
Convert ENU (East, North, Up) coordinates to lat/lon/alt.
Args:
origin_lat, origin_lon, origin_alt: Origin in degrees/meters
enu_x: East offset in meters
enu_y: North offset in meters
enu_z: Up offset in meters (altitude above origin)
Returns:
aerpawlib Coordinate object
"""
print(f" [PLACEHOLDER] Moving to target: ({x}, {y}, {z})")
# Simulate movement time
from aerpawlib.util import Coordinate, VectorNED
origin = Coordinate(origin_lat, origin_lon, origin_alt)
# ENU to NED: north=enu_y, east=enu_x, down=-enu_z
offset = VectorNED(north=enu_y, east=enu_x, down=-enu_z)
return origin + offset
def try_connect_drone(conn_str: str, timeout: float = 10.0):
"""
Try to connect to drone flight controller.
Returns Drone object if successful, None if not available.
"""
try:
from aerpawlib.vehicle import Drone
print(f" Attempting to connect to flight controller: {conn_str}")
drone = Drone(conn_str)
print(f" Connected to flight controller")
return drone
except Exception as e:
print(f" Could not connect to flight controller: {e}")
return None
def move_to_target_test(enu_x: float, enu_y: float, enu_z: float,
lat: float, lon: float, alt: float):
"""
Placeholder movement for test mode.
"""
print(f" [TEST MODE] Target ENU: ({enu_x}, {enu_y}, {enu_z}) meters")
print(f" [TEST MODE] Target lat/lon: ({lat:.6f}, {lon:.6f}, {alt:.1f})")
print(f" [TEST MODE] Simulating movement...")
time.sleep(1.0)
print(f" [PLACEHOLDER] Arrived at target: ({x}, {y}, {z})")
print(f" [TEST MODE] Arrived at target")
def run_uav_client(client_id: int):
async def move_to_target_real(drone, target_coord, tolerance: float = 2.0):
"""
Move drone to target using aerpawlib.
"""
print(f" [REAL MODE] Moving to: ({target_coord.lat:.6f}, {target_coord.lon:.6f}, {target_coord.alt:.1f})")
await drone.goto_coordinates(target_coord, tolerance=tolerance)
print(f" [REAL MODE] Arrived at target")
async def run_uav_client(client_id: int):
"""Run a single UAV client."""
print(f"UAV {client_id}: Connecting to {SERVER_IP}:{SERVER_PORT}")
print(f"UAV {client_id}: Starting...")
# Load configuration
origin_path = CONFIG_DIR / "origin.txt"
connection_path = CONFIG_DIR / "connection.txt"
print(f"UAV {client_id}: Loading origin from {origin_path}")
origin_lat, origin_lon, origin_alt = load_origin(origin_path)
print(f"UAV {client_id}: Origin: lat={origin_lat}, lon={origin_lon}, alt={origin_alt}")
print(f"UAV {client_id}: Loading connection from {connection_path}")
conn_str = load_connection(connection_path)
# Auto-detect mode based on flight controller availability
drone = try_connect_drone(conn_str)
real_mode = drone is not None
if real_mode:
print(f"UAV {client_id}: Running in REAL mode")
else:
print(f"UAV {client_id}: Running in TEST mode (no flight controller)")
# Connect to controller server
print(f"UAV {client_id}: Connecting to controller at {SERVER_IP}:{SERVER_PORT}")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((SERVER_IP, SERVER_PORT))
@@ -53,16 +171,25 @@ def run_uav_client(client_id: int):
data = s.recv(1024).decode().strip()
print(f"UAV {client_id}: Received: {data}")
# Parse target coordinates
x, y, z = parse_target(data)
print(f"UAV {client_id}: Parsed target: x={x}, y={y}, z={z}")
# Parse target coordinates (ENU)
enu_x, enu_y, enu_z = parse_target(data)
print(f"UAV {client_id}: Parsed ENU target: x={enu_x}, y={enu_y}, z={enu_z}")
# Convert ENU to lat/lon
target_coord = enu_to_coordinate(origin_lat, origin_lon, origin_alt,
enu_x, enu_y, enu_z)
print(f"UAV {client_id}: Target coordinate: lat={target_coord.lat:.6f}, lon={target_coord.lon:.6f}, alt={target_coord.alt:.1f}")
# Send acknowledgment
s.sendall(b"ACK:TARGET")
print(f"UAV {client_id}: Sent ACK:TARGET")
# Move to target (placeholder)
move_to_target(x, y, z)
# Move to target
if real_mode:
await move_to_target_real(drone, target_coord)
else:
move_to_target_test(enu_x, enu_y, enu_z,
target_coord.lat, target_coord.lon, target_coord.alt)
# Signal ready
s.sendall(b"READY")
@@ -82,10 +209,12 @@ def run_uav_client(client_id: int):
finally:
s.close()
if drone:
drone.close()
print(f"UAV {client_id}: Connection closed")
if __name__ == "__main__":
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <client_id>")
print(" Run one instance per UAV, e.g.:")
@@ -94,4 +223,8 @@ if __name__ == "__main__":
sys.exit(1)
client_id = int(sys.argv[1])
run_uav_client(client_id)
asyncio.run(run_uav_client(client_id))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,7 @@
# Drone connection string for aerpawlib/dronekit
# Examples:
# tcp:127.0.0.1:5760 - SITL TCP
# udp:127.0.0.1:14550 - SITL UDP (default MAVProxy output)
# /dev/ttyUSB0 - Serial connection
# /dev/ttyACM0 - Serial connection (Pixhawk USB)
udp:127.0.0.1:14550

4
aerpaw/config/origin.txt Normal file
View File

@@ -0,0 +1,4 @@
# Origin for ENU coordinate system (AERPAW Lake Wheeler Road Field)
# Format: lat,lon,alt (degrees, degrees, meters)
# Alt=0 means ENU z directly becomes the target altitude above home
35.727436,-78.696471,0.0