Skip to content

Commit a2c374a

Browse files
committed
split requirements and venv into oss and yelpy for clog use
1 parent 78ed9bc commit a2c374a

File tree

10 files changed

+188
-648
lines changed

10 files changed

+188
-648
lines changed

MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,2 @@
1+
include requirements-oss.txt
2+
include requirements-yelp.txt

Makefile

Lines changed: 36 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,13 +15,33 @@ UID:=`id -u`
1515
GID:=`id -g`
1616
ITERATION=yelp1
1717

18-
.PHONY: test tests coverage clean
18+
# Determine environment (YELP or OSS)
19+
# Set SCL_ENV to 'YELP' if FQDN ends in '.yelpcorp.com'
20+
# Otherwise, set SCL_ENV to 'OSS'
21+
ifneq ($(findstring .yelpcorp.com,$(shell hostname -f)),)
22+
SCL_ENV ?= YELP
23+
else
24+
SCL_ENV ?= OSS
25+
endif
1926

27+
.PHONY: test tests-yelp tests-oss coverage clean venv venv-yelp venv-oss
2028

29+
30+
# Main test target dispatches to environment-specific test target
2131
test:
22-
tox
32+
ifeq ($(SCL_ENV),YELP)
33+
$(MAKE) tests-yelp
34+
else
35+
$(MAKE) tests-oss
36+
endif
37+
38+
tests-yelp: venv-yelp
39+
tox -e py3-yelp # Assumes py3-yelp will be the tox env for yelp
40+
41+
tests-oss: venv-oss
42+
tox -e py3 # Assumes py3 will be the default/OSS tox env
2343

24-
tests: test
44+
tests: test # Alias for backward compatibility or general use
2545
coverage: test
2646

2747
itest_%: package_%
@@ -51,8 +71,19 @@ package_%:
5171
'
5272
docker run -v $(CURDIR):/work:rw docker-dev.yelpcorp.com/$*_yelp chown -R $(UID):$(GID) /work
5373

54-
venv: requirements.txt setup.py tox.ini
55-
tox -e venv
74+
# Main venv target dispatches to environment-specific target
75+
venv:
76+
ifeq ($(SCL_ENV),YELP)
77+
$(MAKE) venv-yelp
78+
else
79+
$(MAKE) venv-oss
80+
endif
81+
82+
venv-yelp: requirements-yelp.txt requirements-oss.txt setup.py tox.ini
83+
tox -e venv-yelp # This tox env should install .[yelp]
84+
85+
venv-oss: requirements-oss.txt setup.py tox.ini
86+
tox -e venv-oss # This tox env should install normally
5687

5788
clean:
5889
rm -rf .cache

requirements.txt renamed to requirements-oss.txt

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,9 @@
1+
boto3
2+
ephemeral-port-reserve >= 1.1.0
13
importlib-resources==5.4.0
24
pyinotify==0.9.6
3-
pyyaml >= 3.0
5+
PyYAML >= 5.1
6+
requests>=2.18.4
47
typing-extensions==4.13.2
58
# To resolve the error: botocore 1.29.125 has requirement urllib3<1.27,>=1.25.4, but you'll have urllib3 2.0.1 which is incompatible.
69
urllib3==1.26.15
7-
yelp-clog==7.2.0

requirements-yelp.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
yelp-clog==7.2.0

service_configuration_lib/spark_config.py

Lines changed: 26 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,12 @@
2424
from service_configuration_lib.text_colors import TextColors
2525
from service_configuration_lib.utils import EPHEMERAL_PORT_END
2626
from service_configuration_lib.utils import EPHEMERAL_PORT_START
27-
from service_configuration_lib.utils import get_clog_handler
27+
28+
# Only works inside yelpy environments
29+
try:
30+
import clog
31+
except ImportError:
32+
clog = None
2833

2934
AWS_CREDENTIALS_DIR = '/etc/boto_cfg/'
3035
AWS_ENV_CREDENTIALS_PROVIDER = 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider'
@@ -441,16 +446,13 @@ def __init__(self, is_driver_on_k8s_tron: bool = False):
441446
try:
442447
(
443448
self.spark_srv_conf, self.spark_constants, self.default_spark_srv_conf,
444-
self.mandatory_default_spark_srv_conf, self.spark_costs, _,
449+
self.mandatory_default_spark_srv_conf, self.spark_costs,
445450
) = utils.load_spark_srv_conf()
446451
except Exception as e:
447452
log.error(f'Failed to load Spark srv configs: {e}')
448453
# should fail because Spark config calculation depends on values in srv-configs
449454
raise e
450455

451-
# Get a MonkHandler instance for JIRA validation warnings.
452-
self.jira_monk_handler_ref = get_clog_handler(client_id='service_configuration_lib.spark_config')
453-
454456
def _append_spark_prometheus_conf(self, spark_opts: Dict[str, str]) -> Dict[str, str]:
455457
spark_opts['spark.ui.prometheus.enabled'] = 'true'
456458
spark_opts[
@@ -1020,7 +1022,6 @@ def _handle_jira_ticket_validation(
10201022
user: The user running the job
10211023
jira_ticket: The Jira ticket provided by the user
10221024
"""
1023-
# Get the jira ticket validation setting
10241025
flag_enabled = self.mandatory_default_spark_srv_conf.get('spark.yelp.jira_ticket.enabled', 'false')
10251026
valid_ticket = self._get_valid_jira_ticket(jira_ticket)
10261027

@@ -1051,21 +1052,26 @@ def _handle_jira_ticket_validation(
10511052
'For more information: http://y/spark-jira-ticket-param '
10521053
'If you have questions, please reach out to #spark on Slack. '
10531054
)
1054-
if self.jira_monk_handler_ref:
1055-
log_payload = {
1056-
'timestamp': int(time.time()),
1057-
'scribe_log': self.jira_monk_handler_ref.stream,
1058-
'event': 'jira_ticket_validation_warning',
1059-
'level': 'WARNING',
1060-
'reason': 'Ticket missing or invalid. See http://y/spark-jira-ticket-param',
1061-
'user': user,
1062-
'jira_ticket_provided': jira_ticket,
1063-
}
1064-
self.jira_monk_handler_ref.logger.log_line(
1065-
self.jira_monk_handler_ref.stream, json.dumps(log_payload),
1066-
)
1055+
if clog:
1056+
try:
1057+
clog.config.configure(
1058+
scribe_host='169.254.255.254', # Standard Scribe host
1059+
scribe_port=1463, # Standard Scribe port
1060+
monk_disable=False, # Ensure Monk (for clog) is enabled
1061+
scribe_disable=False, # Ensure Scribe is enabled
1062+
)
1063+
log_payload = {
1064+
'timestamp': int(time.time()),
1065+
'event': 'jira_ticket_validation_warning',
1066+
'level': 'WARNING',
1067+
'reason': 'Ticket missing or invalid. See http://y/spark-jira-ticket-param',
1068+
'user': user,
1069+
'jira_ticket_provided': jira_ticket,
1070+
}
1071+
clog.log_line('spark_jira_ticket', json.dumps(log_payload))
1072+
except Exception as e:
1073+
log.warning(f'{warning_message} Clog operation failed with: {e}')
10671074
else:
1068-
# Fallback to default logger if clog handler setup failed or ref is missing/invalid
10691075
log.warning(warning_message)
10701076
return valid_ticket
10711077

service_configuration_lib/utils.py

Lines changed: 2 additions & 170 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
import errno
44
import hashlib
55
import logging
6-
import os
76
import random
87
import string
98
import uuid
@@ -14,16 +13,9 @@
1413
from socket import SOL_SOCKET
1514
from typing import Any
1615
from typing import Dict
17-
from typing import List
18-
from typing import Optional
1916
from typing import Tuple
2017

21-
import clog.config
22-
import srv_configs
2318
import yaml
24-
from clog.config import monk_host
25-
from clog.config import monk_port
26-
from clog.handlers import MonkHandler
2719
from typing_extensions import Literal
2820

2921
DEFAULT_SPARK_RUN_CONFIG = '/nail/srv/configs/spark.yaml'
@@ -40,7 +32,7 @@
4032
SPARK_DRIVER_MEM_OVERHEAD_FACTOR_DEFAULT = 0.1
4133

4234

43-
log = logging.Logger(__name__)
35+
log = logging.getLogger(__name__)
4436
log.setLevel(logging.INFO)
4537

4638

@@ -50,7 +42,6 @@ def load_spark_srv_conf(preset_values=None) -> Tuple[
5042
Dict[str, Any], # default_spark_srv_conf
5143
Dict[str, Any], # mandatory_default_spark_srv_conf
5244
Dict[str, Dict[str, float]], # spark_costs
53-
List[Dict[str, Any]], # module_configs
5445
]:
5546
if preset_values is None:
5647
preset_values = dict()
@@ -62,10 +53,9 @@ def load_spark_srv_conf(preset_values=None) -> Tuple[
6253
default_spark_srv_conf = spark_constants['defaults']
6354
mandatory_default_spark_srv_conf = spark_constants['mandatory_defaults']
6455
spark_costs = spark_constants['cost_factor']
65-
module_configs = loaded_values['module_config']
6656
return (
6757
spark_srv_conf, spark_constants, default_spark_srv_conf,
68-
mandatory_default_spark_srv_conf, spark_costs, module_configs,
58+
mandatory_default_spark_srv_conf, spark_costs,
6959
)
7060
except Exception as e:
7161
log.warning(f'Failed to load {DEFAULT_SPARK_RUN_CONFIG}: {e}')
@@ -227,161 +217,3 @@ def get_spark_driver_memory_overhead_mb(spark_conf: Dict[str, str]) -> float:
227217
)
228218
driver_mem_overhead_mb = driver_mem_mb * driver_mem_overhead_factor
229219
return round(driver_mem_overhead_mb, 5)
230-
231-
232-
def _load_default_service_configurations_for_clog() -> Optional[Dict[str, Any]]:
233-
"""
234-
Loads the external configuration file for the 'clog' namespace if specified in
235-
DEFAULT_SPARK_RUN_CONFIG's 'module_config' section.
236-
Returns the inline 'config' dictionary for the 'clog' namespace if found,
237-
otherwise None.
238-
"""
239-
clog_config_file_path = None
240-
clog_inline_config = None
241-
found_clog_module_config = False
242-
243-
try:
244-
_, _, _, _, _, module_configs = load_spark_srv_conf()
245-
246-
for mc_item in module_configs:
247-
if isinstance(mc_item, dict) and mc_item.get('namespace') == 'clog':
248-
found_clog_module_config = True
249-
clog_config_file_path = mc_item.get('file')
250-
clog_inline_config = mc_item.get('config')
251-
break
252-
253-
if not found_clog_module_config:
254-
log.warning(
255-
f"Could not find 'clog' namespace entry in 'module_config' "
256-
f'section within {DEFAULT_SPARK_RUN_CONFIG}.',
257-
)
258-
return None
259-
260-
if clog_config_file_path:
261-
if os.path.exists(clog_config_file_path):
262-
try:
263-
srv_configs.use_file(clog_config_file_path, namespace='clog')
264-
log.info(
265-
f'Successfully loaded clog configuration file {clog_config_file_path} '
266-
f"into namespace 'clog'.",
267-
)
268-
except Exception as e_use_file:
269-
log.error(
270-
f'Error loading clog configuration file {clog_config_file_path} '
271-
f'using srv_configs.use_file: {e_use_file}',
272-
)
273-
else:
274-
log.error(
275-
f"Clog configuration file specified in 'module_config' of {DEFAULT_SPARK_RUN_CONFIG} "
276-
f'does not exist: {clog_config_file_path}.',
277-
)
278-
else:
279-
log.info(
280-
f"No 'file' specified for 'clog' namespace in 'module_config' of {DEFAULT_SPARK_RUN_CONFIG}. "
281-
'Not loading any external file for clog via module_config.',
282-
)
283-
284-
# Return the inline config dictionary, which might be None if not present
285-
if isinstance(clog_inline_config, dict):
286-
return clog_inline_config
287-
elif clog_inline_config is not None:
288-
log.warning(f"Inline 'config' for 'clog' namespace in {DEFAULT_SPARK_RUN_CONFIG} is not a dictionary.")
289-
return None
290-
else:
291-
return None
292-
293-
except FileNotFoundError:
294-
log.error(
295-
f'Error: Main Spark run config file {DEFAULT_SPARK_RUN_CONFIG} not found. '
296-
'Cannot process clog configurations.',
297-
)
298-
return None
299-
except yaml.YAMLError as e_yaml:
300-
log.error(f'Error parsing YAML from {DEFAULT_SPARK_RUN_CONFIG}: {e_yaml}')
301-
return None
302-
except Exception as e_main:
303-
log.error(
304-
f'An unexpected error occurred in _load_default_service_configurations_for_clog: {e_main}',
305-
)
306-
return None
307-
308-
309-
def get_clog_handler(
310-
client_id: Optional[str] = None,
311-
stream_name_override: Optional[str] = None,
312-
) -> Optional[MonkHandler]:
313-
"""
314-
Configures and returns a clog MonkHandler for logging.
315-
316-
This utility helps in setting up a MonkHandler. It ensures the external
317-
clog configuration file (if specified in DEFAULT_SPARK_RUN_CONFIG) is loaded
318-
into srv_configs. It then determines the log_stream_name with the following
319-
priority:
320-
1. `stream_name_override` argument.
321-
2. `log_stream_name` from the inline 'config' of the 'clog' module_config
322-
in DEFAULT_SPARK_RUN_CONFIG.
323-
3. `log_stream_name` from the 'clog' namespace in srv_configs (typically
324-
loaded from the external file like /nail/srv/configs/clog.yaml).
325-
326-
Args:
327-
client_id: Optional client identifier for the log messages.
328-
Defaults to the current OS user or 'unknown_spark_user'.
329-
stream_name_override: Optional explicit clog stream name to use,
330-
overriding any configured values.
331-
332-
Returns:
333-
A configured MonkHandler instance if successful, otherwise None.
334-
"""
335-
# Load external file (if any) and get inline config from spark.yaml's module_config
336-
inline_clog_config = _load_default_service_configurations_for_clog()
337-
338-
actual_client_id = client_id or os.getenv('USER') or 'unknown_spark_user'
339-
final_stream_name = stream_name_override
340-
341-
if not final_stream_name:
342-
if inline_clog_config and isinstance(inline_clog_config.get('log_stream_name'), str):
343-
final_stream_name = inline_clog_config['log_stream_name']
344-
log.info(
345-
f"Using log_stream_name '{final_stream_name}' from inline module_config in "
346-
f'{DEFAULT_SPARK_RUN_CONFIG}.',
347-
)
348-
else:
349-
try:
350-
# Fallback to srv_configs (which should have data from the external file)
351-
clog_srv_configs_dict = srv_configs.get_namespace_as_dict('clog')
352-
final_stream_name = clog_srv_configs_dict.get('log_stream_name')
353-
if final_stream_name:
354-
log.info(
355-
f"Using log_stream_name '{final_stream_name}' from srv_configs 'clog' namespace "
356-
f'(likely from external file).',
357-
)
358-
except Exception as e:
359-
log.warning(
360-
f"Could not get 'clog' namespace from srv_configs or 'log_stream_name' key missing. "
361-
f'This may be okay if stream_name_override or inline config provides it. Error: {e}',
362-
)
363-
364-
if not final_stream_name:
365-
log.error(
366-
'Clog stream_name could not be determined. It was not provided as an argument, '
367-
'not found in the inline module_config for "clog", and not found in the '
368-
'"clog" srv_configs namespace. Clog handler cannot be configured.',
369-
)
370-
return None
371-
372-
# Ensure that clog is configured to enable Monk logging.
373-
# The default in clog.config.monk_disable is True.
374-
clog.config.configure_from_dict({'monk_disable': False})
375-
log.info('Clog has been configured to enable Monk logging (monk_disable=False).')
376-
377-
try:
378-
handler = MonkHandler(
379-
client_id=actual_client_id,
380-
host=monk_host,
381-
port=monk_port,
382-
stream=final_stream_name,
383-
)
384-
return handler
385-
except Exception as e:
386-
log.error(f"Failed to create MonkHandler for clog with stream '{final_stream_name}'. Error: {e}")
387-
return None

0 commit comments

Comments (0)