Skip to content

Commit f11fe92

Browse files
authored
Merge pull request #175 from nikitos/hls_notify
add on_hls notification hook
2 parents 888e74f + 2d810b4 commit f11fe92

File tree

10 files changed

+92
-33
lines changed

10 files changed

+92
-33
lines changed

application/xiu/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ pub struct HttpNotifierConfig {
189189
pub on_unpublish: Option<String>,
190190
pub on_play: Option<String>,
191191
pub on_stop: Option<String>,
192+
pub on_hls: Option<String>,
192193
}
193194

194195
#[derive(Debug, Deserialize, Clone, Default)]

application/xiu/src/service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl Service {
7272
httpnotifier.on_unpublish.clone(),
7373
httpnotifier.on_play.clone(),
7474
httpnotifier.on_stop.clone(),
75+
httpnotifier.on_hls.clone(),
7576
)))
7677
}
7778
} else {

library/streamhub/src/define.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,10 @@ pub enum StreamHubEvent {
318318
identifier: StreamIdentifier,
319319
sender: InformationSender,
320320
},
321+
OnHls {
322+
identifier: StreamIdentifier,
323+
segment: Segment,
324+
}
321325
}
322326

323327
impl StreamHubEvent {
@@ -445,3 +449,33 @@ pub enum StatisticData {
445449
start_time: DateTime<Local>,
446450
},
447451
}
452+
453+
454+
#[derive(Debug, Clone, Serialize, Deserialize)]
455+
pub struct Segment {
456+
/*ts duration*/
457+
pub duration: i64,
458+
pub discontinuity: bool,
459+
/*ts name*/
460+
pub name: String,
461+
pub path: String,
462+
pub is_eof: bool,
463+
}
464+
465+
impl Segment {
466+
pub fn new(
467+
duration: i64,
468+
discontinuity: bool,
469+
name: String,
470+
path: String,
471+
is_eof: bool,
472+
) -> Self {
473+
Self {
474+
duration,
475+
discontinuity,
476+
name,
477+
path,
478+
is_eof,
479+
}
480+
}
481+
}

library/streamhub/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,11 @@ impl StreamsHub {
774774
log::error!("event_loop request error: {}", err);
775775
}
776776
}
777+
StreamHubEvent::OnHls { identifier: _ , segment: _ } => {
778+
if let Some(notifier) = &self.notifier {
779+
notifier.on_hls_notify(&message).await;
780+
}
781+
}
777782
}
778783
}
779784
}

library/streamhub/src/notify/http.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct HttpNotifier {
2323
on_unpublish_url: Option<String>,
2424
on_play_url: Option<String>,
2525
on_stop_url: Option<String>,
26+
on_hls_url: Option<String>,
2627
}
2728

2829
impl HttpNotifier {
@@ -31,13 +32,16 @@ impl HttpNotifier {
3132
on_unpublish_url: Option<String>,
3233
on_play_url: Option<String>,
3334
on_stop_url: Option<String>,
35+
on_hls_url: Option<String>,
36+
3437
) -> Self {
3538
Self {
3639
request_client: reqwest::Client::new(),
3740
on_publish_url,
3841
on_unpublish_url,
3942
on_play_url,
4043
on_stop_url,
44+
on_hls_url,
4145
}
4246
}
4347
}
@@ -119,4 +123,23 @@ impl Notifier for HttpNotifier {
119123
}
120124
}
121125
}
126+
127+
async fn on_hls_notify(&self, event: &StreamHubEventMessage) {
128+
if let Some(on_hls_url) = &self.on_hls_url {
129+
match self
130+
.request_client
131+
.post(on_hls_url)
132+
.body(serialize_event!(event))
133+
.send()
134+
.await
135+
{
136+
Err(err) => {
137+
log::error!("on_hls error: {}", err);
138+
}
139+
Ok(response) => {
140+
log::info!("on_hls success: {:?}", response);
141+
}
142+
}
143+
}
144+
}
122145
}

library/streamhub/src/notify/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ pub trait Notifier: Sync + Send {
99
async fn on_unpublish_notify(&self, event: &StreamHubEventMessage);
1010
async fn on_play_notify(&self, event: &StreamHubEventMessage);
1111
async fn on_stop_notify(&self, event: &StreamHubEventMessage);
12+
async fn on_hls_notify(&self, event: &StreamHubEventMessage);
1213
}

protocol/hls/src/flv2hls.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use {
55
define::{frame_type, FlvData},
66
demuxer::{FlvAudioTagDemuxer, FlvVideoTagDemuxer},
77
},
8+
streamhub::{define::{StreamHubEventSender, StreamHubEvent}, stream::StreamIdentifier},
89
xmpegts::{
910
define::{epsi_stream_type, MPEG_FLAG_IDR_FRAME},
1011
ts::TsMuxer,
@@ -30,7 +31,11 @@ pub struct Flv2HlsRemuxer {
3031
audio_pid: u16,
3132

3233
m3u8_handler: M3u8,
34+
event_producer: Option<StreamHubEventSender>,
35+
app_name: String,
36+
stream_name: String,
3337
aof_ratio: i64,
38+
3439
}
3540

3641
impl Flv2HlsRemuxer {
@@ -39,6 +44,7 @@ impl Flv2HlsRemuxer {
3944
app_name: String,
4045
stream_name: String,
4146
need_record: bool,
47+
event_producer: Option<StreamHubEventSender>,
4248
path: String,
4349
aof_ratio: i64,
4450
) -> Self {
@@ -68,8 +74,10 @@ impl Flv2HlsRemuxer {
6874
video_pid,
6975
audio_pid,
7076

71-
m3u8_handler: M3u8::new(duration, 6, app_name, stream_name, need_record, path),
72-
77+
m3u8_handler: M3u8::new(duration, 6, app_name.clone(), stream_name.clone(), need_record, path),
78+
event_producer,
79+
app_name,
80+
stream_name,
7381
aof_ratio,
7482
}
7583
}
@@ -162,6 +170,18 @@ impl Flv2HlsRemuxer {
162170
}
163171
let data = self.ts_muxer.get_data();
164172

173+
if let Some(segment) = self.m3u8_handler.segments.back() {
174+
let identifier = StreamIdentifier::Rtmp { app_name: self.app_name.clone(), stream_name: self.stream_name.clone() };
175+
let hub_event = StreamHubEvent::OnHls { identifier: identifier.clone(), segment: segment.clone() };
176+
if let Some(producer) = self.event_producer.clone() {
177+
if let Err(err) = producer.send(hub_event) {
178+
log::error!("send notify on_hls event error: {}", err);
179+
}
180+
} else {
181+
log::warn!("event_producer is None, cannot send on_hls event");
182+
}
183+
log::info!("on_hls success: {:?}", identifier);
184+
}
165185
self.m3u8_handler
166186
.add_segment(dts - self.last_ts_dts, discontinuity, false, data)?;
167187
self.m3u8_handler.refresh_playlist()?;

protocol/hls/src/flv_data_receiver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ impl FlvDataReceiver {
4646
app_name: app_name.clone(),
4747
stream_name: stream_name.clone(),
4848
data_consumer,
49-
event_producer,
50-
media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record, path, aof_ratio),
49+
event_producer: event_producer.clone(),
50+
media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record, Some(event_producer), path, aof_ratio),
5151
subscriber_id,
5252
}
5353
}

protocol/hls/src/m3u8.rs

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,9 @@ use {
22
super::{errors::MediaError, ts::Ts},
33
bytes::BytesMut,
44
std::{collections::VecDeque, fs, fs::File, io::Write},
5+
streamhub::define::Segment,
56
};
67

7-
pub struct Segment {
8-
/*ts duration*/
9-
pub duration: i64,
10-
pub discontinuity: bool,
11-
/*ts name*/
12-
pub name: String,
13-
path: String,
14-
pub is_eof: bool,
15-
}
16-
17-
impl Segment {
18-
pub fn new(
19-
duration: i64,
20-
discontinuity: bool,
21-
name: String,
22-
path: String,
23-
is_eof: bool,
24-
) -> Self {
25-
Self {
26-
duration,
27-
discontinuity,
28-
name,
29-
path,
30-
is_eof,
31-
}
32-
}
33-
}
348

359
pub struct M3u8 {
3610
version: u16,
@@ -43,7 +17,7 @@ pub struct M3u8 {
4317
The normal recommendation is 3, but the optimum number may be larger.*/
4418
live_ts_count: usize,
4519

46-
segments: VecDeque<Segment>,
20+
pub segments: VecDeque<Segment>,
4721

4822
m3u8_folder: String,
4923
live_m3u8_name: String,

protocol/hls/src/test_flv2hls.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ mod tests {
6565

6666
let start = Instant::now();
6767
let mut media_demuxer =
68-
Flv2HlsRemuxer::new(5, String::from("live"), String::from("test"), false, String::from("./"), 1);
68+
Flv2HlsRemuxer::new(5, String::from("live"), String::from("test"), false, None, String::from("./"), 1);
6969

7070
loop {
7171
let data_ = demuxer.read_flv_tag();

0 commit comments

Comments
 (0)