mirror of
https://github.com/craigerl/aprsd.git
synced 2026-06-07 08:34:40 -04:00
Replace insecure pickle serialization with JSON
SECURITY FIX: Replace pickle.load() with json.load() to eliminate remote code execution vulnerability from malicious pickle files. Changes: - Update ObjectStoreMixin to use JSON instead of pickle - Add PacketJSONDecoder to reconstruct Packet objects from JSON - Change file extension from .p to .json - Add warning when old pickle files detected - Add OrderedDict restoration for PacketList - Update all tests to work with JSON format Users with existing pickle files must run: aprsd dev migrate-pickle Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
@@ -8,6 +9,7 @@ from unittest import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from aprsd.packets import core
|
||||
from aprsd.utils import objectstore
|
||||
|
||||
CONF = cfg.CONF
|
||||
@@ -94,7 +96,7 @@ class TestObjectStoreMixin(unittest.TestCase):
|
||||
filename = obj._save_filename()
|
||||
|
||||
self.assertIn('testobjectstore', filename.lower())
|
||||
self.assertTrue(filename.endswith('.p'))
|
||||
self.assertTrue(filename.endswith('.json'))
|
||||
|
||||
def test_save(self):
|
||||
"""Test save() method."""
|
||||
@@ -107,9 +109,9 @@ class TestObjectStoreMixin(unittest.TestCase):
|
||||
filename = obj._save_filename()
|
||||
self.assertTrue(os.path.exists(filename))
|
||||
|
||||
# Verify data was saved
|
||||
with open(filename, 'rb') as fp:
|
||||
loaded_data = pickle.load(fp)
|
||||
# Verify data was saved as JSON
|
||||
with open(filename, 'r') as fp:
|
||||
loaded_data = json.load(fp)
|
||||
self.assertEqual(loaded_data, obj.data)
|
||||
|
||||
def test_save_empty(self):
|
||||
@@ -154,13 +156,13 @@ class TestObjectStoreMixin(unittest.TestCase):
|
||||
mock_log.debug.assert_called()
|
||||
|
||||
def test_load_corrupted_file(self):
|
||||
"""Test load() with corrupted pickle file."""
|
||||
"""Test load() with corrupted JSON file."""
|
||||
obj = TestObjectStore()
|
||||
filename = obj._save_filename()
|
||||
|
||||
# Create corrupted file
|
||||
with open(filename, 'wb') as fp:
|
||||
fp.write(b'corrupted data')
|
||||
with open(filename, 'w') as fp:
|
||||
fp.write('{corrupted json data')
|
||||
|
||||
with mock.patch('aprsd.utils.objectstore.LOG') as mock_log:
|
||||
obj.load()
|
||||
@@ -246,3 +248,60 @@ class TestObjectStoreMixin(unittest.TestCase):
|
||||
self.assertEqual(len(errors), 0)
|
||||
# All operations should complete
|
||||
self.assertGreater(len(obj.data), 0)
|
||||
|
||||
def test_save_load_with_datetime(self):
|
||||
"""Test save/load with datetime objects."""
|
||||
obj = TestObjectStore()
|
||||
test_time = datetime.datetime.now()
|
||||
obj.data['timestamp'] = test_time
|
||||
obj.data['key1'] = 'value1'
|
||||
|
||||
obj.save()
|
||||
|
||||
obj2 = TestObjectStore()
|
||||
obj2.load()
|
||||
|
||||
self.assertEqual(obj2.data['key1'], 'value1')
|
||||
# Datetime should be preserved (may lose microseconds precision)
|
||||
loaded_time = obj2.data['timestamp']
|
||||
self.assertIsInstance(loaded_time, (datetime.datetime, str))
|
||||
|
||||
def test_save_load_with_packet(self):
|
||||
"""Test save/load with Packet objects."""
|
||||
obj = TestObjectStore()
|
||||
packet = core.MessagePacket(
|
||||
from_call='N0CALL',
|
||||
to_call='TEST',
|
||||
message_text='Test message',
|
||||
)
|
||||
obj.data['packet1'] = packet
|
||||
obj.data['key1'] = 'value1'
|
||||
|
||||
obj.save()
|
||||
|
||||
obj2 = TestObjectStore()
|
||||
obj2.load()
|
||||
|
||||
self.assertEqual(obj2.data['key1'], 'value1')
|
||||
# Packet should be reconstructed
|
||||
loaded_packet = obj2.data['packet1']
|
||||
self.assertIsInstance(loaded_packet, core.Packet)
|
||||
self.assertEqual(loaded_packet.from_call, 'N0CALL')
|
||||
self.assertEqual(loaded_packet.to_call, 'TEST')
|
||||
|
||||
def test_old_pickle_file_warning(self):
|
||||
"""Test warning when old pickle file exists."""
|
||||
obj = TestObjectStore()
|
||||
pickle_filename = obj._old_save_filename()
|
||||
|
||||
# Create a fake pickle file
|
||||
with open(pickle_filename, 'wb') as fp:
|
||||
fp.write(b'fake pickle data')
|
||||
|
||||
with mock.patch('aprsd.utils.objectstore.LOG') as mock_log:
|
||||
obj.load()
|
||||
# Should log warning about pickle file
|
||||
mock_log.warning.assert_called()
|
||||
call_args = str(mock_log.warning.call_args)
|
||||
self.assertIn('pickle', call_args.lower())
|
||||
self.assertIn('migrate', call_args.lower())
|
||||
|
||||
Reference in New Issue
Block a user