|
10 | 10 | from pyinfra.api.connect import connect_all |
11 | 11 | from pyinfra.api.exceptions import ConnectError, PyinfraError |
12 | 12 | from pyinfra.context import ctx_state |
| 13 | +from pyinfra.connectors import ssh |
13 | 14 |
|
14 | 15 | from ..util import make_inventory |
15 | 16 |
|
@@ -119,6 +120,94 @@ def test_connect_with_rsa_ssh_key(self): |
119 | 120 |
|
120 | 121 | connect_all(second_state) |
121 | 122 |
|
| 123 | + def test_retry_paramiko_agent_keys_single_key(self): |
| 124 | + connector = ssh.SSHConnector.__new__(ssh.SSHConnector) |
| 125 | + connector.client = mock.Mock() |
| 126 | + |
| 127 | + attempts = [] |
| 128 | + connect_outcomes = [None] |
| 129 | + |
| 130 | + def make_client(): |
| 131 | + client = mock.Mock() |
| 132 | + |
| 133 | + def fake_connect(hostname, **kwargs): |
| 134 | + attempts.append(dict(kwargs)) |
| 135 | + outcome = connect_outcomes.pop(0) |
| 136 | + if isinstance(outcome, Exception): |
| 137 | + raise outcome |
| 138 | + |
| 139 | + client.connect.side_effect = fake_connect |
| 140 | + client.close = mock.Mock() |
| 141 | + return client |
| 142 | + |
| 143 | + with ( |
| 144 | + mock.patch("pyinfra.connectors.ssh.Agent") as fake_agent, |
| 145 | + mock.patch("pyinfra.connectors.ssh.SSHClient", side_effect=make_client), |
| 146 | + ): |
| 147 | + fake_agent.return_value.get_keys.return_value = ["key-one"] |
| 148 | + |
| 149 | + result = connector._retry_paramiko_agent_keys( |
| 150 | + "host", |
| 151 | + {"allow_agent": True}, |
| 152 | + SSHException("No existing session"), |
| 153 | + ) |
| 154 | + |
| 155 | + self.assertTrue(result) |
| 156 | + self.assertEqual( |
| 157 | + attempts, |
| 158 | + [ |
| 159 | + {"allow_agent": False, "pkey": "key-one"}, |
| 160 | + ], |
| 161 | + ) |
| 162 | + |
| 163 | + def test_retry_paramiko_agent_keys_returns_false_without_keys(self): |
| 164 | + connector = ssh.SSHConnector.__new__(ssh.SSHConnector) |
| 165 | + connector.client = mock.Mock() |
| 166 | + |
| 167 | + with mock.patch("pyinfra.connectors.ssh.Agent") as fake_agent: |
| 168 | + fake_agent.return_value.get_keys.return_value = [] |
| 169 | + |
| 170 | + result = connector._retry_paramiko_agent_keys( |
| 171 | + "host", |
| 172 | + {"allow_agent": True}, |
| 173 | + SSHException("No existing session"), |
| 174 | + ) |
| 175 | + |
| 176 | + self.assertFalse(result) |
| 177 | + |
| 178 | + @mock.patch("pyinfra.connectors.ssh.Agent") |
| 179 | + def test_connect_retries_agent_keys_after_paramiko_failure(self, fake_agent): |
| 180 | + key_one = mock.Mock(name="agent-key-1") |
| 181 | + key_two = mock.Mock(name="agent-key-2") |
| 182 | + fake_agent.return_value.get_keys.return_value = [key_one, key_two] |
| 183 | + |
| 184 | + connect_calls = [] |
| 185 | + |
| 186 | + def fake_connect(hostname, **kwargs): |
| 187 | + connect_calls.append((hostname, dict(kwargs))) |
| 188 | + if len(connect_calls) == 1: |
| 189 | + raise SSHException("No existing session") |
| 190 | + |
| 191 | + self.fake_connect_mock.side_effect = fake_connect |
| 192 | + |
| 193 | + inventory = make_inventory(hosts=("somehost",)) |
| 194 | + state = State(inventory, Config()) |
| 195 | + |
| 196 | + connect_all(state) |
| 197 | + |
| 198 | + self.assertEqual(len(state.active_hosts), 1) |
| 199 | + self.assertEqual(len(connect_calls), 2) |
| 200 | + |
| 201 | + first_hostname, first_kwargs = connect_calls[0] |
| 202 | + self.assertEqual(first_hostname, "somehost") |
| 203 | + self.assertTrue(first_kwargs.get("allow_agent")) |
| 204 | + self.assertNotIn("pkey", first_kwargs) |
| 205 | + |
| 206 | + second_hostname, second_kwargs = connect_calls[1] |
| 207 | + self.assertEqual(second_hostname, "somehost") |
| 208 | + self.assertFalse(second_kwargs.get("allow_agent")) |
| 209 | + self.assertIs(second_kwargs.get("pkey"), key_two) |
| 210 | + |
122 | 211 | def test_connect_with_rsa_ssh_key_password(self): |
123 | 212 | state = State( |
124 | 213 | make_inventory( |
|
0 commit comments