Database for Financial Analysis II - MongoDB

MongoDB is a document-oriented database.

Instead of storing your data in tables made of individual rows, as a relational database does, it stores your data in collections made of individual documents. In MongoDB, a document is a JSON-like object (stored in a binary format called BSON) with no fixed format or schema.

Because documents of different shapes can live side by side, you can keep all of your data in a single collection.


Goal: Create Database in MongoDB

- Create document-style data in MongoDB
    - example: reinsurance treaty by reinsurer/year/treaty type
- Create document-style data in MongoDB from JSON API output
    - example: realtime stock price and realtime sector data
- Read Twitter data into MongoDB
    - example: read tweets about which types of insurance people are interested in


Making a Connection with MongoClient

- The link below is an introductory tutorial on working with MongoDB and PyMongo
- ref: http://api.mongodb.com/python/current/tutorial.html
In [4]:
## Making a Connection with MongoClient
## Import pymongo
import pymongo

## The first step when working with PyMongo is to create a MongoClient to the running MongoDB instance
from pymongo import MongoClient

## Connect on the default host and port
client = MongoClient()

## We can also specify the host and port explicitly
client = MongoClient('localhost', 27017)
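MongoClient also accepts a connection-string URI, which is handy when the server is not on the default host or requires authentication. The PyMongo documentation notes that a username or password containing characters such as "@" or ":" must be percent-escaped with urllib.parse.quote_plus. A minimal sketch, assuming a hypothetical helper build_mongo_uri and placeholder credentials:

```python
from urllib.parse import quote_plus


def build_mongo_uri(host="localhost", port=27017, username=None, password=None):
    """Build a mongodb:// connection string (hypothetical helper).

    Credentials are percent-escaped so characters like '@' or ':'
    do not break the URI.
    """
    credentials = ""
    if username is not None:
        credentials = quote_plus(username)
        if password is not None:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    return "mongodb://{}{}:{}/".format(credentials, host, port)


# Usage (requires a running server; credentials are placeholders):
# client = MongoClient(build_mongo_uri(username="analyst", password="p@ss:word"))
```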


Getting a Database

- A single instance of MongoDB can support multiple independent databases
- When working with PyMongo you access databases using attribute style access on MongoClient instances
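- A database name that is not a valid Python identifier (e.g. test-data) cannot be used with attribute-style access; use dictionary-style access such as client['test-data'] instead, while "test_data" works either way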
- A collection is a group of documents stored in MongoDB, and can be thought of as roughly the equivalent of a table
- Getting a collection in PyMongo works the same as getting a database
- Collections and databases are created when the first document is inserted into them.
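The attribute-versus-dictionary rule above can be sketched as a rough check. Besides the Python identifier rule, PyMongo's MongoClient raises AttributeError for names that start with an underscore and suggests client[name] instead. The helper name supports_attribute_access is hypothetical, and the check is approximate (it ignores names that clash with MongoClient's own attributes):

```python
import keyword


def supports_attribute_access(name):
    """Rough check for whether client.<name> can reach database `name`.

    The name must be a valid Python identifier (so 'test-data' fails),
    must not be a keyword, and must not start with an underscore.
    """
    return (name.isidentifier()
            and not keyword.iskeyword(name)
            and not name.startswith("_"))


# Dictionary-style access works for any valid database or collection name:
# db = client["test-data"]
# collection = db["test-collection"]
```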
In [5]:
## Get a database named "test_database" (it is created on the first insert)
db = client.test_database

## Get a collection - roughly the equivalent of a table
collection = db.test_collection

collection
Out[5]:
Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'test_database'), 'test_collection')


Document - JSON-style

- Data in MongoDB is represented (and stored) using JSON-style documents.
- In PyMongo we use dictionaries to represent documents. 
- Example dictionary: reinsurance treaty by reinsurer/year/treaty type
- Documents can contain native Python types (like datetime.datetime instances) which will be automatically converted to and from the appropriate BSON types.
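One consequence of that conversion: a BSON date is stored as milliseconds since the Unix epoch in UTC, so sub-millisecond precision is dropped. That is why the dates printed later in this notebook end in whole milliseconds (e.g. 84000 microseconds). By default PyMongo also returns naive datetimes that represent UTC, unless the client is created with tz_aware=True. A small sketch of the truncation, useful when comparing a local value with one read back from the database; to_bson_precision is a hypothetical helper:

```python
import datetime


def to_bson_precision(dt):
    """Drop sub-millisecond precision, mirroring how a BSON date stores time."""
    return dt.replace(microsecond=(dt.microsecond // 1000) * 1000)


local = datetime.datetime(2018, 9, 7, 20, 8, 52, 84123)
print(to_bson_precision(local))  # 2018-09-07 20:08:52.084000
```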
In [6]:
## Build a document (a Python dict); nothing is written to MongoDB yet
import datetime
treaty = {"reinsurer": "AIG",
        "treaty": "XOL layer",
        "tags": ["reinsurer", "treaty", "year"],
        "date": datetime.datetime.utcnow()}


Insert a document into a collection with the insert_one() method

- When a document is inserted, a special key, "_id", is automatically added.
- The value of "_id" must be unique across the collection. insert_one() returns an instance of InsertOneResult, whose inserted_id attribute holds the new "_id".
In [7]:
treaties = db.treaties
treaty_id = treaties.insert_one(treaty).inserted_id
treaty_id
Out[7]:
ObjectId('5b92dad7760df70abc053950')
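An ObjectId is 12 bytes, and its first 4 bytes are a big-endian count of seconds since the Unix epoch, so every "_id" records roughly when it was created (PyMongo exposes this as ObjectId.generation_time). The stdlib sketch below decodes that timestamp from the hex string printed above; objectid_generation_time is a hypothetical helper. The result falls a few seconds after the document's own "date" field, which was set when the dict was built in the previous cell:

```python
import datetime


def objectid_generation_time(oid_hex):
    """Decode the creation time stored in the first 4 bytes of an ObjectId.

    The first 8 hex characters are the big-endian seconds since the epoch;
    the remaining bytes only make the value unique.
    """
    seconds = int(oid_hex[:8], 16)
    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)


print(objectid_generation_time("5b92dad7760df70abc053950"))
# 2018-09-07 20:08:55+00:00
```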


After the first document is inserted, the treaties collection has actually been created on the server.
- We can verify this by listing all of the collections in our database (collections created elsewhere in this database also appear)

In [8]:
db.list_collection_names()
Out[8]:
['profiles', 'treaties', 'census', 'sector_data', 'stock_data']
In [9]:
treaty2 = {"reinsurer": "Swiss Re",
        "treaty": "Clash Layer",
        "tags": ["reinsurer", "treaty", "year"],
        "date": datetime.datetime.utcnow()}

treaty_id = treaties.insert_one(treaty2).inserted_id
treaty_id
Out[9]:
ObjectId('5b92dadb760df70abc053951')
In [10]:
treaty3 = {"reinsurer": "Parter Re",
        "treaty": "XOL 2nd Layer",
        "tags": ["reinsurer", "treaty", "year"],
        "date": datetime.datetime.utcnow()}

treaty_id = treaties.insert_one(treaty3).inserted_id
treaty_id
Out[10]:
ObjectId('5b92dadd760df70abc053952')


Getting a Single Document With find_one()

- The most basic type of query that can be performed in MongoDB is find_one()
- This method returns a single document matching a query (or None if there are no matches)
- It is useful when you know there is only one matching document, or you are only interested in the first match
- Here we use find_one() to get the first document from the treaties collection:
In [11]:
import pprint
pprint.pprint(treaties.find_one())
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
In [12]:
pprint.pprint(treaties.find_one({"reinsurer": "AIG"}))
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
In [13]:
pprint.pprint(treaties.find_one({"reinsurer": "ACE"}))
None


Querying By ObjectId
- We can also find a treaty by its _id, which in our example is an ObjectId:

In [14]:
treaty_id  ## the output is an ObjectId, not a string
Out[14]:
ObjectId('5b92dadd760df70abc053952')
In [15]:
pprint.pprint(treaties.find_one({"_id": treaty_id}))
{'_id': ObjectId('5b92dadd760df70abc053952'),
 'date': datetime.datetime(2018, 9, 7, 20, 9, 1, 965000),
 'reinsurer': 'Parter Re',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL 2nd Layer'}


Note that an ObjectId is not the same as its string representation, so a query that uses the string finds nothing

In [16]:
treaty_id_as_str = str(treaty_id)
treaty_id_as_str  ## output is a string
Out[16]:
'5b92dadd760df70abc053952'
In [17]:
treaties.find_one({"_id": treaty_id_as_str}) # No result
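To look a document up from the string form of its id (for example, an id received from a web form or a CSV file), convert the string back with bson.ObjectId, which ships with PyMongo. A minimal sketch with hypothetical helper names; the validity check mirrors the 24-hex-character text form of an ObjectId, and the bson import is deferred so the check itself needs only the standard library:

```python
import string

_HEX = set(string.hexdigits)


def looks_like_objectid(value):
    """True if value is a 24-character hex string, the text form of an ObjectId."""
    return isinstance(value, str) and len(value) == 24 and set(value) <= _HEX


def find_by_id_string(collection, id_str):
    """Look up a document from the string form of its _id (hypothetical helper)."""
    if not looks_like_objectid(id_str):
        return None
    from bson import ObjectId  # bson ships with PyMongo
    return collection.find_one({"_id": ObjectId(id_str)})


# Should return the 'Parter Re' treaty from the cells above:
# find_by_id_string(treaties, treaty_id_as_str)
```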


Bulk Inserts

- We can also perform bulk inserts by passing a list of documents as the first argument to insert_many()
- This inserts each document in the list, sending only a single command to the server
- The second document in new_treaty (Munich Re) has a different “shape” from the other treaties: there is no "tags" field, and we’ve added a new field, "retenion" (retention)
- This is what we mean when we say that MongoDB is schema-free
In [17]:
## Insert two documents at once; the second has a different “shape” from the earlier treaties:
## it has no "tags" field and adds a new field, "retenion" (retention)

new_treaty = [{"reinsurer": "AIG",
              "treaty": "XOL Layer 2018",
              "tags": ["bulk", "insert"],
              "date": datetime.datetime(2018, 11, 12, 11, 14)},
              {"reinsurer": "Munich Re",
               "treaty": "QS 2018",
               "retenion": "QS 20% for US business",
               "date": datetime.datetime(2018, 11, 10, 10, 45)}]

result = treaties.insert_many(new_treaty)
result.inserted_ids
Out[17]:
[ObjectId('5b92daee760df70abc053953'), ObjectId('5b92daee760df70abc053954')]

The result from insert_many() now returns two ObjectId instances, one for each inserted document.
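Because documents need not share a schema, it can help to check which fields each document actually carries. A small stdlib sketch, where field_coverage is a hypothetical helper and the sample mirrors the two documents in new_treaty (dates omitted for brevity). It would also accept a PyMongo cursor, e.g. field_coverage(treaties.find()), since a cursor yields dicts. On the server side, a filter such as {"tags": {"$exists": False}} selects the documents that lack a field:

```python
from collections import Counter


def field_coverage(docs):
    """Count how many documents contain each top-level field."""
    counts = Counter()
    for doc in docs:
        counts.update(doc.keys())
    return dict(counts)


sample = [
    {"reinsurer": "AIG", "treaty": "XOL Layer 2018", "tags": ["bulk", "insert"]},
    {"reinsurer": "Munich Re", "treaty": "QS 2018", "retenion": "QS 20% for US business"},
]
print(field_coverage(sample))
# {'reinsurer': 2, 'treaty': 2, 'tags': 1, 'retenion': 1}
```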


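Query data

- Use find() to get a cursor over every document that matches a query
- Use find_one() to get only the first matching document (or None)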
In [18]:
for treaty in treaties.find():
    pprint.pprint(treaty)
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
{'_id': ObjectId('5b92dadb760df70abc053951'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 59, 246000),
 'reinsurer': 'Swiss Re',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'Clash Layer'}
{'_id': ObjectId('5b92dadd760df70abc053952'),
 'date': datetime.datetime(2018, 9, 7, 20, 9, 1, 965000),
 'reinsurer': 'Parter Re',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL 2nd Layer'}
{'_id': ObjectId('5b92daee760df70abc053953'),
 'date': datetime.datetime(2018, 11, 12, 11, 14),
 'reinsurer': 'AIG',
 'tags': ['bulk', 'insert'],
 'treaty': 'XOL Layer 2018'}
{'_id': ObjectId('5b92daee760df70abc053954'),
 'date': datetime.datetime(2018, 11, 10, 10, 45),
 'reinsurer': 'Munich Re',
 'retenion': 'QS 20% for US business',
 'treaty': 'QS 2018'}
In [20]:
## Use find() for queries

# Find all treaties in the collection
# for treaty in treaties.find():
#     pprint.pprint(treaty)

# Find all AIG treaties:
for treaty in treaties.find({"reinsurer": "AIG"}):
    pprint.pprint(treaty)
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
{'_id': ObjectId('5b92daee760df70abc053953'),
 'date': datetime.datetime(2018, 11, 12, 11, 14),
 'reinsurer': 'AIG',
 'tags': ['bulk', 'insert'],
 'treaty': 'XOL Layer 2018'}
In [21]:
## Find one AIG treaty:
pprint.pprint(treaties.find_one({"reinsurer": "AIG"}))
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
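The equality filters above have one behavior worth knowing: when a document's field holds an array, a scalar in the filter matches if the array contains it, so treaties.find({"tags": "bulk"}) would return the bulk-inserted AIG treaty. The toy in-memory matcher below only illustrates that rule on plain dicts; it is a simplified stand-in (hypothetical name matches), not how PyMongo or the server evaluates queries, and it ignores operators such as $lt and null-matches-missing semantics:

```python
def matches(doc, query):
    """Toy in-memory version of an equality-only MongoDB filter.

    Every field in `query` must match. If the document's field holds a list,
    a scalar query value matches when the list contains it.
    """
    for field, wanted in query.items():
        if field not in doc:
            return False
        value = doc[field]
        if isinstance(value, list) and not isinstance(wanted, list):
            if wanted not in value:
                return False
        elif value != wanted:
            return False
    return True


docs = [{"reinsurer": "AIG", "tags": ["bulk", "insert"]},
        {"reinsurer": "Munich Re"}]
print([d["reinsurer"] for d in docs if matches(d, {"tags": "bulk"})])  # ['AIG']
```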


Counting

- If we just want to know how many documents match a query, we can call count_documents() instead of running a full query
- Passing an empty filter ({}) counts all of the documents in a collection:
In [22]:
## How many AIG treaties?
treaties.count_documents(
  filter={"reinsurer": "AIG"}
)


## How many treaties in total?
treaties.count_documents(
  filter={}
)
Out[22]:
2
Out[22]:
5


Range Queries

- MongoDB supports many different types of advanced queries.
- As an example, let's query for treaties dated before a certain date and sort the results by reinsurer name
In [25]:
d = datetime.datetime(2018, 9, 12, 12)
for treaty in treaties.find({"date": {"$lt": d}}).sort("reinsurer"):
    pprint.pprint(treaty)
{'_id': ObjectId('5b92dad7760df70abc053950'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 52, 84000),
 'reinsurer': 'AIG',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL layer'}
{'_id': ObjectId('5b92dadd760df70abc053952'),
 'date': datetime.datetime(2018, 9, 7, 20, 9, 1, 965000),
 'reinsurer': 'Parter Re',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'XOL 2nd Layer'}
{'_id': ObjectId('5b92dadb760df70abc053951'),
 'date': datetime.datetime(2018, 9, 7, 20, 8, 59, 246000),
 'reinsurer': 'Swiss Re',
 'tags': ['reinsurer', 'treaty', 'year'],
 'treaty': 'Clash Layer'}
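Range filters like {"date": {"$lt": d}} compose: a closed-open window uses $gte and $lt on the same field. A minimal sketch of a filter builder, where date_range_filter is a hypothetical helper; with the data above, treaties.find(date_range_filter(datetime.datetime(2018, 11, 1), datetime.datetime(2018, 12, 1))).sort("reinsurer", pymongo.DESCENDING) would return the two treaties bulk-inserted for November 2018:

```python
import datetime


def date_range_filter(start=None, end=None, field="date"):
    """Build a filter for start <= field < end; either bound may be omitted."""
    condition = {}
    if start is not None:
        condition["$gte"] = start
    if end is not None:
        condition["$lt"] = end
    # An empty filter matches every document
    return {field: condition} if condition else {}


november = date_range_filter(datetime.datetime(2018, 11, 1), datetime.datetime(2018, 12, 1))
print(november)
```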


Indexing

- Adding indexes can help accelerate certain queries and can also add additional functionality to querying and storing documents
- In this example, we’ll demonstrate how to create a unique index on a key that rejects documents whose value for that key already exists in the index
- First, we’ll need to create the index
In [26]:
result = db.profiles.create_index([('reinsurer_id', pymongo.ASCENDING)],
                                  unique=True)
sorted(list(db.profiles.index_information()))
Out[26]:
['_id_', 'reinsurer_id_1']


Notice that we have two indexes now
- One is the index on _id that MongoDB creates automatically, and the other is the index on reinsurer_id we just created.
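The name "reinsurer_id_1" follows MongoDB's default naming scheme: each indexed field is joined with its direction (pymongo.ASCENDING is 1, pymongo.DESCENDING is -1), and the pairs are joined with underscores. A small sketch that reproduces the convention (default_index_name is a hypothetical helper); for example, treaties.create_index([("reinsurer", pymongo.ASCENDING), ("date", pymongo.DESCENDING)]) would be named "reinsurer_1_date_-1" unless a name is passed explicitly:

```python
def default_index_name(keys):
    """Reproduce MongoDB's default index name: field_direction pairs joined by '_'."""
    return "_".join("{}_{}".format(field, direction) for field, direction in keys)


print(default_index_name([("reinsurer_id", 1)]))              # reinsurer_id_1
print(default_index_name([("reinsurer", 1), ("date", -1)]))  # reinsurer_1_date_-1
```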

In [28]:
## Set up some reinsurer profiles

user_profiles = [
    {'reinsurer_id': 100, 'reinsurer': 'Atlantic Re'},
    {'reinsurer_id': 25, 'reinsurer': 'XL Re'}]
result = db.profiles.insert_many(user_profiles)
In [ ]:
## Inserting a profile whose reinsurer_id already exists violates the unique index
from pymongo.errors import DuplicateKeyError

new_profile = {'reinsurer_id': 213, 'reinsurer': 'XL American'}
duplicate_profile = {'reinsurer_id': 25, 'reinsurer': 'SCOR SE'}
result = db.profiles.insert_one(new_profile)  # This is fine.
try:
    result = db.profiles.insert_one(duplicate_profile)  # reinsurer_id 25 already exists
except DuplicateKeyError as e:
    print(e)  # reports an E11000 duplicate key error on index reinsurer_id_1


Create document-style data in MongoDB from JSON API format

- example: realtime intraday MSFT stock price
- example: realtime sector stock returns
In [29]:
realtime_stock_data = {
    "Meta Data": {
        "1 Information": "Intraday (5min) open, high, low, close prices and volume",
        "2 Symbol": "MSFT",
        "3 Last Refreshed": "2018-08-22 15:55:00",
        "4 Interval": "5min",
        "5 Output Size": "Compact",
        "6 Time Zone": "US/Eastern"
    },
    "Time Series (5min)": {
        "2018-08-22 15:55:00": {
            "1 open": "107.1500",
            "2 high": "107.2100",
            "3 low": "107.0500",
            "4 close": "107.0500",
            "5 volume": "970838"
        }
    }
}
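Some JSON APIs return keys such as "1. open". Older MongoDB servers and PyMongo releases reject field names that contain "." or start with "$", and even where they are allowed, such names do not work with ordinary dot-notation queries. The keys above ("1 open", "2 Symbol") look as though their dots were already removed, which is an assumption about where this data came from. A minimal sketch of a recursive key cleaner, where sanitize_keys is a hypothetical helper:

```python
def sanitize_keys(obj, replacement=""):
    """Recursively replace '.' in dict keys and strip a leading '$' (hypothetical helper)."""
    if isinstance(obj, dict):
        clean = {}
        for key, value in obj.items():
            new_key = key.replace(".", replacement)
            if new_key.startswith("$"):
                new_key = new_key[1:]
            clean[new_key] = sanitize_keys(value, replacement)
        return clean
    if isinstance(obj, list):
        return [sanitize_keys(item, replacement) for item in obj]
    # Values (including strings with dots, like prices) are left untouched
    return obj


raw = {"Meta Data": {"1. Information": "Intraday (5min)"},
       "Time Series (5min)": {"2018-08-22 15:55:00": {"1. open": "107.1500"}}}
print(sanitize_keys(raw)["Time Series (5min)"]["2018-08-22 15:55:00"])
# {'1 open': '107.1500'}
```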
In [30]:
stock_data = db.stock_data
stock_data_id = stock_data.insert_one(realtime_stock_data).inserted_id
stock_data_id
Out[30]:
ObjectId('5b92dc1e760df70abc053959')
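Storing the whole API response as one document mirrors the source, but every price is a string nested under a timestamp key, which makes range queries on time or price awkward. One alternative design (not what the cell above does) is one document per bar with numeric fields. A minimal sketch, where bars_from_time_series and the stock_bars collection are hypothetical. Note that the timestamps are US/Eastern according to the metadata, while PyMongo stores naive datetimes as if they were UTC, so convert them first if that matters:

```python
import datetime


def bars_from_time_series(payload, series_key="Time Series (5min)"):
    """Flatten the nested time series into one document per bar (hypothetical helper)."""
    symbol = payload["Meta Data"]["2 Symbol"]
    bars = []
    for stamp, values in payload[series_key].items():
        # Keys look like "1 open"; keep the part after the leading number
        fields = {key.split(" ", 1)[1]: float(val) for key, val in values.items()}
        fields["volume"] = int(fields["volume"])
        bars.append({
            "symbol": symbol,
            "timestamp": datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S"),
            **fields,
        })
    return bars


# Usage (requires a running server):
# db.stock_bars.insert_many(bars_from_time_series(realtime_stock_data))
# db.stock_bars.find({"symbol": "MSFT", "close": {"$gt": 107}})
```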
In [31]:
realtime_sector_data={
    "Meta Data": {
        "Information": "US Sector Performance (realtime & historical)",
        "Last Refreshed": "02:05 PM ET 08/23/2018"
    },
    "Rank A: Real-Time Performance": {
        "Information Technology": "0.51%",
        "Consumer Discretionary": "-0.04%",
        "Utilities": "-0.11%",
        "Telecommunication Services": "-0.13%",
        "Health Care": "-0.20%",
        "Real Estate": "-0.25%",
        "Consumer Staples": "-0.27%",
        "Industrials": "-0.48%",
        "Financials": "-0.54%",
        "Energy": "-0.57%",
        "Materials": "-0.86%"
    },
    "Rank B: 1 Day Performance": {
        "Energy": "1.20%",
        "Information Technology": "0.48%",
        "Consumer Discretionary": "0.12%",
        "Health Care": "0.11%",
        "Financials": "-0.26%",
        "Materials": "-0.45%",
        "Consumer Staples": "-0.63%",
        "Real Estate": "-0.65%",
        "Utilities": "-0.78%",
        "Industrials": "-0.93%",
        "Telecommunication Services": "-2.02%"
    },
    "Rank C: 5 Day Performance": {
        "Energy": "3.27%",
        "Industrials": "2.23%",
        "Consumer Discretionary": "2.21%",
        "Materials": "2.02%",
        "Financials": "1.93%",
        "Health Care": "1.71%",
        "Telecommunication Services": "1.09%",
        "Information Technology": "0.84%",
        "Consumer Staples": "0.76%",
        "Real Estate": "0.16%",
        "Utilities": "-0.35%"
    },
    "Rank D: 1 Month Performance": {
        "Telecommunication Services": "6.53%",
        "Health Care": "5.38%",
        "Industrials": "2.75%",
        "Real Estate": "2.62%",
        "Consumer Staples": "2.57%",
        "Financials": "2.45%",
        "Utilities": "1.92%",
        "Consumer Discretionary": "1.90%",
        "Materials": "0.90%",
        "Information Technology": "0.57%",
        "Energy": "-1.17%"
    },
    "Rank E: 3 Month Performance": {
        "Health Care": "9.80%",
        "Consumer Staples": "9.18%",
        "Utilities": "8.72%",
        "Consumer Discretionary": "8.41%",
        "Real Estate": "8.28%",
        "Telecommunication Services": "6.02%",
        "Information Technology": "5.55%",
        "Financials": "0.49%",
        "Industrials": "0.15%",
        "Materials": "-2.15%",
        "Energy": "-5.60%"
    },
    "Rank F: Year-to-Date (YTD) Performance": {
        "Information Technology": "16.10%",
        "Consumer Discretionary": "15.79%",
        "Health Care": "10.55%",
        "Energy": "2.53%",
        "Financials": "1.05%",
        "Utilities": "1.04%",
        "Real Estate": "0.96%",
        "Industrials": "0.75%",
        "Materials": "-2.75%",
        "Consumer Staples": "-5.33%",
        "Telecommunication Services": "-6.21%"
    },
    "Rank G: 1 Year Performance": {
        "Information Technology": "31.09%",
        "Consumer Discretionary": "29.36%",
        "Energy": "20.32%",
        "Health Care": "16.84%",
        "Financials": "15.29%",
        "Industrials": "12.18%",
        "Materials": "9.50%",
        "Real Estate": "3.09%",
        "Telecommunication Services": "-0.85%",
        "Utilities": "-2.24%",
        "Consumer Staples": "-2.32%"
    },
    "Rank H: 3 Year Performance": {
        "Information Technology": "95.67%",
        "Consumer Discretionary": "54.45%",
        "Financials": "46.80%",
        "Industrials": "45.21%",
        "Materials": "36.15%",
        "Health Care": "27.81%",
        "Utilities": "18.65%",
        "Energy": "18.31%",
        "Consumer Staples": "13.63%",
        "Telecommunication Services": "5.73%"
    },
    "Rank I: 5 Year Performance": {
        "Information Technology": "151.29%",
        "Consumer Discretionary": "98.24%",
        "Health Care": "85.23%",
        "Financials": "75.87%",
        "Industrials": "67.23%",
        "Materials": "45.10%",
        "Utilities": "43.92%",
        "Consumer Staples": "35.93%",
        "Telecommunication Services": "4.47%",
        "Energy": "-6.33%"
    },
    "Rank J: 10 Year Performance": {
        "Consumer Discretionary": "284.77%",
        "Information Technology": "252.24%",
        "Health Care": "176.38%",
        "Industrials": "107.68%",
        "Consumer Staples": "90.16%",
        "Financials": "74.45%",
        "Materials": "50.53%",
        "Utilities": "41.62%",
        "Telecommunication Services": "21.26%",
        "Energy": "-3.80%"
    }
}
In [32]:
sector_data = db.sector_data
sector_data_id = sector_data.insert_one(realtime_sector_data).inserted_id
sector_data_id
Out[32]:
ObjectId('5b92dc29760df70abc05395a')
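The sector returns are stored as strings such as "0.51%", so ranking them requires parsing first. A projection can fetch just one ranking from the stored document, e.g. sector_data.find_one({}, {"Rank A: Real-Time Performance": 1, "_id": 0}). A minimal sketch of the parsing step, where ranked_sectors is a hypothetical helper and the sample holds three sectors copied from the document above:

```python
def ranked_sectors(doc, rank_key):
    """Return (sector, return as a fraction) pairs, best first (hypothetical helper)."""
    perf = doc[rank_key]
    parsed = [(sector, float(pct.rstrip("%")) / 100) for sector, pct in perf.items()]
    return sorted(parsed, key=lambda pair: pair[1], reverse=True)


sample = {"Rank A: Real-Time Performance": {
    "Information Technology": "0.51%", "Energy": "-0.57%", "Utilities": "-0.11%"}}
for sector, ret in ranked_sectors(sample, "Rank A: Real-Time Performance"):
    print("{:<25}{:+.2%}".format(sector, ret))
```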


Collect data from Twitter into MongoDB
- ref: http://pythondata.com/collecting-storing-tweets-with-python-and-mongodb/

In [ ]:
import os
import json

import tweepy  # written against tweepy 3.x; tweepy 4.0 removed StreamListener
from pymongo import MongoClient

MONGO_HOST = 'mongodb://localhost/twitterdb'  # assuming you have MongoDB installed locally
                                              # and a database called 'twitterdb'

WORDS = ['#insurance', '#AI', '#datascience', '#machinelearning', '#auto', '#reinsurance']

# Read the Twitter API credentials from environment variables
CONSUMER_KEY = os.environ['TWITTER_CONSUMER_KEY']
CONSUMER_SECRET = os.environ['TWITTER_CONSUMER_SECRET']
ACCESS_TOKEN = os.environ['TWITTER_ACCESS_TOKEN']
ACCESS_TOKEN_SECRET = os.environ['TWITTER_ACCESS_TOKEN_SECRET']

# Create one client and reuse it for every tweet.
# Use the twitterdb database. If it doesn't exist, it will be created.
client = MongoClient(MONGO_HOST)
db = client.twitterdb


class StreamListener(tweepy.StreamListener):
    # This is a class provided by tweepy to access the Twitter Streaming API.

    def on_connect(self):
        # Called initially to connect to the Streaming API
        print("You are now connected to the streaming API.")

    def on_error(self, status_code):
        # On error, display the status code and disconnect
        print('An error has occurred: ' + repr(status_code))
        return False

    def on_data(self, data):
        # This is the meat of the script: it stores each tweet in MongoDB
        try:
            # Decode the JSON from Twitter
            datajson = json.loads(data)

            # Grab the 'created_at' field from the tweet to use for display
            created_at = datajson['created_at']

            # Print a message to the screen that we have collected a tweet
            print("Tweet collected at " + str(created_at))

            # Insert the tweet into a collection called twitter_search;
            # if twitter_search doesn't exist, it will be created.
            db.twitter_search.insert_one(datajson)
        except Exception as e:
            print(e)


auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Set up the listener. 'wait_on_rate_limit=True' makes tweepy wait out Twitter API rate limits.
listener = StreamListener(api=tweepy.API(auth, wait_on_rate_limit=True))
streamer = tweepy.Stream(auth=auth, listener=listener)
print("Tracking: " + str(WORDS))
streamer.filter(track=WORDS)
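The listener stores each raw tweet as-is. A leaner option is to keep a few fields and reuse the tweet's id_str as "_id", so a tweet received twice raises DuplicateKeyError instead of being stored again, and created_at becomes a real datetime that range queries can use. A minimal sketch assuming the Twitter v1.1 tweet layout (created_at like "Wed Oct 10 20:19:24 +0000 2018", hashtags under entities); slim_tweet is a hypothetical helper, and for long tweets the "text" field may be truncated. Inside on_data, the insert could become db.twitter_search.insert_one(slim_tweet(datajson)):

```python
import datetime


def slim_tweet(tweet):
    """Keep a few fields from a raw tweet and key it by the tweet id (hypothetical helper)."""
    return {
        "_id": tweet["id_str"],  # reusing the tweet id makes duplicate inserts fail
        "created_at": datetime.datetime.strptime(
            tweet["created_at"], "%a %b %d %H:%M:%S %z %Y"),
        "text": tweet.get("text", ""),
        "hashtags": [h["text"] for h in tweet.get("entities", {}).get("hashtags", [])],
    }
```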