Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ export class StartConsumerCommandHandler {
this.consumerCollection.create(consumeUri);
this.explorer.refresh();
}

return openDocument(consumeUri);
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { WorkspaceSettings } from "../settings";

export class ProduceRecordCommandHandler {
constructor(
private clientAccessor: ClientAccessor,
private clientAccessor: ClientAccessor,
private channelProvider: OutputChannelProvider,
private explorer: KafkaExplorer,
private settings: WorkspaceSettings
Expand Down Expand Up @@ -40,10 +40,11 @@ export class ProduceRecordCommandHandler {
const producer = client.producer;
await producer.connect();

channel.show(false);
// FIXME : show the channel only if there are none consumer output which are opened
// channel.show(false);
channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

try {
await producer.send({
topic: topic,
Expand Down
7 changes: 3 additions & 4 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import {
} from "./commands";
import { Context } from "./context";
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
import { ConsumerVirtualTextDocumentProvider, OutputChannelProvider, ProducerCodeLensProvider } from "./providers";
import { OutputChannelProvider, ProducerCodeLensProvider } from "./providers";
import { getClusterSettings, getWorkspaceSettings } from "./settings";
import { ClusterItem } from "./explorer/models/cluster";
import { TopicGroupItem } from "./explorer/models/topics";
import { ConsumerStatusBarItem } from "./views/consumerStatusBarItem";
import { SelectedClusterStatusBarItem } from "./views/selectedClusterStatusBarItem";
import { ConsumerOutputRegistry } from "./output/consumerOutputRegistry";

export function activate(context: vscode.ExtensionContext): void {
Context.register(context);
Expand Down Expand Up @@ -113,9 +114,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME,
new ConsumerVirtualTextDocumentProvider(consumerCollection)));
context.subscriptions.push(new ConsumerOutputRegistry(consumerCollection, outputChannelProvider));
}

export function deactivate(): void {
Expand Down
106 changes: 106 additions & 0 deletions src/output/consumerOutputRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import * as vscode from "vscode";
import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, RecordReceivedEvent } from "../client";
import { OutputChannelProvider } from "../providers/outputChannelProvider";

class ConsumerOutput {
private channel: vscode.OutputChannel;
constructor(uri: vscode.Uri, registry: ConsumerOutputRegistry) {

this.channel = registry.getChannel(`Kafka Consumer [${uri.path}]`);

}

public show(): void {
this.channel.show();
}

public sendText(data: string) {
this.channel.appendLine(data);
}
}

export class ConsumerOutputRegistry implements vscode.Disposable {
getChannel(name: string): vscode.OutputChannel {
return this.channelProvider.getChannel(name);
}

private kafkaConsumers: { [id: string /* vscode URI */]: ConsumerOutput } = {};
private disposables: vscode.Disposable[] = [];

constructor(private consumerCollection: ConsumerCollection, private channelProvider: OutputChannelProvider) {

this.disposables.push(this.consumerCollection.onDidChangeCollection((event: ConsumerCollectionChangedEvent) => {
for (const startedUri of event.created) {
this.showConsumer(startedUri);
this.onDidChangeStatus(startedUri, 'started');
this.attachToConsumer(startedUri);
}

for (const closedUri of event.closed) {
this.onDidCloseConsumer(closedUri);
}
}));
}

private showConsumer(uri: vscode.Uri): void {
let consumer = this.kafkaConsumers[uri.toString()];
if (!consumer) {
consumer = new ConsumerOutput(uri, this);
this.kafkaConsumers[uri.toString()] = consumer;
consumer.show();
}
}

public dispose(): void {
this.consumerCollection.dispose();
this.disposables.forEach(d => d.dispose());
}


private attachToConsumer(uri: vscode.Uri): void {
const consumer = this.consumerCollection.get(uri);

if (consumer === null) {
return;
}

this.disposables.push(consumer.onDidReceiveRecord((arg: RecordReceivedEvent) => {
this.onDidReceiveRecord(arg.uri, arg.record);
}));

this.disposables.push(consumer.onDidChangeStatus((arg: ConsumerChangedStatusEvent) => {
this.onDidChangeStatus(arg.uri, arg.status);
}));
}

private onDidChangeStatus(uri: vscode.Uri, status: string): void {
const consumer = this.kafkaConsumers[uri.toString()];
if (consumer) {
consumer.sendText(`Consumer: ${status}`);
}
}

private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
const consumer = this.kafkaConsumers[uri.toString()];
if (consumer) {
consumer.sendText(``);
consumer.sendText(`Key: ${message.key}`);
consumer.sendText(`Partition: ${message.partition}`);
consumer.sendText(`Offset: ${message.offset}`);
consumer.sendText(`Value:`);
consumer.sendText(`${message.value}`);
}
}

private onDidCloseConsumer(uri: vscode.Uri): void {
this.onDidChangeStatus(uri, 'closed');
}

disposeConsumer(uri: vscode.Uri): void {
delete this.kafkaConsumers[uri.toString()];
if (this.consumerCollection.has(uri)) {
this.consumerCollection.close(uri);
}
}

}
116 changes: 0 additions & 116 deletions src/providers/consumerVirtualTextDocumentProvider.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/providers/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export * from "./consumerVirtualTextDocumentProvider";
export * from "./outputChannelProvider";
export * from "./producerCodeLensProvider";