Database for Financial Analysis II - MongoDB
MongoDB is a document-oriented database.
Instead of storing your data in tables made out of individual rows, like a relational database does, it stores your data in collections made out of individual documents. In MongoDB, a document is a big JSON blob with no particular format or schema.
You can have all your data in one single collection.
Goal: Create Database in MongoDB
- Create a document style data in MongoDB
- example: reinsurance treaty by reinsurer/year/treaty type
- Create a document style data in MongoDB from Json API format
- example: realtime stock price and realtime sector data
- Read Twitter data into MongoDB
- example : read twitter about what type of insurance people are interesting
Making a Connection with MongoClient
- Below link is an introduction tutorial to working with MongoDB and PyMongo
- ref: http://api.mongodb.com/python/current/tutorial.html
## Making a Connection with MongoClient
## Import pymongo
import pymongo
## The first step when working with PyMongo is to create a MongoClient to the running MongoDB instance
from pymongo import MongoClient
## Connect on the default host and port
client = MongoClient()
## We can also specify the host and port explicitly
client = MongoClient('localhost', 27017)
Getting a Database
- A single instance of MongoDB can support multiple independent databases
- When working with PyMongo you access databases using attribute style access on MongoClient instances
- Database name can not use attribute style access (ie. test-data), but "test_data" is okay
- A collection is a group of documents stored in MongoDB, and can be thought of as roughly the equivalent of a table
- Getting a collection in PyMongo works the same as getting a database
- Collections and databases are created when the first document is inserted into them.
## Set up an instance name "test_database" in MongoDB
db = client.test_database
## Set up collection - Table
collection = db.test_collection
collection
Document - JSON-style
- Data in MongoDB is represented (and stored) using JSON-style documents.
- In PyMongo we use dictionaries to represent documents.
- Example dictionary: reinsurance treaty by reinsurer/year/treaty type
- Documents can contain native Python types (like datetime.datetime instances) which will be automatically converted to and from the appropriate BSON types.
## Enter document into the collection
import datetime
treaty = {"reinsurer": "AIG",
"treaty": "XOL layer",
"tags": ["reinsurer", "treaty", "year"],
"date": datetime.datetime.utcnow()}
Insert a document into a collection - use the insert_one() method
- When a document is inserted a special key, "_id", is automatically added.
- The value of "_id" must be unique across the collection. insert_one() returns an instance of InsertOneResult.
treaties = db.treaties
treaty_id = treaties.insert_one(treaty).inserted_id
treaty_id
After inserting the first document, the posts collection has actually been created on the server.
- We can verify this by listing all of the collections in our database
db.list_collection_names()
treaty2 = {"reinsurer": "Swiss Re",
"treaty": "Clash Layer",
"tags": ["reinsurer", "treaty", "year"],
"date": datetime.datetime.utcnow()}
treaty_id = treaties.insert_one(treaty2).inserted_id
treaty_id
treaty3 = {"reinsurer": "Parter Re",
"treaty": "XOL 2nd Layer",
"tags": ["reinsurer", "treaty", "year"],
"date": datetime.datetime.utcnow()}
treaty_id = treaties.insert_one(treaty3).inserted_id
treaty_id
Getting a Single Document With find_one()
- The most basic type of query that can be performed in MongoDB is find_one()
- This method returns a single document matching a query (or None if there are no matches)
- It is useful when you know there is only one matching document, or you are only interested in the first match
- Here we use find_one() to get the first document from the posts collection:
import pprint
pprint.pprint(treaties.find_one())
pprint.pprint(treaties.find_one({"reinsurer": "AIG"}))
pprint.pprint(treaties.find_one({"reinsurer": "ACE"}))
Querying By ObjectId
- We can also find a post by its _id, which in our example is an ObjectId:
treaty_id ##output is an object
pprint.pprint(treaties.find_one({"_id": treaty_id}))
Note that an ObjectId is not the same as its string representation
treaty_id_as_str = str(treaty_id)
treaty_id_as_str ## output is a string
treaties.find_one({"_id": treaty_id_as_str}) # No result
Bulk Inserts
- we can also perform bulk insert operations, by passing a list as the first argument to insert_many()
- This will insert each document in the list, sending only a single command to the server
- New_treaty has a different “shape” than the other posts - there is no "tags" field and we’ve added a new field, "title"
- This is what we mean when we say that MongoDB is schema-free
## Enter a new document with different “shape” than the other posts
## There is no "tags" field and we’ve added a new field, "retenion"
new_treaty = [{"reinsurer": "AIG",
"treaty": "XOL Layer 2018",
"tags": ["bulk", "insert"],
"date": datetime.datetime(2018, 11, 12, 11, 14)},
{"reinsurer": "Munich Re",
"treaty": "QS 2018",
"retenion": "QS 20% for US business",
"date": datetime.datetime(2018, 11, 10, 10, 45)}]
result = treaties.insert_many(new_treaty)
result.inserted_ids
The result from insert_many() now returns two ObjectId instances, one for each inserted document.
Query data
- Use find()
- Use find_one()
for treaty in treaties.find():
pprint.pprint(treaty)
## Use find() for query
# Find all treaty in the collection
# for treaty in treaties.find():
# pprint.pprint(treaty)
# Find all reinsurer-AIG treaty:
for treaty in treaties.find({"reinsurer": "AIG"}):
pprint.pprint(treaty)
## Find one reinsurer-AIG treaty:
pprint.pprint(treaties.find_one({"reinsurer": "AIG"}))
Counting
- If we just want to know how many documents match a query we can perform a count() operation instead of a full query
- We can get a count of all of the documents in a collection:
## How many AIG treaty?
treaties.count_documents(
filter={"reinsurer": "AIG"}
)
## How many treaties?
treaties.count_documents(
filter={}
)
Range Queries
- MongoDB supports many different types of advanced queries.
- As an example, lets perform a query where we limit results to treaties older than a certain date, but also sort the results by reinsurers
d = datetime.datetime(2018, 9, 12, 12)
for treaty in treaties.find({"date": {"$lt": d}}).sort("reinsurer"):
pprint.pprint(treaty)
Indexing
- Adding indexes can help accelerate certain queries and can also add additional functionality to querying and storing documents
- In this example, we’ll demonstrate how to create a unique index on a key that rejects documents whose value for that key already exists in the index
- First, we’ll need to create the index
result = db.profiles.create_index([('reinsurer_id', pymongo.ASCENDING)],
unique=True)
sorted(list(db.profiles.index_information()))
Notice that we have two indexes now
- One is the index on _id that MongoDB creates automatically, and the other is the index on reinsurer_id we just created.
## set up some user profiles
user_profiles = [
{'reinsurer_id': 100, 'reinsurer': 'Atlantic Re'},
{'reinsurer_id': 25, 'reinsurer': 'XL Re'}]
result = db.profiles.insert_many(user_profiles)
## DuplicateKeyError: E11000 duplicate key error collection: test_database.profiles index: user_id_1 dup key: { : 235 }
new_profile = {'user_id': 213, 'reinsurer': 'XL American'}
duplicate_profile = {'user_id': 235, 'reinsurer': 'SCOR S.E'}
result = db.profiles.insert_one(new_profile) # This is fine.
result = db.profiles.insert_one(duplicate_profile)
Create a document style data in MongoDB from Json API format
- example: realtime intraday MSFT stock price
- example: realtime sector stock return
realtime_stock_data={
"Meta Data": {
"1 Information": "Intraday (5min) open, high, low, close prices and volume",
"2 Symbol": "MSFT",
"3 Last Refreshed": "2018-08-22 15:55:00",
"4 Interval": "5min",
"5 Output Size": "Compact",
"6 Time Zone": "US/Eastern"
},
"Time Series (5min)": {
"2018-08-22 15:55:00": {
"1 open": "107.1500",
"2 high": "107.2100",
"3 low": "107.0500",
"4 close": "107.0500",
"5 volume": "970838"
}
}
}
stock_data = db.stock_data
stock_data_id = stock_data.insert_one(realtime_stock_data).inserted_id
stock_data_id
realtime_sector_data={
"Meta Data": {
"Information": "US Sector Performance (realtime & historical)",
"Last Refreshed": "02:05 PM ET 08/23/2018"
},
"Rank A: Real-Time Performance": {
"Information Technology": "0.51%",
"Consumer Discretionary": "-0.04%",
"Utilities": "-0.11%",
"Telecommunication Services": "-0.13%",
"Health Care": "-0.20%",
"Real Estate": "-0.25%",
"Consumer Staples": "-0.27%",
"Industrials": "-0.48%",
"Financials": "-0.54%",
"Energy": "-0.57%",
"Materials": "-0.86%"
},
"Rank B: 1 Day Performance": {
"Energy": "1.20%",
"Information Technology": "0.48%",
"Consumer Discretionary": "0.12%",
"Health Care": "0.11%",
"Financials": "-0.26%",
"Materials": "-0.45%",
"Consumer Staples": "-0.63%",
"Real Estate": "-0.65%",
"Utilities": "-0.78%",
"Industrials": "-0.93%",
"Telecommunication Services": "-2.02%"
},
"Rank C: 5 Day Performance": {
"Energy": "3.27%",
"Industrials": "2.23%",
"Consumer Discretionary": "2.21%",
"Materials": "2.02%",
"Financials": "1.93%",
"Health Care": "1.71%",
"Telecommunication Services": "1.09%",
"Information Technology": "0.84%",
"Consumer Staples": "0.76%",
"Real Estate": "0.16%",
"Utilities": "-0.35%"
},
"Rank D: 1 Month Performance": {
"Telecommunication Services": "6.53%",
"Health Care": "5.38%",
"Industrials": "2.75%",
"Real Estate": "2.62%",
"Consumer Staples": "2.57%",
"Financials": "2.45%",
"Utilities": "1.92%",
"Consumer Discretionary": "1.90%",
"Materials": "0.90%",
"Information Technology": "0.57%",
"Energy": "-1.17%"
},
"Rank E: 3 Month Performance": {
"Health Care": "9.80%",
"Consumer Staples": "9.18%",
"Utilities": "8.72%",
"Consumer Discretionary": "8.41%",
"Real Estate": "8.28%",
"Telecommunication Services": "6.02%",
"Information Technology": "5.55%",
"Financials": "0.49%",
"Industrials": "0.15%",
"Materials": "-2.15%",
"Energy": "-5.60%"
},
"Rank F: Year-to-Date (YTD) Performance": {
"Information Technology": "16.10%",
"Consumer Discretionary": "15.79%",
"Health Care": "10.55%",
"Energy": "2.53%",
"Financials": "1.05%",
"Utilities": "1.04%",
"Real Estate": "0.96%",
"Industrials": "0.75%",
"Materials": "-2.75%",
"Consumer Staples": "-5.33%",
"Telecommunication Services": "-6.21%"
},
"Rank G: 1 Year Performance": {
"Information Technology": "31.09%",
"Consumer Discretionary": "29.36%",
"Energy": "20.32%",
"Health Care": "16.84%",
"Financials": "15.29%",
"Industrials": "12.18%",
"Materials": "9.50%",
"Real Estate": "3.09%",
"Telecommunication Services": "-0.85%",
"Utilities": "-2.24%",
"Consumer Staples": "-2.32%"
},
"Rank H: 3 Year Performance": {
"Information Technology": "95.67%",
"Consumer Discretionary": "54.45%",
"Financials": "46.80%",
"Industrials": "45.21%",
"Materials": "36.15%",
"Health Care": "27.81%",
"Utilities": "18.65%",
"Energy": "18.31%",
"Consumer Staples": "13.63%",
"Telecommunication Services": "5.73%"
},
"Rank I: 5 Year Performance": {
"Information Technology": "151.29%",
"Consumer Discretionary": "98.24%",
"Health Care": "85.23%",
"Financials": "75.87%",
"Industrials": "67.23%",
"Materials": "45.10%",
"Utilities": "43.92%",
"Consumer Staples": "35.93%",
"Telecommunication Services": "4.47%",
"Energy": "-6.33%"
},
"Rank J: 10 Year Performance": {
"Consumer Discretionary": "284.77%",
"Information Technology": "252.24%",
"Health Care": "176.38%",
"Industrials": "107.68%",
"Consumer Staples": "90.16%",
"Financials": "74.45%",
"Materials": "50.53%",
"Utilities": "41.62%",
"Telecommunication Services": "21.26%",
"Energy": "-3.80%"
}
}
sector_data = db.sector_data
sector_data_id = sector_data.insert_one(realtime_sector_data).inserted_id
sector_data_id
Collect data from Twitter into MongoDB
- ref: http://pythondata.com/collecting-storing-tweets-with-python-and-mongodb/
from __future__ import print_function
import tweepy
import json
from pymongo import MongoClient
MONGO_HOST= 'mongodb://localhost/twitterdb' # assuming you have mongoDB installed locally
# and a database called 'twitterdb'
WORDS = ['#insurance', '#AI', '#datascience', '#machinelearning', '#auto', '#reinsurance']
CONSUMER_KEY = %env TWITTER_CONSUMER_KEY
CONSUMER_SECRET = %env TWITTER_CONSUMER_SECRET
ACCESS_TOKEN =%env TWITTER_ACCESS_TOKEN
ACCESS_TOKEN_SECRET = %env TWITTER_ACCESS_TOKEN_SECRET
class StreamListener(tweepy.StreamListener):
#This is a class provided by tweepy to access the Twitter Streaming API.
def on_connect(self):
# Called initially to connect to the Streaming API
print("You are now connected to the streaming API.")
def on_error(self, status_code):
# On error - if an error occurs, display the error / status code
print('An Error has occured: ' + repr(status_code))
return False
def on_data(self, data):
#This is the meat of the script...it connects to your mongoDB and stores the tweet
try:
client = MongoClient(MONGO_HOST)
# Use twitterdb database. If it doesn't exist, it will be created.
db = client.twitterdb
# Decode the JSON from Twitter
datajson = json.loads(data)
#grab the 'created_at' data from the Tweet to use for display
created_at = datajson['created_at']
#print out a message to the screen that we have collected a tweet
print("Tweet collected at " + str(created_at))
#insert the data into the mongoDB into a collection called twitter_search
#if twitter_search doesn't exist, it will be created.
db.twitter_search.insert(datajson)
except Exception as e:
print(e)
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
#Set up the listener. The 'wait_on_rate_limit=True' is needed to help with Twitter API rate limiting.
listener = StreamListener(api=tweepy.API(wait_on_rate_limit=True))
streamer = tweepy.Stream(auth=auth, listener=listener)
print("Tracking: " + str(WORDS))
streamer.filter(track=WORDS)