|
99 | 99 |
|
100 | 100 | -record(recovery, {mode :: initial | post_boot, |
101 | 101 | ranges = #{} :: #{ra_uid() => |
102 | | - [{ets:tid(), {ra:index(), ra:index()}}]}, |
| 102 | + [{ets:tid(), ra_seq:state()}]}, |
103 | 103 | tables = #{} :: #{ra_uid() => ra_mt:state()}, |
104 | 104 | writers = #{} :: #{ra_uid() => {in_seq, ra:index()}} |
105 | 105 | }). |
@@ -584,22 +584,29 @@ roll_over(#state{wal = Wal0, file_num = Num0, |
584 | 584 | %% if this is the first wal since restart randomise the first |
585 | 585 | %% max wal size to reduce the likelihood that each erlang node will |
586 | 586 | %% flush mem tables at the same time |
587 | | - NextMaxBytes = case Wal0 of |
588 | | - undefined -> |
589 | | - Half = MaxBytes div 2, |
590 | | - Half + rand:uniform(Half); |
591 | | - #wal{ranges = Ranges, |
592 | | - filename = Filename} -> |
593 | | - _ = file:advise(Wal0#wal.fd, 0, 0, dont_need), |
594 | | - ok = close_file(Wal0#wal.fd), |
595 | | - MemTables = Ranges, |
596 | | - %% TODO: only keep base name in state |
597 | | - Basename = filename:basename(Filename), |
598 | | - ok = ra_log_segment_writer:accept_mem_tables(SegWriter, |
599 | | - MemTables, |
600 | | - Basename), |
601 | | - MaxBytes |
602 | | - end, |
| 587 | + NextMaxBytes = |
| 588 | + case Wal0 of |
| 589 | + undefined -> |
| 590 | + Half = MaxBytes div 2, |
| 591 | + Half + rand:uniform(Half); |
| 592 | + #wal{ranges = Ranges, |
| 593 | + filename = Filename} -> |
| 594 | + _ = file:advise(Wal0#wal.fd, 0, 0, dont_need), |
| 595 | + ok = close_file(Wal0#wal.fd), |
| 596 | + %% floor all sequences |
| 597 | + MemTables = maps:map( |
| 598 | + fun (UId, TidRanges) -> |
| 599 | + SmallestIdx = smallest_live_index(Conf0, UId), |
| 600 | + [{Tid, ra_seq:floor(SmallestIdx, Seq)} |
| 601 | + || {Tid, Seq} <- TidRanges] |
| 602 | + end, Ranges), |
| 603 | + %% TODO: only keep base name in state |
| 604 | + Basename = filename:basename(Filename), |
| 605 | + ok = ra_log_segment_writer:accept_mem_tables(SegWriter, |
| 606 | + MemTables, |
| 607 | + Basename), |
| 608 | + MaxBytes |
| 609 | + end, |
603 | 610 | {Conf, Wal} = open_wal(NextFile, NextMaxBytes, Conf0), |
604 | 611 | State0#state{conf = Conf, |
605 | 612 | wal = Wal, |
@@ -695,11 +702,12 @@ complete_batch(#state{batch = #batch{waiting = Waiting, |
695 | 702 | complete_batch_writer(Pid, #batch_writer{smallest_live_idx = SmallestIdx, |
696 | 703 | tid = MtTid, |
697 | 704 | uid = UId, |
698 | | - seq = Range, |
| 705 | + seq = Seq0, |
699 | 706 | term = Term, |
700 | 707 | old = undefined}, Ranges) -> |
701 | | - Pid ! {ra_log_event, {written, Term, Range}}, |
702 | | - update_ranges(Ranges, UId, MtTid, SmallestIdx, Range); |
| 708 | + Seq = ra_seq:floor(SmallestIdx, Seq0), |
| 709 | + Pid ! {ra_log_event, {written, Term, Seq}}, |
| 710 | + update_ranges(Ranges, UId, MtTid, SmallestIdx, Seq); |
703 | 711 | complete_batch_writer(Pid, #batch_writer{old = #batch_writer{} = OldBw} = Bw, |
704 | 712 | Ranges0) -> |
705 | 713 | Ranges = complete_batch_writer(Pid, OldBw, Ranges0), |
@@ -968,22 +976,19 @@ should_roll_wal(#state{conf = #conf{max_entries = MaxEntries}, |
968 | 976 | smallest_live_index(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) -> |
969 | 977 | ra_log_snapshot_state:smallest(Tid, ServerUId). |
970 | 978 |
|
971 | | -update_ranges(Ranges, UId, MtTid, SmallestIdx, AddSeq) -> |
| 979 | +update_ranges(Ranges, UId, MtTid, _SmallestIdx, AddSeq) -> |
972 | 980 | case Ranges of |
973 | | - #{UId := [{MtTid, Seq0} | Rem]} -> |
| 981 | + #{UId := [{MtTid, Seq0} | Seqs]} -> |
974 | 982 | %% SmallestIdx might have moved so we truncate the old range first |
975 | 983 | %% before extending |
976 | | - Seq1 = ra_seq:floor(SmallestIdx, Seq0), |
| 984 | + % Seq1 = ra_seq:floor(SmallestIdx, Seq0), |
977 | 985 | %% limit the old range by the start of the added sequence, as in some |
978 | 986 | %% resend cases we may have gone back before the prior range. |
979 | | - Seq = ra_seq:add(AddSeq, Seq1), |
980 | | - Ranges#{UId => [{MtTid, Seq} | Rem]}; |
981 | | - #{UId := [{OldMtTid, OldMtSeq} | Rem]} -> |
| 987 | + Seq = ra_seq:add(AddSeq, Seq0), |
| 988 | + Ranges#{UId => [{MtTid, Seq} | Seqs]}; |
| 989 | + #{UId := Seqs} -> |
982 | 990 | %% new Tid, need to add a new range record for this |
983 | | - Ranges#{UId => [{MtTid, AddSeq}, |
984 | | - {OldMtTid, |
985 | | - ra_seq:floor(SmallestIdx, OldMtSeq)} |
986 | | - | Rem]}; |
| 991 | + Ranges#{UId => [{MtTid, AddSeq} | Seqs]}; |
987 | 992 | _ -> |
988 | 993 | Ranges#{UId => [{MtTid, AddSeq}]} |
989 | 994 | end. |
@@ -1043,13 +1048,24 @@ recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx, |
1043 | 1048 | handle_trunc(false, _UId, _Idx, State) -> |
1044 | 1049 | State; |
1045 | 1050 | handle_trunc(true, UId, Idx, #recovery{mode = Mode, |
| 1051 | + ranges = Ranges0, |
1046 | 1052 | tables = Tbls} = State) -> |
1047 | 1053 | case Tbls of |
1048 | 1054 | #{UId := Mt0} when Mode == initial -> |
1049 | 1055 | %% only meddle with mem table data in initial mode |
1050 | 1056 | {Specs, Mt} = ra_mt:set_first(Idx-1, Mt0), |
1051 | 1057 | [_ = ra_mt:delete(Spec) || Spec <- Specs], |
1052 | | - State#recovery{tables = Tbls#{UId => Mt}}; |
| 1058 | + Ranges = case Ranges0 of |
| 1059 | + #{UId := Seqs0} -> |
| 1060 | + Seqs = [{T, ra_seq:floor(Idx, Seq)} |
| 1061 | + || {T, Seq} <- Seqs0], |
| 1062 | + Ranges0#{UId => Seqs}; |
| 1063 | + _ -> |
| 1064 | + Ranges0 |
| 1065 | + end, |
| 1066 | + |
| 1067 | + State#recovery{tables = Tbls#{UId => Mt}, |
| 1068 | + ranges = Ranges}; |
1053 | 1069 | _ -> |
1054 | 1070 | State |
1055 | 1071 | end. |
|
0 commit comments