tests: Make interface for test

This commit is contained in:
Erfan Abdi 2021-08-06 09:21:18 +04:30
parent 1959f81d19
commit 121095747e
No known key found for this signature in database
GPG Key ID: 64D46BE0629BACD5
3 changed files with 89 additions and 48 deletions

79
tests/ITest.py Normal file
View File

@ -0,0 +1,79 @@
import gbinder
import logging
import time
from gi.repository import GLib
BINDER_DRIVER = "/dev/binder"
INTERFACE = "gbinder.ITest"
SERVICE_NAME = "gbinder_test"
TRANSACTION_sendTest = 1
serviceManager = gbinder.ServiceManager(BINDER_DRIVER)
class ITest:
def __init__(self, remote):
self.client = gbinder.Client(remote, INTERFACE)
def sendTest(self, arg1):
request = self.client.new_request()
request.append_string16(arg1)
reply, status = self.client.transact_sync_reply(
TRANSACTION_sendTest, request)
if status:
logging.error("Sending reply failed")
else:
reader = reply.init_reader()
rep1 = reader.read_string16()
return rep1
return None
def add_service(sendTest):
def response_handler(req, code, flags):
logging.debug("Received transaction: ", code)
reader = req.init_reader()
local_response = response.new_reply()
if code == TRANSACTION_sendTest:
arg1 = reader.read_string16()
ret1 = sendTest(arg1)
local_response.append_string16(ret1)
return local_response, 0
def binder_presence():
if serviceManager.is_present():
status = serviceManager.add_service_sync(SERVICE_NAME, response)
if status:
logging.error("Failed to add service " + SERVICE_NAME)
loop.quit()
response = serviceManager.new_local_object(INTERFACE, response_handler)
loop = GLib.MainLoop()
binder_presence()
status = serviceManager.add_presence_handler(binder_presence)
if status:
loop.run()
else:
logging.error("Failed to add presence handler: {}".format(status))
def get_service():
tries = 1000
remote, status = serviceManager.get_service_sync(SERVICE_NAME)
while(not remote):
if tries > 0:
logging.warning(
"Failed to get service {}, trying again...".format(SERVICE_NAME))
time.sleep(1)
remote, status = serviceManager.get_service_sync(SERVICE_NAME)
tries = tries - 1
else:
return None
return ITest(remote)

View File

@ -1,27 +1,8 @@
import gbinder
service_manager = gbinder.ServiceManager("/dev/binder")
iface = "test@1.0"
fqname = "test"
code = 1
remote, status = service_manager.get_service_sync(fqname)
if remote:
print("Connected to " + fqname)
client = gbinder.Client(remote, iface)
import ITest
service = ITest.get_service()
if service:
while True:
request = client.new_request()
value = input("Say something: ")
request.append_string16(value)
reply, status = client.transact_sync_reply(code, request)
if status:
print("Sending string failed")
else:
reader = reply.init_reader()
reply_value = reader.read_string16()
print("Reply: ", reply_value)
else:
print("Failed to Connected to " + fqname)
ret = service.sendTest(value)
print(ret)

View File

@ -1,26 +1,7 @@
import gbinder
from gi.repository import GLib
import ITest
service_manager = gbinder.ServiceManager("/dev/binder")
iface = "test@1.0"
fqname = "test"
Command = 1
def sendTest(value):
print(value)
return value
def got_reply(req, code, flags):
reader = req.init_reader()
if code == Command:
val = reader.read_string16()
print ("Got: ", val)
lr = response.new_reply()
lr.append_string16(val)
return lr, 0
response = service_manager.new_local_object(iface, got_reply)
status = service_manager.add_service_sync(fqname, response)
if not status:
loop = GLib.MainLoop()
loop.run()
else:
print("Failed to add service " + fqname)
ITest.add_service(sendTest)