Skip to content

Commit 50cbe19

Browse files
committed
hashmail: update to user structured logs
This will make querying by stream IDs more uniform (and hence, way easier).
1 parent 11caa66 commit 50cbe19

File tree

1 file changed

+61
-51
lines changed

1 file changed

+61
-51
lines changed

hashmail_server.go

Lines changed: 61 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"github.com/btcsuite/btclog"
78
"io"
89
"sync"
910
"time"
@@ -104,8 +105,8 @@ func (r *readStream) ReadNextMsg(ctx context.Context) ([]byte, error) {
104105

105106
// ReturnStream gives up the read stream by passing it back up through the
106107
// payment stream.
107-
func (r *readStream) ReturnStream() {
108-
log.Debugf("Returning read stream %x", r.parentStream.id[:])
108+
func (r *readStream) ReturnStream(ctx context.Context) {
109+
log.DebugS(ctx, "Returning read stream")
109110
r.parentStream.ReturnReadStream(r)
110111
}
111112

@@ -193,7 +194,7 @@ type stream struct {
193194
}
194195

195196
// newStream creates a new stream independent of any given stream ID.
196-
func newStream(id streamID, limiter *rate.Limiter,
197+
func newStream(ctx context.Context, id streamID, limiter *rate.Limiter,
197198
equivAuth func(auth *hashmailrpc.CipherBoxAuth) error,
198199
onStale func() error, staleTimeout time.Duration) *stream {
199200

@@ -210,7 +211,7 @@ func newStream(id streamID, limiter *rate.Limiter,
210211
id: id,
211212
equivAuth: equivAuth,
212213
limiter: limiter,
213-
status: newStreamStatus(onStale, staleTimeout),
214+
status: newStreamStatus(ctx, onStale, staleTimeout),
214215
readBytesChan: make(chan []byte),
215216
readErrChan: make(chan error, 1),
216217
quit: make(chan struct{}),
@@ -305,8 +306,8 @@ func (s *stream) ReturnWriteStream(w *writeStream) {
305306
// RequestReadStream attempts to request the read stream from the main backing
306307
// stream. If we're unable to obtain it before the timeout, then an error is
307308
// returned.
308-
func (s *stream) RequestReadStream() (*readStream, error) {
309-
log.Tracef("HashMailStream(%x): requesting read stream", s.id[:])
309+
func (s *stream) RequestReadStream(ctx context.Context) (*readStream, error) {
310+
log.TraceS(ctx, "Requested read stream")
310311

311312
select {
312313
case r := <-s.readStreamChan:
@@ -320,8 +321,8 @@ func (s *stream) RequestReadStream() (*readStream, error) {
320321
// RequestWriteStream attempts to request the read stream from the main backing
321322
// stream. If we're unable to obtain it before the timeout, then an error is
322323
// returned.
323-
func (s *stream) RequestWriteStream() (*writeStream, error) {
324-
log.Tracef("HashMailStream(%x): requesting write stream", s.id[:])
324+
func (s *stream) RequestWriteStream(ctx context.Context) (*writeStream, error) {
325+
log.TraceS(ctx, "Requesting write stream")
325326

326327
select {
327328
case w := <-s.writeStreamChan:
@@ -389,8 +390,10 @@ func (h *hashMailServer) Stop() {
389390
}
390391

391392
// tearDownStaleStream can be used to tear down a stale mailbox stream.
392-
func (h *hashMailServer) tearDownStaleStream(id streamID) error {
393-
log.Debugf("Tearing down stale HashMail stream: id=%x", id)
393+
func (h *hashMailServer) tearDownStaleStream(ctx context.Context,
394+
id streamID) error {
395+
396+
log.DebugS(ctx, "Tearing down stale HashMail stream")
394397

395398
h.Lock()
396399
defer h.Unlock()
@@ -428,15 +431,15 @@ func (h *hashMailServer) ValidateStreamAuth(ctx context.Context,
428431
}
429432

430433
// InitStream attempts to initialize a new stream given a valid descriptor.
431-
func (h *hashMailServer) InitStream(
434+
func (h *hashMailServer) InitStream(ctx context.Context,
432435
init *hashmailrpc.CipherBoxAuth) (*hashmailrpc.CipherInitResp, error) {
433436

434437
h.Lock()
435438
defer h.Unlock()
436439

437440
streamID := newStreamID(init.Desc.StreamId)
438441

439-
log.Debugf("Creating new HashMail Stream: %x", streamID)
442+
log.DebugS(ctx, "Creating new HashMail Stream")
440443

441444
// The stream is already active, and we only allow a single session for
442445
// a given stream to exist.
@@ -452,10 +455,11 @@ func (h *hashMailServer) InitStream(
452455
rate.Every(h.cfg.msgRate), h.cfg.msgBurstAllowance,
453456
)
454457
freshStream := newStream(
455-
streamID, limiter, func(auth *hashmailrpc.CipherBoxAuth) error {
458+
ctx, streamID, limiter,
459+
func(auth *hashmailrpc.CipherBoxAuth) error {
456460
return nil
457461
}, func() error {
458-
return h.tearDownStaleStream(streamID)
462+
return h.tearDownStaleStream(ctx, streamID)
459463
}, h.cfg.staleTimeout,
460464
)
461465

@@ -470,7 +474,9 @@ func (h *hashMailServer) InitStream(
470474

471475
// LookUpReadStream attempts to loop up a new stream. If the stream is found, then
472476
// the stream is marked as being active. Otherwise, an error is returned.
473-
func (h *hashMailServer) LookUpReadStream(streamID []byte) (*readStream, error) {
477+
func (h *hashMailServer) LookUpReadStream(ctx context.Context,
478+
streamID []byte) (*readStream, error) {
479+
474480
h.RLock()
475481
defer h.RUnlock()
476482

@@ -479,12 +485,13 @@ func (h *hashMailServer) LookUpReadStream(streamID []byte) (*readStream, error)
479485
return nil, fmt.Errorf("stream not found")
480486
}
481487

482-
return stream.RequestReadStream()
488+
return stream.RequestReadStream(ctx)
483489
}
484490

485491
// LookUpWriteStream attempts to loop up a new stream. If the stream is found,
486492
// then the stream is marked as being active. Otherwise, an error is returned.
487-
func (h *hashMailServer) LookUpWriteStream(streamID []byte) (*writeStream, error) {
493+
func (h *hashMailServer) LookUpWriteStream(ctx context.Context,
494+
streamID []byte) (*writeStream, error) {
488495

489496
h.RLock()
490497
defer h.RUnlock()
@@ -494,7 +501,7 @@ func (h *hashMailServer) LookUpWriteStream(streamID []byte) (*writeStream, error
494501
return nil, fmt.Errorf("stream not found")
495502
}
496503

497-
return stream.RequestWriteStream()
504+
return stream.RequestWriteStream(ctx)
498505
}
499506

500507
// TearDownStream attempts to tear down a stream which renders both sides of
@@ -523,8 +530,7 @@ func (h *hashMailServer) TearDownStream(ctx context.Context, streamID []byte,
523530
return err
524531
}
525532

526-
log.Debugf("Tearing down HashMail stream: id=%x, auth=%v",
527-
auth.Desc.StreamId, auth.Auth)
533+
log.DebugS(ctx, "Tearing down HashMail stream", "auth", auth.Auth)
528534

529535
// At this point we know the auth was valid, so we'll tear down the
530536
// stream.
@@ -568,16 +574,16 @@ func (h *hashMailServer) NewCipherBox(ctx context.Context,
568574
return nil, err
569575
}
570576

571-
log.Debugf("New HashMail stream init: id=%x, auth=%v",
572-
init.Desc.StreamId, init.Auth)
577+
ctxl := btclog.WithCtx(ctx, btclog.Hex("stream_id", init.Desc.StreamId))
573578

574-
if err := h.ValidateStreamAuth(ctx, init); err != nil {
575-
log.Debugf("Stream creation validation failed (id=%x): %v",
576-
init.Desc.StreamId, err)
579+
log.DebugS(ctxl, "New HashMail stream init", "auth", init.Auth)
580+
581+
if err := h.ValidateStreamAuth(ctxl, init); err != nil {
582+
log.DebugS(ctxl, "Stream creation validation failed", err)
577583
return nil, err
578584
}
579585

580-
resp, err := h.InitStream(init)
586+
resp, err := h.InitStream(ctxl, init)
581587
if err != nil {
582588
return nil, err
583589
}
@@ -597,8 +603,9 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
597603
return nil, err
598604
}
599605

600-
log.Debugf("New HashMail stream deletion: id=%x, auth=%v",
601-
auth.Desc.StreamId, auth.Auth)
606+
ctxl := btclog.WithCtx(ctx, btclog.Hex("stream_id", auth.Desc.StreamId))
607+
608+
log.DebugS(ctxl, "New HashMail stream deletion", "auth", auth.Auth)
602609

603610
if err := h.TearDownStream(ctx, auth.Desc.StreamId, auth); err != nil {
604611
return nil, err
@@ -610,7 +617,7 @@ func (h *hashMailServer) DelCipherBox(ctx context.Context,
610617
// SendStream implements the client streaming call to utilize the write end of
611618
// a stream to send a message to the read end.
612619
func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamServer) error {
613-
log.Debugf("New HashMail write stream pending...")
620+
log.Debug("New HashMail write stream pending...")
614621

615622
// We'll need to receive the first message in order to determine if
616623
// this stream exists or not
@@ -621,6 +628,9 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
621628
return err
622629
}
623630

631+
ctx := btclog.WithCtx(readStream.Context(),
632+
btclog.Hex("stream_id", cipherBox.Desc.StreamId))
633+
624634
switch {
625635
case cipherBox.Desc == nil:
626636
return fmt.Errorf("cipher box descriptor required")
@@ -629,12 +639,11 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
629639
return fmt.Errorf("stream_id required")
630640
}
631641

632-
log.Debugf("New HashMail write stream: id=%x",
633-
cipherBox.Desc.StreamId)
642+
log.DebugS(ctx, "New HashMail write stream")
634643

635644
// Now that we have the first message, we can attempt to look up the
636645
// given stream.
637-
writeStream, err := h.LookUpWriteStream(cipherBox.Desc.StreamId)
646+
writeStream, err := h.LookUpWriteStream(ctx, cipherBox.Desc.StreamId)
638647
if err != nil {
639648
return err
640649
}
@@ -643,13 +652,12 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
643652
// write inactive if the client hangs up on their end.
644653
defer writeStream.ReturnStream()
645654

646-
log.Tracef("Sending msg_len=%v to stream_id=%x", len(cipherBox.Msg),
647-
cipherBox.Desc.StreamId)
655+
log.TraceS(ctx, "Sending message to stream",
656+
"msg_len", len(cipherBox.Msg))
648657

649658
// We'll send the first message into the stream, then enter our loop
650659
// below to continue to read from the stream and send it to the read
651660
// end.
652-
ctx := readStream.Context()
653661
if err := writeStream.WriteMsg(ctx, cipherBox.Msg); err != nil {
654662
return err
655663
}
@@ -659,7 +667,7 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
659667
// exit before shutting down.
660668
select {
661669
case <-ctx.Done():
662-
log.Debugf("SendStream: Context done, exiting")
670+
log.DebugS(ctx, "SendStream: Context done, exiting")
663671
return nil
664672
case <-h.quit:
665673
return fmt.Errorf("server shutting down")
@@ -669,13 +677,13 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
669677

670678
cipherBox, err := readStream.Recv()
671679
if err != nil {
672-
log.Debugf("SendStream: Exiting write stream RPC "+
673-
"stream read: %v", err)
680+
log.DebugS(ctx, "SendStream: Exiting write stream RPC "+
681+
"stream read", err)
674682
return err
675683
}
676684

677-
log.Tracef("Sending msg_len=%v to stream_id=%x",
678-
len(cipherBox.Msg), cipherBox.Desc.StreamId)
685+
log.TraceS(ctx, "Sending message to stream",
686+
"msg_len", len(cipherBox.Msg))
679687

680688
if err := writeStream.WriteMsg(ctx, cipherBox.Msg); err != nil {
681689
return err
@@ -689,25 +697,28 @@ func (h *hashMailServer) SendStream(readStream hashmailrpc.HashMail_SendStreamSe
689697
func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
690698
reader hashmailrpc.HashMail_RecvStreamServer) error {
691699

700+
ctx := btclog.WithCtx(reader.Context(),
701+
btclog.Hex("stream_id", desc.StreamId))
702+
692703
// First, we'll attempt to locate the stream. We allow any single
693704
// entity that knows of the full stream ID to access the read end.
694-
readStream, err := h.LookUpReadStream(desc.StreamId)
705+
readStream, err := h.LookUpReadStream(ctx, desc.StreamId)
695706
if err != nil {
696707
return err
697708
}
698709

699-
log.Debugf("New HashMail read stream: id=%x", desc.StreamId)
710+
log.DebugS(ctx, "New HashMail read stream")
700711

701712
// If the reader hangs up, then we'll mark the stream as inactive so
702713
// another can take its place.
703-
defer readStream.ReturnStream()
714+
defer readStream.ReturnStream(ctx)
704715

705716
for {
706717
// Check to see if the stream has been closed or if we need to
707-
// exit before shutting down.
718+
// exit before shutting d[own.
708719
select {
709720
case <-reader.Context().Done():
710-
log.Debugf("Read stream context done.")
721+
log.DebugS(ctx, "Read stream context done.")
711722
return nil
712723
case <-h.quit:
713724
return fmt.Errorf("server shutting down")
@@ -717,12 +728,11 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
717728

718729
nextMsg, err := readStream.ReadNextMsg(reader.Context())
719730
if err != nil {
720-
log.Debugf("Got error an read stream read: %v", err)
731+
log.ErrorS(ctx, "Got error on read stream read", err)
721732
return err
722733
}
723734

724-
log.Tracef("Read %v bytes for HashMail stream_id=%x",
725-
len(nextMsg), desc.StreamId)
735+
log.TraceS(ctx, "Read bytes", "msg_len", len(nextMsg))
726736

727737
// In order not to duplicate metric data, we only record this
728738
// read if its streamID is odd. We use the base stream ID as the
@@ -742,7 +752,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
742752
Msg: nextMsg,
743753
})
744754
if err != nil {
745-
log.Debugf("Got error when sending on read stream: %v",
755+
log.DebugS(ctx, "Got error when sending on read stream",
746756
err)
747757
return err
748758
}
@@ -767,7 +777,7 @@ type streamStatus struct {
767777
}
768778

769779
// newStreamStatus constructs a new streamStatus instance.
770-
func newStreamStatus(onStale func() error,
780+
func newStreamStatus(ctx context.Context, onStale func() error,
771781
staleTimeout time.Duration) *streamStatus {
772782

773783
if staleTimeout < 0 {
@@ -778,7 +788,7 @@ func newStreamStatus(onStale func() error,
778788

779789
staleTimer := time.AfterFunc(staleTimeout, func() {
780790
if err := onStale(); err != nil {
781-
log.Errorf("error in onStale callback: %v", err)
791+
log.ErrorS(ctx, "Error from onStale callback", err)
782792
}
783793
})
784794

0 commit comments

Comments
 (0)