зеркало из https://github.com/github/vulcanizer.git
Merge pull request #27 from github/index_settings_api
Add some new methods for dealing with settings on an index-level
This commit is contained in:
Коммит
722857e783
|
@ -44,7 +44,7 @@ var cmdSettingUpdate = &cobra.Command{
|
|||
host, port := getConfiguration()
|
||||
v := vulcanizer.NewClient(host, port)
|
||||
|
||||
existingValue, newValue, err := v.SetSetting(settingToUpdate, valueToUpdate)
|
||||
existingValue, newValue, err := v.SetClusterSetting(settingToUpdate, valueToUpdate)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("Error when trying to update \"%s\" to \"%s\n", settingToUpdate, valueToUpdate)
|
||||
|
|
|
@ -12,7 +12,7 @@ func init() {
|
|||
rootCmd.AddCommand(cmdSettings)
|
||||
}
|
||||
|
||||
func printSettings(settings []vulcanizer.ClusterSetting, name string) {
|
||||
func printSettings(settings []vulcanizer.Setting, name string) {
|
||||
if len(settings) == 0 {
|
||||
fmt.Println(fmt.Sprintf("No %s are set.\n", name))
|
||||
return
|
||||
|
@ -41,7 +41,7 @@ var cmdSettings = &cobra.Command{
|
|||
Run: func(cmd *cobra.Command, args []string) {
|
||||
host, port := getConfiguration()
|
||||
v := vulcanizer.NewClient(host, port)
|
||||
clusterSettings, err := v.GetSettings()
|
||||
clusterSettings, err := v.GetClusterSettings()
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting settings: %s\n", err)
|
||||
|
|
86
es.go
86
es.go
|
@ -1,7 +1,9 @@
|
|||
package vulcanizer
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
@ -83,16 +85,17 @@ type IndexHealth struct {
|
|||
|
||||
//Holds slices for persistent and transient cluster settings.
|
||||
type ClusterSettings struct {
|
||||
PersistentSettings []ClusterSetting
|
||||
TransientSettings []ClusterSetting
|
||||
PersistentSettings []Setting
|
||||
TransientSettings []Setting
|
||||
}
|
||||
|
||||
//A setting name and value with the setting name to be a "collapsed" version of the setting. A setting of:
|
||||
// { "indices": { "recovery" : { "max_bytes_per_sec": "10mb" } } }
|
||||
//would be represented by:
|
||||
// ClusterSetting{ Setting: "indices.recovery.max_bytes_per_sec", Value: "10mb" }
|
||||
type ClusterSetting struct {
|
||||
Setting, Value string
|
||||
type Setting struct {
|
||||
Setting string
|
||||
Value string
|
||||
}
|
||||
|
||||
type snapshotWrapper struct {
|
||||
|
@ -148,7 +151,7 @@ func NewClient(host string, port int) *Client {
|
|||
|
||||
const clusterSettingsPath = "_cluster/settings"
|
||||
|
||||
func settingsToStructs(rawJson string) ([]ClusterSetting, error) {
|
||||
func settingsToStructs(rawJson string) ([]Setting, error) {
|
||||
flatSettings, err := flatten.FlattenString(rawJson, "", flatten.DotStyle)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -166,9 +169,9 @@ func settingsToStructs(rawJson string) ([]ClusterSetting, error) {
|
|||
|
||||
sort.Strings(keys)
|
||||
|
||||
var clusterSettings []ClusterSetting
|
||||
var clusterSettings []Setting
|
||||
for _, k := range keys {
|
||||
setting := ClusterSetting{
|
||||
setting := Setting{
|
||||
Setting: k,
|
||||
Value: settingsMap[k].(string),
|
||||
}
|
||||
|
@ -421,7 +424,7 @@ func (c *Client) GetHealth() (ClusterHealth, error) {
|
|||
//Get all the persistent and transient cluster settings.
|
||||
//
|
||||
//Use case: You want to see the current settings in the cluster.
|
||||
func (c *Client) GetSettings() (ClusterSettings, error) {
|
||||
func (c *Client) GetClusterSettings() (ClusterSettings, error) {
|
||||
clusterSettings := ClusterSettings{}
|
||||
body, err := handleErrWithBytes(c.buildGetRequest(clusterSettingsPath))
|
||||
|
||||
|
@ -478,8 +481,8 @@ func (c *Client) SetAllocation(allocation string) (string, error) {
|
|||
|
||||
//Set a new value for a cluster setting
|
||||
//
|
||||
//Use case: You've doubled the number of nodes in your cluster and you want to increase the number of shards the cluster can relocate at one time. Calling `SetSetting("cluster.routing.allocation.cluster_concurrent_rebalance", "100")` will update that value with the cluster. Once data relocation is complete you can decrease the setting by calling `SetSetting("cluster.routing.allocation.cluster_concurrent_rebalance", "20")`.
|
||||
func (c *Client) SetSetting(setting string, value string) (string, string, error) {
|
||||
//Use case: You've doubled the number of nodes in your cluster and you want to increase the number of shards the cluster can relocate at one time. Calling `SetClusterSetting("cluster.routing.allocation.cluster_concurrent_rebalance", "100")` will update that value with the cluster. Once data relocation is complete you can decrease the setting by calling `SetClusterSetting("cluster.routing.allocation.cluster_concurrent_rebalance", "20")`.
|
||||
func (c *Client) SetClusterSetting(setting string, value string) (string, string, error) {
|
||||
|
||||
settingsBody, err := handleErrWithBytes(c.buildGetRequest(clusterSettingsPath))
|
||||
|
||||
|
@ -741,3 +744,64 @@ func (c *Client) AnalyzeTextWithField(index, field, text string) ([]Token, error
|
|||
|
||||
return tokenWrapper.Tokens, nil
|
||||
}
|
||||
|
||||
//Get the settings of an index in a pretty-printed format.
|
||||
//
|
||||
//Use case: You can view the custom settings that are set on a particular index.
|
||||
func (c *Client) GetPrettyIndexSettings(index string) (string, error) {
|
||||
body, err := handleErrWithBytes(c.buildGetRequest(fmt.Sprintf("%s/_settings", index)))
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
rawSettings := gjson.GetBytes(body, fmt.Sprintf("%s.settings.index", index)).Raw
|
||||
|
||||
var prettyPrinted bytes.Buffer
|
||||
err = json.Indent(&prettyPrinted, []byte(rawSettings), "", " ")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return prettyPrinted.String(), nil
|
||||
}
|
||||
|
||||
//Get the settings of an index in a machine-oriented format.
|
||||
//
|
||||
//Use case: You can view the custom settings that are set on a particular index.
|
||||
func (c *Client) GetIndexSettings(index string) ([]Setting, error) {
|
||||
body, err := handleErrWithBytes(c.buildGetRequest(fmt.Sprintf("%s/_settings", index)))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rawSettings := gjson.GetBytes(body, fmt.Sprintf("%s.settings.index", index)).Raw
|
||||
|
||||
settings, err := settingsToStructs(rawSettings)
|
||||
|
||||
return settings, err
|
||||
}
|
||||
|
||||
//Set a setting on an index.
|
||||
//
|
||||
//Use case: Set or update an index setting for a particular index.
|
||||
func (c *Client) SetIndexSetting(index, setting, value string) (string, string, error) {
|
||||
settingsPath := fmt.Sprintf("%s/_settings", index)
|
||||
body, err := handleErrWithBytes(c.buildGetRequest(settingsPath))
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
currentValue := gjson.GetBytes(body, fmt.Sprintf("%s.settings.index.%s", index, setting)).Str
|
||||
|
||||
agent := c.buildPutRequest(settingsPath).Set("Content-Type", "application/json").
|
||||
Send(fmt.Sprintf(`{"index" : { "%s" : "%s"}}`, setting, value))
|
||||
|
||||
_, err = handleErrWithBytes(agent)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
return currentValue, value, nil
|
||||
}
|
||||
|
|
104
es_test.go
104
es_test.go
|
@ -383,7 +383,7 @@ func TestGetHealth_TLS(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestGetSettings(t *testing.T) {
|
||||
func TestGetClusterSettings(t *testing.T) {
|
||||
testSetup := &ServerSetup{
|
||||
Method: "GET",
|
||||
Path: "/_cluster/settings",
|
||||
|
@ -394,7 +394,7 @@ func TestGetSettings(t *testing.T) {
|
|||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
clusterSettings, err := client.GetSettings()
|
||||
clusterSettings, err := client.GetClusterSettings()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error, got %s", err)
|
||||
|
@ -417,8 +417,8 @@ func TestGetSettings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestSetSetting Func is an integration test for all things that use the SetSetting functionality.
|
||||
func TestSetSettings(t *testing.T) {
|
||||
// TestSetClusterSetting Func is an integration test for all things that use the SetClusterSetting functionality.
|
||||
func TestSetClusterSettings(t *testing.T) {
|
||||
|
||||
tt := []struct {
|
||||
Name string
|
||||
|
@ -484,7 +484,7 @@ func TestSetSettings(t *testing.T) {
|
|||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
oldSetting, newSetting, err := client.SetSetting(x.Setting, x.SetValue)
|
||||
oldSetting, newSetting, err := client.SetClusterSetting(x.Setting, x.SetValue)
|
||||
|
||||
if err != nil {
|
||||
st.Errorf("Expected error to be nil, %s", err)
|
||||
|
@ -502,7 +502,7 @@ func TestSetSettings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestSetSetting Func is an integration test for all things that use the SetAllocation functionality.
|
||||
// TestAllocationSettings Func is an integration test for all things that use the SetAllocation functionality.
|
||||
func TestAllocationSettings(t *testing.T) {
|
||||
|
||||
tt := []struct {
|
||||
|
@ -560,7 +560,7 @@ func TestAllocationSettings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSetSetting_BadRequest(t *testing.T) {
|
||||
func TestSetClusterSetting_BadRequest(t *testing.T) {
|
||||
getSetup := &ServerSetup{
|
||||
Method: "GET",
|
||||
Path: "/_cluster/settings",
|
||||
|
@ -578,7 +578,7 @@ func TestSetSetting_BadRequest(t *testing.T) {
|
|||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
_, _, err := client.SetSetting("cluster.routing.allocation.enable", "foo")
|
||||
_, _, err := client.SetClusterSetting("cluster.routing.allocation.enable", "foo")
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("Expected error to not be nil, %s", err)
|
||||
|
@ -1085,3 +1085,91 @@ func TestAnalyzeTextWithField(t *testing.T) {
|
|||
t.Fatalf("Unexpected token got: %+v", tokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetPrettyIndexSettings(t *testing.T) {
|
||||
testSetup := &ServerSetup{
|
||||
Method: "GET",
|
||||
Path: "/octocat/_settings",
|
||||
Response: `{"octocat":{"settings":{"index":{"number_of_shards":"5","provided_name":"octocat","creation_date":"1535035072757","analysis":{"analyzer":{"my_custom_analyzer":{"filter":["lowercase","asciifolding"],"char_filter":["html_strip"],"type":"custom","tokenizer":"standard"}}},"number_of_replicas":"0","uuid":"Q_Jm1mD2Syy8JgMUiicqcw","version":{"created":"5061099"}}}}}`,
|
||||
}
|
||||
|
||||
host, port, ts := setupTestServers(t, []*ServerSetup{testSetup})
|
||||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
indexSettings, err := client.GetPrettyIndexSettings("octocat")
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error, got %s", err)
|
||||
}
|
||||
|
||||
if indexSettings == "" {
|
||||
t.Error("Unexpected index settings, got empty string")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetIndexSettings(t *testing.T) {
|
||||
testSetup := &ServerSetup{
|
||||
Method: "GET",
|
||||
Path: "/octocat/_settings",
|
||||
Response: `{"octocat":{"settings":{"index":{"number_of_shards":"5","provided_name":"octocat","creation_date":"1535035072757","analysis":{"analyzer":{"my_custom_analyzer":{"filter":["lowercase","asciifolding"],"char_filter":["html_strip"],"type":"custom","tokenizer":"standard"}}},"number_of_replicas":"0","uuid":"Q_Jm1mD2Syy8JgMUiicqcw","version":{"created":"5061099"}}}}}`,
|
||||
}
|
||||
|
||||
host, port, ts := setupTestServers(t, []*ServerSetup{testSetup})
|
||||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
settings, err := client.GetIndexSettings("octocat")
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error, got %s", err)
|
||||
}
|
||||
|
||||
if len(settings) != 11 {
|
||||
t.Errorf("Unexpected number of settings, got %d", len(settings))
|
||||
}
|
||||
|
||||
for i := range settings {
|
||||
s := settings[i]
|
||||
if s.Setting == "number_of_shards" && s.Value != "5" {
|
||||
t.Errorf("Unexpected shards value, expected 5, got %s", s.Value)
|
||||
}
|
||||
|
||||
if s.Setting == "number_of_replicas" && s.Value != "0" {
|
||||
t.Errorf("Unexpected replicas value, expected 0, got %s", s.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetIndexSetting(t *testing.T) {
|
||||
getSetup := &ServerSetup{
|
||||
Method: "GET",
|
||||
Path: "/octocat/_settings",
|
||||
Response: `{"octocat":{"settings":{"index":{"number_of_shards":"5","provided_name":"octocat","creation_date":"1535035072757","analysis":{"analyzer":{"my_custom_analyzer":{"filter":["lowercase","asciifolding"],"char_filter":["html_strip"],"type":"custom","tokenizer":"standard"}}},"number_of_replicas":"0","uuid":"Q_Jm1mD2Syy8JgMUiicqcw","version":{"created":"5061099"}}}}}`,
|
||||
}
|
||||
|
||||
updateSetup := &ServerSetup{
|
||||
Method: "PUT",
|
||||
Path: "/octocat/_settings",
|
||||
Body: `{"index":{"number_of_replicas":"2"}}`,
|
||||
Response: `{"accepted": true}`,
|
||||
}
|
||||
|
||||
host, port, ts := setupTestServers(t, []*ServerSetup{getSetup, updateSetup})
|
||||
defer ts.Close()
|
||||
client := NewClient(host, port)
|
||||
|
||||
previous, current, err := client.SetIndexSetting("octocat", "number_of_replicas", "2")
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Error setting index setting: %s", err)
|
||||
}
|
||||
|
||||
if previous != "0" {
|
||||
t.Errorf("Unexpected previous setting value, expected 0, got %s", previous)
|
||||
}
|
||||
|
||||
if current != "2" {
|
||||
t.Errorf("Unexpected current setting value, expected 2, got %s", current)
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче