started optar

Askill 2022-10-14 23:04:13 +02:00
commit aefcfa85fa
12 changed files with 285 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
venv/**
.idea/

8
.idea/.gitignore vendored Normal file

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/../../../../:\projects\optar\.idea/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

6
.idea/inspectionProfiles/Project_Default.xml Normal file

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
  <profile version="1.0">
    <option name="myName" value="Project Default" />
    <inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
  </profile>
</component>

6
.idea/inspectionProfiles/profiles_settings.xml Normal file

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
  <settings>
    <option name="USE_PROJECT_PROFILE" value="false" />
    <version value="1.0" />
  </settings>
</component>

4
.idea/misc.xml Normal file

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (optar)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/optar.iml" filepath="$PROJECT_DIR$/.idea/optar.iml" />
    </modules>
  </component>
</project>

10
.idea/optar.iml Normal file

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$">
      <excludeFolder url="file://$MODULE_DIR$/venv" />
    </content>
    <orderEntry type="jdk" jdkName="Python 3.9 (optar)" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>

6
.idea/vcs.xml Normal file

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
  </component>
</project>

95
src/Crawler.py Normal file

@@ -0,0 +1,95 @@
import json
import logging
from time import sleep
from urllib.parse import urljoin

import requests
from lxml import html


class Crawler:
    url = ""  # the url of the website to be checked
    links = dict()  # dict with all visited pages and the urls found on them
    # default request headers (currently not passed along with the requests below)
    header_values = {
        'Connection': 'Keep-alive',
        'name': 'Michael Foord',
        'location': 'Northampton',
        'language': 'English',
        'User-Agent': 'Mozilla 4/0'}
    exclude = []

    def __init__(self, logger=None, exclude=None):
        if exclude:
            # copy instead of += so the class-level default list is not mutated
            self.exclude = self.exclude + exclude
        if logger:
            self.logger = logger
        else:
            # fall back to a simple logger that prints crawl progress
            logging.basicConfig(level=logging.INFO)
            self.logger = logging.getLogger("star_crawler")

    def persist(self, path):
        with open(path, 'w') as fp:
            json.dump(self.links, fp)

    def load_site(self, path):
        with open(path, 'r') as fp:
            self.links = json.load(fp)

    def run(self, root, limit, sleep_time=0):
        self.url = root
        unchecked = [root]
        while unchecked and len(self.links) < limit:
            root = unchecked.pop()
            # skip pages that were already visited or that leave the start domain
            if root in self.links or self.url.rsplit('/')[2] not in root:
                continue
            if "https" not in root:
                continue
            # skip urls that contain an excluded substring
            clean = False
            for element in self.exclude:
                if element in root:
                    clean = False
                    break
            else:
                clean = True
            if not clean:
                continue
            self.logger.info(f"{len(self.links)} {root}")
            try:
                site = requests.get(root)
                tree = html.fromstring(site.content)
                links = tree.xpath('//a/@href')
            except Exception:
                continue
            # resolve relative links and drop duplicates within the page
            nlinks = []
            for link in links:
                if link not in nlinks:
                    if link.startswith("http"):
                        nlinks.append(link)
                    else:
                        nlinks.append(urljoin(site.url, link))
            unchecked += nlinks
            self.links[root] = nlinks
            sleep(sleep_time)

    def getNodesEdges(self):
        nodes = []
        edges = []
        for key, value in self.links.items():
            nodes.append(key)
            for edge in value:
                edges.append([key, edge])
        return nodes, edges

    def makeGraph(self, g):
        nodes, edges = self.getNodesEdges()
        for node in nodes:
            g.add_node(node)
        for f, t in edges:
            g.add_edge(f, t)
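
A possible usage sketch (not part of the commit): the start URL, exclude pattern, snapshot path, and the networkx backend below are assumptions; makeGraph only needs an object that exposes add_node and add_edge.

import os

import networkx as nx  # assumption: any graph object with add_node/add_edge works

from src.Crawler import Crawler

crawler = Crawler(exclude=["logout"])               # hypothetical exclude pattern
crawler.run("https://example.com/", limit=50, sleep_time=1)

os.makedirs("./cached/example.com", exist_ok=True)  # assumed cache layout, see SiteStore
crawler.persist("./cached/example.com/2022-10-14.json")

graph = nx.DiGraph()
crawler.makeGraph(graph)
print(graph.number_of_nodes(), "pages,", graph.number_of_edges(), "links")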

79
src/SiteReader.py Normal file

@@ -0,0 +1,79 @@
import json
from typing import List, Dict

import requests
import trafilatura
from requests.exceptions import MissingSchema
from bs4 import BeautifulSoup


class SiteReader:
    def __init__(self):
        pass

    def beautifulsoup_extract_text_fallback(self, response_content):
        '''
        This is a fallback function, so that we can always return a value for text content,
        even when Trafilatura is unable to extract the text from a single URL.
        '''
        # Create the beautifulsoup object:
        soup = BeautifulSoup(response_content, 'html.parser')
        # Finding the text:
        text = soup.find_all(text=True)
        # Remove unwanted tag elements:
        cleaned_text = ''
        blacklist = [
            '[document]',
            'noscript',
            'header',
            'html',
            'meta',
            'head',
            'input',
            'script',
            'style', ]
        # Then we will loop over every item in the extracted text and make sure that the beautifulsoup4 tag
        # is NOT in the blacklist
        for item in text:
            if item.parent.name not in blacklist:
                cleaned_text += '{} '.format(item)
        # Remove any tab separation and strip the text:
        cleaned_text = cleaned_text.replace('\t', '')
        return cleaned_text.strip()

    def extract_text_from_single_web_page(self, url):
        downloaded_url = trafilatura.fetch_url(url)
        try:
            a = trafilatura.extract(downloaded_url, json_output=True, with_metadata=True, include_comments=False,
                                    date_extraction_params={'extensive_search': True, 'original_date': True})
        except AttributeError:
            a = trafilatura.extract(downloaded_url, json_output=True, with_metadata=True,
                                    date_extraction_params={'extensive_search': True, 'original_date': True})
        if a:
            json_output = json.loads(a)
            return json_output['text']
        else:
            try:
                resp = requests.get(url)
                # We will only extract the text from successful requests:
                if resp.status_code == 200:
                    return self.beautifulsoup_extract_text_fallback(resp.content)
                else:
                    # This line will handle any failures in both the Trafilatura and BeautifulSoup4 functions:
                    return None
            # Handling for any URLs that don't have the correct protocol
            except MissingSchema:
                return None

    def get_sites_content_dynamic(self, urls: List[str]):
        pass

    def get_sites_content_static(self, urls: List[str]) -> Dict[str, str]:
        return {url: self.extract_text_from_single_web_page(url) for url in urls}
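
A short sketch of how the reader might be called (an illustration, not part of the commit); the URLs are placeholders, and get_sites_content_static maps each URL to its extracted text or None.

from src.SiteReader import SiteReader

reader = SiteReader()
# placeholder urls; each maps to its extracted text, or None if extraction failed
pages = reader.get_sites_content_static([
    "https://example.com/",
    "https://example.com/blog/first-post",
])
for url, text in pages.items():
    print(url, "->", text[:80] if text else "<no text extracted>")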

15
src/SiteStore.py Normal file

@@ -0,0 +1,15 @@
import os
from typing import List


class SiteStore:
    def __init__(self):
        pass

    @staticmethod
    def get_site_history(fqdn) -> List[str]:
        """ list the cached snapshot file names for a site, oldest first """
        cache_path = f"./cached/{fqdn}"
        if not os.path.isdir(cache_path):
            # sentinel: a single empty name means the site has never been cached
            return [""]
        return sorted(os.listdir(cache_path))
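
get_site_history only lists whatever is stored under ./cached/<fqdn>; the snapshot naming below is an assumption, chosen so that sorted() puts the newest file last.

import os

from src.SiteStore import SiteStore

# assumed layout: one JSON snapshot per crawl, named so that lexicographic order matches time order,
# e.g. ./cached/example.com/2022-10-13.json and ./cached/example.com/2022-10-14.json
history = SiteStore.get_site_history("example.com")
print(history)  # [''] if the site has never been cached, otherwise the sorted snapshot names
if history[-1]:
    print("latest snapshot:", os.path.join("./cached/example.com", history[-1]))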

46
src/Watcher.py Normal file

@@ -0,0 +1,46 @@
import json
from typing import List, Dict

from src.SiteReader import SiteReader
from src.SiteStore import SiteStore


class Watcher:
    def __init__(self) -> None:
        self.site_store = SiteStore()
        self.site_reader = SiteReader()
        self.keywords_source_path = ""
        self.sites_source_path = ""

    def read_txt_file(self, path) -> List[str]:
        with open(path) as f:
            return f.read().splitlines()

    def watch(self):
        while True:
            keywords = self.read_txt_file(self.keywords_source_path)
            sites = self.read_txt_file(self.sites_source_path)
            # map every watched site to the text of its newly changed pages
            contents = {site: self.get_new_content(site) for site in sites}
            matches = []
            for url, content in contents.items():
                matches.append(self.search_sites(url, content, keywords))
            print(matches)
    def get_new_content(self, fqdn) -> Dict[str, str]:
        """ get the text of the pages that changed between the two most recent cached snapshots of a site,
        identified by its fully qualified domain name """
        list_of_files = self.site_store.get_site_history(fqdn)
        # assumption: each snapshot under ./cached/<fqdn>/ is a JSON dict of url -> links,
        # as written by Crawler.persist
        with open(f"./cached/{fqdn}/{list_of_files[-2]}") as fp:
            prev_version = json.load(fp)
        with open(f"./cached/{fqdn}/{list_of_files[-1]}") as fp:
            current_version = json.load(fp)
        # keep only the urls whose entry differs between the two snapshots
        news = dict({(url, tuple(links)) for url, links in prev_version.items()}
                    ^ {(url, tuple(links)) for url, links in current_version.items()})
        sites_contents = self.site_reader.get_sites_content_static(list(news.keys()))
        return sites_contents
    def search_sites(self, url, content: Dict[str, str], keywords: List[str]):
        results = []
        for keyword in keywords:
            # substring match against the text of every changed page of the site
            if any(text and keyword in text for text in content.values()):
                results.append((url, keyword))
        return results
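
A sketch of wiring the watcher together, assuming plain-text keyword and site lists (one entry per line, both hypothetical file names) and at least two cached snapshots per watched site.

from src.Watcher import Watcher

# hypothetical input files: one fully qualified domain name / keyword per line
with open("sites.txt", "w") as f:
    f.write("example.com\n")
with open("keywords.txt", "w") as f:
    f.write("release\nsecurity advisory\n")

watcher = Watcher()
watcher.sites_source_path = "sites.txt"
watcher.keywords_source_path = "keywords.txt"
watcher.watch()  # loops forever, printing the matched (site, keyword) pairs each pass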