-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathapp.py
More file actions
279 lines (242 loc) · 11.3 KB
/
app.py
File metadata and controls
279 lines (242 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import json
import os, argparse, gradio as gr
from dvd import config
from dvd.dvd_core import DVDCoreAgent
from dvd.video_utils import load_video, decode_video_to_frames, download_srt_subtitle
from dvd.frame_caption import process_video, process_video_lite
from dvd.utils import extract_answer
########################################################################
# Helper functions
########################################################################
def get_youtube_thumbnail(video_url: str):
    """Return the thumbnail URL for a YouTube link, or ``None``.

    Recognizes ``youtube.com/watch`` URLs (the ``v`` query parameter may sit
    anywhere in the query string) and ``youtu.be/<id>`` short links. Query
    parameters and ``#fragment`` parts never end up in the video ID.

    Args:
        video_url: URL or path typed by the user; may be empty or ``None``.

    Returns:
        The ``hqdefault`` thumbnail URL on ``img.youtube.com``, or ``None``
        when the input is empty, is not a YouTube link, or is malformed.
    """
    # Imported locally so the helper is self-contained.
    from urllib.parse import parse_qs, urlparse

    if not video_url:
        return None

    url = video_url.strip()
    # urlparse only populates the host when a scheme is present, so inputs
    # such as "youtube.com/watch?v=..." get a default one.
    if "://" not in url:
        url = f"https://{url}"

    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # Malformed URL (for example unbalanced IPv6 brackets).
        return None

    video_id = None
    if host in ("youtu.be", "www.youtu.be"):
        # Short link: the ID is the first path segment.
        video_id = parsed.path.strip("/").split("/")[0]
    elif host == "youtube.com" or host.endswith(".youtube.com"):
        if parsed.path.rstrip("/") == "/watch":
            # parse_qs drops blank values, so "watch?v=" yields no ID.
            video_id = parse_qs(parsed.query).get("v", [""])[0]

    if video_id:
        # YouTube provides several thumbnail qualities:
        # maxresdefault > hqdefault > mqdefault > default
        return f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg"
    return None
def _extract_video_id(video_url: str) -> str:
    """Derive the identifier used to name a video's on-disk working folder."""
    if "v=" in video_url:  # YouTube watch URL
        candidate = video_url.split("v=")[1]
    elif "youtu.be/" in video_url:  # YouTube short link
        candidate = video_url.split("youtu.be/")[1]
    else:  # local file or misc.
        return os.path.splitext(os.path.basename(video_url))[0]
    # Keep only the ID itself: cut at further query parameters, a fragment,
    # or a trailing path component.
    for separator in ("&", "?", "#", "/"):
        candidate = candidate.split(separator)[0]
    return candidate


def _prepare_video_assets(video_url: str):
    """Download / decode / caption the video and return its asset locations.

    Follows the same flow as ``local_run.py`` (per the original note). All
    artifacts live under ``config.VIDEO_DATABASE_FOLDER``:

    * ``raw/<video_id>.mp4``            -- the video file
    * ``<video_id>/frames``             -- decoded frames
    * ``<video_id>/captions``           -- caption output
    * ``<video_id>/subtitles.srt``      -- subtitles (lite mode)
    * ``<video_id>/database.json``      -- path handed to the agent

    When ``config.LITE_MODE`` is set, subtitles are downloaded if missing and
    ``process_video_lite`` is run on them. Otherwise the video is fetched if
    missing, frames are decoded if the frames folder is empty, and
    ``process_video`` is run if no captions file exists yet.

    Args:
        video_url: YouTube URL or local video path.

    Returns:
        Tuple ``(video_id, caption_file, video_db_path)``.

    Raises:
        ValueError: If no safe video ID can be derived from ``video_url``
            (empty, ``.``/``..``, or containing a path separator). The ID
            becomes a folder name, so such values must not reach the
            filesystem.
    """
    video_id = _extract_video_id(video_url)
    if video_id in ("", ".", "..") or os.path.basename(video_id) != video_id:
        raise ValueError(f"Could not derive a usable video ID from {video_url!r}")

    database_root = config.VIDEO_DATABASE_FOLDER
    raw_dir = os.path.join(database_root, "raw")
    video_path = os.path.join(raw_dir, f"{video_id}.mp4")
    frames_dir = os.path.join(database_root, video_id, "frames")
    captions_dir = os.path.join(database_root, video_id, "captions")
    video_db_path = os.path.join(database_root, video_id, "database.json")
    srt_path = os.path.join(database_root, video_id, "subtitles.srt")
    # Both modes write their captions to the same file.
    caption_file = os.path.join(captions_dir, "captions.json")

    os.makedirs(raw_dir, exist_ok=True)
    os.makedirs(frames_dir, exist_ok=True)
    os.makedirs(captions_dir, exist_ok=True)

    if config.LITE_MODE:
        if not os.path.exists(srt_path):
            download_srt_subtitle(video_url, srt_path)
        process_video_lite(captions_dir, srt_path)
    else:
        if not os.path.exists(video_path):
            load_video(video_url, video_path)
        # frames_dir was created above, so emptiness is the effective check.
        if not os.path.exists(frames_dir) or not os.listdir(frames_dir):
            decode_video_to_frames(video_path)
        if not os.path.exists(caption_file):
            process_video(frames_dir, captions_dir)

    return video_id, caption_file, video_db_path
def solve(video_url: str, question: str):
    """Stream the agent's progress for one question (Gradio callback).

    Prepares the video assets, runs ``DVDCoreAgent.stream_run`` and renders
    each message it emits as Markdown. Messages are dispatched on their
    ``role``: ``assistant`` (thinking text, plus detection of a ``finish``
    tool call carrying the final answer), ``tool_call`` and ``tool``.

    Args:
        video_url: YouTube URL or local video path.
        question: Question to ask about the video.

    Yields:
        Markdown strings. Status lines come first; afterwards every yield is
        the whole transcript accumulated so far, so the consumer can simply
        replace the displayed text. Any exception is reported as a final
        Markdown error message instead of being raised.
    """
    if not video_url or not question:
        yield "❗ Please provide both a video URL and a question."
        return

    try:
        yield "🔄 **Processing video...**"
        _, caption_file, video_db_path = _prepare_video_assets(video_url)

        yield "🤖 **Initializing DVD agent...**"
        agent = DVDCoreAgent(video_db_path, caption_file, config.MAX_ITERATIONS)

        accumulated_text = "### 🎯 Analysis Process:\n"
        final_answer = None

        for msg in agent.stream_run(question):
            # Only process messages with a role attribute
            if not isinstance(msg, dict) or "role" not in msg:
                continue
            role = msg.get("role")

            if role == "assistant":
                # Show assistant's thinking process
                content = msg.get("content", "")
                if content:
                    accumulated_text += f"\n\n**🤔 Assistant Thinking:**\n{content}"
                    yield accumulated_text

                # Check if assistant called the finish function. The key may
                # be present with a null value, hence `or []`.
                for tc in msg.get("tool_calls") or []:
                    function = tc.get("function", {})
                    if function.get("name") != "finish":
                        continue
                    try:
                        args = json.loads(function.get("arguments", "{}"))
                        final_answer = args.get("answer", "")
                    except (TypeError, ValueError, AttributeError):
                        # Best effort: unparsable finish arguments just mean
                        # no final answer is shown.
                        pass

            elif role == "tool_call":
                # Show when a tool is being called
                tool_name = msg.get("name", "unknown")
                tool_args = msg.get("arguments", "{}")
                try:
                    args_dict = json.loads(tool_args)
                    args_dict.pop("database", None)
                    # Format arguments nicely
                    args_str = json.dumps(args_dict, indent=2)
                except (TypeError, ValueError, AttributeError):
                    # Not a JSON object: display the raw value.
                    args_str = tool_args
                if tool_name != "finish":
                    accumulated_text += f"\n\n**🔄 Calling Tool:** `{tool_name}`\n```json\n{args_str}\n```"
                    yield accumulated_text

            elif role == "tool":
                # Show tool observations
                tool_name = msg.get("name", "unknown")
                tool_result = msg.get("content", "")
                # Normalise to text so the length check cannot fail on a
                # null or structured result.
                if tool_result is None:
                    tool_result = ""
                elif not isinstance(tool_result, str):
                    tool_result = str(tool_result)
                # Truncate long results for display
                if len(tool_result) > 2000:
                    tool_result = tool_result[:2000] + "..."
                accumulated_text += f"\n\n**✅ Tool Result `{tool_name}`:**\n```\n{tool_result}\n```"
                yield accumulated_text

        # Add final answer if found
        if final_answer:
            accumulated_text += f"\n### 📃✅ **Final Answer:**\n\n{final_answer}"
        else:
            accumulated_text += "\n\n---\n### ✅ **Analysis Complete!**"
        yield accumulated_text

    except Exception as e:
        # Top-level boundary of the UI callback: show the failure to the user.
        import traceback
        yield f"### ⚠️ Error Occurred\n\n```\n{e}\n```\n\nDetails:\n```\n{traceback.format_exc()}\n```"
########################################################################
# Gradio UI
########################################################################
def launch(args):
    """Build the Gradio Blocks UI for the demo and start serving it.

    The page has a left column (video URL, thumbnail preview, question,
    analyze/clear buttons, example inputs) and a wider right column that
    displays the Markdown produced by ``solve``.

    Args:
        args: Object exposing a ``share`` attribute, which is forwarded to
            ``demo.launch(share=...)``. The CLI entry point passes the parsed
            argparse namespace.
    """
    # Custom CSS for better styling. The #answer-box, #video-thumbnail and
    # .button-primary selectors match the elem_id / elem_classes values
    # assigned to components below. Literal text is left flush-left so its
    # content stays unchanged.
    custom_css = """
.gradio-container {
font-family: 'Inter', sans-serif;
}
.markdown-text {
font-size: 16px;
}
#answer-box {
border: 2px solid #e5e7eb;
border-radius: 8px;
padding: 20px;
background-color: #f9fafb;
min-height: 400px;
max-height: 600px;
overflow-y: auto;
}
.button-primary {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
font-weight: bold;
font-size: 18px;
padding: 12px 24px;
}
#video-thumbnail {
border-radius: 8px;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
"""
    with gr.Blocks(title="DVD Video Q&A Demo", css=custom_css, theme=gr.themes.Soft()) as demo:
        # Page header
        gr.Markdown(
            """
# 🎬 Deep Video Discovery: Agentic Search with Tool Use for Long-form Video Understanding
<p style="font-size: 18px; color: #6b7280;">
Provide a YouTube URL, then ask any question about the video content.
The system will analyze the video and provide detailed answers.
Note that this online demo only provides lite mode of DVD where only subtitles are used.
To use full DVD capabilities, please deploy it locally.
</p>
"""
        )
        with gr.Row():
            # Left column: inputs and controls
            with gr.Column(scale=1):
                gr.Markdown("### 📹 Video Input")
                video_url = gr.Textbox(
                    label="Video URL / Path",
                    placeholder="e.g. https://www.youtube.com/watch?v=dQw4w9WgXcQ",
                    lines=1,
                    info="Support YouTube URLs or local video paths"
                )
                # Thumbnail preview; starts hidden and is toggled by
                # update_thumbnail below.
                video_thumbnail = gr.Image(
                    label="Video Thumbnail",
                    elem_id="video-thumbnail",
                    height=200,
                    visible=False,
                    interactive=False
                )
                gr.Markdown("### ❓ Your Question")
                question = gr.Textbox(
                    label="Question about the video",
                    placeholder="What happens in this video? Who are the main characters?",
                    lines=3,
                    info="Ask anything about the video content"
                )
                with gr.Row():
                    run_btn = gr.Button("🔍 Analyze Video", variant="primary", elem_classes=["button-primary"])
                    # Clear button is given the two text inputs and the thumbnail.
                    clear_btn = gr.ClearButton([video_url, question, video_thumbnail], value="🗑️ Clear")
                gr.Markdown("### 💡 Example Questions")
                # Each example row supplies values for [video_url, question].
                examples = gr.Examples(
                    examples=[
                        ["https://www.youtube.com/watch?v=i2qSxMVeVLI", "What is the main topic discussed in this video?"],
                        ["https://www.youtube.com/watch?v=nOxKexn3iBo", "Who are the speakers and what are their key points?"],
                    ],
                    inputs=[video_url, question],
                    label=""
                )
            # Right column (twice the scale of the left): results
            with gr.Column(scale=2):
                gr.Markdown("### 📊 Analysis Results")
                answer_box = gr.Markdown(
                    value="*Results will appear here after clicking 'Analyze Video'...*",
                    elem_id="answer-box",
                    label=""
                )
        # Page footer
        gr.Markdown(
            """
---
<p style="text-align: center; color: #9ca3af; font-size: 14px;">
DVD: Powered by advanced video understanding and language models |
<a href="https://github.com/your-repo" style="color: #6366f1;">GitHub</a>
</p>
"""
        )

        # Event handlers
        def update_thumbnail(url):
            """Show the YouTube thumbnail for ``url``, or hide the image when
            ``get_youtube_thumbnail`` returns nothing."""
            thumbnail_url = get_youtube_thumbnail(url)
            if thumbnail_url:
                return gr.update(value=thumbnail_url, visible=True)
            else:
                return gr.update(value=None, visible=False)

        # Refresh the thumbnail whenever the URL textbox changes.
        video_url.change(
            fn=update_thumbnail,
            inputs=[video_url],
            outputs=[video_thumbnail]
        )

        import inspect
        # Wire the analyze button to solve(). stream=True is passed only when
        # the installed Gradio's Button.click signature has a `stream`
        # parameter.
        click_kwargs = dict(fn=solve, inputs=[video_url, question], outputs=answer_box)
        if "stream" in inspect.signature(gr.Button.click).parameters:
            click_kwargs["stream"] = True
        run_btn.click(**click_kwargs)

    # Start the server; `share` comes from the caller-supplied args.
    demo.launch(share=args.share)
########################################################################
# CLI entry-point (optional)
########################################################################
if __name__ == "__main__":
    # Script entry point: read the command-line flags, then start the demo.
    cli_parser = argparse.ArgumentParser()
    cli_parser.add_argument(
        "--share",
        action="store_true",
        help="Gradio share flag",
    )
    launch(cli_parser.parse_args())