
Commit e7cf8b7

This might be the ticket?
1 parent 3a939bf commit e7cf8b7

File tree

5 files changed: +119 −65 lines changed

build/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ services:
       - LOG_LEVEL=DEBUG # DEBUG INFO WARNING ERROR CRITICAL # Default is INFO
       # - LOG_RECENT_COMMITS=5
       - MAX_CONCURRENT_CONVERSIONS_GLOBAL=20
-      - MAX_CONCURRENT_CONVERSIONS_PER_SERVER=10
+      - MAX_CONCURRENT_CONVERSIONS_PER_SERVER=3
       # - MAX_CYCLES=5
       - REPO_CONVERTER_INTERVAL_SECONDS=30 # Default is 3600 seconds (1 hour)
       # image: ghcr.io/sourcegraph/repo-converter:HEAD
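Editor's note: this drops the per-server conversion cap from 10 to 3 while leaving the global cap at 20. A hedged sketch of how these two variables plausibly reach the concurrency manager through the container environment (the repo's exact config plumbing may differ):

```python
import os

# Fallback defaults here are illustrative, not the repo's documented defaults
global_limit = int(os.environ.get("MAX_CONCURRENT_CONVERSIONS_GLOBAL", "10"))
per_server_limit = int(os.environ.get("MAX_CONCURRENT_CONVERSIONS_PER_SERVER", "10"))

print(global_limit, per_server_limit)  # 20 3 with this commit's compose values
```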

dev/TODO.md

Lines changed: 2 additions & 2 deletions
@@ -58,8 +58,8 @@
 ### Stability

 - `git svn fetch`
-    - Updated batch end rev is not getting written to .git/config, file perms issue?
-    - Updated batch end rev is not getting read from .git/config, file perms issue?
+    - Commits not getting committed to the local repo
+    - Not entirely sure if svn is blocking me

 - `svn log` commands
     - Longest commands, which seem to be timing out and causing issues

src/source_repo/svn.py

Lines changed: 89 additions & 53 deletions
@@ -34,8 +34,6 @@ def convert(ctx: Context) -> None:
     Entrypoint / main logic / orchestration function
     """

-    job_start_time = int(time.time())
-
     # Extract repo conversion job config values from the repos list in ctx,
     # and set default values for required but undefined configs
     _extract_repo_config_and_set_default_values(ctx)
@@ -75,14 +73,14 @@ def convert(ctx: Context) -> None:
     if _check_if_repo_already_up_to_date(ctx):

         # If the repo already exists, and is already up to date, then exit early
-
         ### EXTERNAL COMMAND: svn log ###
-        _log_recent_commits(ctx, commands)
-        _cleanup(ctx)
+        _cleanup(ctx, commands)
+        log(ctx, "Ending svn repo conversion job", "info")
         return

     ### EXTERNAL COMMAND: svn log ###
     # This is the big one, to count all revs remaining
+    # TODO: Separate the svn log range from calculating batch revisions
     _log_number_of_revs_out_of_date(ctx, commands)

     # Calculate revision range for this fetch
@@ -95,24 +93,14 @@ def convert(ctx: Context) -> None:
     git_svn_fetch_result = _git_svn_fetch(ctx, commands)

     ## Gather information needed to decide if the fetch was successful or failed
-
-    # Check if the repo is valid after the fetch
-    if _check_if_repo_exists_locally(ctx, "end"):
-        pass
-
     # Cleanup before exit
-    _cleanup(ctx)
-
-    # Get dir size of converted git repo
-    _get_local_git_repo_stats(ctx, "end")
-
-    ctx.job["result"]["run_time_seconds"] = int(time.time() - job_start_time)
+    _cleanup(ctx, commands)

     ## Decide if the fetch was successful or failed
     ## Also update batch end rev in git repo config file
     _verify_git_svn_fetch_success(ctx, git_svn_fetch_result)

-    log(ctx, "SVN repo conversion job complete", "info")
+    log(ctx, "Ending svn repo conversion job", "info")


 def _extract_repo_config_and_set_default_values(ctx: Context) -> None:
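Editor's note: taken together, the hunks above reshape convert() so that the early-exit path and the normal path both funnel through the same _cleanup(ctx, commands) call, and per-job timing moves out of convert() entirely (see the concurrency_manager.py changes below). A runnable toy sketch of the resulting control flow, with stubbed helpers standing in for the real ones:

```python
# Toy sketch only: helper bodies are stubs, not the repo's real implementations
def _check_if_repo_already_up_to_date(ctx): return ctx["up_to_date"]
def _git_svn_fetch(ctx): return {"return_code": 0}
def _cleanup(ctx): ctx["cleaned"] = True  # real version: repo stats, recent commits, git gc
def _verify_git_svn_fetch_success(ctx, result): ctx["verified"] = (result["return_code"] == 0)
def log(ctx, msg, level): print(f"{level}: {msg}")

def convert(ctx):
    if _check_if_repo_already_up_to_date(ctx):
        _cleanup(ctx)                      # same cleanup on the early-exit path
        log(ctx, "Ending svn repo conversion job", "info")
        return
    result = _git_svn_fetch(ctx)
    _cleanup(ctx)                          # and on the normal path
    _verify_git_svn_fetch_success(ctx, result)
    log(ctx, "Ending svn repo conversion job", "info")

convert({"up_to_date": False})  # prints: info: Ending svn repo conversion job
```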
@@ -675,14 +663,19 @@ def _log_recent_commits(ctx: Context, commands: dict) -> None:
     ctx.job.pop("svn_log_output")


-def _cleanup(ctx: Context) -> None:
+def _cleanup(ctx: Context, commands: dict) -> None:
     """
     Groups up any other functions needed to clean up before exit
     """

+    # Get dir size of converted git repo
+    _get_local_git_repo_stats(ctx, "end")
+
+    _log_recent_commits(ctx, commands)
+
     # Run git garbage collection and cleanup branches, even if repo is already up to date
-    git.garbage_collection(ctx)
     git.cleanup_branches_and_tags(ctx)
+    git.garbage_collection(ctx)


 def _log_number_of_revs_out_of_date(ctx: Context, commands: dict) -> None:
@@ -713,7 +706,7 @@ def _log_number_of_revs_out_of_date(ctx: Context, commands: dict) -> None:
     log(ctx, "Logging remaining_revs; note: this is an expensive operation", "info")


-def _calculate_batch_revisions(ctx: Context, commands: dict) -> dict:
+def _calculate_batch_revisions(ctx: Context, commands: dict) -> bool:
     """
     Run the svn log command to calculate batch start and end revisions for fetching
     """
@@ -730,53 +723,79 @@ def _calculate_batch_revisions(ctx: Context, commands: dict) -> bool:
     # Pick a revision number to start with; may or may not be a real rev number
     this_batch_start_rev = int(previous_batch_end_rev + 1)

-
     # Run the svn log command to get real revision numbers for this batch
-    cmd_svn_log_get_batch_revs = cmd_svn_log + ["--limit", str(fetch_batch_size), "--revision", f"{this_batch_start_rev}:HEAD"]
-    cmd_svn_log_get_batch_revs_result = cmd.run_subprocess(ctx, cmd_svn_log_get_batch_revs, password, name="cmd_svn_log_get_batch_revs")
-    cmd_svn_log_get_batch_revs_output_list = list(cmd_svn_log_get_batch_revs_result.get("output",""))
-    cmd_svn_log_get_batch_revs_output_string = " ".join(cmd_svn_log_get_batch_revs_output_list)
-
-    if cmd_svn_log_get_batch_revs_result["return_code"] == 0 and \
-        len(cmd_svn_log_get_batch_revs_output_list) > 0 and \
-        "revision" in cmd_svn_log_get_batch_revs_output_string:
+    cmd_svn_log_get_batch_revs = cmd_svn_log + ["--limit", str(fetch_batch_size), "--revision", f"{this_batch_start_rev}:HEAD"]
+    process_result = cmd.run_subprocess(ctx, cmd_svn_log_get_batch_revs, password, name="cmd_svn_log_get_batch_revs")
+    log_details = {"process": process_result}
+    output_list = list(process_result.get("output",""))
+    output_string = " ".join(output_list)
+    len_output_list = len(output_list)
+    # Start off as a set type for built-in deduplication
+    list_of_revs_this_batch = set()
+
+    if process_result["return_code"] == 0 and \
+        len_output_list > 0 and \
+        "revision" in output_string:
+
+        ## Extract the specific revisions from the svn log output
+        # "output": [
+        #     "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
+        #     "<log>",
+        #     "<logentry",
+        #     "   revision=\"1636921\">",
+        #     "</logentry>",
+        #     "<logentry",
+        #     "   revision=\"1636922\">",
+        #     "</logentry>",
+        #     "</log>"
+        # ],
+
+        for line in output_list:
+            if "revision" in line:
+                list_of_revs_this_batch.add(int(line.split("revision=\"")[1].split("\"")[0]))
+
+        # Then convert to a list for sorting
+        list_of_revs_this_batch = sorted(list_of_revs_this_batch)

         # Update this batch's starting rev to the first real rev number after the previous end rev
-        this_batch_start_rev = int(" ".join(cmd_svn_log_get_batch_revs_output_list).split("revision=\"")[1].split("\"")[0])
+        this_batch_start_rev = min(list_of_revs_this_batch)
         ctx.job["stats"]["local"]["this_batch_start_rev"] = this_batch_start_rev

-        # Reverse the output so we can get the last revision number
-        cmd_svn_log_get_batch_revs_output_list.reverse()
-        this_batch_end_rev = int(" ".join(cmd_svn_log_get_batch_revs_output_list).split("revision=\"")[1].split("\"")[0])
+        # Get the last revision number
+        this_batch_end_rev = max(list_of_revs_this_batch)
         ctx.job["stats"]["local"]["this_batch_end_rev"] = this_batch_end_rev

     else:
         log_failure_message = "Failed to get batch revs from svn log"


-    ## Check if the output isn't as long as we were expecting
-    # Expected output number of lines for
-    # svn log --xml --with-no-revprops --non-interactive --limit 10 --revision 1:HEAD
-    # is 3 lines per revision
-    # and 3 lines for xml format start / end
-    expected_output_list_len = (fetch_batch_size * 3) + 3
-
-    if len(cmd_svn_log_get_batch_revs_output_list) < expected_output_list_len:
-        log_failure_message = "svn log returned fewer lines than expected"
-
     ## Count how many revs are in the svn log output
-    revs_in_svn_log_output = cmd_svn_log_get_batch_revs_output_string.count("revision=")
+    len_list_of_revs_this_batch = len(list_of_revs_this_batch)
     # Grab the min, in case we are close to the current rev,
     # and there are fewer revs remaining than our current batch size
-    fetching_batch_count = min(revs_in_svn_log_output, fetch_batch_size)
+    fetching_batch_count = min(len_list_of_revs_this_batch, fetch_batch_size)
     # Store it in the job stats dict
-    ctx.job["stats"]["local"]["fetching_batch_count"] = fetching_batch_count
+    ctx.job["stats"]["local"]["fetching_batch_count"] = fetching_batch_count
+    ctx.job["stats"]["local"]["list_of_revs_this_batch"] = list_of_revs_this_batch
+
+
+    # ## Check if the output isn't as long as we were expecting
+    # This isn't a valid check, as
+    # some repos are smaller than our batch size,
+    # and once the repo conversion catches up to the latest rev,
+    # there will be fewer commits to convert each run
+    # # Expected output number of lines for
+    # # svn log --xml --with-no-revprops --non-interactive --limit 10 --revision 1:HEAD
+    # # is 3 lines per revision
+    # # and 3 lines for xml format start / end
+    # expected_output_list_len = (fetch_batch_size * 3) + 3
+    # if len_output_list < expected_output_list_len:
+    #     log(ctx, f"svn log returned fewer lines: {len_output_list} than expected: {expected_output_list_len}", "warning", log_details)


     if log_failure_message:
         set_job_result(ctx, "skipped", log_failure_message, False)
-        log(ctx, log_failure_message, "error")
+        log(ctx, log_failure_message, "error", log_details)
         return False

     else:
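Editor's note: the core of the new parsing logic above, pulled out as a self-contained sketch. A set deduplicates revision numbers scraped from the svn log --xml lines, and min()/max() replace the old string-reversal trick for finding the batch's start and end revs. extract_batch_revs is an illustrative name, not a function in the repo:

```python
def extract_batch_revs(output_list):
    # Set for built-in deduplication, then sorted list, as in the diff above
    revs = set()
    for line in output_list:
        if "revision" in line:
            revs.add(int(line.split('revision="')[1].split('"')[0]))
    return sorted(revs)

output_list = [
    '<?xml version="1.0" encoding="UTF-8"?>',
    "<log>",
    "<logentry",
    '   revision="1636921">',
    "</logentry>",
    "<logentry",
    '   revision="1636922">',
    "</logentry>",
    "</log>",
]
revs = extract_batch_revs(output_list)
print(min(revs), max(revs))  # 1636921 1636922 -> this_batch_start_rev / this_batch_end_rev
```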
@@ -857,11 +876,18 @@ def _verify_git_svn_fetch_success(ctx: Context, git_svn_fetch_result: dict) -> N
     ## Gather needed inputs
     action = "git svn fetch"
     ctx.job["result"]["failures"] = []
+    git_svn_fetch_output_for_errors = list(git_svn_fetch_result.get("output",""))
     git_svn_fetch_output = list(git_svn_fetch_result.get("output",""))
     job_config = ctx.job.get("config","")
     job_stats_local = ctx.job.get("stats","").get("local","")
     structured_log_dict = {"process": git_svn_fetch_result}

+
+    # Check if the repo is valid after the fetch
+    if _check_if_repo_exists_locally(ctx, "end"):
+        pass
+
+
     ## Check for any errors in the command output
     # TODO: Test the error message processing

@@ -874,7 +900,7 @@ def _verify_git_svn_fetch_success(ctx: Context, git_svn_fetch_result: dict) -> N

     # Remove the not_error lines from the output list
     for not_error in not_errors:
-        git_svn_fetch_output = [x for x in git_svn_fetch_output if not re.search(not_error, x)]
+        git_svn_fetch_output_for_errors = [x for x in git_svn_fetch_output_for_errors if not re.search(not_error, x)]

     # Check for expected error messages
     # We should keep this list tidy, as execution time is
@@ -945,24 +971,27 @@ def _verify_git_svn_fetch_success(ctx: Context, git_svn_fetch_result: dict) -> N
     for error_category in error_message_regex_patterns_dict.keys():
         for error_message_regex_pattern in error_message_regex_patterns_dict.get(error_category):

+            regex_pattern = rf".*{error_message_regex_pattern}.*"
+            regex = re.compile(regex_pattern, flags=re.IGNORECASE)
+
             # We need the line match, but testing the match across the entire list first to reduce the exponential runtime
-            list_match = re.search(error_message_regex_pattern, " ".join(git_svn_fetch_output))
+            list_match = regex.search(" ".join(git_svn_fetch_output_for_errors))

             if list_match:
                 for match_group in list_match.groups():
-                    for line in git_svn_fetch_output:
+                    for line in git_svn_fetch_output_for_errors:

                         # Re-running the match, as list_match may match across lines,
                         # but we only want to match within each line
-                        line_match = re.search(error_message_regex_pattern, line)
+                        line_match = regex.search(line)

                         if line_match:

                             ctx.job["result"]["failures"].append(f"Error message: {error_category}: {line}")

                             # Remove the svn fetch error line from the process output list to avoid duplicate output,
                             # if one line in the error message matches multiple error_messages
-                            git_svn_fetch_output.remove(line)
+                            git_svn_fetch_output_for_errors.remove(line)


     ## Get the latest commit from the git repo's commit logs
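Editor's note: the two-pass matching above, reduced to a standalone sketch under assumed inputs. The pattern is compiled once per error message, a single search over the joined output acts as a cheap pre-filter, and only on a hit does the per-line scan (with the de-duplicating remove()) run; the sketch iterates a copy of the list so removal during iteration is safe:

```python
import re

output_for_errors = ["Checksum mismatch: abc != def", "r123 = deadbeef", "done"]
error_patterns = {"checksum": [r"checksum mismatch"]}  # illustrative category/pattern

failures = []
for category, patterns in error_patterns.items():
    for pattern in patterns:
        regex = re.compile(rf".*{pattern}.*", flags=re.IGNORECASE)

        # Cheap pre-filter across the whole joined list before the per-line scan
        if regex.search(" ".join(output_for_errors)):
            for line in list(output_for_errors):
                if regex.search(line):
                    failures.append(f"Error message: {category}: {line}")
                    output_for_errors.remove(line)  # avoid duplicate reporting

print(failures)  # ['Error message: checksum: Checksum mismatch: abc != def']
```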
@@ -989,6 +1018,13 @@ def _verify_git_svn_fetch_success(ctx: Context, git_svn_fetch_result: dict) -> N
         ctx.job["result"]["failures"].append(f"git_commits_added: {git_commits_added} != fetch_batch_size: {fetch_batch_size}")


+    ## Count how many, and which revs were checked in this fetch
+    # Verify each of them are in the git log output
+    # TODO: Implement this
+    # git_svn_fetch_output
+
+    ## Make final success / fail call
+
     if len(ctx.job["result"]["failures"]) > 0:

         reason = "output failed verification"

src/utils/concurrency_manager.py

Lines changed: 15 additions & 9 deletions
@@ -93,7 +93,7 @@ def acquire_job_slot(self, ctx: Context) -> bool:

            if active_job_repo == this_job_repo:
                set_job_result(ctx, "skipped", "Repo job already in progress", False)
-                log(ctx, f"Skipping; Repo job already in progress; repo: {active_job_repo}, timestamp: {active_job_timestamp}; trace: {active_job_trace}; running for: {int(time.time() - active_job_timestamp)} seconds", "info")
+                log(ctx, f"{this_job_repo} Skipping; Repo job already in progress; started at: {active_job_timestamp}; trace: {active_job_trace}; running for: {int(time.time() - active_job_timestamp)} seconds", "info")
                return False

        ## Add this job to the dict of waiting jobs, just in case the blocking semaphore acquire takes a while
@@ -112,17 +112,17 @@ def acquire_job_slot(self, ctx: Context) -> bool:

        # Check the semaphore value for number of remaining slots
        if server_semaphore.get_value() <= 0:
-            log(ctx, f"Hit per-server concurrency limit; MAX_CONCURRENT_CONVERSIONS_PER_SERVER={self.per_server_limit}, waiting for a server slot", "info", log_concurrency_status=True)
+            log(ctx, f"{this_job_repo} Hit per-server concurrency limit; MAX_CONCURRENT_CONVERSIONS_PER_SERVER={self.per_server_limit}, waiting for a server slot", "info", log_concurrency_status=True)

        ## Check global limit
        if self.global_semaphore.get_value() <= 0:
-            log(ctx, f"Hit global concurrency limit; MAX_CONCURRENT_CONVERSIONS_GLOBAL={self.global_limit}, waiting for a slot", "info", log_concurrency_status=True)
+            log(ctx, f"{this_job_repo} Hit global concurrency limit; MAX_CONCURRENT_CONVERSIONS_GLOBAL={self.global_limit}, waiting for a slot", "info", log_concurrency_status=True)

        ## Acquire a slot in the server-specific semaphore
        # Want to block, so that the main loop has to wait until all repos get a chance to run through before finishing
        if not server_semaphore.acquire(block=True):

-            log(ctx, "server_semaphore.acquire failed", "error", log_concurrency_status=True)
+            log(ctx, f"{this_job_repo} server_semaphore.acquire failed", "error", log_concurrency_status=True)
            return False

        ## Acquire a slot in the global semaphore
@@ -132,7 +132,7 @@ def acquire_job_slot(self, ctx: Context) -> bool:
            # Release the server semaphore since we couldn't get the global one
            server_semaphore.release()

-            log(ctx, "self.global_semaphore.acquire failed", "error", log_concurrency_status=True)
+            log(ctx, f"{this_job_repo} self.global_semaphore.acquire failed", "error", log_concurrency_status=True)
            return False

        ## Successfully acquired both semaphores
@@ -171,8 +171,10 @@ def acquire_job_slot(self, ctx: Context) -> bool:
        # Overwrite the managed list
        self.queued_jobs[server_name] = queued_jobs_list

+        ctx.job["result"]["start_timestamp"] = this_job_timestamp
+
        # Log an update
-        log(ctx, f"Acquired job slot", "debug")
+        log(ctx, f"{this_job_repo} Acquired job slot", "debug")

        return True

@@ -184,6 +186,7 @@ def _get_server_semaphore(self, ctx: Context):

        # Get job information from context
        this_job_config = ctx.job.get("config","")
+        this_job_repo = this_job_config.get("repo_key","")
        server_name = this_job_config.get("server_name","")

        # Wait for the lock to be free
@@ -199,7 +202,7 @@ def _get_server_semaphore(self, ctx: Context):
                self.per_server_semaphores[server_name] = multiprocessing.Semaphore(self.per_server_limit)

                # Can't log with log_concurrency_status=True, causes a deadlock
-                log(ctx, f"Created concurrency limit semaphore for server {server_name} with limit {self.per_server_limit}", "debug")
+                log(ctx, f"{this_job_repo} Created concurrency limit semaphore for server {server_name} with limit {self.per_server_limit}", "debug")

        # Whether the server already had a semaphore in the dict, or one was just created for it, return the semaphore object
        return self.per_server_semaphores[server_name]
@@ -355,7 +358,10 @@ def release_job_slot(self, ctx: Context) -> None:
            # Overwrite the managed list
            self.active_jobs[server_name] = server_active_jobs_list

-            log(ctx, f"Released job slot", "debug")
+            ctx.job["result"]["end_timestamp"] = int(time.time())
+            ctx.job["result"]["execution_time"] = int(ctx.job["result"]["end_timestamp"] - ctx.job["result"]["start_timestamp"])
+
+            log(ctx, f"{this_job_repo} Released job slot", "debug")

        except ValueError as e:
-            log(ctx, f"Error releasing job slot: {e}", "error")
+            log(ctx, f"{this_job_repo} Error releasing job slot: {e}", "error")

src/utils/log.py

Lines changed: 12 additions & 0 deletions
@@ -119,6 +119,18 @@ def _build_structured_payload(

    # Merge any job data from the context
    if ctx.job:
+
+        ctx_job_result = ctx.job.get("result",{})
+        start_timestamp = ctx_job_result.get("start_timestamp")
+        end_timestamp = ctx_job_result.get("end_timestamp")
+        execution_time = ctx_job_result.get("execution_time")
+
+        # If the job is still running
+        if start_timestamp and not end_timestamp and not execution_time:
+
+            # Then add a running_time_seconds
+            ctx.job["result"]["running_time_seconds"] = int(time.time() - start_timestamp)
+
        payload.update({"job": dict(ctx.job)})

    # Remove any null values
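Editor's note: a tiny sketch of the timing fields these last two files now share, using a made-up start time. acquire_job_slot stamps start_timestamp, release_job_slot stamps end_timestamp and execution_time, and while only the start exists the logger derives a live running_time_seconds:

```python
import time

job_result = {"start_timestamp": int(time.time()) - 42}  # pretend the job began 42s ago

start = job_result.get("start_timestamp")
end = job_result.get("end_timestamp")

# If the job is still running, report a live running time in every log payload
if start and not end:
    job_result["running_time_seconds"] = int(time.time() - start)

print(job_result["running_time_seconds"])  # ~42
```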
