返回

Hyperledger Fabric 事件监听(七)

写在前面

在超级账本中的事件有区块链码交易的事件,对于这些事件的监听是至关重要的,因为我们需要获得其中的很多重要信息,所以我们要学习关于事件监听的SDK,

概述

按照官方文档中的Basic Flow来看,监听事件的步骤如下:

  1. 准备通道事件的上下文
  2. 创建事件的客户端
  3. 为想要监听的事件注册
  4. 监听到事件的信息
  5. 取消注册

官方例程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ec, err := New(mockChannelProvider("mychannel"))
if err != nil {
	fmt.Println("failed to create client")
}

registration, notifier, err := ec.RegisterChaincodeEvent("examplecc", "event123")
if err != nil {
	fmt.Println("failed to register chaincode event")
}
defer ec.Unregister(registration)

select {
case ccEvent := <-notifier:
	fmt.Printf("received chaincode event %v\n", ccEvent)
case <-time.After(time.Second * 5):
	fmt.Println("timeout while waiting for chaincode event")
}

// Timeout is expected since there is no event producer
// ---------------------------------------------------------
// Output:

// timeout while waiting for chaincode event
  1. New(mockChannelProvider()):以mock形式创建一个客户端
  2. RegisterChaincodeEvent注册一个链码事件
    • 返回值registration:作为取消注册函数Unregister的入参
    • 返回值notifier:是我们接收到事件的具体信息,是一个通道类型
  3. select{case : case :}:读取通道类型的数据

监听区块事件

1
func (c *Client) RegisterBlockEvent(filter ...fab.BlockFilter) (fab.Registration, <-chan *fab.BlockEvent, error)
  • RegisterBlockEvent
    • 返回值Registration:作为取消注册函数Unregister的入参
    • 返回值BlockEvent通道类型,保存区块信息;BlockEvent结构体中有两个成员:

官方例程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ec, err := New(mockChannelProvider("mychannel"), WithBlockEvents())
if err != nil {
	fmt.Println("failed to create client")
}

registration, _, err := ec.RegisterBlockEvent()
if err != nil {
	fmt.Println("failed to register block event")
}
defer ec.Unregister(registration)

fmt.Println("block event registered successfully")
// Output:

// block event registered successfully
  1. New(mockChannelProvider()):以mock形式创建一个链码的客户端
    • WithBlockEvents():一个可选项
  2. RegisterBlockEvent注册一个区块事件
  3. Unregister:取消注册

监听链码事件

1
func (c *Client) RegisterChaincodeEvent(ccID, eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error)

RegisterChaincodeEvent

  • 返回值Registration:作为取消注册函数Unregister的入参
  • 返回值CCEvent通道类型,保存链码信息,CCEvent结构体中有六个成员:
    • TxID:交易ID
    • Chaincode:链码ID
    • EventName:事件名
    • Payload:自定义
    • BlockNumber:标识区块
    • SourceURL:产生该事件的peer节点的域名

官方例程1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ec, err := New(mockChannelProvider("mychannel"))
if err != nil {
	fmt.Println("failed to create client")
}

registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123")
if err != nil {
	fmt.Println("failed to register chaincode event")
}
defer ec.Unregister(registration)

fmt.Println("chaincode event registered successfully")
// Output:

// chaincode event registered successfully
  • 与监听区块信息的例程区别是:
    • 在以mock形式创建一个客户端时没有设置可选项WithBlockEvents()
  • RegisterChaincodeEvent注册一个链码事件,有两个入参:
    • examplecc:链码名称
    • event123:事件名称

注:事件名称和payload的定义都在链码中编写,用到shim包中的API:SetEvent,在编写一个链码函数的时候,只要在链码函数的最后加上调用SetEvents方法,第一个入参写上事件名,第二个参数传入想要返回的payload信息

官方例程2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// If you require payload for chaincode events you have to use WithBlockEvents() option
ec, err := New(mockChannelProvider("mychannel"), WithBlockEvents())
if err != nil {
	fmt.Println("failed to create client")
}

registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123")
if err != nil {
	fmt.Println("failed to register chaincode event")
}
defer ec.Unregister(registration)

fmt.Println("chaincode event registered successfully")
// Output:

// chaincode event registered successfully
  • 如果想要获取payload信息,就需要在创建客户端时添加可选项WithBlockEvents()

示例

链码事件的监听需要两边的配合:首先需要在链码编写的时候,在想要监听的函数的最后加上SetEvent方法,然后再在SDK中使用链码监听事件的方法。这样我们才能够来监听一个链码事件

链码编写

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// fabric-go-sdk/blob/main/chaincode/chaincode.go
func set(stub shim.ChaincodeStubInterface, args []string) (string, error) {
	if len(args) != 2 {
		return "", fmt.Errorf("Incorrect arguments. Expecting a key and a value")
	}

	err := stub.PutState(args[0], []byte(args[1]))
	if err != nil {
		return "", fmt.Errorf("Failed to set asset: %s", args[0])
	}
	event := outputEvent{
		EventName: "set",
	}
	payload, err := json.Marshal(event)
	if err != nil {
		return "", err
	}
	err = stub.SetEvent("chaincode-event", payload)
	return args[1], nil
}
  • 事件名称:chaincode-event
  • payload信息:自定义EventNameset,当读到链码事件时,我们就能知道所触发的链码函数是set

SDK

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// fabric-go-sdk/sdkInit/integration.go
func (t *SdkEnvInfo) InitService(chaincodeID, channelID string, org *OrgInfo, sdk *fabsdk.FabricSDK) error {
	handler := &SdkEnvInfo{
		ChaincodeID: chaincodeID,
	}
	//prepare channel client context using client context
	clientChannelContext := sdk.ChannelContext(channelID, fabsdk.WithUser(org.OrgUser), fabsdk.WithOrg(org.OrgName))
	// Channel client is used to query and execute transactions (Org1 is default org)
	var err error
	t.ChClient, err = channel.New(clientChannelContext)
	if err != nil {
		return err
	}
	t.EvClient, err = event.New(clientChannelContext, event.WithBlockEvents())
	if err != nil {
		return err
	}
	handler.ChClient = t.ChClient
	handler.EvClient = t.EvClient
	return nil
}
// 注册链码事件
func regitserEvent(client *event.Client, chaincodeID string) (fabAPI.Registration, <-chan *fabAPI.CCEvent) {
	eventName := "chaincode-event"

	reg, notifier, err := client.RegisterChaincodeEvent(chaincodeID, eventName)
	if err != nil {
		fmt.Println("注册链码事件失败: %s", err)
	}

	return reg, notifier
}
// 不断读取notifier中数据
func ChainCodeEventListener(c *event.Client, ccID string) fabAPI.Registration {

	reg, notifier := regitserEvent(c, ccID)

	// consume event
	go func() {
		for e := range notifier {
			log.Printf("Receive cc event, ccid: %v \neventName: %v\n"+
				"payload: %v \ntxid: %v \nblock: %v \nsourceURL: %v\n",
				e.ChaincodeID, e.EventName, string(e.Payload), e.TxID, e.BlockNumber, e.SourceURL)
		}
	}()

	return reg
}
// 区块事件的监听和数据读取
func BlockListener(ec *event.Client) fabAPI.Registration {
	// Register monitor block event
	beReg, beCh, err := ec.RegisterBlockEvent()
	if err != nil {
		log.Printf("Register block event error: %v", err)
	}
	log.Println("Registered block event")

	// Receive block event
	go func() {
		for e := range beCh {
			log.Printf("Receive block event:\nSourceURL: %v\nNumber: %v\nHash"+
				": %v\nPreviousHash: %v\n\n",
				e.SourceURL,
				e.Block.Header.Number,
				hex.EncodeToString(e.Block.Header.DataHash),
				hex.EncodeToString(e.Block.Header.PreviousHash))
		}
	}()

	return beReg
}
  • channel.New():为通道创建客户端ChClient,用于调用智能合约
  • event.New():为事件创建客户端EvClient,后面要加上event.WithBlockEvents()选项,可以读到链码事件中payload信息

主函数

1
2
3
// fabric-go-sdk/main.go
defer info.EvClient.Unregister(sdkInit.BlockListener(info.EvClient))
defer info.EvClient.Unregister(sdkInit.ChainCodeEventListener(info.EvClient, info.ChaincodeID))

示例代码:三次set操作,最后一次get操作

GitHub

Built with Hugo
Theme Stack designed by Jimmy