A Data Pipeline Project Using Brazilian Stocks

Raphael Reis
Jun 14, 2022 · 8 min read

As beginners, we usually start data projects at the ETL or data-viz phase. However, as we discover more about the data world, we learn that a project has many more steps to be done.

I always liked to grab a Kaggle dataset and dive into exploratory data analysis, plotting tons of visualizations. And to be honest, I still think that is a fair way to start. But when we reach the point of participating in real data projects, a will to build something more robust starts to grow. That is what happened to me when I felt experienced enough to develop an entirely personal project on my own, and now I am here to share some of the experience I acquired in this adventure.

So, if we think about a data pipeline process, we will end up with something like the image below in our minds:

Data pipeline schema

Basically, we have a source where the data we want lives, a middle part responsible for extracting and transforming the data as we need and, at the end, a target where we consume all that information.

With that in mind, let’s think of a way to get data from the Brazilian stock market. What we need is a crawler that fetches the data based on the name of a stock, a place to store that data, and a dashboard with some visualizations to extract information from it. And above all, it would be nice if this entire process ran without any manual interaction.

So, I came up with an architecture to help us through:

Let’s break it down, starting with the end. We will have a Power BI dashboard as our target. There, we will find some visualizations of the stocks that we want. The dashboard will be plugged into a database, and out of respect for the data gods, we will not use Excel here: we will connect Power BI directly to a Postgres database.

A Python script will be responsible for both getting the data with a crawler and storing it in the database. Last but not least, the main idea behind a data pipeline is that all this work is executed automatically. For that, we will use a scheduler named Airflow, running it through a Docker image.

To execute this, I used a “Lego approach”, building from the inside out.

So, the first step was to build a dashboard mockup, then prepare the Postgres tables, then develop the script, and lastly, automate all of this with Airflow.

  1. The Dashboard Mockup
  2. Postgres Database
  3. Python Script
  4. Airflow + Docker

1. The Dashboard Mockup

The first part is the simplest one. I made the mockup below using Figma. The main idea is to have some basic info on top as big numbers and three plots: relative variation of the stocks, dividends paid, and historic close value. On the right we will have a stock filter and a date filter, besides a button to switch to another dashboard with the same layout but with investment fund information.

2. Postgres Database

The idea is to connect Power BI to a real database and not to an Excel spreadsheet. So, once the crawler is ready, the data will be inserted into tables. For that, I built four tables using pgAdmin 4 (a minimal sketch of an equivalent table follows the list below):

  • Stocks: Shall save data like open value, close value, volume, etc.
  • Infos: Shall save some historic information like the max historic value of a stock and some personal information about stocks that I own, like quantity and amount of money invested.
  • Variations: Shall save the absolute and relative variation of a stock between the present day and a month before, two months before, and so on.
  • Dividends: Shall save information about the value of dividends paid to investors.
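
As a reference, here is a minimal sketch of how one of these tables could be created from Python with SQLAlchemy instead of pgAdmin 4. The column names and types are illustrative assumptions, not the exact schema used in the project.

```python
from sqlalchemy import create_engine, text

# Placeholder connection string; adjust user, password, host and database.
engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/bovespa")

# Simplified version of the "stocks" table (the real schema was built in pgAdmin 4).
ddl = """
CREATE TABLE IF NOT EXISTS stocks (
    ticker      VARCHAR(12)    NOT NULL,
    trade_date  DATE           NOT NULL,
    open_value  NUMERIC(12, 2),
    close_value NUMERIC(12, 2),
    high        NUMERIC(12, 2),
    low         NUMERIC(12, 2),
    volume      BIGINT,
    PRIMARY KEY (ticker, trade_date)
);
"""

with engine.begin() as conn:
    conn.execute(text(ddl))
```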

3. Python Script

The most fun part of the process. Let’s code!

First, I defined some variables that I will use in the script (an illustrative sketch follows the list):

  • Date information to create a date range for our crawler; in this example, I am getting two years of data.
  • A list of stocks and funds that I supposedly own.
  • A dictionary to identify which is a stock and which is an investment fund.
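
For illustration, the setup could look something like this. The tickers and the two-year window are placeholders based on the description above, not my real portfolio.

```python
from datetime import date
from dateutil.relativedelta import relativedelta

# Date range for the crawler: the last two years of trading data.
end_date = date.today()
start_date = end_date - relativedelta(years=2)

# Tickers I (hypothetically) own. Yahoo Finance uses the ".SA" suffix for B3 assets.
tickers = ["PETR4.SA", "VALE3.SA", "ITUB4.SA", "HGLG11.SA", "MXRF11.SA"]

# Dictionary separating stocks from investment funds (FIIs).
asset_type = {
    "PETR4.SA": "stock",
    "VALE3.SA": "stock",
    "ITUB4.SA": "stock",
    "HGLG11.SA": "fund",
    "MXRF11.SA": "fund",
}
```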

Then, I created the function responsible for gathering the data from Yahoo Finance and storing it into the database.
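A hedged sketch of what that function can look like, assuming pandas-datareader as the crawler and SQLAlchemy for the Postgres connection; the function and table names are illustrative, not the project’s exact code.

```python
import pandas_datareader.data as web
from sqlalchemy import create_engine

# Same placeholder connection string as in the table sketch above.
engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/bovespa")

def load_stock_history(ticker, start_date, end_date):
    # DataReader returns a date-indexed DataFrame with Open/High/Low/Close/Volume columns.
    df = web.DataReader(ticker, "yahoo", start_date, end_date)
    df = df.reset_index().rename(columns={
        "Date": "trade_date",
        "Open": "open_value",
        "Close": "close_value",
        "High": "high",
        "Low": "low",
        "Volume": "volume",
    })
    df["ticker"] = ticker
    cols = ["ticker", "trade_date", "open_value", "close_value", "high", "low", "volume"]
    # Append the rows to the Postgres "stocks" table.
    df[cols].to_sql("stocks", engine, if_exists="append", index=False)
    return df
```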

So the data will look like this:

Next, I created the function responsible for getting extra information. This part is a tricky one because the goal here is to evaluate the data against my own records. That means I have a private Excel file where I keep records of when I bought each stock, how much I paid for it, etc. So, if you intend to reproduce this code, that part may be unnecessary, or you can create your own Excel file.
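If you do want to reproduce it, the idea is roughly the sketch below, where the spreadsheet path and column names are placeholders for whatever your own file contains.

```python
import pandas as pd

def load_portfolio_info(history, portfolio_path="my_portfolio.xlsx"):
    # Personal records: one row per ticker with quantity and total invested.
    # Columns assumed here: ticker, quantity, invested.
    portfolio = pd.read_excel(portfolio_path)

    # Historic max and latest close per ticker from the crawler data.
    summary = (history.groupby("ticker")
                      .agg(max_close=("close_value", "max"),
                           last_close=("close_value", "last"))
                      .reset_index())

    # Merge the crawler data with the personal records.
    infos = summary.merge(portfolio, on="ticker", how="left")
    infos["position_value"] = infos["last_close"] * infos["quantity"]
    return infos
```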

This function merges the information gathered by the crawler with my own information. I ran a simulation with fake data to keep my real stock information safe. So, the data will look like this:

The next and most challenging one is the variation function. Here I calculated the variation of a stock between the current day and one month before, two months before, three months before, and so on up to one year before. Then, I jumped to the variation between today and two years before.

As you can see, this part runs twelve times, once for each month, and later I repeat the exact same thing for 24 months. I will not put an image of it here so as not to pollute the text.

The trick here is to take care with the dates. We cannot just take today’s date and subtract months, because the date we end up with could be one where Bovespa (the Brazilian stock exchange) does not operate, and we would then be doing the calculations with null values. To avoid that, we take the first date available in the history returned by the first function, so the data comes out correct, like this:
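A simplified sketch of that logic, assuming the price history comes as a date-indexed DataFrame with a close_value column, like the one produced by the first function (the names here are assumptions):

```python
import pandas as pd
from dateutil.relativedelta import relativedelta

def compute_variations(history, today):
    # Latest close in the history.
    current_close = history["close_value"].iloc[-1]
    variations = []
    for months_back in list(range(1, 13)) + [24]:
        target = pd.Timestamp(today) - relativedelta(months=months_back)
        # Use the first date we actually have on or after the target, so we
        # never land on a day when Bovespa was closed (which would give nulls).
        available = history.index[history.index >= target]
        if len(available) == 0:
            continue
        past_close = history.loc[available[0], "close_value"]
        variations.append({
            "months_back": months_back,
            "absolute": current_close - past_close,
            "relative": (current_close - past_close) / past_close,
        })
    return variations
```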

The last one is the dividend function. Very similar to the first one, here we also call the crawler to get the dividend data.
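A possible sketch of that step, again using pandas-datareader (its “yahoo-dividends” source) and the same Postgres engine; the column names are assumptions about the reader’s output and may differ between versions.

```python
import pandas_datareader.data as web
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/bovespa")

def load_dividends(ticker, start_date, end_date):
    # Date-indexed frame with the dividend value paid per ex-date.
    div = web.DataReader(ticker, "yahoo-dividends", start_date, end_date)
    div = div.reset_index().rename(columns={"Date": "ex_date", "value": "dividend"})
    div["ticker"] = ticker
    # Append the rows to the Postgres "dividends" table.
    div.to_sql("dividends", engine, if_exists="append", index=False)
    return div
```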

And, in the database, the data will look like this:

4. Airflow + Docker

So, now we have a Python script that inserts data into the database and a Power BI dashboard that consumes this data. Awesome! But… do we need to execute it every day…? Well, if you stop here, then yes. You will need to execute the script manually every day, or at least whenever you want to update the data. However, if you want to skip that manual step, you can use Airflow!

For those who don’t know it, Airflow is a scheduler. It is responsible for scheduling the routines that you might have, and it does that through something called DAGs, short for Directed Acyclic Graphs. That means the tasks you want to execute are nodes in a graph, and this graph has a direction and does not allow cycles. So it will be something like this:

As you can see in the example above, there is an initial process called check_s3_for_key. Once completed, it triggers a dependent process, files_to_s3, which in turn has four processes depending on it, and so on. A later process can never re-trigger an earlier one.

In our case, the DAG will be much simpler because we only have one process to execute.

To get Airflow, you can go to https://airflow.apache.org/ and look for the getting started guide. If you are on a Windows machine like me, I strongly recommend running it with Docker. Just download Docker from https://docs.docker.com/desktop/windows/install/ (it already comes with docker-compose) and then follow the steps in the Airflow documentation.

Basically, you will have an Airflow image that is brought up by docker-compose. In our case, we need some libraries that don’t come with the regular Airflow image. Because of that, I wrote a simple Dockerfile that installs these libraries, built the Docker image, and then replaced the original Airflow image with that customized one:

With that, we will be able to run the Airflow webserver:

Now, we need to create our DAG. For that, we will create a simple Python file:

Here we are passing some basic arguments, creating the DAG by giving it a name and a schedule_interval, and, finally, attaching a PythonOperator (because our process is in Python) that calls the main function of our code. To be able to use that function, we have to make the correct import.
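The DAG file could look roughly like the sketch below. The dag_id, the schedule, and the import path of the main function are assumptions; adjust them to match your own script inside the dags/ folder.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed import path: the script from section 3 placed next to the DAG file.
from bovespa_script import main

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="bovespa_pipeline",
    default_args=default_args,
    start_date=datetime(2022, 6, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Single task: run the whole crawler-and-load routine once a day.
    run_pipeline = PythonOperator(
        task_id="run_bovespa_pipeline",
        python_callable=main,
    )
```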

Now, we can see our DAG in the Airflow web server and execute it!

When the box’s border is dark green, we know that our process executed everything without errors:

With that, now we have our dashboard and an entire pipeline to supply it:

The complete project is at https://github.com/RaphaCoelho/bovespa. I hope you guys enjoy it! ;)

Additional info: I’ve seen that some investment funds on Yahoo Finance break the crawler (e.g., BCFF11.SA). That is mainly because of a problem in their historic data, which triggers a bug in the data_reader library.
