Creación de oyentes de temas eth_newFilter con Python

Ethereum JSON RPC permite escuchar eventos de contrato a través de la llamada eth_newFilter . Basado en fuentes no autorizadas, eth_newFilter toma el parámetro de temas que de alguna manera es un hash que se calcula a partir del nombre y la firma del evento.

¿Dónde se explica qué es el parámetro de temas y cómo se debe calcular?

¿Cómo calcular el parámetro de temas para una firma de evento Solidy conocida en Python?

solc contract.sol --asmy verifique los valores de los códigos de operación de inserción de eventos

Respuestas (1)

Aquí hay un ejemplo de un oyente:

"""Poll Ethereum blockchain, install log hooks to call contracts.

Using geth JSON RPC API: https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter

Copyright 2016 Mikko Ohtamaa - Licensed under MIT license.
"""


import logging
from typing import Callable, Iterable, List, Optional
import datetime


from .ethjsonrpc import EthJsonRpc

from .utils import sha3

#: Default logger
_logger = logging.getLogger(__name__)


#: Called when we spot a fired event. Callback is (contract_address, event_signature, api_data)
callback_type = Callable[[str, str, dict], None]


class ContractStatus:
    """Hold information about the processing status of a single contract."""

    def __init__(self, filter_id, last_updated_at):
        self.filter_id = filter_id
        self.last_updated_at = last_updated_at


def now() -> datetime.datetime:
    """Get the current time as timezone-aware UTC timestamp."""
    return datetime.datetime.now(datetime.timezone.utc)


def calculate_event_signature(decl: str) -> str:
    """Calculate bytecode signature of an event Solidy declaration.

    Example:

    .. code-block:

        assert calculate_event_signature("VerifyTokenSet(address,uint256)") == "3D2E225F28C7AAA8014B84B0DD267E297CB25A0B24CB02AB9C9FCF76F660F05F"

    To verify signature from the contract push opcodes:

    .. code-block:: console

        solc contract.sol --asm

    To debug transactions on Morden testnet

    * https://morden.ether.camp/transaction/9685191fece0dd0ef5a02210a305738be3fceb4089003924bc53de0cce0c0103

    http://solidity.readthedocs.io/en/latest/contracts.html#events

    https://github.com/ethereum/wiki/wiki/Solidity-Features#events
    """
    return "0x" + sha3(decl.encode("utf-8")).hex().lower()


class ContractListener:
    """Fetch updates on events Solidy contract posts to Ethereum blockchain.

    """

    def __init__(self, client: EthJsonRpc, events: Iterable[str], callback: callback_type, logger=_logger):
        """Create a contract listener.

        Callbacks look like:

        .. code-block:: python

            def cb(address, event, api_data)
                pass

        :param client: EthJsonRpc instance we use to connect to geth node
        :param events: List of Solidy event signatures we want to listne like like ``["Transfer(address,address,uint256)]``
        :param callback: Callable that's going to get called for every new event detected.
        :param logger: Optional
        """
        self.logger = _logger
        self.client = client
        self.events = events
        self.callback = callback
        self.event_signatures = {calculate_event_signature(e): e for e in events}

        #: Mapping contract address -> ContractStatus
        self.currently_monitored_contracts = {}

    def install_filter(self, contract_address: str):
        """Set up event filtering for a single contract using eth_newFilter.

        :param contract_address: hex string
        """

        installed_filter_id = self.client.eth_newFilter(from_block=0, address=contract_address)
        status = ContractStatus(filter_id=installed_filter_id, last_updated_at=None)
        self.currently_monitored_contracts[contract_address] = status

    def process_events(self, status: ContractStatus, changes: Optional[List[dict]]) -> int:

        updates = 0

        # Nothing changed
        if changes is None:
            return 0

        for change in changes:

            contract_address = change["address"]

            if contract_address not in self.currently_monitored_contracts:
                self.logger.warn("Received a change for non-monitored contract %s", contract_address)
                continue

            topics = change["topics"]
            if not topics:
                self.logger.warn("Did not get topics with change data %s", change)
                continue

            event_type = topics[0]
            if event_type not in self.event_signatures:
                self.logger.warn("Unknown event signature %s", change)
                continue

            try:
                self.callback(contract_address, self.event_signatures[event_type], change)
                updates += 1
            except Exception as e:
                # IF we have bad code for processing one contract, don't stop at that but keep pushing for others
                self.logger.error("Failed to update contract %s", contract_address)
                self.logger.exception(e)

        status.last_updated_at = now()

        return updates

    def fetch_all(self, contract: str) -> int:
        status = self.currently_monitored_contracts[contract]
        filter_id = status.filter_id

        # Signature different as for newFilter :(
        changes = self.client.eth_getLogs(dict(fromBlock=0, address=contract))
        return self.process_events(status, changes)

    def fetch_changes(self, contract) -> int:
        """Fetch latest events from geth.

        .. note ::

                The some transction might be posted twice due to ramp up and poll calls running differently.
                Always make sure callbacks handle this.

        :param contracts: List of contract addresses as hex string we are interested in

        :return: Number of callbacks made
        """
        status = self.currently_monitored_contracts[contract]
        filter_id = status.filter_id
        changes = self.client.eth_getFilterChanges(filter_id=filter_id)
        return self.process_events(status, changes)

    def monitor_contract(self, contract_address) -> int:
        """Start monitoring a contract and run callback for its all past events.

        If contract is already added do nothing.

        :param contract_address:
        :return: Number of triggered callbacks
        """
        assert type(contract_address) == str
        assert contract_address.startswith("0x")
        contract_address = contract_address.lower()

        if contract_address in self.currently_monitored_contracts:
            return 0

        self.install_filter(contract_address)

        return self.fetch_all(contract_address)

    def remove_contract(self, contract_address):
        del self.currently_monitored_contracts["contract_address"]

    def poll(self):
        """Fetch changes to all monitored contracts.

        Note that some events might be posted twice due to time elapse between ``monitor_contract`` and ``poll``.

        :return: Number of triggered callbacks
        """
        updates = 0
        for c in self.currently_monitored_contracts.keys():
            updates += self.fetch_changes(c)
        return updates

Aquí hay un ejemplo de uso:

import logging
import transaction

from sqlalchemy.orm import Session

from shareregistry.contractlistener import ContractListener
from shareregistry.models import Entity
from shareregistry.utils import bin_to_eth_address, eth_address_to_bin, txid_to_bin
from .ethjsonrpc import EthJsonRpc


logger = logging.getLogger(__name__)

class EntityUpdater:

    def __init__(self, client: EthJsonRpc, dbsession: Session):
        self.client = client
        self.dbsession = dbsession
        self.events = {
            "created": "Created(address,uint256,string,string)",
            "transfer": "Transfer(address,address,uint256)",
            "verify": "VerifyTokenSet(address,uint256)",
        }

        self.contract_listener = ContractListener(self.client, self.events.values(), self.on_blockchain_event)

    def on_blockchain_event(self, address: str, event: str, api_data: dict):
        """Called by ContractLister to tell us about the contract events.

        :param address: Contract address as a hex string
        :param event: One of event signatures
        :param api_data: eth_getFilterChanges() result https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
        :return:
        """
        logger.info("Received contract event %s for %s", event, address)

        bin_address = eth_address_to_bin(address)

        with transaction.manager:
            entity = self.dbsession.query(Entity).filter_by(contract_address=bin_address).one()

            if event == self.events["created"]:
                txid = txid_to_bin(api_data["transactionHash"])
                entity.initial_supply = 0
                entity.txid = txid
            elif event == self.events["transfer"]:
                from_address = api_data["topics"][1]
                to_address = api_data["topics"][2]
                value = int(api_data["data"][0])
                txid = api_data["transactionHash"]
            else:
                raise RuntimeError("Unknown event: {}".format(event))

    def update_all(self) -> int:
        """Poll geth and get updates for the contracts in our database."""
        updates = 0
        contracts = []

        # Get list of all known contracts in the database
        with transaction.manager:
            for e in self.dbsession.query(Entity).all():
                if not e.contract_address:
                    continue
                contracts.append(bin_to_eth_address(e.contract_address))

        # Each callback will run in its own db transaction context
        for c in contracts:
            updates += self.contract_listener.monitor_contract(c)

        updates += self.contract_listener.poll()

        return updates
¿Qué pasa si finalmente surge la excepción "Error al actualizar el contrato" y algunos cambios no se escriben en su base de datos local? ¿No significaría eso que no puede usar eth_getFilterChanges nuevamente ya que la cadena de bloques pensaría que ya tiene esos cambios? ¿Cómo manejas esta situación? ¿O hay algo que me he perdido (en el código o en cómo funcionan los filtros)?
No entendí tu pregunta. Consulte el código fuente completo github.com/websauna/websauna.wallet/blob/feat/ethereum-3/…
Veo que lo hiciste de una manera diferente ahora. Solo quería entender cómo funciona eth_getFilterChanges. AFAIU devuelve un evento una sola vez. Por lo tanto, uno debe asegurarse de que su aplicación no "pierda" ese evento. Y si es así, la aplicación tendrá que obtener ese evento con otro método (como web3.get_logs, por ejemplo).
@takeshi: Recomiendo no hacer stateful (filtro nuevo, obtener cambios de filtro), pero administre todo el estado del filtro en el código de su aplicación y solo use getLogs.
@takeshi: tiene más control y, en caso de duda, puede volver a leer los eventos y actualizar la base de datos de su aplicación.
Sí, esto es lo que estoy haciendo ahora mismo. Gracias por despejar mis dudas!