zagnix 2019-09-28
在文章《webmagic核心设计和运行机制分析》中已经提到WebMagic内部是通过生产者/消费者模式来实现的,本篇我们就分析一下WebMagic的源代码,先从爬虫入口类main方法开始。
public static void main(String[] args) { Spider.create(new GithubRepoPageProcessor()) //从https://github.com/code4craft开始抓 .addUrl("https://github.com/code4craft") //设置Scheduler,使用Redis来管理URL队列 .setScheduler(new RedisScheduler("localhost")) //设置Pipeline,将结果以json方式保存到文件 .addPipeline(new JsonFilePipeline("D:\\data\\webmagic")) //开启5个线程同时执行 .thread(5) //启动爬虫 .run(); }
通过官方给出的创建爬虫入口类的样例代码可以看到,启动爬虫是调用Spider.run()
方法。
1. Spider.run()
方法
checkRunningStat()
检查运行状态,不是很重要,跳过。
@Override public void run() { // 检查运行状态 checkRunningStat(); // 初始化组件 initComponent(); logger.info("Spider {} started!",getUUID()); // 死循环从Scheduler中拉取Request(Request中封装了url) while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added // 当Scheduler中不存在Request时,线程等待 waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { // 处理Request,核心方法 processRequest(request); // 调用监听器onSuccess()方法 onSuccess(request); } catch (Exception e) { // 调用监听器onError()方法 onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); // 唤醒线程 signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get()); }
2. initComponent()
初始化组件:设置默认Downloader实现,初始化线程池
//初始化组件 protected void initComponent() { if (downloader == null) { // 默认使用HttpClientDownloader this.downloader = new HttpClientDownloader(); } if (pipelines.isEmpty()) { pipelines.add(new ConsolePipeline()); } downloader.setThread(threadNum); // 初始化线程池 if (threadPool == null || threadPool.isShutdown()) { if (executorService != null && !executorService.isShutdown()) { threadPool = new CountableThreadPool(threadNum, executorService); } else { threadPool = new CountableThreadPool(threadNum); } } if (startRequests != null) { for (Request request : startRequests) { addRequest(request); } startRequests.clear(); } startTime = new Date(); }
3. waitNewUrl()
/ signalNewUrl()
:配合Scheduler对象实现生产者/消费者模式
// 线程等待 private void waitNewUrl() { newUrlLock.lock(); try { //double check if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { return; } newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.warn("waitNewUrl - interrupted, error {}", e); } finally { newUrlLock.unlock(); } } // 线程唤醒 private void signalNewUrl() { try { newUrlLock.lock(); newUrlCondition.signalAll(); } finally { newUrlLock.unlock(); } }
在Spider类属性中包含了默认Scheduler实现类QueueScheduler的对象scheduler,而在QueueScheduler类中默认使用内存阻塞队列来存储Request。
// Spider类属性 protected Scheduler scheduler = new QueueScheduler();
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler { // 默认使用内存阻塞队列来存储Request对象 private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>(); @Override public void pushWhenNoDuplicate(Request request, Task task) { queue.add(request); } @Override public Request poll(Task task) { return queue.poll(); } @Override public int getLeftRequestsCount(Task task) { return queue.size(); } @Override public int getTotalRequestsCount(Task task) { return getDuplicateRemover().getTotalRequestsCount(task); } }
看到这里,有人可能会疑惑当阻塞队列中没有Request对象时,线程会卡死在Spider.run()
方法中waitNewUrl()
上。
其实只要我们看Spider.addUrl()
和Spider.addRequest()
两个方法的源码,就会发现其中调用了线程唤醒方法Spider.signalNewUrl()
,而在爬虫入口类中必然会调addUrl()
或addRequest()
其中一个来设置起始url,所以不存在线程运行卡死的情况。
public Spider addUrl(String... urls) { for (String url : urls) { addRequest(new Request(url)); } // 调用线程唤醒方法 signalNewUrl(); return this; } public Spider addRequest(Request... requests) { for (Request request : requests) { addRequest(request); } // 调用线程唤醒方法 signalNewUrl(); return this; } private void addRequest(Request request) { if (site.getDomain() == null && request != null && request.getUrl() != null) { site.setDomain(UrlUtils.getDomain(request.getUrl())); } // 将request推送到scheduler内存阻塞队列中去 scheduler.push(request, this); }
这样就构成了一个典型的生产者/消费者模式代码实现。
4. processRequest()
:爬虫业务逻辑的核心方法
首先调用Downloader下载网页,再调用自定义的PageProcessor解析网页文本并从中提取出目标数据,最后调用自定义的Pipeline持久化目标数据。
private void processRequest(Request request) { // 调用Downloader对象下载网页 Page page = downloader.download(request, this); if (page.isDownloadSuccess()){ // 下载成功 onDownloadSuccess(request, page); } else { onDownloaderFail(request); } } private void onDownloadSuccess(Request request, Page page) { if (site.getAcceptStatCode().contains(page.getStatusCode())){ // 调用自定义的PageProcessor对象(通过入口类设置)解析封装网页的Page对象 pageProcessor.process(page); // 添加后续需要爬取的url字符串,推送Request到Scheduler中去 extractAndAddRequests(page, spawnUrl); if (!page.getResultItems().isSkip()) { for (Pipeline pipeline : pipelines) { // 调用自定义的Pipeline对象(通过入口类设置)持久化目标数据,通过ResultItems对象封装传递 pipeline.process(page.getResultItems(), this); } } } else { logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode()); } sleep(site.getSleepTime()); return; } private void onDownloaderFail(Request request) { if (site.getCycleRetryTimes() == 0) { sleep(site.getSleepTime()); } else { // for cycle retry // 下载失败后重试 doCycleRetry(request); } } protected void extractAndAddRequests(Page page, boolean spawnUrl) { // 添加target Request if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) { for (Request request : page.getTargetRequests()) { // 推送Request到Scheduler内存阻塞队列中去 addRequest(request); } } }
5. onSuccess()
/ onError()
:监听Request处理成功或失败的情况
只有需要自定义SpiderListener
监听器时才会使用到,这里不是重点,不再赘述,具体使用方法可参考源码 https://github.com/xiawq87/sp... 中的实现。
通过上述源码分析的过程,让我们可以大体了解WebMagic内部生产者/消费者模式的实现方式。