OVMS3-idf/components/test/TestCaseScript/TCPIPStress/ARPStress.py

122 lines
4.7 KiB
Python
Raw Normal View History

import time
from NativeLog import NativeLog
from TCAction import TCActionBase
from comm.NIC import Adapter
WARNING_COUNT = 5
class ARPStress(TCActionBase.CommonTCActionBase):
def __init__(self, name, test_env, cmd_set, timeout=30, log_path=TCActionBase.LOG_PATH):
TCActionBase.CommonTCActionBase.__init__(self, name, test_env, cmd_set=cmd_set,
timeout=timeout, log_path=log_path)
self.adapter = None
self.target_mode = "STA"
# load param from excel
for i in range(1, len(cmd_set)):
if cmd_set[i][0] != "dummy":
cmd_string = "self." + cmd_set[i][0]
exec cmd_string
self.result_cntx = TCActionBase.ResultCheckContext(self, test_env, self.tc_name)
pass
def cleanup(self):
self.adapter.close()
del self.adapter
def execute(self):
TCActionBase.TCActionBase.execute(self)
self.result_cntx.start()
try:
# configurable params
test_time = self.test_time * 60
# test frequency min should be 0.1s, otherwise reply could be missed
test_freq = self.test_freq if self.test_freq > 0.1 else 0.1
# test softAP or sta
target_mode = self.target_mode
# configurable params
except StandardError, e:
NativeLog.add_trace_critical("Error configuration for ARPStress script, error is %s" % e)
raise StandardError("Error configuration")
# get parameters
if target_mode == "STA":
target_ip = self.get_parameter("target_ip")
target_mac = self.get_parameter("target_mac")
pc_mac = self.get_parameter("pc_nic_mac")
pc_nic_name = self.get_parameter("pc_nic")
elif target_mode == "SoftAP":
target_ip = self.get_parameter("target_ap_ip")
target_mac = self.get_parameter("target_ap_mac")
pc_mac = self.get_parameter("pc_wifi_nic_mac")
pc_nic_name = self.get_parameter("pc_wifi_nic")
else:
raise StandardError("Unsupported target mode: %s" % target_mode)
time_start = time.time()
# open device
self.adapter = Adapter.Adapter(pc_nic_name, "capture+send")
ret = self.adapter.set_filter("arp and ether src %s and ether dst %s" % (target_mac, pc_mac))
if ret != "LIBPCAP_SUCCEED":
NativeLog.add_trace_critical("ARP Stress test error: %s" % ret)
return
ret = self.adapter.start_capture()
if ret != "LIBPCAP_SUCCEED":
NativeLog.add_trace_critical("ARP Stress test error: %s" % ret)
return
arp_pdu = self.adapter.create_pdu("ARP", self.adapter.create_payload(),
arp_op_code="request", arp_target_proto_addr=target_ip,
ethernet_dst_addr="ff:ff:ff:ff:ff:ff")
data = arp_pdu.to_bytes()
total_test_count = total_fail_count = successive_fail_count = most_successive_fail_count = 0
while (time.time() - time_start) < test_time:
# send arp req
ret = self.adapter.ether_send(data)
if ret != "LIBNET_SUCCEED":
NativeLog.add_prompt_trace("libnet send fail, %s" % ret)
continue
total_test_count += 1
# wait for reply
time.sleep(test_freq)
# should get one arp reply
pdu_list = self.adapter.get_packets()
if len(pdu_list) == 0:
# failed to get arp reply
total_fail_count += 1
successive_fail_count += 1
if successive_fail_count > WARNING_COUNT:
NativeLog.add_trace_critical("ARP Fail: successive fail %u times, total tested %u times"
% (successive_fail_count, total_test_count))
else:
most_successive_fail_count = most_successive_fail_count \
if most_successive_fail_count > successive_fail_count \
else successive_fail_count
successive_fail_count = 0
pass
NativeLog.add_trace_critical("ARP stress test, total %s times, failed %s times, most successive fail count %s"
% (total_test_count, total_fail_count, most_successive_fail_count))
self.result_cntx.set_result("Succeed")
# finally, execute done
def result_check(self, port_name, data):
TCActionBase.CommonTCActionBase.result_check(self, port_name, data)
self.result_cntx.append_data(port_name, data)
def main():
pass
if __name__ == '__main__':
main()