diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index 9e7add9e8..3bd11752e 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -21,6 +21,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Locale; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.storm.metric.api.MultiCountMetric; @@ -35,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Stores URL and selected metadata into a SQL table * */ +/** Stores URL and selected metadata into a SQL table */ public class IndexerBolt extends AbstractIndexerBolt { private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class); @@ -43,13 +44,9 @@ public class IndexerBolt extends AbstractIndexerBolt { public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table"; private OutputCollector _collector; - private MultiCountMetric eventCounter; - private Connection connection; - private String tableName; - private Map conf; @Override @@ -57,11 +54,8 @@ public void prepare( Map conf, TopologyContext context, OutputCollector collector) { super.prepare(conf, context, collector); _collector = collector; - this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10); - this.tableName = ConfUtils.getString(conf, SQL_INDEX_TABLE_PARAM_NAME); - this.conf = conf; } @@ -87,39 +81,32 @@ public void execute(Tuple tuple) { } try { - // which metadata to display? Map keyVals = filterMetadata(metadata); - - StringBuilder query = - new StringBuilder(" insert into ") - .append(tableName) - .append(" (") - .append(fieldNameForURL()); - Object[] keys = keyVals.keySet().toArray(); - for (Object o : keys) { - query.append(", ").append((String) o); - } + // Build SQL statement with prepared statement + StringBuilder fieldsBuilder = new StringBuilder(fieldNameForURL()); + StringBuilder placeholdersBuilder = new StringBuilder("?"); + StringBuilder updatesBuilder = new StringBuilder(); - query.append(") values(?"); - - for (int i = 0; i < keys.length; i++) { - query.append(", ?"); - } - - query.append(")"); - - query.append(" ON DUPLICATE KEY UPDATE "); for (int i = 0; i < keys.length; i++) { String key = (String) keys[i]; - if (i > 0) { - query.append(", "); - } - query.append(key).append("=VALUES(").append(key).append(")"); + fieldsBuilder.append(", ").append(key); + placeholdersBuilder.append(", ?"); + if (i > 0) updatesBuilder.append(", "); + updatesBuilder.append(key).append("=VALUES(").append(key).append(")"); } + String sql = + String.format( + Locale.ROOT, + "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", + tableName, + fieldsBuilder, + placeholdersBuilder, + updatesBuilder); + if (connection == null) { try { connection = SQLUtil.getConnection(conf); @@ -129,29 +116,30 @@ public void execute(Tuple tuple) { } } - LOG.debug("PreparedStatement => {}", query); + LOG.debug("PreparedStatement => {}", sql); - // create the mysql insert preparedstatement - PreparedStatement preparedStmt = connection.prepareStatement(query.toString()); + // Create the MySQL insert PreparedStatement + PreparedStatement preparedStmt = connection.prepareStatement(sql); // TODO store the text of the document? if (StringUtils.isNotBlank(fieldNameForText())) { // builder.field(fieldNameForText(), trimText(text)); } - // send URL as field? + // Send URL as first parameter if (fieldNameForURL() != null) { preparedStmt.setString(1, normalisedurl); } + // Send metadata values for (int i = 0; i < keys.length; i++) { insert(preparedStmt, i + 2, (String) keys[i], keyVals); } preparedStmt.executeUpdate(); + preparedStmt.close(); eventCounter.scope("Indexed").incrBy(1); - _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); _collector.ack(tuple); @@ -164,6 +152,7 @@ public void execute(Tuple tuple) { try { connection.close(); } catch (SQLException e1) { + // ignore } connection = null; } @@ -180,11 +169,11 @@ private void insert( String value = ""; if (values == null || values.length == 0) { LOG.info("No values found for label {}", label); - } else if (values.length > 1) { - LOG.info("More than one value found for label {}", label); - value = values[0]; } else { value = values[0]; + if (values.length > 1) { + LOG.info("More than one value found for label {}", label); + } } preparedStmt.setString(position, value); } diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index facb14094..586d0b8f3 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -19,9 +19,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -48,12 +48,12 @@ public class SQLSpout extends AbstractQueryingSpout { private Connection connection; /** - * if more than one instance of the spout exist, each one is in charge of a separate bucket + * If more than one instance of the spout exist, each one is in charge of a separate bucket * value. This is used to ensure a good diversity of URLs. */ private int bucketNum = -1; - /** Used to distinguish between instances in the logs * */ + /** Used to distinguish between instances in the logs */ protected String logIdprefix = ""; private int maxDocsPerBucket; @@ -69,9 +69,7 @@ public void open( super.open(conf, context, collector); maxDocsPerBucket = ConfUtils.getInt(conf, Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5); - tableName = ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); - maxNumResults = ConfUtils.getInt(conf, Constants.SQL_MAXRESULTS_PARAM_NAME, 100); try { @@ -81,7 +79,7 @@ public void open( throw new RuntimeException(ex); } - // determine bucket this spout instance will be in charge of + // Determine bucket this spout instance will be in charge of int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (totalTasks > 1) { logIdprefix = @@ -113,73 +111,80 @@ protected void populateBuffer() { } } - // select entries from mysql - // https://mariadb.com/kb/en/library/window-functions-overview/ - // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/ - - String query = - "SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from " - + tableName; + int alreadyprocessed = 0; + int numhits = 0; + long timeStartQuery = System.currentTimeMillis(); - query += - " WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'"; + PreparedStatement pstmt = null; + ResultSet rs = null; - // constraint on bucket num - if (bucketNum >= 0) { - query += " AND bucket = '" + bucketNum + "'"; - } + try { + // Select entries from MySQL + // https://mariadb.com/kb/en/library/window-functions-overview/ + // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/ + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT * FROM ("); + queryBuilder.append( + "SELECT RANK() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, "); + queryBuilder.append("url, metadata, nextfetchdate FROM ").append(tableName); + queryBuilder.append(" WHERE nextfetchdate <= ?"); + if (bucketNum >= 0) { + queryBuilder.append(" AND bucket = ?"); + } + queryBuilder.append(") AS urls_ranks WHERE urls_ranks.ranking <= ? "); + if (maxNumResults != -1) { + queryBuilder.append("ORDER BY ranking LIMIT ?"); + } else { + queryBuilder.append("ORDER BY ranking"); + } - query += - ") as urls_ranks where (urls_ranks.ranking <= " - + maxDocsPerBucket - + ") order by ranking"; + String query = queryBuilder.toString(); + LOG.debug("{} SQL query: {}", logIdprefix, query); - if (maxNumResults != -1) { - query += " LIMIT " + this.maxNumResults; - } + pstmt = connection.prepareStatement(query); - int alreadyprocessed = 0; - int numhits = 0; + int paramIndex = 1; + pstmt.setTimestamp(paramIndex++, new Timestamp(lastNextFetchDate.toEpochMilli())); - long timeStartQuery = System.currentTimeMillis(); + if (bucketNum >= 0) { + pstmt.setInt(paramIndex++, bucketNum); + } - // create the java statement - Statement st = null; - ResultSet rs = null; - try { - st = this.connection.createStatement(); + pstmt.setInt(paramIndex++, maxDocsPerBucket); - // dump query to log - LOG.debug("{} SQL query {}", logIdprefix, query); + if (maxNumResults != -1) { + pstmt.setInt(paramIndex++, maxNumResults); + } - // execute the query, and get a java resultset - rs = st.executeQuery(query); + rs = pstmt.executeQuery(); long timeTaken = System.currentTimeMillis() - timeStartQuery; queryTimes.addMeasurement(timeTaken); - // iterate through the java resultset while (rs.next()) { String url = rs.getString("url"); numhits++; - // already processed? skip + + // Already processed? Skip if (beingProcessed.containsKey(url)) { alreadyprocessed++; continue; } + String metadata = rs.getString("metadata"); if (metadata == null) { metadata = ""; } else if (!metadata.startsWith("\t")) { metadata = "\t" + metadata; } + String URLMD = url + metadata; List v = SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8))); buffer.add(url, (Metadata) v.get(1)); } - // no results? reset the date + // No results? Reset the date if (numhits == 0) { lastNextFetchDate = null; } @@ -204,9 +209,9 @@ protected void populateBuffer() { LOG.error("Exception closing resultset", e); } try { - if (st != null) st.close(); + if (pstmt != null) pstmt.close(); } catch (SQLException e) { - LOG.error("Exception closing statement", e); + LOG.error("Exception closing prepared statement", e); } } }