[simulation] Implement Prediction Market IM (#5)
This commit is contained in:
@ -129,6 +129,6 @@ class DataHandler(SmartContract):
def update_claimable_amount(self, receiver: Address, stored_data: StoredData, reward_amount: float):
# The Solidity implementation does the update in another place which is fine for it.
# Here we only update it once we're sure the refund can be completed successfully.
stored_data.claimed_by[receiver] = True
stored_data.claimable_amount -= reward_amount
if reward_amount > 0:
stored_data.claimed_by[receiver] = True
stored_data.claimable_amount -= reward_amount
@ -0,0 +1,388 @@
import random
from collections import Counter, defaultdict
from dataclasses import dataclass
from enum import Enum
from hashlib import sha256
from logging import Logger
from typing import Dict, List, Tuple
import math
import numpy as np
from injector import inject, Module, singleton
from decai.simulation.contract.balances import Balances
from decai.simulation.contract.classification.classifier import Classifier
from decai.simulation.contract.data.data_handler import StoredData
from decai.simulation.contract.incentive.incentive_mechanism import IncentiveMechanism
from decai.simulation.contract.objects import Address, Msg, RejectException, TimeMock
class MarketPhase(Enum):
""" Phases for the current market. """
# Phases are in chronological order.
""" The market is being initialized and awaiting for the requested test set index to be revealed. """
""" The market is open to data contributions. """
""" The market will no longer accept data and the test set must be revealed before rewards can be calculated. """
""" No more data contributions are being accepted but rewards still need to be calculated. """
""" Same as `REWARD` but the model needs to be re-initialized. """
""" The reward values have been computed and are ready to be collected. """
class _Contribution:
A contribution to train data.
This is stored for convenience but for some applications, storing the data could be very expensive,
instead, hashes could be stored and during the reward phase,
the hash can be used to verify data as data is re-submitted.
Note: this is not in the spirit of the prediction market (the current state should be public)
since the model would not actually be updated and the submitted data would be private
so new data contributors have very limited information.
contributor_address: Address
data: np.array
classification: int
class PredictionMarket(IncentiveMechanism):
An IM where rewards are computed based on how the model's performance changes with respect to a test set.
For now, for the purposes of the simulation, the market is only intended to be run once.
Eventually this class and the actual smart contract implementation of it
should support restarting the market with a new bounty once a market has ended.
def __init__(self,
# Injected
balances: Balances,
logger: Logger,
model: Classifier,
time_method: TimeMock,
# Parameters
any_address_claim_wait_time_s=60 * 60 * 24 * 7.
self._balances = balances
self._logger = logger
self.model = model
self._time = time_method
self._market_earliest_end_time_s = None
self._market_balances: Dict[Address, float] = defaultdict(float)
""" Keeps track of balances in the market. """
self._next_data_index = None
self.state = None
def distribute_payment_for_prediction(self, sender, value):
def get_num_contributions_in_market(self):
:return: The total number of contributions currently in the market.
This can decrease as "bad" contributors are removed during the reward phase.
return len(self._market_data)
# Methods in chronological order of the PM.
def hash_test_set(test_set):
:param test_set: A test set.
:return: The hash of `test_set`.
return sha256(str(test_set).encode()).hexdigest()
def get_test_set_hashes(num_pieces, x_test, y_test) -> Tuple[list, list]:
Helper to break the test set into `num_pieces` to initialize the market.
:param num_pieces: The number of pieces to break the test set into.
:param x_test: The features for the test set.
:param y_test: The labels for `x_test`.
:return: tuple
A list of `num_pieces` hashes for each portion of the test set.
The test set divided into `num_pieces`.
test_sets = []
test_dataset_hashes = []
assert len(x_test) == len(y_test) >= num_pieces
for i in range(num_pieces):
start = int(i / num_pieces * len(x_test))
end = int((i + 1) / num_pieces * len(x_test))
test_set = list(zip(x_test[start:end], y_test[start:end]))
assert sum(len(t) for t in test_sets) == len(x_test)
return test_dataset_hashes, test_sets
def initialize_market(self, msg: Msg,
x_init_data, y_init_data,
test_dataset_hashes: List[str],
# Ending criteria:
min_length_s: int, min_num_contributions: int) -> int:
Initialize the prediction market.
:param msg: Indicates the one posting the bounty and the amount being committed for the bounty.
The total bounty should be an integer since it also represents the number of "rounds" in the PM.
:param x_init_data: The data to use to re-initialize the model.
:param y_init_data: The labels to use to re-initialize the model.
:param test_dataset_hashes: The committed hashes for the portions of the test set.
:param min_length_s: The minimum length in seconds of the market.
:param min_num_contributions: The minimum number of contributions before ending the market.
:return: The index of the test set that must be revealed.
assert self._market_earliest_end_time_s is None
assert self._next_data_index is None, "The market end has already been triggered."
assert self.state is None
self.bounty_provider = msg.sender
self.total_bounty = msg.value
self.remaining_bounty_rounds = self.total_bounty
# TODO Instead of storing data, make sure that models can be restarted for free by storing their initial params.
self._x_init_data = x_init_data
self._y_init_data = y_init_data
self.test_set_hashes = test_dataset_hashes
assert len(self.test_set_hashes) > 1
self.test_reveal_index = random.randrange(len(self.test_set_hashes))
self.next_test_set_index_to_verify = 0
if self.next_test_set_index_to_verify == self.test_reveal_index:
self.next_test_set_index_to_verify += 1
self.min_stake = 1
self._market_data: List[_Contribution] = []
self.min_num_contributions = min_num_contributions
self._market_earliest_end_time_s = self._time() + min_length_s
self.reward_phase_end_time_s = None
# Pay the owner since it will be the owner distributing funds using `handle_refund` and `handle_reward` later.
self._balances.send(self.bounty_provider, self.owner, self.total_bounty)
self.state = MarketPhase.INITIALIZATION
return self.test_reveal_index
def add_test_set_hashes(self, msg: Msg, more_test_set_hashes: List[str]) -> int:
Add more hashes for portions of the test set to reveal.
This helps in case not all hashes can be sent in one transaction.
:param msg: The message for this transaction.
The sender must be the bounty provider.
:param more_test_set_hashes: More committed hashes for the portions of the test set.
:return: The index of the test set that must be revealed.
assert self.state == MarketPhase.INITIALIZATION
assert msg.sender == self.bounty_provider
# Ensure that a new test set is given and the sender isn't just trying to get a new random index.
assert len(more_test_set_hashes) > 0, "You must give at least one hash."
self.test_set_hashes += more_test_set_hashes
self.test_reveal_index = random.randrange(len(self.test_set_hashes))
self.next_test_set_index_to_verify = 0
if self.next_test_set_index_to_verify == self.test_reveal_index:
self.next_test_set_index_to_verify += 1
return self.test_reveal_index
def verify_test_set(self, index: int, test_set_portion):
Verify that a portion of the test set matches the committed to hash.
:param index: The index of the test set in the originally committed list of hashes.
:param test_set_portion: The portion of the test set to reveal.
assert 0 <= index < len(self.test_set_hashes)
assert len(test_set_portion) > 0
test_set_hash = self.hash_test_set(test_set_portion)
assert test_set_hash == self.test_set_hashes[index]
def reveal_init_test_set(self, test_set_portion):
Reveal the required portion of the full test set.
:param test_set_portion: The portion of the test set that must be revealed before started the Participation Phase.
assert self.state == MarketPhase.INITIALIZATION
self.verify_test_set(self.test_reveal_index, test_set_portion)
self.state = MarketPhase.PARTICIPATION
def handle_add_data(self, contributor_address: Address, msg_value: float, data, classification) -> (float, bool):
assert self.state == MarketPhase.PARTICIPATION
cost = self.min_stake
update_model = False
if cost > msg_value:
raise RejectException(f"Did not pay enough. Sent {msg_value} < {cost}")
self._market_data.append(_Contribution(contributor_address, data, classification))
self._market_balances[contributor_address] += cost
return (cost, update_model)
def end_market(self):
Signal the end of the prediction market.
assert self.state == MarketPhase.PARTICIPATION
if self.get_num_contributions_in_market() < self.min_num_contributions \
and self._time() < self._market_earliest_end_time_s:
raise RejectException("Can't end the market yet.")
self._logger.info("Ending market.")
self.state = MarketPhase.REVEAL_TEST_SET
self._next_data_index = 0
self.test_data, self.test_labels = [], []
def verify_next_test_set(self, test_set_portion):
assert self.state == MarketPhase.REVEAL_TEST_SET
self.verify_test_set(self.next_test_set_index_to_verify, test_set_portion)
test_data, test_labels = zip(*test_set_portion)
self.test_data += test_data
self.test_labels += test_labels
self.next_test_set_index_to_verify += 1
if self.next_test_set_index_to_verify == self.test_reveal_index:
self.next_test_set_index_to_verify += 1
if self.next_test_set_index_to_verify == len(self.test_set_hashes):
self.state = MarketPhase.REWARD_RE_INITIALIZE_MODEL
def process_contribution(self):
Reward Phase:
Process the next data contribution.
assert self.remaining_bounty_rounds > 0, "The market has ended."
if self.state == MarketPhase.REWARD_RE_INITIALIZE_MODEL:
self._next_data_index = 0
self._logger.debug("Remaining bounty rounds: %s", self.remaining_bounty_rounds)
self._scores = defaultdict(float)
# The paper implies that we should not retrain the model and instead only train once.
# The problem there is that a contributor is affected by bad contributions
# between them and the last counted contribution.
# So this will be implemented with retraining for now,
# though this might not be feasible with gas limits in Ethereum.
self._logger.debug("Re-initializing model.", )
self.model.init_model(self._x_init_data, self._y_init_data)
# XXX This evaluation can be expensive and likely won't work in Ethereum.
# We need to find a more efficient way to do this or let a contributor proved they did it.
self.prev_acc = self.model.evaluate(self.test_data, self.test_labels)
self._logger.debug("Accuracy: %0.2f%%", self.prev_acc * 100)
self._num_market_contributions: Dict[Address, int] = Counter()
self._worst_contributor = None
self._min_score = math.inf
self.state = MarketPhase.REWARD
assert self.state == MarketPhase.REWARD
contribution = self._market_data[self._next_data_index]
self._num_market_contributions[contribution.contributor_address] += 1
self.model.update(contribution.data, contribution.classification)
self._next_data_index += 1
need_restart = self._next_data_index >= self.get_num_contributions_in_market()
if need_restart \
or self._market_data[self._next_data_index].contributor_address != contribution.contributor_address:
# Next contributor is different.
# XXX Potentially expensive gas cost.
acc = self.model.evaluate(self.test_data, self.test_labels)
score_change = acc - self.prev_acc
new_score = self._scores[contribution.contributor_address] + score_change
self._logger.debug(" Score change for \"%s\": %0.2f (new score: %0.2f)",
contribution.contributor_address, score_change, new_score)
self._scores[contribution.contributor_address] = new_score
if new_score < self._min_score:
self._min_score = self._scores[contribution.contributor_address]
self._worst_contributor = contribution.contributor_address
elif self._worst_contributor == contribution.contributor_address and score_change > 0:
# Their score increased, they might not be the worst anymore.
# Optimize: use a heap.
self._worst_contributor, self._min_score = min(self._scores.items(), key=lambda x: x[1])
self.prev_acc = acc
if need_restart:
# Find min score and remove that address from the list.
self._logger.debug("Minimum score: \"%s\": %.2f", self._worst_contributor, self._min_score)
if self._min_score < 0:
num_rounds = self._market_balances[self._worst_contributor] / -self._min_score
if num_rounds > self.remaining_bounty_rounds:
num_rounds = self.remaining_bounty_rounds
self.remaining_bounty_rounds -= num_rounds
participants_to_remove = set()
for participant, score in self._scores.items():
self._logger.debug("Score for \"%s\": %.2f", participant, score)
self._market_balances[participant] += score * num_rounds
if self._market_balances[participant] < self._num_market_contributions[participant]:
# They don't have enough left to stake next time.
self._market_data: List[_Contribution] = list(
filter(lambda c: c.contributor_address not in participants_to_remove, self._market_data))
if self.get_num_contributions_in_market() == 0:
self.state = MarketPhase.REWARD_COLLECT
self.remaining_bounty_rounds = 0
self.reward_phase_end_time_s = self._time()
self.state = MarketPhase.REWARD_RE_INITIALIZE_MODEL
self._logger.debug("Dividing remaining bounty amongst all remaining contributors.")
num_rounds = self.remaining_bounty_rounds
self.remaining_bounty_rounds = 0
self.reward_phase_end_time_s = self._time()
self.state = MarketPhase.REWARD_COLLECT
for participant, score in self._scores.items():
self._logger.debug("Score for \"%s\": %.2f", participant, score)
self._market_balances[participant] += score * num_rounds
def handle_refund(self, submitter: Address, stored_data: StoredData,
claimable_amount: float, claimed_by_submitter: bool,
prediction) -> float:
assert self.remaining_bounty_rounds == 0, "The reward phase has not finished processing contributions."
assert self.state == MarketPhase.REWARD_COLLECT
result = self._market_balances[submitter]
self._logger.debug("Reward for \"%s\": %.2f", submitter, result)
if result > 0:
del self._market_balances[submitter]
result = 0
return result
def handle_report(self, reporter: Address, stored_data: StoredData, claimed_by_reporter: bool, prediction) -> float:
assert self.state == MarketPhase.REWARD_COLLECT, "The reward phase has not finished processing contributions."
assert self.remaining_bounty_rounds == 0
assert self.reward_phase_end_time_s > 0
if self._time() - self.reward_phase_end_time_s >= self.any_address_claim_wait_time_s:
submitter = stored_data.sender
result = self._market_balances[submitter]
if result > 0:
self._logger.debug("Giving reward for \"%s\" to \"%s\". Reward: %s", submitter, reporter, result)
del self._market_balances[reporter]
result = 0
return result
class PredictionMarketImModule(Module):
def configure(self, binder):
binder.bind(IncentiveMechanism, to=PredictionMarket)
@ -0,0 +1,268 @@
import unittest
from collections import defaultdict
from typing import cast
from injector import Injector
from decai.simulation.contract.balances import Balances
from decai.simulation.contract.classification.perceptron import PerceptronModule
from decai.simulation.contract.data.data_handler import StoredData
from decai.simulation.contract.incentive.incentive_mechanism import IncentiveMechanism
from decai.simulation.contract.incentive.prediction_market import MarketPhase, \
PredictionMarket, PredictionMarketImModule
from decai.simulation.contract.objects import Msg, TimeMock
from decai.simulation.data.data_loader import DataLoader
from decai.simulation.data.simple_data_loader import SimpleDataModule
from decai.simulation.logging_module import LoggingModule
class TestPredictionMarket(unittest.TestCase):
def test_market(self):
inj = Injector([
balances = inj.get(Balances)
data = inj.get(DataLoader)
im = cast(PredictionMarket, inj.get(IncentiveMechanism))
assert isinstance(im, PredictionMarket)
init_train_data_portion = 0.2
initializer_address = 'initializer'
total_bounty = 100_000
balances.initialize(initializer_address, total_bounty)
good_contributor_address = 'good_contributor'
initial_good_balance = 10_000
balances.initialize(good_contributor_address, initial_good_balance)
bad_contributor_address = 'bad_contributor'
initial_bad_balance = 10_000
balances.initialize(bad_contributor_address, initial_bad_balance)
(x_train, y_train), (x_test, y_test) = data.load_data()
init_idx = int(len(x_train) * init_train_data_portion)
assert init_idx > 0
x_init_data, y_init_data = x_train[:init_idx], y_train[:init_idx]
x_remaining, y_remaining = x_train[init_idx:], y_train[init_idx:]
# Split test set into pieces.
num_pieces = 10
test_dataset_hashes, test_sets = im.get_test_set_hashes(num_pieces, x_test, y_test)
# Ending criteria:
min_length_s = 100
min_num_contributions = min(len(x_remaining), 100)
# Commitment Phase
hashes_split = 3
test_reveal_index = im.initialize_market(Msg(initializer_address, total_bounty),
x_init_data, y_init_data,
min_length_s, min_num_contributions)
assert 0 <= test_reveal_index < len(test_dataset_hashes)
self.assertEqual(MarketPhase.INITIALIZATION, im.state)
test_reveal_index = im.add_test_set_hashes(Msg(initializer_address, 0), test_dataset_hashes[hashes_split:])
assert 0 <= test_reveal_index < len(test_dataset_hashes)
self.assertEqual(MarketPhase.INITIALIZATION, im.state)
self.assertEqual(MarketPhase.PARTICIPATION, im.state)
# Participation Phase
value = 100
total_deposits = defaultdict(float)
for i in range(min_num_contributions):
data = x_remaining[i]
classification = y_remaining[i]
if i % 2 == 0:
contributor = good_contributor_address
contributor = bad_contributor_address
classification = 1 - classification
cost, _ = im.handle_add_data(contributor, value, data, classification)
balances.send(contributor, im.owner, cost)
total_deposits[contributor] += cost
# Reward Phase
self.assertEqual(MarketPhase.PARTICIPATION, im.state)
self.assertEqual(MarketPhase.REVEAL_TEST_SET, im.state)
for i, test_set_portion in enumerate(test_sets):
if i != test_reveal_index:
self.assertEqual(MarketPhase.REWARD_RE_INITIALIZE_MODEL, im.state)
while im.remaining_bounty_rounds > 0:
# Collect rewards.
self.assertEqual(MarketPhase.REWARD_COLLECT, im.state)
for contributor in [good_contributor_address, bad_contributor_address]:
# Don't need to pass the right StoredData.
# noinspection PyTypeChecker
reward = im.handle_refund(contributor, None, 0, False, None)
balances.send(im.owner, contributor, reward)
self.assertGreater(total_deposits[good_contributor_address], 0)
self.assertGreater(total_deposits[bad_contributor_address], 0)
# General checks that should be true for a market with a reasonably sensitive model.
self.assertLess(balances[im.owner], total_bounty,
f"Some of the bounty should be distributed.\n"
f"Balances: {balances.get_all()}")
self.assertLess(0, balances[im.owner])
self.assertLess(balances[bad_contributor_address], initial_bad_balance)
self.assertGreater(balances[good_contributor_address], initial_good_balance)
self.assertLess(balances[bad_contributor_address], balances[good_contributor_address])
self.assertLessEqual(balances[good_contributor_address] - balances[bad_contributor_address],
self.assertEqual(initial_good_balance + initial_bad_balance + total_bounty,
balances[good_contributor_address] + balances[bad_contributor_address] +
"Should be a zero-sum.")
self.assertGreater(total_deposits[bad_contributor_address], 0)
self.assertEqual(initial_bad_balance - total_deposits[bad_contributor_address],
"The bad contributor should lose all of their deposits.")
def test_report(self):
inj = Injector([
balances = inj.get(Balances)
data = inj.get(DataLoader)
im = cast(PredictionMarket, inj.get(IncentiveMechanism))
time_method = inj.get(TimeMock)
assert isinstance(im, PredictionMarket)
init_train_data_portion = 0.2
initializer_address = 'initializer'
total_bounty = 100_000
balances.initialize(initializer_address, total_bounty)
good_contributor_address = 'good_contributor'
initial_good_balance = 10_000
balances.initialize(good_contributor_address, initial_good_balance)
bad_contributor_address = 'bad_contributor'
initial_bad_balance = 10_000
balances.initialize(bad_contributor_address, initial_bad_balance)
(x_train, y_train), (x_test, y_test) = data.load_data()
init_idx = int(len(x_train) * init_train_data_portion)
assert init_idx > 0
x_init_data, y_init_data = x_train[:init_idx], y_train[:init_idx]
x_remaining, y_remaining = x_train[init_idx:], y_train[init_idx:]
# Split test set into pieces.
num_pieces = 10
test_dataset_hashes, test_sets = im.get_test_set_hashes(num_pieces, x_test, y_test)
# Ending criteria:
min_length_s = 100
min_num_contributions = min(len(x_remaining), 100)
# Commitment Phase
test_reveal_index = im.initialize_market(Msg(initializer_address, total_bounty),
x_init_data, y_init_data,
min_length_s, min_num_contributions)
self.assertEqual(MarketPhase.INITIALIZATION, im.state)
assert 0 <= test_reveal_index < len(test_dataset_hashes)
self.assertEqual(MarketPhase.PARTICIPATION, im.state)
# Participation Phase
value = 100
total_deposits = defaultdict(float)
stored_data = None
for i in range(min_num_contributions):
data = x_remaining[i]
classification = y_remaining[i]
if i % 2 == 0:
contributor = good_contributor_address
contributor = bad_contributor_address
classification = 1 - classification
cost, _ = im.handle_add_data(contributor, value, data, classification)
if stored_data is None:
stored_data = StoredData(classification, time_method(), contributor, cost, cost)
balances.send(contributor, im.owner, cost)
total_deposits[contributor] += cost
# Reward Phase
self.assertEqual(MarketPhase.PARTICIPATION, im.state)
self.assertEqual(MarketPhase.REVEAL_TEST_SET, im.state)
for i, test_set_portion in enumerate(test_sets):
if i != test_reveal_index:
self.assertEqual(MarketPhase.REWARD_RE_INITIALIZE_MODEL, im.state)
while im.remaining_bounty_rounds > 0:
# Collect rewards.
self.assertEqual(MarketPhase.REWARD_COLLECT, im.state)
# Get some stored data.
# Make sure reporting doesn't work yet.
reward = im.handle_report(bad_contributor_address, stored_data, False, None)
self.assertEqual(0, reward, "There should be no reward yet.")
reward = im.handle_report(bad_contributor_address, stored_data, False, None)
balances.send(im.owner, bad_contributor_address, reward)
# Don't need to pass the right StoredData.
# noinspection PyTypeChecker
reward = im.handle_refund(bad_contributor_address, None, 0, False, None)
balances.send(im.owner, bad_contributor_address, reward)
# General checks that should be true for a market with a reasonably sensitive model.
self.assertLess(balances[im.owner], total_bounty,
f"Some of the bounty should be distributed.\n"
f"Balances: {balances.get_all()}")
self.assertLess(0, balances[im.owner])
self.assertGreater(total_deposits[good_contributor_address], 0)
self.assertGreater(total_deposits[bad_contributor_address], 0)
# The bad contributor profited because they reported the good contributor.
self.assertGreater(balances[bad_contributor_address], initial_bad_balance)
self.assertLess(balances[good_contributor_address], initial_good_balance)
self.assertLess(balances[good_contributor_address], balances[bad_contributor_address])
self.assertLessEqual(balances[bad_contributor_address] - balances[good_contributor_address],
self.assertEqual(initial_good_balance + initial_bad_balance + total_bounty,
balances[good_contributor_address] + balances[bad_contributor_address] +
"Should be a zero-sum.")
self.assertGreater(total_deposits[bad_contributor_address], 0)
self.assertEqual(initial_good_balance - total_deposits[good_contributor_address],
"The good contributor should lose all of their deposits.")
@ -1,9 +1,8 @@
# Objects for all smart contracts.
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional
from unittest.mock import Mock
from injector import inject, singleton
from injector import singleton
Address = str
""" An address that can receive funds and participate in training models. """
@ -43,25 +42,29 @@ class SmartContract(object):
class TimeMock(object):
Helps fake the current time.
Helps fake the current time (in seconds).
Ideally the value returned is an integer (like `now` in Solidity) but this is not guaranteed.
Normally in an Ethereum smart contract `now` can be called.
To speed up simulations, use this class to get the current time.
def __init__(self):
self._time_method: Mock = Mock(name='time', return_value=0)
_time: float = field(default=0, init=False)
def __call__(self, *args, **kwargs):
""" Get the currently set time (in seconds). """
return self._time
def add_time(self, amount):
""" Add `amount` (in seconds) to the current time. """
self._time += amount
def set_time(self, time_value):
""" Set the time to return when `time()` is called. """
self._time_method.return_value = time_value
self._time = time_value
def time(self):
""" Get the currently set time. """
return self._time_method()
def __call__(self, *args, **kwargs):
""" Get the currently set time. """
return self.time()
""" Get the currently set time (in seconds). """
return self._time
@ -0,0 +1,84 @@
from dataclasses import dataclass
from logging import Logger
import numpy as np
from injector import Binder, inject, Module
from decai.simulation.data.data_loader import DataLoader
class SimpleDataLoader(DataLoader):
Load simple data for testing.
_logger: Logger
def load_data(self, train_size: int = None, test_size: int = None) -> (tuple, tuple):
def _ground_truth(data):
if data[0] * data[2] > 0:
return 1
return 0
x_train = np.array([
[0, 0, 0],
[1, 1, 1],
[0, 0, 1],
[0, 1, 0],
[0, 1, 1],
[1, 0, 0],
[1, 0, 1],
[1, 1, 0],
[0, 0, 2],
[0, 2, 0],
[2, 0, 0],
[2, 0, 2],
[0, 0, -3],
[0, 3, 0],
[0, 3, -3],
[0, -3, 3],
[0, 0, 4],
[0, 4, 4],
[4, 0, 0],
[-6, 0, 0],
x_test = np.array([
[0, 2, 2],
[0, 1, -1],
[-1, 0, 0],
[0, -1, 0],
[1, -1, 2],
[0, 0, 3],
[0, -2, 0],
[0, 2, -2],
[3, 0, 0],
[-2, 0, 2],
[2, -2, 0],
if train_size is not None:
x_train = x_train[:train_size]
if test_size is not None:
x_test = x_test[:test_size]
y_train = [_ground_truth(x) for x in x_train]
y_test = [_ground_truth(x) for x in x_test]
return (x_train, y_train), (x_test, y_test)
class SimpleDataModule(Module):
Set up a `DataLoader` mainly for testing.
def configure(self, binder: Binder):
binder.bind(DataLoader, to=SimpleDataLoader)
@ -20,7 +20,8 @@ from tqdm import tqdm
from decai.simulation.contract.balances import Balances
from decai.simulation.contract.collab_trainer import CollaborativeTrainer
from decai.simulation.contract.objects import Msg, RejectException, TimeMock
from decai.simulation.contract.incentive.prediction_market import MarketPhase, PredictionMarket
from decai.simulation.contract.objects import Address, Msg, RejectException, TimeMock
from decai.simulation.data.data_loader import DataLoader
@ -29,7 +30,7 @@ class Agent:
A user to run in the simulator.
address: str
address: Address
start_balance: float
mean_deposit: float
stdev_deposit: float
@ -83,6 +84,9 @@ class Simulator(object):
agents: List[Agent],
baseline_accuracy: float = None,
init_train_data_portion: float = 0.1,
pm_test_sets: list = None,
train_size: int = None, test_size: int = None,
Run a simulation.
@ -91,6 +95,10 @@ class Simulator(object):
:param baseline_accuracy: The baseline accuracy of the model.
Usually the accuracy on a hidden test set when the model is trained with all data.
:param init_train_data_portion: The portion of the data to initially use for training. Must be [0,1].
:param pm_test_sets: The test sets for the prediction market incentive mechanism.
:param accuracy_plot_wait_s: The amount of time to wait in seconds between plotting the accuracy.
:param train_size: The amount of training data to use.
:param test_size: The amount of test data to use.
assert 0 <= init_train_data_portion <= 1
@ -152,8 +160,8 @@ class Simulator(object):
plot.legend.location = 'top_left'
plot.legend.label_text_font_size = '12pt'
# JavaScript code.
plot.xaxis[0].formatter = FuncTickFormatter(code="""
// JavaScript code
return (tick / 86400).toFixed(0);
plot.yaxis[0].formatter = PrintfTickFormatter(format="%0.1f%%")
@ -180,8 +188,11 @@ class Simulator(object):
acc_source.stream(dict(t=[t], a=[a * 100]))
save_data['accuracies'].append(dict(t=t, accuracy=a))
continuous_evaluation = not isinstance(self._decai.im, PredictionMarket)
def task():
(x_train, y_train), (x_test, y_test) = self._data_loader.load_data()
(x_train, y_train), (x_test, y_test) = \
self._data_loader.load_data(train_size=train_size, test_size=test_size)
init_idx = int(len(x_train) * init_train_data_portion)
self._logger.info("Initializing model with %d out of %d samples.",
init_idx, len(x_train))
@ -189,6 +200,7 @@ class Simulator(object):
x_remaining, y_remaining = x_train[init_idx:], y_train[init_idx:]
self._decai.model.init_model(x_init_data, y_init_data)
if self._logger.isEnabledFor(logging.DEBUG):
s = self._decai.model.evaluate(x_init_data, y_init_data)
self._logger.debug("Initial training data evaluation: %s", s)
@ -223,8 +235,9 @@ class Simulator(object):
# since it should be relatively cheaper than the deposit required to add data.
# It may not be cheaper than calling `report`.
if next_data_index >= len(x_remaining) and len(unclaimed_data) == 0:
if next_data_index >= len(x_remaining):
if not continuous_evaluation or len(unclaimed_data) == 0:
current_time, agent = q.get()
update_balance_plot = False
@ -232,13 +245,14 @@ class Simulator(object):
# Might be need to sleep to allow the plot to update.
# time.sleep(0.1)
next_accuracy_plot_time += 2E5
next_accuracy_plot_time += accuracy_plot_wait_s
accuracy = self._decai.model.evaluate(x_test, y_test)
partial(plot_accuracy_cb, t=current_time, a=accuracy))
self._logger.debug("Unclaimed data: %d", len(unclaimed_data))
pbar.set_description(f"{desc} ({len(unclaimed_data)} unclaimed)")
if continuous_evaluation:
self._logger.debug("Unclaimed data: %d", len(unclaimed_data))
pbar.set_description(f"{desc} ({len(unclaimed_data)} unclaimed)")
with open(save_path, 'w') as f:
json.dump(save_data, f, separators=(',', ':'))
@ -272,9 +286,11 @@ class Simulator(object):
msg = Msg(agent.address, value)
self._decai.add_data(msg, x, y)
update_balance_plot = True
# Don't need to plot every time. Plot less as we get more data.
update_balance_plot = next_data_index / len(x_remaining) + 0.1 < random.random()
balance = self._balances[agent.address]
unclaimed_data.append((current_time, agent, x, y))
if continuous_evaluation:
unclaimed_data.append((current_time, agent, x, y))
next_data_index += 1
except RejectException:
@ -336,7 +352,53 @@ class Simulator(object):
partial(plot_cb, agent=agent, t=current_time, b=balance))
self._logger.info("Done going through data.")
pbar.set_description(f"{desc} ({len(unclaimed_data)} unclaimed)")
if continuous_evaluation:
pbar.set_description(f"{desc} ({len(unclaimed_data)} unclaimed)")
if isinstance(self._decai.im, PredictionMarket):
for i, test_set_portion in enumerate(pm_test_sets):
if i != self._decai.im.test_reveal_index:
with tqdm(desc="Processing contributions",
unit_scale=True, mininterval=2, unit=" contributions",
) as pbar:
while self._decai.im.remaining_bounty_rounds > 0:
if self._decai.im.state == MarketPhase.REWARD_RE_INITIALIZE_MODEL:
self._time.add_time(60 * 60)
accuracy = self._decai.im.prev_acc
partial(plot_accuracy_cb, t=self._time(), a=accuracy))
pbar.total += self._decai.im.get_num_contributions_in_market()
for agent in agents:
msg = Msg(agent.address, 0)
# Find data submitted by them.
data = None
for key, stored_data in self._decai.data_handler:
if stored_data.sender == agent.address:
data = key[0]
if data is not None:
self._decai.refund(msg, data, stored_data.classification, stored_data.time)
balance = self._balances[agent.address]
partial(plot_cb, agent=agent, t=current_time, b=balance))
self._logger.info("Balance for \"%s\": %0.2f", agent.address, balance)
self._logger.warning("No data submitted by \"%s\" was found."
"\nWill not update it's balance.", agent.address)
accuracy = self._decai.im.model.evaluate(x_test, y_test)
t = self._time()
partial(plot_accuracy_cb, t=t, a=accuracy))
self._logger.info("Done issuing rewards.")
with open(save_path, 'w') as f:
json.dump(save_data, f, separators=(',', ':'))
@ -0,0 +1,117 @@
import os
import sys
import math
from injector import inject, Injector
from decai.simulation.contract.balances import Balances
from decai.simulation.contract.classification.perceptron import PerceptronModule
from decai.simulation.contract.collab_trainer import DefaultCollaborativeTrainerModule
from decai.simulation.contract.incentive.incentive_mechanism import IncentiveMechanism
from decai.simulation.contract.incentive.prediction_market import PredictionMarket, PredictionMarketImModule
from decai.simulation.contract.objects import Msg
from decai.simulation.data.data_loader import DataLoader
from decai.simulation.data.imdb_data_loader import ImdbDataModule
from decai.simulation.logging_module import LoggingModule
from decai.simulation.simulate import Agent, Simulator
# For `bokeh serve`.
sys.path.append(os.path.join(os.path.dirname(__file__), '../..'))
class Runner(object):
def __init__(self,
balances: Balances,
data: DataLoader,
im: IncentiveMechanism,
simulator: Simulator,
assert isinstance(im, PredictionMarket)
self._balances = balances
self._data = data
self._im = im
self._s = simulator
def run(self):
initializer_address = 'initializer'
total_bounty = 100_000
init_train_data_portion = 0.01
train_size = 5_000
# Set up the agents that will act in the simulation.
agents = [
# Good
mean_update_wait_s=10 * 60,
# Malicious: A determined agent with the goal of disrupting others.
mean_update_wait_s=1 * 60 * 60,
self._balances.initialize(initializer_address, total_bounty)
(x_train, y_train), (x_test, y_test) = self._data.load_data(train_size=train_size)
init_idx = int(len(x_train) * init_train_data_portion)
assert init_idx > 0
x_init_data, y_init_data = x_train[:init_idx], y_train[:init_idx]
x_remaining, y_remaining = x_train[init_idx:], y_train[init_idx:]
# Split test set into pieces.
num_pieces = 10
test_dataset_hashes, test_sets = self._im.get_test_set_hashes(num_pieces, x_test, y_test)
# Ending criteria:
min_length_s = 1_000_000
min_num_contributions = len(x_remaining)
test_reveal_index = self._im.initialize_market(Msg(initializer_address, total_bounty),
x_init_data, y_init_data,
min_length_s, min_num_contributions)
assert 0 <= test_reveal_index < len(test_dataset_hashes)
# Start the simulation.
# Accuracy on hidden test set after training with all training data:
# With num_words = 100:
# With num_words = 200:
# baseline_accuracy=0.6173,
# With num_words = 1000:
# baseline_accuracy=0.7945,
# With num_words = 10000:
# baseline_accuracy=0.84692,
# With num_words = 20000:
# baseline_accuracy=0.8484,
# Run with `bokeh serve PATH`.
if __name__.startswith('bk_script_'):
# Set up the data, model, and incentive mechanism.
inj = Injector([
Ссылка в новой задаче