没写过 Java,心里不顶真;我看了网上的资料应该 ok,试了下也没发现什么问题;要并发的去发送一些请求,用到了连接池,我这样应该是线程安全的吧?
public class MyRequest {
    private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MyRequest.class);
    private final String REQUEST_URL;
    private ExecutorService executorService;
    private Queue<Map<String, String>> tasks;
    private static PoolingHttpClientConnectionManager cm; //<-----
    private static CloseableHttpClient httpClient;
    private MyRequest(int taskQueueSize, int executorCount, String requestURL, Map<String, Object> connConfig) {
        this.tasks = new ArrayBlockingQueue<>(taskQueueSize);
        this.executorService = Executors.newFixedThreadPool(executorCount);
        REQUEST_URL = requestURL;
        String proxyHost = connConfig.get("proxyHost").toString();
        int proxyPort = Integer.parseInt(connConfig.get("proxyPort").toString());
        cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(Integer.parseInt(connConfig.get("maxTotal").toString()));
        cm.setDefaultMaxPerRoute(Integer.parseInt(connConfig.get("defaultMaxPerRoute").toString()));
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(Integer.parseInt(connConfig.get("connectTimeout").toString()))
                .setSocketTimeout(Integer.parseInt(connConfig.get("socketTimeout").toString()))
                .setConnectionRequestTimeout(Integer.parseInt(connConfig.get("cxxRxxTxout").toString()))
                .build();
        HttpClientBuilder httpClientBuilder = HttpClients.custom()
                .setConnectionManager(cm)
                .setDefaultRequestConfig(config);
        if (!proxyHost.equals("") && 0 != proxyPort) {
            httpClient = httpClientBuilder.setProxy(new HttpHost(proxyHost, proxyPort)).build();
        } else {
            httpClient = httpClientBuilder.build();
        }
    }
    private void addTask(Map<String, String> parameters) {
        tasks.offer(parameters);
    }
    private void flush() {
        List<Future> futures = this.tasks.stream()
                .map(this::delegate)
                .collect(Collectors.toList());
        futures.forEach((f) -> {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        this.tasks.clear();
    }
    private Future delegate(Map<String, String> parameters) {
        return this.executorService.submit(() -> {
            doRequest(parameters, REQUEST_URL);
        });
    }
    private void doRequest(Map<String, String> parameters, String url) {
        CloseableHttpResponse resp = null; 
        HttpGet get = null; 
        try {
            URIBuilder builder = new URIBuilder(url);
            builder.addParameter("foo", parameters.get("bar"));
            get = new HttpGet(builder.build()); //<-----
            resp = httpClient.execute(get); //<-----
            if (resp.getStatusLine().getStatusCode() != 200) {
                LOGGER.warn("xxx");
            } else {
                LOGGER.info("xxx");
            }
            resp.close();
        } catch (URISyntaxException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } catch (ClientProtocolException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } catch (IOException e) {
            LOGGER.warn("xxx" + e.getMessage());
        } finally {
            if (resp != null) {
                try {
                    resp.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private void shutdown() {
        this.executorService.shutdown();
    }
    public static void main(String args[]) throws IOException {
        String logPath = System.getProperty("readLogPath");
        String requestURL = System.getProperty("requestURL");
        int taskQueueSize = Integer.valueOf(System.getProperty("max.requests", "2000"));
        int executorCount = Integer.valueOf(System.getProperty("num.executors", "100"));
        int interval = Integer.valueOf(System.getProperty("max.interval", "500"));
        String proxyHost = System.getProperty("proxyHost", "");
        int proxyPort = Integer.parseInt(System.getProperty("proxyInt", "0"));
        int maxTotal = Integer.parseInt(System.getProperty("maxTotal", "5000"));
        int defaultMaxPerRoute = Integer.parseInt(System.getProperty("defaultMaxPerRoute", "1000"));
        int connectTimeout = Integer.parseInt(System.getProperty("connectTimeout", "1000"));
        int socketTimeout = Integer.parseInt(System.getProperty("socketTimeout", "3000"));
        int connectionRequestTimeout = Integer.parseInt(System.getProperty("connectionRequestTimeout", "3000"));
        Map<String, Object> connConfig = new HashMap<>();
        connConfig.put("proxyHost", proxyHost);
        connConfig.put("proxyPort", proxyPort);
        connConfig.put("maxTotal", maxTotal);
        connConfig.put("defaultMaxPerRoute", defaultMaxPerRoute);
        connConfig.put("connectTimeout", connectTimeout);
        connConfig.put("socketTimeout", socketTimeout);
        connConfig.put("connectionRequestTimeout", connectionRequestTimeout);
        try {
            MyRequest syncLog = new MyRequest(taskQueueSize, executorCount, requestURL, connConfig);
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(logPath);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
            try {
                String line;
                int size = 0;
                long startTs = System.currentTimeMillis();
                line = br.readLine();
                while (line != null) {
                    JsonElement root = new JsonParser().parse(line);
                    Map<String, String> parameters = new HashMap<>();
                    JsonObject rootJson = root.getAsJsonObject();
                    for (Map.Entry entry : rootJson.entrySet()) {
                        parameters.put(entry.getKey().toString(),
                                rootJson.get(entry.getKey().toString()).getAsString());
                    }
                    syncLog.addTask(parameters);
                    ++size;
                    if (size >= taskQueueSize || (System.currentTimeMillis() - startTs) > interval) {
                        syncLog.flush();
                        size = 0;
                        startTs = System.currentTimeMillis();
                    }
                    line = br.readLine();
                }
                if (0 != size) {
                    syncLog.flush();
                }
            } catch (IOException e) {
                LOGGER.error("xxx" + e.getMessage());
            } finally {
                br.close();
            }
            syncLog.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
|  |      1Finest      2017-09-11 07:49:47 +08:00 doRequest 的 resp.close()重复了吧,既然你都在 finally 里 close 了 | 
|  |      2Finest      2017-09-11 07:56:27 +08:00  1 还有就是 ArrayBlockingQueue 的用法有问题,你这用法还不如直接用 List 批量提交。 如果想用到 Queue,那直接开 N 个线程作为消费者线程。 |