【Python】实现一个简单的区块链系统

本文章利用 Python 实现一个简单的功能较为完善的区块链系统(包括区块链结构、账户、钱包、转账),采用的共识机制是 POW。

一、区块与区块链结构

Block.py

import hashlib
from datetime import datetime


class Block:
    """
        区块链结构:
            prev_hash:      父区块哈希值
            data:           区块内容
            timestamp:      区块创建时间
            hash:           区块哈希值
    """
    def __init__(self, data, prev_hash):
        # 将传入的父区块哈希值和数据保存到变量中
        self.prev_hash = prev_hash
        self.data = data

        # 获得当前的时间
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # 计算区块哈希值
        # 获取哈希对象
        message = hashlib.sha256()
        # 先将数据内容转为字符串并进行编码,再将它们哈希
        # 注意:update() 方法现在只接受 bytes 类型的数据,不接收 str 类型
        message.update(str(self.prev_hash).encode('utf-8'))
        message.update(str(self.prev_hash).encode('utf-8'))
        message.update(str(self.prev_hash).encode('utf-8'))
        # update() 更新 hash 对象,连续的调用该方法相当于连续的追加更新
        # 返回字符串类型的消息摘要
        self.hash = message.hexdigest()

 BlockChain.py

from Block import Block


class BlockChain:
    """
        区块链结构体
            blocks:         包含区块的列表
    """

    def __init__(self):
        self.blocks = []

    def add_block(self, block):
        """
        添加区块
        :param block:
        :return:
        """
        self.blocks.append(block)


# 新建区块
genesis_block = Block(data="创世区块", prev_hash="")
new_block1 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash)
new_block2 = Block(data="张三转给王五三个比特币", prev_hash=genesis_block.hash)

# 新建一个区块链对象
blockChain = BlockChain()
# 将刚才新建的区块加入区块链
blockChain.add_block(genesis_block)
blockChain.add_block(new_block1)
blockChain.add_block(new_block2)

# 打印区块链信息
print("区块链包含区块个数为:%dn" % len(blockChain.blocks))
blockHeight = 0
for block in blockChain.blocks:
    print(f"本区块高度为:{blockHeight}")
    print(f"父区块哈希:{block.prev_hash}")
    print(f"区块内容:{block.data}")
    print(f"区块哈希:{block.hash}")
    print()
    blockHeight += 1

 测试结果 

二、加入工作量证明(POW)

将工作量证明加入到 Block.py 中

import hashlib
from datetime import datetime
from time import time


class Block:
    """
        区块链结构:
            prev_hash:      父区块哈希值
            data:           区块内容
            timestamp:      区块创建时间
            hash:           区块哈希值
            nonce:          随机数
    """
    def __init__(self, data, prev_hash):
        # 将传入的父区块哈希值和数据保存到变量中
        self.prev_hash = prev_hash
        self.data = data

        # 获得当前的时间
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # 设置随机数、哈希初始值为 None
        self.nonce = None
        self.hash = None

    # 类的 __repr__() 方法定义了实例化对象的输出信息
    def __repr__(self):
        return f"区块内容:{self.data}n区块哈希值:{self.hash}"


class ProofOfWork:
    """
        工作量证明:
            block:          区块
            difficulty:     难度值
    """

    def __init__(self, block, difficult=5):
        self.block = block

        # 定义出块难度,默认为 5,表示有效哈希值以 5 个零开头
        self.difficulty = difficult

    def mine(self):
        """
        挖矿函数
        :return:
        """
        i = 0
        prefix = '0' * self.difficulty

        while True:
            message = hashlib.sha256()
            message.update(str(self.block.prev_hash).encode('utf-8'))
            message.update(str(self.block.data).encode('utf-8'))
            message.update(str(self.block.timestamp).encode('utf-8'))
            message.update(str(i).encode('utf-8'))
            # digest() 返回摘要,作为二进制数据字符串值
            # hexdigest() 返回摘要,作为十六进制数据字符串值
            digest = message.hexdigest()
            # str.startswith(prefix) 检测字符串是否是以 prefix(字符串)开头,返回布尔值
            if digest.startswith(prefix):
                # 幸运数字
                self.block.nonce = i
                # 区块哈希值为十六进制数据字符串摘要
                self.block.hash = digest
                return self.block
            i += 1

    def validate(self):
        """
        验证有效性
        :return:
        """
        message = hashlib.sha256()
        message.update(str(self.block.prev_hash).encode('utf-8'))
        message.update(str(self.block.data).encode('utf-8'))
        message.update(str(self.block.timestamp).encode('utf-8'))
        message.update(str(self.block.nonce).encode('utf-8'))
        digest = message.hexdigest()

        prefix = '0' * self.difficulty
        return digest.startswith(prefix)


# ++++++++测试++++++++
# 定义一个区块
b = Block(data="测试", prev_hash="")

# 定义一个工作量证明
w = ProofOfWork(b)

# 开始时间
start_time = time()
# 挖矿,并统计函数执行时间
print("+++开始挖矿+++")
valid_block = w.mine()
# 结束时间
end_time = time()
print(f"挖矿花费时间:{end_time - start_time}秒")

# 验证区块
print(f"区块哈希值是否符合规则:{w.validate()}")
print(f"区块哈希值为:{b.hash}")

测试结果

更新 BlockChain.py

from Block import Block, ProofOfWork


class BlockChain:
    """
        区块链结构体
            blocks:         包含区块的列表
    """

    def __init__(self):
        self.blocks = []

    def add_block(self, block):
        """
        添加区块
        :param block:
        :return:
        """
        self.blocks.append(block)


# 新建一个区块链对象
blockChain = BlockChain()

# 新建区块
block1 = Block(data="创世区块", prev_hash="")
w1 = ProofOfWork(block1)
genesis_block = w1.mine()
blockChain.add_block(genesis_block)

block2 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash)
w2 = ProofOfWork(block2)
block = w2.mine()
blockChain.add_block(block)

block3 = Block(data="张三转给王五三个比特币", prev_hash=block.hash)
w3 = ProofOfWork(block3)
block = w3.mine()
blockChain.add_block(block)

# 打印区块链信息
print("区块链包含区块个数为:%dn" % len(blockChain.blocks))
blockHeight = 0
for block in blockChain.blocks:
    print(f"本区块高度为:{blockHeight}")
    print(f"父区块哈希:{block.prev_hash}")
    print(f"区块内容:{block.data}")
    print(f"区块哈希:{block.hash}")
    print()
    blockHeight += 1
    

 测试结果

三、实现钱包、账户、交易功能

 实现钱包、账户、交易功能要先安装非对称加密算法库 ecdsa。如果网速慢,引用下面这个网站

-i https://pypi.tuna.tsinghua.edu.cn/simple

添加钱包、账户功能 Wallet.py

import base64
import binascii
from hashlib import sha256
# 导入椭圆曲线算法
from ecdsa import SigningKey, SECP256k1, VerifyingKey


class Wallet:
    """
        钱包
    """

    def __init__(self):
        """
            钱包初始化时基于椭圆曲线生成一个唯一的秘钥对,代表区块链上一个唯一的账户
        """
        # 生成私钥
        self._private_key = SigningKey.generate(curve=SECP256k1)
        # 基于私钥生成公钥
        self._public_key = self._private_key.get_verifying_key()

    @property
    def address(self):
        """
            这里通过公钥生成地址
        """
        h = sha256(self._public_key.to_pem())
        # 地址先由公钥进行哈希算法,再进行 Base64 计算而成
        return base64.b64encode(h.digest())

    @property
    def pubkey(self):
        """
            返回公钥字符串
        """
        return self._public_key.to_pem()

    def sign(self, message):
        """
            生成数字签名
        """
        h = sha256(message.encode('utf8'))
        # 利用私钥生成签名
        # 签名生成的是一串二进制字符串,为了便于查看,这里转换为 ASCII 字符串进行输出
        return binascii.hexlify(self._private_key.sign(h.digest()))


def verify_sign(pubkey, message, signature):
    """
        验证签名
    """
    verifier = VerifyingKey.from_pem(pubkey)
    h = sha256(message.encode('utf8'))
    return verifier.verify(binascii.unhexlify(signature), h.digest())

实现转账功能 Transaction.py

import json


class Transaction:
    """
        交易的结构
    """

    def __init__(self, sender, recipient, amount):
        """
            初始化交易,设置交易的发送方、接收方和交易数量
        """
        # 交易发送者的公钥
        self.pubkey = None
        # 交易的数字签名
        self.signature = None

        if isinstance(sender, bytes):
            sender = sender.decode('utf-8')
        self.sender = sender        # 发送方
        if isinstance(recipient, bytes):
            recipient = recipient.decode('utf-8')
        self.recipient = recipient  # 接收方
        self.amount = amount        # 交易数量

    def set_sign(self, signature, pubkey):
        """
            为了便于验证这个交易的可靠性,需要发送方输入他的公钥和签名
        """
        self.signature = signature  # 签名
        self.pubkey = pubkey  # 发送方公钥

    def __repr__(self):
        """
            交易大致可分为两种,一是挖矿所得,而是转账交易
            挖矿所得无发送方,以此进行区分显示不同内容
        """
        if self.sender:
            s = f"从{self.sender}转自{self.recipient}{self.amount}个加密货币"
        elif self.recipient:
            s = f"{self.recipient}挖矿所得{self.amount}个加密货币"
        else:
            s = "error"
        return s


class TransactionEncoder(json.JSONEncoder):
    """
        定义Json的编码类,用来序列化Transaction
    """
    def default(self, obj):
        if isinstance(obj, Transaction):
            return obj.__dict__
        else:
            return json.JSONEncoder.default(self, obj)
            # return super(TransactionEncoder, self).default(obj)

更新 Block.py

import hashlib
import json
from datetime import datetime
from Transaction import Transaction, TransactionEncoder


class Block:
    """
        区块结构
            prev_hash:      父区块哈希值
            transactions:   交易对
            timestamp:      区块创建时间
            hash:           区块哈希值
            Nonce:          随机数
    """

    def __init__(self, transactions, prev_hash):
        # 将传入的父哈希值和数据保存到类变量中
        self.prev_hash = prev_hash
        # 交易列表
        self.transactions = transactions
        # 获取当前时间
        self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # 设置Nonce和哈希的初始值为None
        self.nonce = None
        self.hash = None

    # 类的 __repr__() 方法定义了实例化对象的输出信息
    def __repr__(self):
        return f"区块内容:{self.transactions}n区块哈希值:{self.hash}"


class ProofOfWork:
    """
        工作量证明
            block:          区块
            difficulty:     难度值
    """

    def __init__(self, block, miner, difficult=5):
        self.block = block
        self.miner = miner

        # 定义工作量难度,默认为5,表示有效的哈希值以5个“0”开头
        self.difficulty = difficult

        # 添加挖矿奖励
        self.reward_amount = 1

    def mine(self):
        """
            挖矿函数
        """
        i = 0
        prefix = '0' * self.difficulty

        # 设置挖矿自动生成交易信息,添加挖矿奖励
        t = Transaction(
            sender="",
            recipient=self.miner.address,
            amount=self.reward_amount,
        )
        sig = self.miner.sign(json.dumps(t, cls=TransactionEncoder))
        t.set_sign(sig, self.miner.pubkey)
        self.block.transactions.append(t)

        while True:
            message = hashlib.sha256()
            message.update(str(self.block.prev_hash).encode('utf-8'))
            # 更新区块中的交易数据
            # message.update(str(self.block.data).encode('utf-8'))
            message.update(str(self.block.transactions).encode('utf-8'))
            message.update(str(self.block.timestamp).encode('utf-8'))
            message.update(str(i).encode("utf-8"))
            digest = message.hexdigest()
            if digest.startswith(prefix):
                self.block.nonce = i
                self.block.hash = digest
                return self.block
            i += 1

    def validate(self):
        """
            验证有效性
        """
        message = hashlib.sha256()
        message.update(str(self.block.prev_hash).encode('utf-8'))
        # 更新区块中的交易数据
        # message.update(str(self.block.data).encode('utf-8'))
        message.update(json.dumps(self.block.transactions).encode('utf-8'))
        message.update(str(self.block.timestamp).encode('utf-8'))
        message.update(str(self.block.nonce).encode('utf-8'))
        digest = message.hexdigest()

        prefix = '0' * self.difficulty
        return digest.startswith(prefix)

更新 BlockChain.py

from Block import Block, ProofOfWork
from Transaction import Transaction
from Wallet import Wallet, verify_sign


class BlockChain:
    """
        区块链结构体
            blocks:        包含的区块列表
    """

    def __init__(self):
        self.blocks = []

    def add_block(self, block):
        """
            添加区块
        """
        self.blocks.append(block)

    def print_list(self):
        print(f"区块链包含个数为:{len(self.blocks)}")
        for block in self.blocks:
            height = 0
            print(f"区块链高度为:{height}")
            print(f"父区块为:{block.prev_hash}")
            print(f"区块内容为:{block.transactions}")
            print(f"区块哈希值为:{block.hash}")
            height += 1
            print()

为了方便我们对区块链进行操作,我们可以在 BlockChain.py 中补充一些方法

# 传入用户和区块链,返回用户的“余额”
def get_balance(user, blockchain):
    balance = 0
    for block in blockchain.blocks:
        for t in block.transactions:
            if t.sender == user.address.decode():
                balance -= t.amount
            elif t.recipient == user.address.decode():
                balance += t.amount
    return balance


# user生成创世区块(新建区块链),并添加到区块链中
def generate_genesis_block(user):
    blockchain = BlockChain()
    new_block = Block(transactions=[], prev_hash="")
    w = ProofOfWork(new_block, user)
    genesis_block = w.mine()
    blockchain.add_block(genesis_block)
    # 返回创世区块
    return blockchain


# 用户之间进行交易并记入交易列表
def add_transaction(sender, recipient, amount):
    # 新建交易
    new_transaction = Transaction(
        sender=sender.address,
        recipient=recipient.address,
        amount=amount
    )
    # 生成数字签名
    sig = sender.sign(str(new_transaction))
    # 传入付款方的公钥和签名
    new_transaction.set_sign(sig, sender.pubkey)
    return new_transaction


# 验证交易,若验证成功则加入交易列表
def verify_new_transaction(new_transaction, transactions):
    if verify_sign(new_transaction.pubkey,
                   str(new_transaction),
                   new_transaction.signature
                   ):
        # 验证交易签名没问题,加入交易列表
        print("交易验证成功")
        transactions.append(new_transaction)
    else:
        print("交易验证失败")


# 矿工将全部验证成功的交易列表打包出块
def generate_block(miner, transactions, blockchain):
    new_block = Block(transactions=transactions,
                      prev_hash=blockchain.blocks[len(blockchain.blocks) - 1].hash)
    print("生成新的区块...")
    # 挖矿
    w = ProofOfWork(new_block, miner)
    block = w.mine()
    print("将新区块添加到区块链中")
    blockchain.add_block(block)

进行测试

# 新建交易列表
transactions = []

# 创建 3 个用户
alice = Wallet()
tom = Wallet()
bob = Wallet()

print("alice创建创世区块...")
blockchain = generate_genesis_block(alice)
print()

print(f"alice 的余额为{get_balance(alice, blockchain)}个比特币")
print(f"tom 的余额为{get_balance(tom, blockchain)}个比特币")
print(f"bob 的余额为{get_balance(bob, blockchain)}个比特币")
print()

# 打印区块链信息
blockchain.print_list()

print("新增交易:alice 转账 0.5 比特币给 tom")
nt = add_transaction(alice, tom, 0.5)
print()
verify_new_transaction(nt, transactions)
print(f"矿工 bob 将全部验证成功的交易列表打包出块...")
generate_block(bob, transactions, blockchain)
print("添加完成n")

# 打印区块链信息
blockchain.print_list()

测试结果