Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions netbox/ipam/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def db_type(self, connection):
IPAddressField.register_lookup(lookups.NetHostContained)
IPAddressField.register_lookup(lookups.NetFamily)
IPAddressField.register_lookup(lookups.NetMaskLength)
IPAddressField.register_lookup(lookups.NetHostLessThan)
IPAddressField.register_lookup(lookups.NetHostLessThanOrEqual)
IPAddressField.register_lookup(lookups.NetHostGreaterThan)
IPAddressField.register_lookup(lookups.NetHostGreaterThanOrEqual)


class ASNField(models.BigIntegerField):
Expand Down
40 changes: 40 additions & 0 deletions netbox/ipam/lookups.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,46 @@ def as_sql(self, qn, connection):
return 'CAST(HOST(%s) AS INET) <<= %s' % (lhs, rhs), params


class NetHostGreaterThan(Lookup):
lookup_name = 'net_host_gt'

def as_sql(self, qn, connection):
lhs, lhs_params = self.process_lhs(qn, connection)
rhs, rhs_params = self.process_rhs(qn, connection)
params = lhs_params + rhs_params
return 'CAST(HOST(%s) AS INET) > INET %s' % (lhs, rhs), params


class NetHostLessThan(Lookup):
lookup_name = 'net_host_lt'

def as_sql(self, qn, connection):
lhs, lhs_params = self.process_lhs(qn, connection)
rhs, rhs_params = self.process_rhs(qn, connection)
params = lhs_params + rhs_params
return 'CAST(HOST(%s) AS INET) < INET %s' % (lhs, rhs), params


class NetHostGreaterThanOrEqual(Lookup):
lookup_name = 'net_host_gte'

def as_sql(self, qn, connection):
lhs, lhs_params = self.process_lhs(qn, connection)
rhs, rhs_params = self.process_rhs(qn, connection)
params = lhs_params + rhs_params
return 'CAST(HOST(%s) AS INET) >= INET %s' % (lhs, rhs), params


class NetHostLessThanOrEqual(Lookup):
lookup_name = 'net_host_lte'

def as_sql(self, qn, connection):
lhs, lhs_params = self.process_lhs(qn, connection)
rhs, rhs_params = self.process_rhs(qn, connection)
params = lhs_params + rhs_params
return 'CAST(HOST(%s) AS INET) <= INET %s' % (lhs, rhs), params


class NetFamily(Transform):
lookup_name = 'family'
function = 'FAMILY'
Expand Down
14 changes: 7 additions & 7 deletions netbox/ipam/models/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,15 +580,15 @@ def clean(self):
})

# Check for overlapping ranges
overlapping_range = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter(
Q(start_address__gte=self.start_address, start_address__lte=self.end_address) | # Starts inside
Q(end_address__gte=self.start_address, end_address__lte=self.end_address) | # Ends inside
Q(start_address__lte=self.start_address, end_address__gte=self.end_address) # Starts & ends outside
).first()
if overlapping_range:
overlapping_ranges = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter(
Q(start_address__net_host_gte=self.start_address.ip, start_address__net_host_lte=self.end_address.ip) | # Starts inside
Q(end_address__net_host_gte=self.start_address.ip, end_address__net_host_lte=self.end_address.ip) | # Ends inside
Q(start_address__net_host_lte=self.start_address.ip, end_address__net_host_gte=self.end_address.ip) # Starts & ends outside
)
if overlapping_ranges.exists():
raise ValidationError(
_("Defined addresses overlap with range {overlapping_range} in VRF {vrf}").format(
overlapping_range=overlapping_range,
overlapping_range=overlapping_ranges.first(),
vrf=self.vrf
))

Expand Down
29 changes: 29 additions & 0 deletions netbox/ipam/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@ def test_get_utilization(self):
self.assertEqual(aggregate.get_utilization(), 100)


class TestIPRange(TestCase):

def test_overlapping_range(self):
iprange_192_168 = IPRange.objects.create(start_address=IPNetwork('192.168.0.1/22'), end_address=IPNetwork('192.168.0.49/22'))
iprange_192_168.clean()
iprange_3_1_99 = IPRange.objects.create(start_address=IPNetwork('1.2.3.1/24'), end_address=IPNetwork('1.2.3.99/24'))
iprange_3_1_99.clean()
iprange_3_100_199 = IPRange.objects.create(start_address=IPNetwork('1.2.3.100/24'), end_address=IPNetwork('1.2.3.199/24'))
iprange_3_100_199.clean()
iprange_3_200_255 = IPRange.objects.create(start_address=IPNetwork('1.2.3.200/24'), end_address=IPNetwork('1.2.3.255/24'))
iprange_3_200_255.clean()
iprange_4_1_99 = IPRange.objects.create(start_address=IPNetwork('1.2.4.1/24'), end_address=IPNetwork('1.2.4.99/24'))
iprange_4_1_99.clean()
iprange_4_200 = IPRange.objects.create(start_address=IPNetwork('1.2.4.200/24'), end_address=IPNetwork('1.2.4.255/24'))
iprange_4_200.clean()
# Overlapping range entirely within existing
with self.assertRaises(ValidationError):
iprange_3_123_124 = IPRange.objects.create(start_address=IPNetwork('1.2.3.123/26'), end_address=IPNetwork('1.2.3.124/26'))
iprange_3_123_124.clean()
# Overlapping range starting within existing
with self.assertRaises(ValidationError):
iprange_4_98_101 = IPRange.objects.create(start_address=IPNetwork('1.2.4.98/24'), end_address=IPNetwork('1.2.4.101/24'))
iprange_4_98_101.clean()
# Overlapping range ending within existing
with self.assertRaises(ValidationError):
iprange_4_198_201 = IPRange.objects.create(start_address=IPNetwork('1.2.4.198/24'), end_address=IPNetwork('1.2.4.201/24'))
iprange_4_198_201.clean()


class TestPrefix(TestCase):

def test_get_duplicates(self):
Expand Down