forked from hazelcast/hazelcast-go-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_internals.go
238 lines (208 loc) · 9.27 KB
/
client_internals.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
//go:build hazelcastinternal
// +build hazelcastinternal
/*
* Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hazelcast
import (
"context"
"fmt"
"time"
pubcluster "github.com/hazelcast/hazelcast-go-client/cluster"
"github.com/hazelcast/hazelcast-go-client/internal/client"
"github.com/hazelcast/hazelcast-go-client/internal/hzerrors"
"github.com/hazelcast/hazelcast-go-client/internal/proto"
"github.com/hazelcast/hazelcast-go-client/internal/serialization"
pubserialization "github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/types"
)
/*
WARNING!
The constants, types, methods and functions defined under hazelcastinternal are considered internal API.
No backward-compatibility guarantees apply for this code.
*/
type Data = serialization.Data
type ClientMessage = proto.ClientMessage
type ClientMessageHandler = proto.ClientMessageHandler
type Frame = proto.Frame
type ForwardFrameIterator = proto.ForwardFrameIterator
type Pair = proto.Pair
type Schema = serialization.Schema
type GenericCompactDeserializer = serialization.GenericCompactDeserializer
type GenericPortableDeserializer = serialization.GenericPortableDeserializer
var (
NullFrame = NewFrameWith([]byte{}, IsNullFlag)
BeginFrame = NewFrameWith([]byte{}, BeginDataStructureFlag)
EndFrame = NewFrameWith([]byte{}, EndDataStructureFlag)
)
func NewClientMessageForEncode() *ClientMessage {
return proto.NewClientMessageForEncode()
}
func NewFrame(content []byte) Frame {
return proto.NewFrame(content)
}
func NewFrameWith(content []byte, flags uint16) Frame {
return proto.NewFrameWith(content, flags)
}
func NewPair(key, value interface{}) Pair {
return Pair{Key: key, Value: value}
}
func SetDefaultCompactDeserializer(ds GenericCompactDeserializer) {
serialization.DefaultCompactDeserializer = ds
}
func SetDefaultPortableDeserializer(ds GenericPortableDeserializer) {
serialization.DefaultPortableDeserializer = ds
}
func SetBuiltinDeserializer(serializer pubserialization.Serializer) {
serialization.BuiltinDeserializers[serializer.ID()] = serializer
}
type InvokeOptions struct {
// Handler is the event handler for an invocation.
Handler ClientMessageHandler
}
// ClientInternal is an accessor for a Client.
type ClientInternal struct {
client *Client
invoker *client.Invoker
}
// NewClientInternal creates the client internal accessor with the given client.
func NewClientInternal(c *Client) *ClientInternal {
return &ClientInternal{
client: c,
invoker: c.proxyManager.invoker,
}
}
// Client returns the wrapped Client instance.
func (ci *ClientInternal) Client() *Client {
return ci.client
}
// ClusterID returns the cluster ID.
// It returns zero value of types.UUID{} if the cluster ID does not exist.
func (ci *ClientInternal) ClusterID() types.UUID {
return ci.client.ic.ConnectionManager.ClusterID()
}
// OrderedMembers returns the most recent member list of the cluster.
func (ci *ClientInternal) OrderedMembers() []pubcluster.MemberInfo {
return ci.client.ic.ClusterService.OrderedMembers()
}
// ConnectedToMember returns true if there is a connection to the given member.
func (ci *ClientInternal) ConnectedToMember(uuid types.UUID) bool {
return ci.client.ic.ConnectionManager.GetConnectionForUUID(uuid) != nil
}
// EncodeData serializes the given value and returns a Data value.
func (ci *ClientInternal) EncodeData(obj interface{}) (Data, error) {
return ci.client.ic.SerializationService.ToData(obj)
}
// DecodeData deserializes the given Data and returns a value.
func (ci *ClientInternal) DecodeData(data Data) (interface{}, error) {
return ci.client.ic.SerializationService.ToObject(data)
}
// InvokeOnRandomTarget sends the given request to one of the members.
// If opts.Handler is given, it is used as an event listener handler.
func (ci *ClientInternal) InvokeOnRandomTarget(ctx context.Context, request *ClientMessage, opts *InvokeOptions) (*ClientMessage, error) {
var handler proto.ClientMessageHandler
if opts != nil {
handler = opts.Handler
}
return ci.invoker.InvokeOnRandomTarget(ctx, request, handler)
}
// InvokeOnPartition sends the given request to the member which has the given partition ID.
func (ci *ClientInternal) InvokeOnPartition(ctx context.Context, request *ClientMessage, partitionID int32, opts *InvokeOptions) (*ClientMessage, error) {
return ci.invoker.InvokeOnPartition(ctx, request, partitionID)
}
// InvokeOnKey sends the given request to the member which corresponds to the given key.
func (ci *ClientInternal) InvokeOnKey(ctx context.Context, request *proto.ClientMessage, keyData Data, opts *InvokeOptions) (*proto.ClientMessage, error) {
partitionID, err := ci.GetPartitionID(keyData)
if err != nil {
return nil, err
}
return ci.invoker.InvokeOnPartition(ctx, request, partitionID)
}
// InvokeOnMember sends the request to the given member.
// If opts.Handler is given, it is used as an event listener handler.
func (ci *ClientInternal) InvokeOnMember(ctx context.Context, request *ClientMessage, uuid types.UUID, opts *InvokeOptions) (*ClientMessage, error) {
var handler proto.ClientMessageHandler
if opts != nil {
handler = opts.Handler
}
mem := ci.client.ic.ClusterService.GetMemberByUUID(uuid)
if mem == nil {
return nil, hzerrors.NewIllegalArgumentError(fmt.Sprintf("member not found: %s", uuid.String()), nil)
}
now := time.Now()
return ci.invoker.TryInvoke(ctx, func(ctx context.Context, attempt int) (interface{}, error) {
if attempt > 0 {
request = request.Copy()
}
inv := ci.invoker.Factory().NewMemberBoundInvocation(request, mem, now)
inv.SetEventHandler(handler)
if err := ci.invoker.SendInvocation(ctx, inv); err != nil {
return nil, err
}
return inv.GetWithContext(ctx)
})
}
// GetPartitionID returns the partition ID for the given Data value.
// The returned error may be ignored.
func (ci *ClientInternal) GetPartitionID(data Data) (int32, error) {
return ci.client.ic.PartitionService.GetPartitionID(data)
}
// PartitionCount returns the partition count.
func (ci *ClientInternal) PartitionCount() int32 {
return ci.client.ic.PartitionService.PartitionCount()
}
const (
TypeFieldOffset = proto.TypeFieldOffset
MessageTypeOffset = proto.MessageTypeOffset
ByteSizeInBytes = proto.ByteSizeInBytes
BooleanSizeInBytes = proto.BooleanSizeInBytes
ShortSizeInBytes = proto.ShortSizeInBytes
CharSizeInBytes = proto.CharSizeInBytes
IntSizeInBytes = proto.IntSizeInBytes
FloatSizeInBytes = proto.FloatSizeInBytes
LongSizeInBytes = proto.LongSizeInBytes
DoubleSizeInBytes = proto.DoubleSizeInBytes
UUIDSizeInBytes = proto.UUIDSizeInBytes
UuidSizeInBytes = proto.UuidSizeInBytes // Deprecated
EntryListUUIDLongEntrySizeInBytes = proto.EntryListUUIDLongEntrySizeInBytes
EntryListIntegerLongSizeInBytes = proto.EntryListIntegerLongSizeInBytes
EntryListIntegerUUIDEntrySizeInBytes = proto.EntryListIntegerUUIDEntrySizeInBytes
LocalDateSizeInBytes = proto.LocalDateSizeInBytes
LocalTimeSizeInBytes = proto.LocalTimeSizeInBytes
LocalDateTimeSizeInBytes = proto.LocalDateTimeSizeInBytes
OffsetDateTimeSizeInBytes = proto.OffsetDateTimeSizeInBytes
CorrelationIDFieldOffset = proto.CorrelationIDFieldOffset
CorrelationIDOffset = proto.CorrelationIDOffset
FragmentationIDOffset = proto.FragmentationIDOffset
PartitionIDOffset = proto.PartitionIDOffset
RequestThreadIdOffset = proto.RequestThreadIDOffset
RequestTtlOffset = proto.RequestTTLOffset
RequestIncludeValueOffset = proto.RequestIncludeValueOffset
RequestListenerFlagsOffset = proto.RequestListenerFlagsOffset
RequestLocalOnlyOffset = proto.RequestLocalOnlyOffset
RequestReferenceIdOffset = proto.RequestReferenceIdOffset
ResponseBackupAcksOffset = proto.ResponseBackupAcksOffset
UnfragmentedMessage = proto.UnfragmentedMessage
DefaultFlags = proto.DefaultFlags
BeginFragmentFlag = proto.BeginFragmentFlag
EndFragmentFlag = proto.EndFragmentFlag
IsFinalFlag = proto.IsFinalFlag
BeginDataStructureFlag = proto.BeginDataStructureFlag
EndDataStructureFlag = proto.EndDataStructureFlag
IsNullFlag = proto.IsNullFlag
IsEventFlag = proto.IsEventFlag
BackupEventFlag = proto.BackupEventFlag
SizeOfFrameLengthAndFlags = proto.SizeOfFrameLengthAndFlags
)