Skip to content

Commit b631c4e

Browse files
committed
Added basic functionality for read and write to HUAWEI Object Storage Service (OBS)
1 parent 5a82613 commit b631c4e

File tree

4 files changed

+380
-1
lines changed

4 files changed

+380
-1
lines changed

setup.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ def read(fname):
4242
http_deps = ['requests']
4343
ssh_deps = ['paramiko']
4444
zst_deps = ['zstandard']
45+
obs_deps = ['esdk-obs-python']
4546

46-
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
47+
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps + obs_deps
4748
tests_require = all_deps + [
4849
'moto[server]',
4950
'responses',
@@ -83,6 +84,7 @@ def read(fname):
8384
'webhdfs': http_deps,
8485
'ssh': ssh_deps,
8586
'zst': zst_deps,
87+
'obs': obs_deps,
8688
},
8789
python_requires=">=3.7,<4.0",
8890

smart_open/obs.py

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2024 Sergei Sokolov <[email protected]>
4+
#
5+
# This code is distributed under the terms and conditions
6+
# from the MIT License (MIT).
7+
#
8+
"""Implements file-like objects for reading and writing from/to HUAWEI Object Storage Service (OBS)."""
9+
from __future__ import annotations
10+
11+
import io
12+
import logging
13+
import os
14+
import struct
15+
import sys
16+
from typing import Optional
17+
18+
from smart_open.utils import set_defaults
19+
20+
try:
21+
import obs.client
22+
from obs.searchmethod import get_token
23+
from obs import loadtoken
24+
except ImportError:
25+
MISSING_DEPS = True
26+
27+
import smart_open.bytebuffer
28+
import smart_open.utils
29+
30+
from smart_open import constants
31+
32+
logger = logging.getLogger(__name__)
33+
34+
SCHEMES = ('obs',)
35+
URI_EXAMPLES = (
36+
'obs://bucket_id.server:port/object_key',
37+
)
38+
39+
DEFAULT_CHUNK_SIZE = 65536
40+
DEFAULT_HTTP_PROTOCOL = 'https'
41+
DEFAULT_SECURITY_PROVIDER_POLICY = 'ENV'
42+
43+
default_client_kwargs = {
44+
'security_provider_policy': DEFAULT_SECURITY_PROVIDER_POLICY,
45+
}
46+
47+
48+
def parse_uri(uri_as_string):
49+
split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
50+
assert split_uri.scheme in SCHEMES
51+
52+
bucket_id, server = split_uri.netloc.split('.', 1)
53+
object_key = split_uri.path[1:]
54+
55+
return dict(
56+
scheme=split_uri.scheme,
57+
bucket_id=bucket_id,
58+
object_key=object_key,
59+
server=server,
60+
)
61+
62+
63+
def open_uri(uri, mode, transport_params):
64+
parsed_uri = parse_uri(uri)
65+
kwargs = smart_open.utils.check_kwargs(open, transport_params)
66+
67+
http_protocol = transport_params.get('http_protocol', DEFAULT_HTTP_PROTOCOL)
68+
client_kwargs = {
69+
'server': f'{http_protocol}://{parsed_uri["server"]}',
70+
}
71+
client_kwargs.update(default_client_kwargs)
72+
73+
kwargs['client'] = transport_params.get('client', client_kwargs)
74+
75+
default_kwarg = {
76+
'use_client_write_mode':
77+
os.environ.get('SMART_OPEN_OBS_USE_CLIENT_WRITE_MODE', 'false').lower() in ('true'),
78+
'decrypt_ak_sk':
79+
os.environ.get('SMART_OPEN_OBS_DECRYPT_AK_SK', 'false').lower() in ('true'),
80+
'scc_lib_path':
81+
os.environ.get('SMART_OPEN_OBS_SCC_LIB_PATH', None),
82+
'scc_conf_path':
83+
os.environ.get('SMART_OPEN_OBS_SCC_CONF_PATH', None),
84+
}
85+
86+
set_defaults(kwargs, default_kwarg)
87+
88+
return open(parsed_uri['bucket_id'], parsed_uri['object_key'], mode, **kwargs)
89+
90+
91+
def open(
92+
bucket_id,
93+
object_key,
94+
mode,
95+
buffer_size=DEFAULT_CHUNK_SIZE,
96+
client: Optional[obs.ObsClient | dict] = None,
97+
headers: Optional[obs.PutObjectHeader | obs.GetObjectHeader] = None,
98+
use_obs_client_write_mode: bool = False,
99+
decrypt_ak_sk: bool = False,
100+
scc_lib_path: Optional[str] = None,
101+
scc_conf_path: Optional[str] = None):
102+
"""Open an OBS object for reading or writing.
103+
104+
Parameters
105+
----------
106+
bucket_id: str
107+
The name of the bucket this object resides in.
108+
object_key: str
109+
The name of the key within the bucket.
110+
mode: str
111+
The mode for opening the object. Must be either "rb" or "wb".
112+
buffer_size: int
113+
The buffer size to use when performing I/O.
114+
client: Optional[obs.ObsClient | dict]
115+
The initialized OBS client or dict with args that will be supplied to obs.ObsClient constructor.
116+
Please see docs for esdk-obs-python.
117+
headers: Optional[obs.PutObjectHeader | obs.GetObjectHeader]
118+
The optional additional headers of the request.
119+
Please see docs for esdk-obs-python.
120+
use_obs_client_write_mode: bool
121+
True if we will use readable object to get bytes.
122+
For writing only.
123+
Please see docs for ObsClient.putContent api
124+
decrypt_ak_sk: bool
125+
True if we need decrypt Access key, Secret key and Security token.
126+
It required to install CryptoAPI libs.
127+
https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337
128+
scc_lib_path: Optional[str]
129+
The path to CryptoAPI libs.
130+
scc_conf_path: Optional[str]
131+
The path to scc.conf.
132+
"""
133+
134+
logger.debug('%r', locals())
135+
if mode not in constants.BINARY_MODES:
136+
raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
137+
138+
_client = client if isinstance(client, obs.ObsClient) else create_obs_client(
139+
client_config=client,
140+
decrypt_ak_sk=decrypt_ak_sk,
141+
scc_lib_path=scc_lib_path,
142+
scc_conf_path=scc_conf_path)
143+
144+
if mode == constants.READ_BINARY:
145+
fileobj = ObsReader(bucket_id=bucket_id,
146+
object_key=object_key,
147+
client=_client)
148+
elif mode == constants.WRITE_BINARY:
149+
fileobj = ObsWriter(bucket_id=bucket_id,
150+
object_key=object_key,
151+
client=_client,
152+
use_obs_client_write_mode=use_obs_client_write_mode)
153+
else:
154+
assert False, 'unexpected mode: %r' % mode
155+
return fileobj
156+
157+
158+
def create_obs_client(client_config: dict,
159+
decrypt_ak_sk: bool = False,
160+
scc_lib_path: Optional[str] = None,
161+
scc_conf_path: Optional[str] = None) -> obs.ObsClient:
162+
"""Initializes the ObsClient.
163+
"""
164+
if not decrypt_ak_sk:
165+
return obs.ObsClient(**client_config)
166+
167+
crypto_provider = CryptoProvider(scc_lib_path=scc_lib_path, scc_conf_path=scc_conf_path)
168+
169+
if 'access_key_id' in client_config:
170+
access_key_id = client_config.get('access_key_id')
171+
secret_access_key = client_config.get('secret_access_key')
172+
security_token = client_config.get('security_token', None)
173+
else:
174+
tokens = get_token(security_providers=loadtoken.ENV)
175+
access_key_id = tokens.get('accessKey')
176+
secret_access_key = tokens.get('secretKey')
177+
security_token = tokens.get('securityToken')
178+
179+
decrypted_config = {
180+
access_key_id: crypto_provider.decrypt(access_key_id),
181+
secret_access_key: crypto_provider.decrypt(secret_access_key),
182+
security_token: crypto_provider.decrypt(security_token),
183+
}
184+
185+
set_defaults(decrypted_config, client_config)
186+
return obs.ObsClient(**decrypted_config)
187+
188+
189+
class ObsReader(io.RawIOBase):
190+
"""Read an OBS Object.
191+
"""
192+
193+
def __init__(self,
194+
bucket_id: str,
195+
object_key: str,
196+
client: obs.ObsClient,
197+
headers: Optional[obs.GetObjectHeader] = None,
198+
buffer_size: int = DEFAULT_CHUNK_SIZE):
199+
self.name = object_key
200+
self.bucket_id = bucket_id
201+
self.object_key = object_key
202+
self.buffer_size = buffer_size
203+
self._client = client
204+
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
205+
self._resp = self._client.getObject(bucketName=bucket_id,
206+
objectKey=object_key,
207+
headers=headers)
208+
if self._resp.status >= 300:
209+
raise RuntimeError(
210+
f'Failed to read: {self.object_key}! '
211+
f'errorCode: {self._resp.errorCode}, '
212+
f'errorMessage: {self._resp.errorMessage}')
213+
214+
def readinto(self, __buffer):
215+
data = self.read(len(__buffer))
216+
if not data:
217+
return 0
218+
__buffer[:len(data)] = data
219+
return len(data)
220+
221+
def readinto1(self, __buffer):
222+
return self.readinto(__buffer)
223+
224+
def read(self, size=-1):
225+
if size == 0:
226+
return b''
227+
228+
if self._resp is None:
229+
raise RuntimeError(f'No response received while reading: {self.object_key}')
230+
231+
if size > 0:
232+
chunk = self._resp.body.response.read(size)
233+
return chunk
234+
else:
235+
while True:
236+
chunk = self._resp.body.response.read(self.buffer_size)
237+
if not chunk:
238+
break
239+
self._buffer.fill(struct.unpack(str(len(chunk)) + 'c', chunk))
240+
return self._buffer.read()
241+
242+
def read1(self, size=-1):
243+
return self.read(size)
244+
245+
def close(self):
246+
self.__del__()
247+
248+
def seekable(self):
249+
return False
250+
251+
def detach(self):
252+
"""Unsupported."""
253+
raise io.UnsupportedOperation
254+
255+
def __del__(self):
256+
try:
257+
if self._client:
258+
self._resp = None
259+
self._client.close()
260+
self._client = None
261+
except Exception as ex:
262+
logger.warning(ex)
263+
264+
265+
class ObsWriter(io.RawIOBase):
266+
"""Write an OBS Object.
267+
268+
If use_obs_client_write_mode set to False:
269+
this class buffers all of its input in memory until its `close` method is called.
270+
Only then the data will be written to OBS and the buffer is released.
271+
272+
If use_obs_client_write_mode set to True:
273+
`write` method of the ObsWriter will accept any readable object or path to file.
274+
In this case will be used internal implementation in obs.ObsClient.putContent to read bytes
275+
Write to OBS will be triggered in `close` method.
276+
"""
277+
278+
def __init__(self,
279+
bucket_id: str,
280+
object_key: str,
281+
client: obs.ObsClient,
282+
headers: Optional[obs.PutObjectHeader] = None,
283+
use_obs_client_write_mode: bool = False
284+
):
285+
self.name = object_key
286+
self.bucket_id = bucket_id
287+
self.object_key = object_key
288+
self._client = client
289+
self._headers = headers
290+
self._content: Optional[str | io.BytesIO | io.BufferedReader] = None
291+
self.use_obs_client_write_mode = use_obs_client_write_mode
292+
293+
def write(self, __buffer):
294+
if not __buffer:
295+
return None
296+
297+
if self.use_obs_client_write_mode:
298+
self._content = __buffer
299+
else:
300+
if not self._content:
301+
self._content = io.BytesIO()
302+
self._content.write(__buffer)
303+
return None
304+
305+
def close(self):
306+
if not self._content:
307+
self._client.close()
308+
return
309+
310+
if isinstance(self._content, io.BytesIO):
311+
self._content.seek(0)
312+
313+
self._client.putContent(bucketName=self.bucket_id,
314+
objectKey=self.object_key,
315+
content=self._content,
316+
headers=self._headers)
317+
self._content = None
318+
319+
def seekable(self):
320+
return False
321+
322+
def writable(self):
323+
return self._content is not None
324+
325+
def detach(self):
326+
"""Unsupported."""
327+
raise io.UnsupportedOperation
328+
329+
330+
class CryptoProvider:
331+
"""Decrypt Access Key, Secret Key, Security Token.
332+
333+
This class use Huawei CloudGuard CSP seccomponent to decrypt AK, SK and ST.
334+
"""
335+
336+
def __init__(self, scc_lib_path: Optional[str] = None, scc_conf_path: Optional[str] = None):
337+
self._scc_lib_path = scc_lib_path
338+
self._scc_conf_path = scc_conf_path
339+
340+
if scc_lib_path and scc_lib_path not in sys.path:
341+
sys.path.append(scc_lib_path)
342+
343+
try:
344+
from CryptoAPI import CryptoAPI
345+
except ImportError:
346+
raise RuntimeError(f'Failed to use CryptoAPI module. Please install CloudGuard CSP seccomponent.')
347+
348+
self._api = CryptoAPI()
349+
350+
if self._scc_conf_path:
351+
self._api.initialize(self._scc_conf_path)
352+
else:
353+
self._api.initialize()
354+
355+
def __del__(self):
356+
if self._api:
357+
self._api.finalize()
358+
359+
def decrypt(self, encrypted: Optional[str]) -> Optional[str]:
360+
return self._api.decrypt(encrypted) if encrypted else None

smart_open/transport.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def get_transport(scheme):
104104
register_transport("smart_open.s3")
105105
register_transport("smart_open.ssh")
106106
register_transport("smart_open.webhdfs")
107+
register_transport("smart_open.obs")
107108

108109
SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys()))
109110
"""The transport schemes that the local installation of ``smart_open`` supports."""

0 commit comments

Comments
 (0)