Updated on 2022-07-26
Data pipeline with Python, MySQL and AWS
In this project, I created an ETL process to move real estate data from an API hosted on RapidAPI into a relational database. Using that API, the data was extracted with Python and loaded into a MySQL database. Finally, the entire process was moved to the cloud and automated with AWS services.
The extracted data covers properties for sale, for rent and recently sold in New York City. The complete code and instructions on how to reproduce this project are on GitHub.
Environment
To execute this project, I used Python 3.10 and a MySQL 8 database. I also used the requests package to extract data from the API, pandas for data manipulation and pymysql to connect to the database.
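The snippets in the rest of this post assume these imports (a minimal sketch):
# Packages used throughout the pipeline
import requests      # calls to the RapidAPI endpoint
import pandas as pd  # dataframe manipulation
import pymysql       # connection to the MySQL database on Amazon RDS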
Execution
Initially, some functions were defined to better organize the code. As an example, below is the get_sales() function, which returns data about properties for sale.
def get_sales(city, state_code, limit):
    # Endpoint for properties listed for sale
    url = "https://realty-in-us.p.rapidapi.com/properties/v2/list-for-sale"
    querystring = {"city": city, "state_code": state_code, "offset": "0", "limit": limit, "sort": "relevance"}
    headers = {
        "X-RapidAPI-Host": "realty-in-us.p.rapidapi.com",
        "X-RapidAPI-Key": "80d4101192msha3850ddbf66ba87p1c5dc9jsn7ad589c1b934"
    }
    try:
        # Flatten the nested "properties" records into a dataframe
        response_sale = pd.json_normalize(requests.request("GET", url, headers=headers, params=querystring).json(), record_path="properties", sep="_")
        return response_sale
    except Exception:
        print("Error")
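As a usage sketch (the limit value here is just illustrative), the function can be called like this:
# Example call: fetch up to 200 sale listings in New York City
sales = get_sales("New York City", "NY", "200")
print(sales.shape)        # number of rows and columns returned
print(sales.columns[:10]) # a peek at the flattened column names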
The get_rents() and get_sold() functions were also defined in the same way, using the corresponding endpoints for rental and sold properties; a rough sketch of get_rents() is shown below.
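A possible sketch of get_rents(), assuming an analogous list-for-rent endpoint of the same API (the endpoint path is my assumption, not confirmed in the original post):
def get_rents(city, state_code, limit):
    # Assumed endpoint for rental listings (analogous to list-for-sale)
    url = "https://realty-in-us.p.rapidapi.com/properties/v2/list-for-rent"
    querystring = {"city": city, "state_code": state_code, "offset": "0", "limit": limit, "sort": "relevance"}
    headers = {
        "X-RapidAPI-Host": "realty-in-us.p.rapidapi.com",
        "X-RapidAPI-Key": "<your RapidAPI key>"
    }
    try:
        return pd.json_normalize(requests.request("GET", url, headers=headers, params=querystring).json(), record_path="properties", sep="_")
    except Exception:
        print("Error")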
As some fields were returned as NaN, a function to fill in the missing values was also defined:
# Function to fill NaN values in the whole dataframe
def fill_na(df):
    columns = df.columns.to_list()
    for col in columns:
        if df[col].dtype == 'object':
            df[col].fillna("NA", inplace=True)
        elif df[col].dtype == 'int64':
            df[col].fillna(1, inplace=True)
        elif df[col].dtype == 'float64':
            df[col].fillna(1.0, inplace=True)
    return df
Extraction
When executing the get_sales function, we get the following dataframe:

After doing the same with the rental and sold data, we run the fill_na() function and check whether any NA values remain:

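A minimal sketch of that check (the rents and sold dataframe names are illustrative):
# Fill missing values and confirm nothing is left as NaN
for df in (sales, rents, sold):
    fill_na(df)
    print(df.isna().sum().sum())  # expected to print 0 for each dataframe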
Now we are all set to load the data into the database.
Load
In this step, AWS cloud services were used. A MySQL instance was created with Amazon RDS. Using the pymysql package, the code below connects to the database and creates a schema called realty.
# Connect to the MySQL instance hosted on Amazon RDS
con = pymysql.connect(host='db-realty.ctu8ks44oyou.us-east-1.rds.amazonaws.com', port=3306, user='admin', passwd='')
# Creating a cursor object
cursor = con.cursor()
# Create a schema called realty
cursor.execute('''
DROP SCHEMA IF EXISTS realty;
''')
cursor.execute('''
CREATE SCHEMA realty;
''')
# Use the new schema
cursor.execute('''
USE realty;
''')
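As a side note, instead of hard-coding credentials (the password is left blank above), they could be read from environment variables, which also carries over nicely to Lambda environment variables later on. A minimal sketch with made-up variable names:
import os

# Hypothetical environment variables holding the RDS credentials
con = pymysql.connect(
    host=os.environ["DB_HOST"],
    port=3306,
    user=os.environ["DB_USER"],
    passwd=os.environ["DB_PASSWORD"],
)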
After that, we create the sales, rents and sold tables. Below is the example for the sales table:
# Create the SALES table
cursor.execute('''
CREATE TABLE sales (
    property_id VARCHAR(12),
    prop_type VARCHAR(20),
    prop_status VARCHAR(20),
    price INT,
    baths INT,
    beds FLOAT,
    address_city VARCHAR(50),
    address_line VARCHAR(200),
    address_state_code CHAR(2),
    address_state VARCHAR(50),
    address_county VARCHAR(50),
    address_lat DOUBLE,
    address_lon DOUBLE,
    address_neighborhood_name VARCHAR(50)
)
''')
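Before inserting, the flattened dataframe has to be reduced to the columns defined in the table. A minimal sketch, assuming the column names produced by json_normalize match the table columns:
# Keep only the columns that exist in the sales table (order matters for the INSERT)
sales_columns = [
    "property_id", "prop_type", "prop_status", "price", "baths", "beds",
    "address_city", "address_line", "address_state_code", "address_state",
    "address_county", "address_lat", "address_lon", "address_neighborhood_name",
]
sales = sales[sales_columns]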
To actually load the data into the database, we run the code below for the sales table; the same is done for the other tables.
# Convert the dataframe into a tuple of records
sales_tuples = tuple(sales.to_records(index=False))
for data in range(len(sales_tuples)):
    # Build the INSERT statement for one record
    query = "INSERT INTO sales (property_id, prop_type, prop_status, price, baths, beds, address_city, address_line, address_state_code, address_state, \
    address_county, address_lat, address_lon, address_neighborhood_name) VALUES {}".format(sales_tuples[data])
    # Execute the query
    cursor.execute(query)
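As an aside, a safer variant would use parameterized queries with executemany(), letting pymysql handle quoting instead of building SQL strings by hand. A sketch (not the approach used in the original script):
# Parameterized bulk insert: pymysql substitutes the %s placeholders safely
insert_sql = """
    INSERT INTO sales (property_id, prop_type, prop_status, price, baths, beds,
                       address_city, address_line, address_state_code, address_state,
                       address_county, address_lat, address_lon, address_neighborhood_name)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.executemany(insert_sql, list(sales.itertuples(index=False, name=None)))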
Finally, we commit and close the connection:
# Commit
con.commit()
con.close()
MySQL Workbench
We can also access the database with MySQL Workbench for easier querying and inspection. Below is a sample:

We can see the data is there:

AWS Lambda and Amazon EventBridge
To automate the entire ETL process, I used the AWS Lambda service, which allows us to run code without managing a server. I created a Lambda function and added all the code shown earlier. For the external packages (requests, pandas and pymysql) to work, it was necessary to add layers through the ARNs found here.

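Inside Lambda, the script is exposed through a handler. A minimal sketch, where run_etl() is a hypothetical wrapper around the extract, clean and load steps shown above:
# lambda_function.py (sketch; assumes the functions and loading code above live in this file)

def run_etl():
    # Hypothetical wrapper: extract with get_sales/get_rents/get_sold,
    # clean with fill_na, then load into the realty schema as shown above
    ...

def lambda_handler(event, context):
    # Entry point invoked by the EventBridge schedule
    run_etl()
    return {"statusCode": 200}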
After that, a rule was created in Amazon EventBridge (CloudWatch Events) to schedule and automate the execution of the script. In the example below, I scheduled the run for once a day at 6 AM using a Cron expression.
Cron expression: 0 6 * * ? *
Next trigger dates: Fri, 27 May 2022 06:00:00 GMT; Sat, 28 May 2022 06:00:00 GMT; ...
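The same schedule could also be created programmatically with boto3 (a sketch; the rule name and Lambda ARN are placeholders I made up):
import boto3

events = boto3.client("events", region_name="us-east-1")

# Rule that fires every day at 06:00 UTC (same cron expression as above)
events.put_rule(
    Name="realty-etl-daily",
    ScheduleExpression="cron(0 6 * * ? *)",
    State="ENABLED",
)

# Point the rule at the Lambda function (ARN is a placeholder)
events.put_targets(
    Rule="realty-etl-daily",
    Targets=[{"Id": "realty-etl-lambda", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:realty-etl"}],
)
Note that the Lambda function also needs a resource-based permission allowing EventBridge to invoke it; the console adds this automatically when the trigger is configured there.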
Next steps
As the collected data contains the latitude and longitude of each property, it would be interesting to gather data about New York neighborhoods, such as crime or air quality, and match it with our data to get a more complete picture of a particular property.
Also, we can point Power BI or Tableau at the same Amazon RDS endpoint and build a dashboard.
PS: I couldn't find examples of such datasets containing latitude and longitude. If you know of any, please email me.
Project information
- Category: Data Engineering - ETL
- Project date: 25 May, 2022
- Project files: On GitHub