import json
import time
import uuid

import cobo_waas2
import schedule
from web3 import Web3
# Address of the Stargate contract on which `claim` is called.
STG_CLAIM_CONTRACT = "0xdfc47dcef7e8f9ab19a1b8af3eecf000c7ea0b80"
# Placeholder: replace with the contract ABI (available from Etherscan).
# NOTE(review): the `claim` encoding below presumably cannot succeed while
# this list is empty -- confirm once the real ABI is filled in.
STG_CLAIM_ABI = []
# Address of the Stargate pool passed as the argument to `claim`.
STG_ETH_POOL = "0x98fB8522d891F43B771e2d27367b41Ba138D0B80"
w3 = Web3(Web3.HTTPProvider("<RPC_URL>"))
# web3.py rejects addresses that are not EIP-55 checksummed, and
# STG_CLAIM_CONTRACT is written in lowercase, so it is normalized here.
# The constant itself is left untouched because it is also forwarded
# unchanged as the destination address of the scheduled contract call.
contract = w3.eth.contract(
    address=Web3.to_checksum_address(STG_CLAIM_CONTRACT),
    abi=STG_CLAIM_ABI,
)
# Calldata for `claim([STG_ETH_POOL])`, encoded once at import time and
# reused by every scheduled run.
data = contract.encodeABI("claim", [[STG_ETH_POOL]])
def get_delegate(wallet_id, request: cobo_waas2.SafeWalletDelegates):
    """Return the first Delegate listed for a wallet and a given request.

    Args:
        wallet_id: ID of the wallet whose Delegates are listed.
        request: Request object forwarded to the List Delegates operation
            as ``safe_wallet_delegates``.

    Returns:
        The first element of the List Delegates response, or ``None`` when
        the call fails or the response is empty. Failures are printed, not
        raised.
    """
    # `configuration` is read from module scope; it is not defined in the
    # visible part of this file and must be provided before this is called.
    with cobo_waas2.ApiClient(configuration) as api_client:
        delegates_api = cobo_waas2.WalletsSmartContractWalletsApi(api_client)
        try:
            # Call the List Delegates operation to retrieve available Delegates
            delegates = delegates_api.list_safe_wallet_delegates(
                wallet_id=wallet_id,
                safe_wallet_delegates=request,
            )
            if not delegates:
                # Routed through the handler below so that an empty result
                # is reported the same way as an API failure.
                raise Exception("No delegate found")
            print("The response of WalletsApi->list_safe_wallet_delegates:")
            print(json.dumps(delegates[0].to_dict(), indent=2))
            return delegates[0]
        except Exception as e:
            # The original passed the exception as a second print() argument,
            # so "%s" was printed literally; interpolate it instead.
            print(f"Exception when calling WalletsApi: {e}\n")
            return None
def get_wallet(wallet_id) -> cobo_waas2.SafeWallet:
    """Fetch a wallet by ID and return its innermost wallet object.

    Args:
        wallet_id: ID of the wallet to look up.

    Returns:
        ``api_response.actual_instance.actual_instance`` of the Get wallet
        information response (annotated as a ``SafeWallet``), or ``None``
        when the call fails. Failures are printed, not raised.
    """
    # `configuration` is read from module scope; it is not defined in the
    # visible part of this file and must be provided before this is called.
    with cobo_waas2.ApiClient(configuration) as api_client:
        wallets_api = cobo_waas2.WalletsApi(api_client)
        try:
            # Call the Get wallet information operation to retrieve the
            # wallet information, including the wallet address
            wallet_info = wallets_api.get_wallet_by_id(wallet_id=wallet_id)
            print("The response of WalletsApi->get_wallet_by_id:")
            print(json.dumps(wallet_info.to_dict(), indent=2))
            # The response wraps the wallet in two nested containers;
            # unwrap both to reach the concrete wallet object.
            return wallet_info.actual_instance.actual_instance
        except Exception as e:
            # The original passed the exception as a second print() argument,
            # so "%s" was printed literally; interpolate it instead.
            print(f"Exception when calling WalletsApi: {e}\n")
            return None
def contract_call(wallet_id, contract_address, data, value):
    """Submit a contract call from a Safe wallet via the Cobo WaaS API.

    Looks up a Delegate and the wallet details for ``wallet_id``, then
    creates a contract call transaction targeting ``contract_address``.

    Args:
        wallet_id: ID of the Safe wallet that originates the call.
        contract_address: Address of the contract to call.
        data: Calldata sent to the contract.
        value: Value sent with the call.

    Returns:
        None. The API response, or any error, is printed.
    """
    delegate_request = cobo_waas2.SafeWalletDelegates(
        actual_instance=cobo_waas2.SafeWalletDelegatesContractCall(
            request_type=cobo_waas2.EstimateFeeRequestType.CONTRACTCALL,
            address=contract_address,
            calldata=data,
            value=value,
        )
    )
    delegate = get_delegate(wallet_id, delegate_request)
    wallet = get_wallet(wallet_id)
    # Both lookups report their own failure and return None. Stop here rather
    # than building a request from missing data, which would only surface as
    # a misleading secondary error attributed to TransactionsApi.
    if delegate is None or wallet is None:
        print(
            "Exception when calling TransactionsApi: "
            "delegate or wallet lookup failed\n"
        )
        return

    # `configuration` is read from module scope; it is not defined in the
    # visible part of this file and must be provided before this is called.
    with cobo_waas2.ApiClient(configuration) as api_client:
        transactions_api = cobo_waas2.TransactionsApi(api_client)
        try:
            # Use the Call smart contract operation to interact with the
            # Stargate contract
            api_response = transactions_api.create_contract_call_transaction(
                cobo_waas2.ContractCallParams(
                    # Fresh random request ID for every submission.
                    request_id=str(uuid.uuid4()),
                    chain_id=wallet.chain_id,
                    source=cobo_waas2.ContractCallSource(
                        actual_instance=cobo_waas2.SafeContractCallSource(
                            source_type=cobo_waas2.ContractCallSourceType.SAFE_WALLET,
                            wallet_id=wallet_id,
                            address=wallet.safe_address,
                            delegate=delegate,
                        )
                    ),
                    destination=cobo_waas2.ContractCallDestination(
                        actual_instance=cobo_waas2.EvmContractCallDestination(
                            destination_type=cobo_waas2.ContractCallDestinationType.EVM_CONTRACT,
                            address=contract_address,
                            calldata=data,
                            value=value,
                        ),
                    ),
                    category_names=["<CATEGORY_NAME>"],
                    description="<DESCRIPTION>",
                )
            )
            print("The response of TransactionsApi->create_contract_call_transaction:")
            print(json.dumps(api_response.to_dict(), indent=2))
        except Exception as e:
            # The original passed the exception as a second print() argument,
            # so "%s" was printed literally; interpolate it instead.
            print(f"Exception when calling TransactionsApi: {e}\n")
def daily_contract_call():
    """Run one Stargate claim through ``contract_call``.

    Uses the module-level ``wallet_id``, ``STG_CLAIM_CONTRACT`` and ``data``
    and sends no value with the call. ``wallet_id`` is not defined in the
    visible part of this file and must exist at module scope.
    """
    print("claim stargate")
    claim_kwargs = {
        "wallet_id": wallet_id,
        "contract_address": STG_CLAIM_CONTRACT,
        "data": data,
        "value": "0",
    }
    contract_call(**claim_kwargs)
# Register the Stargate claim as a job that fires every day at 09:00.
_daily_at_nine = schedule.every().day.at("09:00")
_daily_at_nine.do(daily_contract_call)
# Poll the scheduler once per second, forever, so that due jobs get run.
while True:
    schedule.run_pending()
    time.sleep(1)