Skip to content

Commit d6e924b

Browse files
authored
Lagrer innkommende signalmeldinger og hendelser (#170)
* Lagrer innkommende signalmeldinger og hendelser * Oppdatert hendelsestype og flyttet lagring av melding --------- Co-authored-by: Thomas Burnett <[email protected]>
1 parent 9934111 commit d6e924b

File tree

4 files changed

+80
-16
lines changed

4 files changed

+80
-16
lines changed

ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/App.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ fun main() = SuspendApp {
115115
resourceScope {
116116
launchSignalReceiver(
117117
config = config,
118-
cpaValidationService = cpaValidationService
118+
cpaValidationService = cpaValidationService,
119+
eventRegistrationService = eventRegistrationService
119120
)
120121
launchPayloadReceiver(
121122
config = config,
@@ -183,12 +184,14 @@ private fun CoroutineScope.launchPayloadReceiver(
183184

184185
private fun CoroutineScope.launchSignalReceiver(
185186
config: Config,
186-
cpaValidationService: CPAValidationService
187+
cpaValidationService: CPAValidationService,
188+
eventRegistrationService: EventRegistrationService
187189
) {
188190
if (config.kafkaSignalReceiver.active) {
189191
launch(Dispatchers.IO) {
190192
val signalProcessor = SignalMessageService(
191-
cpaValidationService
193+
cpaValidationService,
194+
eventRegistrationService
192195
)
193196
startSignalReceiver(config.kafkaSignalReceiver.topic, config.kafka, signalProcessor)
194197
}

ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/processing/SignalMessageService.kt

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,39 @@ package no.nav.emottak.ebms.async.processing
22

33
import kotlinx.coroutines.Dispatchers
44
import kotlinx.coroutines.withContext
5+
import kotlinx.serialization.encodeToString
6+
import kotlinx.serialization.json.Json
7+
import no.nav.emottak.ebms.async.configuration.config
58
import no.nav.emottak.ebms.async.log
9+
import no.nav.emottak.ebms.async.util.EventRegistrationService
610
import no.nav.emottak.ebms.validation.CPAValidationService
711
import no.nav.emottak.message.model.Acknowledgment
812
import no.nav.emottak.message.model.EbMSDocument
913
import no.nav.emottak.message.model.MessageError
1014
import no.nav.emottak.message.xml.getDocumentBuilder
1115
import no.nav.emottak.util.marker
16+
import no.nav.emottak.utils.common.parseOrGenerateUuid
17+
import no.nav.emottak.utils.kafka.model.EventDataType
18+
import no.nav.emottak.utils.kafka.model.EventType
1219
import java.io.ByteArrayInputStream
1320

1421
class SignalMessageService(
15-
val cpaValidationService: CPAValidationService
22+
val cpaValidationService: CPAValidationService,
23+
val eventRegistrationService: EventRegistrationService
1624
) {
1725

1826
suspend fun processSignal(requestId: String, content: ByteArray) {
1927
try {
2028
val ebxmlSignalMessage = createEbmsMessage(requestId, content)
29+
eventRegistrationService.registerEventMessageDetails(ebxmlSignalMessage)
30+
eventRegistrationService.registerEvent(
31+
eventType = EventType.MESSAGE_READ_FROM_QUEUE,
32+
requestId = ebxmlSignalMessage.requestId.parseOrGenerateUuid(),
33+
messageId = ebxmlSignalMessage.messageId,
34+
eventData = Json.encodeToString(
35+
mapOf(EventDataType.QUEUE_NAME.value to config().kafkaSignalReceiver.topic)
36+
)
37+
)
2138
cpaValidationService.validateIncomingMessage(ebxmlSignalMessage)
2239
when (ebxmlSignalMessage) {
2340
is Acknowledgment -> {
@@ -26,11 +43,14 @@ class SignalMessageService(
2643
is MessageError -> {
2744
processMessageError(ebxmlSignalMessage)
2845
}
29-
else -> log.warn(ebxmlSignalMessage.marker(), "Cannot process message as signal message: $requestId")
46+
else -> {
47+
log.warn(ebxmlSignalMessage.marker(), "Cannot process message as signal message: $requestId")
48+
throw RuntimeException("Cannot process message as signal message: $requestId")
49+
}
3050
}
3151
} catch (e: Exception) {
32-
// TODO Clearer error handling
3352
log.error("Error processing signal requestId $requestId", e)
53+
throw e
3454
}
3555
}
3656

@@ -49,16 +69,20 @@ class SignalMessageService(
4969
log.info(acknowledgment.marker(), "Got acknowledgment with requestId <${acknowledgment.requestId}>")
5070
}
5171

52-
private fun processMessageError(messageError: MessageError) {
72+
private suspend fun processMessageError(messageError: MessageError) {
5373
log.info(messageError.marker(), "Got MessageError with requestId <${messageError.requestId}>")
54-
val messageRef = messageError.refToMessageId
55-
if (messageRef == null) {
56-
log.warn(messageError.marker(), "Received MessageError without message requestId")
57-
return
58-
}
59-
// TODO store events
6074
messageError.feil.forEach { error ->
61-
log.info(messageError.marker(), "Code: ${error.code}, Description: ${error.descriptionText}")
75+
log.warn(messageError.marker(), "Code: ${error.code}, Description: ${error.descriptionText}")
76+
eventRegistrationService.registerEvent(
77+
eventType = EventType.UNKNOWN_ERROR_OCCURRED,
78+
requestId = messageError.requestId.parseOrGenerateUuid(),
79+
messageId = messageError.messageId,
80+
eventData = Json.encodeToString(
81+
mapOf(
82+
EventDataType.ERROR_MESSAGE to "${error.code}: ${error.descriptionText}"
83+
)
84+
)
85+
)
6286
}
6387
}
6488
}

ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/util/EventRegistration.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@ import kotlinx.serialization.encodeToString
44
import kotlinx.serialization.json.Json
55
import no.nav.emottak.ebms.async.log
66
import no.nav.emottak.message.model.AsyncPayload
7+
import no.nav.emottak.message.model.EbmsMessage
78
import no.nav.emottak.message.model.PayloadMessage
89
import no.nav.emottak.utils.common.parseOrGenerateUuid
10+
import no.nav.emottak.utils.kafka.model.EbmsMessageDetail
911
import no.nav.emottak.utils.kafka.model.Event
1012
import no.nav.emottak.utils.kafka.model.EventDataType
1113
import no.nav.emottak.utils.kafka.model.EventType
1214
import no.nav.emottak.utils.kafka.service.EventLoggingService
1315
import kotlin.uuid.Uuid
1416

1517
interface EventRegistrationService {
18+
19+
suspend fun registerEventMessageDetails(ebMSMessage: EbmsMessage)
20+
1621
suspend fun registerEvent(
1722
eventType: EventType,
1823
payloadMessage: PayloadMessage,
@@ -48,6 +53,33 @@ class EventRegistrationServiceImpl(
4853
private val eventLoggingService: EventLoggingService
4954
) : EventRegistrationService {
5055

56+
override suspend fun registerEventMessageDetails(ebMSMessage: EbmsMessage) {
57+
log.debug("Registering message with requestId: ${ebMSMessage.requestId}")
58+
59+
try {
60+
val ebMSMessageDetail = EbmsMessageDetail(
61+
requestId = ebMSMessage.requestId.parseOrGenerateUuid(),
62+
cpaId = ebMSMessage.cpaId,
63+
conversationId = ebMSMessage.conversationId,
64+
messageId = ebMSMessage.messageId,
65+
refToMessageId = ebMSMessage.refToMessageId,
66+
fromPartyId = no.nav.emottak.ebms.util.EventRegistrationService.serializePartyId(ebMSMessage.addressing.from.partyId),
67+
fromRole = ebMSMessage.addressing.from.role,
68+
toPartyId = no.nav.emottak.ebms.util.EventRegistrationService.serializePartyId(ebMSMessage.addressing.to.partyId),
69+
toRole = ebMSMessage.addressing.to.role,
70+
service = ebMSMessage.addressing.service,
71+
action = ebMSMessage.addressing.action,
72+
sentAt = ebMSMessage.sentAt
73+
)
74+
log.debug("Publishing message details: $ebMSMessageDetail")
75+
76+
eventLoggingService.logMessageDetails(ebMSMessageDetail)
77+
log.debug("Message details published successfully")
78+
} catch (e: Exception) {
79+
log.error("Error while registering message details: ${e.message}", e)
80+
}
81+
}
82+
5183
override suspend fun registerEvent(
5284
eventType: EventType,
5385
payloadMessage: PayloadMessage,
@@ -144,6 +176,10 @@ class EventRegistrationServiceImpl(
144176
}
145177

146178
class EventRegistrationServiceFake : EventRegistrationService {
179+
override suspend fun registerEventMessageDetails(ebMSMessage: EbmsMessage) {
180+
log.debug("Registering message details for ebMSDocument: $ebMSMessage")
181+
}
182+
147183
override suspend fun registerEvent(
148184
eventType: EventType,
149185
payloadMessage: PayloadMessage,

ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/processing/SignalProcessorTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package no.nav.emottak.ebms.async.processing
22

33
import io.mockk.mockk
44
import kotlinx.coroutines.runBlocking
5+
import no.nav.emottak.ebms.async.util.EventRegistrationServiceFake
56
import no.nav.emottak.ebms.validation.CPAValidationService
67
import org.junit.jupiter.api.Assertions.assertEquals
78
import org.junit.jupiter.api.Disabled
@@ -13,10 +14,10 @@ import javax.xml.bind.UnmarshalException
1314
class SignalProcessorTest {
1415

1516
val cpaValidationService = mockk<CPAValidationService>()
16-
val signalMessageService = SignalMessageService(cpaValidationService)
17+
val eventRegistrationService = EventRegistrationServiceFake()
18+
val signalMessageService = SignalMessageService(cpaValidationService, eventRegistrationService)
1719

1820
@Test
19-
@Disabled
2021
fun `Payload message throws error`() {
2122
val message = this::class.java.classLoader
2223
.getResourceAsStream("signaltest/payloadmessage.xml")

0 commit comments

Comments
 (0)