Skip to content

Commit 70e1964

Browse files
committed
templates: Add Ambarella CV22 device template
Adds recovery configuration template for CV22: - Device identification (VID/PID) - Required firmware files - Recovery steps and flow - Documentation and notes
1 parent 47ca340 commit 70e1964

File tree

2 files changed

+245
-177
lines changed

2 files changed

+245
-177
lines changed

src/snagrecover/recoveries/ambarella.py

Lines changed: 194 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -11,188 +11,205 @@
1111
from ..protocols.amba import AmbaCommand, AmbaProtocol
1212
from ..usb import UsbDevice
1313

14+
1415
class AdsCommandType(IntEnum):
15-
"""ADS script command types."""
16-
INVALID = 0
17-
WRITE = 1
18-
READ = 2
19-
POLL = 3
20-
USLEEP = 4
21-
SLEEP = 5
16+
"""ADS script command types."""
17+
18+
INVALID = 0
19+
WRITE = 1
20+
READ = 2
21+
POLL = 3
22+
USLEEP = 4
23+
SLEEP = 5
24+
2225

2326
@dataclass
2427
class AdsCommand:
25-
"""ADS script command."""
26-
type: AdsCommandType
27-
addr: int = 0
28-
data: int = 0
29-
mask: int = 0
28+
"""ADS script command."""
29+
30+
type: AdsCommandType
31+
addr: int = 0
32+
data: int = 0
33+
mask: int = 0
34+
3035

3136
class AdsParser:
32-
"""Parser for Ambarella ADS scripts."""
33-
34-
def parse(self, script: str) -> List[AdsCommand]:
35-
"""Parse ADS script into commands.
36-
37-
Args:
38-
script: ADS script content
39-
40-
Returns:
41-
List of parsed commands
42-
"""
43-
commands = []
44-
45-
# Split into lines and process each
46-
for line in script.splitlines():
47-
line = line.strip()
48-
49-
# Skip empty lines and comments
50-
if not line or line.startswith('#'):
51-
continue
52-
53-
# Parse command
54-
if cmd := self._parse_line(line):
55-
commands.append(cmd)
56-
57-
return commands
58-
59-
def _parse_line(self, line: str) -> Optional[AdsCommand]:
60-
"""Parse single ADS script line.
61-
62-
Args:
63-
line: Script line
64-
65-
Returns:
66-
Parsed command or None if invalid
67-
"""
68-
# Write command: w addr data [mask]
69-
if m := re.match(r'w\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)(?:\s+0x([0-9a-f]+))?', line, re.I):
70-
addr = int(m.group(1), 16)
71-
data = int(m.group(2), 16)
72-
mask = int(m.group(3), 16) if m.group(3) else 0xFFFFFFFF
73-
return AdsCommand(AdsCommandType.WRITE, addr, data, mask)
74-
75-
# Read command: r addr
76-
if m := re.match(r'r\s+0x([0-9a-f]+)', line, re.I):
77-
addr = int(m.group(1), 16)
78-
return AdsCommand(AdsCommandType.READ, addr)
79-
80-
# Poll command: p addr data mask
81-
if m := re.match(r'p\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)', line, re.I):
82-
addr = int(m.group(1), 16)
83-
data = int(m.group(2), 16)
84-
mask = int(m.group(3), 16)
85-
return AdsCommand(AdsCommandType.POLL, addr, data, mask)
86-
87-
# Sleep commands: sleep/usleep time
88-
if m := re.match(r'(sleep|usleep)\s+(\d+)', line):
89-
cmd_type = AdsCommandType.SLEEP if m.group(1) == 'sleep' else AdsCommandType.USLEEP
90-
time_val = int(m.group(2))
91-
return AdsCommand(cmd_type, data=time_val)
92-
93-
return None
37+
"""Parser for Ambarella ADS scripts."""
38+
39+
def parse(self, script: str) -> List[AdsCommand]:
40+
"""Parse ADS script into commands.
41+
42+
Args:
43+
script: ADS script content
44+
45+
Returns:
46+
List of parsed commands
47+
"""
48+
commands = []
49+
50+
# Split into lines and process each
51+
for line in script.splitlines():
52+
line = line.strip()
53+
54+
# Skip empty lines and comments
55+
if not line or line.startswith("#"):
56+
continue
57+
58+
# Parse command
59+
if cmd := self._parse_line(line):
60+
commands.append(cmd)
61+
62+
return commands
63+
64+
def _parse_line(self, line: str) -> Optional[AdsCommand]:
65+
"""Parse single ADS script line.
66+
67+
Args:
68+
line: Script line
69+
70+
Returns:
71+
Parsed command or None if invalid
72+
"""
73+
# Write command: w addr data [mask]
74+
if m := re.match(
75+
r"w\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)(?:\s+0x([0-9a-f]+))?", line, re.I
76+
):
77+
addr = int(m.group(1), 16)
78+
data = int(m.group(2), 16)
79+
mask = int(m.group(3), 16) if m.group(3) else 0xFFFFFFFF
80+
return AdsCommand(AdsCommandType.WRITE, addr, data, mask)
81+
82+
# Read command: r addr
83+
if m := re.match(r"r\s+0x([0-9a-f]+)", line, re.I):
84+
addr = int(m.group(1), 16)
85+
return AdsCommand(AdsCommandType.READ, addr)
86+
87+
# Poll command: p addr data mask
88+
if m := re.match(
89+
r"p\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)\s+0x([0-9a-f]+)", line, re.I
90+
):
91+
addr = int(m.group(1), 16)
92+
data = int(m.group(2), 16)
93+
mask = int(m.group(3), 16)
94+
return AdsCommand(AdsCommandType.POLL, addr, data, mask)
95+
96+
# Sleep commands: sleep/usleep time
97+
if m := re.match(r"(sleep|usleep)\s+(\d+)", line):
98+
cmd_type = (
99+
AdsCommandType.SLEEP if m.group(1) == "sleep" else AdsCommandType.USLEEP
100+
)
101+
time_val = int(m.group(2))
102+
return AdsCommand(cmd_type, data=time_val)
103+
104+
return None
105+
94106

95107
class AmbarellaRecovery:
96-
"""Ambarella device recovery implementation."""
97-
98-
def __init__(self, device: UsbDevice):
99-
"""Initialize recovery handler.
100-
101-
Args:
102-
device: USB device to recover
103-
"""
104-
self.device = device
105-
self.protocol = AmbaProtocol(device)
106-
self.firmware = AmbaFirmware()
107-
self._ads_parser = AdsParser()
108-
109-
def _execute_ads_commands(self, commands: List[AdsCommand]) -> None:
110-
"""Execute ADS script commands.
111-
112-
Args:
113-
commands: Commands to execute
114-
115-
Raises:
116-
AmbaUsbError: On execution error
117-
"""
118-
for cmd in commands:
119-
if cmd.type == AdsCommandType.WRITE:
120-
# Send write command
121-
self.protocol.send_command(AmbaCommand.RDY_TO_RCV, cmd.addr)
122-
self.protocol.send_command(AmbaCommand.RCV_DATA, cmd.data)
123-
124-
elif cmd.type == AdsCommandType.POLL:
125-
# Poll until value matches or timeout
126-
timeout = time.time() + 5.0 # 5 second timeout
127-
while time.time() < timeout:
128-
rsp, val, _ = self.protocol.send_command(
129-
AmbaCommand.INQUIRY_STATUS, cmd.addr)
130-
if (val & cmd.mask) == (cmd.data & cmd.mask):
131-
break
132-
time.sleep(0.001)
133-
134-
elif cmd.type in (AdsCommandType.SLEEP, AdsCommandType.USLEEP):
135-
# Sleep for specified time
136-
sleep_time = cmd.data / 1_000_000 if cmd.type == AdsCommandType.USLEEP else cmd.data
137-
time.sleep(sleep_time)
138-
139-
def initialize_dram(self, dram_script: Path) -> None:
140-
"""Initialize device DRAM.
141-
142-
Args:
143-
dram_script: Path to DRAM initialization script
144-
145-
Raises:
146-
AmbaUsbError: On initialization error
147-
"""
148-
# Load and parse DRAM script
149-
self.firmware.dram_script_path = dram_script
150-
self.firmware.load()
151-
152-
commands = self._ads_parser.parse(self.firmware.dram_script)
153-
self._execute_ads_commands(commands)
154-
155-
def load_bootloader(self, bootloader: Path) -> None:
156-
"""Load bootloader to device.
157-
158-
Args:
159-
bootloader: Path to bootloader binary
160-
161-
Raises:
162-
AmbaUsbError: On loading error
163-
"""
164-
# Load bootloader
165-
self.firmware.bootloader_path = bootloader
166-
self.firmware.load()
167-
168-
# Send bootloader
169-
self.protocol.send_file(0x0, self.firmware.bootloader)
170-
171-
# Send board info
172-
board_info = AmbaFirmware.pack_board_info()
173-
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
174-
175-
def flash_firmware(self, firmware: Path) -> None:
176-
"""Flash firmware to device.
177-
178-
Args:
179-
firmware: Path to firmware file
180-
181-
Raises:
182-
AmbaUsbError: On flashing error
183-
"""
184-
# Get firmware info
185-
fw_info = AmbaFirmware.get_firmware_info(firmware)
186-
187-
# Send firmware info
188-
fw_info_data = AmbaFirmware.pack_firmware_info(fw_info)
189-
self.protocol.send_file(AmbaFirmware.FW_INFO_ADDR, fw_info_data)
190-
191-
# Send board info again
192-
board_info = AmbaFirmware.pack_board_info()
193-
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
194-
195-
# Send firmware
196-
with open(firmware, 'rb') as f:
197-
fw_data = f.read()
198-
self.protocol.send_file(fw_info.memfw_prog_addr, fw_data)
108+
"""Ambarella device recovery implementation."""
109+
110+
def __init__(self, device: UsbDevice):
111+
"""Initialize recovery handler.
112+
113+
Args:
114+
device: USB device to recover
115+
"""
116+
self.device = device
117+
self.protocol = AmbaProtocol(device)
118+
self.firmware = AmbaFirmware()
119+
self._ads_parser = AdsParser()
120+
121+
def _execute_ads_commands(self, commands: List[AdsCommand]) -> None:
122+
"""Execute ADS script commands.
123+
124+
Args:
125+
commands: Commands to execute
126+
127+
Raises:
128+
AmbaUsbError: On execution error
129+
"""
130+
for cmd in commands:
131+
if cmd.type == AdsCommandType.WRITE:
132+
# Send write command
133+
self.protocol.send_command(AmbaCommand.RDY_TO_RCV, cmd.addr)
134+
self.protocol.send_command(AmbaCommand.RCV_DATA, cmd.data)
135+
136+
elif cmd.type == AdsCommandType.POLL:
137+
# Poll until value matches or timeout
138+
timeout = time.time() + 5.0 # 5 second timeout
139+
while time.time() < timeout:
140+
rsp, val, _ = self.protocol.send_command(
141+
AmbaCommand.INQUIRY_STATUS, cmd.addr
142+
)
143+
if (val & cmd.mask) == (cmd.data & cmd.mask):
144+
break
145+
time.sleep(0.001)
146+
147+
elif cmd.type in (AdsCommandType.SLEEP, AdsCommandType.USLEEP):
148+
# Sleep for specified time
149+
sleep_time = (
150+
cmd.data / 1_000_000
151+
if cmd.type == AdsCommandType.USLEEP
152+
else cmd.data
153+
)
154+
time.sleep(sleep_time)
155+
156+
def initialize_dram(self, dram_script: Path) -> None:
157+
"""Initialize device DRAM.
158+
159+
Args:
160+
dram_script: Path to DRAM initialization script
161+
162+
Raises:
163+
AmbaUsbError: On initialization error
164+
"""
165+
# Load and parse DRAM script
166+
self.firmware.dram_script_path = dram_script
167+
self.firmware.load()
168+
169+
commands = self._ads_parser.parse(self.firmware.dram_script)
170+
self._execute_ads_commands(commands)
171+
172+
def load_bootloader(self, bootloader: Path) -> None:
173+
"""Load bootloader to device.
174+
175+
Args:
176+
bootloader: Path to bootloader binary
177+
178+
Raises:
179+
AmbaUsbError: On loading error
180+
"""
181+
# Load bootloader
182+
self.firmware.bootloader_path = bootloader
183+
self.firmware.load()
184+
185+
# Send bootloader
186+
self.protocol.send_file(0x0, self.firmware.bootloader)
187+
188+
# Send board info
189+
board_info = AmbaFirmware.pack_board_info()
190+
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
191+
192+
def flash_firmware(self, firmware: Path) -> None:
193+
"""Flash firmware to device.
194+
195+
Args:
196+
firmware: Path to firmware file
197+
198+
Raises:
199+
AmbaUsbError: On flashing error
200+
"""
201+
# Get firmware info
202+
fw_info = AmbaFirmware.get_firmware_info(firmware)
203+
204+
# Send firmware info
205+
fw_info_data = AmbaFirmware.pack_firmware_info(fw_info)
206+
self.protocol.send_file(AmbaFirmware.FW_INFO_ADDR, fw_info_data)
207+
208+
# Send board info again
209+
board_info = AmbaFirmware.pack_board_info()
210+
self.protocol.send_file(AmbaFirmware.BOARD_INFO_ADDR, board_info)
211+
212+
# Send firmware
213+
with open(firmware, "rb") as f:
214+
fw_data = f.read()
215+
self.protocol.send_file(fw_info.memfw_prog_addr, fw_data)

0 commit comments

Comments
 (0)