1+ use bytes:: BytesMut ;
12use std:: ffi:: { c_int, c_void, CStr } ;
23use std:: fs:: { remove_dir_all, File , OpenOptions } ;
34use std:: io:: Write ;
@@ -8,21 +9,18 @@ use std::sync::Arc;
89
910use anyhow:: { bail, ensure} ;
1011use bytemuck:: { bytes_of, pod_read_unaligned, Pod , Zeroable } ;
11- use bytes:: { Bytes , BytesMut } ;
1212use parking_lot:: RwLock ;
1313use sqld_libsql_bindings:: init_static_wal_method;
1414use tokio:: sync:: watch;
1515use uuid:: Uuid ;
1616
17- #[ cfg( feature = "bottomless" ) ]
18- use crate :: libsql:: ffi:: SQLITE_IOERR_WRITE ;
1917use crate :: libsql:: ffi:: {
2018 sqlite3,
2119 types:: { XWalCheckpointFn , XWalFrameFn , XWalSavePointUndoFn , XWalUndoFn } ,
22- PageHdrIter , PgHdr , Wal , SQLITE_CHECKPOINT_TRUNCATE , SQLITE_IOERR , SQLITE_OK ,
20+ PageHdrIter , PgHdr , Wal , SQLITE_CHECKPOINT_TRUNCATE , SQLITE_IOERR_WRITE , SQLITE_OK ,
2321} ;
2422use crate :: libsql:: wal_hook:: WalHook ;
25- use crate :: replication:: frame:: { Frame , FrameHeader } ;
23+ use crate :: replication:: frame:: { Frame , FrameHeader , FrameLocation , FrameRef } ;
2624use crate :: replication:: snapshot:: { find_snapshot_file, LogCompactor , SnapshotFile } ;
2725use crate :: replication:: { FrameNo , CRC_64_GO_ISO , WAL_MAGIC , WAL_PAGE_SIZE } ;
2826
@@ -75,20 +73,22 @@ unsafe impl WalHook for ReplicationLoggerHook {
7573 orig : XWalFrameFn ,
7674 ) -> c_int {
7775 assert_eq ! ( page_size, 4096 ) ;
76+ let mut frame_no = wal. hdr . mxFrame + 1 ;
7877 let wal_ptr = wal as * mut _ ;
7978 #[ cfg( feature = "bottomless" ) ]
8079 let last_valid_frame = wal. hdr . mxFrame ;
8180 #[ cfg( feature = "bottomless" ) ]
8281 let frame_checksum = wal. hdr . aFrameCksum ;
8382 let ctx = Self :: wal_extract_ctx ( wal) ;
8483
85- for ( page_no, data) in PageHdrIter :: new ( page_headers, page_size as _ ) {
86- ctx. write_frame ( page_no, data)
84+ for ( page_no, _data) in PageHdrIter :: new ( page_headers, page_size as _ ) {
85+ ctx. write_frame ( page_no, FrameLocation :: in_wal_file ( frame_no) ) ;
86+ frame_no += 1 ;
8787 }
8888 if let Err ( e) = ctx. flush ( ntruncate) {
8989 tracing:: error!( "error writing to replication log: {e}" ) ;
9090 // returning IO_ERR ensure that xUndo will be called by sqlite.
91- return SQLITE_IOERR ;
91+ return SQLITE_IOERR_WRITE ;
9292 }
9393
9494 // FIXME: instead of block_on, we should consider replicating asynchronously in the background,
@@ -294,7 +294,7 @@ pub struct WalPage {
294294 pub page_no : u32 ,
295295 /// 0 for non-commit frames
296296 pub size_after : u32 ,
297- pub data : Bytes ,
297+ pub data : FrameLocation ,
298298}
299299
300300impl ReplicationLoggerHookCtx {
@@ -314,11 +314,11 @@ impl ReplicationLoggerHookCtx {
314314 }
315315 }
316316
317- fn write_frame ( & mut self , page_no : u32 , data : & [ u8 ] ) {
317+ fn write_frame ( & mut self , page_no : u32 , data : FrameLocation ) {
318318 let entry = WalPage {
319319 page_no,
320320 size_after : 0 ,
321- data : Bytes :: copy_from_slice ( data ) ,
321+ data,
322322 } ;
323323 self . buffer . push ( entry) ;
324324 }
@@ -352,6 +352,7 @@ impl ReplicationLoggerHookCtx {
352352#[ derive( Debug ) ]
353353pub struct LogFile {
354354 file : File ,
355+ main_db_path : PathBuf , // actual data of the logged frames resides either in the main db file or the wal file
355356 pub header : LogFileHeader ,
356357 /// the maximum number of frames this log is allowed to contain before it should be compacted.
357358 max_log_frame_count : u64 ,
@@ -379,9 +380,16 @@ pub enum LogReadError {
379380
380381impl LogFile {
381382 /// size of a single frame
383+ /// FIXME: LogFile should only ever use references -> what to do with snapshots?
382384 pub const FRAME_SIZE : usize = size_of :: < FrameHeader > ( ) + WAL_PAGE_SIZE as usize ;
385+ /// size of a single frame reference
386+ pub const FRAME_REF_SIZE : usize = size_of :: < FrameRef > ( ) ;
383387
384- pub fn new ( file : File , max_log_frame_count : u64 ) -> anyhow:: Result < Self > {
388+ pub fn new (
389+ file : File ,
390+ main_db_path : PathBuf ,
391+ max_log_frame_count : u64 ,
392+ ) -> anyhow:: Result < Self > {
385393 // FIXME: we should probably take a lock on this file, to prevent anybody else to write to
386394 // it.
387395 let file_end = file. metadata ( ) ?. len ( ) ;
@@ -401,6 +409,7 @@ impl LogFile {
401409
402410 let mut this = Self {
403411 file,
412+ main_db_path,
404413 header,
405414 max_log_frame_count,
406415 uncommitted_frame_count : 0 ,
@@ -415,6 +424,7 @@ impl LogFile {
415424 let header = Self :: read_header ( & file) ?;
416425 let mut this = Self {
417426 file,
427+ main_db_path,
418428 header,
419429 max_log_frame_count,
420430 uncommitted_frame_count : 0 ,
@@ -504,30 +514,33 @@ impl LogFile {
504514 } ) )
505515 }
506516
507- fn compute_checksum ( & self , page : & WalPage ) -> u64 {
508- let mut digest = CRC_64_GO_ISO . digest_with_initial ( self . uncommitted_checksum ) ;
509- digest. update ( & page. data ) ;
517+ fn compute_checksum ( & self , _page : & WalPage ) -> u64 {
518+ let digest = CRC_64_GO_ISO . digest_with_initial ( self . uncommitted_checksum ) ;
519+ // FIXME: we should either read the page from its location and compute checksum,
520+ // or just rely on the fact that the page is already checksummed by WAL or the main db file.
521+ //digest.update(&page.data);
510522 digest. finalize ( )
511523 }
512524
513525 pub fn push_page ( & mut self , page : & WalPage ) -> anyhow:: Result < ( ) > {
514526 let checksum = self . compute_checksum ( page) ;
515- let frame = Frame :: from_parts (
516- & FrameHeader {
517- frame_no : self . next_frame_no ( ) ,
518- checksum,
519- page_no : page. page_no ,
520- size_after : page. size_after ,
521- } ,
522- & page. data ,
523- ) ;
527+ let header = FrameHeader {
528+ frame_no : self . next_frame_no ( ) ,
529+ checksum,
530+ page_no : page. page_no ,
531+ size_after : page. size_after ,
532+ } ;
533+
534+ let frame_ref = FrameRef :: new ( header, page. data ) ;
524535
525536 let byte_offset = self . next_byte_offset ( ) ;
526- tracing:: trace!(
527- "writing frame {} at offset {byte_offset}" ,
528- frame. header( ) . frame_no
537+ let data = frame_ref. as_bytes ( ) ;
538+ tracing:: warn!(
539+ "writing frame {} at offset {byte_offset}, size {}" ,
540+ frame_ref. header. frame_no,
541+ data. len( )
529542 ) ;
530- self . file . write_all_at ( frame . as_slice ( ) , byte_offset) ?;
543+ self . file . write_all_at ( & data , byte_offset) ?;
531544
532545 self . uncommitted_frame_count += 1 ;
533546 self . uncommitted_checksum = checksum;
@@ -546,7 +559,7 @@ impl LogFile {
546559
547560 /// Returns the bytes position of the `nth` entry in the log
548561 fn absolute_byte_offset ( nth : u64 ) -> u64 {
549- std:: mem:: size_of :: < LogFileHeader > ( ) as u64 + nth * Self :: FRAME_SIZE as u64
562+ std:: mem:: size_of :: < LogFileHeader > ( ) as u64 + nth * Self :: FRAME_REF_SIZE as u64
550563 }
551564
552565 fn byte_offset ( & self , id : FrameNo ) -> anyhow:: Result < Option < u64 > > {
@@ -602,7 +615,8 @@ impl LogFile {
602615 . write ( true )
603616 . create ( true )
604617 . open ( & temp_log_path) ?;
605- let mut new_log_file = LogFile :: new ( file, self . max_log_frame_count ) ?;
618+ let mut new_log_file =
619+ LogFile :: new ( file, self . main_db_path . clone ( ) , self . max_log_frame_count ) ?;
606620 let new_header = LogFileHeader {
607621 start_frame_no : self . header . start_frame_no + self . header . frame_count ,
608622 frame_count : 0 ,
@@ -620,11 +634,40 @@ impl LogFile {
620634 }
621635
622636 fn read_frame_byte_offset ( & self , offset : u64 ) -> anyhow:: Result < Frame > {
623- let mut buffer = BytesMut :: zeroed ( LogFile :: FRAME_SIZE ) ;
637+ let mut buffer = BytesMut :: zeroed ( LogFile :: FRAME_REF_SIZE ) ;
624638 self . file . read_exact_at ( & mut buffer, offset) ?;
625- let buffer = buffer. freeze ( ) ;
639+ tracing:: trace!( "Buffer size {}" , buffer. len( ) ) ;
640+ let frame_ref = FrameRef :: try_from_bytes ( buffer. freeze ( ) ) ?;
641+
642+ tracing:: trace!( "Frame reference: {frame_ref:?}" ) ;
643+
644+ let mut frame_data = [ 0 ; WAL_PAGE_SIZE as usize ] ;
645+ match frame_ref. location {
646+ FrameLocation {
647+ frame_no : FrameLocation :: IN_MAIN_DB_FILE ,
648+ } => {
649+ let main_db_file = std:: fs:: File :: open ( & self . main_db_path ) ?;
650+ main_db_file. read_exact_at (
651+ & mut frame_data,
652+ frame_ref. header . page_no as u64 * WAL_PAGE_SIZE as u64 ,
653+ ) ?;
654+ }
655+ FrameLocation { frame_no } => {
656+ tracing:: trace!( "Reading {}" , self . main_db_path. join( "data-wal" ) . display( ) ) ;
657+ let wal_file = std:: fs:: File :: open ( self . main_db_path . join ( "data-wal" ) ) ?;
658+ // FIXME: this is *not* the correct way to read a frame from the wal file.
659+ // It needs to take into account the wal file header, and the wal page headers.
660+ wal_file. read_exact_at ( & mut frame_data, Self :: offset_in_wal ( frame_no) ) ?;
661+ }
662+ }
663+
664+ // FIXME: memory copy, easy enough to avoid
665+ Ok ( Frame :: from_parts ( & frame_ref. header , & frame_data) )
666+ }
626667
627- Frame :: try_from_bytes ( buffer)
668+ // The offset of frame `frame_no` in the libSQL WAL file
669+ fn offset_in_wal ( frame_no : u32 ) -> u64 {
670+ 32 + ( ( frame_no - 1 ) as u64 ) * ( WAL_PAGE_SIZE as u64 + 24 )
628671 }
629672
630673 fn last_commited_frame_no ( & self ) -> Option < FrameNo > {
@@ -639,7 +682,7 @@ impl LogFile {
639682 let max_log_frame_count = self . max_log_frame_count ;
640683 // truncate file
641684 self . file . set_len ( 0 ) ?;
642- Self :: new ( self . file , max_log_frame_count)
685+ Self :: new ( self . file , self . main_db_path , max_log_frame_count)
643686 }
644687}
645688
@@ -754,7 +797,7 @@ impl ReplicationLogger {
754797 . open ( log_path) ?;
755798
756799 let max_log_frame_count = max_log_size * 1_000_000 / LogFile :: FRAME_SIZE as u64 ;
757- let log_file = LogFile :: new ( file, max_log_frame_count) ?;
800+ let log_file = LogFile :: new ( file, db_path . to_owned ( ) , max_log_frame_count) ?;
758801 let header = log_file. header ( ) ;
759802
760803 let should_recover = if header. version < 2 || header. sqld_version ( ) != Version :: current ( ) {
@@ -798,21 +841,18 @@ impl ReplicationLogger {
798841 // best effort, there may be no snapshots
799842 let _ = remove_dir_all ( snapshot_path) ;
800843
801- let data_file = File :: open ( & data_path) ?;
802844 let size = data_path. metadata ( ) ?. len ( ) ;
803845 assert ! (
804846 size % WAL_PAGE_SIZE as u64 == 0 ,
805847 "database file size is not a multiple of page size"
806848 ) ;
807849 let num_page = size / WAL_PAGE_SIZE as u64 ;
808- let mut buf = [ 0 ; WAL_PAGE_SIZE as usize ] ;
809850 let mut page_no = 1 ; // page numbering starts at 1
810851 for i in 0 ..num_page {
811- data_file. read_exact_at ( & mut buf, i * WAL_PAGE_SIZE as u64 ) ?;
812852 log_file. push_page ( & WalPage {
813853 page_no,
814854 size_after : if i == num_page - 1 { num_page as _ } else { 0 } ,
815- data : Bytes :: copy_from_slice ( & buf ) ,
855+ data : FrameLocation :: in_main_db_file ( ) , // log recovery is performed from the main db file
816856 } ) ?;
817857 log_file. commit ( ) ?;
818858
@@ -918,7 +958,7 @@ mod test {
918958 . map ( |i| WalPage {
919959 page_no : i,
920960 size_after : 0 ,
921- data : Bytes :: from ( vec ! [ i as _ ; 4096 ] ) ,
961+ data : FrameLocation :: in_main_db_file ( ) ,
922962 } )
923963 . collect :: < Vec < _ > > ( ) ;
924964 logger. write_pages ( & frames) . unwrap ( ) ;
@@ -953,7 +993,7 @@ mod test {
953993 let entry = WalPage {
954994 page_no : 0 ,
955995 size_after : 0 ,
956- data : vec ! [ 0 ; 3 ] . into ( ) ,
996+ data : FrameLocation :: in_main_db_file ( ) ,
957997 } ;
958998
959999 logger. write_pages ( & [ entry] ) . unwrap ( ) ;
@@ -962,13 +1002,14 @@ mod test {
9621002
9631003 #[ test]
9641004 fn log_file_test_rollback ( ) {
1005+ let db = tempfile:: tempdir ( ) . unwrap ( ) ;
9651006 let f = tempfile:: tempfile ( ) . unwrap ( ) ;
966- let mut log_file = LogFile :: new ( f, 100 ) . unwrap ( ) ;
1007+ let mut log_file = LogFile :: new ( f, db . path ( ) . to_owned ( ) , 100 ) . unwrap ( ) ;
9671008 ( 0 ..5 )
9681009 . map ( |i| WalPage {
9691010 page_no : i,
9701011 size_after : 5 ,
971- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1012+ data : FrameLocation :: in_main_db_file ( ) , // FIXME: actually fill the fake main db file with data
9721013 } )
9731014 . for_each ( |p| {
9741015 log_file. push_page ( & p) . unwrap ( ) ;
@@ -982,7 +1023,7 @@ mod test {
9821023 . map ( |i| WalPage {
9831024 page_no : i,
9841025 size_after : 5 ,
985- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1026+ data : FrameLocation :: in_main_db_file ( ) ,
9861027 } )
9871028 . for_each ( |p| {
9881029 log_file. push_page ( & p) . unwrap ( ) ;
@@ -995,7 +1036,7 @@ mod test {
9951036 . push_page ( & WalPage {
9961037 page_no : 42 ,
9971038 size_after : 5 ,
998- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1039+ data : FrameLocation :: in_main_db_file ( ) ,
9991040 } )
10001041 . unwrap ( ) ;
10011042
0 commit comments