-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery.py
More file actions
114 lines (94 loc) · 2.73 KB
/
query.py
File metadata and controls
114 lines (94 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import csv
import os
import sys
from datetime import datetime
from pathlib import Path

from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
# Load environment variables from .env file
load_dotenv()
# Connection settings read from the process environment (load_dotenv() above
# populates it from .env). Each one is None when its variable is unset.
ES_HOST = os.getenv("ES_HOST")
ES_USER = os.getenv("ES_USER")
ES_PASS = os.getenv("ES_PASSWORD")
ES_INDEX = os.getenv("ES_INDEX")

# Document fields requested from Elasticsearch via `_source`; the same list,
# in this order, is used as the CSV column set.
FIELDS = [
    "ClusterId",
    "Attempt",
    "AttemptEndTime",
    "TransferTotalBytes",
    "TransferStartTime",
    "TransferEndTime",
    "TransferUrl",
    "AttemptTime",
    "machineattrname0",
    "machineattrglidein_site0",
    "Endpoint",
]
def read_job_ids(filename="slow_transfers"):
    """Read job IDs from a text file, one ID per line.

    Surrounding whitespace is stripped from every line. Blank lines and
    lines whose first non-blank character is ``#`` are skipped.

    Args:
        filename: Path to file containing job IDs (one per line)

    Returns:
        List of job IDs as strings, in file order

    Raises:
        FileNotFoundError: If ``filename`` does not exist.
    """
    try:
        # Explicit encoding so parsing does not depend on the platform locale.
        # "utf-8-sig" also drops a leading byte-order mark, which would
        # otherwise become part of the first job ID.
        with open(Path(filename), encoding="utf-8-sig") as f:
            stripped = (line.strip() for line in f)
            return [
                line for line in stripped if line and not line.startswith("#")
            ]
    except FileNotFoundError:
        # Open-and-catch instead of exists()-then-open: no window in which
        # the file can disappear between the check and the read.
        raise FileNotFoundError(f"Job ID file not found: {filename}") from None
def get_query(job_ids=None):
    """Assemble the arguments for an Elasticsearch scan over transfer records.

    Args:
        job_ids: ClusterId values to match. When None, the IDs are loaded
            by calling read_job_ids() with its default file.

    Returns:
        Dict holding the target index, the scroll and page-size settings,
        and a "body" entry with the search request itself.
    """
    if job_ids is None:
        job_ids = read_job_ids()

    # Single filter clause: keep documents whose ClusterId is one of job_ids.
    cluster_filter = {"terms": {"ClusterId": job_ids}}
    search_body = {
        "_source": FIELDS,
        "query": {"bool": {"filter": [cluster_filter]}},
    }
    return {
        "index": ES_INDEX,
        "scroll": "30s",
        "size": 500,
        "body": search_body,
    }
def print_csv(docs, fields=None, file=None):
    """Write documents as CSV: one header row, then one row per document.

    Values containing a comma, quote or line break are quoted by the csv
    module so every row keeps the same number of columns. A field missing
    from a document is written as ``UNKNOWN``.

    Args:
        docs: Iterable of dict-like documents.
        fields: Column names in output order. Defaults to the module-level
            FIELDS list.
        file: Text stream to write to. Defaults to sys.stdout.
    """
    if fields is None:
        fields = FIELDS
    if file is None:
        # Looked up at call time so a redirected sys.stdout is honored.
        file = sys.stdout

    # "\n" matches the line ending print() emitted; csv defaults to "\r\n".
    writer = csv.writer(file, lineterminator="\n")
    writer.writerow(fields)
    for doc in docs:
        # Stringify first: csv would turn None into an empty cell, whereas
        # the established output renders it as "None".
        writer.writerow([str(doc.get(field, "UNKNOWN")) for field in fields])
def main():
    """Query Elasticsearch for the configured job IDs and print the hits as CSV.

    Raises:
        ValueError: If any of ES_HOST, ES_USER, ES_PASSWORD or ES_INDEX is
            unset or empty.
    """
    # Check every setting the run depends on, not only the credentials, so a
    # missing host or index is reported up front with one clear message.
    required = {
        "ES_HOST": ES_HOST,
        "ES_USER": ES_USER,
        "ES_PASSWORD": ES_PASS,
        "ES_INDEX": ES_INDEX,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(
            f"Missing required setting(s): {', '.join(missing)}.\n"
            "Create a .env file with:\n"
            "ES_HOST=your_elasticsearch_url\n"
            "ES_USER=your_username\n"
            "ES_PASSWORD=your_password\n"
            "ES_INDEX=your_index"
        )

    # Build the query before connecting: it reads the job ID file, so a
    # missing file fails before any client is created.
    scan_kwargs = get_query()
    # scan() takes the search request as `query`; the remaining entries
    # (index, scroll, size) are forwarded as keyword arguments.
    body = scan_kwargs.pop("body")

    client = Elasticsearch(ES_HOST, basic_auth=(ES_USER, ES_PASS))
    try:
        # Collect every hit before printing so an error part-way through the
        # scroll does not leave a truncated CSV on stdout.
        docs = [
            hit["_source"]
            for hit in scan(client=client, query=body, **scan_kwargs)
        ]
    finally:
        client.close()
    print_csv(docs)


if __name__ == "__main__":
    main()