# SAYN Project Example: Reddit News NLP

## Project Description

### Overview
This is an example SAYN project which shows how to use SAYN for data modelling and processing. You can find the GitHub repository [here](https://github.com/173TECH/sayn_project_example_nlp_news_scraping).
This project does the following:
- Extracts article data from Reddit RSS feeds
- Loads it into a SQLite database
- Cleans the extracted data
- Performs some basic text analysis on the transformed data
### Features Used

- Python tasks to extract and analyse data
- Autosql tasks to automate SQL transformations
- Usage of parameters to make the code dynamic
- Usage of presets to define tasks

In addition to SAYN, this project uses the following packages:

- RSS feed data extraction: `feedparser`
- Data processing: `numpy`, `pandas`, `nltk`
- Visualisations: `matplotlib`, `wordcloud`, `pillow`
## Running The Project

- Clone the repository with the command `git clone https://github.com/173TECH/sayn_project_example_nlp_news_scraping`.
- Rename the `sample_settings.yaml` file to `settings.yaml`.
- Install the project dependencies by running the `pip install -r requirements.txt` command from the root of the project folder.
- Run all SAYN commands from the root of the project folder.
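For reference, the setup steps above look roughly like this on a Unix-like shell (on Windows, rename the settings file through the file explorer instead of `mv`):

```bash
git clone https://github.com/173TECH/sayn_project_example_nlp_news_scraping
cd sayn_project_example_nlp_news_scraping

# rename the sample settings file
mv sample_settings.yaml settings.yaml

# install SAYN and the other project dependencies
pip install -r requirements.txt
```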
## Implementation Details

### Step 1: Extract Task Group

Quick Summary:

- Create the task group `extract.yaml`
- Create a Python task to extract and load the data

#### Task Details (`load_data`)
First, we need to define our `extract` group in our tasks folder. This group will only include the `load_data` task. This is quite a simple Python task which will use the `LoadData` class from `load_data.py`, which we will create later.

Our `load_data` task will have two parameters:

- `table`: the name of the table we plan to create in our database
- `links`: a list of links to RSS feeds
`tasks/extract.yaml`

```yaml
tasks:
  load_data:
    type: python
    class: load_data.LoadData
    parameters:
      table: logs_reddit_feeds
      links:
        - https://www.reddit.com/r/USnews/new/.rss
        - https://www.reddit.com/r/UKnews/new/.rss
        - https://www.reddit.com/r/EUnews/new/.rss
```
**Note**

Parameters are not a requirement; however, they make the code dynamic, which is useful for reusability.
The `load_data` task will have the following steps:

- Appending Reddit data to dataframe: loops through the links list and appends the data from each link to a dataframe
- Updating database: loads the dataframe into the SQLite database using the `pandas.to_sql` method
#### LoadData Class

Next, we will create our `LoadData` class. `LoadData` inherits from SAYN's `PythonTask` and has three methods:

- `fetch_reddit_data`: fetches data from the Reddit RSS feeds
- `setup`: sets the order of steps to run
- `run`: defines what each step does during the run
**Attention**

`fetch_reddit_data` is a utility method for this task, while `setup` and `run` are the usual SAYN methods. Please note that `setup` and `run` need to return either `self.success()` or `self.fail()` in order to run.
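To make the `setup`/`run` contract concrete before looking at the real task, here is a minimal, hypothetical `PythonTask` skeleton (not part of this project) that only illustrates the return-value requirement:

```python
from sayn import PythonTask


class MinimalTask(PythonTask):
    def setup(self):
        # declare the ordered steps this task reports on
        self.set_run_steps(["Only step"])
        return self.success()

    def run(self):
        everything_ok = True
        with self.step("Only step"):
            self.info("Task logic would go here")
        # setup and run must report an outcome back to SAYN
        return self.success() if everything_ok else self.fail()
```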
#### Utility Method (`fetch_reddit_data`)

The `fetch_reddit_data` method uses `feedparser.parse` to fetch the raw data from an RSS feed link. It then converts the data into a pandas DataFrame to make it easier to work with. The method also extracts the source of each article and adds it under the `source` column.
`python/load_data.py`

```python
import pandas as pd
import feedparser as f

from sayn import PythonTask


class LoadData(PythonTask):
    def fetch_reddit_data(self, link):
        """Parse and label RSS Reddit data then return it in a pandas DataFrame"""

        # get data from supplied link
        raw_data = f.parse(link)

        # transform data to dataframe
        data = pd.DataFrame(raw_data.entries)

        # select columns of interest
        data = data.loc[:, ["id", "link", "updated", "published", "title"]]

        # get the source, only works for Reddit RSS feeds
        source_elements = link.split("/")
        data["source"] = source_elements[4] + "_" + source_elements[5]

        return data

    def setup(self):
        self.set_run_steps(["Appending Reddit data to dataframe", "Updating database"])
        return self.success()

    def run(self):
        with self.step("Appending Reddit data to dataframe"):
            links = self.parameters["links"]
            table = self.parameters["user_prefix"] + self.task_parameters["table"]

            df = pd.DataFrame()
            for link in links:
                temp_df = self.fetch_reddit_data(link)
                n_rows = len(temp_df)
                df = df.append(temp_df)
                self.info(f"Loading {n_rows} rows into destination: {table}....")

        with self.step("Updating database"):
            if df is not None:
                df.to_sql(
                    table, self.default_db.engine, if_exists="append", index=False
                )

        return self.success()
```
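If you want to test this task in isolation, SAYN lets you restrict which tasks are executed. Assuming the standard CLI flags, something like the following should run only `load_data` (see the SAYN CLI documentation for the exact task-filtering syntax in your version):

```bash
# run only the load_data task
sayn run -t load_data
```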
**Tip**

`self.parameters["user_prefix"]` is set dynamically based on what you set it to in `project.yaml`; it can also be overwritten in `settings.yaml`.
### Step 2: Modelling Group

Quick Summary:

- Create the SQL query `dim_reddit_feeds.sql` to filter out duplicates
- Create a modelling preset in `project.yaml`
- Create the task group `modelling.yaml`

#### Task Details (`dim_reddit_feeds`)
Currently, our `load_data` task appends data to our database, but it does not filter out any potential duplicates that we might encounter after multiple runs. This is where the `modelling` group comes in: we can define an autosql task to filter out any duplicates.

First, we need to create a SQL query in our `sql` folder that will filter out any duplicates; we will call it `dim_reddit_feeds.sql`.
`sql/dim_reddit_feeds.sql`

```sql
SELECT DISTINCT id
     , title
     , published
     , updated
     , link
     , source

FROM {{user_prefix}}logs_reddit_feeds
```
**Tip**

`{{user_prefix}}` is set dynamically. The default value is set in `project.yaml`. This can be overwritten using profiles in `settings.yaml`.
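As an illustration of what such an override might look like, here is a hedged sketch of a `settings.yaml` profile. The profile name, SQLite file name and prefix value are made up, and the exact structure should be checked against SAYN's settings documentation for your version:

```yaml
default_profile: dev

profiles:
  dev:
    credentials:
      warehouse: dev_db
    parameters:
      user_prefix: dev_

credentials:
  dev_db:
    type: sqlite
    database: dev.db
```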
Next, we will define a modelling preset in `project.yaml`. Presets enable you to create a task prototype which can be reused when defining tasks. Hence, the modelling preset will simplify the code in `modelling.yaml` while also allowing us to set dynamic file and table names.
**Attention**

Presets defined in `project.yaml` are project-level presets; you can also define presets within individual task groups.
`project.yaml`

```yaml
required_credentials:
  - warehouse

default_db: warehouse

presets:
  modelling:
    type: autosql
    materialisation: table
    file_name: "{{ task.name }}.sql"
    destination:
      table: "{{ user_prefix }}{{ task.name }}"

parameters:
  user_prefix:
```
**Tip**

`{{ task.name }}` returns the name of the task.
Now that we have the modelling preset, we can use it in the `modelling` group. Since we want `dim_reddit_feeds` to run after our `load_data` task, we will need to set the parents of the task to `load_data`.
`tasks/modelling.yaml`

```yaml
tasks:
  dim_reddit_feeds:
    preset: modelling
    parents:
      - load_data
```
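Parent relationships also affect task selection on the command line. Assuming SAYN's usual task query syntax, something like the following should run `dim_reddit_feeds` together with its parents, so `load_data` is executed first (check the SAYN CLI docs for the exact syntax):

```bash
# the leading + includes the task's parents
sayn run -t +dim_reddit_feeds
```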
### Step 3: Data Science Group

Quick Summary:

- Create the task group `data_science.yaml`
- Create the Python task `wordcloud` to generate wordclouds
- Create the Python task `nlp` to generate text statistics
- Create the autosql task `dim_reddit_feeds_nlp_stats` to calculate aggregate statistics grouped by source
#### Group Overview

Now that we have our cleaned dataset, we can utilise Python tasks to do some natural language processing on our text data. In particular, we will use two libraries for this analysis:

- `nltk`: for basic text statistics
- `wordcloud`: for generating wordcloud visualisations

First, we need to create the `data_science` group in the `tasks` folder. There will be two tasks within this group:

- `nlp`: generates the text statistics
- `wordcloud`: generates the wordclouds
Both tasks will use data from our `dim_reddit_feeds` table, therefore we will need to set their `table` parameters to `dim_reddit_feeds`. Since both of these tasks are children of the `dim_reddit_feeds` task, we will also need to set their `parents` attributes to `dim_reddit_feeds`.

The `wordcloud` task also has a `stopwords` parameter, which provides additional context-related stopwords.
`tasks/data_science.yaml`

```yaml
tasks:
  nlp:
    type: python
    class: nlp.LanguageProcessing
    parents:
      - dim_reddit_feeds
    parameters:
      table: dim_reddit_feeds

  wordcloud:
    type: python
    class: wordcloud.RenderCloud
    parents:
      - dim_reddit_feeds
    parameters:
      table: dim_reddit_feeds
      stopwords:
        - Reddit
```
#### Task Details (`wordcloud`)

The `wordcloud` task will have the following steps:

- Grouping texts: aggregates article titles and groups them by source (see the pandas sketch below)
- Generating clouds: generates a wordcloud for each source, as well as for the full dataset
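The grouping step relies on the fact that summing a pandas string column concatenates its values. Here is a minimal, self-contained sketch with made-up rows; the `source` values follow the `source_elements[4] + "_" + source_elements[5]` convention from the extract task:

```python
import pandas as pd

# hypothetical sample shaped like the dim_reddit_feeds table
df = pd.DataFrame(
    {
        "source": ["USnews_new", "USnews_new", "UKnews_new"],
        "title": ["First title ", "Second title ", "Third title "],
    }
)

# summing an object (string) column concatenates it, giving one blob of text per source
grouped_texts = df.groupby("source").title.sum()
print(grouped_texts["USnews_new"])  # "First title Second title "
print(grouped_texts["UKnews_new"])  # "Third title "
```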
#### RenderCloud Class

Next, we can define the `RenderCloud` class for the `wordcloud` task. `RenderCloud` has three methods:

- `word_cloud`: generates a wordcloud visualisation
- `setup`: sets the order of steps to run
- `run`: defines what each step does during the run
**Attention**

`word_cloud` is a utility method for this task, while `setup` and `run` are the usual SAYN methods. Please note that `setup` and `run` need to return either `self.success()` or `self.fail()` in order to run.
`python/wordcloud.py`

```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sayn import PythonTask
from PIL import Image
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator


class RenderCloud(PythonTask):
    def word_cloud(
        self, name, text, stopwords, b_colour="white", c_colour="black", show=False
    ):
        """Word cloud generating function"""

        # attempt to find a compatible mask
        try:
            mask = np.array(Image.open(f"python/img/masks/{name}_mask.png"))
            image_colours = ImageColorGenerator(mask)
        except:
            mask = None
            image_colours = None

        wordcloud = WordCloud(
            stopwords=stopwords,
            max_words=100,
            mask=mask,
            background_color=b_colour,
            contour_width=1,
            contour_color=c_colour,
            color_func=image_colours,
        ).generate(text)

        # store wordcloud image in "python/img"
        wordcloud.to_file(f"python/img/{name}_wordcloud.png")

        # declare show=True if you want to show wordclouds
        if show:
            plt.imshow(wordcloud, interpolation="bilinear")
            plt.axis("off")
            plt.show()

    def setup(self):
        self.set_run_steps(["Grouping texts", "Generating clouds"])
        return self.success()

    def run(self):
        with self.step("Grouping texts"):
            table = self.parameters["user_prefix"] + self.task_parameters["table"]
            df = pd.DataFrame(self.default_db.read_data(f"SELECT * FROM {table}"))

            full_text = " ".join(article for article in df.title)
            sources = df.groupby("source")
            grouped_texts = sources.title.sum()

        with self.step("Generating clouds"):
            stopwords = STOPWORDS.update(self.parameters["stopwords"])

            self.info("Generating reddit_wordcloud.png")
            self.word_cloud("reddit", full_text, stopwords)

            # source specific wordclouds
            for group, text in zip(grouped_texts.keys(), grouped_texts):
                self.info(f"Generating {group}_wordcloud.png")
                self.word_cloud(
                    group, text, stopwords, b_colour="black", c_colour="white"
                )

        return self.success()
```
#### Task Details (`nlp`)

The `nlp` task will have the following steps:

- Processing texts: generates text statistics for each title
- Updating database: similar to the `LoadData` update step, with additional debugging information
#### LanguageProcessing Class

Moving on, we can define the `LanguageProcessing` class for the `nlp` task. `LanguageProcessing` has three methods:

- `desc_text`: provides counts of letters, words and sentences in an article
- `setup`: sets the order of steps to run
- `run`: defines what each step does during the run
**Attention**

`desc_text` is a utility method for this task, while `setup` and `run` are the usual SAYN methods. Please note that `setup` and `run` need to return either `self.success()` or `self.fail()` in order to run.
`python/nlp.py`

```python
import pandas as pd

from sayn import PythonTask
from nltk import download
from nltk.tokenize import word_tokenize, sent_tokenize

download("punkt")


class LanguageProcessing(PythonTask):
    def desc_text(self, df, text_field, language):
        """Text stats generating function"""

        # counts the number of letters in text_field
        df[text_field + "_letters"] = df[text_field].fillna("").str.len()

        # counts the number of words in text_field
        df[text_field + "_words"] = (
            df[text_field]
            .fillna("")
            .apply(lambda x: len(word_tokenize(x, language=language)))
        )

        # counts the number of sentences in text_field
        df[text_field + "_sentences"] = (
            df[text_field]
            .fillna("")
            .apply(lambda x: len(sent_tokenize(x, language=language)))
        )

    def setup(self):
        self.set_run_steps(["Processing texts", "Updating database"])
        return self.success()

    def run(self):
        with self.step("Processing texts"):
            table = self.parameters["user_prefix"] + self.task_parameters["table"]
            df = pd.DataFrame(self.default_db.read_data(f"SELECT * FROM {table}"))

            self.info("Processing texts for title field")
            self.desc_text(df, "title", "english")

        with self.step("Updating database"):
            if df is not None:
                output = f"{table}_{self.name}"
                n_rows = len(df)

                self.info(f"Loading {n_rows} rows into destination: {output}....")
                df.to_sql(
                    output, self.default_db.engine, if_exists="replace", index=False
                )

        return self.success()
```
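If you have not used `nltk` before, the following standalone snippet (separate from the task above, with a made-up title) shows roughly what the two tokenizers return, which is what `desc_text` counts:

```python
from nltk import download
from nltk.tokenize import word_tokenize, sent_tokenize

# the punkt models are required by the tokenizers
download("punkt")

title = "SAYN makes ELT simple. It also runs Python tasks."

# word_tokenize splits punctuation into separate tokens
print(word_tokenize(title, language="english"))
# ['SAYN', 'makes', 'ELT', 'simple', '.', 'It', 'also', 'runs', 'Python', 'tasks', '.']

# sent_tokenize splits the text into sentences
print(len(sent_tokenize(title, language="english")))  # 2
```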
#### Task Details (`dim_reddit_feeds_nlp_stats`)

Now that we have individual article statistics, it would be a good idea to create an additional modelling task to find some aggregate statistics grouped by source. Let's create another SQL query, called `dim_reddit_feeds_nlp_stats.sql`, in the `sql` folder. This query will give us the average, grouped by source, of the text statistics generated by the `nlp` task.
`sql/dim_reddit_feeds_nlp_stats.sql`

```sql
SELECT source
     , AVG(title_letters) AS average_letters
     , AVG(title_words) AS average_words
     , AVG(title_sentences) AS average_sentences

FROM {{user_prefix}}dim_reddit_feeds_nlp

GROUP BY 1
ORDER BY 1
```
Finally, we can add the `dim_reddit_feeds_nlp_stats` task to the `modelling` group. Like the previous modelling task, we will create this task using the modelling preset in `project.yaml`, setting the `parents` parameter to `nlp`. We want to materialise this query as a view; therefore, we will need to overwrite the `materialisation` parameter of the preset.
`tasks/modelling.yaml`

```yaml
tasks:
  dim_reddit_feeds:
    preset: modelling
    parents:
      - load_data

  dim_reddit_feeds_nlp_stats:
    preset: modelling
    materialisation: view
    parents:
      - nlp
```
### Step 4: Run The Project

All that's left is to run the project from the command line. Change your directory to this project's folder and enter `sayn run`.
**Attention**

Please note that if you did not clone the git repo, you may have some issues with the wordcloud generation. We recommend you create a folder called `img` within the `python` folder if you do not already have one.
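For example, from the project root on a Unix-like shell (the `masks` subfolder is only needed if you want to use image masks):

```bash
# create the output folder the wordcloud task writes to
mkdir -p python/img

# optional: folder for {name}_mask.png images used as wordcloud masks
mkdir -p python/img/masks
```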