```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import unittest
from dataclasses import dataclass, field
from threading import Lock
import multiprocessing
@dataclass
class SharedData:
value: int = 0
# Using a Lock to ensure thread-safety when accessing shared data
lock: Lock = field(default_factory=Lock, init=False, repr=False)
def increment(self):
with self.lock:
self.value += 1
def get_value(self):
with self.lock:
return self.value
def worker(data: SharedData, num_iterations: int):
local_sum = 0
for _ in range(num_iterations):
local_sum += 1
# Use a lock to safely update the shared data
with data.lock:
data.value += local_sum
class TestSharedDataThreadSafety(unittest.TestCase):
def test_concurrent_increments(self):
shared_data = SharedData()
# Use 2x CPU count for threads to test both CPU-bound and I/O-bound scenarios
num_threads = multiprocessing.cpu_count() * 2
num_iterations = 1000000 // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(worker, shared_data, num_iterations) for _ in range(num_threads)]
for future in futures:
future.result()
expected_value = num_threads * num_iterations
self.assertEqual(shared_data.get_value(), expected_value,
f"Expected {expected_value}, but got {shared_data.get_value()}")
def test_race_condition(self):
shared_data = SharedData()
race_detected = threading.Event()
def racer():
with shared_data.lock:
initial_value = shared_data.value
time.sleep(0.001) # Simulate some work
# Check if the value has changed, which would indicate a race condition
if initial_value == shared_data.value:
shared_data.value += 1
else:
race_detected.set()
threads = [threading.Thread(target=racer) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertFalse(race_detected.is_set(), "Race condition detected")
def test_stress_test(self):
shared_data = SharedData()
stop_flag = threading.Event()
def stress_worker():
local_sum = 0
while not stop_flag.is_set():
local_sum += 1
# Use a lock to safely update the shared data after intensive local computation
with shared_data.lock:
shared_data.value += local_sum
# Use CPU count for threads to maximize resource utilization
threads = [threading.Thread(target=stress_worker) for _ in range(multiprocessing.cpu_count())]
for t in threads:
t.start()
time.sleep(5) # Run for 5 seconds to simulate prolonged stress
stop_flag.set()
for t in threads:
t.join()
print(f"Stress test final value: {shared_data.get_value()}")
if __name__ == '__main__':
unittest.main()
``` |