import requests import json import socket class GNUnetRest: def __init__(self, host="localhost", port="7776"): self.host = host self.port = port self.base_url = "http://{}:{}/".format(host, port) self.identity = self.base_url + "identity/" self.identity_name = self.identity + "name/" self.identity_hash = self.identity + "pubkey/" self.identity_subsystem = self.identity + "subsystem/" self.identity_namestore = self.identity_subsystem + "namestore/" self.namestore = self.base_url + "namestore/" self.gns = self.base_url + "gns/" def is_port_open(self): # check if port is open # not the best solution, it should better check if REST responds \ # and not only if port is listening testsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) testlocation = (self.host, int(self.port)) testresult = testsocket.connect_ex(testlocation) testsocket.close() if testresult == 0: return True else: return False # Identity Section def identity_list(self): """Returns a list of all identities or tuple in case of error.""" r = requests.get(self.identity) return r.json() if r.ok else (False, r.reason) def fetch_identity_keyhash(self, identity_name): """Returns the hash of the public key of an identity by name.""" r = requests.get(self.identity_hash + identity_name) return r.json()['name'] if r.ok else (False, r.reason) def fetch_identity_name(self, key_hash): """Returns the name of an identity specified by public key.""" r = requests.get(self.identity_name + key_hash) return r.json()['pubkey'] if r.ok else (False, r.reason) def fetch_identity_namestore(self): """Returns the default identity of subsystem 'namestore'.""" r = requests.get(self.identity_namestore) return r.json()['name'] if r.ok else (False, r.reason) def create_identity(self, identity_name): """Creates Identy, returns True if successful, tuple if not.""" r = requests.post(self.identity, {"name": identity_name}) return r.ok if r.ok else (False, r.reason) def change_identity_name(self, identity_name, new_identity_name): """Changes name of identity. Returns True if successful, tuple if not. """ r = requests.put( self.identity_name + identity_name, {"newname": new_identity_name} ) return True if r.ok else (False, r.reason) def delete_identity(self, identity_name): """Deletes Identity, returns True if successful, tuple if not.""" r = requests.delete(self.identity_name + identity_name) return True if r.ok else (False, r.reason) # Namestore Section def set_namestore_identity(self, identity_name): """Sets the default Identity for subsystem 'namestore'. Returns True if successful tuple if not. """ r = requests.put( self.identity_subsystem + identity_name, {"subsystem": "namestore"}) return True if r.ok else (False, r.reason) def list_namestore_entries(self, zone): """Returns a list of all namestore entries.""" r = requests.get(self.namestore + zone) return r.json() if r.ok else (False, r.reason) def add_namestore_entry(self, zone, record_name, value, record_type): """Adds a namestore entry, returns true if successful, tuple if not.""" data = { 'value': value, 'record_type': record_type, 'record_name': record_name, "expiration_time": "1d", "private": False, "relative_expiration": False, "supplemental": False, "shadow": False, } r = requests.post(self.namestore + zone, data) return True if r.ok else (False, r.reason) def change_namestore_entry(self, zone, record_name, value, record_type): """Changes a namestore entry, returns true if successful, tuple if not. """ data = { 'value': value, 'record_type': record_type, 'record_name': record_name, } r = requests.put(self.namestore + zone, data) return True if r.ok else (False, r.reason) def delete_namestore_entry(self, zone, record_name): """Deletes a record_namestore entry. Returns true if successful, tuple if not. """ r = requests.delete(self.namestore + zone + "/" + record_name) return True if r.ok else (False, r.reason) # GNS section def list_gns_record(self, zone, record_name): """Returns a list of values and record types by record_name.""" r = requests.get(self.gns + record_name + "." + zone) return r.json() if r.ok else (False, r.reason) def list_gns_record_type(self, zone, record_name, record_type): """Returns a list of values by record name and record type.""" r = requests.get( self.gns + record_name + "." + zone + "?record_type=" + record_type ) return r.json() if r.ok else (False, r.reason)