Skip to content

Commit 35b99bb

Browse files
committed
recoveries: Add Ambarella recovery implementation
Implements recovery flow for Ambarella devices: - ADS script parser for DRAM configuration - DRAM initialization support - Bootloader loading and execution - Firmware flashing with proper addressing - Complete recovery process handling
1 parent 6bfddc2 commit 35b99bb

File tree

1 file changed

+198
-0
lines changed

1 file changed

+198
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
"""Ambarella recovery implementation."""
2+
3+
import re
4+
import time
5+
from dataclasses import dataclass
6+
from enum import IntEnum
7+
from pathlib import Path
8+
from typing import List, Optional, Tuple
9+
10+
from ..firmware.amba_fw import AmbaFirmware, AmbaFirmwareInfo
11+
from ..protocols.amba import AmbaCommand, AmbaProtocol, AmbaUsbError
12+
from ..usb import UsbDevice
13+
14+
class AdsCommandType(IntEnum):
15+
"""ADS script command types."""
16+
INVALID = 0
17+
WRITE = 1
18+
READ = 2
19+
POLL = 3
20+
USLEEP = 4
21+
SLEEP = 5
22+
23+
@dataclass
24+
class AdsCommand:
25+
"""ADS script command."""
26+
type: AdsCommandType
27+
addr: int = 0
28+
data: int = 0
29+
mask: int = 0
30+
31+
class AdsParser:
32+
"""Parser for Ambarella ADS scripts."""
33+
34+
def parse(self, script: str) -> List[AdsCommand]:
35+
"""Parse ADS script into commands.
36+
37+
Args:
38+
script: ADS script content
39+
40+
Returns:
41+
List of parsed commands
42+
"""
43+
commands = []
44+
45+
# Split into lines and process each
46+
for line in script.splitlines():
47+
line = line.strip()
48+
49+
# Skip empty lines and comments
50+
if not line or line.startswith('#'):
51+
continue
52+
53+
# Parse command
54+
if cmd := self._parse_line(line):
55+
commands.append(cmd)
56+
57+
return commands
58+
59+
def _parse_line(self, line: str) -> Optional[AdsCommand]:
60+
"""Parse single ADS script line.
61+
62+
Args:
63+
line: Script line
64+
65+
Returns:
66+
Parsed command or None if invalid
67+
"""
68+
# Write command: w addr data [mask]
69+
if m := re.match(r'w\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)(?:\s+0x([0-9a-f]+))?', line, re.I):
70+
addr = int(m.group(1), 16)
71+
data = int(m.group(2), 16)
72+
mask = int(m.group(3), 16) if m.group(3) else 0xFFFFFFFF
73+
return AdsCommand(AdsCommandType.WRITE, addr, data, mask)
74+
75+
# Read command: r addr
76+
if m := re.match(r'r\s+0x([0-9a-f]+)', line, re.I):
77+
addr = int(m.group(1), 16)
78+
return AdsCommand(AdsCommandType.READ, addr)
79+
80+
# Poll command: p addr data mask
81+
if m := re.match(r'p\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)', line, re.I):
82+
addr = int(m.group(1), 16)
83+
data = int(m.group(2), 16)
84+
mask = int(m.group(3), 16)
85+
return AdsCommand(AdsCommandType.POLL, addr, data, mask)
86+
87+
# Sleep commands: sleep/usleep time
88+
if m := re.match(r'(sleep|usleep)\s+(\d+)', line):
89+
cmd_type = AdsCommandType.SLEEP if m.group(1) == 'sleep' else AdsCommandType.USLEEP
90+
time_val = int(m.group(2))
91+
return AdsCommand(cmd_type, data=time_val)
92+
93+
return None
94+
95+
class AmbarellaRecovery:
96+
"""Ambarella device recovery implementation."""
97+
98+
def __init__(self, device: UsbDevice):
99+
"""Initialize recovery handler.
100+
101+
Args:
102+
device: USB device to recover
103+
"""
104+
self.device = device
105+
self.protocol = AmbaProtocol(device)
106+
self.firmware = AmbaFirmware()
107+
self._ads_parser = AdsParser()
108+
109+
def _execute_ads_commands(self, commands: List[AdsCommand]) -> None:
110+
"""Execute ADS script commands.
111+
112+
Args:
113+
commands: Commands to execute
114+
115+
Raises:
116+
AmbaUsbError: On execution error
117+
"""
118+
for cmd in commands:
119+
if cmd.type == AdsCommandType.WRITE:
120+
# Send write command
121+
self.protocol.send_command(AmbaCommand.RDY_TO_RCV, cmd.addr)
122+
self.protocol.send_command(AmbaCommand.RCV_DATA, cmd.data)
123+
124+
elif cmd.type == AdsCommandType.POLL:
125+
# Poll until value matches or timeout
126+
timeout = time.time() + 5.0 # 5 second timeout
127+
while time.time() < timeout:
128+
rsp, val, _ = self.protocol.send_command(
129+
AmbaCommand.INQUIRY_STATUS, cmd.addr)
130+
if (val & cmd.mask) == (cmd.data & cmd.mask):
131+
break
132+
time.sleep(0.001)
133+
134+
elif cmd.type in (AdsCommandType.SLEEP, AdsCommandType.USLEEP):
135+
# Sleep for specified time
136+
sleep_time = cmd.data / 1_000_000 if cmd.type == AdsCommandType.USLEEP else cmd.data
137+
time.sleep(sleep_time)
138+
139+
def initialize_dram(self, dram_script: Path) -> None:
140+
"""Initialize device DRAM.
141+
142+
Args:
143+
dram_script: Path to DRAM initialization script
144+
145+
Raises:
146+
AmbaUsbError: On initialization error
147+
"""
148+
# Load and parse DRAM script
149+
self.firmware.dram_script_path = dram_script
150+
self.firmware.load()
151+
152+
commands = self._ads_parser.parse(self.firmware.dram_script)
153+
self._execute_ads_commands(commands)
154+
155+
def load_bootloader(self, bootloader: Path) -> None:
156+
"""Load bootloader to device.
157+
158+
Args:
159+
bootloader: Path to bootloader binary
160+
161+
Raises:
162+
AmbaUsbError: On loading error
163+
"""
164+
# Load bootloader
165+
self.firmware.bootloader_path = bootloader
166+
self.firmware.load()
167+
168+
# Send bootloader
169+
self.protocol.send_file(0x0, self.firmware.bootloader)
170+
171+
# Send board info
172+
board_info = AmbaFirmware.pack_board_info()
173+
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
174+
175+
def flash_firmware(self, firmware: Path) -> None:
176+
"""Flash firmware to device.
177+
178+
Args:
179+
firmware: Path to firmware file
180+
181+
Raises:
182+
AmbaUsbError: On flashing error
183+
"""
184+
# Get firmware info
185+
fw_info = AmbaFirmware.get_firmware_info(firmware)
186+
187+
# Send firmware info
188+
fw_info_data = AmbaFirmware.pack_firmware_info(fw_info)
189+
self.protocol.send_file(AmbaFirmware.FW_INFO_ADDR, fw_info_data)
190+
191+
# Send board info again
192+
board_info = AmbaFirmware.pack_board_info()
193+
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
194+
195+
# Send firmware
196+
with open(firmware, 'rb') as f:
197+
fw_data = f.read()
198+
self.protocol.send_file(fw_info.memfw_prog_addr, fw_data)

0 commit comments

Comments
 (0)