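"""Compute JaCoCo code coverage for the testing results produced by Android GUI testing tools.

Example invocation (a sketch: the flags mirror the ArgumentParser at the bottom of this
script, but the script file name and the paths below are hypothetical):

    python compute_coverage.py -o ../all-testing-results --timemachine \
        --app AnkiDroid --single_csv single_run_coverage.csv
"""
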
import csv
import json
import os
import subprocess
import time
from argparse import ArgumentParser, Namespace
from typing import List, Dict
from xml.dom import minidom
from xml.parsers.expat import ExpatError

ALL_APPS = ['ActivityDiary', 'AmazeFileManager', 'and-bible', 'AnkiDroid', 'APhotoManager', 'commons',
            'collect', 'FirefoxLite', 'Frost', 'geohashdroid', 'MaterialFBook', 'nextcloud', 'Omni-Notes',
            'open-event-attendee-android', 'openlauncher', 'osmeditor4android', 'Phonograph', 'Scarlet-Notes',
            'sunflower', 'WordPress']


def get_app_name(testing_result_dir):
    for app_name in ALL_APPS:
        if os.path.basename(testing_result_dir).startswith(app_name):
            return app_name
    print("Warning: cannot find app name for this testing result dir: %s" % testing_result_dir)
    return None


def get_apk_name(testing_result_dir: str):
    # the testing result dir name starts with the target apk file name
    # (everything up to and including ".apk")
    base_name = os.path.basename(testing_result_dir)
    target_apk_file_name = base_name.split(".apk")[0] + ".apk"
    return target_apk_file_name


def get_issue_id(testing_result_dir: str):
    # the issue id is encoded in the dir name between '#' and the next '.'
    base_name = os.path.basename(testing_result_dir)
    issue_id_str = base_name.split("#")[1].split(".")[0]
    return issue_id_str


def read_coverage_jacoco(jacoco_report_file):
    if not os.path.isfile(jacoco_report_file):
        return False, 0, 0, 0, 0

    try:
        # parse the xml coverage report generated by Jacoco (see the example counter element below)
        xmldoc = minidom.parse(jacoco_report_file)
        counters = xmldoc.getElementsByTagName('counter')

        line_coverage = 0
        branch_coverage = 0
        method_coverage = 0
        class_coverage = 0

        # note: counter elements also appear at method/class/package level; the report-level
        # totals come last in document order, so they are the values that are kept here
        for counter in counters:
            type_name = counter.getAttribute('type')
            missed_items = int(counter.getAttribute('missed'))
            covered_items = int(counter.getAttribute('covered'))

            if type_name == 'LINE':
                line_coverage = covered_items * 100.0 / (missed_items + covered_items)

            if type_name == 'BRANCH':
                branch_coverage = covered_items * 100.0 / (missed_items + covered_items)

            if type_name == 'METHOD':
                method_coverage = covered_items * 100.0 / (missed_items + covered_items)

            if type_name == 'CLASS':
                class_coverage = covered_items * 100.0 / (missed_items + covered_items)

        print("-----------")
        print("Line: " + str(line_coverage) + ", Branch: " + str(branch_coverage) + ", Method: " + str(method_coverage)
              + ", Class: " + str(class_coverage))
        print("-----------")
        return True, float("{:.2f}".format(line_coverage)), float("{:.2f}".format(branch_coverage)), \
            float("{:.2f}".format(method_coverage)), float("{:.2f}".format(class_coverage))
    except ExpatError:
        print("***** Failed to parse the xml coverage report *****")
        return False, 0, 0, 0, 0


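# For reference, a Jacoco xml report nests "counter" elements roughly as in the sketch
# below (attribute values here are made up); read_coverage_jacoco() only looks at the
# "type", "missed" and "covered" attributes:
#
#   <report name="...">
#     <package name="org/example">
#       <class name="org/example/Foo" sourcefilename="Foo.java"> ... </class>
#       <counter type="LINE" missed="120" covered="340"/>
#       <counter type="BRANCH" missed="40" covered="60"/>
#     </package>
#     <counter type="LINE" missed="1500" covered="3500"/>
#     <counter type="METHOD" missed="200" covered="400"/>
#     <counter type="CLASS" missed="30" covered="70"/>
#   </report>

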
def get_class_source_files_dirs(app_name, target_apk_file_name):
    class_files = os.path.join("../" + app_name, "class_files.json")
    assert os.path.exists(class_files)

    with open(class_files, "r") as tmp_file:
        tmp_file_dict = json.load(tmp_file)

    # Get the class and source files #
    class_source_files_dict = tmp_file_dict[target_apk_file_name]

    class_files_dirs = class_source_files_dict['classfiles']
    source_files_dirs = class_source_files_dict['sourcefiles']

    assert len(class_files_dirs) != 0 and len(source_files_dirs) != 0

    return class_files_dirs, source_files_dirs


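# get_class_source_files_dirs() expects each app's class_files.json to map an apk file
# name to its compiled-class and source directories, roughly like the hypothetical
# sketch below; the actual keys and paths depend on how each app was built:
#
#   {
#     "AnkiDroid-2.9.1.apk": {
#       "classfiles": ["AnkiDroid/build/intermediates/javac/debug/classes"],
#       "sourcefiles": ["AnkiDroid/src/main/java"]
#     }
#   }

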
def get_class_files_str(app_name, class_files_dirs):
    class_files_dirs_str = ""
    for tmp_dir in class_files_dirs:
        class_files_dirs_str += " --classfiles " + os.path.join("../" + app_name, tmp_dir)
    return class_files_dirs_str


def get_coverage_ec_files_str(coverage_data_dir):
    # Get the coverage data files #
    coverage_ec_files = [os.path.join(coverage_data_dir, f) for f in os.listdir(coverage_data_dir) if
                         os.path.isfile(os.path.join(coverage_data_dir, f)) and f.endswith('.ec')]

    coverage_ec_files_str = ""
    for ec_file in coverage_ec_files:
        coverage_ec_files_str += " " + ec_file

    merged_coverage_ec_file_path = os.path.join(coverage_data_dir, "coverage_all.ec")

    if len(coverage_ec_files) == 0 and not os.path.exists(merged_coverage_ec_file_path):
        # no coverage data at all: return an empty string so the caller can skip this run
        return ""

    if not os.path.exists(merged_coverage_ec_file_path):
        # only merge when "coverage_all.ec" does not exist yet
        merge_cmd = "java -jar ../tools/jacococli.jar merge " + coverage_ec_files_str + \
                    " --destfile " + merged_coverage_ec_file_path
        print('$ %s' % merge_cmd)

        try:
            p = subprocess.Popen(merge_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # read and print the merge tool's output
            output = p.communicate()[0].decode('utf-8').strip()
            print(output)
        except OSError as e:
            print(e)

    return merged_coverage_ec_file_path


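# The helpers above and below shell out to Jacoco's command line client. The assembled
# commands look roughly like the illustrative examples here (file names are made up):
#
#   java -jar ../tools/jacococli.jar merge run1.ec run2.ec --destfile coverage_all.ec
#   java -jar ../tools/jacococli.jar report coverage_all.ec \
#       --classfiles ../<app>/<classes dir> --xml coverage_report.xml

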
def get_coverage_ec_files_str_optimized(coverage_data_dir):
    # Get the coverage data files #
    coverage_ec_files = [os.path.join(coverage_data_dir, f) for f in os.listdir(coverage_data_dir) if
                         os.path.isfile(os.path.join(coverage_data_dir, f)) and f.endswith('.ec')]

    # split the coverage ec files into chunks, because passing all of them at once would
    # make the merge command line too long
    split_size = 30
    coverage_ec_files_list: List[List[str]] = [coverage_ec_files[i:i + split_size] for i in
                                               range(0, len(coverage_ec_files), split_size)]

    merged_coverage_file_index = 0
    merged_coverage_ec_files = []

    for sublist_coverage_ec_files in coverage_ec_files_list:

        coverage_ec_files_str = ""
        for ec_file in sublist_coverage_ec_files:
            coverage_ec_files_str += " " + ec_file

        merged_coverage_file_index += 1
        merged_coverage_ec_file_path = os.path.join(coverage_data_dir,
                                                    "coverage_all_" + str(merged_coverage_file_index) + ".ec")

        merge_cmd = "java -jar ../tools/jacococli.jar merge " + coverage_ec_files_str + \
                    " --destfile " + merged_coverage_ec_file_path
        print('$ %s' % merge_cmd)

        try:
            p = subprocess.Popen(merge_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # read and print the merge tool's output
            output = p.communicate()[0].decode('utf-8').strip()
            print(output)

            merged_coverage_ec_files.append(merged_coverage_ec_file_path)

        except OSError as e:
            print(e)

    merged_coverage_ec_files_str = ""
    for ec_file in merged_coverage_ec_files:
        merged_coverage_ec_files_str += " " + ec_file

    return merged_coverage_ec_files_str


def compute_code_coverage(app_name, tool_name, testing_result_dir, coverage_data_dir):
    target_apk_file_name = get_apk_name(testing_result_dir)

    class_files_dirs, source_files_dirs = get_class_source_files_dirs(app_name, target_apk_file_name)

    class_files_dirs_str = get_class_files_str(app_name, class_files_dirs)

    if tool_name == "timemachine":
        coverage_ec_files_str = get_coverage_ec_files_str_optimized(coverage_data_dir)
    else:
        coverage_ec_files_str = get_coverage_ec_files_str(coverage_data_dir)

    # handle the case when no coverage data is available
    if len(coverage_ec_files_str) == 0:
        return False, 0, 0, 0, 0

    # Assemble and execute the coverage computation command #
    xml_coverage_report_file_path = os.path.join(testing_result_dir, "coverage_report.xml")
    cmd = "java -jar ../tools/jacococli.jar report " + coverage_ec_files_str + class_files_dirs_str + " --xml " + \
          xml_coverage_report_file_path
    print('$ %s' % cmd)

    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # read and print the report tool's output
    output = p.communicate()[0].decode('utf-8').strip()
    print(output)

    return read_coverage_jacoco(xml_coverage_report_file_path)


def compute_single_run_code_coverage(app_name, tool_name, issue_id,
                                     target_app_testing_result_dirs,
                                     coverage_data_summary_file_path):
    for tmp_dir in target_app_testing_result_dirs:

        # only compute coverage for the specified issue
        if issue_id is not None and issue_id not in tmp_dir:
            continue

        if tool_name not in tmp_dir:
            # double check to ensure the testing result dir indeed comes from the target tool
            continue

        print(tmp_dir)

        if tool_name == "timemachine":
            coverage_data_dir = os.path.join(tmp_dir, "timemachine-output")
        else:
            coverage_data_dir = tmp_dir

        # If is_valid_data is False, no coverage files exist or parsing the coverage report failed.
        is_valid_data, line_coverage, branch_coverage, method_coverage, class_coverage = \
            compute_code_coverage(app_name, tool_name, tmp_dir, coverage_data_dir)

        # dump the info into the csv file
        if is_valid_data:
            print("**** Valid data, write to the file ****")
            time.sleep(1)
            with open(coverage_data_summary_file_path, "a") as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(
                    [app_name, tool_name, os.path.basename(tmp_dir), line_coverage, branch_coverage, method_coverage,
                     class_coverage])


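# Each row appended by compute_single_run_code_coverage() has the following header-less
# columns:
#
#   app_name, tool_name, testing_result_dir_name, line%, branch%, method%, class%
#
# compute_average_code_coverage() below writes one row per issue id instead:
#
#   app_name, issue_id, averaged line%, branch%, method%, class%

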
def compute_average_code_coverage(app_name, tool_name, issue_id,
                                  target_app_testing_result_dirs,
                                  average_coverage_data_summary_file_path):
    average_coverage_dict: Dict[str, Dict[str, List[float]]] = {}

    for tmp_dir in target_app_testing_result_dirs:

        # only compute coverage for the specified issue
        if issue_id is not None and issue_id not in tmp_dir:
            continue

        if tool_name not in tmp_dir:
            # double check to ensure the testing result dir indeed comes from the target tool
            continue

        print(tmp_dir)

        if tool_name == "timemachine":
            coverage_data_dir = os.path.join(tmp_dir, "timemachine-output")
        else:
            coverage_data_dir = tmp_dir

        # If is_valid_data is False, no coverage files exist or parsing the coverage report failed.
        is_valid_data, line_coverage, branch_coverage, method_coverage, class_coverage = \
            compute_code_coverage(app_name, tool_name, tmp_dir, coverage_data_dir)

        # collect the per-run coverage, skipping zero-coverage runs
        if is_valid_data and line_coverage > 0.0:
            issue_id_str = get_issue_id(tmp_dir)
            if issue_id_str not in average_coverage_dict:
                average_coverage_dict[issue_id_str] = {'line': [line_coverage],
                                                       'branch': [branch_coverage],
                                                       'method': [method_coverage],
                                                       'class': [class_coverage]}
            else:
                tmp_coverage_dict = average_coverage_dict[issue_id_str]
                tmp_coverage_dict['line'].append(line_coverage)
                tmp_coverage_dict['branch'].append(branch_coverage)
                tmp_coverage_dict['method'].append(method_coverage)
                tmp_coverage_dict['class'].append(class_coverage)

    for issue_id_str in average_coverage_dict:
        print("**** [%s] Compute average coverage data, write to the file ****" % issue_id_str)
        tmp_coverage_dict = average_coverage_dict[issue_id_str]
        average_line_coverage = "{:.2f}".format(sum(tmp_coverage_dict['line']) / len(tmp_coverage_dict['line']))
        average_branch_coverage = "{:.2f}".format(sum(tmp_coverage_dict['branch']) / len(tmp_coverage_dict['branch']))
        average_method_coverage = "{:.2f}".format(sum(tmp_coverage_dict['method']) / len(tmp_coverage_dict['method']))
        average_class_coverage = "{:.2f}".format(sum(tmp_coverage_dict['class']) / len(tmp_coverage_dict['class']))

        with open(average_coverage_data_summary_file_path, "a") as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(
                [app_name, issue_id_str, average_line_coverage, average_branch_coverage, average_method_coverage,
                 average_class_coverage])


def cluster_testing_result_dirs_by_apk(testing_result_dirs: List[str]):
    clustered_dict: Dict[str, List[str]] = {}
    for tmp_dir in testing_result_dirs:
        target_apk_file_name = get_apk_name(tmp_dir)
        if target_apk_file_name not in clustered_dict:
            clustered_dict[target_apk_file_name] = [tmp_dir]
        else:
            clustered_dict[target_apk_file_name].append(tmp_dir)
    return clustered_dict


def compute_all_run_code_coverage(app_name: str, output_dir, tool_name,
                                  target_app_testing_result_dirs: List[str],
                                  accumulative_coverage_result_file_path):
    clustered_dict = cluster_testing_result_dirs_by_apk(target_app_testing_result_dirs)

    for target_apk_file_name in clustered_dict:

        all_run_testing_result_dirs = clustered_dict[target_apk_file_name]

        coverage_ec_files_str = ""

        for tmp_dir in all_run_testing_result_dirs:

            if tool_name not in tmp_dir:
                # double check to ensure the testing result dir indeed comes from the target tool
                continue

            if tool_name == "timemachine":
                coverage_data_dir = os.path.join(tmp_dir, "timemachine-output")
                coverage_ec_files_str += " " + get_coverage_ec_files_str_optimized(coverage_data_dir)
            else:
                coverage_data_dir = tmp_dir
                coverage_ec_files_str += " " + get_coverage_ec_files_str(coverage_data_dir)

        class_files_dirs, source_files_dirs = get_class_source_files_dirs(app_name, target_apk_file_name)

        class_files_dirs_str = get_class_files_str(app_name, class_files_dirs)

        # Assemble and execute the coverage computation command #
        xml_coverage_report_file_path = os.path.join(output_dir, target_apk_file_name + ".coverage_report.xml")
        cmd = "java -jar ../tools/jacococli.jar report " + coverage_ec_files_str + class_files_dirs_str + " --xml " + \
              xml_coverage_report_file_path
        print('$ %s' % cmd)

        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # read and print the report tool's output
        output = p.communicate()[0].decode('utf-8').strip()
        print(output)

        is_valid_data, line_coverage, branch_coverage, method_coverage, class_coverage = \
            read_coverage_jacoco(xml_coverage_report_file_path)

        # dump the accumulative coverage of all runs of this apk into the csv file
        if is_valid_data:
            with open(accumulative_coverage_result_file_path, "a") as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(
                    [app_name, tool_name, target_apk_file_name, line_coverage, branch_coverage, method_coverage,
                     class_coverage])


def main(args: Namespace):
    # collect all testing result dirs and cluster them by app name
    all_testing_results_dirs: Dict[str, List[str]] = {}
    subdirs = os.listdir(args.o)
    for subdir in subdirs:
        subdir_path = os.path.join(args.o, subdir)
        if os.path.isdir(subdir_path):
            app_name = get_app_name(subdir_path)
            if app_name not in all_testing_results_dirs:
                all_testing_results_dirs[app_name] = [subdir_path]
            else:
                all_testing_results_dirs[app_name].append(subdir_path)

    print("---------")
    print(all_testing_results_dirs)
    print("---------")

    # map the selected command line flag to the tool name used in the result dir names
    if args.timemachine:
        tool_name = "timemachine"
    elif args.monkey:
        tool_name = "monkey"
    elif args.ape:
        tool_name = "ape"
    elif args.combo:
        tool_name = "combodroid"
    elif args.humandroid:
        tool_name = "humandroid"
    elif args.stoat:
        tool_name = "stoat"
    elif args.qtesting:
        tool_name = "qtesting"
    else:
        tool_name = ""

    if args.acc_csv is not None:

        accumulative_coverage_result_file_path = args.acc_csv
        if os.path.exists(accumulative_coverage_result_file_path):
            os.remove(accumulative_coverage_result_file_path)

        for app_name in all_testing_results_dirs:
            # only compute coverage for the specified app
            if args.app_name is not None and app_name != args.app_name:
                continue
            target_app_testing_result_dirs = all_testing_results_dirs[app_name]
            # compute the accumulative coverage over all runs of an apk
            compute_all_run_code_coverage(app_name, args.o, tool_name,
                                          target_app_testing_result_dirs,
                                          accumulative_coverage_result_file_path)

    if args.single_csv is not None:

        single_run_coverage_result_file_path = args.single_csv
        if os.path.exists(single_run_coverage_result_file_path):
            os.remove(single_run_coverage_result_file_path)

        for app_name in all_testing_results_dirs:
            # only compute coverage for the specified app
            if args.app_name is not None and app_name != args.app_name:
                continue
            target_app_testing_result_dirs = all_testing_results_dirs[app_name]
            compute_single_run_code_coverage(app_name, tool_name, args.issue_id,
                                             target_app_testing_result_dirs,
                                             single_run_coverage_result_file_path)

    if args.average_csv is not None:

        average_coverage_result_file_path = args.average_csv
        if os.path.exists(average_coverage_result_file_path):
            os.remove(average_coverage_result_file_path)

        for app_name in all_testing_results_dirs:
            # only compute coverage for the specified app
            if args.app_name is not None and app_name != args.app_name:
                continue
            target_app_testing_result_dirs = all_testing_results_dirs[app_name]
            compute_average_code_coverage(app_name, tool_name, args.issue_id,
                                          target_app_testing_result_dirs,
                                          average_coverage_result_file_path)


if __name__ == '__main__':

    ap = ArgumentParser()

    ap.add_argument('-o', required=True, help="the output directory of testing results")
    ap.add_argument('-v', default=False, action='store_true')

    # supported fuzzing tools
    ap.add_argument('--monkey', default=False, action='store_true')
    ap.add_argument('--ape', default=False, action='store_true')
    ap.add_argument('--timemachine', default=False, action='store_true')
    ap.add_argument('--combo', default=False, action='store_true')
    ap.add_argument('--humandroid', default=False, action='store_true')
    ap.add_argument('--qtesting', default=False, action='store_true')
    ap.add_argument('--stoat', default=False, action='store_true')

    ap.add_argument('--app', type=str, dest='app_name')
    ap.add_argument('--id', type=str, dest='issue_id')
    ap.add_argument('--acc_csv', type=str, default=None, dest='acc_csv',
                    help="the csv file for the accumulative coverage of all runs")
    ap.add_argument('--single_csv', type=str, default=None, dest='single_csv',
                    help="the csv file for the coverage of single runs")
    ap.add_argument('--average_csv', type=str, default=None, dest='average_csv',
                    help="the csv file for the average coverage of all runs")

    args = ap.parse_args()

    if not os.path.exists(args.o):
        ap.error("the output directory does not exist!")

    main(args)