automotive-expert by personamanagmentlayer/pcl
npx skills add https://github.com/personamanagmentlayer/pcl --skill automotive-expert
为汽车系统、联网车辆、车队管理、远程信息处理、高级驾驶辅助系统(ADAS)和汽车软件开发提供专业指导。
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from decimal import Decimal
from enum import Enum
import numpy as np
class VehicleStatus(Enum):
    """Operational state of a fleet vehicle."""

    ACTIVE = "active"
    IDLE = "idle"
    MAINTENANCE = "maintenance"
    OUT_OF_SERVICE = "out_of_service"
class FuelType(Enum):
    """Energy source a vehicle runs on."""

    GASOLINE = "gasoline"
    DIESEL = "diesel"
    ELECTRIC = "electric"
    HYBRID = "hybrid"
    CNG = "cng"
@dataclass
class Vehicle:
    """Fleet vehicle information"""

    vehicle_id: str
    vin: str  # Vehicle Identification Number
    make: str
    model: str
    year: int
    license_plate: str
    fuel_type: FuelType
    status: VehicleStatus
    odometer_km: int
    last_service_km: int  # odometer reading at the last service
    next_service_km: int  # odometer reading at which the next service is due
    assigned_driver_id: Optional[str]
    location: tuple  # (latitude, longitude)
    fuel_level_percent: float
@dataclass
class Trip:
    """Vehicle trip record"""

    trip_id: str
    vehicle_id: str
    driver_id: str
    start_time: datetime
    end_time: Optional[datetime]  # None while the trip is still open
    start_location: tuple
    end_location: Optional[tuple]  # None while the trip is still open
    distance_km: float
    fuel_consumed_liters: float
    average_speed_kmh: float
    max_speed_kmh: float
    harsh_braking_count: int
    harsh_acceleration_count: int
class FleetManagementSystem:
    """Fleet management and telematics system.

    Vehicles (keyed by vehicle id), trip records and generated maintenance
    schedules are all held in memory.
    """

    def __init__(self):
        self.vehicles = {}
        self.trips = []
        self.maintenance_schedules = []

    def track_vehicle_location(self, vehicle_id: str) -> dict:
        """Refresh a vehicle's stored location and return a position snapshot.

        Returns ``{'error': 'Vehicle not found'}`` for an unknown id.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            return {'error': 'Vehicle not found'}

        # Get GPS data from telematics device
        location = self._get_gps_location(vehicle_id)
        speed = self._get_current_speed(vehicle_id)
        heading = self._get_heading(vehicle_id)
        vehicle.location = location

        return {
            'vehicle_id': vehicle_id,
            'location': {
                'latitude': location[0],
                'longitude': location[1],
            },
            'speed_kmh': speed,
            'heading': heading,
            'timestamp': datetime.now().isoformat(),
            'status': vehicle.status.value,
        }

    def start_trip(self, vehicle_id: str, driver_id: str) -> "Trip":
        """Open a new trip for a vehicle and mark the vehicle ACTIVE.

        Raises:
            ValueError: if ``vehicle_id`` is not in the fleet.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            raise ValueError("Vehicle not found")

        trip = Trip(
            trip_id=self._generate_trip_id(),
            vehicle_id=vehicle_id,
            driver_id=driver_id,
            start_time=datetime.now(),
            end_time=None,
            start_location=vehicle.location,
            end_location=None,
            distance_km=0.0,
            fuel_consumed_liters=0.0,
            average_speed_kmh=0.0,
            max_speed_kmh=0.0,
            harsh_braking_count=0,
            harsh_acceleration_count=0,
        )
        vehicle.status = VehicleStatus.ACTIVE
        self.trips.append(trip)
        return trip

    def end_trip(self, trip_id: str) -> dict:
        """Close a trip and return its metrics.

        Returns ``{'error': 'Trip not found'}`` for an unknown trip id.
        If the trip's vehicle is no longer in ``self.vehicles`` the trip is
        still closed, but its end location is left untouched and no vehicle
        status is changed.
        """
        trip = next((t for t in self.trips if t.trip_id == trip_id), None)
        if not trip:
            return {'error': 'Trip not found'}

        vehicle = self.vehicles.get(trip.vehicle_id)
        trip.end_time = datetime.now()
        if vehicle is not None:
            trip.end_location = vehicle.location

        # Calculate trip metrics; zero-length trips and zero fuel report 0
        # instead of dividing by zero.
        duration_hours = (trip.end_time - trip.start_time).total_seconds() / 3600
        trip.average_speed_kmh = (
            trip.distance_km / duration_hours if duration_hours > 0 else 0
        )
        fuel_efficiency = (
            trip.distance_km / trip.fuel_consumed_liters
            if trip.fuel_consumed_liters > 0 else 0
        )
        driver_score = self._calculate_driver_score(trip)

        if vehicle is not None:
            vehicle.status = VehicleStatus.IDLE

        return {
            'trip_id': trip_id,
            'duration_hours': duration_hours,
            'distance_km': trip.distance_km,
            'fuel_consumed': trip.fuel_consumed_liters,
            'fuel_efficiency_km_per_liter': fuel_efficiency,
            'average_speed': trip.average_speed_kmh,
            'max_speed': trip.max_speed_kmh,
            'harsh_events': trip.harsh_braking_count + trip.harsh_acceleration_count,
            'driver_score': driver_score,
        }

    def _calculate_driver_score(self, trip: "Trip") -> float:
        """Return a 0-100 safety score for a trip.

        Each harsh braking/acceleration event costs 5 points; every km/h of
        ``max_speed_kmh`` above 120 costs 0.5 points.
        """
        score = 100.0
        score -= trip.harsh_braking_count * 5
        score -= trip.harsh_acceleration_count * 5
        if trip.max_speed_kmh > 120:
            score -= (trip.max_speed_kmh - 120) * 0.5
        # Fuel-efficiency penalty is not implemented; it would need a
        # per-vehicle baseline to compare against.
        return max(0.0, min(100.0, score))

    def schedule_maintenance(self, vehicle_id: str) -> dict:
        """Create a maintenance schedule if the vehicle is within 1000 km of service.

        Returns the new schedule (also appended to
        ``self.maintenance_schedules``), a "not required" summary, or
        ``{'error': 'Vehicle not found'}``.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            return {'error': 'Vehicle not found'}

        km_since_service = vehicle.odometer_km - vehicle.last_service_km
        km_until_service = vehicle.next_service_km - vehicle.odometer_km

        if km_until_service > 1000:
            return {
                'vehicle_id': vehicle_id,
                'maintenance_required': False,
                'km_until_service': km_until_service,
            }

        maintenance_type = self._determine_maintenance_type(km_since_service)
        schedule = {
            'vehicle_id': vehicle_id,
            'maintenance_type': maintenance_type,
            'current_odometer': vehicle.odometer_km,
            'recommended_by_odometer': vehicle.next_service_km,
            'urgency': 'high' if km_until_service <= 500 else 'medium',
            'estimated_cost': self._estimate_maintenance_cost(maintenance_type),
        }
        self.maintenance_schedules.append(schedule)
        return schedule

    def optimize_routes(self, deliveries: List[dict]) -> dict:
        """Assign deliveries round-robin to IDLE vehicles and estimate each route.

        Each delivery dict must provide ``'delivery_id'`` and
        ``'destination'``. This is a simplified placeholder, not a real
        TSP/VRP solver. Returns ``{'error': 'No available vehicles'}`` when
        no vehicle is IDLE.
        """
        available_vehicles = [
            v for v in self.vehicles.values()
            if v.status == VehicleStatus.IDLE
        ]
        if not available_vehicles:
            return {'error': 'No available vehicles'}

        assignments = []
        for i, delivery in enumerate(deliveries):
            vehicle = available_vehicles[i % len(available_vehicles)]
            route = self._calculate_route(vehicle.location, delivery['destination'])
            assignments.append({
                'vehicle_id': vehicle.vehicle_id,
                'delivery_id': delivery['delivery_id'],
                'route': route,
                'estimated_distance_km': route['distance'],
                'estimated_time_minutes': route['duration'],
                'estimated_fuel_cost': self._estimate_fuel_cost(
                    route['distance'],
                    vehicle.fuel_type,
                ),
            })

        return {
            'total_deliveries': len(deliveries),
            'vehicles_assigned': len({a['vehicle_id'] for a in assignments}),
            'assignments': assignments,
            'total_distance_km': sum(a['estimated_distance_km'] for a in assignments),
            # Decimal start value keeps the total a Decimal even with no deliveries
            'total_estimated_cost': sum(
                (a['estimated_fuel_cost'] for a in assignments), Decimal('0')
            ),
        }

    def analyze_fleet_utilization(self) -> dict:
        """Summarise vehicle status counts, utilization and recent fuel efficiency.

        Fuel efficiency is averaged over the last 100 trips that consumed
        fuel; it is 0 when there are none.
        """
        vehicles = list(self.vehicles.values())
        total_vehicles = len(vehicles)
        active = sum(1 for v in vehicles if v.status == VehicleStatus.ACTIVE)
        idle = sum(1 for v in vehicles if v.status == VehicleStatus.IDLE)
        maintenance = sum(1 for v in vehicles if v.status == VehicleStatus.MAINTENANCE)
        utilization_rate = (active / total_vehicles * 100) if total_vehicles > 0 else 0

        efficiencies = [
            t.distance_km / t.fuel_consumed_liters
            for t in self.trips[-100:]
            if t.fuel_consumed_liters > 0
        ]
        # np.mean([]) returns nan and emits a RuntimeWarning, so guard the
        # filtered list rather than the unfiltered trip list.
        avg_fuel_efficiency = float(np.mean(efficiencies)) if efficiencies else 0

        if utilization_rate < 60:
            recommendation = 'Reduce fleet size'
        elif utilization_rate > 90:
            recommendation = 'Expand fleet'
        else:
            recommendation = 'Optimal'

        return {
            'total_vehicles': total_vehicles,
            'status_breakdown': {
                'active': active,
                'idle': idle,
                'maintenance': maintenance,
                'out_of_service': total_vehicles - active - idle - maintenance,
            },
            'utilization_rate': utilization_rate,
            'average_fuel_efficiency': avg_fuel_efficiency,
            'recommendation': recommendation,
        }

    def _determine_maintenance_type(self, km_since_service: int) -> str:
        """Map distance driven since the last service to a service tier."""
        if km_since_service >= 100000:
            return "major_service"
        if km_since_service >= 50000:
            return "intermediate_service"
        return "routine_service"

    def _estimate_maintenance_cost(self, maintenance_type: str) -> Decimal:
        """Return the flat cost estimate for a service tier (200 if unknown)."""
        costs = {
            'routine_service': Decimal('150'),
            'intermediate_service': Decimal('500'),
            'major_service': Decimal('1500'),
        }
        return costs.get(maintenance_type, Decimal('200'))

    def _calculate_route(self, start: tuple, end: tuple) -> dict:
        """Estimate a route as the great-circle (haversine) distance.

        ``start`` and ``end`` are (latitude, longitude) in degrees. Returns
        ``'distance'`` in km and ``'duration'`` in minutes at an assumed
        60 km/h. A real implementation would call a routing API.
        """
        from math import radians, sin, cos, sqrt, atan2

        lat1, lon1 = radians(start[0]), radians(start[1])
        lat2, lon2 = radians(end[0]), radians(end[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push `a` marginally outside [0, 1] for near-antipodal
        # points, which would make sqrt(1 - a) raise.
        a = min(1.0, max(0.0, a))
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        distance_km = 6371 * c  # Earth radius in km
        return {
            'distance': distance_km,
            'duration': distance_km / 60 * 60,  # hours at 60 km/h, then to minutes
        }

    def _estimate_fuel_cost(self, distance_km: float, fuel_type: "FuelType") -> Decimal:
        """Estimate fuel cost for a distance.

        Uses a flat 8 km per unit of fuel for every fuel type and a fixed
        price table (falling back to 1.50 for unknown types).
        """
        fuel_prices = {
            FuelType.GASOLINE: Decimal('1.50'),  # per liter
            FuelType.DIESEL: Decimal('1.40'),
            FuelType.ELECTRIC: Decimal('0.30'),  # per kWh equivalent
            FuelType.HYBRID: Decimal('1.20'),
            FuelType.CNG: Decimal('1.00'),
        }
        fuel_efficiency = 8.0  # km per liter (average)
        fuel_needed = distance_km / fuel_efficiency
        fuel_price = fuel_prices.get(fuel_type, Decimal('1.50'))
        # Go through str() so the Decimal reflects the float's short repr
        return Decimal(str(fuel_needed)) * fuel_price

    def _get_gps_location(self, vehicle_id: str) -> tuple:
        """Return the vehicle's GPS position (placeholder: fixed coordinates)."""
        return (40.7128, -74.0060)  # Placeholder

    def _get_current_speed(self, vehicle_id: str) -> float:
        """Return the current speed (placeholder: random value in [0, 100))."""
        return np.random.uniform(0, 100)  # Placeholder

    def _get_heading(self, vehicle_id: str) -> float:
        """Return the heading in degrees (placeholder: random value in [0, 360))."""
        return np.random.uniform(0, 360)  # Placeholder

    def _generate_trip_id(self) -> str:
        """Return a new id of the form ``TRIP-`` plus 10 uppercase hex digits."""
        import uuid
        return f"TRIP-{uuid.uuid4().hex[:10].upper()}"
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
@dataclass
class VehicleTelemetry:
    """Real-time vehicle telemetry data"""

    vehicle_id: str
    timestamp: datetime
    location: tuple
    speed_kmh: float
    rpm: int
    engine_temp_c: float
    battery_voltage: float
    fuel_level_percent: float
    odometer_km: int
    dtc_codes: List[str]  # Diagnostic Trouble Codes
class ConnectedVehiclePlatform:
    """Connected car platform with OTA updates.

    Received telemetry samples and deployed OTA update records are held in
    memory.
    """

    # Explicit ordering for severities; comparing the strings directly would
    # rank 'medium' above 'high'.
    _SEVERITY_RANK = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}

    def __init__(self):
        self.vehicles = {}
        self.telemetry_buffer = []
        self.ota_updates = {}
        # Last received sample per vehicle id, used for harsh-driving checks
        self._last_sample_by_vehicle = {}

    def process_telemetry(self, telemetry: "VehicleTelemetry") -> dict:
        """Store a telemetry sample and return the alerts it triggers.

        Checks engine temperature (> 110), battery voltage (< 12.0),
        presence of DTC codes, and speed change versus the previous sample
        received for the same vehicle (> 5 km/h per second).
        """
        previous = self._last_sample_by_vehicle.get(telemetry.vehicle_id)
        self.telemetry_buffer.append(telemetry)
        self._last_sample_by_vehicle[telemetry.vehicle_id] = telemetry

        alerts = []

        if telemetry.engine_temp_c > 110:
            alerts.append({
                'type': 'high_engine_temp',
                'severity': 'warning',
                'value': telemetry.engine_temp_c,
                'message': 'Engine temperature above normal',
            })

        if telemetry.battery_voltage < 12.0:
            alerts.append({
                'type': 'low_battery',
                'severity': 'warning',
                'value': telemetry.battery_voltage,
                'message': 'Battery voltage low',
            })

        if telemetry.dtc_codes:
            alerts.append({
                'type': 'dtc_codes',
                'severity': 'critical',
                'codes': telemetry.dtc_codes,
                'message': f'{len(telemetry.dtc_codes)} diagnostic code(s) detected',
            })

        # Compare against this vehicle's own previous sample; the globally
        # previous buffer entry usually belongs to another vehicle.
        if previous is not None:
            time_diff = (telemetry.timestamp - previous.timestamp).total_seconds()
            if time_diff > 0:
                acceleration = (telemetry.speed_kmh - previous.speed_kmh) / time_diff
                if abs(acceleration) > 5:  # > 5 km/h per second
                    alerts.append({
                        'type': 'harsh_driving',
                        'severity': 'info',
                        'acceleration': acceleration,
                        'message': 'Harsh acceleration/braking detected',
                    })

        return {
            'vehicle_id': telemetry.vehicle_id,
            'timestamp': telemetry.timestamp.isoformat(),
            'alerts': alerts,
            'health_score': self._calculate_vehicle_health(telemetry),
        }

    def deploy_ota_update(self,
                          vehicle_ids: List[str],
                          update_package: dict) -> dict:
        """Register an over-the-air update for the given vehicles.

        ``update_package`` must contain ``'version'``, ``'description'`` and
        ``'size_mb'``. Every target vehicle starts in status 'scheduled'.
        """
        update_id = self._generate_update_id()
        self.ota_updates[update_id] = {
            'update_id': update_id,
            'version': update_package['version'],
            'description': update_package['description'],
            'package_size_mb': update_package['size_mb'],
            # Copy so later changes to the caller's list don't alter the record
            'target_vehicles': list(vehicle_ids),
            'deployed_at': datetime.now(),
            'status_by_vehicle': {
                vehicle_id: {
                    'status': 'scheduled',
                    'download_progress': 0,
                    'install_progress': 0,
                }
                for vehicle_id in vehicle_ids
            },
        }
        return {
            'update_id': update_id,
            'vehicles_targeted': len(vehicle_ids),
            'estimated_completion': 'Within 48 hours',
        }

    def diagnose_vehicle(self, vehicle_id: str, dtc_codes: List[str]) -> dict:
        """Look up each DTC code and report the most severe finding.

        With no codes, ``'overall_severity'`` is None and no service is
        recommended.
        """
        diagnoses = [self._lookup_dtc_code(code) for code in dtc_codes]

        max_severity = None
        if diagnoses:
            max_severity = max(
                (d['severity'] for d in diagnoses),
                key=lambda severity: self._SEVERITY_RANK.get(severity, -1),
            )

        return {
            'vehicle_id': vehicle_id,
            'dtc_codes': dtc_codes,
            'diagnoses': diagnoses,
            'overall_severity': max_severity,
            'service_recommended': max_severity in ['high', 'critical'],
        }

    def _calculate_vehicle_health(self, telemetry: "VehicleTelemetry") -> float:
        """Return a health score starting at 100, floored at 0.

        Deductions: engine temp > 110 (15) or > 100 (5); battery < 11.5 (20)
        or < 12.0 (10); 15 per DTC code.
        """
        score = 100.0

        if telemetry.engine_temp_c > 110:
            score -= 15
        elif telemetry.engine_temp_c > 100:
            score -= 5

        if telemetry.battery_voltage < 11.5:
            score -= 20
        elif telemetry.battery_voltage < 12.0:
            score -= 10

        score -= len(telemetry.dtc_codes) * 15
        return max(0.0, score)

    def _lookup_dtc_code(self, code: str) -> dict:
        """Return description, severity and possible causes for a DTC code.

        Only a small built-in table is consulted; unknown codes get a
        generic 'medium' entry.
        """
        dtc_database = {
            'P0171': {
                'description': 'System Too Lean (Bank 1)',
                'severity': 'medium',
                'possible_causes': ['Vacuum leak', 'Faulty MAF sensor', 'Fuel filter clogged'],
            },
            'P0300': {
                'description': 'Random/Multiple Cylinder Misfire Detected',
                'severity': 'high',
                'possible_causes': ['Faulty spark plugs', 'Ignition coil failure', 'Fuel injector issue'],
            },
        }
        return dtc_database.get(code, {
            'description': f'Unknown code: {code}',
            'severity': 'medium',
            'possible_causes': ['Requires diagnostic scan'],
        })

    def _generate_update_id(self) -> str:
        """Return a new id of the form ``OTA-`` plus 8 uppercase hex digits."""
        import uuid
        return f"OTA-{uuid.uuid4().hex[:8].upper()}"
class ElectricVehicleManagement:
    """EV-specific management functions: range, charging stations, charging plans."""

    def __init__(self):
        self.charging_stations = {}
        self.charging_sessions = []

    def calculate_range(self,
                        battery_capacity_kwh: float,
                        battery_soc_percent: float,
                        consumption_kwh_per_km: float,
                        temperature_factor: float = 0.8) -> dict:
        """Calculate remaining range for an EV.

        Args:
            battery_capacity_kwh: Total battery capacity.
            battery_soc_percent: State of charge, 0-100.
            consumption_kwh_per_km: Energy use per km; must be positive.
            temperature_factor: Multiplier applied to the nominal range to
                account for conditions (default 0.8).

        Raises:
            ValueError: if ``consumption_kwh_per_km`` is not positive.
        """
        if consumption_kwh_per_km <= 0:
            raise ValueError("consumption_kwh_per_km must be positive")

        available_energy = battery_capacity_kwh * (battery_soc_percent / 100)
        range_km = available_energy / consumption_kwh_per_km
        return {
            'nominal_range_km': range_km,
            'adjusted_range_km': range_km * temperature_factor,
            'battery_soc_percent': battery_soc_percent,
            'available_energy_kwh': available_energy,
        }

    def find_charging_stations(self,
                               current_location: tuple,
                               max_distance_km: float) -> List[dict]:
        """Return stations within ``max_distance_km``, nearest first.

        Each entry in ``self.charging_stations`` must provide 'name',
        'location', 'available_chargers', 'max_power_kw' and 'cost_per_kwh'.
        """
        nearby_stations = []
        for station_id, station in self.charging_stations.items():
            distance = self._calculate_distance(current_location, station['location'])
            if distance > max_distance_km:
                continue
            nearby_stations.append({
                'station_id': station_id,
                'name': station['name'],
                'location': station['location'],
                'distance_km': distance,
                'available_chargers': station['available_chargers'],
                'charging_speed_kw': station['max_power_kw'],
                'cost_per_kwh': station['cost_per_kwh'],
            })
        nearby_stations.sort(key=lambda entry: entry['distance_km'])
        return nearby_stations

    def optimize_charging_schedule(self,
                                   battery_capacity_kwh: float,
                                   current_soc_percent: float,
                                   target_soc_percent: float,
                                   departure_time: datetime) -> dict:
        """Pick the cheapest rate window in which charging finishes before departure.

        Rate windows from the day before departure and the departure day are
        considered. If no window allows charging to finish in time, the
        cheapest window overall is returned; callers can compare
        ``'will_complete_by'`` with their departure time. A 7 kW charger is
        assumed.
        """
        # A target below the current charge needs no energy, not a negative amount
        energy_needed = max(
            0.0,
            battery_capacity_kwh * ((target_soc_percent - current_soc_percent) / 100),
        )
        charging_duration_hours = energy_needed / 7.0  # Assume 7kW home charger
        charging_time = timedelta(hours=charging_duration_hours)

        # The overnight window that ends on the departure morning starts on
        # the previous calendar day, so both days must be looked at.
        rate_schedule = (
            self._get_electricity_rates(departure_time - timedelta(days=1))
            + self._get_electricity_rates(departure_time)
        )
        in_time = [
            period for period in rate_schedule
            if period['start_time'] + charging_time <= departure_time
        ]
        optimal_period = min(in_time or rate_schedule, key=lambda period: period['rate'])
        start_time = optimal_period['start_time']

        return {
            'energy_needed_kwh': energy_needed,
            'optimal_start_time': start_time.isoformat(),
            'charging_duration_hours': charging_duration_hours,
            'estimated_cost': energy_needed * float(optimal_period['rate']),
            'will_complete_by': (start_time + charging_time).isoformat(),
        }

    def _calculate_distance(self, point1: tuple, point2: tuple) -> float:
        """Return the great-circle (haversine) distance in km.

        Both points are (latitude, longitude) in degrees.
        """
        from math import radians, sin, cos, sqrt, atan2

        lat1, lon1 = radians(point1[0]), radians(point1[1])
        lat2, lon2 = radians(point2[0]), radians(point2[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push `a` marginally outside [0, 1] for near-antipodal
        # points, which would make sqrt(1 - a) raise.
        a = min(1.0, max(0.0, a))
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        return 6371 * c  # Earth radius in km

    def _get_electricity_rates(self, date: datetime) -> List[dict]:
        """Return the time-of-use rate windows anchored on ``date``'s day.

        Simplified schedule: off-peak 11 PM - 7 AM (next day) and peak
        2 PM - 8 PM. Mid-peak hours are not represented.
        """
        # Drop minutes/seconds/microseconds so windows start exactly on the hour
        day = date.replace(minute=0, second=0, microsecond=0)
        return [
            {
                'start_time': day.replace(hour=23),
                'end_time': day.replace(hour=7) + timedelta(days=1),
                'rate': Decimal('0.08'),  # $0.08/kWh
            },
            {
                'start_time': day.replace(hour=14),
                'end_time': day.replace(hour=20),
                'rate': Decimal('0.25'),  # $0.25/kWh
            },
        ]
❌ 无远程信息处理或 GPS 跟踪
❌ 仅进行被动维护
❌ 手动路线规划
❌ 忽略驾驶员行为数据
❌ 无车辆诊断
❌ 燃油管理不善
❌ 网络安全不足
❌ 无 OTA 更新能力
❌ 电动汽车充电效率低下
每周安装次数
111
代码仓库
GitHub 星标数
11
首次出现
2026年1月24日
安全审计
安装于
opencode: 101
codex: 100
gemini-cli: 96
github-copilot: 94
cursor: 91
amp: 89
Expert guidance for automotive systems, connected vehicles, fleet management, telematics, advanced driver assistance systems (ADAS), and automotive software development.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
from decimal import Decimal
from enum import Enum
import numpy as np
class VehicleStatus(Enum):
    """Operational state of a fleet vehicle."""

    ACTIVE = "active"
    IDLE = "idle"
    MAINTENANCE = "maintenance"
    OUT_OF_SERVICE = "out_of_service"
class FuelType(Enum):
    """Energy source a vehicle runs on."""

    GASOLINE = "gasoline"
    DIESEL = "diesel"
    ELECTRIC = "electric"
    HYBRID = "hybrid"
    CNG = "cng"
@dataclass
class Vehicle:
    """Fleet vehicle information"""

    vehicle_id: str
    vin: str  # Vehicle Identification Number
    make: str
    model: str
    year: int
    license_plate: str
    fuel_type: FuelType
    status: VehicleStatus
    odometer_km: int
    last_service_km: int  # odometer reading at the last service
    next_service_km: int  # odometer reading at which the next service is due
    assigned_driver_id: Optional[str]
    location: tuple  # (latitude, longitude)
    fuel_level_percent: float
@dataclass
class Trip:
    """Vehicle trip record"""

    trip_id: str
    vehicle_id: str
    driver_id: str
    start_time: datetime
    end_time: Optional[datetime]  # None while the trip is still open
    start_location: tuple
    end_location: Optional[tuple]  # None while the trip is still open
    distance_km: float
    fuel_consumed_liters: float
    average_speed_kmh: float
    max_speed_kmh: float
    harsh_braking_count: int
    harsh_acceleration_count: int
class FleetManagementSystem:
    """Fleet management and telematics system.

    Vehicles (keyed by vehicle id), trip records and generated maintenance
    schedules are all held in memory.
    """

    def __init__(self):
        self.vehicles = {}
        self.trips = []
        self.maintenance_schedules = []

    def track_vehicle_location(self, vehicle_id: str) -> dict:
        """Refresh a vehicle's stored location and return a position snapshot.

        Returns ``{'error': 'Vehicle not found'}`` for an unknown id.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            return {'error': 'Vehicle not found'}

        # Get GPS data from telematics device
        location = self._get_gps_location(vehicle_id)
        speed = self._get_current_speed(vehicle_id)
        heading = self._get_heading(vehicle_id)
        vehicle.location = location

        return {
            'vehicle_id': vehicle_id,
            'location': {
                'latitude': location[0],
                'longitude': location[1],
            },
            'speed_kmh': speed,
            'heading': heading,
            'timestamp': datetime.now().isoformat(),
            'status': vehicle.status.value,
        }

    def start_trip(self, vehicle_id: str, driver_id: str) -> "Trip":
        """Open a new trip for a vehicle and mark the vehicle ACTIVE.

        Raises:
            ValueError: if ``vehicle_id`` is not in the fleet.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            raise ValueError("Vehicle not found")

        trip = Trip(
            trip_id=self._generate_trip_id(),
            vehicle_id=vehicle_id,
            driver_id=driver_id,
            start_time=datetime.now(),
            end_time=None,
            start_location=vehicle.location,
            end_location=None,
            distance_km=0.0,
            fuel_consumed_liters=0.0,
            average_speed_kmh=0.0,
            max_speed_kmh=0.0,
            harsh_braking_count=0,
            harsh_acceleration_count=0,
        )
        vehicle.status = VehicleStatus.ACTIVE
        self.trips.append(trip)
        return trip

    def end_trip(self, trip_id: str) -> dict:
        """Close a trip and return its metrics.

        Returns ``{'error': 'Trip not found'}`` for an unknown trip id.
        If the trip's vehicle is no longer in ``self.vehicles`` the trip is
        still closed, but its end location is left untouched and no vehicle
        status is changed.
        """
        trip = next((t for t in self.trips if t.trip_id == trip_id), None)
        if not trip:
            return {'error': 'Trip not found'}

        vehicle = self.vehicles.get(trip.vehicle_id)
        trip.end_time = datetime.now()
        if vehicle is not None:
            trip.end_location = vehicle.location

        # Calculate trip metrics; zero-length trips and zero fuel report 0
        # instead of dividing by zero.
        duration_hours = (trip.end_time - trip.start_time).total_seconds() / 3600
        trip.average_speed_kmh = (
            trip.distance_km / duration_hours if duration_hours > 0 else 0
        )
        fuel_efficiency = (
            trip.distance_km / trip.fuel_consumed_liters
            if trip.fuel_consumed_liters > 0 else 0
        )
        driver_score = self._calculate_driver_score(trip)

        if vehicle is not None:
            vehicle.status = VehicleStatus.IDLE

        return {
            'trip_id': trip_id,
            'duration_hours': duration_hours,
            'distance_km': trip.distance_km,
            'fuel_consumed': trip.fuel_consumed_liters,
            'fuel_efficiency_km_per_liter': fuel_efficiency,
            'average_speed': trip.average_speed_kmh,
            'max_speed': trip.max_speed_kmh,
            'harsh_events': trip.harsh_braking_count + trip.harsh_acceleration_count,
            'driver_score': driver_score,
        }

    def _calculate_driver_score(self, trip: "Trip") -> float:
        """Return a 0-100 safety score for a trip.

        Each harsh braking/acceleration event costs 5 points; every km/h of
        ``max_speed_kmh`` above 120 costs 0.5 points.
        """
        score = 100.0
        score -= trip.harsh_braking_count * 5
        score -= trip.harsh_acceleration_count * 5
        if trip.max_speed_kmh > 120:
            score -= (trip.max_speed_kmh - 120) * 0.5
        # Fuel-efficiency penalty is not implemented; it would need a
        # per-vehicle baseline to compare against.
        return max(0.0, min(100.0, score))

    def schedule_maintenance(self, vehicle_id: str) -> dict:
        """Create a maintenance schedule if the vehicle is within 1000 km of service.

        Returns the new schedule (also appended to
        ``self.maintenance_schedules``), a "not required" summary, or
        ``{'error': 'Vehicle not found'}``.
        """
        vehicle = self.vehicles.get(vehicle_id)
        if not vehicle:
            return {'error': 'Vehicle not found'}

        km_since_service = vehicle.odometer_km - vehicle.last_service_km
        km_until_service = vehicle.next_service_km - vehicle.odometer_km

        if km_until_service > 1000:
            return {
                'vehicle_id': vehicle_id,
                'maintenance_required': False,
                'km_until_service': km_until_service,
            }

        maintenance_type = self._determine_maintenance_type(km_since_service)
        schedule = {
            'vehicle_id': vehicle_id,
            'maintenance_type': maintenance_type,
            'current_odometer': vehicle.odometer_km,
            'recommended_by_odometer': vehicle.next_service_km,
            'urgency': 'high' if km_until_service <= 500 else 'medium',
            'estimated_cost': self._estimate_maintenance_cost(maintenance_type),
        }
        self.maintenance_schedules.append(schedule)
        return schedule

    def optimize_routes(self, deliveries: List[dict]) -> dict:
        """Assign deliveries round-robin to IDLE vehicles and estimate each route.

        Each delivery dict must provide ``'delivery_id'`` and
        ``'destination'``. This is a simplified placeholder, not a real
        TSP/VRP solver. Returns ``{'error': 'No available vehicles'}`` when
        no vehicle is IDLE.
        """
        available_vehicles = [
            v for v in self.vehicles.values()
            if v.status == VehicleStatus.IDLE
        ]
        if not available_vehicles:
            return {'error': 'No available vehicles'}

        assignments = []
        for i, delivery in enumerate(deliveries):
            vehicle = available_vehicles[i % len(available_vehicles)]
            route = self._calculate_route(vehicle.location, delivery['destination'])
            assignments.append({
                'vehicle_id': vehicle.vehicle_id,
                'delivery_id': delivery['delivery_id'],
                'route': route,
                'estimated_distance_km': route['distance'],
                'estimated_time_minutes': route['duration'],
                'estimated_fuel_cost': self._estimate_fuel_cost(
                    route['distance'],
                    vehicle.fuel_type,
                ),
            })

        return {
            'total_deliveries': len(deliveries),
            'vehicles_assigned': len({a['vehicle_id'] for a in assignments}),
            'assignments': assignments,
            'total_distance_km': sum(a['estimated_distance_km'] for a in assignments),
            # Decimal start value keeps the total a Decimal even with no deliveries
            'total_estimated_cost': sum(
                (a['estimated_fuel_cost'] for a in assignments), Decimal('0')
            ),
        }

    def analyze_fleet_utilization(self) -> dict:
        """Summarise vehicle status counts, utilization and recent fuel efficiency.

        Fuel efficiency is averaged over the last 100 trips that consumed
        fuel; it is 0 when there are none.
        """
        vehicles = list(self.vehicles.values())
        total_vehicles = len(vehicles)
        active = sum(1 for v in vehicles if v.status == VehicleStatus.ACTIVE)
        idle = sum(1 for v in vehicles if v.status == VehicleStatus.IDLE)
        maintenance = sum(1 for v in vehicles if v.status == VehicleStatus.MAINTENANCE)
        utilization_rate = (active / total_vehicles * 100) if total_vehicles > 0 else 0

        efficiencies = [
            t.distance_km / t.fuel_consumed_liters
            for t in self.trips[-100:]
            if t.fuel_consumed_liters > 0
        ]
        # np.mean([]) returns nan and emits a RuntimeWarning, so guard the
        # filtered list rather than the unfiltered trip list.
        avg_fuel_efficiency = float(np.mean(efficiencies)) if efficiencies else 0

        if utilization_rate < 60:
            recommendation = 'Reduce fleet size'
        elif utilization_rate > 90:
            recommendation = 'Expand fleet'
        else:
            recommendation = 'Optimal'

        return {
            'total_vehicles': total_vehicles,
            'status_breakdown': {
                'active': active,
                'idle': idle,
                'maintenance': maintenance,
                'out_of_service': total_vehicles - active - idle - maintenance,
            },
            'utilization_rate': utilization_rate,
            'average_fuel_efficiency': avg_fuel_efficiency,
            'recommendation': recommendation,
        }

    def _determine_maintenance_type(self, km_since_service: int) -> str:
        """Map distance driven since the last service to a service tier."""
        if km_since_service >= 100000:
            return "major_service"
        if km_since_service >= 50000:
            return "intermediate_service"
        return "routine_service"

    def _estimate_maintenance_cost(self, maintenance_type: str) -> Decimal:
        """Return the flat cost estimate for a service tier (200 if unknown)."""
        costs = {
            'routine_service': Decimal('150'),
            'intermediate_service': Decimal('500'),
            'major_service': Decimal('1500'),
        }
        return costs.get(maintenance_type, Decimal('200'))

    def _calculate_route(self, start: tuple, end: tuple) -> dict:
        """Estimate a route as the great-circle (haversine) distance.

        ``start`` and ``end`` are (latitude, longitude) in degrees. Returns
        ``'distance'`` in km and ``'duration'`` in minutes at an assumed
        60 km/h. A real implementation would call a routing API.
        """
        from math import radians, sin, cos, sqrt, atan2

        lat1, lon1 = radians(start[0]), radians(start[1])
        lat2, lon2 = radians(end[0]), radians(end[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push `a` marginally outside [0, 1] for near-antipodal
        # points, which would make sqrt(1 - a) raise.
        a = min(1.0, max(0.0, a))
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        distance_km = 6371 * c  # Earth radius in km
        return {
            'distance': distance_km,
            'duration': distance_km / 60 * 60,  # hours at 60 km/h, then to minutes
        }

    def _estimate_fuel_cost(self, distance_km: float, fuel_type: "FuelType") -> Decimal:
        """Estimate fuel cost for a distance.

        Uses a flat 8 km per unit of fuel for every fuel type and a fixed
        price table (falling back to 1.50 for unknown types).
        """
        fuel_prices = {
            FuelType.GASOLINE: Decimal('1.50'),  # per liter
            FuelType.DIESEL: Decimal('1.40'),
            FuelType.ELECTRIC: Decimal('0.30'),  # per kWh equivalent
            FuelType.HYBRID: Decimal('1.20'),
            FuelType.CNG: Decimal('1.00'),
        }
        fuel_efficiency = 8.0  # km per liter (average)
        fuel_needed = distance_km / fuel_efficiency
        fuel_price = fuel_prices.get(fuel_type, Decimal('1.50'))
        # Go through str() so the Decimal reflects the float's short repr
        return Decimal(str(fuel_needed)) * fuel_price

    def _get_gps_location(self, vehicle_id: str) -> tuple:
        """Return the vehicle's GPS position (placeholder: fixed coordinates)."""
        return (40.7128, -74.0060)  # Placeholder

    def _get_current_speed(self, vehicle_id: str) -> float:
        """Return the current speed (placeholder: random value in [0, 100))."""
        return np.random.uniform(0, 100)  # Placeholder

    def _get_heading(self, vehicle_id: str) -> float:
        """Return the heading in degrees (placeholder: random value in [0, 360))."""
        return np.random.uniform(0, 360)  # Placeholder

    def _generate_trip_id(self) -> str:
        """Return a new id of the form ``TRIP-`` plus 10 uppercase hex digits."""
        import uuid
        return f"TRIP-{uuid.uuid4().hex[:10].upper()}"
@dataclass
class VehicleTelemetry:
    """Real-time vehicle telemetry data"""

    vehicle_id: str
    timestamp: datetime
    location: tuple
    speed_kmh: float
    rpm: int
    engine_temp_c: float
    battery_voltage: float
    fuel_level_percent: float
    odometer_km: int
    dtc_codes: List[str]  # Diagnostic Trouble Codes
class ConnectedVehiclePlatform:
    """Connected car platform with OTA updates.

    Received telemetry samples and deployed OTA update records are held in
    memory.
    """

    # Explicit ordering for severities; comparing the strings directly would
    # rank 'medium' above 'high'.
    _SEVERITY_RANK = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}

    def __init__(self):
        self.vehicles = {}
        self.telemetry_buffer = []
        self.ota_updates = {}
        # Last received sample per vehicle id, used for harsh-driving checks
        self._last_sample_by_vehicle = {}

    def process_telemetry(self, telemetry: "VehicleTelemetry") -> dict:
        """Store a telemetry sample and return the alerts it triggers.

        Checks engine temperature (> 110), battery voltage (< 12.0),
        presence of DTC codes, and speed change versus the previous sample
        received for the same vehicle (> 5 km/h per second).
        """
        previous = self._last_sample_by_vehicle.get(telemetry.vehicle_id)
        self.telemetry_buffer.append(telemetry)
        self._last_sample_by_vehicle[telemetry.vehicle_id] = telemetry

        alerts = []

        if telemetry.engine_temp_c > 110:
            alerts.append({
                'type': 'high_engine_temp',
                'severity': 'warning',
                'value': telemetry.engine_temp_c,
                'message': 'Engine temperature above normal',
            })

        if telemetry.battery_voltage < 12.0:
            alerts.append({
                'type': 'low_battery',
                'severity': 'warning',
                'value': telemetry.battery_voltage,
                'message': 'Battery voltage low',
            })

        if telemetry.dtc_codes:
            alerts.append({
                'type': 'dtc_codes',
                'severity': 'critical',
                'codes': telemetry.dtc_codes,
                'message': f'{len(telemetry.dtc_codes)} diagnostic code(s) detected',
            })

        # Compare against this vehicle's own previous sample; the globally
        # previous buffer entry usually belongs to another vehicle.
        if previous is not None:
            time_diff = (telemetry.timestamp - previous.timestamp).total_seconds()
            if time_diff > 0:
                acceleration = (telemetry.speed_kmh - previous.speed_kmh) / time_diff
                if abs(acceleration) > 5:  # > 5 km/h per second
                    alerts.append({
                        'type': 'harsh_driving',
                        'severity': 'info',
                        'acceleration': acceleration,
                        'message': 'Harsh acceleration/braking detected',
                    })

        return {
            'vehicle_id': telemetry.vehicle_id,
            'timestamp': telemetry.timestamp.isoformat(),
            'alerts': alerts,
            'health_score': self._calculate_vehicle_health(telemetry),
        }

    def deploy_ota_update(self,
                          vehicle_ids: List[str],
                          update_package: dict) -> dict:
        """Register an over-the-air update for the given vehicles.

        ``update_package`` must contain ``'version'``, ``'description'`` and
        ``'size_mb'``. Every target vehicle starts in status 'scheduled'.
        """
        update_id = self._generate_update_id()
        self.ota_updates[update_id] = {
            'update_id': update_id,
            'version': update_package['version'],
            'description': update_package['description'],
            'package_size_mb': update_package['size_mb'],
            # Copy so later changes to the caller's list don't alter the record
            'target_vehicles': list(vehicle_ids),
            'deployed_at': datetime.now(),
            'status_by_vehicle': {
                vehicle_id: {
                    'status': 'scheduled',
                    'download_progress': 0,
                    'install_progress': 0,
                }
                for vehicle_id in vehicle_ids
            },
        }
        return {
            'update_id': update_id,
            'vehicles_targeted': len(vehicle_ids),
            'estimated_completion': 'Within 48 hours',
        }

    def diagnose_vehicle(self, vehicle_id: str, dtc_codes: List[str]) -> dict:
        """Look up each DTC code and report the most severe finding.

        With no codes, ``'overall_severity'`` is None and no service is
        recommended.
        """
        diagnoses = [self._lookup_dtc_code(code) for code in dtc_codes]

        max_severity = None
        if diagnoses:
            max_severity = max(
                (d['severity'] for d in diagnoses),
                key=lambda severity: self._SEVERITY_RANK.get(severity, -1),
            )

        return {
            'vehicle_id': vehicle_id,
            'dtc_codes': dtc_codes,
            'diagnoses': diagnoses,
            'overall_severity': max_severity,
            'service_recommended': max_severity in ['high', 'critical'],
        }

    def _calculate_vehicle_health(self, telemetry: "VehicleTelemetry") -> float:
        """Return a health score starting at 100, floored at 0.

        Deductions: engine temp > 110 (15) or > 100 (5); battery < 11.5 (20)
        or < 12.0 (10); 15 per DTC code.
        """
        score = 100.0

        if telemetry.engine_temp_c > 110:
            score -= 15
        elif telemetry.engine_temp_c > 100:
            score -= 5

        if telemetry.battery_voltage < 11.5:
            score -= 20
        elif telemetry.battery_voltage < 12.0:
            score -= 10

        score -= len(telemetry.dtc_codes) * 15
        return max(0.0, score)

    def _lookup_dtc_code(self, code: str) -> dict:
        """Return description, severity and possible causes for a DTC code.

        Only a small built-in table is consulted; unknown codes get a
        generic 'medium' entry.
        """
        dtc_database = {
            'P0171': {
                'description': 'System Too Lean (Bank 1)',
                'severity': 'medium',
                'possible_causes': ['Vacuum leak', 'Faulty MAF sensor', 'Fuel filter clogged'],
            },
            'P0300': {
                'description': 'Random/Multiple Cylinder Misfire Detected',
                'severity': 'high',
                'possible_causes': ['Faulty spark plugs', 'Ignition coil failure', 'Fuel injector issue'],
            },
        }
        return dtc_database.get(code, {
            'description': f'Unknown code: {code}',
            'severity': 'medium',
            'possible_causes': ['Requires diagnostic scan'],
        })

    def _generate_update_id(self) -> str:
        """Return a new id of the form ``OTA-`` plus 8 uppercase hex digits."""
        import uuid
        return f"OTA-{uuid.uuid4().hex[:8].upper()}"
class ElectricVehicleManagement:
    """EV-specific management functions: range, charging stations, charging plans."""

    def __init__(self):
        self.charging_stations = {}
        self.charging_sessions = []

    def calculate_range(self,
                        battery_capacity_kwh: float,
                        battery_soc_percent: float,
                        consumption_kwh_per_km: float,
                        temperature_factor: float = 0.8) -> dict:
        """Calculate remaining range for an EV.

        Args:
            battery_capacity_kwh: Total battery capacity.
            battery_soc_percent: State of charge, 0-100.
            consumption_kwh_per_km: Energy use per km; must be positive.
            temperature_factor: Multiplier applied to the nominal range to
                account for conditions (default 0.8).

        Raises:
            ValueError: if ``consumption_kwh_per_km`` is not positive.
        """
        if consumption_kwh_per_km <= 0:
            raise ValueError("consumption_kwh_per_km must be positive")

        available_energy = battery_capacity_kwh * (battery_soc_percent / 100)
        range_km = available_energy / consumption_kwh_per_km
        return {
            'nominal_range_km': range_km,
            'adjusted_range_km': range_km * temperature_factor,
            'battery_soc_percent': battery_soc_percent,
            'available_energy_kwh': available_energy,
        }

    def find_charging_stations(self,
                               current_location: tuple,
                               max_distance_km: float) -> List[dict]:
        """Return stations within ``max_distance_km``, nearest first.

        Each entry in ``self.charging_stations`` must provide 'name',
        'location', 'available_chargers', 'max_power_kw' and 'cost_per_kwh'.
        """
        nearby_stations = []
        for station_id, station in self.charging_stations.items():
            distance = self._calculate_distance(current_location, station['location'])
            if distance > max_distance_km:
                continue
            nearby_stations.append({
                'station_id': station_id,
                'name': station['name'],
                'location': station['location'],
                'distance_km': distance,
                'available_chargers': station['available_chargers'],
                'charging_speed_kw': station['max_power_kw'],
                'cost_per_kwh': station['cost_per_kwh'],
            })
        nearby_stations.sort(key=lambda entry: entry['distance_km'])
        return nearby_stations

    def optimize_charging_schedule(self,
                                   battery_capacity_kwh: float,
                                   current_soc_percent: float,
                                   target_soc_percent: float,
                                   departure_time: datetime) -> dict:
        """Pick the cheapest rate window in which charging finishes before departure.

        Rate windows from the day before departure and the departure day are
        considered. If no window allows charging to finish in time, the
        cheapest window overall is returned; callers can compare
        ``'will_complete_by'`` with their departure time. A 7 kW charger is
        assumed.
        """
        # A target below the current charge needs no energy, not a negative amount
        energy_needed = max(
            0.0,
            battery_capacity_kwh * ((target_soc_percent - current_soc_percent) / 100),
        )
        charging_duration_hours = energy_needed / 7.0  # Assume 7kW home charger
        charging_time = timedelta(hours=charging_duration_hours)

        # The overnight window that ends on the departure morning starts on
        # the previous calendar day, so both days must be looked at.
        rate_schedule = (
            self._get_electricity_rates(departure_time - timedelta(days=1))
            + self._get_electricity_rates(departure_time)
        )
        in_time = [
            period for period in rate_schedule
            if period['start_time'] + charging_time <= departure_time
        ]
        optimal_period = min(in_time or rate_schedule, key=lambda period: period['rate'])
        start_time = optimal_period['start_time']

        return {
            'energy_needed_kwh': energy_needed,
            'optimal_start_time': start_time.isoformat(),
            'charging_duration_hours': charging_duration_hours,
            'estimated_cost': energy_needed * float(optimal_period['rate']),
            'will_complete_by': (start_time + charging_time).isoformat(),
        }

    def _calculate_distance(self, point1: tuple, point2: tuple) -> float:
        """Return the great-circle (haversine) distance in km.

        Both points are (latitude, longitude) in degrees.
        """
        from math import radians, sin, cos, sqrt, atan2

        lat1, lon1 = radians(point1[0]), radians(point1[1])
        lat2, lon2 = radians(point2[0]), radians(point2[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding can push `a` marginally outside [0, 1] for near-antipodal
        # points, which would make sqrt(1 - a) raise.
        a = min(1.0, max(0.0, a))
        c = 2 * atan2(sqrt(a), sqrt(1 - a))
        return 6371 * c  # Earth radius in km

    def _get_electricity_rates(self, date: datetime) -> List[dict]:
        """Return the time-of-use rate windows anchored on ``date``'s day.

        Simplified schedule: off-peak 11 PM - 7 AM (next day) and peak
        2 PM - 8 PM. Mid-peak hours are not represented.
        """
        # Drop minutes/seconds/microseconds so windows start exactly on the hour
        day = date.replace(minute=0, second=0, microsecond=0)
        return [
            {
                'start_time': day.replace(hour=23),
                'end_time': day.replace(hour=7) + timedelta(days=1),
                'rate': Decimal('0.08'),  # $0.08/kWh
            },
            {
                'start_time': day.replace(hour=14),
                'end_time': day.replace(hour=20),
                'rate': Decimal('0.25'),  # $0.25/kWh
            },
        ]
❌ No telematics or GPS tracking
❌ Reactive maintenance only
❌ Manual route planning
❌ Ignoring driver behavior data
❌ No vehicle diagnostics
❌ Poor fuel management
❌ Inadequate cybersecurity
❌ No OTA update capability
❌ Inefficient EV charging
Weekly Installs
111
Repository
GitHub Stars
11
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust Hub: Pass · Socket: Pass · Snyk: Pass
Installed on
opencode: 101
codex: 100
gemini-cli: 96
github-copilot: 94
cursor: 91
amp: 89
Skills CLI 使用指南:AI Agent 技能包管理器安装与管理教程
46,600 周安装
Box自动化工具包:通过Rube MCP实现文件上传、搜索、文件夹管理
69 周安装
Datadog自动化监控:通过Rube MCP与Composio实现指标、日志、仪表板管理
69 周安装
Intercom自动化指南:通过Rube MCP与Composio实现客户支持对话管理
69 周安装
二进制初步分析指南:使用ReVa工具快速识别恶意软件与逆向工程
69 周安装
PrivateInvestigator 道德人员查找工具 | 公开数据调查、反向搜索与背景研究
69 周安装
TorchTitan:PyTorch原生分布式大语言模型预训练平台,支持4D并行与H100 GPU加速
69 周安装