Source code for fleetmanager.model.roundtripgenerator

import datetime
import math
import operator

import numpy as np
import pandas as pd


[docs]def calc_distance(coord1, coord2): """ Simple distance function to measure the distance in km from two coordinates (lat, long) (lat, long) Parameters ---------- coord1 : (latitude, longitude) coord2 : (latitude, longitude) Returns ------- distance in km """ lat1, lon1 = coord1 lat2, lon2 = coord2 R = 6371 phi1 = lat1 * math.pi / 180 phi2 = lat2 * math.pi / 180 delta_phi = (lat2 - lat1) * math.pi / 180 delta_lambda = (lon2 - lon1) * math.pi / 180 a = math.sin(delta_phi / 2) * math.sin(delta_phi / 2) + math.cos(phi1) * math.cos( phi2 ) * math.sin(delta_lambda / 2) * math.sin(delta_lambda / 2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c
[docs]def point_to_starts(coordinate, starts): """ Function used to measure the distance between a selected start to a list of starts iterates over the starts list and returns the distance and coordinates to the starting location to which there's the shortest distance. Parameters ---------- coordinate : (lat, lon) starts : list of (lat, lon) - [(lat, lon), (lat, lon), (lat, lon) ...] Returns ------- (min_distance, best_start) - distance to closest start, (lat, lon) to closest start """ min_distance = math.inf best_start = None for start in starts: distance = calc_distance(start, coordinate) if distance < min_distance: min_distance = distance best_start = start return min_distance, best_start
[docs]def get_car_starts(trips, allowed_starts_): """ Function to retrieve allowed starts for the cars. Iterates over all trips for a specific car and finds the start to which it's closest. If the distance to an allowed start is less than .2 km the start is accepted. The 3 most frequent starts for each car is returned for use in the later process when the roundtrips are defined Parameters ---------- trips : trips from the table expected in pandas format. At least "car_id", "start_latitude", "start_longitude" allowed_starts : list of cordinates (lat, lon) that are accepted start locations Returns ------- dictionary for each car id that contain the 3 most frequent starts for the car {car_id: [(lat,lon), (lat,lon), (lat,lon)], car_id: [(lat,lon), (lat,lon), (lat,lon)] ...} """ cars = trips.car_id.unique() car_dict = {car: {} for car in cars} allowed_starts_coordinates = [ (a.latitude, a.longitude) for a in allowed_starts_.itertuples() ] for car in cars: log_points = trips[trips.car_id == car] car_starts = [ (a, b) for a, b in zip(log_points.start_latitude, log_points.start_longitude) if all([type(a) is not None, type(b) is not None]) ] closest = [ point_to_starts(start, allowed_starts_coordinates) for start in car_starts ] for distance, (points) in closest: if distance < 0.2: if points not in car_dict[car]: car_dict[car][points] = 0 car_dict[car][points] += 1 allowed_car_starts = { car: [ point for point, frequency in sorted( car_dict[car].items(), key=operator.itemgetter(1), reverse=True )[:3] ] for car in cars } return allowed_car_starts
[docs]def post_routing_sanitation(routes, trips, starts_): """ Function used post routing to do sanitation - especially useful to scrutinise the trips that doesn't "make sense" from a distance or time perspective. The time (7 days) and distance (200 km) criteria defines which routes will be sought to be re-defined. For the selected routes it selects the trips points from the start - and end time, new starting locations are defined - isolated to those present as opposed to all log points for the car. The trips log points and new locations are sent to the trip extractor to define possible new routes. Parameters ---------- routes : the defined routes from trip_extractor trips : the trips frame starts_ : frame holding the allowed starts from the allowed_starts table Returns ------- routes that were originally accepted and possible sanitized routes """ distance_criteria = 200 time_criteria = 1 confirmed = [] check = [] sanitized = [] for route in routes: if ( route["end_time"].date() - route["start_time"].date() ).days >= time_criteria or route["distance"] > distance_criteria: check.append(route) else: confirmed.append(route) for route in check: # check which starts it's most in relation with # select one start based on frequency car = route["car_id"] trip_points = trips[ (trips.car_id == route["car_id"]) & (trips.start_timestamp >= route["start_time"]) & (trips.end_timestamp <= route["end_time"]) ] new_starts = {car: get_car_starts(trip_points, starts_)[car][:]} new_routes, _ = trip_extractor( trip_points, allowed_car_starts=new_starts, start_frame=starts_, ) sanitized += new_routes return confirmed + sanitized
[docs]def trip_extractor( frame, allowed_car_starts=None, start_frame=None, hour_threshold=16.9, definition_of_home=0.2, min_travel_distance=0.5, recurs=0, enforce_location=None, ): """ Major roundtrip generator function. Takes the trips pulled and the defined allowed starts for each car. Assumes that a route will start and end at the same location (+-definition of home), exists of at least 2 log points, travelled distance is more than 500 meters and driver is the same (except if driver defined moves from nan to driver_name or driver_name to nan once) Parameters ---------- frame : trips frame from trips table allowed_car_starts : dictionary of allowed starts for each car_id present in trips frame start_frame: frame holding the allowed starts from the allowed_starts table hour_threshold : int, standard for limiting the allowed length of a trip. definition_of_home : int, allowed distance to home. If a log is definition_of_home close to an allowed start at unstarted trip, the trip will begin. If a trip is underway, the trips ends if a log is seen that is definition_of_home close to the defined allowed start min_travel_distance : int, the minimum travel distance for accepting the trip as a valid roundtrip recurs : int, should not be changed. Internal value for constraining the function to only recurs once to parse unqualified roundtrips in to qualified roundtrips. E.g. There could be vehicles that find home after days because it has been lend out to another location, then the allowed start has been "wrongly" selected and should be defined to the location to which the vehicle has been lend in order to record the trips driven in the other location. enforce_location : int, defining the location. In accordance with the aforementioned, this value will overwrite the found location and always allocate the trip to the location to where the vehicle belong Returns ------- list of accepted routes in dictionary format including following attributes: start_time, end_time, start_point, end_point, car_id, distance, gps_points, driver, ids """ time_sorted_frame = frame.sort_values(["start_time"]).reset_index().iloc[:, 1:] time_sorted_frame["start_timestamp"] = time_sorted_frame.start_time.apply( lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S") if type(x) is str else x ) time_sorted_frame["end_timestamp"] = time_sorted_frame.end_time.apply( lambda x: datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S") if type(x) is str else x ) all_routes = [] # list for qualified routes for car_id in time_sorted_frame.car_id.unique(): current_route = [] # list for holding log points for current running route trip_end = False trip_frame = time_sorted_frame[time_sorted_frame.car_id == car_id] trip_frame_length = len(trip_frame) driver = trip_frame.iloc[0]["driver_name"] if pd.isnull(driver): driver = "nan" for k, segment in enumerate(trip_frame.itertuples()): # to allow nan driver log points to be allocated to route if pd.isnull(segment.driver_name): current_driver = "nan" else: current_driver = segment.driver_name # check that current driver is equal to the previous - except if one of them is a nan if ( current_driver != driver and driver != "nan" and current_driver != "nan" ) or len( set( [ "nan" if pd.isna(a.driver_name) else a.driver_name for a in current_route ] ) ) == 3: # discard already logged points to the route because driver changed current_route = [] if len(current_route) == 0: # if this is the first log of the trip ensure that the location is an accepted start distances_to_starts = [ calc_distance( (segment.start_latitude, segment.start_longitude), (start) ) for start in allowed_car_starts[car_id] ] if all([dist > definition_of_home for dist in distances_to_starts]): continue test = 0 driver = current_driver total_travelled = 0 start_point = allowed_car_starts[car_id][ np.argsort(distances_to_starts)[0] ] start_time = segment.start_timestamp locat = start_frame[ (start_frame.latitude == start_point[0]) & (start_frame.longitude == start_point[1]) ].id.values[0] allowed_stops = start_frame[start_frame.id == locat][ ["latitude", "longitude"] ].values multiple_stops = True if len(allowed_stops) > 1 else False current_route.append(segment) if k < trip_frame_length - 1: # due to "teleportation" where the current end coordinate is not the next start point = trip_frame.iloc[k + 1][ ["start_latitude", "start_longitude"] ].values else: point = (segment.end_latitude, segment.end_longitude) # we need the end location to determine whether it's parked close to home if multiple_stops: distances = [ calc_distance(point, (lat, lon)) for lat, lon in allowed_stops ] distance = distances[np.argsort(distances)[0]] else: distance = calc_distance(start_point, point) # sanity check (sometimes the gps distance is wrong) point_to_point = calc_distance( point, (segment.start_latitude, segment.start_longitude) ) # gps points from skyhost is sometimes missing or (0, 0) if point_to_point > 5000: point_to_point = segment.distance try: total_travelled += ( point_to_point if any( [ point_to_point / segment.distance > 1.5, 0.05 > point_to_point / segment.distance, ] ) else segment.distance ) except ZeroDivisionError: total_travelled += point_to_point # distance to the route start time_difference = segment.start_timestamp - start_time time_difference_seconds = ( time_difference.days * 24 * 3600 ) + time_difference.seconds # time to the last start checkpoint # if time exceeds 24 * 7 hours, trip must have ended time_exceeded = time_difference_seconds > 3600 * hour_threshold # is a route if back to start if distance <= definition_of_home and total_travelled > min_travel_distance: if len(current_route) >= 2: trip_end = True # trip ends elif k < len(trip_frame) - 1: # test if the next point is also at the start, then we end the trip next_start_point = trip_frame.iloc[k + 1][ ["start_latitude", "start_longitude"] ].values distance_between_points = calc_distance( start_point, next_start_point ) if distance_between_points < definition_of_home: trip_end = True if time_exceeded: # recursive check to see if we can route it more segmented ar = [] if current_route and recurs == 0: n_allow = get_car_starts(pd.DataFrame(current_route), start_frame) i = 1 test += 1 if test > 10: pass elif ( current_route[-1].end_time - current_route[0].start_time ).total_seconds() / 3600 > 72: pass else: while True: ar, _ = trip_extractor( pd.DataFrame(current_route[i:]), n_allow, start_frame, recurs=recurs + 1, definition_of_home=definition_of_home, hour_threshold=hour_threshold, enforce_location=enforce_location, ) i += 1 if ar or len(current_route) == i: break if ar: for a in ar: if ( a["end_time"] - a["start_time"] ).total_seconds() / 3600 > hour_threshold: continue a["from"] = "time" all_routes.append(a) current_route = [] trip_end = False continue if trip_end: if time_exceeded: continue all_routes.append( route_format( current_route, car_id, locat if pd.isna(enforce_location) else enforce_location, total_travelled, ) ) # reset parameters when trip has been saved or dismissed current_route = [] trip_end = False return all_routes, time_sorted_frame
[docs]def trip_aggregator( car, c_trips, allowed_starts=None, hour_threshold=16.9, definition_of_home=0.2, min_travel_distance=0.5, ): """ Function for handling the aggregation library. Called by the SkyHost - and FleetCompleteExtractor to easily aggregate the new trips. Handles the whole process. Parameters ---------- car : car object, with a defined id and location id c_trips : dataframe, the car trips that will be sought to be aggregated into roundtrips allowed_starts : dataframe, the starts that are available to the car hour_threshold : int, the hour threshold for the aggregated trips definition_of_home int, the distance allowed to a start min_travel_distance int, the minimum travel distance for a trip. Returns ------- list of aggregated trips in dictionary. """ if len(c_trips) == 0: return [] print("Processing {} trips".format(c_trips.shape[0])) allowed_car_starts = get_car_starts(c_trips, allowed_starts) car_routes, trips_time = trip_extractor( c_trips, allowed_car_starts, allowed_starts, definition_of_home=definition_of_home, enforce_location=car.location, hour_threshold=hour_threshold, min_travel_distance=min_travel_distance ) trip_routes = sum([len(a["ids"]) for a in car_routes]) if trip_routes / len(c_trips) < 0.95: unused_trips, groups = get_consective_groups(trips_time, car_routes) if len(unused_trips) == 0: pass elif ( unused_trips.iloc[-1].end_time - unused_trips.iloc[0].start_time ).days < 2: pass else: allow_skip_last = True if car_routes: allow_skip_last = ( True if unused_trips.iloc[-1].end_time != car_routes[-1]["end_time"] else False ) unused_routed = [ route_format(a, car.id, car.location) for k, b in enumerate(groups) for a in group_exceeded( unused_trips.loc[b], k + 1 == len(groups) and allow_skip_last, ) ] car_routes += unused_routed if len(car_routes) == 0: return [] car_routes_frame = pd.DataFrame(car_routes).sort_values(["start_time"]) assert all( ( car_routes_frame.start_time[1:].values - car_routes_frame.end_time[:-1].values ) / np.timedelta64(1, "s") / 3600 > 0 ), "Overlapping routes" return car_routes_frame.to_dict("records")
[docs]def route_check(route): """ Convenience function to check if a list of trips qualifies to a roundtrip. Checks that the distance of the trip is above .5 km and the duration is less than 16.9 hours. Criteria are hardcoded since the function is only called with roundtrips that are aggregated based on trips that could not be aggregated with trip_extractor. Parameters ---------- route : list of frame objects that make up the Returns ------- bool : roundtrip qualified """ distance = sum([a.distance for a in route]) route_duration = (route[-1].end_time - route[0].start_time).total_seconds() / 3600 km_over_duration = distance / route_duration if distance > 0.5 and route_duration < 16.9: return True return False
[docs]def group_exceeded(group, skip_last=False): """ Function for collecting un-aggregated trips. The trip_extractor returns the aggregated trips, from the original frame we can extract the trips logs that are not used in a qualified roundtrip. The logs that have adjacent logs will be sought to be grouped and added together in order to attribute these logs in the roundtrip frame. Otherwise, we end up having less trips in the dataset than what has actually be driven. Calls the route_check to check the coherence of the collection of trips. Iterates over the group, and adds the trip to a route if the log is started less than .75 hour from the last log Parameters ---------- group : dataframe, the collected trips that are adjacent skip_last : bool, if the group is part of the last pulled trips on the major trip frame, we skip adding as a route to allow the next aggregation job to properly aggregate these trips Returns ------- list of routes """ routes = [] if len(group) > 0: last_time = group.iloc[0].end_time route = [group.iloc[0]] for k, a in enumerate(group.itertuples()): if k == 0: continue if (a.start_time - last_time).total_seconds() / 3600 > 0.75: if route_check(route): routes.append(route) route = [] route.append(a) last_time = a.end_time if skip_last is False and route_check(route): routes.append(route) return routes
[docs]def inb(row, ts): """ Checks that there are no overlapping timestamps. Sometimes the APIs return invalid data. Parameters ---------- row : dataframe row ts : times of the trips in route Returns ------- False if everything is ok, else ids for overlapping """ ovs = [ k for k, a in enumerate(ts) if row["start_time"] >= a[0] and row["end_time"] <= a[1] ] if len(ovs): return ovs return False
[docs]def get_consective_groups(car_trips, routes): """ Extracts the untripped trips from the car trips dataframe. The trip_extractor returns the aggregated trips, from the original frame we can extract the trips logs that are not used in a qualified roundtrip. The logs that have adjacent logs will be sought to be grouped and added together in order to attribute these logs in the roundtrip frame. Otherwise, we end up having less trips in the dataset than what has actually be driven. Parameters ---------- car_trips : dataframe, the trips routes : routes, the aggregated routes Returns ------- not_in : dataframe, the trips that are not used in a route groups : list, list of index groups that are adjacent and unused """ times = [(a["start_time"], a["end_time"]) for a in routes] ids = [a for b in routes for a in b["ids"]] not_in = car_trips[~car_trips.id.isin(ids)].copy() not_in["overlap"] = not_in.apply(lambda x: inb(x, times), axis=1) not_in = not_in[not_in.overlap == False].copy() not_in["t_id"] = not_in.index.values diff = not_in.t_id.diff() groups = [] group = [] for tr, b in zip(not_in.itertuples(), diff): if pd.isna(b) is False and b != 1: groups.append(group) group = [] group.append(tr.Index) if group: groups.append(group) return not_in, [a for a in groups if len(a) >= 2]
[docs]def route_format(route, car_id, location, distance=None): """ Convenience function to convert to unified route format before saving to database Parameters ---------- route : list of trips that makes up the route car_id : int, id of the car location : int, id of the location that should be enforced distance : int, the km distance of trip Returns ------- dictionary of the trip """ return { "start_time": route[0].start_time, "end_time": route[-1].end_time, "start_latitude": route[0].start_latitude, "start_longitude": route[0].start_longitude, "end_latitude": route[-1].end_latitude, "end_longitude": route[-1].end_longitude, "car_id": car_id, "distance": sum([a.distance for a in route]) if pd.isna(distance) else distance, "gps_points": [ [ (a.start_latitude, a.start_longitude), (a.end_latitude, a.end_longitude), ] for a in route ], "driver": None, "ids": [a.id for a in route], "start_location_id": location, }