Monday, February 18, 2019

A Minimalist Guide to FoundationDB

I'll use Pandas to read CSV data in chunks out of a GZIP-compressed file containing 20 million NYC taxi trips from 2009. I'll feed each trip's pickup neighbourhood and total fare into FDB.

from   datetime import datetime

import fdb
from   fdb.tuple import pack, unpack
import pandas as pd


fdb.api_version(510)


@fdb.transactional
def get_score(tr, user):
    user_key = pack(('scores', user))
    score = tr.get(user_key)

    # The fdb bindings return a Value that compares equal to None when the
    # key is missing, which is why '== None' is used rather than 'is None'.
    if score == None:
        score = 0
        tr.set(user_key, pack((score,)))
        tr.set(pack(('leaderboard', score, user)), b'')
    else:
        score = unpack(score)[0]

    return score


@fdb.transactional
def add(tr, user, increment=1):
    # Called with 'tr', get_score() joins this transaction, so the
    # read-modify-write below is atomic.
    score = get_score(tr, user)
    total = score + increment
    user_key = pack(('scores', user))

    # The score is embedded in the leaderboard key, so updating it means
    # clearing the old entry and writing a new one.
    tr.set(user_key, pack((total,)))
    tr.clear(pack(('leaderboard', score, user)))
    tr.set(pack(('leaderboard', total, user)), b'')

    return total


cols = ['trip_id',
        'vendor_id',
        'pickup_datetime',
        'dropoff_datetime',
        'store_and_fwd_flag',
        'rate_code_id',
        'pickup_longitude',
        'pickup_latitude',
        'dropoff_longitude',
        'dropoff_latitude',
        'passenger_count',
        'trip_distance',
        'fare_amount',
        'extra',
        'mta_tax',
        'tip_amount',
        'tolls_amount',
        'ehail_fee',
        'improvement_surcharge',
        'total_amount',
        'payment_type',
        'trip_type',
        'pickup',
        'dropoff',
        'cab_type',
        'precipitation',
        'snow_depth',
        'snowfall',
        'max_temperature',
        'min_temperature',
        'average_wind_speed',
        'pickup_nyct2010_gid',
        'pickup_ctlabel',
        'pickup_borocode',
        'pickup_boroname',
        'pickup_ct2010',
        'pickup_boroct2010',
        'pickup_cdeligibil',
        'pickup_ntacode',
        'pickup_ntaname',
        'pickup_puma',
        'dropoff_nyct2010_gid',
        'dropoff_ctlabel',
        'dropoff_borocode',
        'dropoff_boroname',
        'dropoff_ct2010',
        'dropoff_boroct2010',
        'dropoff_cdeligibil',
        'dropoff_ntacode',
        'dropoff_ntaname',
        'dropoff_puma']

db = fdb.open()
counter, start = 0, datetime.utcnow()

for chunk in pd.read_csv('trips_xaa.csv.gz',
                         header=None,
                         chunksize=10000,
                         names=cols,
                         usecols=['total_amount',
                                  'pickup_ntaname']):
    # One call to add() per record, and therefore one transaction each.
    for x in range(0, len(chunk)):
        add(db,
            chunk.iloc[x].pickup_ntaname,
            chunk.iloc[x].total_amount)
    counter += 1
    # Print the running import rate in records per second.
    print((counter * 10000) /
          (datetime.utcnow() - start).total_seconds())

The above imported at a rate of 495 records per second.
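One way to speed this up could be to write an entire Pandas chunk inside a single transaction, amortising the per-record commit overhead. Below is a minimal, untested sketch; add_batch is a hypothetical helper that isn't in the original code, and each batch would need to stay within FoundationDB's transaction limits of 10 MB of affected data and five seconds of duration.

@fdb.transactional
def add_batch(tr, pairs):
    # 'pairs' is an iterable of (user, increment) tuples. add() is
    # decorated with @fdb.transactional, so passing it 'tr' makes it
    # join this transaction instead of starting one per record.
    for user, increment in pairs:
        add(tr, user, increment)

A chunk could then be imported with a single call along the lines of add_batch(db, zip(chunk.pickup_ntaname, chunk.total_amount)).

While the import was taking place I was able to begin querying the leaderboard.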

from   operator import itemgetter

import fdb
from   fdb.tuple import pack, unpack


fdb.api_version(510)


@fdb.transactional
def top(tr, count=3):
    out = dict()

    # Scan every key under the 'leaderboard' prefix in reverse, so the
    # highest scores arrive first.
    iterator = tr.get_range_startswith(pack(('leaderboard',)), reverse=True)

    for key, _ in iterator:
        _, score, user = unpack(key)

        if score in out.keys():
            out[score].append(user)
        elif len(out.keys()) == count:
            # A new score has appeared after 'count' distinct scores were
            # collected, so the scan can stop.
            break
        else:
            out[score] = [user]

    return dict(sorted(out.items(), key=itemgetter(0), reverse=True))


top(db)

These are the top three pickup points by total cab fare after a few minutes of importing the CSV file.
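The reverse range scan in top() works because the tuple layer's encoding is order-preserving: packed tuples sort bytewise in the same order as their unpacked values, so a higher score yields a lexicographically greater key. A quick sanity check, using made-up float scores:

from fdb.tuple import pack

# Packed keys compare in the same order as the tuples they encode,
# which is what lets the reverse=True scan return the highest scores first.
assert pack(('leaderboard', 1.5, 'a')) < pack(('leaderboard', 2.5, 'b'))

One caveat worth knowing: the tuple layer sorts elements of different types by type code before value, so integer and float scores would not interleave numerically. It's best to keep every score the same numeric type, as the float fares here effectively do.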

