May 30, 2021


The Internet of Things (IoT) is a worldwide network that connects smart objects and lets them communicate with each other. When the connected objects are restricted to vehicles, the network is called the Internet of Vehicles (IoV). With urban populations growing and cities expanding rapidly, vehicle ownership has been rising quickly, and traffic management has become a major problem in day-to-day life. This project provides an IoV-based traffic management solution to that problem.


The internet is a global phenomenon today. As more and more devices become internet-enabled, it becomes easier to manage road traffic over the internet. At the same time, rapidly growing vehicle ownership creates more traffic management problems, so speed monitoring and violation checks also need to be handled. The traditional solutions are to monitor vehicle speed with CCTV cameras and speed trackers. These are the obvious choices, but they tend to fail when a huge number of vehicles must be monitored: as the number of vehicles increases, a road transport authority handles incoming violations of the traffic code less effectively. This is where the Internet of Vehicles (IoV) comes into play.

Problem Statement

Traffic congestion and long queues at intersections during peak hours are major problems today. Many traffic jams occur because there is no adequate traffic management system.


Traffic Control Algorithm:

· Start.

· Load all vehicle data.

· Determine which vehicle is on which road.

· Count the vehicles on each incoming lane.

· If the currently flowing lane is almost empty, start the flow of the road with the most vehicles (call it max_v_road).

· Otherwise, if the maximum flow time has passed and the most packed lane is not already set to flow, set the most packed lane to flow.
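The steps above can be sketched as a single decision function. This is a minimal sketch, not the project's implementation: the function name `choose_flow_lane`, the `ALMOST_EMPTY` threshold, and the 30-second value given to `MAX_FLOW_TIME` are assumptions made for illustration.

```python
from typing import Dict, Hashable, Optional

ALMOST_EMPTY = 2      # assumed threshold: "almost empty" means at most 2 vehicles
MAX_FLOW_TIME = 30.0  # assumed maximum green time, in seconds


def choose_flow_lane(counts: Dict[Hashable, int],
                     current: Optional[Hashable],
                     flowing_for: float) -> Optional[Hashable]:
    """Return the lane that should flow next, given per-lane vehicle counts."""
    if not counts:
        return current
    # The most packed incoming lane (max_v_road in the steps above).
    most_packed = max(counts, key=counts.get)
    # Nothing is flowing yet, or the flowing lane is almost empty.
    if current is None or counts.get(current, 0) <= ALMOST_EMPTY:
        return most_packed
    # The flowing lane has used up its time and is not the most packed lane.
    if flowing_for >= MAX_FLOW_TIME and most_packed != current:
        return most_packed
    return current
```

For example, with counts of 8, 1 and 3 vehicles on three lanes, a flowing lane holding 1 vehicle is treated as almost empty and the lane with 8 vehicles is chosen.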

Simulation Algorithm:

· Generate a vehicle with a chance of 1/4, up to a maximum capacity of 20 vehicles per path.

· Choose one start path out of all the paths.

· Move each vehicle by one step.
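One tick of the simulation described above could look like the following. It is a minimal sketch under stated assumptions: `simulation_step`, the dict-of-lists representation of paths, and the step size of 1 are hypothetical, and vehicles never leave a path here, which the real simulation would have to handle.

```python
import random
from typing import Dict, List

SPAWN_CHANCE = 0.25  # chance of 1/4 per step, from the steps above
MAX_PER_PATH = 20    # capacity per path, from the steps above
STEP = 1             # assumed distance covered in one step


def simulation_step(paths: Dict[str, List[int]], rng: random.Random) -> None:
    """Advance the simulation by one tick.

    paths maps a path name to the positions of the vehicles on it.
    """
    # With probability 1/4, generate a vehicle on a randomly chosen start path,
    # unless that path is already at capacity.
    if rng.random() < SPAWN_CHANCE:
        name = rng.choice(list(paths))
        if len(paths[name]) < MAX_PER_PATH:
            paths[name].append(0)
    # Move every vehicle forward by one step.
    for positions in paths.values():
        for i in range(len(positions)):
            positions[i] += STEP
```

Passing the random number generator in as a parameter keeps the sketch reproducible: a seeded `random.Random(0)` gives the same run every time.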

Architecture Diagrams

Packages and modules used in this project

· Map (Package)

It stores the current map state: the roads, the road ends, the traffic signals, and the state of each traffic signal.

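Road:

    from dataclasses import dataclass, field
    from typing import Set

    # The decorator is not shown in the source; unsafe_hash=True is assumed so
    # that only the id field takes part in equality and hashing.
    @dataclass(unsafe_hash=True)
    class Lane:
        end1: LaneEnd = field(compare=False, hash=False)
        end2: LaneEnd = field(compare=False, hash=False)
        side1: Line2D = field(compare=False, hash=False)
        side2: Line2D = field(compare=False, hash=False)
        id: int = field(default_factory=next_road_id, compare=True, hash=True)

    @dataclass(unsafe_hash=True)  # assumed, as for Lane
    class Intersection:
        incoming_ends: Set[LaneEnd] = field(compare=False, hash=False)
        outgoing_ends: Set[LaneEnd] = field(compare=False, hash=False)
        id: int = field(default_factory=next_intersection_id, compare=True, hash=True)

        def traffic_lights(self):
            # Collect the traffic light of every lane end that has one
            result = []
            for end in [*self.incoming_ends, *self.outgoing_ends]:
                if end.traffic_light is not None:
                    result.append(end.traffic_light)
            return result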

Traffic Lights

    class TrafficLight:
        def __init__(self, signal=TrafficSignal.GO, id=None):
            self.signal = signal
            # Use the supplied id, or generate the next one
            if id is None:
                self.id = next_trafficlight_id()
            else:
                self.id = id

    # Module-level counters; the initial values are assumed, they are not shown in the source
    _last_road_id = 0
    _last_roadend_id = 0
    _last_trafficlight_id = 0
    _last_intersection_id = 0

    def next_road_id():
        global _last_road_id
        _last_road_id += 1
        return _last_road_id

    def next_roadend_id():
        global _last_roadend_id
        _last_roadend_id += 1
        return _last_roadend_id

    def next_trafficlight_id():
        global _last_trafficlight_id
        _last_trafficlight_id += 1
        return _last_trafficlight_id

    def next_intersection_id():
        global _last_intersection_id
        _last_intersection_id += 1
        return _last_intersection_id
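The four generators above share one pattern: a module-level counter that is incremented and returned. As an alternative sketch (not what the project uses), the same behaviour can be obtained from `itertools.count`. The helper name `make_id_generator` is hypothetical, and starting the ids at 1 is an assumption.

```python
import itertools
from typing import Callable


def make_id_generator(start: int = 1) -> Callable[[], int]:
    """Return a function that yields start, start + 1, ... on successive calls."""
    counter = itertools.count(start)
    return lambda: next(counter)


# One independent generator per kind of object, mirroring the functions above.
next_road_id = make_id_generator()
next_roadend_id = make_id_generator()
next_trafficlight_id = make_id_generator()
next_intersection_id = make_id_generator()
```

Each generator keeps its own counter inside a closure, so no `global` statements are needed.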

· Simulation (Package)

It loads the road and vehicle data, creates a path for each vehicle, moves the vehicles along the roads, and finally reports traffic rule violations.

Movements of vehicles

    from collections import OrderedDict

    class Path:
        # traffic_distance = distance from start of path to traffic light
        def __init__(self, path, length, traffic_light, traffic_distance=simmap.rl):
            self.path = path
            self.vehicles = OrderedDict()  # {vehicle: distance from path start}
            self.length = length
            self.traffic_light = traffic_light
            self.traffic_distance = traffic_distance

Simulating map

    road_ends = [road1_ends, road2_ends, road3_ends, road4_ends]

    # Pair up the two ends of each lane: ends 0 and 3, then ends 1 and 2, of every road
    lane_ends = [[road_end[0], road_end[3]] for road_end in road_ends]
    lane_ends.extend([[road_end[1], road_end[2]] for road_end in road_ends])

    lanes = []
    for lane_end in lane_ends:
        side1 = Line2D(lane_end[0].boundary.p1, lane_end[1].boundary.p1)
        side2 = Line2D(lane_end[0].boundary.p2, lane_end[1].boundary.p2)
        lane = Lane(*lane_end, side1=side1, side2=side2)
        lanes.append(lane)
        # Let each end know which lane it belongs to
        for end in lane_end:
            end.lane = lane

    intersection = Intersection(
        {road_end[0] for road_end in road_ends},
        {road_end[1] for road_end in road_ends},
    )

· Vehicle (Module)

It stores all the vehicles and provides a list of them.

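    from dataclasses import dataclass, field
    from typing import Dict

    _last_vehicle_id = 0  # initial value assumed

    def next_vehicle_id():
        global _last_vehicle_id
        _last_vehicle_id += 1
        return _last_vehicle_id

    @dataclass(unsafe_hash=True)  # decorator not shown in the source; assumed
    class Vehicle:
        p: Point2D = field(compare=False, hash=False)
        info: Dict = field(default_factory=dict, compare=False, hash=False)
        id: int = field(default_factory=next_vehicle_id, compare=True, hash=True)

    def add(vehicle):
        if isinstance(vehicle, Vehicle):
            # Missing in the source; assumes a module-level set named vehicles
            vehicles.add(vehicle)
        else:
            raise TypeError(f'Expected type {Vehicle} found {type(vehicle)}')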
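The `compare` and `hash` arguments on the fields above suggest that a vehicle is identified by its `id` alone, so it stays the same dictionary key while its position changes. A minimal sketch of that behaviour, assuming the class is declared with `@dataclass(unsafe_hash=True)`; `TrackedVehicle`, the `_ids` counter, and the tuple used in place of `Point2D` are hypothetical stand-ins.

```python
import itertools
from dataclasses import dataclass, field
from typing import Dict, Tuple

_ids = itertools.count(1)  # hypothetical id source for this sketch


@dataclass(unsafe_hash=True)
class TrackedVehicle:
    # Position and info are excluded from equality and hashing.
    p: Tuple[float, float] = field(compare=False, hash=False)
    info: Dict = field(default_factory=dict, compare=False, hash=False)
    # Only the id decides whether two objects are the same vehicle.
    id: int = field(default_factory=lambda: next(_ids), compare=True, hash=True)


car = TrackedVehicle((0.0, 0.0))
same_car_moved = TrackedVehicle((5.0, 2.0), id=car.id)
other_car = TrackedVehicle((0.0, 0.0))

distances = {car: 0.0}
distances[same_car_moved] = 7.5  # same id, so this updates the entry for car
```

Here `car` and `same_car_moved` compare equal despite different positions, while `other_car` is a different vehicle even though it starts at the same point.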

· Traffic control (Module)

It implements the traffic control algorithm.

    from datetime import timedelta

    class TrafficControl:
        def __init__(self, intersection):
            self.intersection = intersection
            # One full flow period for every incoming lane
            self.max_block_time = MAX_FLOW_TIME * len(intersection.incoming_ends)
            self.blocked_lane_ends = {}  # {lane_end: timedelta(seconds=time.time())}
            self.last_flow_update = timedelta(seconds=0)
            self.flow_lane = None

        def vehicle_count(self):
            # Count the vehicles currently on each incoming lane
            vc = {lane_end: 0 for lane_end in self.intersection.incoming_ends}
            for vehicle in list(vehicles):
                for lane_end in self.intersection.incoming_ends:
                    if lane_end.lane.on(vehicle.p):
                        vc[lane_end] += 1
            return vc
· Police client (Module)

This module uploads traffic violation reports to the server.

    import ubidots

    ubidots_client = ubidots.ApiClient(token='BBFF-i8XWP7su6Zg9oq7PGcJsXC4hUQxmxN')
    flagged_vehicles = ubidots_client.get_variable('608420c81d84727ad63e7dd8')

    def report(vehicle):
        ...  # body not shown in the source
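The body of `report` is not shown. A minimal sketch of what such a report could send, assuming the variable object exposes a `save_value` method that accepts a dict with `value`, `timestamp` (in milliseconds) and `context` keys, which is how variables in the Ubidots Python client are generally used. The names `build_violation_payload` and `report_violation`, the `reported_at` key, and passing the variable in as a parameter are assumptions made for illustration.

```python
import time
from typing import Any, Dict, Optional


def build_violation_payload(vehicle_id: int, when: float) -> Dict[str, Any]:
    """Build one data point: the vehicle id plus the date and time of the violation."""
    return {
        "value": vehicle_id,
        "timestamp": int(when * 1000),  # milliseconds since the epoch
        "context": {
            "reported_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(when)),
        },
    }


def report_violation(vehicle_id: int, variable: Any,
                     when: Optional[float] = None) -> Dict[str, Any]:
    """Send the violation to the given variable and return what was sent."""
    payload = build_violation_payload(
        vehicle_id, time.time() if when is None else when)
    variable.save_value(payload)  # assumed client method, see the note above
    return payload
```

Taking the variable as a parameter lets the function be exercised with a stand-in object, without a network connection or an API token.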


Testing (Simulating)


The figures above show the simulation running. In Fig. 7.1 the lower-left lane is the most packed; in Fig. 7.2 the signal for that lane has turned green.

Uploading on server


This system configuration reduces the long traffic queues caused by the conventional systems used in many places. It also reduces the workload of officers who would otherwise have to be present near the area to check for violations, and it records the vehicle ID together with the date and time of each violation. In short, the system offers a simple yet effective solution to improper traffic management.

Team Members:

Akshit Kumar (ENG18CS0029)

Anirban Saha (ENG18CS0037)

Arjun Upadhayay (ENG18CS0045)

Kinshuk Kishore (ENG18CS0135)