234 lines
8.5 KiB
Python
234 lines
8.5 KiB
Python
import os
|
||
import json
|
||
import getpass
|
||
import gc
|
||
from pathlib import Path
|
||
from Crypto.Cipher import AES
|
||
from Crypto.Protocol.KDF import PBKDF2
|
||
from Crypto.Random import get_random_bytes
|
||
from rich import print
|
||
from rich.prompt import Prompt
|
||
from bit import Key
|
||
import requests
|
||
|
||
WALLETS_DIR = "wallets"
|
||
API_BASE_DEFAULT = "https://blockstream.info/api"
|
||
|
||
if not os.path.exists(WALLETS_DIR):
|
||
os.makedirs(WALLETS_DIR)
|
||
|
||
def encrypt_wallet(secret, password):
|
||
salt = get_random_bytes(16)
|
||
key = PBKDF2(password, salt, dkLen=32)
|
||
cipher = AES.new(key, AES.MODE_GCM)
|
||
ciphertext, tag = cipher.encrypt_and_digest(secret.encode())
|
||
return {
|
||
'ciphertext': ciphertext.hex(),
|
||
'salt': salt.hex(),
|
||
'nonce': cipher.nonce.hex(),
|
||
'tag': tag.hex()
|
||
}
|
||
|
||
def decrypt_wallet(data, password):
|
||
salt = bytes.fromhex(data['salt'])
|
||
key = PBKDF2(password, salt, dkLen=32)
|
||
cipher = AES.new(key, AES.MODE_GCM, nonce=bytes.fromhex(data['nonce']))
|
||
return cipher.decrypt_and_verify(bytes.fromhex(data['ciphertext']), bytes.fromhex(data['tag'])).decode()
|
||
|
||
|
||
api_base = API_BASE_DEFAULT
|
||
selected_wallet = None
|
||
|
||
def connect_to_network():
|
||
global api_base
|
||
url = Prompt.ask("[bold cyan]Введите REST-URL узла/провайдера (Blockstream REST по умолчанию)[/]", default=API_BASE_DEFAULT)
|
||
|
||
try:
|
||
r = requests.get(f"{url.rstrip('/')}/blocks/tip/height", timeout=6)
|
||
if r.status_code == 200:
|
||
api_base = url.rstrip('/')
|
||
print("[green1]Подключено! Используется API:[/]", api_base)
|
||
else:
|
||
print(f"[red1]Ошибка соединения: HTTP {r.status_code}[/]")
|
||
except Exception as e:
|
||
print(f"[red1]Ошибка подключения: {e}[/]")
|
||
|
||
def create_wallet():
|
||
k = Key()
|
||
save_wallet(k)
|
||
show_private_key = Prompt.ask("[bold cyan]Показать приватный ключ (WIF)?[/]", choices=["y","n"], default="n")
|
||
if show_private_key == 'y':
|
||
print(f"[orange1]{k.to_wif()}[/]")
|
||
|
||
def import_wallet():
|
||
print("[bold cyan]Введите приватный ключ (WIF): [/]", end='')
|
||
wif = getpass.getpass(prompt="")
|
||
try:
|
||
k = Key(wif)
|
||
save_wallet(k)
|
||
except Exception as e:
|
||
print(f"[red1]Не удалось импортировать ключ: {e}[/]")
|
||
|
||
def save_wallet(key_obj):
|
||
print("[bold cyan]Введите пароль для шифрования кошелька: [/]", end='')
|
||
password = getpass.getpass(prompt="")
|
||
encrypted = encrypt_wallet(key_obj.to_wif(), password)
|
||
filename = os.path.join(WALLETS_DIR, f"{key_obj.address}.json")
|
||
with open(filename, 'w') as f:
|
||
json.dump({"address": key_obj.address, "data": encrypted}, f)
|
||
print(f"[green1]Кошелек сохранен: {key_obj.address}[/]")
|
||
|
||
def select_wallet():
|
||
global selected_wallet
|
||
files = os.listdir(WALLETS_DIR)
|
||
if not files:
|
||
print("[red1]Нет доступных кошельков.[/]")
|
||
return
|
||
for i, fname in enumerate(files):
|
||
print(f"[orange1][{i+1}] {Path(fname).stem}[/]")
|
||
try:
|
||
choice = int(Prompt.ask("[bold cyan]Выберите номер кошелька[/]"))
|
||
except:
|
||
print("[red1]Неверный номер.[/]")
|
||
return
|
||
with open(os.path.join(WALLETS_DIR, files[choice-1])) as f:
|
||
selected_wallet = json.load(f)
|
||
|
||
def check_password():
|
||
return get_private_key() is not None
|
||
|
||
def get_private_key():
|
||
if not selected_wallet:
|
||
print("[red1]Кошелек не выбран.[/]")
|
||
return None
|
||
print("[bold cyan]Введите пароль от кошелька: [/]", end='')
|
||
password = getpass.getpass(prompt="")
|
||
try:
|
||
wif = decrypt_wallet(selected_wallet['data'], password)
|
||
del password
|
||
return wif
|
||
except Exception:
|
||
print("[red1]Неверный пароль![/]")
|
||
return None
|
||
|
||
def delete_wallet():
|
||
global selected_wallet
|
||
while not check_password():
|
||
pass
|
||
address = Prompt.ask("[red1]При удалении кошелька доступ к средствам будет потерян, введите адрес кошелька для подтверждения операции[/]")
|
||
if address != selected_wallet['address']:
|
||
print("[red1]Операция отменена[/]")
|
||
return
|
||
file_path = os.path.join(WALLETS_DIR, selected_wallet['address'] + '.json')
|
||
if os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
selected_wallet = None
|
||
print(f"[green1]Кошелек удален[/]")
|
||
|
||
def get_balance():
|
||
if not selected_wallet:
|
||
print("[red1]Кошелек не выбран.[/]")
|
||
return
|
||
addr = selected_wallet['address']
|
||
try:
|
||
r = requests.get(f"{api_base}/address/{addr}/utxo", timeout=8)
|
||
if r.status_code != 200:
|
||
print(f"[red1]Ошибка получения UTXO: HTTP {r.status_code}[/]")
|
||
return
|
||
utxos = r.json()
|
||
balance_sats = sum([u['value'] for u in utxos])
|
||
|
||
print(f"[orange1]Баланс: {balance_sats / 1e8} BTC ({balance_sats} сатоши)[/]")
|
||
except Exception as e:
|
||
print(f"[red1]Ошибка при получении баланса: {e}[/]")
|
||
|
||
def send_transaction():
|
||
if not selected_wallet:
|
||
print("[red1]Кошелек не выбран.[/]")
|
||
return
|
||
to = Prompt.ask("[bold cyan]Введите адрес получателя[/]")
|
||
amount = float(Prompt.ask("[bold cyan]Введите сумму в BTC[/]"))
|
||
fee_per_byte = int(Prompt.ask("[bold cyan]Введите комиссию (sat/byte)[/]", default="10"))
|
||
|
||
estimated_size = int(Prompt.ask("[bold cyan]Оценочный размер транзакции в байтах[/]", default="200"))
|
||
fee_sats = fee_per_byte * estimated_size
|
||
|
||
print(f"[yellow]Ориентировочная комиссия: {fee_sats} сатоши ({fee_sats/1e8} BTC)[/]")
|
||
confirm = Prompt.ask("[bold cyan]Отправить транзакцию?[/]", choices=["y","n"], default="n")
|
||
if confirm != 'y':
|
||
print("[red1]Операция отменена[/]")
|
||
return
|
||
|
||
wif = None
|
||
while not wif:
|
||
wif = get_private_key()
|
||
|
||
try:
|
||
key = Key(wif)
|
||
outputs = [(to, amount, 'btc')]
|
||
# bit.Key.send автоматически выбирает UTXO и подписывает.
|
||
# Параметр fee в send ожидает целую сумму в сатоши (absolute fee)
|
||
tx_hash = key.send(outputs, fee=fee_sats)
|
||
print(f"[green1]Транзакция отправлена! TXID: {tx_hash}[/]")
|
||
except Exception as e:
|
||
print(f"[red1]Ошибка при отправке: {e}[/]")
|
||
finally:
|
||
del wif
|
||
gc.collect()
|
||
|
||
def main():
|
||
menu = """
|
||
[bold blue]Меню:[/]
|
||
a. Подключение к сети
|
||
c. Создание кошелька
|
||
w. Выбор кошелька
|
||
b. Просмотр баланса
|
||
s. Отправка средств
|
||
i. Импорт существующего кошелька (WIF)
|
||
d. Удаление кошелька
|
||
q. Выход
|
||
"""
|
||
print(menu)
|
||
while True:
|
||
try:
|
||
choice = Prompt.ask("[slate_blue1]Выберите действие[/]").lower()
|
||
if choice == 'a':
|
||
connect_to_network()
|
||
elif choice == 'c':
|
||
create_wallet()
|
||
elif choice == 'w':
|
||
select_wallet()
|
||
elif choice == 'b':
|
||
if selected_wallet:
|
||
get_balance()
|
||
else:
|
||
print("[red1]Необходимо выбрать кошелек[/]")
|
||
elif choice == 's':
|
||
if selected_wallet:
|
||
send_transaction()
|
||
else:
|
||
print("[red1]Необходимо выбрать кошелек[/]")
|
||
elif choice == 'i':
|
||
import_wallet()
|
||
elif choice == 'd':
|
||
if not selected_wallet:
|
||
select_wallet()
|
||
if selected_wallet:
|
||
delete_wallet()
|
||
else:
|
||
print("[red1]Необходимо выбрать кошелек[/]")
|
||
elif choice == 'q':
|
||
break
|
||
elif choice == "clear":
|
||
if os.name == 'nt':
|
||
_ = os.system('cls')
|
||
else:
|
||
_ = os.system('clear')
|
||
print(menu)
|
||
except KeyboardInterrupt:
|
||
print()
|
||
continue
|
||
|
||
if __name__ == "__main__":
|
||
main()
|