Tag Archives: Python

Open5Gs- Python HSS Interface

Note: NextEPC the Open Source project rebranded as Open5Gs in 2019 due to a naming issue. The remaining software called NextEPC is a branch of an old version of Open5Gs. This post was written before the rebranding.

I’ve been working for some time on Private LTE networks, the packet core I’m using is NextEPC, it’s well written, flexible and well supported.

I joined the Open5Gs group and I’ve contributed a few bits and pieces to the project, including a Python wrapper for adding / managing subscribers in the built in Home Subscriber Server (HSS).

You can get it from the support/ directory in Open5Gs.

NextEPC Python Library

Basic Python library to interface with MongoDB subscriber DB in NextEPC HSS / PCRF. Requires Python 3+, mongo, pymongo and bson. (All available through PIP)

If you are planning to run this on a different machine other than localhost (the machine hosting the MongoDB service) you will need to enable remote access to MongoDB by binding it’s IP to 0.0.0.0:

This is done by editing /etc/mongodb.conf and changing the bind IP to: bind_ip = 0.0.0.0

Restart MongoDB for changes to take effect.

$ /etc/init.d/mongodb restart

Basic Example:

import NextEPC
NextEPC_1 = NextEPC("10.0.1.118", 27017)

pdn = [{'apn': 'internet', 'pcc_rule': [], 'ambr': {'downlink': 1234, 'uplink': 1234}, 'qos': {'qci': 9, 'arp': {'priority_level': 8, 'pre_emption_vulnerability': 1, 'pre_emption_capability': 1}}, 'type': 2}]
sub_data = {'imsi': '891012222222300', \
             'pdn': pdn, \
             'ambr': {'downlink': 1024000, 'uplink': 1024001}, \
             'subscribed_rau_tau_timer': 12, \
             'network_access_mode': 2, \
             'subscriber_status': 0, \
             'access_restriction_data': 32, \
             'security': {'k': '465B5CE8 B199B49F AA5F0A2E E238A6BC', 'amf': '8000', 'op': None, 'opc': 'E8ED289D EBA952E4 283B54E8 8E6183CA'}, '__v': 0}

print(NextEPC_1.AddSubscriber(sub_data))                        #Add Subscriber using dict of sub_data

print(NextEPC_1.GetSubscriber('891012222222300'))               #Get added Subscriber's details

print(NextEPC_1.DeleteSubscriber('891012222222300'))            #Delete Subscriber

Subscriber_List = NextEPC_1.GetSubscribers()
for subscribers in Subscriber_List:
  print(subscribers['imsi'])

RTPengine Python API Calls via ng Control Protocol

RTPengine has an API / control protocol, which is what Kamailio / OpenSER uses to interact with RTPengine, called the ng Control Protocol.

Connection is based on Bencode encoded data and communicates via a UDP socket.

I wrote a simple Python script to pull active calls from RTPengine, code below:

#Quick Python library for interfacing with Sipwise's fantastic rtpengine - https://github.com/sipwise/rtpengine
#Bencode library from https://pypi.org/project/bencode.py/ (Had to download files from webpage (PIP was out of date))

import bencode
import socket
import sys
import random
import string

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('188.0.169.13', 2224)     #Your server address

cookie = "0_2393_6"
data = bencode.encode({'command': 'list'})

message = str(cookie) + " " + str(data)
print(message)


sent = sock.sendto(message, server_address)

print('waiting to receive')
data, server = sock.recvfrom(4096)
print('received "%s"' % data)
data = data.split(" ", 1)       #Only split on first space
print("Cookie is: " + str(data[0]))
print("Data is: " + str(bencode.decode(data[1])))
print("There are " + str(len(bencode.decode(data[1])['calls'])) + " calls up on RTPengine at " + str(server_address[0]))
for calls in bencode.decode(data[1])['calls']:
    print(calls)
    cookie = "1_2393_6"
    data = bencode.encode({'command': 'query', 'call-id': str(calls)})
    message = str(cookie).encode('utf-8') + " ".encode('utf-8') + str(data).encode('utf-8')
    sent = sock.sendto(message, server_address)
    print('\n\nwaiting to receive')
    data, server = sock.recvfrom(8192)

    data = data.split(" ", 1)       #Only split on first space
    bencoded_data = bencode.decode(data[1])

    for keys in bencoded_data:
        print(keys)
        print("\t" + str(bencoded_data[keys]))

sock.close()

PyHSS – Python 3GPP LTE Home Subscriber Server

I recently started working on an issue that I’d seen was to do with the HSS response to the MME on an Update Location Answer.

I took some Wireshark traces of a connection from the MME to the HSS, and compared that to a trace from a different HSS. (Amarisoft EPC/HSS)

The Update Location Answer sent by the Amarisoft HSS to the MME over the S6a (Diameter) interface includes an AVP for “Multiple APN Configuration” which has the the dedicated bearer for IMS, while the HSS in the software I was working on didn’t.

After a bit of bashing trying to modify the S6a responses, I decided I’d just implement my own Home Subscriber Server.

The Diameter interface is pretty straight forward to understand, using a similar structure to RADIUS, and with the exception of the Crypto for the EUTRAN Authentication Vectors, it was all pretty straight forward.

If you’d like to know more you can download PyHSS from my GitLab page, and view my Diameter Primer post and my post on Diameter packet structure.

PyRTP – Simple RTP Library for Python

I recently had a scenario where I had to encode and decode RTP packets off the wire.

I wrote a Python Library to handle it which I’ve published for anyone to use.

Encoding data is quite simple, it takes a dictionary of values to fill the headers and payload and returns hex data to be sent down the wire:

payload = 'd5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5' 

packet_vars = {'version' : 2, 'padding' : 0, 'extension' : 0, 'csi_count' : 0, 'marker' : 0, 'payload_type' : 8, 'sequence_number' : 306, 'timestamp' : 306, 'ssrc' : 185755418, payload' : payload} 

PyRTP.GenerateRTPpacket(packet_vars)             #Generates hex to send down the wire 

And decoding is the same but reverse, feed it hex data and it returns a dict of values:

packet_bytes = '8008d4340000303c0b12671ad5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5d5'

rtp_params = PyRTP.DecodeRTPpacket(packet_bytes) #Returns dict of values from packet

Hopefully it’ll save someone else some time in the future.

For more info on RTP see:

RTP – More than you Wanted to Know for a deep dive into the packet structure

Reverse MD5 on SIP Auth

MD5 isn’t a particularly well regarded hashing function these days, but it’s still pretty ubiquitous.

SIP authentication, for the most part, still uses MD5 in the form of Message Digest Authentication,

If we were to take the password password and hash it using an online tool to generate MD5 Hashes we’d get “482c811da5d5b4bc6d497ffa98491e38”


If we hash password again with MD5 we’d get the same output – “482c811da5d5b4bc6d497ffa98491e38”,


The catch with this is if you put “5f4dcc3b5aa765d61d8327deb882cf99” into a search engine, Google immediately tells you it’s plain text value. That’s because the MD5 of password is always 5f4dcc3b5aa765d61d8327deb882cf99, hashing the same input phase “password” always results in the same output MD5 hash aka “response”.

By using Message Digest Authentication we introduce a “nonce” value and mix it (“salt”) with the SIP realm, username, password and request URI, to ensure that the response is different every time.

Let’s look at this example REGISTER flow:

We can see a REGISTER message has been sent by Bob to the SIP Server.

REGISTER sips:ss2.biloxi.example.com SIP/2.0    
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashds7
Max-Forwards: 70
From: Bob <sips:[email protected]>;tag=a73kszlfl
To: Bob <sips:[email protected]>
Call-ID: [email protected]
CSeq: 1 REGISTER
Contact: <sips:[email protected]>
Content-Length: 0

The SIP Server has sent back a 401 Unauthorised message, but includes the WWW-Authenticate header field, from this, we can grab a Realm value, and a Nonce, which we’ll use to generate our response that we’ll send back.

 SIP/2.0 401 Unauthorized    
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashds7 ;received=192.0.2.201
From: Bob <sips:[email protected]>;tag=a73kszlfl
To: Bob <sips:[email protected]>;tag=1410948204
Call-ID: [email protected]
CSeq: 1 REGISTER
WWW-Authenticate: Digest realm="atlanta.example.com", qop="auth",nonce="ea9c8e88df84f1cec4341ae6cbe5a359", opaque="", stale=FALSE, algorithm=MD5
Content-Length: 0

The formula for generating the response looks rather complex but really isn’t that bad.

HA1=MD5(username:realm:password)
HA2=MD5(method:digestURI)
response=MD5(HA1:nonce:HA2)

Let’s say in this case Bob’s password is “bobspassword”, let’s generate a response back to the server.

We know the username which is bob, the realm which is atlanta.example.com, digest URI is sips:biloxi.example.com, method is REGISTER and the password which is bobspassword. This seems like a lot to go through but all of these values, with the exception of the password, we just get from the 401 headers above.

So let’s generate the first part called HA1 using the formula HA1=MD5(username:realm:password), so let’s substitute this with our real values:
HA1 = MD5(bob:atlanta.example.com:bobspassword)
So if we drop bob:atlanta.example.com:bobspassword into our MD5 hasher and we get our HA1 hash and it it looks like 2da91700e1ef4f38df91500c8729d35f, so HA1 = 2da91700e1ef4f38df91500c8729d35f

Now onto the second part, we know the Method is REGISTER, and our digestURI is sips:biloxi.example.com
HA2=MD5(method:digestURI)
HA2=MD5(REGISTER:sips:biloxi.example.com)
Again, drop REGISTER:sips:biloxi.example.com into our MD5 hasher, and grab the output – 8f2d44a2696b3b3ed781d2f44375b3df
This means HA2 = 8f2d44a2696b3b3ed781d2f44375b3df

Finally we join HA1, the nonce and HA2 in one string and hash it:
Response = MD5(2da91700e1ef4f38df91500c8729d35f:ea9c8e88df84f1cec4341ae6cbe5a359:8f2d44a2696b3b3ed781d2f44375b3df)

Which gives us our final response of “bc2f51f99c2add3e9dfce04d43df0c6a”, so let’s see what happens when Bob sends this to the SIP Server.

REGISTER sips:ss2.biloxi.example.com SIP/2.0 
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashd92
Max-Forwards: 70
From: Bob <sips:[email protected]>;tag=ja743ks76zlflH
To: Bob <sips:[email protected]>
Call-ID: [email protected]
CSeq: 2 REGISTER
Contact: <sips:[email protected]>
Authorization: Digest username="bob", realm="atlanta.example.com", nonce="ea9c8e88df84f1cec4341ae6cbe5a359", opaque="", uri="sips:ss2.biloxi.example.com", response="bc2f51f99c2add3e9dfce04d43df0c6a"
Content-Length: 0
SIP/2.0 200 OK
Via: SIP/2.0/TLS client.biloxi.example.com:5061;branch=z9hG4bKnashd92;received=192.0.2.201
From: Bob <sips:[email protected]>;tag=ja743ks76zlflH
To: Bob <sips:[email protected]>;tag=37GkEhwl6
Call-ID: [email protected]
CSeq: 2 REGISTER
Contact: <sips:[email protected]>;expires=3600
Content-Length: 0

There you have it, a 200 OK response and Bob is registered on biloxi.example.com.

Update 2021: Jason Murley has contributed a much more robust version of the code below, which is way better than what I’d made!

You can find his code here.

I’ve written a little tool in Python to generate the correct response based on the nonce and values associated with it:

import hashlib

nonce = 'ea9c8e88df84f1cec4341ae6cbe5a359'
realm = 'sips:biloxi.example.com'
password = 'bobspassword'
username    =   str("bob")
requesturi  =   str(s"ips:biloxi.example.com")
print("username: " + username)
print("nonce: " + nonce)
print("realm: " + realm)
print("password: " + password)
print("\n")

HA1str = username + ":" + realm + ":" + password
HA1enc = (hashlib.md5(HA1str.encode()).hexdigest())
print ("HA1 String: " + HA1str)
print ("HA1 Encrypted: " + HA1enc)
HA2str = "REGISTER:" + requesturi
HA2enc = (hashlib.md5(HA2str.encode()).hexdigest())

print ("HA2 String: " + HA2str)
print ("HA2 Encrypted: " + HA2enc)

responsestr = HA1enc + ":" + nonce + ":" + HA2enc
print("Response String: " + responsestr)
responseenc = str((hashlib.md5(responsestr.encode()).hexdigest()))
print("Response Encrypted" + responseenc)