Skip to content

Commit 57cab25

Browse files
committed
add http proxy support in mqtt3 client
Signed-off-by: Tony Guo <[email protected]>
1 parent 8730324 commit 57cab25

File tree

9 files changed

+232
-12
lines changed

9 files changed

+232
-12
lines changed

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ public class MqttConnectOptions {
7676
private int maxReconnectDelay = 128000;
7777
private boolean skipPortDuringHandshake = false;
7878
private Map<String, String> customWebSocketHeaders = null;
79+
private String httpProxyHost;
80+
private int httpProxyPort;
81+
private String httpProxyUser;
82+
private String httpProxyPassword;
7983

8084
// Client Operation Parameters
8185
private int executorServiceTimeout = 1; // How long to wait in seconds when terminating the executor service.
@@ -716,6 +720,38 @@ public void setCustomWebSocketHeaders(Map<String, String> props) {
716720
public Map<String, String> getCustomWebSocketHeaders() {
717721
return customWebSocketHeaders;
718722
}
723+
public String getHttpProxyHost() {
724+
return httpProxyHost;
725+
}
726+
727+
public void setHttpProxyHost(String httpProxyHost) {
728+
this.httpProxyHost = httpProxyHost;
729+
}
730+
731+
public int getHttpProxyPort() {
732+
return httpProxyPort;
733+
}
734+
735+
public void setHttpProxyPort(int httpProxyPort) {
736+
this.httpProxyPort = httpProxyPort;
737+
}
738+
739+
public String getHttpProxyUser() {
740+
return httpProxyUser;
741+
}
742+
743+
public void setHttpProxyUser(String httpProxyUser) {
744+
this.httpProxyUser = httpProxyUser;
745+
}
746+
747+
public String getHttpProxyPassword() {
748+
return httpProxyPassword;
749+
}
750+
751+
public void setHttpProxyPassword(String httpProxyPassword) {
752+
this.httpProxyPassword = httpProxyPassword;
753+
}
754+
719755

720756
public String toString() {
721757
return Debug.dumpProperties(getDebug(), "Connection options");

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ public class MqttException extends Exception {
139139
* state. New up a new client to continue.
140140
*/
141141
public static final short REASON_CODE_CLIENT_CLOSED = 32111;
142+
143+
/** Unable to connect to server though http proxy*/
144+
public static final short REASON_CODE_HTTP_PROXY_CONNECT_ERROR = 32112;
142145

143146
/**
144147
* A request has been made to use a token that is already associated with

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/SSLNetworkModuleFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
8282
netModule.setEnabledCiphers(enabledCiphers);
8383
}
8484
}
85+
86+
netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
87+
options.getHttpProxyUser(), options.getHttpProxyPassword());
8588
return netModule;
8689
}
8790
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java

Lines changed: 168 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.io.UnsupportedEncodingException;
2122
import java.net.ConnectException;
2223
import java.net.InetSocketAddress;
2324
import java.net.Socket;
2425
import java.net.SocketAddress;
2526

2627
import javax.net.SocketFactory;
2728

29+
import javax.net.ssl.SSLSocketFactory;
2830
import org.eclipse.paho.client.mqttv3.MqttException;
31+
import org.eclipse.paho.client.mqttv3.internal.websocket.Base64;
2932
import org.eclipse.paho.client.mqttv3.logging.Logger;
3033
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
3134

@@ -42,6 +45,11 @@ public class TCPNetworkModule implements NetworkModule {
4245
private int port;
4346
private int conTimeout;
4447

48+
private String httpProxyHost;
49+
private int httpProxyPort;
50+
private String httpProxyUser;
51+
private String httpProxyPassword;
52+
4553
/**
4654
* Constructs a new TCPNetworkModule using the specified host and
4755
* port. The supplied SocketFactory is used to supply the network
@@ -66,19 +74,139 @@ public TCPNetworkModule(SocketFactory factory, String host, int port, String res
6674
*/
6775
public void start() throws IOException, MqttException {
6876
final String methodName = "start";
77+
78+
// @TRACE 252=connect to host {0} port {1} timeout {2}
79+
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});
80+
81+
if(httpProxyHost != null) {
82+
Socket tunnel;
83+
84+
/*
85+
* Set up a socket to do tunneling through the proxy.
86+
* Start it off as a regular socket, then layer SSL
87+
* over the top of it.
88+
*/
89+
try {
90+
tunnel = new Socket(httpProxyHost, httpProxyPort);
91+
doTunnelHandshake(tunnel, host, port, httpProxyUser, httpProxyPassword);
92+
}catch (IOException ex) {
93+
//@TRACE 251=Failed to create TCP tunnel
94+
log.fine(CLASS_NAME,methodName,"251",null,ex);
95+
throw new MqttException(MqttException.REASON_CODE_HTTP_PROXY_CONNECT_ERROR, ex);
96+
}
97+
98+
try {
99+
socket = ((SSLSocketFactory) factory).createSocket(tunnel, host, port, true);
100+
} catch (ConnectException ex) {
101+
//@TRACE 250=Failed to create TCP socket
102+
log.fine(CLASS_NAME,methodName,"250",null,ex);
103+
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
104+
}
105+
106+
} else {
107+
try {
108+
SocketAddress sockaddr = new InetSocketAddress(host, port);
109+
socket = factory.createSocket();
110+
socket.connect(sockaddr, conTimeout * 1000);
111+
socket.setSoTimeout(1000);
112+
} catch (ConnectException ex) {
113+
//@TRACE 250=Failed to create TCP socket
114+
log.fine(CLASS_NAME, methodName, "250", null, ex);
115+
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
116+
}
117+
}
118+
}
119+
120+
/*
121+
* Tell our tunnel where we want to CONNECT, and look for the
122+
* right reply. Throw IOException if anything goes wrong.
123+
*/
124+
private void doTunnelHandshake(Socket tunnel, String host, int port, String proxyUser, String proxyPassword)
125+
throws IOException {
126+
OutputStream out = tunnel.getOutputStream();
127+
128+
String msg;
129+
if(proxyUser != null) {
130+
String proxyUserPass = String.format("%s:%s", proxyUser, proxyPassword);
131+
msg = "CONNECT " + host + ":" + port + " HTTP/1.1\n"
132+
+ "Proxy-Authorization: Basic " + Base64.encode(proxyUserPass) + "\n"
133+
+ "User-Agent: Paho MQTT3 Client\n"
134+
+ "Proxy-Connection: Keep-Alive"
135+
+ "\r\n\r\n";
136+
} else {
137+
msg = "CONNECT " + host + ":" + port + " HTTP/1.0\n"
138+
+ "User-Agent: "
139+
+ "User-Agent: Paho MQTT3 Client\n"
140+
+ "Proxy-Connection: Keep-Alive"
141+
+ "\r\n\r\n";
142+
}
143+
144+
byte b[];
69145
try {
70-
// @TRACE 252=connect to host {0} port {1} timeout {2}
71-
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});
72-
SocketAddress sockaddr = new InetSocketAddress(host, port);
73-
socket = factory.createSocket();
74-
socket.connect(sockaddr, conTimeout*1000);
75-
socket.setSoTimeout(1000);
146+
/*
147+
* We really do want ASCII7 -- the http protocol doesn't change
148+
* with locale.
149+
*/
150+
b = msg.getBytes("ASCII7");
151+
} catch (UnsupportedEncodingException ignored) {
152+
/*
153+
* If ASCII7 isn't there, something serious is wrong, but
154+
* Paranoia Is Good (tm)
155+
*/
156+
b = msg.getBytes();
76157
}
77-
catch (ConnectException ex) {
78-
//@TRACE 250=Failed to create TCP socket
79-
log.fine(CLASS_NAME,methodName,"250",null,ex);
80-
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
158+
out.write(b);
159+
out.flush();
160+
161+
/*
162+
* We need to store the reply so we can create a detailed
163+
* error message to the user.
164+
*/
165+
byte reply[] = new byte[200];
166+
int replyLen = 0;
167+
int newlinesSeen = 0;
168+
boolean headerDone = false; /* Done on first newline */
169+
170+
InputStream in = tunnel.getInputStream();
171+
boolean error = false;
172+
173+
while (newlinesSeen < 2) {
174+
int i = in.read();
175+
if (i < 0) {
176+
throw new IOException("Unexpected EOF from proxy");
177+
}
178+
if (i == '\n') {
179+
headerDone = true;
180+
++newlinesSeen;
181+
} else if (i != '\r') {
182+
newlinesSeen = 0;
183+
if (!headerDone && replyLen < reply.length) {
184+
reply[replyLen++] = (byte) i;
185+
}
186+
}
81187
}
188+
189+
/*
190+
* Converting the byte array to a string is slightly wasteful
191+
* in the case where the connection was successful, but it's
192+
* insignificant compared to the network overhead.
193+
*/
194+
String replyStr;
195+
try {
196+
replyStr = new String(reply, 0, replyLen, "ASCII7");
197+
} catch (UnsupportedEncodingException ignored) {
198+
replyStr = new String(reply, 0, replyLen);
199+
}
200+
201+
/* We asked for HTTP/1.0, so we should get that back */
202+
// if (!replyStr.startsWith("HTTP/1.0 200")) {
203+
if(replyStr.indexOf("200") == -1) {
204+
throw new IOException("Unable to tunnel through "
205+
+ tunnel.getInetAddress().getHostName() + ":" + tunnel.getPort()
206+
+ ". Proxy returns \"" + replyStr + "\"");
207+
}
208+
209+
/* tunneling Handshake was successful! */
82210
}
83211

84212
public InputStream getInputStream() throws IOException {
@@ -110,4 +238,33 @@ public void setConnectTimeout(int timeout) {
110238
public String getServerURI() {
111239
return "tcp://" + host + ":" + port;
112240
}
113-
}
241+
242+
public void setHttpProxyHost(String httpProxyHost) {
243+
this.httpProxyHost = httpProxyHost;
244+
}
245+
246+
public void setHttpProxyPort(int httpProxyPort) {
247+
this.httpProxyPort = httpProxyPort;
248+
}
249+
250+
public void setHttpProxyUser(String httpProxyUser) {
251+
this.httpProxyUser = httpProxyUser;
252+
}
253+
254+
public void setHttpProxyPassword(String httpProxyPassword) {
255+
this.httpProxyPassword = httpProxyPassword;
256+
}
257+
258+
public void configHttpProxy(String proxyHost, int proxyPort, String user, String password) {
259+
if(proxyHost != null && proxyHost.length() > 0 &&
260+
proxyPort > 0){
261+
setHttpProxyHost(proxyHost);
262+
setHttpProxyPort(proxyPort);
263+
if(user != null && user.length() > 0 &&
264+
password != null && password.length() > 0) {
265+
setHttpProxyUser(user);
266+
setHttpProxyPassword(password);
267+
}
268+
}
269+
}
270+
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModuleFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
5858
}
5959
TCPNetworkModule networkModule = new TCPNetworkModule(factory, host, port, clientId);
6060
networkModule.setConnectTimeout(options.getConnectionTimeout());
61+
62+
networkModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
63+
options.getHttpProxyUser(), options.getHttpProxyPassword());
6164
return networkModule;
6265
}
63-
}
66+
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModuleFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
5454
WebSocketNetworkModule netModule = new WebSocketNetworkModule(factory, brokerUri.toString(), host, port,
5555
clientId, options.getCustomWebSocketHeaders(), options.isSkipPortDuringHandshake());
5656
netModule.setConnectTimeout(options.getConnectionTimeout());
57+
58+
netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
59+
options.getHttpProxyUser(), options.getHttpProxyPassword());
5760
return netModule;
5861
}
5962
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModuleFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
7575
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
7676
}
7777
}
78+
79+
netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
80+
options.getHttpProxyUser(), options.getHttpProxyPassword());
81+
// if(options.getHttpProxyHost() != null && options.getHttpProxyHost().length() > 0 &&
82+
// options.getHttpProxyPort() > 0){
83+
// netModule.setHttpProxyHost(options.getHttpProxyHost());
84+
// netModule.setHttpProxyPort(options.getHttpProxyPort());
85+
// if(options.getHttpProxyUser() != null && options.getHttpProxyUser().length() > 0 &&
86+
// options.getHttpProxyPassword() != null && options.getHttpProxyPassword().length() > 0) {
87+
// netModule.setHttpProxyUser(options.getHttpProxyUser());
88+
// netModule.setHttpProxyPassword(options.getHttpProxyPassword());
89+
// }
90+
// }
7891
return netModule;
7992
}
8093
}

org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/logcat.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
223=failed: in closed state
3838
224=failed: not disconnected
3939
250=Failed to create TCP socket
40+
251=Failed to create TCP tunnel
4041
252=connect to host {0} port {1} timeout {2}
4142
260=setEnabledCiphers ciphers={0}
4243
300=key={0} message={1}

org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/messages.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@
3636
32200=Persistence already in use
3737
32201=Token already in use
3838
32202=Too many publishes in progress
39+
32203=Unable to connect to Http Proxy

0 commit comments

Comments
 (0)