You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

152 lines
5.6 KiB

"""GNUnet REST client following https://rest.gnunet.org/"""
import requests
import socket
class GNUnetRest:
    """Small client for the GNUnet REST API (identity, namestore, GNS).

    Request methods return the decoded payload (or ``True`` for calls that
    carry no payload) when the server answers with a success status, and the
    tuple ``(False, reason)`` otherwise.
    """

    def __init__(self, host="localhost", port="7776", timeout=None):
        """Store the connection settings and precompute the endpoint URLs.

        Args:
            host: Host name or address of the REST service.
            port: Port of the REST service (string or int).
            timeout: Optional timeout in seconds applied to the port probe
                and to every HTTP request.  ``None`` (the default) keeps
                the previous behaviour of not setting any timeout.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.base_url = "http://{}:{}/".format(host, port)
        self.identity = self.base_url + "identity/"
        self.identity_name = self.identity + "name/"
        self.identity_hash = self.identity + "pubkey/"
        self.identity_subsystem = self.identity + "subsystem/"
        self.identity_namestore = self.identity_subsystem + "namestore/"
        self.namestore = self.base_url + "namestore/"
        self.gns = self.base_url + "gns/"

    @staticmethod
    def _status(response):
        """Map a payload-less response to True or (False, reason)."""
        return True if response.ok else (False, response.reason)

    @staticmethod
    def _record_entry(record_name, value, record_type):
        """Build the JSON body shared by the namestore add/change calls."""
        # It needs to send all flags, otherwise it'd return 500.
        return {
            'record_name': record_name,
            'data': [{
                'value': value,
                'record_type': record_type,
                "expiration_time": "1d",
                "private": False,
                "relative_expiration": False,
                "supplemental": False,
                "shadow": False,
            }],
        }

    def is_port_open(self):
        """Return True if a TCP connection to host:port can be established.

        This only checks that something is listening on the port; it does
        not verify that the REST service itself responds.
        """
        address = (self.host, int(self.port))
        # The context manager closes the socket even if connect_ex raises
        # (for example when the host name cannot be resolved).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            if self.timeout is not None:
                probe.settimeout(self.timeout)
            return probe.connect_ex(address) == 0

    # Identity Section

    def identity_list(self):
        """Returns a list of all identities or tuple in case of error."""
        r = requests.get(self.identity, timeout=self.timeout)
        return r.json() if r.ok else (False, r.reason)

    def fetch_identity_keyhash(self, identity_name):
        """Returns the 'pubkey' value of an identity looked up by name.

        Returns the tuple (False, reason) in case of error.
        """
        r = requests.get(
            self.identity_name + identity_name, timeout=self.timeout
        )
        return r.json()['pubkey'] if r.ok else (False, r.reason)

    def fetch_identity_name(self, key_hash):
        """Returns the name of an identity specified by the key hash.

        Returns the tuple (False, reason) in case of error.
        """
        r = requests.get(self.identity_hash + key_hash, timeout=self.timeout)
        return r.json()['name'] if r.ok else (False, r.reason)

    def fetch_namestore_default_identity(self):
        """Returns the default identity of subsystem 'namestore'.

        Returns the tuple (False, reason) in case of error.
        """
        r = requests.get(self.identity_namestore, timeout=self.timeout)
        return r.json()['name'] if r.ok else (False, r.reason)

    def create_identity(self, identity_name):
        """Creates identity, returns True if successful, tuple if not."""
        r = requests.post(
            self.identity,
            json={"name": identity_name},
            timeout=self.timeout,
        )
        return self._status(r)

    def change_identity_name(self, identity_name, new_identity_name):
        """Changes name of identity.

        Returns True if successful, tuple if not.
        """
        r = requests.put(
            self.identity_name + identity_name,
            json={"newname": new_identity_name},
            timeout=self.timeout,
        )
        return self._status(r)

    def delete_identity(self, identity_name):
        """Deletes identity, returns True if successful, tuple if not."""
        r = requests.delete(
            self.identity_name + identity_name, timeout=self.timeout
        )
        return self._status(r)

    # Namestore Section

    def set_namestore_default_identity(self, identity_name):
        """Sets the default identity for subsystem 'namestore'.

        Returns True if successful, tuple if not.
        """
        r = requests.put(
            self.identity_subsystem + identity_name,
            json={"subsystem": "namestore"},
            timeout=self.timeout,
        )
        return self._status(r)

    def list_namestore_entries(self, zone):
        """Returns a list of all namestore entries of a zone.

        Returns the tuple (False, reason) in case of error.
        """
        r = requests.get(self.namestore + zone, timeout=self.timeout)
        return r.json() if r.ok else (False, r.reason)

    def add_namestore_entry(self, zone, record_name, value, record_type):
        """Adds a namestore entry, returns True if successful, tuple if not."""
        r = requests.post(
            self.namestore + zone,
            json=self._record_entry(record_name, value, record_type),
            timeout=self.timeout,
        )
        return self._status(r)

    def change_namestore_entry(self, zone, record_name, value, record_type):
        """Changes a namestore entry.

        Returns True if successful, tuple if not.
        """
        r = requests.put(
            self.namestore + zone,
            json=self._record_entry(record_name, value, record_type),
            timeout=self.timeout,
        )
        return self._status(r)

    def delete_namestore_entry(self, zone, record_name):
        """Deletes a namestore entry.

        Returns True if successful, tuple if not.
        """
        r = requests.delete(
            self.namestore + zone + "/" + record_name, timeout=self.timeout
        )
        return self._status(r)

    # GNS section

    def list_gns_record(self, zone, record_name):
        """Returns a list of values and record types by record_name.

        Returns the tuple (False, reason) in case of error.
        """
        r = requests.get(
            self.gns + record_name + "." + zone, timeout=self.timeout
        )
        return r.json() if r.ok else (False, r.reason)

    def list_gns_record_type(self, zone, record_name, record_type):
        """Returns a list of values by record name and record type.

        Returns the tuple (False, reason) in case of error.
        """
        # Passing the filter via ``params`` lets requests URL-encode it
        # instead of splicing the raw value into the query string.
        r = requests.get(
            self.gns + record_name + "." + zone,
            params={"record_type": record_type},
            timeout=self.timeout,
        )
        return r.json() if r.ok else (False, r.reason)