Elasticsearch 是一個開源的搜索引擎,建立在一個全文搜索引擎庫Apache Lucene? 基礎(chǔ)之上。
在夾江等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作按需網(wǎng)站建設(shè),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),營銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,夾江網(wǎng)站建設(shè)費(fèi)用合理。
那么如何實(shí)現(xiàn) Elasticsearch和 Python 的對接成為我們所關(guān)心的問題了 (怎么什么都要和 Python 關(guān)聯(lián)啊)。視頻教程文末也整理好了!
/Python 交互/
所以,Python 也就提供了可以對接 Elasticsearch的依賴庫。
def __init__(
self, index_type:
str, index_name:
str, ip=
"127.0.0.1"):
#
self.es = Elasticsearch([ip], http_auth=(
'username',
'password'), port=
9200)
self.es = Elasticsearch(
"localhost:9200")
self.index_type = index_type
self.index_name = index_name
初始化連接一個 Elasticsearch 操作對象。
def __init__(
self, index_type:
str, index_name:
str, ip=
"127.0.0.1"):
#
self.es = Elasticsearch([ip], http_auth=(
'username',
'password'), port=
9200)
self.es = Elasticsearch(
"localhost:9200")
self.index_type = index_type
self.index_name = index_name
默認(rèn)端口 9200,初始化前請確保本地已搭建好 Elasticsearch的所屬環(huán)境。
根據(jù) ID 獲取文檔數(shù)據(jù)
def
insert_one
(
self,
doc: dict):
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
def
insert_array
(
self,
docs: list):
for doc
in
docs:
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
插入文檔數(shù)據(jù)
def
insert_one
(
self,
doc: dict):
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
def
insert_array
(
self,
docs: list):
for doc
in
docs:
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
搜索文檔數(shù)據(jù)
def search(
self, query,
count: int =
30):
dsl = {
"query": {
"multi_match": {
"query": query,
"fields": [
"title",
"content",
"link"]
}
},
"highlight": {
"fields": {
"title": {}
}
}
}
match_data =
self.es.search(index=
self.index_name, body=dsl, size=
count)
return match_data
def __search(
self, query: dict,
count: int =
20): #
count: 返回的數(shù)據(jù)大小
results = []
params = {
'size':
count
}
match_data =
self.es.search(index=
self.index_name, body=query, params=params)
for hit
in match_data['hits']['hits']:
results.append(hit['_source'])
return results
刪除文檔數(shù)據(jù)
def
delete_index
(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
好啊,封裝 search 類也是為了方便調(diào)用,整體貼一下。
from elasticsearch import Elasticsearch
class elasticSearch():
def __init__(
self, index_type:
str, index_name:
str, ip=
"127.0.0.1"):
#
self.es = Elasticsearch([ip], http_auth=(
'elastic',
'password'), port=
9200)
self.es = Elasticsearch(
"localhost:9200")
self.index_type = index_type
self.index_name = index_name
def create_index(
self):
if
self.es.indices.exists(index=
self.index_name) is True:
self.es.indices.delete(index=
self.index_name)
self.es.indices.create(index=
self.index_name, ignore=
400)
def delete_index(
self):
try:
self.es.indices.delete(index=
self.index_name)
except:
pass
def get_doc(
self, uid):
return
self.es.get(index=
self.index_name, id=uid)
def insert_one(
self, doc: dict):
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
def insert_array(
self, docs: list):
for doc
in docs:
self.es.index(index=
self.index_name, doc_type=
self.index_type, body=doc)
def search(
self, query, count: int =
30):
dsl = {
"query": {
"multi_match": {
"query": query,
"fields": [
"title",
"content",
"link"]
}
},
"highlight": {
"fields": {
"title": {}
}
}
}
match_data =
self.es.search(index=
self.index_name, body=dsl, size=count)
return match_data
嘗試一下把 MongoDB 中的數(shù)據(jù)插入到 ES 中。
import json
from datetime
import datetime
import pymongo
from app.elasticsearchClass
import elasticSearch
client = pymongo.MongoClient(
'127.0.0.1',
27017)
db = client[
'spider']
sheet = db.get_collection(
'Spider').find({}, {
'_id':
0, })
es = elasticSearch(index_type=
"spider_data",index_name=
"spider")
es.create_index()
for i
in sheet:
data = {
'title': i[
"title"],
'content':i[
"data"],
'link': i[
"link"],
'create_time':datetime.now()
}
es.insert_one(doc=
data)
到ES中查看一下,啟動 elasticsearch-head 插件。
如果是 npm 安裝的那么cd到根目錄之后直接npm run start就跑起來了。
發(fā)現(xiàn)新加的 spider 數(shù)據(jù)文檔確實(shí)已經(jīng)進(jìn)去了。
/爬蟲入庫/
要想實(shí)現(xiàn) ES 搜索,首先要有數(shù)據(jù)支持,而海量的數(shù)據(jù)往往來自爬蟲。
為了節(jié)省時(shí)間,編寫一個最簡單的爬蟲,抓取 百度百科。
簡單粗暴一點(diǎn),先 遞歸獲取 很多很多的 url 鏈接
import requests
import re
import time
exist_urls = []
headers = {
'User-Agent':
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
}
def get_link(url):
try:
response = requests.
get(url=url, headers=headers)
response.encoding =
'UTF-8'
html = response.text
link_lists = re.findall(
'.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>', html)
return link_lists
except Exception
as e:
pass
finally:
exist_urls.append(url)
# 當(dāng)爬取深度小于
10層時(shí),遞歸調(diào)用主函數(shù),繼續(xù)爬取第二層的所有鏈接
def main(start_url, depth=
1):
link_lists = get_link(start_url)
if link_lists:
unique_lists = list(
set(link_lists) -
set(exist_urls))
for unique_url
in unique_lists:
unique_url =
'https://baike.baidu.com/item/' + unique_url
with
open(
'url.txt',
'a+')
as f:
f.write(unique_url +
'\n')
f.close()
if depth <
10:
main(unique_url, depth +
1)
if __name__ ==
'__main__':
start_url =
'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
main(start_url)
把全部 url 存到 url.txt 文件中之后,然后啟動任務(wù)。
# parse.pyfrom celery
import Celery
import requests
from lxml
import etree
import pymongo
app = Celery(
'tasks', broker=
'redis://localhost:6379/2')
client = pymongo.MongoClient(
'localhost',
27017)
db = client[
'baike']
@app.task
def get_url(link):
item = {}
headers = {
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
res = requests.get(link,headers=headers)
res.encoding =
'UTF-8'
doc = etree.HTML(res.text)
content = doc.xpath(
"//div[@class='lemma-summary']/div[@class='para']//text()")
print(res.status_code)
print(link,
'\t',
'++++++++++++++++++++')
item[
'link'] = link
data =
''.join(content).replace(
' ',
'').replace(
'\t',
'').replace(
'\n',
'').replace(
'\r',
'')
item[
'data'] = data
if db[
'Baike'].insert(dict(item)):
print(
"is OK ...")
else:
print(
'Fail')
run.py 飛起來
from parse import get_url
def
main(
url):
result = get_url.delay(url)
return
result
def
run(
):
with
open(
'./url.txt',
'r')
as f:
for url
in f.
readlines(
):
main(
url.strip(
'\n'))
if __name__ ==
'__main__':
run()
黑窗口鍵入
celery -A parse worker -l info -P gevent -c 10
哦 !! 你居然使用了 Celery 任務(wù)隊(duì)列,gevent 模式,-c 就是10個線程刷刷刷就干起來了,速度杠杠的 ??!
啥?分布式? 那就加多幾臺機(jī)器啦,直接把代碼拷貝到目標(biāo)服務(wù)器,通過redis 共享隊(duì)列協(xié)同多機(jī)抓取。
這里是先將數(shù)據(jù)存儲到了 MongoDB 上(個人習(xí)慣),你也可以直接存到 ES 中,但是單條單條的插入速度堪憂(接下來會講到優(yōu)化,哈哈)。
使用前面的例子將 Mongo 中的數(shù)據(jù)批量導(dǎo)入到 ES 中,OK !!!
到這一個簡單的數(shù)據(jù)抓取就已經(jīng)完畢了。
同學(xué)們不清楚的地方,可以留言,更多的教程,也會繼續(xù)更新,感謝大家一直以來的支持!
應(yīng)伙伴們的要求,嘔心瀝血整理了
900集的全套Python學(xué)習(xí)視頻教程:Python 900集全套視頻教程(全家桶)
https://pan.baidu.com/s/1cU5lDWq9gh0cQ7hCnXUiGA
要學(xué)習(xí)的伙伴們,可以回復(fù):“Python視頻教程”,即可領(lǐng)??!
當(dāng)前題目:Python學(xué)習(xí)教程:手把手教你使用Flask搭建ES搜索引擎
本文鏈接:http://aaarwkj.com/article34/jesdse.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供虛擬主機(jī)、靜態(tài)網(wǎng)站、網(wǎng)站排名、App設(shè)計(jì)、網(wǎng)站制作、響應(yīng)式網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)