Skip to content

Commit 095300b

Browse files
committed
Consumer with output.
Signed-off-by: azerr <[email protected]>
1 parent e2f0b8f commit 095300b

File tree

6 files changed

+113
-126
lines changed

6 files changed

+113
-126
lines changed

src/commands/consumers.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ export class StartConsumerCommandHandler {
4949
this.consumerCollection.create(consumeUri);
5050
this.explorer.refresh();
5151
}
52-
53-
return openDocument(consumeUri);
5452
}
5553
}
5654

src/commands/producers.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { WorkspaceSettings } from "../settings";
99

1010
export class ProduceRecordCommandHandler {
1111
constructor(
12-
private clientAccessor: ClientAccessor,
12+
private clientAccessor: ClientAccessor,
1313
private channelProvider: OutputChannelProvider,
1414
private explorer: KafkaExplorer,
1515
private settings: WorkspaceSettings
@@ -40,10 +40,11 @@ export class ProduceRecordCommandHandler {
4040
const producer = client.producer;
4141
await producer.connect();
4242

43-
channel.show(false);
43+
// FIXME : show the channel only if there are none consumer output which are opened
44+
// channel.show(false);
4445
channel.appendLine(`Producing record(s)`);
4546
const startOperation = performance.now();
46-
47+
4748
try {
4849
await producer.send({
4950
topic: topic,

src/extension.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ import {
1818
} from "./commands";
1919
import { Context } from "./context";
2020
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
21-
import { ConsumerVirtualTextDocumentProvider, OutputChannelProvider, ProducerCodeLensProvider } from "./providers";
21+
import { OutputChannelProvider, ProducerCodeLensProvider } from "./providers";
2222
import { getClusterSettings, getWorkspaceSettings } from "./settings";
2323
import { ClusterItem } from "./explorer/models/cluster";
2424
import { TopicGroupItem } from "./explorer/models/topics";
2525
import { ConsumerStatusBarItem } from "./views/consumerStatusBarItem";
2626
import { SelectedClusterStatusBarItem } from "./views/selectedClusterStatusBarItem";
27+
import { ConsumerOutputRegistry } from "./output/consumerOutputRegistry";
2728

2829
export function activate(context: vscode.ExtensionContext): void {
2930
Context.register(context);
@@ -113,9 +114,7 @@ export function activate(context: vscode.ExtensionContext): void {
113114
context.subscriptions.push(
114115
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));
115116

116-
context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
117-
ConsumerVirtualTextDocumentProvider.SCHEME,
118-
new ConsumerVirtualTextDocumentProvider(consumerCollection)));
117+
context.subscriptions.push(new ConsumerOutputRegistry(consumerCollection, outputChannelProvider));
119118
}
120119

121120
export function deactivate(): void {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import * as vscode from "vscode";
2+
import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, RecordReceivedEvent } from "../client";
3+
import { OutputChannelProvider } from "../providers/outputChannelProvider";
4+
5+
class ConsumerOutput {
6+
private channel: vscode.OutputChannel;
7+
constructor(uri: vscode.Uri, registry: ConsumerOutputRegistry) {
8+
9+
this.channel = registry.getChannel(`Kafka Consumer [${uri.path}]`);
10+
11+
}
12+
13+
public show(): void {
14+
this.channel.show();
15+
}
16+
17+
public sendText(data: string) {
18+
this.channel.appendLine(data);
19+
}
20+
}
21+
22+
export class ConsumerOutputRegistry implements vscode.Disposable {
23+
getChannel(name: string): vscode.OutputChannel {
24+
return this.channelProvider.getChannel(name);
25+
}
26+
27+
private kafkaConsumers: { [id: string /* vscode URI */]: ConsumerOutput } = {};
28+
private disposables: vscode.Disposable[] = [];
29+
30+
constructor(private consumerCollection: ConsumerCollection, private channelProvider: OutputChannelProvider) {
31+
32+
this.disposables.push(this.consumerCollection.onDidChangeCollection((event: ConsumerCollectionChangedEvent) => {
33+
for (const startedUri of event.created) {
34+
this.showConsumer(startedUri);
35+
this.onDidChangeStatus(startedUri, 'started');
36+
this.attachToConsumer(startedUri);
37+
}
38+
39+
for (const closedUri of event.closed) {
40+
this.onDidCloseConsumer(closedUri);
41+
}
42+
}));
43+
}
44+
45+
private showConsumer(uri: vscode.Uri): void {
46+
let consumer = this.kafkaConsumers[uri.toString()];
47+
if (!consumer) {
48+
consumer = new ConsumerOutput(uri, this);
49+
this.kafkaConsumers[uri.toString()] = consumer;
50+
consumer.show();
51+
}
52+
}
53+
54+
public dispose(): void {
55+
this.consumerCollection.dispose();
56+
this.disposables.forEach(d => d.dispose());
57+
}
58+
59+
60+
private attachToConsumer(uri: vscode.Uri): void {
61+
const consumer = this.consumerCollection.get(uri);
62+
63+
if (consumer === null) {
64+
return;
65+
}
66+
67+
this.disposables.push(consumer.onDidReceiveRecord((arg: RecordReceivedEvent) => {
68+
this.onDidReceiveRecord(arg.uri, arg.record);
69+
}));
70+
71+
this.disposables.push(consumer.onDidChangeStatus((arg: ConsumerChangedStatusEvent) => {
72+
this.onDidChangeStatus(arg.uri, arg.status);
73+
}));
74+
}
75+
76+
private onDidChangeStatus(uri: vscode.Uri, status: string): void {
77+
const consumer = this.kafkaConsumers[uri.toString()];
78+
if (consumer) {
79+
consumer.sendText(`Consumer: ${status}`);
80+
}
81+
}
82+
83+
private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
84+
const consumer = this.kafkaConsumers[uri.toString()];
85+
if (consumer) {
86+
consumer.sendText(``);
87+
consumer.sendText(`Key: ${message.key}`);
88+
consumer.sendText(`Partition: ${message.partition}`);
89+
consumer.sendText(`Offset: ${message.offset}`);
90+
consumer.sendText(`Value:`);
91+
consumer.sendText(`${message.value}`);
92+
}
93+
}
94+
95+
private onDidCloseConsumer(uri: vscode.Uri): void {
96+
this.onDidChangeStatus(uri, 'closed');
97+
}
98+
99+
disposeConsumer(uri: vscode.Uri): void {
100+
delete this.kafkaConsumers[uri.toString()];
101+
if (this.consumerCollection.has(uri)) {
102+
this.consumerCollection.close(uri);
103+
}
104+
}
105+
106+
}

src/providers/consumerVirtualTextDocumentProvider.ts

Lines changed: 0 additions & 116 deletions
This file was deleted.

src/providers/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
export * from "./consumerVirtualTextDocumentProvider";
21
export * from "./outputChannelProvider";
32
export * from "./producerCodeLensProvider";

0 commit comments

Comments
 (0)